00001
00005
00006 #include "charm++.h"
00007 #include "trace-summary.h"
00008 #include "trace-summaryBOC.h"
00009
// Debug print hook. The CmiPrintf call is commented out, so every DEBUGF(...)
// use in this file expands to nothing.
00010 #define DEBUGF(x) // CmiPrintf x
00011
// Value stored into the per-PE `version` variable by the TraceSummary
// constructor (it can be overridden with "+version").
00012 #define VER 7.1
00013
// Negative sentinels stored in TraceSummary::execEp instead of a real
// entry-point index: INVALIDEP is the value execEp is reset to, TRACEON_EP is
// the value passed by traceBegin().
00014 #define INVALIDEP -2
00015 #define TRACEON_EP -3
00016
00017
// Default for the per-PE `binCount` variable (can be overridden with "+bincount").
00018 #define DefaultBinCount (1000*60*1)
00019
// Per-PE pointer to the TraceSummary instance created by _createTracesummary().
00020 CkpvStaticDeclare(TraceSummary*, _trace);
// Number of "EVENT" lines emitted by SumLogPool::writeSts(); never modified in this file.
00021 static int _numEvents = 0;
// Extra entry-point slots added on top of the entry table size when sizing epInfo.
00022 #define NUM_DUMMY_EPS 9
// Per-PE settings initialized in the TraceSummary constructor.
00023 CkpvDeclare(int, binCount);
00024 CkpvDeclare(double, binSize);
00025 CkpvDeclare(double, version);
00026
00027
// Per-PE counter, reset to 0 in _createTracesummary(); not read elsewhere in this file.
00028 CkpvDeclare(int, previouslySentBins);
00029
00030
00031
/**
 * Cursor over a raw byte buffer with typed, unaligned-safe accessors.
 *
 * Values are copied in and out with memcpy, so any offset is valid regardless
 * of alignment. The class never checks bounds and never frees the buffer on
 * destruction: storage obtained through the size constructor must be released
 * explicitly with freeBuf(); storage passed in through the pointer
 * constructor or init() remains owned by the caller.
 */
class compressedBuffer {
public:
  char* buf;  ///< start of the underlying storage (may be NULL)
  int pos;    ///< current read/write offset in bytes

  /// Creates an empty cursor with no storage attached.
  compressedBuffer(){
    buf = NULL;
    pos = 0;
  }

  /// Allocates `bytes` bytes with malloc; release them with freeBuf().
  compressedBuffer(int bytes){
    buf = (char*)malloc(bytes);
    pos = 0;
  }

  /// Wraps caller-provided storage without taking ownership.
  compressedBuffer(void *buffer){
    buf = (char*)buffer;
    pos = 0;
  }

  /// Re-targets the cursor at `buffer` and rewinds it to offset 0.
  void init(void *buffer){
    buf = (char*)buffer;
    pos = 0;
  }

  /// Address of the byte at the current cursor position.
  inline void * currentPtr(){
    return (void*)(buf+pos);
  }

  /// Reads a T stored at byte `offset`; the cursor is not moved.
  template <typename T>
  T read(int offset){
    T v;
    memcpy(&v, buf+offset, sizeof(T));
    return v;
  }

  /// Stores `v` at byte `offset`; the cursor is not moved.
  template <typename T>
  void write(T v, int offset){
    T v2 = v;
    memcpy(buf+offset, &v2, sizeof(T));
  }

  /// Increments the T stored at byte `offset` by one.
  template <typename T>
  void increment(int offset){
    T temp;
    temp = read<T>(offset);
    temp ++;
    write<T>(temp, offset);
  }

  /// Adds `v` to the T stored at byte `offset`.
  template <typename T>
  void accumulate(T v, int offset){
    T temp;
    temp = read<T>(offset);
    temp += v;
    write<T>(temp, offset);
  }

  /// Appends `v` at the cursor and advances it; returns the offset written to.
  template <typename T>
  int push(T v){
    int oldpos = pos;
    write<T>(v, pos);
    pos += sizeof(T);
    return oldpos;
  }

  /// Reads a T at the cursor and advances past it.
  template <typename T>
  T pop(){
    T temp = read<T>(pos);
    pos += sizeof(T);
    return temp;
  }

  /// Reads a T at the cursor without advancing.
  template <typename T>
  T peek(){
    T temp = read<T>(pos);
    return temp;
  }

  /// Reads the T that follows one T0 at the cursor, without advancing.
  template <typename T0, typename T>
  T peekSecond(){
    T temp;
    memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
    return temp;
  }

  /// Number of bytes consumed/produced so far (the cursor offset).
  int datalength(){
    return pos;
  }

  /// Start of the underlying storage.
  void * buffer(){
    return (void*) buf;
  }

  /// Releases the storage with free() and clears the pointer so that a
  /// repeated call (or a later buffer() check) does not touch freed memory.
  void freeBuf(){
    free(buf);
    buf = NULL;
  }

