00001
00005
00006 #include "trace-utilization.h"
00007
00008
// Proxy to the TraceUtilizationBOC group. collectUtilizationData() uses it to
// invoke collectSumDetailData(). It is not assigned anywhere in this file.
00009 CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
00010
00011
// Reducer handle for the compressed sum-detail reduction. It is assigned in
// _createTraceutilization() from CkReduction::addReducer() and passed to
// contribute() in TraceUtilizationBOC::collectSumDetailData().
00013 CkReduction::reducerType sumDetailCompressedReducer;
00014
00015
00016 void collectUtilizationData(void *ignore, double currT) {
00017
00018 static int numTimesCalled = 0;
00019 numTimesCalled ++;
00020 if(numTimesCalled > 4){
00021 traceUtilizationGroupProxy.collectSumDetailData();
00022 }
00023 }
00024
00025
// File-static Ckpv variable holding this module's TraceUtilization instance.
// It is initialized and assigned in _createTraceutilization() and read in
// TraceUtilizationBOC::collectSumDetailData().
00026 CkpvStaticDeclare(TraceUtilization*, _trace);
00027
00032 void _createTraceutilization(char **argv)
00033 {
00034
00035
00036
00037 CkAssert(sizeof(short) == 2);
00038 sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction);
00039
00040 CkpvInitialize(TraceUtilization*, _trace);
00041 CkpvAccess(_trace) = new TraceUtilization();
00042 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
00043
00044 }
00045
00046
00047
00048 void TraceUtilization::beginExecute(CmiObjId *tid)
00049 {
00050 beginExecute(-1,-1,_threadEP,-1);
00051 }
00052
00053 void TraceUtilization::beginExecute(envelope *e)
00054 {
00055
00056 if (e==NULL) {
00057 beginExecute(-1,-1,_threadEP,-1);
00058 }
00059 else {
00060 beginExecute(-1,-1,e->getEpIdx(),-1);
00061 }
00062 }
00063
00064 void TraceUtilization::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
00065 {
00066 if (execEp != INVALIDEP) {
00067 TRACE_WARN("Warning: TraceUtilization two consecutive BEGIN_PROCESSING!\n");
00068 return;
00069 }
00070
00071 execEp=ep;
00072 start = TraceTimer();
00073 }
00074
00075
00076 void TraceUtilization::endExecute(void)
00077 {
00078
00079 if (execEp == TRACEON_EP) {
00080
00081
00082 return;
00083 }
00084
00085 double endTime = TraceTimer();
00086
00087 updateCpuTime(execEp, start, endTime);
00088
00089
00090 execEp = INVALIDEP;
00091 }
00092
00093
00094
00095 void TraceUtilization::addEventType(int eventType)
00096 {
00097 CkPrintf("FIXME handle TraceUtilization::addEventType(%d)\n", eventType);
00098 }
00099
00100
00101
00102
00117 void TraceUtilizationBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
00118 CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n", storedSumDetailResults.size() );
00119
00120 int datalength;
00121
00122 #if 0
00123
00124 compressedBuffer fakeMessage = fakeCompressedMessage();
00125 CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
00126 fakeMessage.freeBuf();
00127
00128 #else
00129
00130 if (storedSumDetailResults.size() == 0) {
00131 compressedBuffer b = emptyCompressedBuffer();
00132 CcsSendDelayedReply(m->reply, b.datalength(), b.buffer());
00133 b.freeBuf();
00134 } else {
00135 CkReductionMsg * msg = storedSumDetailResults.front();
00136 storedSumDetailResults.pop_front();
00137
00138
00139 void *sendBuffer = (void *)msg->getData();
00140 datalength = msg->getSize();
00141 CcsSendDelayedReply(m->reply, datalength, sendBuffer);
00142
00143 delete msg;
00144 }
00145
00146
00147 #endif
00148
00149
00150 delete m;
00151 }
00152
00153
00154
00155 void TraceUtilizationBOC::collectSumDetailData() {
00156 TraceUtilization* t = CkpvAccess(_trace);
00157
00158 compressedBuffer b = t->compressNRecentSumDetail(BIN_PER_SEC);
00159
00160
00161
00162
00163
00164
00165 #if 0
00166 b = fakeCompressedMessage();
00167 #endif
00168
00169
00170
00171
00172 CkCallback cb(CkIndex_TraceUtilizationBOC::sumDetailDataCollected(NULL), thisProxy[0]);
00173 contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
00174
00175 b.freeBuf();
00176 }
00177
00178
00179 void TraceUtilizationBOC::sumDetailDataCollected(CkReductionMsg *msg) {
00180 CkAssert(CkMyPe() == 0);
00181
00182 compressedBuffer b(msg->getData());
00183 CkPrintf("putting CCS reply in queue (average utilization= %lg)\n", averageUtilizationInBuffer(b));
00184
00185 storedSumDetailResults.push_back(msg);
00186
00187
00188
00189
00190
00191 }
00192
00193
00194
00195 void TraceUtilization::writeSts(void) {
00196
00197 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".util.sts")+1];
00198 sprintf(fname, "%s.util.sts", CkpvAccess(traceRoot));
00199 FILE* stsfp = fopen(fname, "w+");
00200 if (stsfp == 0) {
00201 CmiAbort("Cannot open summary sts file for writing.\n");
00202 }
00203 delete[] fname;
00204
00205 traceWriteSTS(stsfp,0);
00206 fprintf(stsfp, "END\n");
00207
00208 fclose(stsfp);
00209 }
00210
00211
00212
00214
00215
00216
00218 compressedBuffer moveTinyEntriesToOther(compressedBuffer src, double threshold){
00219
00220
00221
00222 src.pos = 0;
00223
00224 compressedBuffer dest(100000);
00225
00226 int numBins = src.pop<numBins_T>();
00227 int numProcs = src.pop<numProcs_T>();
00228
00229 dest.push<numBins_T>(numBins);
00230 dest.push<numProcs_T>(numProcs);
00231
00232
00233 for(int i=0;i<numBins;i++){
00234 double utilizationInOther = 0.0;
00235
00236 entriesInBin_T numEntriesInSrcBin = src.pop<entriesInBin_T>();
00237 int numEntriesInDestBinOffset = dest.push<entriesInBin_T>(0);
00238
00239 CkAssert(numEntriesInSrcBin < 200);
00240
00241 for(int j=0;j<numEntriesInSrcBin;j++){
00242 ep_T ep = src.pop<ep_T>();
00243 double v = src.pop<utilization_T>();
00244
00245 if(v < threshold * 250.0){
00246
00247 utilizationInOther += v / 250.0;
00248 } else {
00249
00250 dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
00251 dest.push<ep_T>(ep);
00252 dest.push<utilization_T>(v);
00253 }
00254
00255 }
00256
00257
00258 if(utilizationInOther > 0.0){
00259 dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
00260 dest.push<ep_T>(other_EP);
00261 if(utilizationInOther > 1.0)
00262 utilizationInOther = 1.0;
00263 dest.push<utilization_T>(utilizationInOther*250.0);
00264 }
00265
00266 }
00267
00268 return dest;
00269 }
00270
00271
00272
00273
00275 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
00276
00277
00278 compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
00279 int *numProcsRepresentedInMessage = new int[nMsg];
00280
00281 int numBins = 0;
00282 int totalsize = 0;
00283 int totalProcsAcrossAllMessages = 0;
00284
00285 for (int i=0;i<nMsg;i++) {
00286 incomingMsgs[i].init(msgs[i]->getData());
00287
00288
00289
00290
00291
00292 totalsize += msgs[i]->getSize();
00293
00294
00295
00296
00297
00298 if(i==0)
00299 numBins = incomingMsgs[i].pop<numBins_T>();
00300 else
00301 CkAssert( numBins == incomingMsgs[i].pop<numBins_T>() );
00302
00303
00304 numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<numProcs_T>();
00305 totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
00306
00307 }
00308
00309 compressedBuffer dest(totalsize + 100);
00310
00311
00312 dest.push<numBins_T>(numBins);
00313 dest.push<numProcs_T>(totalProcsAcrossAllMessages);
00314
00315 for(int i=0; i<numBins; i++){
00316 mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
00317 }
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 compressedBuffer dest2 = moveTinyEntriesToOther(dest, 0.10);
00328
00329
00330
00331
00332
00333
00334 CkReductionMsg *m = CkReductionMsg::buildNew(dest2.datalength(),dest2.buffer());
00335 dest.freeBuf();
00336 delete[] incomingMsgs;
00337 return m;
00338 }
00339
00340
00341
00342
00343
00344
00345
00347 compressedBuffer fakeCompressedMessage(){
00348 CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
00349
00350 compressedBuffer fakeBuf(10000);
00351
00352 int numBins = 55;
00353 int numProcs = 1000;
00354
00355
00356 fakeBuf.push<numBins_T>(numBins);
00357 fakeBuf.push<numProcs_T>(numProcs);
00358 for(int i=0; i<numBins; i++){
00359 int numRecords = 3;
00360 fakeBuf.push<entriesInBin_T>(numRecords);
00361 for(int j=0;j<numRecords;j++){
00362 fakeBuf.push<ep_T>(j*10+2);
00363 fakeBuf.push<utilization_T>(120.00);
00364 }
00365 }
00366
00367
00368
00369
00370 CkAssert(isCompressedBufferSane(fakeBuf));
00371
00372 return fakeBuf;
00373 }
00374
00375
00377 compressedBuffer emptyCompressedBuffer(){
00378 compressedBuffer result(sizeof(numBins_T));
00379 result.push<numBins_T>(0);
00380 return result;
00381 }
00382
00383
00384
00385
00387 void printCompressedBuf(compressedBuffer b){
00388
00389 b.pos = 0;
00390 int numEntries = b.pop<numBins_T>();
00391 CkPrintf("Buffer contains %d records\n", numEntries);
00392 int numProcs = b.pop<numProcs_T>();
00393 CkPrintf("Buffer represents an average over %d PEs\n", numProcs);
00394
00395 for(int i=0;i<numEntries;i++){
00396 entriesInBin_T recordLength = b.pop<entriesInBin_T>();
00397 if(recordLength > 0){
00398 CkPrintf(" Record %d is of length %d : ", i, recordLength);
00399
00400 for(int j=0;j<recordLength;j++){
00401 ep_T ep = b.pop<ep_T>();
00402 utilization_T v = b.pop<utilization_T>();
00403 CkPrintf("(%d,%f) ", ep, v);
00404 }
00405
00406 CkPrintf("\n");
00407 }
00408 }
00409 }
00410
00411
00412
00413 bool isCompressedBufferSane(compressedBuffer b){
00414
00415 b.pos = 0;
00416 numBins_T numBins = b.pop<numBins_T>();
00417 numProcs_T numProcs = b.pop<numProcs_T>();
00418
00419 if(numBins > 2000){
00420 ckout << "WARNING: numBins=" << numBins << endl;
00421 return false;
00422 }
00423
00424 for(int i=0;i<numBins;i++){
00425 entriesInBin_T recordLength = b.pop<entriesInBin_T>();
00426 if(recordLength > 200){
00427 ckout << "WARNING: recordLength=" << recordLength << endl;
00428 return false;
00429 }
00430
00431 if(recordLength > 0){
00432
00433 for(int j=0;j<recordLength;j++){
00434 ep_T ep = b.pop<ep_T>();
00435 utilization_T v = b.pop<utilization_T>();
00436
00437 if(((ep>800 || ep <0 ) && ep != other_EP) || v < 0.0 || v > 251.0){
00438 ckout << "WARNING: ep=" << ep << " v=" << v << endl;
00439 return false;
00440 }
00441 }
00442
00443 }
00444 }
00445
00446 return true;
00447 }
00448
00449
00450
00451 double averageUtilizationInBuffer(compressedBuffer b){
00452
00453 b.pos = 0;
00454 numBins_T numBins = b.pop<numBins_T>();
00455 numProcs_T numProcs = b.pop<numProcs_T>();
00456
00457
00458
00459 double totalUtilization = 0.0;
00460
00461 for(int i=0;i<numBins;i++) {
00462 entriesInBin_T entriesInBin = b.pop<entriesInBin_T>();
00463 for(int j=0;j<entriesInBin;j++){
00464 ep_T ep = b.pop<ep_T>();
00465 totalUtilization += b.pop<utilization_T>();
00466 }
00467 }
00468
00469 return totalUtilization / numBins / 2.5;
00470 }
00471
00472
00473
00474 void sanityCheckCompressedBuf(compressedBuffer b){
00475 CkAssert(isCompressedBufferSane(b));
00476 }
00477
00478
00479
00480 double TraceUtilization::sumUtilization(int startBin, int endBin){
00481 int epInfoSize = getEpInfoSize();
00482
00483 double a = 0.0;
00484
00485 for(int i=startBin; i<=endBin; i++){
00486 for(int j=0; j<epInfoSize; j++){
00487 a += cpuTime[(i%NUM_BINS)*epInfoSize+j];
00488 }
00489 }
00490 return a;
00491 }
00492
00493
00495 compressedBuffer TraceUtilization::compressNRecentSumDetail(int desiredBinsToSend){
00496
00497
00498 int startBin = cpuTimeEntriesSentSoFar();
00499 int numEntries = getEpInfoSize();
00500
00501 int endBin = startBin + desiredBinsToSend - 1;
00502 int binsToSend = endBin - startBin + 1;
00503 CkAssert(binsToSend >= desiredBinsToSend );
00504 incrementNumCpuTimeEntriesSent(binsToSend);
00505
00506
00507 #if 0
00508 bool nonePrinted = true;
00509 for(int i=0;i<(NUM_BINS-1000);i+=1000){
00510 double expectedU = sumUtilization(i, i+999);
00511 if(expectedU > 0.0){
00512 CkPrintf("[%d of %d] compressNRecentSumDetail All bins: start=%05d end=%05d values in array sum to %lg\n", CkMyPe(), CkNumPes(), i, i+999, expectedU);
00513 nonePrinted = false;
00514 }
00515 }
00516
00517 if(nonePrinted)
00518 CkPrintf("[%d of %d] compressNRecentSumDetail All bins are 0\n", CkMyPe(), CkNumPes() );
00519
00520 fflush(stdout);
00521 #endif
00522
00523 int bufferSize = 8*(2+numEntries) * (2+binsToSend)+100;
00524 compressedBuffer b(bufferSize);
00525
00526 b.push<numBins_T>(binsToSend);
00527 b.push<numProcs_T>(1);
00528
00529
00530 for(int i=0; i<binsToSend; i++) {
00531
00532
00533 int numEntriesInRecordOffset = b.push<entriesInBin_T>(0);
00534
00535 for(int e=0; e<numEntries; e++) {
00536 double scaledUtilization = getUtilization(i+startBin,e) * 2.5;
00537 if(scaledUtilization > 0.0) {
00538
00539 if(scaledUtilization > 250.0)
00540 scaledUtilization = 250.0;
00541
00542 b.push<ep_T>(e);
00543 b.push<utilization_T>(scaledUtilization);
00544
00545 b.increment<entriesInBin_T>(numEntriesInRecordOffset);
00546 }
00547 }
00548 }
00549
00550
00551
00552
00553 return b;
00554 }
00555
00556
00557
00558
00559
00563 void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
00564
00565 int numEntriesInDestRecordOffset = destBuf.push<entriesInBin_T>(0);
00566
00567
00568
00569
00570 int *remainingEntriesToRead = new int[numSrcBuf];
00571 for(int i=0;i<numSrcBuf;i++){
00572 remainingEntriesToRead[i] = srcBufferArray[i].pop<entriesInBin_T>();
00573 }
00574
00575 int count = 0;
00576
00577 for(int i=0;i<numSrcBuf;i++){
00578 count += remainingEntriesToRead[i];
00579 }
00580
00581 while (count>0) {
00582
00583 int minEp = 10000;
00584 for(int i=0;i<numSrcBuf;i++){
00585 if(remainingEntriesToRead[i]>0){
00586 int ep = srcBufferArray[i].peek<ep_T>();
00587 if(ep < minEp){
00588 minEp = ep;
00589 }
00590 }
00591 }
00592
00593
00594
00595 destBuf.increment<entriesInBin_T>(numEntriesInDestRecordOffset);
00596
00597
00598 double v = 0.0;
00599 for(int i=0;i<numSrcBuf;i++){
00600 if(remainingEntriesToRead[i]>0){
00601 int ep = srcBufferArray[i].peek<ep_T>();
00602 if(ep == minEp){
00603 srcBufferArray[i].pop<ep_T>();
00604 double util = srcBufferArray[i].pop<utilization_T>();
00605 v += util * numProcsRepresentedInMessage[i];
00606 remainingEntriesToRead[i]--;
00607 count --;
00608 }
00609 }
00610 }
00611
00612
00613 destBuf.push<ep_T>(minEp);
00614 destBuf.push<utilization_T>(v / (double)totalProcsAcrossAllMessages);
00615
00616 }
00617
00618
00619 delete [] remainingEntriesToRead;
00620
00621
00622 }
00623
00624
00625
00626 #include "TraceUtilization.def.h"
00627
00628