00001
00005
00006 #include "trace-utilization.h"
00007
00008
/* Proxy to the TraceUtilizationBOC group. collectUtilizationData() invokes
   collectSumDetailData() through it. Presumably assigned where the group is
   created (not visible in this section) — confirm. */
00009 CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
00010
00011
/* Handle of the custom reducer sumDetailCompressedReduction. It is registered
   in _createTraceutilization() and passed to contribute() in
   TraceUtilizationBOC::collectSumDetailData(). */
00013 CkReduction::reducerType sumDetailCompressedReducer;
00014
00015
00016
00017
00018 void collectUtilizationData(void *, double) {
00019 traceUtilizationGroupProxy.collectSumDetailData();
00020 }
00021
00022
/* Per-PE (Ckpv) pointer to this PE's TraceUtilization instance. It is
   initialized and allocated in _createTraceutilization() and read by
   TraceUtilizationBOC::collectSumDetailData(). */
00023 CkpvStaticDeclare(TraceUtilization*, _trace);
00024
00029 void _createTraceutilization(char **argv)
00030 {
00031
00032
00033
00034 CkAssert(sizeof(short) == 2);
00035 sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction, false, "sumDetailCompressedReduction");
00036
00037 CkpvInitialize(TraceUtilization*, _trace);
00038 CkpvAccess(_trace) = new TraceUtilization();
00039 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
00040
00041 }
00042
00043
00044
00045 void TraceUtilization::beginExecute(CmiObjId *tid)
00046 {
00047 beginExecute(-1,-1,_threadEP,-1);
00048 }
00049
00050 void TraceUtilization::beginExecute(envelope *e, void *obj)
00051 {
00052
00053 if (e==NULL) {
00054 beginExecute(-1,-1,_threadEP,-1);
00055 }
00056 else {
00057 beginExecute(-1,-1,e->getEpIdx(),-1);
00058 }
00059 }
00060
00061 void TraceUtilization::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
00062 {
00063 if (execEp != INVALIDEP) {
00064 TRACE_WARN("Warning: TraceUtilization two consecutive BEGIN_PROCESSING!\n");
00065 return;
00066 }
00067
00068 execEp=ep;
00069 start = TraceTimer();
00070 }
00071
00072
00073 void TraceUtilization::endExecute(void)
00074 {
00075
00076 if (execEp == TRACEON_EP) {
00077
00078
00079 execEp = INVALIDEP;
00080 return;
00081 }
00082
00083 double endTime = TraceTimer();
00084
00085 updateCpuTime(execEp, start, endTime);
00086
00087
00088 execEp = INVALIDEP;
00089 }
00090
00091
00092
00093 void TraceUtilization::addEventType(int eventType)
00094 {
00095 CkPrintf("FIXME handle TraceUtilization::addEventType(%d)\n", eventType);
00096 }
00097
00098
00099
00100
00115 void TraceUtilizationBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
00116 CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n", storedSumDetailResults.size() );
00117
00118 int datalength;
00119
00120 #if 0
00121
00122 compressedBuffer fakeMessage = fakeCompressedMessage();
00123 CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
00124 fakeMessage.freeBuf();
00125
00126 #else
00127
00128 if (storedSumDetailResults.size() == 0) {
00129 compressedBuffer b = emptyCompressedBuffer();
00130 CcsSendDelayedReply(m->reply, b.datalength(), b.buffer());
00131 b.freeBuf();
00132 } else {
00133 CkReductionMsg * msg = storedSumDetailResults.front();
00134 storedSumDetailResults.pop_front();
00135
00136
00137 void *sendBuffer = (void *)msg->getData();
00138 datalength = msg->getSize();
00139 CcsSendDelayedReply(m->reply, datalength, sendBuffer);
00140
00141 delete msg;
00142 }
00143
00144
00145 #endif
00146
00147
00148 delete m;
00149 }
00150
00151
00152
00153 void TraceUtilizationBOC::collectSumDetailData() {
00154 TraceUtilization* t = CkpvAccess(_trace);
00155
00156
00157 if (t->cpuTimeEntriesAvailable() - t->cpuTimeEntriesSentSoFar() < BIN_PER_SEC)
00158 return;
00159
00160 compressedBuffer b = t->compressNRecentSumDetail(BIN_PER_SEC);
00161
00162
00163
00164
00165
00166
00167 #if 0
00168 b = fakeCompressedMessage();
00169 #endif
00170
00171
00172
00173
00174 CkCallback cb(CkIndex_TraceUtilizationBOC::sumDetailDataCollected(NULL), thisProxy[0]);
00175 contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
00176
00177 b.freeBuf();
00178 }
00179
00180
00181 void TraceUtilizationBOC::sumDetailDataCollected(CkReductionMsg *msg) {
00182 CkAssert(CkMyPe() == 0);
00183
00184 compressedBuffer b(msg->getData());
00185 CkPrintf("putting CCS reply in queue (average utilization= %lg)\n", averageUtilizationInBuffer(b));
00186
00187 storedSumDetailResults.push_back(msg);
00188
00189
00190
00191
00192
00193 }
00194
00195
00196
00197 void TraceUtilization::writeSts(void) {
00198
00199 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".util.sts")+1];
00200 sprintf(fname, "%s.util.sts", CkpvAccess(traceRoot));
00201 FILE* stsfp = fopen(fname, "w+");
00202 if (stsfp == 0) {
00203 CmiAbort("Cannot open summary sts file for writing.\n");
00204 }
00205 delete[] fname;
00206
00207 traceWriteSTS(stsfp,0);
00208 fprintf(stsfp, "END\n");
00209
00210 fclose(stsfp);
00211 }
00212
00213
00214
00216
00217
00218
00220 compressedBuffer moveTinyEntriesToOther(compressedBuffer src, double threshold){
00221
00222
00223
00224 src.pos = 0;
00225
00226 compressedBuffer dest(100000);
00227
00228 int numBins = src.pop<numBins_T>();
00229 int numProcs = src.pop<numProcs_T>();
00230
00231 dest.push<numBins_T>(numBins);
00232 dest.push<numProcs_T>(numProcs);
00233
00234
00235 for(int i=0;i<numBins;i++){
00236 double utilizationInOther = 0.0;
00237
00238 entriesInBin_T numEntriesInSrcBin = src.pop<entriesInBin_T>();
00239 int numEntriesInDestBinOffset = dest.push<entriesInBin_T>(0);
00240
00241 CkAssert(numEntriesInSrcBin < 200);
00242
00243 for(int j=0;j<numEntriesInSrcBin;j++){
00244 ep_T ep = src.pop<ep_T>();
00245 double v = src.pop<utilization_T>();
00246
00247 if(v < threshold * 250.0){
00248
00249 utilizationInOther += v / 250.0;
00250 } else {
00251
00252 dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
00253 dest.push<ep_T>(ep);
00254 dest.push<utilization_T>(v);
00255 }
00256
00257 }
00258
00259
00260 if(utilizationInOther > 0.0){
00261 dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
00262 dest.push<ep_T>(other_EP);
00263 if(utilizationInOther > 1.0)
00264 utilizationInOther = 1.0;
00265 dest.push<utilization_T>(utilizationInOther*250.0);
00266 }
00267
00268 }
00269
00270 return dest;
00271 }
00272
00273
00274
00275
00277 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
00278
00279
00280 compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
00281 int *numProcsRepresentedInMessage = new int[nMsg];
00282
00283 int numBins = 0;
00284 int totalsize = 0;
00285 int totalProcsAcrossAllMessages = 0;
00286
00287 for (int i=0;i<nMsg;i++) {
00288 incomingMsgs[i].init(msgs[i]->getData());
00289
00290
00291
00292
00293
00294 totalsize += msgs[i]->getSize();
00295
00296
00297
00298
00299
00300 if(i==0)
00301 numBins = incomingMsgs[i].pop<numBins_T>();
00302 else
00303 CkAssert( numBins == incomingMsgs[i].pop<numBins_T>() );
00304
00305
00306 numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<numProcs_T>();
00307 totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
00308
00309 }
00310
00311 compressedBuffer dest(totalsize + 100);
00312
00313
00314 dest.push<numBins_T>(numBins);
00315 dest.push<numProcs_T>(totalProcsAcrossAllMessages);
00316
00317 for(int i=0; i<numBins; i++){
00318 mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
00319 }
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329 compressedBuffer dest2 = moveTinyEntriesToOther(dest, 0.10);
00330
00331
00332
00333
00334
00335
00336 CkReductionMsg *m = CkReductionMsg::buildNew(dest2.datalength(),dest2.buffer());
00337 dest.freeBuf();
00338 delete[] incomingMsgs;
00339 delete[] numProcsRepresentedInMessage;
00340 return m;
00341 }
00342
00343
00344
00345
00346
00347
00348
00350 compressedBuffer fakeCompressedMessage(){
00351 CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
00352
00353 compressedBuffer fakeBuf(10000);
00354
00355 int numBins = 55;
00356 int numProcs = 1000;
00357
00358
00359 fakeBuf.push<numBins_T>(numBins);
00360 fakeBuf.push<numProcs_T>(numProcs);
00361 for(int i=0; i<numBins; i++){
00362 int numRecords = 3;
00363 fakeBuf.push<entriesInBin_T>(numRecords);
00364 for(int j=0;j<numRecords;j++){
00365 fakeBuf.push<ep_T>(j*10+2);
00366 fakeBuf.push<utilization_T>(120.00);
00367 }
00368 }
00369
00370
00371
00372
00373 CkAssert(isCompressedBufferSane(fakeBuf));
00374
00375 return fakeBuf;
00376 }
00377
00378
00380 compressedBuffer emptyCompressedBuffer(){
00381 compressedBuffer result(sizeof(numBins_T));
00382 result.push<numBins_T>(0);
00383 return result;
00384 }
00385
00386
00387
00388
00390 void printCompressedBuf(compressedBuffer b){
00391
00392 b.pos = 0;
00393 int numEntries = b.pop<numBins_T>();
00394 CkPrintf("Buffer contains %d records\n", numEntries);
00395 int numProcs = b.pop<numProcs_T>();
00396 CkPrintf("Buffer represents an average over %d PEs\n", numProcs);
00397
00398 for(int i=0;i<numEntries;i++){
00399 entriesInBin_T recordLength = b.pop<entriesInBin_T>();
00400 if(recordLength > 0){
00401 CkPrintf(" Record %d is of length %d : ", i, recordLength);
00402
00403 for(int j=0;j<recordLength;j++){
00404 ep_T ep = b.pop<ep_T>();
00405 utilization_T v = b.pop<utilization_T>();
00406 CkPrintf("(%d,%f) ", ep, v);
00407 }
00408
00409 CkPrintf("\n");
00410 }
00411 }
00412 }
00413
00414
00415
00416 bool isCompressedBufferSane(compressedBuffer b){
00417
00418 b.pos = 0;
00419 numBins_T numBins = b.pop<numBins_T>();
00420 numProcs_T numProcs = b.pop<numProcs_T>();
00421
00422 if(numBins > 2000){
00423 ckout << "WARNING: numBins=" << numBins << endl;
00424 return false;
00425 }
00426
00427 for(int i=0;i<numBins;i++){
00428 entriesInBin_T recordLength = b.pop<entriesInBin_T>();
00429 if(recordLength > 200){
00430 ckout << "WARNING: recordLength=" << recordLength << endl;
00431 return false;
00432 }
00433
00434 if(recordLength > 0){
00435
00436 for(int j=0;j<recordLength;j++){
00437 ep_T ep = b.pop<ep_T>();
00438 utilization_T v = b.pop<utilization_T>();
00439
00440 if(((ep>800 || ep <0 ) && ep != other_EP) || v < 0.0 || v > 251.0){
00441 ckout << "WARNING: ep=" << ep << " v=" << v << endl;
00442 return false;
00443 }
00444 }
00445
00446 }
00447 }
00448
00449 return true;
00450 }
00451
00452
00453
00454 double averageUtilizationInBuffer(compressedBuffer b){
00455
00456 b.pos = 0;
00457 numBins_T numBins = b.pop<numBins_T>();
00458 numProcs_T numProcs = b.pop<numProcs_T>();
00459
00460
00461
00462 double totalUtilization = 0.0;
00463
00464 for(int i=0;i<numBins;i++) {
00465 entriesInBin_T entriesInBin = b.pop<entriesInBin_T>();
00466 for(int j=0;j<entriesInBin;j++){
00467 ep_T ep = b.pop<ep_T>();
00468 totalUtilization += b.pop<utilization_T>();
00469 }
00470 }
00471
00472 return totalUtilization / numBins / 2.5;
00473 }
00474
00475
00476
/**
 * Asserts (CkAssert) that b passes isCompressedBufferSane().
 * Because the check is the CkAssert argument, in builds where CkAssert
 * expands to nothing isCompressedBufferSane() is not evaluated at all.
 * Keep it inside the assertion rather than hoisting it out.
 */
