00001
00005
00006 #include <string.h>
00007
00008 #include "charm++.h"
00009 #include "trace-projections.h"
00010 #include "trace-projectionsBOC.h"
00011 #include "TopoManager.h"
00012
// DEBUGF prints through CkPrintf only when DEBUG_PROJ is enabled.
#if DEBUG_PROJ
#define DEBUGF(...) CkPrintf(__VA_ARGS__)
#else
#define DEBUGF(...)
#endif
#define DEBUGN(...) // easy way to selectively disable DEBUGs

// Default number of LogEntry slots buffered per PE before a flush to disk;
// overridable with +logsize (see the TraceProjections constructor).
#define DefaultLogBufSize 1000000
#define DEBUG_KMEANS 0




// Set from the +checknested command-line flag in the TraceProjections constructor.
bool checknested=false;

#ifdef PROJ_ANALYSIS

// Group ids of the projections BOC and the k-means analysis group
// (only declared here in PROJ_ANALYSIS builds).
CkGroupID traceProjectionsGID;
CkGroupID kMeansGID;



// Custom reducers used by the analysis code; defined elsewhere.
CkReductionMsg *outlierReduction(int nMsgs,
CkReductionMsg **msgs);
CkReductionMsg *minMaxReduction(int nMsgs,
CkReductionMsg **msgs);
CkReduction::reducerType outlierReductionType;
CkReduction::reducerType minMaxReductionType;
#endif // PROJ_ANALYSIS
00042
// Processor-private (Ckpv) pointer to this PE's projections tracer; set in
// _createTraceprojections().
CkpvStaticDeclare(TraceProjections*, _trace);
CtvExtern(int,curThreadEvent);

// Number of LogEntry slots per LogPool buffer (DefaultLogBufSize or +logsize).
CkpvDeclare(CmiInt8, CtrLogBufSize);

// Initialized in _createTraceprojections(); not otherwise used in the code
// visible here — NOTE(review): confirm whether it is still needed.
typedef CkVec<char *> usrEventVec;
CkpvStaticDeclare(usrEventVec, usrEventlist);
// One registered user event or user statistic: its numeric id and the name
// printed next to it in the .sts file.  The name pointer is stored as given;
// this object neither copies nor frees it.
class UsrEvent {
public:
  int e;      // event / stat id
  char *str;  // display name
  UsrEvent(int _e, char* _s) : e{_e}, str{_s} {}
};
// Registered user events (traceRegisterUserEvent); written as EVENT lines
// by LogPool::writeSts().
CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);


// Registered user statistics (traceRegisterUserStat); written as STAT lines
// by LogPool::writeSts().
CkpvStaticDeclare(CkVec<UsrEvent *>*, usrStats);
00061
#if CMK_TRACE_ENABLED
// Clear the write-data flag of this PE's projections tracer so its log
// data is not written out.
void disableTraceLogOutput()
{
  TraceProjections *const tracer = CkpvAccess(_trace);
  tracer->setWriteData(false);
}