  /// Intentionally does not free: the buffer may be owned by someone else.
  ~compressedBuffer(){
  }

};
00144
00145
00146
00147
00148
// Group ID from which every CProxy_TraceSummaryBOC in this file is built.
00149 CkGroupID traceSummaryGID;
// Set to true by TraceSummaryBOC::initCCS() on PE 0 after the CCS handlers are registered.
00150 bool summaryCcsStreaming;
00151
// Set from the "+sumonly" command-line flag in the TraceSummary constructor.
00152 int sumonly = 0;
// Set from the "+sumDetail" flag; that flag is only parsed when sumonly is off.
00153 int sumDetail = 0;
00154
00159 void _createTracesummary(char **argv)
00160 {
00161 DEBUGF(("%d createTraceSummary\n", CkMyPe()));
00162 CkpvInitialize(TraceSummary*, _trace);
00163 CkpvInitialize(int, previouslySentBins);
00164 CkpvAccess(previouslySentBins) = 0;
00165 CkpvAccess(_trace) = new TraceSummary(argv);
00166 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
00167 if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
00168 }
00169
00170
00172 void CkSummary_StartPhase(int phase)
00173 {
00174 CkpvAccess(_trace)->startPhase(phase);
00175 }
00176
00177
00179 void CkSummary_MarkEvent(int eventType)
00180 {
00181 CkpvAccess(_trace)->addEventType(eventType);
00182 }
00183
/// Writes `u` to `fp` as a decimal number right-aligned in a field of at
/// least four characters (no separator is added).
static inline void writeU(FILE* fp, int u)
{
  const int fieldWidth = 4;
  fprintf(fp, "%*d", fieldWidth, u);
}
00188
00189 PhaseEntry::PhaseEntry()
00190 {
00191 int _numEntries=_entryTable.size();
00192
00193 nEPs = _numEntries+10;
00194 count = new int[nEPs];
00195 times = new double[nEPs];
00196 maxtimes = new double[nEPs];
00197 for (int i=0; i<nEPs; i++) {
00198 count[i] = 0;
00199 times[i] = 0.0;
00200 maxtimes[i] = 0.;
00201 }
00202 }
00203
00204 SumLogPool::~SumLogPool()
00205 {
00206 if (!sumonly) {
00207 write();
00208 fclose(fp);
00209 if (sumDetail) fclose(sdfp);
00210 }
00211
00212 if (markcount > 0)
00213 for (int i=0; i<MAX_MARKS; i++) {
00214 for (int j=0; j<events[i].length(); j++)
00215 delete events[i][j];
00216 }
00217 delete[] pool;
00218 delete[] epInfo;
00219 delete[] cpuTime;
00220 delete[] numExecutions;
00221 }
00222
00223 void SumLogPool::addEventType(int eventType, double time)
00224 {
00225 if (eventType <0 || eventType >= MAX_MARKS) {
00226 CkPrintf("Invalid event type %d!\n", eventType);
00227 return;
00228 }
00229 MarkEntry *e = new MarkEntry;
00230 e->time = time;
00231 events[eventType].push_back(e);
00232 markcount ++;
00233 }
00234
00235 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES)
00236 {
00237
00238 cpuTime = NULL;
00239 poolSize = CkpvAccess(binCount);
00240 if (poolSize % 2) poolSize++;
00241 pool = new BinEntry[poolSize];
00242 _MEMCHECK(pool);
00243
00244 this->pgm = new char[strlen(pgm)+1];
00245 strcpy(this->pgm,pgm);
00246
00247 #if 0
00248
00249 if (CkMyPe() == 0) {
00250 char *fname =
00251 new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
00252 sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
00253 stsfp = fopen(fname, "w+");
00254
00255 if (stsfp == 0) {
00256 CmiAbort("Cannot open summary sts file for writing.\n");
00257 }
00258 delete[] fname;
00259 }
00260 #endif
00261
00262
00263 markcount = 0;
00264 }
00265
00266 void SumLogPool::initMem()
00267 {
00268 int _numEntries=_entryTable.size();
00269 epInfoSize = _numEntries + NUM_DUMMY_EPS + 1;
00270 epInfo = new SumEntryInfo[epInfoSize];
00271 _MEMCHECK(epInfo);
00272
00273 cpuTime = NULL;
00274 numExecutions = NULL;
00275 if (sumDetail) {
00276 cpuTime = new double[poolSize*epInfoSize];
00277 _MEMCHECK(cpuTime);
00278 memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
00279 numExecutions = new int[poolSize*epInfoSize];
00280 _MEMCHECK(numExecutions);
00281 memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
00282
00283
00284
00285
00286
00287
00288
00289
00290 }
00291 }
00292
00293 int SumLogPool::getUtilization(int interval, int ep) {
00294 return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize));
00295 }
00296
00297 void SumLogPool::write(void)
00298 {
00299 int i;
00300 unsigned int j;
00301 int _numEntries=_entryTable.size();
00302
00303 fp = NULL;
00304 sdfp = NULL;
00305
00306
00307
00308 if (!sumonly) {
00309 char pestr[10];
00310 sprintf(pestr, "%d", CkMyPe());
00311 int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
00312 char *fname = new char[len+1];
00313
00314 sprintf(fname, "%s.%s.sum", pgm, pestr);
00315 do {
00316 fp = fopen(fname, "w+");
00317 } while (!fp && errno == EINTR);
00318 if (!fp) {
00319 CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
00320 CmiAbort("Cannot open Summary Trace File for writing...\n");
00321 }
00322
00323 if (sumDetail) {
00324 sprintf(fname, "%s.%s.sumd", pgm, pestr);
00325 do {
00326 sdfp = fopen(fname, "w+");
00327 } while (!sdfp && errno == EINTR);
00328 if(!sdfp) {
00329 CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
00330 }
00331 }
00332 delete[] fname;
00333 }
00334
00335 fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
00336 if (CkpvAccess(version)>=3.0)
00337 {
00338 fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
00339 }
00340 fprintf(fp, "\n");
00341
00342
00343 #if 1
00344 int last=pool[0].getU();
00345 writeU(fp, last);
00346 int count=1;
00347 for(j=1; j<numBins; j++) {
00348 int u = pool[j].getU();
00349 if (last == u) {
00350 count++;
00351 }
00352 else {
00353 if (count > 1) fprintf(fp, "+%d", count);
00354 writeU(fp, u);
00355 last = u;
00356 count = 1;
00357 }
00358 }
00359 if (count > 1) fprintf(fp, "+%d", count);
00360 #else
00361 for(j=0; j<numEntries; j++)
00362 pool[j].write(fp);
00363 #endif
00364 fprintf(fp, "\n");
00365
00366
00367 fprintf(fp, "EPExeTime: ");
00368 for (i=0; i<_numEntries; i++)
00369 fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
00370 fprintf(fp, "\n");
00371
00372 fprintf(fp, "EPCallTime: ");
00373 for (i=0; i<_numEntries; i++)
00374 fprintf(fp, "%d ", epInfo[i].epCount);
00375 fprintf(fp, "\n");
00376
00377 fprintf(fp, "MaxEPTime: ");
00378 for (i=0; i<_numEntries; i++)
00379 fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
00380 fprintf(fp, "\n");
00381 #if 0
00382 for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
00383 for (j=0; j<_numEntries; j++)
00384 fprintf(fp, "%d ", epInfo[j].hist[i]);
00385 fprintf(fp, "\n");
00386 }
00387 #endif
00388
00389 if (CkpvAccess(version)>=2.0)
00390 {
00391 fprintf(fp, "NumMarks: %d ", markcount);
00392 for (i=0; i<MAX_MARKS; i++) {
00393 for(int j=0; j<events[i].length(); j++)
00394 fprintf(fp, "%d %f ", i, events[i][j]->time);
00395 }
00396 fprintf(fp, "\n");
00397 }
00398
00399 if (CkpvAccess(version)>=3.0)
00400 {
00401 phaseTab.write(fp);
00402 }
00403
00404 if (CkpvAccess(version)>=7.1) {
00405 fprintf(fp, "IdlePercent: ");
00406 int last=pool[0].getUIdle();
00407 writeU(fp, last);
00408 int count=1;
00409 for(j=1; j<numBins; j++) {
00410 int u = pool[j].getUIdle();
00411 if (last == u) {
00412 count++;
00413 }
00414 else {
00415 if (count > 1) fprintf(fp, "+%d", count);
00416 writeU(fp, u);
00417 last = u;
00418 count = 1;
00419 }
00420 }
00421 if (count > 1) fprintf(fp, "+%d", count);
00422 fprintf(fp, "\n");
00423 }
00424
00425
00426
00427 if (sumDetail) {
00428 fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
00429 CkpvAccess(version), CkMyPe(), CkNumPes(),
00430 numBins, _numEntries, CkpvAccess(binSize));
00431
00432
00433
00434 fprintf(sdfp, "ExeTimePerEPperInterval ");
00435 unsigned int e, i;
00436 long last= (long) (getCPUtime(0,0)*1.0e6);
00437 int count=0;
00438 fprintf(sdfp, "%ld", last);
00439 for(e=0; e<_numEntries; e++) {
00440 for(i=0; i<numBins; i++) {
00441
00442 long u= (long) (getCPUtime(i,e)*1.0e6);
00443 if (last == u) {
00444 count++;
00445 } else {
00446
00447 if (count > 1) fprintf(sdfp, "+%d", count);
00448 fprintf(sdfp, " %ld", u);
00449 last = u;
00450 count = 1;
00451 }
00452 }
00453 }
00454 if (count > 1) fprintf(sdfp, "+%d", count);
00455 fprintf(sdfp, "\n");
00456
00457
00458 fprintf(sdfp, "EPCallTimePerInterval ");
00459 last= getNumExecutions(0,0);
00460 count=0;
00461 fprintf(sdfp, "%ld", last);
00462 for(e=0; e<_numEntries; e++) {
00463 for(i=0; i<numBins; i++) {
00464
00465 long u= getNumExecutions(i, e);
00466 if (last == u) {
00467 count++;
00468 } else {
00469
00470 if (count > 1) fprintf(sdfp, "+%d", count);
00471 fprintf(sdfp, " %ld", u);
00472 last = u;
00473 count = 1;
00474 }
00475 }
00476 }
00477 if (count > 1) fprintf(sdfp, "+%d", count);
00478 fprintf(sdfp, "\n");
00479 }
00480 }
00481
00482 void SumLogPool::writeSts(void)
00483 {
00484
00485 char *fname =
00486 new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
00487 sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
00488 stsfp = fopen(fname, "w+");
00489
00490 if (stsfp == 0) {
00491 CmiAbort("Cannot open summary sts file for writing.\n");
00492 }
00493 delete[] fname;
00494
00495 traceWriteSTS(stsfp,_numEvents);
00496 for(int i=0;i<_numEvents;i++)
00497 fprintf(stsfp, "EVENT %d Event%d\n", i, i);
00498 fprintf(stsfp, "END\n");
00499
00500 fclose(stsfp);
00501 }
00502
00503
00504 void SumLogPool::add(double time, double idleTime, int pe)
00505 {
00506 new (&pool[numBins++]) BinEntry(time, idleTime);
00507 if (poolSize==numBins) {
00508 shrink();
00509 }
00510 }
00511
00512
00513
00514 void SumLogPool::setEp(int epidx, double time)
00515 {
00516 if (epidx >= epInfoSize) {
00517 CmiAbort("Invalid entry point!!\n");
00518 }
00519
00520 epInfo[epidx].setTime(time);
00521
00522 phaseTab.setEp(epidx, time);
00523 }
00524
00525
00526
00527 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
00528 {
00529 if (epIdx >= epInfoSize) {
00530 CmiAbort("Too many entry points!!\n");
00531 }
00532
00533 double binSz = CkpvAccess(binSize);
00534 int startingBinIdx, endingBinIdx;
00535 startingBinIdx = (int)(startTime/binSz);
00536 endingBinIdx = (int)(endTime/binSz);
00537
00538 while (endingBinIdx >= poolSize) {
00539 shrink();
00540 CmiAssert(CkpvAccess(binSize) > binSz);
00541 binSz = CkpvAccess(binSize);
00542 startingBinIdx = (int)(startTime/binSz);
00543 endingBinIdx = (int)(endTime/binSz);
00544 }
00545
00546 if (startingBinIdx == endingBinIdx) {
00547 addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
00548 } else if (startingBinIdx < endingBinIdx) {
00549 addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
00550 while(++startingBinIdx < endingBinIdx)
00551 addToCPUtime(startingBinIdx, epIdx, binSz);
00552 addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
00553 } else {
00554 CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
00555 startTime, endTime);
00556 CmiAbort("Error: end time of EP is less than start time\n");
00557 }
00558
00559 incNumExecutions(startingBinIdx, epIdx);
00560 }
00561
00562
00563 void SumLogPool::shrink(void)
00564 {
00565
00566
00567
00568
00569 int entries = numBins/2;
00570 for (int i=0; i<entries; i++)
00571 {
00572 pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
00573 pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
00574 if (sumDetail)
00575 for (int e=0; e < epInfoSize; e++) {
00576 setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
00577 setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
00578 }
00579 }
00580
00581 if (sumDetail) {
00582 memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
00583 memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
00584 }
00585 numBins = entries;
00586 CkpvAccess(binSize) *= 2;
00587
00588
00589 }
00590
00591 void SumLogPool::shrink(double _maxBinSize)
00592 {
00593 while(CkpvAccess(binSize) < _maxBinSize)
00594 {
00595 shrink();
00596 };
00597 }
00598 int BinEntry::getU()
00599 {
00600 return (int)(_time * 100.0 / CkpvAccess(binSize));
00601 }
00602
00603 int BinEntry::getUIdle() {
00604 return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
00605 }
00606
00607 void BinEntry::write(FILE* fp)
00608 {
00609 writeU(fp, getU());
00610 }
00611
00612 TraceSummary::TraceSummary(char **argv):msgNum(0),binStart(0.0),idleStart(0.0),
00613 binTime(0.0),binIdle(0.0)
00614 {
00615 if (CkpvAccess(traceOnPe) == 0) return;
00616
00617
00618 if (CmiTimerAbsolute()) binStart = CmiInitTime();
00619
00620 CkpvInitialize(int, binCount);
00621 CkpvInitialize(double, binSize);
00622 CkpvInitialize(double, version);
00623 CkpvAccess(binSize) = BIN_SIZE;
00624 CkpvAccess(version) = VER;
00625 CkpvAccess(binCount) = DefaultBinCount;
00626 if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
00627 if (CkMyPe() == 0)
00628 CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
00629 CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
00630 "CPU usage log time resolution");
00631 CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
00632 "Write this .sum file version");
00633
00634 epThreshold = 0.001;
00635 CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
00636 "Execution time histogram lower bound");
00637 epInterval = 0.001;
00638 CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
00639 "Execution time histogram bin size");
00640
00641 sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
00642
00643 if (!sumonly)
00644 sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
00645
00646 _logPool = new SumLogPool(CkpvAccess(traceRoot));
00647
00648 execEp=INVALIDEP;
00649 inIdle = 0;
00650 inExec = 0;
00651 depth = 0;
00652 }
00653
00654 void TraceSummary::traceClearEps(void)
00655 {
00656 _logPool->clearEps();
00657 }
00658
00659 void TraceSummary::traceWriteSts(void)
00660 {
00661 if(CkMyPe()==0)
00662 _logPool->writeSts();
00663 }
00664
00665 void TraceSummary::traceClose(void)
00666 {
00667 if(CkMyPe()==0)
00668 _logPool->writeSts();
00669 CkpvAccess(_trace)->endComputation();
00670
00671 delete _logPool;
00672 CkpvAccess(_traces)->removeTrace(this);
00673 }
00674
00675 void TraceSummary::beginExecute(CmiObjId *tid)
00676 {
00677 beginExecute(-1,-1,_threadEP,-1);
00678 }
00679
00680 void TraceSummary::beginExecute(envelope *e, void *obj)
00681 {
00682
00683 if (e==NULL) {
00684 beginExecute(-1,-1,_threadEP,-1);
00685 }
00686 else {
00687 beginExecute(-1,-1,e->getEpIdx(),-1);
00688 }
00689 }
00690
00691 void TraceSummary::beginExecute(char *msg)
00692 {
00693 #if CMK_SMP_TRACE_COMMTHREAD
00694
00695 envelope *e = (envelope *)msg;
00696 int num = _entryTable.size();
00697 int ep = e->getEpIdx();
00698 if(ep<0 || ep>=num) return;
00699 if(_entryTable[ep]->traceEnabled)
00700 beginExecute(-1,-1,e->getEpIdx(),-1);
00701 #endif
00702 }
00703
00704 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
00705 {
00706 if (execEp == TRACEON_EP) {
00707 endExecute();
00708 }
00709 CmiAssert(inIdle == 0);
00710 if (inExec == 0) {
00711 CmiAssert(depth == 0);
00712 inExec = 1;
00713 }
00714 depth ++;
00715
00716
00717 if (depth > 1) return;
00718
00719
00720
00721
00722
00723
00724
00725
00726 execEp=ep;
00727 double t = TraceTimer();
00728
00729
00730 start = t;
00731 double ts = binStart;
00732
00733 while ((ts = ts + CkpvAccess(binSize)) < t) {
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745 _logPool->add(binTime, binIdle, CkMyPe());
00746 binTime=0.0;
00747 binIdle = 0.0;
00748 binStart = ts;
00749 }
00750 }
00751
00752 void TraceSummary::endExecute()
00753 {
00754 CmiAssert(inIdle == 0 && inExec == 1);
00755 depth --;
00756 if (depth == 0) inExec = 0;
00757 CmiAssert(depth >= 0);
00758
00759
00760 if (depth != 0) return;
00761
00762 double t = TraceTimer();
00763 double ts = start;
00764 double nts = binStart;
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774 if (execEp == INVALIDEP) {
00775 TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
00776 return;
00777 }
00778
00779 if (execEp >= 0)
00780 {
00781 _logPool->setEp(execEp, t-ts);
00782 }
00783
00784 while ((nts = nts + CkpvAccess(binSize)) < t)
00785 {
00786
00787 binTime += nts-ts;
00788 binStart = nts;
00789
00790 _logPool->add(binTime, binIdle, CkMyPe());
00791 binTime = 0.0;
00792 binIdle = 0.0;
00793 ts = nts;
00794 }
00795 binTime += t - ts;
00796
00797 if (sumDetail && execEp >= 0 )
00798 _logPool->updateSummaryDetail(execEp, start, t);
00799
00800 execEp = INVALIDEP;
00801 }
00802
00803 void TraceSummary::endExecute(char *msg){
00804 #if CMK_SMP_TRACE_COMMTHREAD
00805
00806 envelope *e = (envelope *)msg;
00807 int num = _entryTable.size();
00808 int ep = e->getEpIdx();
00809 if(ep<0 || ep>=num) return;
00810 if(_entryTable[ep]->traceEnabled){
00811 endExecute();
00812 }
00813 #endif
00814 }
00815
00816 void TraceSummary::beginIdle(double currT)
00817 {
00818 if (execEp == TRACEON_EP) {
00819 endExecute();
00820 }
00821
00822 CmiAssert(inIdle == 0 && inExec == 0);
00823 inIdle = 1;
00824
00825
00826 double t = TraceTimer(currT);
00827
00828
00829
00830 idleStart = t;
00831 double ts = binStart;
00832
00833 while ((ts = ts + CkpvAccess(binSize)) < t) {
00834 _logPool->add(binTime, binIdle, CkMyPe());
00835 binTime=0.0;
00836 binIdle = 0.0;
00837 binStart = ts;
00838 }
00839 }
00840
00841 void TraceSummary::endIdle(double currT)
00842 {
00843 CmiAssert(inIdle == 1 && inExec == 0);
00844 inIdle = 0;
00845
00846
00847 double t = TraceTimer(currT);
00848 double t_idleStart = idleStart;
00849 double t_binStart = binStart;
00850
00851 while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
00852 {
00853
00854 binIdle += t_binStart - t_idleStart;
00855 binStart = t_binStart;
00856 _logPool->add(binTime, binIdle, CkMyPe());
00857 binTime = 0.0;
00858 binIdle = 0.0;
00859 t_idleStart = t_binStart;
00860 }
00861 binIdle += t - t_idleStart;
00862 }
00863
00864 void TraceSummary::traceBegin(void)
00865 {
00866
00867
00868 beginExecute(-1, -1, TRACEON_EP, -1, -1);
00869 }
00870
00871 void TraceSummary::traceEnd(void)
00872 {
00873 endExecute();
00874 }
00875
00876 void TraceSummary::beginPack(void)
00877 {
00878 packstart = CmiWallTimer();
00879 }
00880
00881 void TraceSummary::endPack(void)
00882 {
00883 _logPool->setEp(_packEP, CmiWallTimer() - packstart);
00884 if (sumDetail)
00885 _logPool->updateSummaryDetail(_packEP, TraceTimer(packstart), TraceTimer(CmiWallTimer()));
00886 }
00887
00888 void TraceSummary::beginUnpack(void)
00889 {
00890 unpackstart = CmiWallTimer();
00891 }
00892
00893 void TraceSummary::endUnpack(void)
00894 {
00895 _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
00896 if (sumDetail)
00897 _logPool->updateSummaryDetail(_unpackEP, TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
00898 }
00899
00900 void TraceSummary::beginComputation(void)
00901 {
00902
00903 _logPool->initMem();
00904 }
00905
00906 void TraceSummary::endComputation(void)
00907 {
00908 static int done = 0;
00909 if (done) return;
00910 done = 1;
00911 if (msgNum==0) {
00912
00913 _logPool->add(binTime, binIdle, CkMyPe());
00914 binTime = 0.0;
00915 binIdle = 0.0;
00916 msgNum ++;
00917
00918 binStart += CkpvAccess(binSize);
00919 double t = TraceTimer();
00920 double ts = binStart;
00921 while (ts < t)
00922 {
00923 _logPool->add(binTime, binIdle, CkMyPe());
00924 binTime=0.0;
00925 binIdle = 0.0;
00926 ts += CkpvAccess(binSize);
00927 }
00928
00929 }
00930 }
00931
00932 void TraceSummary::addEventType(int eventType)
00933 {
00934 _logPool->addEventType(eventType, TraceTimer());
00935 }
00936
00937 void TraceSummary::startPhase(int phase)
00938 {
00939 _logPool->startPhase(phase);
00940 }
00941
00942 void TraceSummary::traceEnableCCS() {
00943 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
00944 sumProxy.initCCS();
00945 }
00946
00947
00948 void TraceSummary::fillData(double *buffer, double reqStartTime,
00949 double reqBinSize, int reqNumBins) {
00950
00951
00952
00953
00954
00955
00956
00957
00958
00959
00960
00961
00962
00963
00964 int binOffset = (int)(reqStartTime/reqBinSize);
00965 for (int i=binOffset; i<binOffset + reqNumBins; i++) {
00966
00967 buffer[i-binOffset] = pool()->getTime(i);
00968 }
00969 }
00970
00971 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
00972
00973 UInt numBins = CkpvAccess(_trace)->pool()->getNumEntries();
00974
00975 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
00976 CkCallback cb(CkReductionTarget(TraceSummaryBOC, maxBinSize), sumProxy[0]);
00977 contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
00978 }
00979
00980
00981 void TraceSummaryBOC::maxBinSize(double _maxBinSize)
00982 {
00983 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
00984 sumProxy.shrink(_maxBinSize);
00985 }
00986
00987 void TraceSummaryBOC::shrink(double _mBin){
00988 UInt numBins = CkpvAccess(_trace)->pool()->getNumEntries();
00989 UInt epNums = CkpvAccess(_trace)->pool()->getEpInfoSize();
00990 _maxBinSize = _mBin;
00991 if(CkpvAccess(binSize) < _maxBinSize)
00992 {
00993 CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
00994 }
00995 double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();
00996 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
00997 CkCallback cb(CkReductionTarget(TraceSummaryBOC, sumData), sumProxy[0]);
00998 contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
00999 }
01000
01001 void TraceSummaryBOC::sumData(double *sumData, int totalsize) {
01002 UInt epNums = CkpvAccess(_trace)->pool()->getEpInfoSize();
01003 UInt numBins = totalsize/epNums;
01004 int numEntries = epNums - NUM_DUMMY_EPS - 1;
01005 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
01006 sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
01007 FILE *sumfp = fopen(fname, "w+");
01008 delete [] fname;
01009 fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
01010 CkpvAccess(version), CkNumPes(),
01011 numBins, numEntries, _maxBinSize);
01012 for(int i=0; i<numBins; i++){
01013 for(int j=0; j<numEntries; j++)
01014 {
01015 fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
01016 }
01017 }
01018 fclose(sumfp);
01019
01020 CkContinueExit();
01021 }
01022
01024
01025 void TraceSummaryBOC::initCCS() {
01026 if(firstTime){
01027 CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
01028
01029 lastRequestedIndexBlock = 0;
01030 indicesPerBlock = 1000;
01031 collectionGranularity = 0.001;
01032 nBufferedBins = 0;
01033
01034
01035
01036 if (CkMyPe() == 0) {
01037 ccsBufferedData = new CkVec<double>();
01038
01039 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
01040 CkPrintf("Trace Summary now listening in for CCS Client\n");
01041 CcsRegisterHandler("CkPerfSummaryCcsClientCB",
01042 CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
01043 CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar",
01044 CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0]));
01045
01046 CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
01047 CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
01048 (void *)this);
01049 summaryCcsStreaming = true;
01050 }
01051 firstTime = false;
01052 }
01053 }
01054
01060 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
01061 double *sendBuffer;
01062
01063 CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
01064
01065 CkPrintf("Responding ...\n");
01066 int datalength = 0;
01067
01068
01069 if (ccsBufferedData->length() == 0) {
01070 sendBuffer = new double[1];
01071 sendBuffer[0] = -13.37;
01072 datalength = sizeof(double);
01073 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
01074 delete [] sendBuffer;
01075 } else {
01076 sendBuffer = ccsBufferedData->getVec();
01077 datalength = ccsBufferedData->length()*sizeof(double);
01078 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
01079 ccsBufferedData->free();
01080 }
01081 CkPrintf("Response Sent. Proceeding with computation.\n");
01082 delete m;
01083 }
01084
01085
01091 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
01092 unsigned char *sendBuffer;
01093
01094 CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
01095
01096 CkPrintf("Responding ...\n");
01097 int datalength = 0;
01098
01099 if (ccsBufferedData->length() == 0) {
01100 sendBuffer = new unsigned char[1];
01101 sendBuffer[0] = 255;
01102 datalength = sizeof(unsigned char);
01103 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
01104 delete [] sendBuffer;
01105 } else {
01106 double * doubleData = ccsBufferedData->getVec();
01107 int numData = ccsBufferedData->length();
01108
01109
01110 sendBuffer = new unsigned char[numData];
01111
01112 for(int i=0;i<numData;i++){
01113 sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0;
01114 }
01115
01116 datalength = sizeof(unsigned char) * numData;
01117
01118 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
01119 ccsBufferedData->free();
01120 delete [] sendBuffer;
01121 }
01122 CkPrintf("Response Sent. Proceeding with computation.\n");
01123 delete m;
01124 }
01125
01126
01127
01128 void startCollectData(void *data, double currT) {
01129 CkAssert(CkMyPe() == 0);
01130
01131 TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
01132 int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
01133 double collectionGranularity = sumObj->collectionGranularity;
01134 int indicesPerBlock = sumObj->indicesPerBlock;
01135
01136 double startTime = lastRequestedIndexBlock*
01137 collectionGranularity * indicesPerBlock;
01138 int numIndicesToGet = (int)floor((currT - startTime)/
01139 collectionGranularity);
01140 int numBlocksToGet = numIndicesToGet/indicesPerBlock;
01141
01142
01143
01144 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
01145
01146 sumProxy.collectSummaryData(startTime,
01147 collectionGranularity,
01148 numBlocksToGet*indicesPerBlock);
01149
01150 sumObj->lastRequestedIndexBlock += numBlocksToGet;
01151 }
01152
01153 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
01154 int numBins) {
01155
01156
01157 double *contribution = new double[numBins];
01158 for (int i=0; i<numBins; i++) {
01159 contribution[i] = 0.0;
01160 }
01161 CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
01162
01163
01164
01165
01166
01167
01168
01169 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
01170 CkCallback cb(CkReductionTarget(TraceSummaryBOC, summaryDataCollected), sumProxy[0]);
01171 contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double,
01172 cb);
01173 delete [] contribution;
01174 }
01175
01176 void TraceSummaryBOC::summaryDataCollected(double *recvData, int numBins) {
01177 CkAssert(CkMyPe() == 0);
01178
01179
01180
01181
01182
01183 for (int i=0; i<numBins; i++) {
01184 ccsBufferedData->insertAtEnd(recvData[i]);
01185 }
01186 }
01187
01188
01189
01190
01191 void TraceSummaryBOC::startSumOnly()
01192 {
01193 CmiAssert(CkMyPe() == 0);
01194
01195 CProxy_TraceSummaryBOC p(traceSummaryGID);
01196 int size = CkpvAccess(_trace)->pool()->getNumEntries();
01197 p.askSummary(size);
01198 }
01199
01200 void TraceSummaryBOC::askSummary(int size)
01201 {
01202 if (CkpvAccess(_trace) == NULL) return;
01203
01204 int traced = CkpvAccess(_trace)->traceOnPE();
01205
01206 BinEntry *reductionBuffer = new BinEntry[size+1];
01207 reductionBuffer[size].time() = traced;
01208 reductionBuffer[size].getIdleTime() = 0;
01209 if (traced) {
01210 CkpvAccess(_trace)->endComputation();
01211 int n = CkpvAccess(_trace)->pool()->getNumEntries();
01212 BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
01213 if (n>size) n=size;
01214 for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
01215 }
01216
01217 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
01218 CkCallback cb(CkReductionTarget(TraceSummaryBOC, sendSummaryBOC), 0, sumProxy);
01219 contribute(sizeof(BinEntry)*(size+1), reductionBuffer,
01220 CkReduction::sum_double, cb);
01221 delete [] reductionBuffer;
01222 }
01223
01224 void TraceSummaryBOC::sendSummaryBOC(double *results, int n)
01225 {
01226 if (CkpvAccess(_trace) == NULL) return;
01227
01228 CkAssert(CkMyPe() == 0);
01229
01230 nBins = n-1;
01231 bins = (BinEntry *)results;
01232 nTracedPEs = (int)bins[n-1].time();
01233
01234
01235 write();
01236
01237 CkContinueExit();
01238 }
01239
01240 void TraceSummaryBOC::write(void)
01241 {
01242 unsigned int j;
01243
01244 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
01245 sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
01246 FILE *sumfp = fopen(fname, "w+");
01247
01248 if(sumfp == 0)
01249 CmiAbort("Cannot open summary sts file for writing.\n");
01250 delete[] fname;
01251
01252 int _numEntries=_entryTable.size();
01253 fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
01254 fprintf(sumfp, "\n");
01255
01256
01257 #if 0
01258 int last=pool[0].getU();
01259 writeU(fp, last);
01260 int count=1;
01261 for(j=1; j<numEntries; j++) {
01262 int u = pool[j].getU();
01263 if (last == u) {
01264 count++;
01265 }
01266 else {
01267 if (count > 1) fprintf(fp, "+%d", count);
01268 writeU(fp, u);
01269 last = u;
01270 count = 1;
01271 }
01272 }
01273 if (count > 1) fprintf(fp, "+%d", count);
01274 #else
01275 for(j=0; j<nBins; j++) {
01276 bins[j].time() /= nTracedPEs;
01277 bins[j].write(sumfp);
01278 }
01279 #endif
01280 fprintf(sumfp, "\n");
01281 fclose(sumfp);
01282
01283 }
01284
01285 static void CombineSummary()
01286 {
01287 #if CMK_TRACE_ENABLED
01288 CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
01289
01290 if ((sumonly || sumDetail) && traceSummaryGID.isZero()) {
01291 CkContinueExit();
01292 return;
01293 }
01294
01295 if (sumonly) {
01296 CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
01297
01298 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
01299 sumProxy[0].startSumOnly();
01300 }else if(sumDetail)
01301 {
01302 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
01303 sumProxy.traceSummaryParallelShutdown(-1);
01304 }
01305 else {
01306 CkContinueExit();
01307 }
01308 #else
01309 CkContinueExit();
01310 #endif
01311 }
01312
01313 void initTraceSummaryBOC()
01314 {
01315 #ifdef __BIGSIM__
01316 if(BgNodeRank()==0) {
01317 #else
01318 if (CkMyRank() == 0) {
01319 #endif
01320 registerExitFn(CombineSummary);
01321 }
01322 }
01323
01324 #include "TraceSummary.def.h"
01325