00477 void sanityCheckCompressedBuf(compressedBuffer b){
00478 CkAssert(isCompressedBufferSane(b));
00479 }
00480
00481
00482
00483 double TraceUtilization::sumUtilization(int startBin, int endBin){
00484 int epInfoSize = getEpInfoSize();
00485
00486 double a = 0.0;
00487
00488 for(int i=startBin; i<=endBin; i++){
00489 for(int j=0; j<epInfoSize; j++){
00490 a += cpuTime[(i%NUM_BINS)*epInfoSize+j];
00491 }
00492 }
00493 return a;
00494 }
00495
00496
00498 compressedBuffer TraceUtilization::compressNRecentSumDetail(int desiredBinsToSend){
00499
00500
00501 int startBin = cpuTimeEntriesSentSoFar();
00502 int numEntries = getEpInfoSize();
00503
00504 int endBin = startBin + desiredBinsToSend - 1;
00505 int binsToSend = endBin - startBin + 1;
00506 CkAssert(binsToSend >= desiredBinsToSend );
00507 incrementNumCpuTimeEntriesSent(binsToSend);
00508
00509
00510 #if 0
00511 bool nonePrinted = true;
00512 for(int i=0;i<(NUM_BINS-1000);i+=1000){
00513 double expectedU = sumUtilization(i, i+999);
00514 if(expectedU > 0.0){
00515 CkPrintf("[%d of %d] compressNRecentSumDetail All bins: start=%05d end=%05d values in array sum to %lg\n", CkMyPe(), CkNumPes(), i, i+999, expectedU);
00516 nonePrinted = false;
00517 }
00518 }
00519
00520 if(nonePrinted)
00521 CkPrintf("[%d of %d] compressNRecentSumDetail All bins are 0\n", CkMyPe(), CkNumPes() );
00522
00523 fflush(stdout);
00524 #endif
00525
00526 int bufferSize = 8*(2+numEntries) * (2+binsToSend)+100;
00527 compressedBuffer b(bufferSize);
00528
00529 b.push<numBins_T>(binsToSend);
00530 b.push<numProcs_T>(1);
00531
00532
00533 for(int i=0; i<binsToSend; i++) {
00534
00535
00536 int numEntriesInRecordOffset = b.push<entriesInBin_T>(0);
00537
00538 for(int e=0; e<numEntries; e++) {
00539 double scaledUtilization = getUtilization(i+startBin,e) * 2.5;
00540 if(scaledUtilization > 0.0) {
00541
00542 if(scaledUtilization > 250.0)
00543 scaledUtilization = 250.0;
00544
00545 b.push<ep_T>(e);
00546 b.push<utilization_T>(scaledUtilization);
00547
00548 b.increment<entriesInBin_T>(numEntriesInRecordOffset);
00549 }
00550 }
00551 }
00552
00553
00554
00555
00556 return b;
00557 }
00558
00559
00560
00561
00562
00566 void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
00567
00568 int numEntriesInDestRecordOffset = destBuf.push<entriesInBin_T>(0);
00569
00570
00571
00572
00573 int *remainingEntriesToRead = new int[numSrcBuf];
00574 for(int i=0;i<numSrcBuf;i++){
00575 remainingEntriesToRead[i] = srcBufferArray[i].pop<entriesInBin_T>();
00576 }
00577
00578 int count = 0;
00579
00580 for(int i=0;i<numSrcBuf;i++){
00581 count += remainingEntriesToRead[i];
00582 }
00583
00584 while (count>0) {
00585
00586 int minEp = 10000;
00587 for(int i=0;i<numSrcBuf;i++){
00588 if(remainingEntriesToRead[i]>0){
00589 int ep = srcBufferArray[i].peek<ep_T>();
00590 if(ep < minEp){
00591 minEp = ep;
00592 }
00593 }
00594 }
00595
00596
00597
00598 destBuf.increment<entriesInBin_T>(numEntriesInDestRecordOffset);
00599
00600
00601 double v = 0.0;
00602 for(int i=0;i<numSrcBuf;i++){
00603 if(remainingEntriesToRead[i]>0){
00604 int ep = srcBufferArray[i].peek<ep_T>();
00605 if(ep == minEp){
00606 srcBufferArray[i].pop<ep_T>();
00607 double util = srcBufferArray[i].pop<utilization_T>();
00608 v += util * numProcsRepresentedInMessage[i];
00609 remainingEntriesToRead[i]--;
00610 count --;
00611 }
00612 }
00613 }
00614
00615
00616 destBuf.push<ep_T>(minEp);
00617 destBuf.push<utilization_T>(v / (double)totalProcsAcrossAllMessages);
00618
00619 }
00620
00621
00622 delete [] remainingEntriesToRead;
00623
00624
00625 }
00626
00627
00628
00629 #include "TraceUtilization.def.h"
00630
00631