// Set the write-data flag of this PE's projections tracer again.
void enableTraceLogOutput()
{
  TraceProjections *const tracer = CkpvAccess(_trace);
  tracer->setWriteData(true);
}
#endif
00075
// In builds without tracing, OPTIMIZED_VERSION prints a one-time warning
// (guarded by `warned`) when a user-event registration function is called;
// with tracing enabled it expands to nothing.
#if ! CMK_TRACE_ENABLED
static int warned=0;
#define OPTIMIZED_VERSION \
if (!warned) { warned=1; \
CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
#else
#define OPTIMIZED_VERSION
#endif // CMK_TRACE_ENABLED




// With CMK_TRACE_LOGFILE_NUM_CONTROL the per-PE log is closed after every
// write and reopened in append mode before the next one; otherwise it stays
// open and these macros expand to nothing.
#if CMK_TRACE_LOGFILE_NUM_CONTROL
#define OPEN_LOG openLog("a");
#define CLOSE_LOG closeLog();
#else
#define OPEN_LOG
#define CLOSE_LOG
#endif //CMK_TRACE_LOGFILE_NUM_CONTROL
00095
00100 void _createTraceprojections(char **argv)
00101 {
00102 DEBUGF("%d createTraceProjections\n", CkMyPe());
00103 CkpvInitialize(CkVec<char *>, usrEventlist);
00104 CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
00105 CkpvInitialize(CkVec<UsrEvent *>*, usrStats);
00106 CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
00107 CkpvAccess(usrStats) = new CkVec<UsrEvent *>();
00108 #if CMK_BIGSIM_CHARM
00109
00110
00111 #endif //CMK_BIGSIM_CHARM
00112 CkpvInitialize(TraceProjections*, _trace);
00113 CkpvAccess(_trace) = new TraceProjections(argv);
00114 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
00115 if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
00116 }
00117
00118
00119
// Per-thread listener carrying the data of the message that started the
// thread, so resumes can be logged as execute events for that message.
// `base` must stay the first member: the callbacks below cast the
// CthThreadListener* they receive back to TraceThreadListener*.
struct TraceThreadListener {
struct CthThreadListener base;
int event;    // envelope event number (reset to -1 after the first resume)
int msgType;  // envelope message type
int ep;       // entry point index
int srcPe;    // source PE (reset to this PE after the first resume)
int ml;       // total message size (reset to 0 after the first resume)
CmiObjId idx; // NOTE(review): not set or read in the code visible here
};
00129
00130 static void traceThreadListener_suspend(struct CthThreadListener *l)
00131 {
00132 TraceThreadListener *a=(TraceThreadListener *)l;
00133
00134
00135 traceSuspend();
00136 }
00137
00138 static void traceThreadListener_resume(struct CthThreadListener *l)
00139 {
00140 TraceThreadListener *a=(TraceThreadListener *)l;
00141
00142
00143 _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
00144 CthGetThreadID(a->base.thread), NULL);
00145 a->event=-1;
00146 a->srcPe=CkMyPe();
00147 a->ml=0;
00148 }
00149
00150 static void traceThreadListener_free(struct CthThreadListener *l)
00151 {
00152 TraceThreadListener *a=(TraceThreadListener *)l;
00153 delete a;
00154 }
00155
00156 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
00157 {
00158 #if CMK_TRACE_ENABLED
00159
00160 TraceThreadListener *a= new TraceThreadListener;
00161
00162 a->base.suspend=traceThreadListener_suspend;
00163 a->base.resume=traceThreadListener_resume;
00164 a->base.free=traceThreadListener_free;
00165 a->event=e->getEvent();
00166 a->msgType=e->getMsgtype();
00167 a->ep=e->getEpIdx();
00168 a->srcPe=e->getSrcPe();
00169 a->ml=e->getTotalsize();
00170
00171 CthAddListener(tid, (CthThreadListener *)a);
00172 #endif
00173 }
00174
00175 void LogPool::openLog(const char *mode)
00176 {
00177 #if CMK_USE_ZLIB
00178 if(compressed) {
00179 do {
00180 zfp = gzopen(fname, mode);
00181 } while (!zfp && (errno == EINTR || errno == EMFILE));
00182 if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
00183 } else {
00184 do {
00185 fp = fopen(fname, mode);
00186 } while (!fp && (errno == EINTR || errno == EMFILE));
00187 if (!fp) {
00188 CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
00189 CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
00190 }
00191 }
00192 #else
00193 do {
00194 fp = fopen(fname, mode);
00195 } while (!fp && (errno == EINTR || errno == EMFILE));
00196 if (!fp) {
00197 CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
00198 CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
00199 }
00200 #endif
00201 }
00202
00203 void LogPool::closeLog(void)
00204 {
00205 #if CMK_USE_ZLIB
00206 if(compressed) {
00207 gzclose(zfp);
00208 return;
00209 }
00210 #endif
00211 #if !defined(_WIN32)
00212 fsync(fileno(fp));
00213 #endif
00214 fclose(fp);
00215 }
00216
00217 LogPool::LogPool(char *pgm) {
00218 pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
00219
00220 writeSummaryFiles = false;
00221 writeData = true;
00222 numEntries = 0;
00223 lastCreationEvent = -1;
00224
00225 prevTime = 0.0;
00226 timeErr = 0.0;
00227 globalStartTime = 0.0;
00228 globalEndTime = 0.0;
00229 headerWritten = false;
00230 numPhases = 0;
00231 hasFlushed = false;
00232
00233 keepPhase = NULL;
00234
00235 fileCreated = false;
00236 poolSize = CkpvAccess(CtrLogBufSize);
00237 pgmname = new char[strlen(pgm)+1];
00238 strcpy(pgmname, pgm);
00239
00240
00241 statisLastProcessTimer = 0;
00242 statisLastIdleTimer = 0;
00243 statisLastPackTimer = 0;
00244 statisLastUnpackTimer = 0;
00245 statisTotalExecutionTime = 0;
00246 statisTotalIdleTime = 0;
00247 statisTotalPackTime = 0;
00248 statisTotalUnpackTime = 0;
00249 statisTotalCreationMsgs = 0;
00250 statisTotalCreationBytes = 0;
00251 statisTotalMCastMsgs = 0;
00252 statisTotalMCastBytes = 0;
00253 statisTotalEnqueueMsgs = 0;
00254 statisTotalDequeueMsgs = 0;
00255 statisTotalRecvMsgs = 0;
00256 statisTotalRecvBytes = 0;
00257 statisTotalMemAlloc = 0;
00258 statisTotalMemFree = 0;
00259
00260 }
00261
00262 void LogPool::createFile(const char *fix)
00263 {
00264 if (fileCreated) {
00265 return;
00266 }
00267
00268 if(CmiNumPartitions() > 1) {
00269 CmiMkdir(CkpvAccess(partitionRoot));
00270 }
00271
00272 char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1;
00273 char *pathPlusFilePrefix = new char[1024];
00274
00275 if(nSubdirs > 0){
00276 int sd = CkMyPe() % nSubdirs;
00277 char *subdir = new char[1024];
00278 sprintf(subdir, "%s.projdir.%d", pgmname, sd);
00279 CmiMkdir(subdir);
00280 sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
00281 delete[] subdir;
00282 } else {
00283 sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
00284 }
00285
00286 char pestr[10];
00287 sprintf(pestr, "%d", CkMyPe());
00288 #if CMK_USE_ZLIB
00289 int len;
00290 if(compressed)
00291 len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
00292 else
00293 len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
00294 #else
00295 int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
00296 #endif
00297
00298 fname = new char[len];
00299 #if CMK_USE_ZLIB
00300 if(compressed) {
00301 sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
00302 }
00303 else {
00304 sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
00305 }
00306 #else
00307 sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
00308 #endif
00309 fileCreated = true;
00310 delete[] pathPlusFilePrefix;
00311 openLog("w");
00312 CLOSE_LOG
00313 }
00314
00315 void LogPool::createSts(const char *fix)
00316 {
00317 CkAssert(CkMyPe() == 0);
00318 if(CmiNumPartitions() > 1) {
00319 CmiMkdir(CkpvAccess(partitionRoot));
00320 }
00321
00322
00323 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
00324 sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
00325 do
00326 {
00327 stsfp = fopen(fname, "w");
00328 } while (!stsfp && (errno == EINTR || errno == EMFILE));
00329 if(stsfp==0){
00330 CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
00331 CmiAbort("Error!!\n");
00332 }
00333 delete[] fname;
00334 }
00335
00336 void LogPool::createRC()
00337 {
00338
00339 fname =
00340 new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
00341 sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
00342 do {
00343 rcfp = fopen(fname, "w");
00344 } while (!rcfp && (errno == EINTR || errno == EMFILE));
00345 if (rcfp==0) {
00346 CmiAbort("Cannot open projections configuration file for writing.\n");
00347 }
00348 delete[] fname;
00349 }
00350
00351 LogPool::~LogPool()
00352 {
00353 if (writeData) {
00354 if(writeSummaryFiles)
00355 writeStatis();
00356 writeLog();
00357 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
00358 closeLog();
00359 #endif
00360 }
00361
00362 #if CMK_BIGSIM_CHARM
00363 extern int correctTimeLog;
00364 if (correctTimeLog) {
00365 createFile("-bg");
00366 if (CkMyPe() == 0) {
00367 createSts("-bg");
00368 }
00369 writeHeader();
00370 if (CkMyPe() == 0) writeSts(NULL);
00371 postProcessLog();
00372 }
00373 #endif
00374
00375 delete[] pool;
00376 delete [] fname;
00377 }
00378
00379 void LogPool::writeHeader()
00380 {
00381 if (headerWritten) return;
00382 headerWritten = true;
00383 if(!binary) {
00384 #if CMK_USE_ZLIB
00385 if(compressed) {
00386 gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
00387 }
00388 else
00389 #endif
00390 {
00391 fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
00392 }
00393 }
00394 else {
00395 fwrite(&numEntries,sizeof(numEntries),1,fp);
00396 }
00397 }
00398
// Write the buffered entries to this PE's log.  This:
//  - creates the file on first use;
//  - reopens it for appending when CMK_TRACE_LOGFILE_NUM_CONTROL keeps it
//    closed between writes (OPEN_LOG / CLOSE_LOG);
//  - emits the header (first call only);
//  - writes all entries in non-delta form.
void LogPool::writeLog(void)
{
  createFile();
  OPEN_LOG
  writeHeader();
  write(0);
  CLOSE_LOG
}
00407
00408 void LogPool::write(int writedelta)
00409 {
00410
00411
00412
00413
00414 PUP::er *p = NULL;
00415 if (binary) {
00416 p = new PUP::toDisk(writedelta?deltafp:fp);
00417 }
00418 #if CMK_USE_ZLIB
00419 else if (compressed) {
00420 p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
00421 }
00422 #endif
00423 else {
00424 p = new toProjectionsFile(writedelta?deltafp:fp);
00425 }
00426 CmiAssert(p);
00427 int curPhase = 0;
00428
00429
00430
00431 for(UInt i=0; i<numEntries; i++) {
00432 if (!writedelta) {
00433 if (keepPhase == NULL) {
00434
00435 pool[i].pup(*p);
00436 } else {
00437
00438
00439 if (pool[i].type == END_PHASE) {
00440
00441 pool[i].pup(*p);
00442 curPhase++;
00443 } else if (pool[i].type == BEGIN_COMPUTATION ||
00444 pool[i].type == END_COMPUTATION) {
00445
00446 pool[i].pup(*p);
00447 } else if (keepPhase[curPhase]) {
00448 pool[i].pup(*p);
00449 }
00450 }
00451 }
00452 else {
00453
00454
00455 double time = pool[i].time;
00456 if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
00457 {
00458 double timeDiff = (time-prevTime)*1.0e6;
00459 UInt intTimeDiff = (UInt)timeDiff;
00460 timeErr += timeDiff - intTimeDiff;
00461 if (timeErr > 1.0) {
00462 timeErr -= 1.0;
00463 intTimeDiff++;
00464 }
00465 pool[i].time = intTimeDiff/1.0e6;
00466 }
00467 pool[i].pup(*p);
00468 pool[i].time = time;
00469 prevTime = time;
00470 }
00471 }
00472 delete p;
00473 delete [] keepPhase;
00474 }
00475
00476 void LogPool::writeSts(void)
00477 {
00478
00479 int i;
00480
00481 fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
00482 fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
00483 fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
00484 #if CMK_HAS_COUNTER_PAPI
00485 fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
00486
00487
00488 char eventName[PAPI_MAX_STR_LEN];
00489 for (i=0;i<NUMPAPIEVENTS;i++) {
00490 PAPI_event_code_to_name(papiEvents[i], eventName);
00491 fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
00492 }
00493 #endif
00494
00495 traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
00496 fprintf(stsfp, "TOTAL_STATS %d\n", (int) CkpvAccess(usrStats)->length());
00497 for(i=0;i<CkpvAccess(usrEvents)->length();i++){
00498 fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
00499 }
00500
00501 for(i=0;i<CkpvAccess(usrStats)->length();i++){
00502 fprintf(stsfp, "STAT %d %s\n", (*CkpvAccess(usrStats))[i]->e, (*CkpvAccess(usrStats))[i]->str);
00503 }
00504 }
00505
00506 void LogPool::writeSts(TraceProjections *traceProj){
00507 writeSts();
00508 fprintf(stsfp, "END\n");
00509 fclose(stsfp);
00510 }
00511
00512 void LogPool::writeRC(void)
00513 {
00514
00515 #ifdef PROJ_ANALYSIS
00516 CkAssert(CkMyPe() == 0);
00517 fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
00518 (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
00519 fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
00520 (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
00521
00522
00523
00524
00525
00526
00527
00528 #endif //PROJ_ANALYSIS
00529 fclose(rcfp);
00530 }
00531
00532 void LogPool::writeStatis(void)
00533 {
00534
00535
00536 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".statis")+10];
00537 sprintf(fname, "%s.%d.statis", CkpvAccess(traceRoot), CkMyPe());
00538 do
00539 {
00540 statisfp = fopen(fname, "w");
00541 } while (!statisfp && (errno == EINTR || errno == EMFILE));
00542 if(statisfp==0){
00543 CmiPrintf("Cannot open projections statistic file for writing due to %s\n", strerror(errno));
00544 CmiAbort("Error!!\n");
00545 }
00546 delete[] fname;
00547 double totaltime = endComputationTime - beginComputationTime;
00548 fprintf(statisfp, "time(sec) percentage\n");
00549 fprintf(statisfp, "Time: \t%f\n", totaltime);
00550 fprintf(statisfp, "Idle :\t%f\t %.1f\n", statisTotalIdleTime, statisTotalIdleTime/totaltime * 100);
00551 fprintf(statisfp, "Overhead: \t%f\t %.1f\n", totaltime - statisTotalIdleTime - statisTotalExecutionTime , (totaltime - statisTotalIdleTime - statisTotalExecutionTime)/totaltime * 100);
00552 fprintf(statisfp, "Exeuction:\t%f\t %.1f\n", statisTotalExecutionTime, statisTotalExecutionTime/totaltime*100);
00553 fprintf(statisfp, "Pack: \t%f\t %.2f\n", statisTotalPackTime, statisTotalPackTime/totaltime*100);
00554 fprintf(statisfp, "Unpack: \t%f\t %.2f\n", statisTotalUnpackTime, statisTotalUnpackTime/totaltime*100);
00555 fprintf(statisfp, "Creation Msgs Numbers, Bytes, Avg: \t%lld\t %lld\t %lld \n", statisTotalCreationMsgs, statisTotalCreationBytes, (statisTotalCreationMsgs>0)?statisTotalCreationBytes/statisTotalCreationMsgs:statisTotalCreationMsgs);
00556 fprintf(statisfp, "Multicast Msgs Numbers, Bytes, Avg: \t%lld\t %lld\t %lld \n", statisTotalMCastMsgs, statisTotalMCastBytes, (statisTotalMCastMsgs>0)?statisTotalMCastBytes/statisTotalMCastMsgs:statisTotalMCastMsgs);
00557 fprintf(statisfp, "Received Msgs Numbers, Bytes, Avg: \t%lld\t %lld\t %lld \n", statisTotalRecvMsgs, statisTotalRecvBytes, (statisTotalRecvMsgs>0)?statisTotalRecvBytes/statisTotalRecvMsgs:statisTotalRecvMsgs);
00558 fclose(statisfp);
00559 }
00560
#if CMK_BIGSIM_CHARM
// BigSim callback for a buffered entry (`data`).  It stamps the entry with
// time `t` and receive time `recvT` (negative values are clamped to 0), then
// writes it in text form to the FILE* that `ptr` points to.
static void updateProjLog(void *data, double t, double recvT, void *ptr)
{
  FILE *out = *(FILE **)ptr;
  LogEntry *entry = (LogEntry *)data;
  entry->time = t;
  if (recvT < 0.0) {
    entry->recvTime = 0;
  } else {
    entry->recvTime = recvT;
  }

  toProjectionsFile writer(out);
  entry->pup(writer);
}
#endif
00573
00574
00575 void LogPool::flushLogBuffer()
00576 {
00577 if (numEntries) {
00578 double writeTime = TraceTimer();
00579 writeLog();
00580 hasFlushed = true;
00581 numEntries = 0;
00582 lastCreationEvent = -1;
00583 new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
00584 new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
00585
00586 if (!traceProjectionsGID.isZero()) {
00587 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
00588 bocProxy[0].flush_warning(CkMyPe());
00589 }
00590 }
00591 }
00592
// Append a generic trace entry to the buffer.
//
// Besides storing the entry, this:
//  - updates the summary counters reported by writeStatis() (message
//    counts and bytes, and processing / pack / unpack / idle time summed
//    between each BEGIN_* and the next END_*);
//  - remembers the index of the latest creation entry;
//  - counts phases;
//  - flushes the buffer once it holds poolSize entries.
void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
                  double time, int event, int pe, int ml, CmiObjId *id,
                  double recvT, double cpuT, int numPe, double statVal)
{
  switch(type)
  {
    case BEGIN_COMPUTATION:
      beginComputationTime = time;
      break;
    case END_COMPUTATION:
      endComputationTime = time;
      break;
    case CREATION:
      statisTotalCreationMsgs++;
      statisTotalCreationBytes += ml;
      break;
    case CREATION_MULTICAST:
      statisTotalMCastMsgs++;
      statisTotalMCastBytes += ml;
      break;
    case ENQUEUE:
      statisTotalEnqueueMsgs++;
      break;
    case DEQUEUE:
      statisTotalDequeueMsgs++;
      break;
    case MESSAGE_RECV:
      statisTotalRecvMsgs++;
      statisTotalRecvBytes += ml;
      break;
    case MEMORY_MALLOC:
      statisTotalMemAlloc++;
      break;
    case MEMORY_FREE:
      statisTotalMemFree++;
      break;
    // BEGIN_* records when the interval started; the matching END_* adds
    // the elapsed time to the running total.
    case BEGIN_PROCESSING:
      statisLastProcessTimer = time;
      break;
    case BEGIN_UNPACK:
      statisLastUnpackTimer = time;
      break;
    case BEGIN_PACK:
      statisLastPackTimer = time;
      break;
    case BEGIN_IDLE:
      statisLastIdleTimer = time;
      break;
    case END_PROCESSING:
      statisTotalExecutionTime += (time - statisLastProcessTimer);
      break;
    case END_UNPACK:
      statisTotalUnpackTime += (time - statisLastUnpackTimer);
      break;
    case END_PACK:
      statisTotalPackTime += (time - statisLastPackTimer);
      break;
    case END_IDLE:
      statisTotalIdleTime += (time - statisLastIdleTimer);
      break;
    default:
      break;
  }

  if (type == CREATION ||
      type == CREATION_MULTICAST ||
      type == CREATION_BCAST) {
    lastCreationEvent = numEntries;
  }
  new (&pool[numEntries++])
    LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe, statVal);
  if ((type == END_PHASE) || (type == END_COMPUTATION)) {
    numPhases++;
  }
  if(poolSize==numEntries) {
    flushLogBuffer();
#if CMK_BIGSIM_CHARM
    extern int correctTimeLog;
    if (correctTimeLog) CmiAbort("I/O interrupt!\n");
#endif
  }
#if CMK_BIGSIM_CHARM
  // NOTE(review): if the flush above ran (and correctTimeLog is 0), the
  // buffer now holds only the two interrupt entries.  pool[numEntries-1]
  // below is then the END_INTERRUPT entry, not the one just added.
  switch (type) {
    case BEGIN_PROCESSING:
      pool[numEntries-1].recvTime = BgGetRecvTime();
      // intentional fall-through: BEGIN_PROCESSING is also registered below
    case END_PROCESSING:
    case BEGIN_COMPUTATION:
    case END_COMPUTATION:
    case CREATION:
    case BEGIN_PACK:
    case END_PACK:
    case BEGIN_UNPACK:
    case END_UNPACK:
    case USER_EVENT_PAIR:
      bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
  }
#endif
}
00691
00692 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
00693 #ifndef CMK_BIGSIM_CHARM
00694 if (type == CREATION ||
00695 type == CREATION_MULTICAST ||
00696 type == CREATION_BCAST) {
00697 lastCreationEvent = numEntries;
00698 }
00699 new (&pool[numEntries++])
00700 LogEntry(time,type,funcID,lineNum,fileName);
00701 if(poolSize == numEntries){
00702 flushLogBuffer();
00703 }
00704 #endif
00705 }
00706
00707 void LogPool::addUserBracketEventNestedID(unsigned char type, double time,
00708 UShort mIdx, int event, int nestedID) {
00709 new (&pool[numEntries++])
00710 LogEntry(time, type, mIdx, 0, event, CkMyPe(), 0, 0, 0, 0, 0, 0, nestedID);
00711 if(poolSize == numEntries){
00712 flushLogBuffer();
00713 }
00714 }
00715
00716
00717 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
00718 #ifndef CMK_BIGSIM_CHARM
00719 if (type == CREATION ||
00720 type == CREATION_MULTICAST ||
00721 type == CREATION_BCAST) {
00722 lastCreationEvent = numEntries;
00723 }
00724 new (&pool[numEntries++])
00725 LogEntry(type,time,memUsage);
00726 if(poolSize == numEntries){
00727 flushLogBuffer();
00728 }
00729 #endif
00730
00731 }
00732
00733 void LogPool::addUserSupplied(int data){
00734
00735 add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
00736
00737
00738 pool[numEntries-1].setUserSuppliedData(data);
00739 }
00740
00741
00742 void LogPool::addUserSuppliedNote(char *note){
00743
00744 add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
00745
00746
00747 pool[numEntries-1].setUserSuppliedNote(note);
00748 }
00749
00750 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
00751
00752 #ifndef CMK_BIGSIM_CHARM
00753 #if MPI_TRACE_MACHINE_HACK
00754
00755
00756
00757 #define MPI_TEST_EVENT_ID 60
00758 #define MPI_IPROBE_EVENT_ID 70
00759 int lastEvent = pool[numEntries-1].event;
00760 if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
00761
00762
00763 pool[numEntries].endTime = et;
00764 }else{
00765 new (&pool[numEntries++])
00766 LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
00767 }
00768 #else
00769 new (&pool[numEntries++])
00770 LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
00771 #endif
00772 if(poolSize == numEntries){
00773 flushLogBuffer();
00774 }
00775 #endif
00776 }
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
00788 int event, int pe, int ml, CmiObjId *id,
00789 double recvT, int numPe, const int *pelist)
00790 {
00791 lastCreationEvent = numEntries;
00792 new (&pool[numEntries++])
00793 LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
00794 if(poolSize==numEntries) {
00795 flushLogBuffer();
00796 }
00797 }
00798
// Post-processing step run from the destructor's BigSim path: calls
// bgUpdateProj(1) in BigSim builds and does nothing otherwise.
void LogPool::postProcessLog()
{
#if CMK_BIGSIM_CHARM
  bgUpdateProj(1);
#endif
}
00805
00806 void LogPool::modLastEntryTimestamp(double ts)
00807 {
00808 pool[numEntries-1].time = ts;
00809
00810 }
00811
00812
00813
00814
00815
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825
00826
00827
00828
00829
00830
00831
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841
00842
00843 void LogEntry::pup(PUP::er &p)
00844 {
00845 int i;
00846 CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
00847 char ret = '\n';
00848
00849 p|type;
00850 if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
00851 if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
00852
00853 switch (type) {
00854 case USER_EVENT:
00855 case USER_EVENT_PAIR:
00856 case BEGIN_USER_EVENT_PAIR:
00857 case END_USER_EVENT_PAIR:
00858 p|mIdx; p|itime; p|event; p|pe; p|nestedID;
00859 break;
00860 case BEGIN_IDLE:
00861 case END_IDLE:
00862 case BEGIN_PACK:
00863 case END_PACK:
00864 case BEGIN_UNPACK:
00865 case END_UNPACK:
00866 p|itime; p|pe;
00867 break;
00868 case BEGIN_PROCESSING:
00869 if (p.isPacking()) {
00870 irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
00871 icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
00872 }
00873 p|mIdx; p|eIdx; p|itime; p|event; p|pe;
00874 p|msglen; p|irecvtime;
00875 {
00876 const int ndims = _chareTable[_entryTable[eIdx]->chareIdx]->ndims;
00877
00878 if (ndims >= 1) {
00879 if (ndims >= 4) {
00880 short int* idShorts = (short int*)&(id.id);
00881 for (int i = 0; i < ndims; i++)
00882 p | idShorts[i];
00883 }
00884 else {
00885 for (int i = 0; i < ndims; i++)
00886 p | id.id[i];
00887 }
00888 }
00889 else {
00890 p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
00891 }
00892 }
00893 p|icputime;
00894 #if CMK_HAS_COUNTER_PAPI
00895
00896 for (i=0; i<NUMPAPIEVENTS; i++) {
00897
00898
00899 p|papiValues[i];
00900
00901 }
00902 #else
00903
00904 #endif
00905 if (p.isUnpacking()) {
00906 recvTime = irecvtime/1.0e6;
00907 cputime = icputime/1.0e6;
00908 }
00909 break;
00910 case END_PROCESSING:
00911 if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
00912 p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
00913 #if CMK_HAS_COUNTER_PAPI
00914
00915 for (i=0; i<NUMPAPIEVENTS; i++) {
00916
00917
00918 p|papiValues[i];
00919 }
00920 #else
00921
00922 #endif
00923 if (p.isUnpacking()) cputime = icputime/1.0e6;
00924 break;
00925 case USER_SUPPLIED:
00926 p|userSuppliedData;
00927 p|itime;
00928 break;
00929 case USER_SUPPLIED_NOTE:
00930 p|itime;
00931 int length;
00932 length=0;
00933 if (p.isPacking()) length = strlen(userSuppliedNote);
00934 p | length;
00935 char space;
00936 space = ' ';
00937 p | space;
00938 if (p.isUnpacking()) {
00939 userSuppliedNote = new char[length+1];
00940 userSuppliedNote[length] = '\0';
00941 }
00942 PUParray(p,userSuppliedNote, length);
00943 break;
00944 case USER_SUPPLIED_BRACKETED_NOTE:
00945
00946 p|itime;
00947 p|iEndTime;
00948 p|event;
00949 int length2;
00950 length2=0;
00951 if (p.isPacking()) length2 = strlen(userSuppliedNote);
00952 p | length2;
00953 char space2;
00954 space2 = ' ';
00955 p | space2;
00956 if (p.isUnpacking()) {
00957 userSuppliedNote = new char[length2+1];
00958 userSuppliedNote[length2] = '\0';
00959 }
00960 PUParray(p,userSuppliedNote, length2);
00961 break;
00962 case MEMORY_USAGE_CURRENT:
00963 p | memUsage;
00964 p | itime;
00965 break;
00966 case USER_STAT:
00967 p | itime;
00968 p | cputime;
00969 p | stat;
00970 p | pe;
00971 p | mIdx;
00972 break;
00973 case CREATION:
00974 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
00975 p|mIdx; p|eIdx; p|itime;
00976 p|event; p|pe; p|msglen; p|irecvtime;
00977 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
00978 break;
00979 case CREATION_BCAST:
00980 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
00981 p|mIdx; p|eIdx; p|itime;
00982 p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
00983 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
00984 break;
00985 case CREATION_MULTICAST:
00986 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
00987 p|mIdx; p|eIdx; p|itime;
00988 p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
00989 if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
00990 for (i=0; i<numpes; i++) p|pes[i];
00991 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
00992 break;
00993 case MESSAGE_RECV:
00994 p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
00995 break;
00996
00997 case ENQUEUE:
00998 case DEQUEUE:
00999 p|mIdx; p|itime; p|event; p|pe;
01000 break;
01001
01002 case BEGIN_INTERRUPT:
01003 case END_INTERRUPT:
01004 p|itime; p|event; p|pe;
01005 break;
01006
01007
01008
01009
01010 case BEGIN_COMPUTATION:
01011 case END_COMPUTATION:
01012 case BEGIN_TRACE:
01013 case END_TRACE:
01014 p|itime;
01015 break;
01016 case END_PHASE:
01017 p|eIdx;
01018 p|itime;
01019 break;
01020 default:
01021 CmiError("***Internal Error*** Wierd Event %d.\n", type);
01022 break;
01023 }
01024 if (p.isUnpacking()) time = itime/1.0e6;
01025 p|ret;
01026 }
01027
01028 TraceProjections::TraceProjections(char **argv):
01029 _logPool(NULL), curevent(0), inEntry(false), computationStarted(false),
01030 traceNestedEvents(false), converseExit(false),
01031 currentPhaseID(0), lastPhaseEvent(NULL), endTime(0.0)
01032 {
01033
01034 if (CkpvAccess(traceOnPe) == 0) return;
01035
01036 CkpvInitialize(CmiInt8, CtrLogBufSize);
01037 CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
01038 if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize),
01039 "Log entries to buffer per I/O")) {
01040 if (CkMyPe() == 0) {
01041 CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
01042 }
01043 }
01044 checknested =
01045 CmiGetArgFlagDesc(argv,"+checknested",
01046 "check projections nest begin end execute events");
01047 traceNestedEvents =
01048 CmiGetArgFlagDesc(argv,"+tracenested",
01049 "trace projections nest begin/end execute events");
01050 int binary =
01051 CmiGetArgFlagDesc(argv,"+binary-trace",
01052 "Write log files in binary format");
01053
01054 int nSubdirs = 0;
01055 CmiGetArgIntDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
01056
01057
01058 #if CMK_USE_ZLIB
01059 int compressed = true;
01060 CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
01061 int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
01062 compressed = compressed && !disableCompressed;
01063 #else
01064
01065 CmiGetArgFlagDesc(argv,"+gz-trace",
01066 "Write log files pre-compressed with gzip");
01067 if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
01068 #endif
01069
01070 int writeSummaryFiles = CmiGetArgFlagDesc(argv,"+write-analysis-file","Enable writing summary files ");
01071
01072
01073
01074
01075
01076
01077
01078
01079
01080 _logPool = new LogPool(CkpvAccess(traceRoot));
01081 _logPool->setNumSubdirs(nSubdirs);
01082 _logPool->setBinary(binary);
01083 _logPool->setWriteSummaryFiles(writeSummaryFiles);
01084 #if CMK_USE_ZLIB
01085 _logPool->setCompressed(compressed);
01086 #endif
01087 if (CkMyPe() == 0) {
01088 _logPool->createSts();
01089 _logPool->createRC();
01090 }
01091 funcCount=1;
01092
01093 #if CMK_HAS_COUNTER_PAPI
01094 initPAPI();
01095 #endif
01096 }
01097
01098 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
01099 {
01100 OPTIMIZED_VERSION
01101 CkAssert(e==-1 || e>=0);
01102 CkAssert(evt != NULL);
01103 int event;
01104 int biggest = -1;
01105 for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
01106 int cur = (*CkpvAccess(usrEvents))[i]->e;
01107 if (cur == e) {
01108
01109 if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0)
01110 return e;
01111 else
01112 CmiAbort("UserEvent double registered!");
01113 }
01114 if (cur > biggest) biggest = cur;
01115 }
01116
01117
01118 if (e==-1) event = biggest+1;
01119 else event = e;
01120 CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
01121 return event;
01122 }
01123
01124
01125 int TraceProjections::traceRegisterUserStat(const char* evt, int e)
01126 {
01127 OPTIMIZED_VERSION
01128 CkAssert(e==-1 || e>=0);
01129 CkAssert(evt != NULL);
01130 int event;
01131 int biggest = -1;
01132 for (int i=0; i<CkpvAccess(usrStats)->length(); i++) {
01133 int cur = (*CkpvAccess(usrStats))[i]->e;
01134 if (cur == e) {
01135
01136 if (strcmp((*CkpvAccess(usrStats))[i]->str, evt) == 0)
01137 return e;
01138 else
01139 CmiAbort("UserStat double registered!");
01140 }
01141 if (cur > biggest) biggest = cur;
01142 }
01143
01144
01145 if (e==-1) event = biggest+1;
01146 else event = e;
01147 CkpvAccess(usrStats)->push_back(new UsrEvent(event,(char *)evt));
01148 return event;
01149 }
01150
// Entry-point clearing hook.  The projections tracer does nothing here.
void TraceProjections::traceClearEps(void)
{



}
01157
01158 void TraceProjections::traceWriteSts(void)
01159 {
01160 if(CkMyPe()==0)
01161 _logPool->writeSts(this);
01162 }
01163
01180 void TraceProjections::traceClose(void)
01181 {
01182 #ifdef PROJ_ANALYSIS
01183
01184
01185 if (_logPool == NULL) {
01186 return;
01187 }
01188 if(CkMyPe()==0){
01189 _logPool->writeSts(this);
01190 _logPool->writeRC();
01191 }
01192 CkpvAccess(_trace)->endComputation();
01193 delete _logPool;
01194 _logPool = NULL;
01195
01196 CkpvAccess(_traces)->removeTrace(this);
01197 #else
01198
01199
01200 if (_logPool == NULL) {
01201 return;
01202 }
01203 if(CkMyPe()==0){
01204 _logPool->writeSts(this);
01205 }
01206 CkpvAccess(_trace)->endComputation();
01207 delete _logPool;
01208 _logPool = NULL;
01209
01210 CkpvAccess(_traces)->removeTrace(this);
01211
01212 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01213 bocProxy.ckLocalBranch()->print_warning();
01214 #endif
01215 }
01216
01223 void TraceProjections::closeTrace() {
01224
01225 if (CkMyPe() == 0 && _logPool!= NULL) {
01226
01227 _logPool->writeSts(this);
01228 _logPool->writeRC();
01229
01230
01231 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01232 bocProxy.ckLocalBranch()->print_warning();
01233 }
01234 delete _logPool;
01235
01236 }
01237
#if CMK_SMP_TRACE_COMMTHREAD
/// Log BEGIN_TRACE for this node's communication thread, recorded with
/// pe = CmiNumPes()+CmiMyNode(). Ignored before beginComputation().
void TraceProjections::traceBeginOnCommThread()
{
  if (computationStarted) {
    _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++,
                  CmiNumPes()+CmiMyNode());
  }
}

/// Log END_TRACE for this node's communication thread (same pe encoding).
/// Unlike the begin variant, this does not check computationStarted.
void TraceProjections::traceEndOnCommThread()
{
  const int commThreadPe = CmiNumPes() + CmiMyNode();
  _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, commThreadPe);
}
#endif
01250
01251 void TraceProjections::traceBegin(void)
01252 {
01253 if (!computationStarted) return;
01254 _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
01255 }
01256
01257 void TraceProjections::traceEnd(void)
01258 {
01259 _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
01260 }
01261
01262 void TraceProjections::userEvent(int e)
01263 {
01264 if (!computationStarted) return;
01265 _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
01266 }
01267
01268 void TraceProjections::userBracketEvent(int e, double bt, double et, int nestedID=0)
01269 {
01270 if (!computationStarted) return;
01271
01272 _logPool->addUserBracketEventNestedID(USER_EVENT_PAIR, TraceTimer(bt),
01273 e, curevent, nestedID);
01274 _logPool->addUserBracketEventNestedID(USER_EVENT_PAIR, TraceTimer(et),
01275 e, curevent++, nestedID);
01276 }
01277
01278 void TraceProjections::beginUserBracketEvent(int e, int nestedID=0)
01279 {
01280 if (!computationStarted) return;
01281 _logPool->addUserBracketEventNestedID(BEGIN_USER_EVENT_PAIR, TraceTimer(),
01282 e, curevent++, nestedID);
01283 }
01284
01285 void TraceProjections::endUserBracketEvent(int e, int nestedID=0)
01286 {
01287 if (!computationStarted) return;
01288 _logPool->addUserBracketEventNestedID(END_USER_EVENT_PAIR, TraceTimer(),
01289 e, curevent++, nestedID);
01290 }
01291
01292 void TraceProjections::userSuppliedData(int d)
01293 {
01294 if (!computationStarted) return;
01295 _logPool->addUserSupplied(d);
01296 }
01297
01298 void TraceProjections::userSuppliedNote(char *note)
01299 {
01300 if (!computationStarted) return;
01301 _logPool->addUserSuppliedNote(note);
01302 }
01303
01304
01305 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
01306 {
01307 if (!computationStarted) return;
01308 _logPool->addUserSuppliedBracketedNote(note, eventID, bt, et);
01309 }
01310
01311 void TraceProjections::memoryUsage(double m)
01312 {
01313 if (!computationStarted) return;
01314 _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
01315
01316 }
01317
01318 void TraceProjections::updateStatPair(int e, double stat, double time)
01319 {
01320 if (!computationStarted) return;
01321 _logPool->add(USER_STAT, e, 0, TraceTimer(), curevent, CkMyPe(), 0, 0, 0.0, time, 0, stat);
01322 }
01323
01324
01325 void TraceProjections::updateStat(int e, double stat)
01326 {
01327 if (!computationStarted) return;
01328 _logPool->add(USER_STAT, e, 0, TraceTimer(), curevent, CkMyPe(), 0, 0, 0.0, -1, 0, stat);
01329 }
01330
01331 void TraceProjections::creation(envelope *e, int ep, int num)
01332 {
01333 #if CMK_TRACE_ENABLED
01334 double curTime = TraceTimer();
01335 if (e == 0) {
01336 CtvAccess(curThreadEvent) = curevent;
01337 _logPool->add(CREATION, ForChareMsg, ep, curTime,
01338 curevent++, CkMyPe(), 0, NULL, 0, 0.0);
01339 } else {
01340 int type=e->getMsgtype();
01341 e->setEvent(curevent);
01342 CpvAccess(curPeEvent) = curevent;
01343 if (num > 1) {
01344 _logPool->add(CREATION_BCAST, type, ep, curTime,
01345 curevent++, CkMyPe(), e->getTotalsize(),
01346 NULL, 0, 0.0, num);
01347 } else {
01348 _logPool->add(CREATION, type, ep, curTime,
01349 curevent++, CkMyPe(), e->getTotalsize(),
01350 NULL, 0, 0.0);
01351 }
01352 }
01353 #endif
01354 }
01355
01356
01357 void TraceProjections::creation(char *msg)
01358 {
01359 #if CMK_SMP_TRACE_COMMTHREAD
01360
01361 envelope *e = (envelope *)msg;
01362 int ep = e->getEpIdx();
01363 if(_entryTable[ep]->traceEnabled) {
01364 creation(e, ep, 1);
01365 e->setSrcPe(CkMyPe());
01366 }
01367 #endif
01368 }
01369
01370 void TraceProjections::traceCommSetMsgID(char *msg)
01371 {
01372 #if CMK_SMP_TRACE_COMMTHREAD
01373
01374 envelope *e = (envelope *)msg;
01375 int ep = e->getEpIdx();
01376 if(_entryTable[ep]->traceEnabled) {
01377 e->setSrcPe(CkMyPe());
01378 e->setEvent(curevent);
01379 }
01380 #endif
01381 }
01382
01383 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
01384 {
01385 #if CMK_TRACE_ENABLED
01386
01387 *pe = *event = -1;
01388 envelope *e = (envelope *)msg;
01389 int ep = e->getEpIdx();
01390 if(_entryTable[ep]->traceEnabled) {
01391 *pe = e->getSrcPe();
01392 *event = e->getEvent();
01393 }
01394 #endif
01395 }
01396
01397 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
01398 {
01399 #if CMK_TRACE_ENABLED
01400
01401 envelope *e = (envelope *)msg;
01402 int ep = e->getEpIdx();
01403 if(ep<=0 || ep>=_entryTable.size()) return;
01404 if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
01405 if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
01406 if(_entryTable[ep]->traceEnabled) {
01407 e->setSrcPe(pe);
01408 e->setEvent(event);
01409 }
01410 #endif
01411 }
01412
01413
01414
01415
01416
01417
01418 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
01419 const int *pelist)
01420 {
01421 #if CMK_TRACE_ENABLED
01422 double curTime = TraceTimer();
01423 if (e==0) {
01424 CtvAccess(curThreadEvent)=curevent;
01425 _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
01426 CkMyPe(), 0, 0, 0.0, num, pelist);
01427 } else {
01428 int type=e->getMsgtype();
01429 e->setEvent(curevent);
01430 _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
01431 e->getTotalsize(), 0, 0.0, num, pelist);
01432 }
01433 #endif
01434 }
01435
01436 void TraceProjections::creationDone(int num)
01437 {
01438
01439
01440 double curTime = TraceTimer();
01441 int idx = _logPool->lastCreationEvent;
01442 while (idx >=0 && num >0 ) {
01443 LogEntry &log = _logPool->pool[idx];
01444 if ((log.type == CREATION) ||
01445 (log.type == CREATION_BCAST) ||
01446 (log.type == CREATION_MULTICAST)) {
01447 log.recvTime = curTime - log.time;
01448 num --;
01449 }
01450 idx--;
01451 }
01452 }
01453
01454 void TraceProjections::beginExecute(CmiObjId *tid)
01455 {
01456 #if CMK_HAS_COUNTER_PAPI
01457 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
01458 CmiAbort("PAPI failed to read at begin execute!\n");
01459 }
01460 #endif
01461 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
01462 execEvent = CtvAccess(curThreadEvent);
01463 execEp = (-1);
01464 _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
01465 execEvent,CkMyPe(), 0, tid);
01466 #if CMK_HAS_COUNTER_PAPI
01467 _logPool->addPapi(CkpvAccess(papiValues));
01468 #endif
01469 inEntry = true;
01470 }
01471
01472 void TraceProjections::beginExecute(envelope *e, void *obj)
01473 {
01474 #if CMK_TRACE_ENABLED
01475 if(e==0) {
01476 #if CMK_HAS_COUNTER_PAPI
01477 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
01478 CmiAbort("PAPI failed to read at begin execute!\n");
01479 }
01480 #endif
01481 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
01482 execEvent = CtvAccess(curThreadEvent);
01483 execEp = (-1);
01484 _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
01485 execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
01486 #if CMK_HAS_COUNTER_PAPI
01487 _logPool->addPapi(CkpvAccess(papiValues));
01488 #endif
01489 inEntry = true;
01490 } else {
01491 beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
01492 e->getSrcPe(),e->getTotalsize());
01493 }
01494 #endif
01495 }
01496
01497 void TraceProjections::beginExecute(char *msg){
01498 #if CMK_SMP_TRACE_COMMTHREAD
01499
01500 envelope *e = (envelope *)msg;
01501 int ep = e->getEpIdx();
01502 if(_entryTable[ep]->traceEnabled)
01503 beginExecute(e);
01504 #endif
01505 }
01506
01507 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
01508 int mlen, CmiObjId *idx, void *obj )
01509 {
01510 if (traceNestedEvents) {
01511 if (!nestedEvents.empty()) {
01512 endExecuteLocal();
01513 }
01514 nestedEvents.emplace(event, msgType, ep, srcPe, mlen, idx);
01515 }
01516 beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
01517 }
01518
01519 void TraceProjections::changeLastEntryTimestamp(double ts)
01520 {
01521 _logPool->modLastEntryTimestamp(ts);
01522 }
01523
01524 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
01525 int mlen, CmiObjId *idx)
01526 {
01527 #if CMK_HAS_COUNTER_PAPI
01528 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
01529 CmiAbort("PAPI failed to read at begin execute!\n");
01530 }
01531 #endif
01532 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
01533 execEvent=event;
01534 execEp=ep;
01535 execPe=srcPe;
01536 _logPool->add(BEGIN_PROCESSING,msgType,ep, TraceTimer(),event,
01537 srcPe, mlen, idx, 0.0, TraceCpuTimer());
01538 #if CMK_HAS_COUNTER_PAPI
01539 _logPool->addPapi(CkpvAccess(papiValues));
01540 #endif
01541 inEntry = true;
01542 }
01543
01544 void TraceProjections::endExecute(void)
01545 {
01546 if (traceNestedEvents && !nestedEvents.empty()) nestedEvents.pop();
01547 endExecuteLocal();
01548 if (traceNestedEvents) {
01549 if (!nestedEvents.empty()) {
01550 NestedEvent &ne = nestedEvents.top();
01551 beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
01552 }
01553 }
01554 }
01555
01556 void TraceProjections::endExecute(char *msg)
01557 {
01558 #if CMK_SMP_TRACE_COMMTHREAD
01559
01560 envelope *e = (envelope *)msg;
01561 int ep = e->getEpIdx();
01562 if(_entryTable[ep]->traceEnabled)
01563 endExecute();
01564 #endif
01565 }
01566
01567 void TraceProjections::endExecuteLocal(void)
01568 {
01569 #if CMK_HAS_COUNTER_PAPI
01570 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
01571 CmiAbort("PAPI failed to read at end execute!\n");
01572 }
01573 #endif
01574 if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
01575 double cputime = TraceCpuTimer();
01576 double now = TraceTimer();
01577 if(execEp == (-1)) {
01578 _logPool->add(END_PROCESSING, 0, _threadEP, now,
01579 execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
01580 } else {
01581 _logPool->add(END_PROCESSING, 0, execEp, now,
01582 execEvent, execPe, 0, NULL, 0.0, cputime);
01583 }
01584 #if CMK_HAS_COUNTER_PAPI
01585 _logPool->addPapi(CkpvAccess(papiValues));
01586 #endif
01587 inEntry = false;
01588 }
01589
01590 void TraceProjections::messageRecv(char *env, int pe)
01591 {
01592 #if 0
01593 envelope *e = (envelope *)env;
01594 int msgType = e->getMsgtype();
01595 int ep = e->getEpIdx();
01596 #if 0
01597 if (msgType==NewChareMsg || msgType==NewVChareMsg
01598 || msgType==ForChareMsg || msgType==ForVidMsg
01599 || msgType==BocInitMsg || msgType==NodeBocInitMsg
01600 || msgType==ForBocMsg || msgType==ForNodeBocMsg)
01601 ep = e->getEpIdx();
01602 else
01603 ep = _threadEP;
01604 #endif
01605 _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
01606 curevent++, e->getSrcPe(), e->getTotalsize());
01607 #endif
01608 }
01609
01610 void TraceProjections::beginIdle(double curWallTime)
01611 {
01612 _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
01613 }
01614
01615 void TraceProjections::endIdle(double curWallTime)
01616 {
01617 _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
01618 }
01619
01620 void TraceProjections::beginPack(void)
01621 {
01622 _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
01623 }
01624
01625 void TraceProjections::endPack(void)
01626 {
01627 _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
01628 }
01629
01630 void TraceProjections::beginUnpack(void)
01631 {
01632 _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
01633 }
01634
01635 void TraceProjections::endUnpack(void)
01636 {
01637 _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
01638 }
01639
01640 void TraceProjections::enqueue(envelope *) {}
01641
01642 void TraceProjections::dequeue(envelope *) {}
01643
01644 void TraceProjections::beginComputation(void)
01645 {
01646 computationStarted = true;
01647
01648
01649
01650 if (CkpvAccess(traceOnPe) != 0) {
01651 void (*ptr)() = registerMachineUserEvents();
01652 if (ptr != NULL) {
01653 ptr();
01654 }
01655 }
01656
01657
01658 _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
01659 #if CMK_HAS_COUNTER_PAPI
01660
01661 if(CkpvAccess(papiStarted) == 0)
01662 {
01663 if (PAPI_start(CkpvAccess(papiEventSet)) != PAPI_OK) {
01664 CmiAbort("PAPI failed to start designated counters!\n");
01665 }
01666 CkpvAccess(papiStarted) = 1;
01667 }
01668 #endif
01669 }
01670
01671 void TraceProjections::endComputation(void)
01672 {
01673 #if CMK_HAS_COUNTER_PAPI
01674
01675
01676 if(CkpvAccess(papiStopped) == 0) {
01677 if (PAPI_stop(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
01678 CkPrintf("Warning: PAPI failed to stop correctly!\n");
01679 }
01680 CkpvAccess(papiStopped) = 1;
01681 }
01682
01683
01684 #endif
01685 endTime = TraceTimer();
01686 _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
01687
01688
01689
01690
01691 }
01692
01693 int TraceProjections::idxRegistered(int idx)
01694 {
01695 int idxVecLen = idxVec.size();
01696 for(int i=0; i<idxVecLen; i++)
01697 {
01698 if(idx == idxVec[i])
01699 return 1;
01700 }
01701 return 0;
01702 }
01703
01704
01705 void toProjectionsFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
01706 {
01707 for (int i=0;i<n;i++)
01708 switch(t) {
01709 case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
01710 case Tuchar:
01711 case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
01712 case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
01713 case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
01714 case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
01715 case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
01716 case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
01717 case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
01718 case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
01719 case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
01720 #ifdef CMK_PUP_LONG_LONG
01721 case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
01722 case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
01723 #endif
01724 default: CmiAbort("Unrecognized pup type code!");
01725 };
01726 }
01727
01728 void fromProjectionsFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
01729 {
01730 for (int i=0;i<n;i++)
01731 switch(t) {
01732 case Tchar: {
01733 char c = fgetc(f);
01734 if (c==EOF)
01735 parseError("Could not match character");
01736 else
01737 ((char *)p)[i] = c;
01738 break;
01739 }
01740 case Tuchar:
01741 case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
01742 case Tshort:((short *)p)[i]=(short)readInt(); break;
01743 case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
01744 case Tint: ((int *)p)[i]=readInt(); break;
01745 case Tuint: ((unsigned int *)p)[i]=readUint(); break;
01746 case Tlong: ((long *)p)[i]=readInt(); break;
01747 case Tulong:((unsigned long *)p)[i]=readUint(); break;
01748 case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
01749 case Tdouble:((double *)p)[i]=readDouble(); break;
01750 #ifdef CMK_PUP_LONG_LONG
01751 case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
01752 case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
01753 #endif
01754 default: CmiAbort("Unrecognized pup type code!");
01755 };
01756 }
01757
01758 #if CMK_USE_ZLIB
01759 void toProjectionsGZFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
01760 {
01761 for (int i=0;i<n;i++)
01762 switch(t) {
01763 case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
01764 case Tuchar:
01765 case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
01766 case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
01767 case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
01768 case Tint: gzprintf(f," %d",((int *)p)[i]); break;
01769 case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
01770 case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
01771 case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
01772 case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
01773 case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
01774 #ifdef CMK_PUP_LONG_LONG
01775 case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
01776 case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
01777 #endif
01778 default: CmiAbort("Unrecognized pup type code!");
01779 };
01780 }
01781 #endif
01782
01783 void TraceProjections::endPhase() {
01784 double currentPhaseTime = TraceTimer();
01785 if (lastPhaseEvent != NULL) {
01786 } else {
01787 if (_logPool->pool != NULL) {
01788
01789 } else {
01790 CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
01791 _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
01792 }
01793 }
01794
01795
01796
01797
01798
01799
01800 lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
01801 _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
01802 currentPhaseID++;
01803 }
01804
01805 #ifdef PROJ_ANALYSIS
01806
01807
01808
01809
01810
01811 void registerOutlierReduction() {
01812 outlierReductionType =
01813 CkReduction::addReducer(outlierReduction, false, "outlierReduction");
01814 minMaxReductionType =
01815 CkReduction::addReducer(minMaxReduction, false, "minMaxReduction");
01816 }
01817
01830 static void TraceProjectionsExitHandler()
01831 {
01832 #if CMK_TRACE_ENABLED
01833 #if DEBUG_KMEANS
01834 CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
01835 #endif
01836 if (!traceProjectionsGID.isZero()) {
01837 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01838 bocProxy.traceProjectionsParallelShutdown(CkMyPe());
01839 } else {
01840 CkContinueExit();
01841 }
01842 #else
01843 CkContinueExit();
01844 #endif
01845 }
01846
01847
01848
01849
01850
01851 void initTraceProjectionsBOC()
01852 {
01853
01854 #ifdef __BIGSIM__
01855 if (BgNodeRank() == 0) {
01856 #else
01857 if (CkMyRank() == 0) {
01858 #endif
01859 registerExitFn(TraceProjectionsExitHandler);
01860 }
01861 #if 0
01862 }
01863 #endif
01864 }
01865
01866
01867
01868
01869
01870
01871
01872
01873
01874 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
01876
01877 bool findStartTime = (CmiTimerAbsolute()==1);
01878 traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
01879 if (findOutliers) {
01880 kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
01881 numKSeeds,
01882 peNumKeep,
01883 entryThreshold,
01884 outlierUsePhases);
01885 }
01886
01887 delete msg;
01888 }
01889
01890
01891 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
01892 #if DEBUG_KMEANS
01893 CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
01894 #endif
01895 endPe = pe;
01896 if (CkMyPe() == 0) {
01897 analysisStartTime = CmiWallTimer();
01898 }
01899 if (CkpvAccess(_trace)->_logPool != NULL ){
01900 CkpvAccess(_trace)->endComputation();
01901
01902
01903 CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
01904 CkpvAccess(_traces)->clearTrace();
01905
01906
01907
01908
01909
01910
01911
01912
01913
01914 }
01915 CProxy_KMeansBOC kMeansProxy(kMeansGID);
01916 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01917 if (findOutliers) {
01918 parModulesRemaining++;
01919 kMeansProxy[CkMyPe()].startKMeansAnalysis();
01920 }
01921 parModulesRemaining++;
01922 if (findStartTime)
01923 bocProxy[CkMyPe()].startTimeAnalysis();
01924 else
01925 bocProxy[CkMyPe()].startEndTimeAnalysis();
01926 }
01927
01928
01929 void TraceProjectionsBOC::flush_warning(int pe)
01930 {
01931 CmiAssert(CkMyPe() == 0);
01932 std::set<int>::iterator it;
01933 it = list.find(pe);
01934 if (it == list.end()) list.insert(pe);
01935 flush_count++;
01936 }
01937
01938 void TraceProjectionsBOC::print_warning()
01939 {
01940 CmiAssert(CkMyPe() == 0);
01941 if (flush_count == 0) return;
01942 std::set<int>::iterator it;
01943 CkPrintf("*************************************************************\n");
01944 CkPrintf("Warning: Projections log flushed to disk %d times on %d cores:", flush_count, list.size());
01945 for (it=list.begin(); it!=list.end(); it++)
01946 CkPrintf(" %d", *it);
01947 CkPrintf(".\n");
01948 CkPrintf("Warning: The performance data is likely invalid, unless the flushes have been explicitly synchronized by your program.\n");
01949 CkPrintf("Warning: This may be fixed by specifying a larger +logsize (current value %d).\n", CkpvAccess(CtrLogBufSize));
01950 CkPrintf("*************************************************************\n");
01951 }
01952
01953
01954 void KMeansBOC::startKMeansAnalysis() {
01955
01956 LogPool *pool = CkpvAccess(_trace)->_logPool;
01957
01958 if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
01959
01960 CkCallback cb(CkReductionTarget(KMeansBOC, flushCheck), 0, thisProxy);
01961 contribute(sizeof(bool), &(pool->hasFlushed), CkReduction::logical_or_bool, cb);
01962 }
01963
01964
01965 void KMeansBOC::flushCheck(bool someFlush) {
01966
01967
01968
01969 if (!someFlush) {
01970
01971 CProxy_KMeansBOC kMeansProxy(kMeansGID);
01972 kMeansProxy.flushCheckDone();
01973 } else {
01974
01975 CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
01976
01977 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01978 bocProxy[0].kMeansDoneFlushed();
01979 }
01980 }
01981
01982
01983 void KMeansBOC::flushCheckDone() {
01984
01985
01986 LogPool *pool = CkpvAccess(_trace)->_logPool;
01987
01988
01989
01990 numEntryMethods = _entryTable.size();
01991 numMetrics = numEntryMethods + 2;
01992
01993
01994 markedBegin = false;
01995 markedIdle = false;
01996 beginBlockTime = 0.0;
01997 beginIdleBlockTime = 0.0;
01998 lastBeginEPIdx = -1;
01999
02000 lastPhaseIdx = 0;
02001 currentExecTimes = NULL;
02002 currentPhase = 0;
02003 selected = false;
02004
02005 pool->initializePhases();
02006
02007
02008 incKSeeds = new double[numK*numMetrics];
02009 keepMetric = new bool[numMetrics];
02010
02011
02012
02013
02014 thisProxy[CkMyPe()].getNextPhaseMetrics();
02015 }
02016
02017
/**
 * Scan this PE's log from lastPhaseIdx, accumulate per-metric times into a
 * freshly zeroed currentExecTimes, and hand them to collectKMeansData().
 *
 * Metric layout (numEventMethods == _entryTable.size()):
 *   [0, numEventMethods)   time inside each entry method (BEGIN/END_PROCESSING)
 *   [numEventMethods]      idle time (BEGIN/END_IDLE)
 *   [numEventMethods + 1]  remainder: interval length minus (entry + idle) time,
 *                          clamped to 0 with a warning if negative
 *
 * With usePhases, the first END_PHASE after lastPhaseIdx closes the interval.
 * Open entry/idle intervals are split at the boundary, lastPhaseIdx advances to
 * that END_PHASE, and the scan stops. An END_COMPUTATION entry also closes the
 * interval (the loop does not break there). Without usePhases, END_PHASE
 * entries are ignored.
 */
void KMeansBOC::getNextPhaseMetrics() {
  if (usePhases) {
    DEBUGF("[%d] Using Phases\n", CkMyPe());
  } else {
    DEBUGF("[%d] NOT using Phases\n", CkMyPe());
  }

  // Replace the previous interval's accumulator with a zeroed one.
  if (currentExecTimes != NULL) {
    delete [] currentExecTimes;
  }
  currentExecTimes = new double[numMetrics];
  for (int i=0; i<numMetrics; i++) {
    currentExecTimes[i] = 0.0;
  }

  // Also the index of the idle metric; numEventMethods+1 is the remainder metric.
  int numEventMethods = _entryTable.size();
  LogPool *pool = CkpvAccess(_trace)->_logPool;

  CkAssert(pool->numEntries > lastPhaseIdx);
  double totalPhaseTime = 0.0;
  // Accumulates entry-method time AND idle time for this interval.
  double totalActiveTime = 0.0;

  for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
    if (pool->pool[i].type == BEGIN_PROCESSING) {
      // Open (or re-open) an execution interval for this entry.
      if (!markedBegin) {
        markedBegin = true;
      }
      beginBlockTime = pool->pool[i].time;
      lastBeginEPIdx = pool->pool[i].eIdx;
    } else if (pool->pool[i].type == END_PROCESSING) {
      // Close the open execution interval, charging it to the END record's eIdx.
      if (markedBegin) {
        markedBegin = false;
        // Records with a negative event id close the interval without being charged.
        if (pool->pool[i].event < 0)
        {
          continue;
        }
        currentExecTimes[pool->pool[i].eIdx] +=
          pool->pool[i].time - beginBlockTime;
        totalActiveTime += pool->pool[i].time - beginBlockTime;
        lastBeginEPIdx = -1;
      }
    } else if (pool->pool[i].type == BEGIN_IDLE) {
      if (!markedIdle) {
        markedIdle = true;
      }
      beginIdleBlockTime = pool->pool[i].time;
    } else if (pool->pool[i].type == END_IDLE) {
      if (markedIdle) {
        markedIdle = false;
        currentExecTimes[numEventMethods] +=
          pool->pool[i].time - beginIdleBlockTime;
        totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
      }
    } else if (pool->pool[i].type == END_PHASE) {
      if (usePhases) {
        // Skip the END_PHASE that started this interval (left here by the previous call).
        if (i != lastPhaseIdx) {
          totalPhaseTime =
            pool->pool[i].time - pool->pool[lastPhaseIdx].time;

          // Split a still-open execution at the phase boundary; the rest
          // counts toward the next phase.
          if (markedBegin) {
            CkAssert(lastBeginEPIdx >= 0);
            currentExecTimes[lastBeginEPIdx] +=
              pool->pool[i].time - beginBlockTime;
            totalActiveTime += pool->pool[i].time - beginBlockTime;

            beginBlockTime = pool->pool[i].time;
          }

          // Same split for a still-open idle period.
          if (markedIdle) {
            currentExecTimes[numEventMethods] +=
              pool->pool[i].time - beginIdleBlockTime;
            totalActiveTime += pool->pool[i].time - beginIdleBlockTime;

            beginIdleBlockTime = pool->pool[i].time;
          }
          if (totalActiveTime <= totalPhaseTime) {
            currentExecTimes[numEventMethods+1] =
              totalPhaseTime - totalActiveTime;
          } else {
            currentExecTimes[numEventMethods+1] = 0.0;
            CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
                     CkMyPe(), currentPhase);
          }
          collectKMeansData();

          // Next call resumes from this END_PHASE.
          lastPhaseIdx = i;
          break;
        }
      }
    } else if (pool->pool[i].type == END_COMPUTATION) {
      // End of the log: close any open intervals at this time and report.
      if (markedBegin) {
        CkAssert(lastBeginEPIdx >= 0);
        currentExecTimes[lastBeginEPIdx] +=
          pool->pool[i].time - beginBlockTime;
        totalActiveTime += pool->pool[i].time - beginBlockTime;
      }
      if (markedIdle) {
        currentExecTimes[numEventMethods] +=
          pool->pool[i].time - beginIdleBlockTime;
        totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
      }
      totalPhaseTime =
        pool->pool[i].time - pool->pool[lastPhaseIdx].time;
      if (totalActiveTime <= totalPhaseTime) {
        currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
      } else {
        currentExecTimes[numEventMethods+1] = 0.0;
        CkPrintf("[%d] Warning: Overhead found to be negative!\n",
                 CkMyPe());
      }
      collectKMeansData();
    }
  }
}
02154
02168 void KMeansBOC::collectKMeansData() {
02169 int minOffset = numMetrics;
02170 int maxOffset = 2*numMetrics;
02171 int sosOffset = 3*numMetrics;
02172
02173
02174
02175 double *reductionMsg = new double[numMetrics*4];
02176
02177 for (int i=0; i<numMetrics; i++) {
02178 reductionMsg[i] = currentExecTimes[i];
02179
02180 reductionMsg[minOffset + i] = currentExecTimes[i];
02181 reductionMsg[maxOffset + i] = currentExecTimes[i];
02182
02183 reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
02184 }
02185
02186 CkCallback cb(CkReductionTarget(KMeansBOC, globalMetricRefinement),
02187 0, thisProxy);
02188 contribute((numMetrics*4)*sizeof(double), reductionMsg,
02189 outlierReductionType, cb);
02190 delete [] reductionMsg;
02191 }
02192
02193
02194
02195
02196
02197
02198 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
02199 CkAssert(CkMyPe() == 0);
02200
02201
02202
02203 int sumOffset = 0;
02204 int minOffset = numMetrics;
02205 int maxOffset = 2*numMetrics;
02206 int sosOffset = 3*numMetrics;
02207
02208
02209 KMeansStatsMessage *outmsg =
02210 new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
02211 outmsg->numMetrics = numMetrics;
02212 outmsg->numKPos = numK*numMetrics;
02213 outmsg->numStats = numMetrics*4;
02214
02215
02216 double *totalExecTimes = (double*)msg->getData();
02217 double totalTime = 0.0;
02218
02219 for (int i=0; i<numMetrics; i++) {
02220 DEBUGN("%lf\n", totalExecTimes[i]);
02221 totalTime += totalExecTimes[i];
02222
02223
02224 outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
02225
02226
02227
02228
02229 outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
02230 outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
02231 totalExecTimes[minOffset + i];
02232
02233
02234 outmsg->stats[sosOffset + i] =
02235 sqrt((totalExecTimes[sosOffset + i] -
02236 2*(outmsg->stats[i])*totalExecTimes[i] +
02237 (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
02238 CkNumPes());
02239 }
02240
02241 for (int i=0; i<numMetrics; i++) {
02242
02243
02244
02245
02246
02247
02248
02249 keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
02250 entryThreshold) &&
02251 (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
02252 if (keepMetric[i]) {
02253 DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
02254 totalExecTimes[maxOffset + i], totalTime/CkNumPes());
02255 } else {
02256 DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
02257 }
02258 outmsg->filter[i] = keepMetric[i];
02259 }
02260
02261 delete msg;
02262
02263
02264 kSeeds = new double[numK*numMetrics];
02265
02266 numKReported = 0;
02267 kNumMembers = new int[numK];
02268
02269
02270
02271 srand(11337);
02272 for (int k=0; k<numK; k++) {
02273 DEBUGF("Seed %d | ", k);
02274 for (int m=0; m<numMetrics; m++) {
02275 double factor = totalExecTimes[maxOffset + m] -
02276 totalExecTimes[minOffset + m];
02277
02278
02279
02280
02281 kSeeds[numMetrics*k + m] =
02282 ((rand()*1.0)/RAND_MAX)*factor;
02283 if (keepMetric[m]) {
02284 DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
02285 }
02286 outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
02287 }
02288 DEBUGF("\n");
02289 kNumMembers[k] = 0;
02290 }
02291
02292
02293 thisProxy.findInitialClusters(outmsg);
02294 }
02295
02296
02297
02298
02299 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
02300
02301 if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
02302
02303 phaseIter = 0;
02304
02305
02306 CkAssert(numMetrics == msg->numMetrics);
02307 for (int i=0; i<numMetrics; i++) {
02308 keepMetric[i] = msg->filter[i];
02309 }
02310
02311
02312
02313
02314
02315
02316 CkAssert(numMetrics*4 == msg->numStats);
02317 for (int i=0; i<numMetrics; i++) {
02318 currentExecTimes[i] -= msg->stats[numMetrics + i];
02319
02320
02321
02322
02323 }
02324
02325
02326
02327 CkAssert(numK*numMetrics == msg->numKPos);
02328 for (int i=0; i<msg->numKPos; i++) {
02329 incKSeeds[i] = msg->kSeedsPos[i];
02330 }
02331
02332
02333 minDistance = calculateDistance(0);
02334 DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(),
02335 currentPhase, phaseIter, minDistance);
02336 minK = 0;
02337 for (int i=1; i<numK; i++) {
02338 double distance = calculateDistance(i);
02339 DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(),
02340 currentPhase, phaseIter, i, distance);
02341 if (distance < minDistance) {
02342 minDistance = distance;
02343 minK = i;
02344 }
02345 }
02346
02347
02348
02349
02350
02351
02352
02353
02354
02355
02356
02357
02358
02359
02360
02361
02362
02363 double *modVector = new double[numK*(numMetrics+1)];
02364 for (int i=0; i<numK; i++) {
02365 for (int j=0; j<numMetrics+1; j++) {
02366 modVector[i*(numMetrics+1) + j] = 0.0;
02367 }
02368 }
02369 for (int i=0; i<numMetrics; i++) {
02370
02371 modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
02372 }
02373 modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
02374
02375 CkCallback cb(CkReductionTarget(KMeansBOC, updateKSeeds),
02376 0, thisProxy);
02377 contribute(numK*(numMetrics+1)*sizeof(double), modVector,
02378 CkReduction::sum_double, cb);
02379 delete [] modVector;
02380 }
02381
02382 double KMeansBOC::calculateDistance(int k) {
02383 double ret = 0.0;
02384 for (int i=0; i<numMetrics; i++) {
02385 if (keepMetric[i]) {
02386 DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n",
02387 CkMyPe(), currentPhase, phaseIter, i,
02388 currentExecTimes[i], incKSeeds[k*numMetrics + i]);
02389 ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
02390 }
02391 }
02392 return sqrt(ret);
02393 }
02394
02395 void KMeansBOC::updateKSeeds(double *modVector, int n) {
02396 CkAssert(CkMyPe() == 0);
02397
02398
02399
02400
02401 CkAssert(numK*(numMetrics+1) == n);
02402
02403
02404 bool hasChanges = false;
02405 for (int i=0; i<numK; i++) {
02406 hasChanges = hasChanges ||
02407 (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
02408 }
02409 if (!hasChanges) {
02410 findRepresentatives();
02411 } else {
02412 int overallChange = 0;
02413 for (int i=0; i<numK; i++) {
02414 int change = (int)modVector[i*(numMetrics+1) + numMetrics];
02415 if (change != 0) {
02416 overallChange += change;
02417
02418
02419
02420
02421
02422
02423
02424
02425
02426 CkAssert((kNumMembers[i] + change >= 0) &&
02427 (kNumMembers[i] + change <= CkNumPes()));
02428 if (kNumMembers[i] == 0) {
02429 CkAssert(change > 0);
02430 for (int j=0; j<numMetrics; j++) {
02431 kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
02432 }
02433 } else if (kNumMembers[i] + change == 0) {
02434
02435 } else {
02436 for (int j=0; j<numMetrics; j++) {
02437 kSeeds[i*numMetrics + j] *= kNumMembers[i];
02438 kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
02439 kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
02440 }
02441 }
02442 kNumMembers[i] += change;
02443 }
02444 DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
02445 CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
02446 }
02447
02448
02449 KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
02450 outmsg->numKPos = numK*numMetrics;
02451 for (int i=0; i<numK*numMetrics; i++) {
02452 outmsg->kSeedsPos[i] = kSeeds[i];
02453 }
02454
02455 thisProxy.updateSeedMembership(outmsg);
02456 }
02457 }
02458
02459
02460 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
02461
02462
02463
02464 phaseIter++;
02465
02466
02467
02468 CkAssert(numK*numMetrics == msg->numKPos);
02469 for (int i=0; i<msg->numKPos; i++) {
02470 incKSeeds[i] = msg->kSeedsPos[i];
02471 }
02472
02473
02474 lastMinK = minK;
02475 minDistance = calculateDistance(0);
02476 DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(),
02477 currentPhase, phaseIter, minDistance);
02478
02479 minK = 0;
02480 for (int i=1; i<numK; i++) {
02481 double distance = calculateDistance(i);
02482 DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(),
02483 currentPhase, phaseIter, i, distance);
02484 if (distance < minDistance) {
02485 minDistance = distance;
02486 minK = i;
02487 }
02488 }
02489
02490 double *modVector = new double[numK*(numMetrics+1)];
02491 for (int i=0; i<numK; i++) {
02492 for (int j=0; j<numMetrics+1; j++) {
02493 modVector[i*(numMetrics+1) + j] = 0.0;
02494 }
02495 }
02496
02497 if (minK != lastMinK) {
02498 for (int i=0; i<numMetrics; i++) {
02499 modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
02500 modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
02501 }
02502 modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
02503 modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
02504 }
02505
02506 CkCallback cb(CkReductionTarget(KMeansBOC, updateKSeeds),
02507 0, thisProxy);
02508 contribute(numK*(numMetrics+1)*sizeof(double), modVector,
02509 CkReduction::sum_double, cb);
02510 delete [] modVector;
02511 }
02512
02513 void KMeansBOC::findRepresentatives() {
02514
02515
02516
02517 int numNonEmptyClusters = 0;
02518 for (int i=0; i<numK; i++) {
02519 if (kNumMembers[i] > 0) {
02520 numNonEmptyClusters++;
02521 }
02522 }
02523
02524 int numRepresentatives = peNumKeep;
02525
02526
02527
02528 if (numRepresentatives < numNonEmptyClusters) {
02529 numRepresentatives = numNonEmptyClusters;
02530 }
02531
02532 int slotsRemaining = numRepresentatives;
02533
02534 DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining,
02535 numNonEmptyClusters);
02536
02537
02538
02539
02540
02541 int exemplarsPerCluster = 1;
02542 slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
02543
02544 int numCandidateOutliers = CkNumPes() -
02545 exemplarsPerCluster*numNonEmptyClusters;
02546
02547 double *remainders = new double[numK];
02548 int *assigned = new int[numK];
02549 exemplarChoicesLeft = new int[numK];
02550 outlierChoicesLeft = new int[numK];
02551
02552 for (int i=0; i<numK; i++) {
02553 assigned[i] = 0;
02554 remainders[i] =
02555 (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
02556 slotsRemaining / numCandidateOutliers;
02557 if (remainders[i] >= 0.0) {
02558 assigned[i] = (int)floor(remainders[i]);
02559 remainders[i] -= assigned[i];
02560 } else {
02561 remainders[i] = 0.0;
02562 }
02563 }
02564
02565 for (int i=0; i<numK; i++) {
02566 slotsRemaining -= assigned[i];
02567 }
02568 CkAssert(slotsRemaining >= 0);
02569
02570
02571
02572 while (slotsRemaining > 0) {
02573 double max = 0.0;
02574 int winner = 0;
02575 for (int i=0; i<numK; i++) {
02576 if (remainders[i] > max) {
02577 max = remainders[i];
02578 winner = i;
02579 }
02580 }
02581 assigned[winner]++;
02582 remainders[winner] = 0.0;
02583 slotsRemaining--;
02584 }
02585
02586
02587
02588 numSelectionIter = exemplarsPerCluster;
02589 for (int i=0; i<numK; i++) {
02590 if (assigned[i] > numSelectionIter) {
02591 numSelectionIter = assigned[i];
02592 }
02593 }
02594 DEBUGF("Selection Iterations = %d\n", numSelectionIter);
02595
02596 for (int i=0; i<numK; i++) {
02597 if (kNumMembers[i] > 0) {
02598 exemplarChoicesLeft[i] = exemplarsPerCluster;
02599 outlierChoicesLeft[i] = assigned[i];
02600 } else {
02601 exemplarChoicesLeft[i] = 0;
02602 outlierChoicesLeft[i] = 0;
02603 }
02604 DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
02605 outlierChoicesLeft[i]);
02606 }
02607
02608 delete [] assigned;
02609 delete [] remainders;
02610
02611
02612 KSelectionMessage *outmsg = NULL;
02613 if (numSelectionIter > 0) {
02614 outmsg = new (numK, numK, numK) KSelectionMessage;
02615 outmsg->numKMinIDs = numK;
02616 outmsg->numKMaxIDs = numK;
02617 for (int i=0; i<numK; i++) {
02618 outmsg->minIDs[i] = -1;
02619 outmsg->maxIDs[i] = -1;
02620 }
02621 thisProxy.collectDistances(outmsg);
02622 } else {
02623 CkPrintf("Warning: No selection iteration from the start!\n");
02624
02625 thisProxy.phaseDone();
02626 }
02627 }
02628
02629
02630
02631
02632
02633
02634
02635
02636
02637
02638 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
02639
02640
02641
02642 DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
02643 lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
02644 if ((CkMyPe() == msg->minIDs[lastMinK]) ||
02645 (CkMyPe() == msg->maxIDs[lastMinK])) {
02646 CkAssert(!selected);
02647 selected = true;
02648 }
02649
02650
02651
02652 double *minMaxAndIDs = NULL;
02653
02654 minMaxAndIDs = new double[numK*4];
02655
02656 for (int i=0; i<numK; i++) {
02657 minMaxAndIDs[i*4] = -1.0;
02658 minMaxAndIDs[i*4+1] = -1.0;
02659 minMaxAndIDs[i*4+2] = -1.0;
02660 minMaxAndIDs[i*4+3] = -1.0;
02661 }
02662
02663 if (!selected) {
02664 DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
02665 minMaxAndIDs[lastMinK*4] = minDistance;
02666 minMaxAndIDs[lastMinK*4+1] = CkMyPe();
02667 minMaxAndIDs[lastMinK*4+2] = minDistance;
02668 minMaxAndIDs[lastMinK*4+3] = CkMyPe();
02669 }
02670 delete msg;
02671
02672 CkCallback cb(CkReductionTarget(KMeansBOC, findNextMinMax),
02673 0, thisProxy);
02674 contribute(numK*4*sizeof(double), minMaxAndIDs,
02675 minMaxReductionType, cb);
02676 }
02677
02678 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
02679
02680
02681
02682
02683
02684 if (numSelectionIter > 0) {
02685 double *incInfo = (double*)msg->getData();
02686
02687 KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
02688 outmsg->numKMinIDs = numK;
02689 outmsg->numKMaxIDs = numK;
02690
02691 for (int i=0; i<numK; i++) {
02692 DEBUGF("%d | %lf %d %lf %d \n", i,
02693 incInfo[i*4], (int)incInfo[i*4+1],
02694 incInfo[i*4+2], (int)incInfo[i*4+3]);
02695 }
02696
02697 for (int i=0; i<numK; i++) {
02698 if (exemplarChoicesLeft[i] > 0) {
02699 outmsg->minIDs[i] = (int)incInfo[i*4+1];
02700 exemplarChoicesLeft[i]--;
02701 } else {
02702 outmsg->minIDs[i] = -1;
02703 }
02704 if (outlierChoicesLeft[i] > 0) {
02705 outmsg->maxIDs[i] = (int)incInfo[i*4+3];
02706 outlierChoicesLeft[i]--;
02707 } else {
02708 outmsg->maxIDs[i] = -1;
02709 }
02710 }
02711 thisProxy.collectDistances(outmsg);
02712 numSelectionIter--;
02713 } else {
02714
02715 thisProxy.phaseDone();
02716 }
02717 }
02718
02725 void KMeansBOC::phaseDone() {
02726
02727
02728
02729 LogPool *pool = CkpvAccess(_trace)->_logPool;
02730 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
02731
02732
02733 if (!selected) {
02734 if (usePhases) {
02735 pool->keepPhase[currentPhase] = false;
02736 } else {
02737
02738 pool->setAllPhases(false);
02739 }
02740 }
02741
02742
02743
02744 if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
02745
02746 contribute(CkCallback(CkReductionTarget(TraceProjectionsBOC, kMeansDone),
02747 0, bocProxy));
02748 } else {
02749
02750
02751
02752
02753
02754 currentPhase++;
02755 thisProxy[CkMyPe()].getNextPhaseMetrics();
02756 }
02757 }
02758
02759 void TraceProjectionsBOC::startTimeAnalysis()
02760 {
02761 double startTime = 0.0;
02762 if (CkpvAccess(_trace)->_logPool->numEntries>0)
02763 startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
02764 CkCallback cb(CkReductionTarget(TraceProjectionsBOC, startTimeDone), thisProxy);
02765 contribute(sizeof(double), &startTime, CkReduction::min_double, cb);
02766 }
02767
02768 void TraceProjectionsBOC::startTimeDone(double result)
02769 {
02770
02771
02772 if (CkpvAccess(_trace) != NULL) {
02773 CkpvAccess(_trace)->_logPool->globalStartTime = result;
02774 CkpvAccess(_trace)->_logPool->setNewStartTime();
02775
02776 }
02777 thisProxy[CkMyPe()].startEndTimeAnalysis();
02778 }
02779
02780 void TraceProjectionsBOC::startEndTimeAnalysis()
02781 {
02782
02783
02784 endTime = CkpvAccess(_trace)->endTime;
02785
02786
02787 CkCallback cb(CkReductionTarget(TraceProjectionsBOC, endTimeDone),
02788 0, thisProxy);
02789 contribute(sizeof(double), &endTime, CkReduction::max_double, cb);
02790 }
02791
02792 void TraceProjectionsBOC::endTimeDone(double result)
02793 {
02794
02795
02796 CkAssert(CkMyPe() == 0);
02797 parModulesRemaining--;
02798 if (CkpvAccess(_trace) != NULL && CkpvAccess(_trace)->_logPool != NULL) {
02799 CkpvAccess(_trace)->_logPool->globalEndTime = result - CkpvAccess(_trace)->_logPool->globalStartTime;
02800
02801
02802 }
02803 if (parModulesRemaining == 0) {
02804 thisProxy[CkMyPe()].finalize();
02805 }
02806 }
02807
02808 void TraceProjectionsBOC::kMeansDone() {
02809
02810 if(CkMyPe()==0) CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
02811
02812 CkAssert(CkMyPe() == 0);
02813 parModulesRemaining--;
02814 CkPrintf("K-Means Analysis Time = %lf seconds\n",
02815 CmiWallTimer()-analysisStartTime);
02816 if (parModulesRemaining == 0) {
02817 thisProxy[CkMyPe()].finalize();
02818 }
02819 }
02820
02826 void TraceProjectionsBOC::kMeansDoneFlushed() {
02827 CkAssert(CkMyPe() == 0);
02828 parModulesRemaining--;
02829 CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
02830 CmiWallTimer()-analysisStartTime);
02831 if (parModulesRemaining == 0) {
02832 thisProxy[CkMyPe()].finalize();
02833 }
02834 }
02835
02836 void TraceProjectionsBOC::finalize()
02837 {
02838 CkAssert(CkMyPe() == 0);
02839
02840
02841 thisProxy.closingTraces();
02842 }
02843
02844
02845 void TraceProjectionsBOC::closingTraces() {
02846 CkpvAccess(_trace)->closeTrace();
02847
02848
02849 int pe = 0;
02850 if (endPe != -1) pe = endPe;
02851 contribute(CkCallback(CkReductionTarget(TraceProjectionsBOC, closeParallelShutdown), pe, thisProxy));
02852 }
02853
02854
02855
02856
02857 void TraceProjectionsBOC::closeParallelShutdown(void) {
02858 CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
02859 if (!CkpvAccess(_trace)->converseExit) {
02860 CkContinueExit();
02861 }
02862 }
02863
02864
02865
02866
02867 CkReductionMsg *outlierReduction(int nMsgs,
02868 CkReductionMsg **msgs) {
02869 int numBytes = 0;
02870 int numMetrics = 0;
02871 double *ret = NULL;
02872
02873 if (nMsgs == 1) {
02874
02875 return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
02876 }
02877
02878 if (nMsgs > 1) {
02879 numBytes = msgs[0]->getSize();
02880
02881 if (numBytes%sizeof(double) != 0) {
02882 CkAbort("Outlier Reduction Size incompatible with doubles!\n");
02883 }
02884 if ((numBytes/sizeof(double))%4 != 0) {
02885 CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
02886 }
02887 numMetrics = (numBytes/sizeof(double))/4;
02888 ret = new double[numMetrics*4];
02889
02890
02891 for (int i=0; i<numMetrics*4; i++) {
02892 ret[i] = ((double *)msgs[0]->getData())[i];
02893 }
02894
02895
02896 for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
02897 for (int i=0; i<numMetrics; i++) {
02898
02899 ret[i] += ((double *)msgs[msgIdx]->getData())[i];
02900
02901 ret[numMetrics + i] =
02902 (ret[numMetrics + i] <
02903 ((double *)msgs[msgIdx]->getData())[numMetrics + i])
02904 ? ret[numMetrics + i] :
02905 ((double *)msgs[msgIdx]->getData())[numMetrics + i];
02906
02907 ret[2*numMetrics + i] =
02908 (ret[2*numMetrics + i] >
02909 ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
02910 ? ret[2*numMetrics + i] :
02911 ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
02912
02913 ret[3*numMetrics + i] +=
02914 ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
02915 }
02916 }
02917 }
02918
02919
02920 return CkReductionMsg::buildNew(numBytes,ret);
02921 }
02922
02923
02924
02925
02926
02927
02928
02929 CkReductionMsg *minMaxReduction(int nMsgs,
02930 CkReductionMsg **msgs) {
02931 CkAssert(nMsgs > 0);
02932
02933 int numBytes = msgs[0]->getSize();
02934 CkAssert(numBytes%sizeof(double) == 0);
02935 int numK = (numBytes/sizeof(double))/4;
02936
02937 double *ret = new double[numK*4];
02938
02939 for (int i=0; i<numK; i++) {
02940 ret[i*4] = -1.0;
02941 ret[i*4+1] = -1.0;
02942 ret[i*4+2] = -1.0;
02943 ret[i*4+3] = -1.0;
02944 }
02945
02946
02947 for (int i=0; i<nMsgs; i++) {
02948 double *temp = (double *)msgs[i]->getData();
02949 for (int j=0; j<numK; j++) {
02950
02951 if (ret[j*4+1] < 0) {
02952
02953 if (temp[j*4+1] >= 0) {
02954 ret[j*4] = temp[j*4];
02955 ret[j*4+1] = temp[j*4+1];
02956 }
02957 } else {
02958
02959 if (temp[j*4+1] >= 0) {
02960 if (temp[j*4] < ret[j*4]) {
02961 ret[j*4] = temp[j*4];
02962 ret[j*4+1] = temp[j*4+1];
02963 }
02964 }
02965 }
02966
02967 if (ret[j*4+3] < 0) {
02968
02969 if (temp[j*4+3] >= 0) {
02970 ret[j*4+2] = temp[j*4+2];
02971 ret[j*4+3] = temp[j*4+3];
02972 }
02973 } else {
02974
02975 if (temp[j*4+3] >= 0) {
02976 if (temp[j*4+2] > ret[j*4+2]) {
02977 ret[j*4+2] = temp[j*4+2];
02978 ret[j*4+3] = temp[j*4+3];
02979 }
02980 }
02981 }
02982 }
02983 }
02984 CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
02985 delete [] ret;
02986 return redmsg;
02987 }
02988
02989 #include "TraceProjections.def.h"
02990 #endif //PROJ_ANALYSIS
02991