00001
00005
00006 #include <string.h>
00007
00008 #include "charm++.h"
00009 #include "trace-projections.h"
00010 #include "trace-projectionsBOC.h"
00011 #include "TopoManager.h"
00012
// DEBUGF forwards its arguments to CkPrintf when DEBUG_PROJ is enabled and
// expands to nothing otherwise.
#if DEBUG_PROJ
#  define DEBUGF(...) CkPrintf(__VA_ARGS__)
#else
#  define DEBUGF(...)
#endif
// DEBUGN always expands to nothing: rename a DEBUGF call to DEBUGN to
// silence just that one call.
#define DEBUGN(...)

// Value the TraceProjections constructor stores into CtrLogBufSize before
// the "+logsize" option is parsed.
#define DefaultLogBufSize 1000000

// Non-zero when the delta-encoded log is to be written.
int deltaLog;
// Non-zero when the plain (non-delta) log is to be written.
int nonDeltaLog;

// Set from the "+checknested" command line flag.
int checknested = 0;
00029
#ifdef PROJ_ANALYSIS

// Group ids used by the analysis code.
CkGroupID traceProjectionsGID;
CkGroupID kMeansGID;

// Custom reducer functions (defined elsewhere) and the reducer type handles
// that go with them.
CkReductionMsg *outlierReduction(int nMsgs, CkReductionMsg **msgs);
CkReductionMsg *minMaxReduction(int nMsgs, CkReductionMsg **msgs);
CkReduction::reducerType outlierReductionType;
CkReduction::reducerType minMaxReductionType;

#endif // PROJ_ANALYSIS
00044
00045 CkpvStaticDeclare(TraceProjections*, _trace);
00046 CtvStaticDeclare(int,curThreadEvent);
00047
00048 CkpvDeclare(CmiInt8, CtrLogBufSize);
00049
00050 typedef CkVec<char *> usrEventVec;
00051 CkpvStaticDeclare(usrEventVec, usrEventlist);
/// A registered user event: its numeric id plus its name.
/// The name pointer is stored exactly as passed in; the class neither copies
/// nor frees it.
class UsrEvent {
public:
  int e;      ///< event id
  char *str;  ///< event name (not owned)

  UsrEvent(int _e, char* _s)
  {
    e = _e;
    str = _s;
  }
};
00058 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
00059
00060
00061
00062 #if CMK_TRACE_ENABLED
00064 void disableTraceLogOutput()
00065 {
00066 CkpvAccess(_trace)->setWriteData(false);
00067 }
00068
00070 void enableTraceLogOutput()
00071 {
00072 CkpvAccess(_trace)->setWriteData(true);
00073 }
00074
00076 void flushTraceLog()
00077 {
00078 CkpvAccess(_trace)->traceFlushLog();
00079 }
00080 #endif
00081
// OPTIMIZED_VERSION: with tracing compiled in it expands to nothing; with
// tracing compiled out it prints a warning the first time it is reached
// (the file-local `warned` flag suppresses repeats).
#if CMK_TRACE_ENABLED
#define OPTIMIZED_VERSION
#else
static int warned=0;
#define OPTIMIZED_VERSION \
  if (!warned) { warned=1; \
    CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
#endif // CMK_TRACE_ENABLED

// OPEN_LOG / CLOSE_LOG: when CMK_TRACE_LOGFILE_NUM_CONTROL is set the log
// file is reopened in append mode and closed around each use; otherwise both
// macros expand to nothing.
#if CMK_TRACE_LOGFILE_NUM_CONTROL
#  define OPEN_LOG openLog("a");
#  define CLOSE_LOG closeLog();
#else
#  define OPEN_LOG
#  define CLOSE_LOG
#endif //CMK_TRACE_LOGFILE_NUM_CONTROL
00101
#if CMK_HAS_COUNTER_PAPI
// PAPI event codes to record. With USE_SPP_PAPI the array is filled in by
// the TraceProjections constructor; otherwise it starts with the two
// defaults below.
#  ifdef USE_SPP_PAPI
int papiEvents[NUMPAPIEVENTS];
#  else
int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
#  endif
#endif // CMK_HAS_COUNTER_PAPI
00109
00114 void _createTraceprojections(char **argv)
00115 {
00116 DEBUGF("%d createTraceProjections\n", CkMyPe());
00117 CkpvInitialize(CkVec<char *>, usrEventlist);
00118 CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
00119 CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
00120 #if CMK_BIGSIM_CHARM
00121
00122
00123 #endif //CMK_BIGSIM_CHARM
00124 CkpvInitialize(TraceProjections*, _trace);
00125 CkpvAccess(_trace) = new TraceProjections(argv);
00126 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
00127 if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
00128 }
00129
00130
00131
00132 struct TraceThreadListener {
00133 struct CthThreadListener base;
00134 int event;
00135 int msgType;
00136 int ep;
00137 int srcPe;
00138 int ml;
00139 CmiObjId idx;
00140 };
00141
00142 extern "C"
00143 void traceThreadListener_suspend(struct CthThreadListener *l)
00144 {
00145 TraceThreadListener *a=(TraceThreadListener *)l;
00146
00147
00148 traceSuspend();
00149 }
00150
00151 extern "C"
00152 void traceThreadListener_resume(struct CthThreadListener *l)
00153 {
00154 TraceThreadListener *a=(TraceThreadListener *)l;
00155
00156
00157 _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
00158 CthGetThreadID(a->base.thread));
00159 a->event=-1;
00160 a->srcPe=CkMyPe();
00161 a->ml=0;
00162 }
00163
00164 extern "C"
00165 void traceThreadListener_free(struct CthThreadListener *l)
00166 {
00167 TraceThreadListener *a=(TraceThreadListener *)l;
00168 delete a;
00169 }
00170
00171 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
00172 {
00173 #if CMK_TRACE_ENABLED
00174
00175 TraceThreadListener *a= new TraceThreadListener;
00176
00177 a->base.suspend=traceThreadListener_suspend;
00178 a->base.resume=traceThreadListener_resume;
00179 a->base.free=traceThreadListener_free;
00180 a->event=e->getEvent();
00181 a->msgType=e->getMsgtype();
00182 a->ep=e->getEpIdx();
00183 a->srcPe=e->getSrcPe();
00184 a->ml=e->getTotalsize();
00185
00186 CthAddListener(tid, (CthThreadListener *)a);
00187 #endif
00188 }
00189
00190 void LogPool::openLog(const char *mode)
00191 {
00192 #if CMK_PROJECTIONS_USE_ZLIB
00193 if(compressed) {
00194 if (nonDeltaLog) {
00195 do {
00196 zfp = gzopen(fname, mode);
00197 } while (!zfp && (errno == EINTR || errno == EMFILE));
00198 if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
00199 }
00200 if (deltaLog) {
00201 do {
00202 deltazfp = gzopen(dfname, mode);
00203 } while (!deltazfp && (errno == EINTR || errno == EMFILE));
00204 if (!deltazfp)
00205 CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
00206 }
00207 } else {
00208 if (nonDeltaLog) {
00209 do {
00210 fp = fopen(fname, mode);
00211 } while (!fp && (errno == EINTR || errno == EMFILE));
00212 if (!fp) {
00213 CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
00214 CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
00215 }
00216 }
00217 if (deltaLog) {
00218 do {
00219 deltafp = fopen(dfname, mode);
00220 } while (!deltafp && (errno == EINTR || errno == EMFILE));
00221 if (!deltafp)
00222 CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
00223 }
00224 }
00225 #else
00226 if (nonDeltaLog) {
00227 do {
00228 fp = fopen(fname, mode);
00229 } while (!fp && (errno == EINTR || errno == EMFILE));
00230 if (!fp) {
00231 CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
00232 CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
00233 }
00234 }
00235 if (deltaLog) {
00236 do {
00237 deltafp = fopen(dfname, mode);
00238 } while (!deltafp && (errno == EINTR || errno == EMFILE));
00239 if(!deltafp)
00240 CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
00241 }
00242 #endif
00243 }
00244
00245 void LogPool::closeLog(void)
00246 {
00247 #if CMK_PROJECTIONS_USE_ZLIB
00248 if(compressed) {
00249 if (nonDeltaLog) gzclose(zfp);
00250 if (deltaLog) gzclose(deltazfp);
00251 return;
00252 }
00253 #endif
00254 if (nonDeltaLog) {
00255 #if !defined(_WIN32) || defined(__CYGWIN__)
00256 fsync(fileno(fp));
00257 #endif
00258 fclose(fp);
00259 }
00260 if (deltaLog) {
00261 #if !defined(_WIN32) || defined(__CYGWIN__)
00262 fsync(fileno(deltafp));
00263 #endif
00264 fclose(deltafp);
00265 }
00266 }
00267
00268 LogPool::LogPool(char *pgm) {
00269 pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
00270
00271 writeData = true;
00272 numEntries = 0;
00273
00274 prevTime = 0.0;
00275 timeErr = 0.0;
00276 globalStartTime = 0.0;
00277 globalEndTime = 0.0;
00278 headerWritten = 0;
00279 numPhases = 0;
00280 hasFlushed = false;
00281
00282 keepPhase = NULL;
00283
00284 fileCreated = false;
00285 poolSize = CkpvAccess(CtrLogBufSize);
00286 pgmname = new char[strlen(pgm)+1];
00287 strcpy(pgmname, pgm);
00288 }
00289
00290 void LogPool::createFile(const char *fix)
00291 {
00292 if (fileCreated) {
00293 return;
00294 }
00295
00296 char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1;
00297 char *pathPlusFilePrefix = new char[1024];
00298
00299 if(nSubdirs > 0){
00300 int sd = CkMyPe() % nSubdirs;
00301 char *subdir = new char[1024];
00302 sprintf(subdir, "%s.projdir.%d", pgmname, sd);
00303 CmiMkdir(subdir);
00304 sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
00305 delete[] subdir;
00306 } else {
00307 sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
00308 }
00309
00310 char pestr[10];
00311 sprintf(pestr, "%d", CkMyPe());
00312 #if CMK_PROJECTIONS_USE_ZLIB
00313 int len;
00314 if(compressed)
00315 len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
00316 else
00317 len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
00318 #else
00319 int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
00320 #endif
00321
00322 if (nonDeltaLog) {
00323 fname = new char[len];
00324 }
00325 if (deltaLog) {
00326 dfname = new char[len];
00327 }
00328 #if CMK_PROJECTIONS_USE_ZLIB
00329 if(compressed) {
00330 if (deltaLog && nonDeltaLog) {
00331 sprintf(fname, "%s.%s.logold.gz", pathPlusFilePrefix, pestr);
00332 sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
00333 } else {
00334 if (nonDeltaLog) {
00335 sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
00336 } else {
00337 sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
00338 }
00339 }
00340 } else {
00341 if (deltaLog && nonDeltaLog) {
00342 sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
00343 sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
00344 } else {
00345 if (nonDeltaLog) {
00346 sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
00347 } else {
00348 sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
00349 }
00350 }
00351 }
00352 #else
00353 if (deltaLog && nonDeltaLog) {
00354 sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
00355 sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
00356 } else {
00357 if (nonDeltaLog) {
00358 sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
00359 } else {
00360 sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
00361 }
00362 }
00363 #endif
00364 fileCreated = true;
00365 delete[] pathPlusFilePrefix;
00366 openLog("w+");
00367 CLOSE_LOG
00368 }
00369
00370 void LogPool::createSts(const char *fix)
00371 {
00372 CkAssert(CkMyPe() == 0);
00373
00374 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
00375 sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
00376 do
00377 {
00378 stsfp = fopen(fname, "w");
00379 } while (!stsfp && (errno == EINTR || errno == EMFILE));
00380 if(stsfp==0){
00381 CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
00382 CmiAbort("Error!!\n");
00383 }
00384 delete[] fname;
00385 }
00386
00387 void LogPool::createTopo(const char *fix)
00388 {
00389 CkAssert(CkMyPe() == 0);
00390
00391 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
00392 sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
00393 do
00394 {
00395 topofp = fopen(fname, "w");
00396 } while (!stsfp && (errno == EINTR || errno == EMFILE));
00397 if(stsfp==0){
00398 CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
00399 CmiAbort("Error!!\n");
00400 }
00401 delete[] fname;
00402 }
00403
00404 void LogPool::createRC()
00405 {
00406
00407 fname =
00408 new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
00409 sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
00410 do {
00411 rcfp = fopen(fname, "w");
00412 } while (!rcfp && (errno == EINTR || errno == EMFILE));
00413 if (rcfp==0) {
00414 CmiAbort("Cannot open projections configuration file for writing.\n");
00415 }
00416 delete[] fname;
00417 }
00418
00419 LogPool::~LogPool()
00420 {
00421 if (writeData) {
00422 writeLog();
00423 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
00424 closeLog();
00425 #endif
00426 }
00427
00428 #if CMK_BIGSIM_CHARM
00429 extern int correctTimeLog;
00430 if (correctTimeLog) {
00431 createFile("-bg");
00432 if (CkMyPe() == 0) {
00433 createSts("-bg");
00434 }
00435 writeHeader();
00436 if (CkMyPe() == 0) writeSts(NULL);
00437 postProcessLog();
00438 }
00439 #endif
00440
00441 delete[] pool;
00442 delete [] fname;
00443 }
00444
00445 void LogPool::writeHeader()
00446 {
00447 if (headerWritten) return;
00448 headerWritten = 1;
00449 if(!binary) {
00450 #if CMK_PROJECTIONS_USE_ZLIB
00451 if(compressed) {
00452 if (nonDeltaLog) {
00453 gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
00454 }
00455 if (deltaLog) {
00456 gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
00457 }
00458 }
00459 else
00460 #endif
00461 {
00462 if (nonDeltaLog) {
00463 fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
00464 }
00465 if (deltaLog) {
00466 fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
00467 }
00468 }
00469 }
00470 else {
00471 if (nonDeltaLog) {
00472 fwrite(&numEntries,sizeof(numEntries),1,fp);
00473 }
00474 if (deltaLog) {
00475 fwrite(&numEntries,sizeof(numEntries),1,deltafp);
00476 }
00477 }
00478 }
00479
00480 void LogPool::writeLog(void)
00481 {
00482 createFile();
00483 OPEN_LOG
00484 writeHeader();
00485 if (nonDeltaLog) write(0);
00486 if (deltaLog) write(1);
00487 CLOSE_LOG
00488 }
00489
00490 void LogPool::write(int writedelta)
00491 {
00492
00493
00494
00495
00496 PUP::er *p = NULL;
00497 if (binary) {
00498 p = new PUP::toDisk(writedelta?deltafp:fp);
00499 }
00500 #if CMK_PROJECTIONS_USE_ZLIB
00501 else if (compressed) {
00502 p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
00503 }
00504 #endif
00505 else {
00506 p = new toProjectionsFile(writedelta?deltafp:fp);
00507 }
00508 CmiAssert(p);
00509 int curPhase = 0;
00510
00511
00512
00513 for(UInt i=0; i<numEntries; i++) {
00514 if (!writedelta) {
00515 if (keepPhase == NULL) {
00516
00517 pool[i].pup(*p);
00518 } else {
00519
00520
00521 if (pool[i].type == END_PHASE) {
00522
00523 pool[i].pup(*p);
00524 curPhase++;
00525 } else if (pool[i].type == BEGIN_COMPUTATION ||
00526 pool[i].type == END_COMPUTATION) {
00527
00528 pool[i].pup(*p);
00529 } else if (keepPhase[curPhase]) {
00530 pool[i].pup(*p);
00531 }
00532 }
00533 }
00534 else {
00535
00536
00537 double time = pool[i].time;
00538 if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
00539 {
00540 double timeDiff = (time-prevTime)*1.0e6;
00541 UInt intTimeDiff = (UInt)timeDiff;
00542 timeErr += timeDiff - intTimeDiff;
00543 if (timeErr > 1.0) {
00544 timeErr -= 1.0;
00545 intTimeDiff++;
00546 }
00547 pool[i].time = intTimeDiff/1.0e6;
00548 }
00549 pool[i].pup(*p);
00550 pool[i].time = time;
00551 prevTime = time;
00552 }
00553 }
00554 delete p;
00555 delete [] keepPhase;
00556 }
00557
00558 void LogPool::writeSts(void)
00559 {
00560
00561 int i;
00562
00563 fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
00564 fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
00565 fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
00566 #if CMK_HAS_COUNTER_PAPI
00567 fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
00568
00569
00570 char eventName[PAPI_MAX_STR_LEN];
00571 for (i=0;i<NUMPAPIEVENTS;i++) {
00572 PAPI_event_code_to_name(papiEvents[i], eventName);
00573 fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
00574 }
00575 #endif
00576 traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
00577 for(i=0;i<CkpvAccess(usrEvents)->length();i++){
00578 fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
00579 }
00580 }
00581
00582 void LogPool::writeSts(TraceProjections *traceProj){
00583 writeSts();
00584 if (traceProj != NULL) {
00585 CkHashtableIterator *funcIter = traceProj->getfuncIterator();
00586 funcIter->seekStart();
00587 int numFuncs = traceProj->getFuncNumber();
00588 fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
00589 while(funcIter->hasNext()) {
00590 StrKey *key;
00591 int *obj = (int *)funcIter->next((void **)&key);
00592 fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
00593 }
00594 }
00595 fprintf(stsfp, "END\n");
00596 fclose(stsfp);
00597 }
00598
00599 void LogPool::writeRC(void)
00600 {
00601
00602 #ifdef PROJ_ANALYSIS
00603 CkAssert(CkMyPe() == 0);
00604 fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
00605 (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
00606 fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
00607 (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
00608
00609
00610
00611
00612
00613
00614
00615 #endif //PROJ_ANALYSIS
00616 fclose(rcfp);
00617 }
00618
00619 void LogPool::writeTopo(void)
00620 {
00621 TopoManager tmgr;
00622 tmgr.printAllocation(topofp);
00623 fclose(topofp);
00624 }
00625
00626 #if CMK_BIGSIM_CHARM
00627 static void updateProjLog(void *data, double t, double recvT, void *ptr)
00628 {
00629 LogEntry *log = (LogEntry *)data;
00630 FILE *fp = *(FILE **)ptr;
00631 log->time = t;
00632 log->recvTime = recvT<0.0?0:recvT;
00633
00634 toProjectionsFile p(fp);
00635 log->pup(p);
00636 }
00637 #endif
00638
00639
00640 void LogPool::flushLogBuffer()
00641 {
00642 if (numEntries) {
00643 double writeTime = TraceTimer();
00644 writeLog();
00645 hasFlushed = true;
00646 numEntries = 0;
00647 new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
00648 new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
00649 }
00650 }
00651
00652 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
00653 double time, int event, int pe, int ml, CmiObjId *id,
00654 double recvT, double cpuT, int numPe)
00655 {
00656 new (&pool[numEntries++])
00657 LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
00658 if ((type == END_PHASE) || (type == END_COMPUTATION)) {
00659 numPhases++;
00660 }
00661 if(poolSize==numEntries) {
00662 flushLogBuffer();
00663 #if CMK_BIGSIM_CHARM
00664 extern int correctTimeLog;
00665 if (correctTimeLog) CmiAbort("I/O interrupt!\n");
00666 #endif
00667 }
00668 #if CMK_BIGSIM_CHARM
00669 switch (type) {
00670 case BEGIN_PROCESSING:
00671 pool[numEntries-1].recvTime = BgGetRecvTime();
00672 case END_PROCESSING:
00673 case BEGIN_COMPUTATION:
00674 case END_COMPUTATION:
00675 case CREATION:
00676 case BEGIN_PACK:
00677 case END_PACK:
00678 case BEGIN_UNPACK:
00679 case END_UNPACK:
00680 case USER_EVENT_PAIR:
00681 bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
00682 }
00683 #endif
00684 }
00685
00686 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
00687 #ifndef CMK_BIGSIM_CHARM
00688 new (&pool[numEntries++])
00689 LogEntry(time,type,funcID,lineNum,fileName);
00690 if(poolSize == numEntries){
00691 flushLogBuffer();
00692 }
00693 #endif
00694 }
00695
00696
00697
00698 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
00699 #ifndef CMK_BIGSIM_CHARM
00700 new (&pool[numEntries++])
00701 LogEntry(type,time,memUsage);
00702 if(poolSize == numEntries){
00703 flushLogBuffer();
00704 }
00705 #endif
00706
00707 }
00708
00709
00710
00711 void LogPool::addUserSupplied(int data){
00712
00713 add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
00714
00715
00716 pool[numEntries-1].setUserSuppliedData(data);
00717 }
00718
00719
00720 void LogPool::addUserSuppliedNote(char *note){
00721
00722 add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
00723
00724
00725 pool[numEntries-1].setUserSuppliedNote(note);
00726 }
00727
00728 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
00729
00730 #ifndef CMK_BIGSIM_CHARM
00731 #if MPI_TRACE_MACHINE_HACK
00732
00733
00734
00735 #define MPI_TEST_EVENT_ID 60
00736 #define MPI_IPROBE_EVENT_ID 70
00737 int lastEvent = pool[numEntries-1].event;
00738 if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
00739
00740
00741 pool[numEntries].endTime = et;
00742 }else{
00743 new (&pool[numEntries++])
00744 LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
00745 }
00746 #else
00747 new (&pool[numEntries++])
00748 LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
00749 #endif
00750 if(poolSize == numEntries){
00751 flushLogBuffer();
00752 }
00753 #endif
00754 }
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
00766 int event, int pe, int ml, CmiObjId *id,
00767 double recvT, int numPe, int *pelist)
00768 {
00769 new (&pool[numEntries++])
00770 LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
00771 if(poolSize==numEntries) {
00772 flushLogBuffer();
00773 }
00774 }
00775
00776 void LogPool::postProcessLog()
00777 {
00778 #if CMK_BIGSIM_CHARM
00779 bgUpdateProj(1);
00780 #endif
00781 }
00782
00783 void LogPool::modLastEntryTimestamp(double ts)
00784 {
00785 pool[numEntries-1].time = ts;
00786
00787 }
00788
00789
00790
00791
00792
00793
00794
00795
00796
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807
00808
00809
00810
00811
00812
00813
00814
00815
00816
00817
00818
00819
00820 void LogEntry::pup(PUP::er &p)
00821 {
00822 int i;
00823 CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
00824 char ret = '\n';
00825
00826 p|type;
00827 if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
00828 if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
00829
00830 switch (type) {
00831 case USER_EVENT:
00832 case USER_EVENT_PAIR:
00833 p|mIdx; p|itime; p|event; p|pe;
00834 break;
00835 case BEGIN_IDLE:
00836 case END_IDLE:
00837 case BEGIN_PACK:
00838 case END_PACK:
00839 case BEGIN_UNPACK:
00840 case END_UNPACK:
00841 p|itime; p|pe;
00842 break;
00843 case BEGIN_PROCESSING:
00844 if (p.isPacking()) {
00845 irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
00846 icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
00847 }
00848 p|mIdx; p|eIdx; p|itime; p|event; p|pe;
00849 p|msglen; p|irecvtime;
00850 p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
00851 p|icputime;
00852 #if CMK_HAS_COUNTER_PAPI
00853
00854 for (i=0; i<NUMPAPIEVENTS; i++) {
00855
00856
00857 p|papiValues[i];
00858
00859 }
00860 #else
00861
00862 #endif
00863 if (p.isUnpacking()) {
00864 recvTime = irecvtime/1.0e6;
00865 cputime = icputime/1.0e6;
00866 }
00867 break;
00868 case END_PROCESSING:
00869 if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
00870 p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
00871 #if CMK_HAS_COUNTER_PAPI
00872
00873 for (i=0; i<NUMPAPIEVENTS; i++) {
00874
00875
00876 p|papiValues[i];
00877 }
00878 #else
00879
00880 #endif
00881 if (p.isUnpacking()) cputime = icputime/1.0e6;
00882 break;
00883 case USER_SUPPLIED:
00884 p|userSuppliedData;
00885 p|itime;
00886 break;
00887 case USER_SUPPLIED_NOTE:
00888 p|itime;
00889 int length;
00890 if (p.isPacking()) length = strlen(userSuppliedNote);
00891 p | length;
00892 char space;
00893 space = ' ';
00894 p | space;
00895 if (p.isUnpacking()) {
00896 userSuppliedNote = new char[length+1];
00897 userSuppliedNote[length] = '\0';
00898 }
00899 PUParray(p,userSuppliedNote, length);
00900 break;
00901 case USER_SUPPLIED_BRACKETED_NOTE:
00902
00903 p|itime;
00904 p|iEndTime;
00905 p|event;
00906 int length2;
00907 if (p.isPacking()) length2 = strlen(userSuppliedNote);
00908 p | length2;
00909 char space2;
00910 space2 = ' ';
00911 p | space2;
00912 if (p.isUnpacking()) {
00913 userSuppliedNote = new char[length+1];
00914 userSuppliedNote[length] = '\0';
00915 }
00916 PUParray(p,userSuppliedNote, length2);
00917 break;
00918 case MEMORY_USAGE_CURRENT:
00919 p | memUsage;
00920 p | itime;
00921 break;
00922 case CREATION:
00923 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
00924 p|mIdx; p|eIdx; p|itime;
00925 p|event; p|pe; p|msglen; p|irecvtime;
00926 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
00927 break;
00928 case CREATION_BCAST:
00929 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
00930 p|mIdx; p|eIdx; p|itime;
00931 p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
00932 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
00933 break;
00934 case CREATION_MULTICAST:
00935 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
00936 p|mIdx; p|eIdx; p|itime;
00937 p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
00938 if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
00939 for (i=0; i<numpes; i++) p|pes[i];
00940 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
00941 break;
00942 case MESSAGE_RECV:
00943 p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
00944 break;
00945
00946 case ENQUEUE:
00947 case DEQUEUE:
00948 p|mIdx; p|itime; p|event; p|pe;
00949 break;
00950
00951 case BEGIN_INTERRUPT:
00952 case END_INTERRUPT:
00953 p|itime; p|event; p|pe;
00954 break;
00955
00956
00957
00958
00959 case BEGIN_COMPUTATION:
00960 case END_COMPUTATION:
00961 case BEGIN_TRACE:
00962 case END_TRACE:
00963 p|itime;
00964 break;
00965 case BEGIN_FUNC:
00966 p | itime;
00967 p | mIdx;
00968 p | event;
00969 if(!p.isUnpacking()){
00970 p(fName,flen-1);
00971 }
00972 break;
00973 case END_FUNC:
00974 p | itime;
00975 p | mIdx;
00976 break;
00977 case END_PHASE:
00978 p|eIdx;
00979 p|itime;
00980 break;
00981 default:
00982 CmiError("***Internal Error*** Wierd Event %d.\n", type);
00983 break;
00984 }
00985 if (p.isUnpacking()) time = itime/1.0e6;
00986 p|ret;
00987 }
00988
00989 TraceProjections::TraceProjections(char **argv):
00990 curevent(0), inEntry(0), computationStarted(0),
00991 converseExit(0), endTime(0.0), traceNestedEvents(0),
00992 currentPhaseID(0), lastPhaseEvent(NULL)
00993 {
00994
00995
00996 if (CkpvAccess(traceOnPe) == 0) return;
00997
00998 CtvInitialize(int,curThreadEvent);
00999 CkpvInitialize(CmiInt8, CtrLogBufSize);
01000 CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
01001 CtvAccess(curThreadEvent)=0;
01002 if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize),
01003 "Log entries to buffer per I/O")) {
01004 if (CkMyPe() == 0) {
01005 CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
01006 }
01007 }
01008 checknested =
01009 CmiGetArgFlagDesc(argv,"+checknested",
01010 "check projections nest begin end execute events");
01011 traceNestedEvents =
01012 CmiGetArgFlagDesc(argv,"+tracenested",
01013 "trace projections nest begin/end execute events");
01014 int binary =
01015 CmiGetArgFlagDesc(argv,"+binary-trace",
01016 "Write log files in binary format");
01017
01018 CmiInt8 nSubdirs = 0;
01019 CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
01020
01021
01022 #if CMK_PROJECTIONS_USE_ZLIB
01023 int compressed = true;
01024 CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
01025 int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
01026 compressed = compressed && !disableCompressed;
01027 #else
01028
01029 CmiGetArgFlagDesc(argv,"+gz-trace",
01030 "Write log files pre-compressed with gzip");
01031 if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
01032 #endif
01033
01034
01035
01036
01037
01038
01039
01040
01041 nonDeltaLog = 1;
01042 deltaLog = 0;
01043 deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
01044 "Generate Delta encoded and simple timestamped log files");
01045
01046 _logPool = new LogPool(CkpvAccess(traceRoot));
01047 _logPool->setNumSubdirs(nSubdirs);
01048 _logPool->setBinary(binary);
01049 #if CMK_PROJECTIONS_USE_ZLIB
01050 _logPool->setCompressed(compressed);
01051 #endif
01052 if (CkMyPe() == 0) {
01053 _logPool->createSts();
01054 _logPool->createRC();
01055 _logPool->createTopo();
01056 }
01057 funcCount=1;
01058
01059 #if CMK_HAS_COUNTER_PAPI
01060
01061 int papiRetValue;
01062 if(CkMyRank()==0){
01063 papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
01064 if (papiRetValue != PAPI_VER_CURRENT) {
01065 CmiAbort("PAPI Library initialization failure!\n");
01066 }
01067 #if CMK_SMP
01068 if(PAPI_thread_init(pthread_self) != PAPI_OK){
01069 CmiAbort("PAPI could not be initialized in SMP mode!\n");
01070 }
01071 #endif
01072 }
01073
01074 #if CMK_SMP
01075
01076 #if CMK_SMP_TRACE_COMMTHREAD
01077 CmiNodeAllBarrier();
01078 #else
01079 CmiNodeBarrier();
01080 #endif
01081 #endif
01082
01083 papiEventSet = PAPI_NULL;
01084 if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
01085 CmiAbort("PAPI failed to create event set!\n");
01086 }
01087 #ifdef USE_SPP_PAPI
01088
01089 if(PAPI_query_event(PAPI_FP_OPS)==PAPI_OK) {
01090 papiEvents[0] = PAPI_FP_OPS;
01091 }else{
01092 if(CmiMyPe()==0){
01093 CmiAbort("WARNING: PAPI_FP_OPS doesn't exist on this platform!");
01094 }
01095 }
01096 if(PAPI_query_event(PAPI_TOT_INS)==PAPI_OK) {
01097 papiEvents[1] = PAPI_TOT_INS;
01098 }else{
01099 CmiAbort("WARNING: PAPI_TOT_INS doesn't exist on this platform!");
01100 }
01101 int EventCode;
01102 int ret;
01103 ret=PAPI_event_name_to_code("perf::PERF_COUNT_HW_CACHE_LL:MISS",&EventCode);
01104 if(PAPI_query_event(EventCode)==PAPI_OK) {
01105 papiEvents[2] = EventCode;
01106 }else{
01107 CmiAbort("WARNING: perf::PERF_COUNT_HW_CACHE_LL:MISS doesn't exist on this platform!");
01108 }
01109 ret=PAPI_event_name_to_code("DATA_PREFETCHER:ALL",&EventCode);
01110 if(PAPI_query_event(EventCode)==PAPI_OK) {
01111 papiEvents[3] = EventCode;
01112 }else{
01113 CmiAbort("WARNING: DATA_PREFETCHER:ALL doesn't exist on this platform!");
01114 }
01115 if(PAPI_query_event(PAPI_L1_DCA)==PAPI_OK) {
01116 papiEvents[4] = PAPI_L1_DCA;
01117 }else{
01118 CmiAbort("WARNING: PAPI_L1_DCA doesn't exist on this platform!");
01119 }
01120 if(PAPI_query_event(PAPI_TOT_CYC)==PAPI_OK) {
01121 papiEvents[5] = PAPI_TOT_CYC;
01122 }else{
01123 CmiAbort("WARNING: PAPI_TOT_CYC doesn't exist on this platform!");
01124 }
01125 #else
01126
01127 #endif
01128 papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
01129 if (papiRetValue < 0) {
01130 if (papiRetValue == PAPI_ECNFLCT) {
01131 CmiAbort("PAPI events conflict! Please re-assign event types!\n");
01132 } else {
01133 char error_str[PAPI_MAX_STR_LEN];
01134 PAPI_perror(papiRetValue,error_str,PAPI_MAX_STR_LEN);
01135 CmiPrintf("PAPI failed with error %s val %d\n",error_str,papiRetValue);
01136 CmiAbort("PAPI failed to add designated events!\n");
01137 }
01138 }
01139 if(CkMyPe()==0)
01140 {
01141 CmiPrintf("Registered %d PAPI counters:",NUMPAPIEVENTS);
01142 char nameBuf[PAPI_MAX_STR_LEN];
01143 for(int i=0;i<NUMPAPIEVENTS;i++)
01144 {
01145 PAPI_event_code_to_name(papiEvents[i], nameBuf);
01146 CmiPrintf("%s ",nameBuf);
01147 }
01148 CmiPrintf("\n");
01149 }
01150 memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
01151 #endif
01152 }
01153
01154 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
01155 {
01156 OPTIMIZED_VERSION
01157 CkAssert(e==-1 || e>=0);
01158 CkAssert(evt != NULL);
01159 int event;
01160 int biggest = -1;
01161 for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
01162 int cur = (*CkpvAccess(usrEvents))[i]->e;
01163 if (cur == e) {
01164
01165 if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0)
01166 return e;
01167 else
01168 CmiAbort("UserEvent double registered!");
01169 }
01170 if (cur > biggest) biggest = cur;
01171 }
01172
01173
01174 if (e==-1) event = biggest+1;
01175 else event = e;
01176 CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
01177 return event;
01178 }
01179
01180 void TraceProjections::traceClearEps(void)
01181 {
01182
01183
01184
01185 }
01186
01187 void TraceProjections::traceWriteSts(void)
01188 {
01189 if(CkMyPe()==0)
01190 _logPool->writeSts(this);
01191 }
01192
01209 void TraceProjections::traceClose(void)
01210 {
01211 #ifdef PROJ_ANALYSIS
01212
01213
01214
01215 converseExit = 1;
01216 if (CkMyPe() == 0) {
01217 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01218 bocProxy.traceProjectionsParallelShutdown(-1);
01219 }
01220 if(CkMyRank() == CkMyNodeSize()){
01221 CkpvAccess(_trace)->endComputation();
01222 delete _logPool;
01223
01224 CkpvAccess(_traces)->removeTrace(this);
01225 }
01226 #else
01227
01228
01229 if (_logPool == NULL) {
01230 return;
01231 }
01232 if(CkMyPe()==0){
01233 _logPool->writeSts(this);
01234 }
01235 CkpvAccess(_trace)->endComputation();
01236 delete _logPool;
01237
01238 CkpvAccess(_traces)->removeTrace(this);
01239 #endif
01240 }
01241
01248 void TraceProjections::closeTrace() {
01249
01250 if (CkMyPe() == 0) {
01251
01252 _logPool->writeSts(this);
01253 _logPool->writeRC();
01254 _logPool->writeTopo();
01255
01256 }
01257 delete _logPool;
01258 }
01259
01260 #if CMK_SMP_TRACE_COMMTHREAD
01261 void TraceProjections::traceBeginOnCommThread()
01262 {
01263 if (!computationStarted) return;
01264 _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
01265 }
01266
01267 void TraceProjections::traceEndOnCommThread()
01268 {
01269 _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
01270 }
01271 #endif
01272
01273 void TraceProjections::traceBegin(void)
01274 {
01275 if (!computationStarted) return;
01276 _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
01277 }
01278
01279 void TraceProjections::traceEnd(void)
01280 {
01281 _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
01282 }
01283
01284 void TraceProjections::userEvent(int e)
01285 {
01286 if (!computationStarted) return;
01287 _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
01288 }
01289
01290 void TraceProjections::userBracketEvent(int e, double bt, double et)
01291 {
01292 if (!computationStarted) return;
01293
01294 _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
01295 _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
01296 }
01297
01298 void TraceProjections::userSuppliedData(int d)
01299 {
01300 if (!computationStarted) return;
01301 _logPool->addUserSupplied(d);
01302 }
01303
01304 void TraceProjections::userSuppliedNote(char *note)
01305 {
01306 if (!computationStarted) return;
01307 _logPool->addUserSuppliedNote(note);
01308 }
01309
01310
01311 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
01312 {
01313 if (!computationStarted) return;
01314 _logPool->addUserSuppliedBracketedNote(note, eventID, bt, et);
01315 }
01316
01317 void TraceProjections::memoryUsage(double m)
01318 {
01319 if (!computationStarted) return;
01320 _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
01321
01322 }
01323
01324
01325 void TraceProjections::creation(envelope *e, int ep, int num)
01326 {
01327 double curTime = TraceTimer();
01328 if (e == 0) {
01329 CtvAccess(curThreadEvent) = curevent;
01330 _logPool->add(CREATION, ForChareMsg, ep, curTime,
01331 curevent++, CkMyPe(), 0, NULL, 0, 0.0);
01332 } else {
01333 int type=e->getMsgtype();
01334 e->setEvent(curevent);
01335 if (num > 1) {
01336 _logPool->add(CREATION_BCAST, type, ep, curTime,
01337 curevent++, CkMyPe(), e->getTotalsize(),
01338 NULL, 0, 0.0, num);
01339 } else {
01340 _logPool->add(CREATION, type, ep, curTime,
01341 curevent++, CkMyPe(), e->getTotalsize(),
01342 NULL, 0, 0.0);
01343 }
01344 }
01345 }
01346
01347
01348 void TraceProjections::creation(char *msg)
01349 {
01350 #if CMK_SMP_TRACE_COMMTHREAD
01351
01352 envelope *e = (envelope *)msg;
01353 int ep = e->getEpIdx();
01354 if(_entryTable[ep]->traceEnabled) {
01355 creation(e, ep, 1);
01356 e->setSrcPe(CkMyPe());
01357 }
01358 #endif
01359 }
01360
01361 void TraceProjections::traceCommSetMsgID(char *msg)
01362 {
01363 #if CMK_SMP_TRACE_COMMTHREAD
01364
01365 envelope *e = (envelope *)msg;
01366 int ep = e->getEpIdx();
01367 if(_entryTable[ep]->traceEnabled) {
01368 e->setSrcPe(CkMyPe());
01369 e->setEvent(curevent);
01370 }
01371 #endif
01372 }
01373
01374 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
01375 {
01376
01377 *pe = *event = -1;
01378 envelope *e = (envelope *)msg;
01379 int ep = e->getEpIdx();
01380 if(_entryTable[ep]->traceEnabled) {
01381 *pe = e->getSrcPe();
01382 *event = e->getEvent();
01383 }
01384 }
01385
01386 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
01387 {
01388
01389 envelope *e = (envelope *)msg;
01390 int ep = e->getEpIdx();
01391 if(ep<=0 || ep>=_entryTable.size()) return;
01392 if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
01393 if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
01394 if(_entryTable[ep]->traceEnabled) {
01395 e->setSrcPe(pe);
01396 e->setEvent(event);
01397 }
01398 }
01399
01400
01401
01402
01403
01404
01405 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
01406 int *pelist)
01407 {
01408 double curTime = TraceTimer();
01409 if (e==0) {
01410 CtvAccess(curThreadEvent)=curevent;
01411 _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
01412 CkMyPe(), 0, 0, 0.0, num, pelist);
01413 } else {
01414 int type=e->getMsgtype();
01415 e->setEvent(curevent);
01416 _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
01417 e->getTotalsize(), 0, 0.0, num, pelist);
01418 }
01419 }
01420
01421 void TraceProjections::creationDone(int num)
01422 {
01423
01424
01425 double curTime = TraceTimer();
01426 int idx = _logPool->numEntries-1;
01427 while (idx >=0 && num >0 ) {
01428 LogEntry &log = _logPool->pool[idx];
01429 if ((log.type == CREATION) ||
01430 (log.type == CREATION_BCAST) ||
01431 (log.type == CREATION_MULTICAST)) {
01432 log.recvTime = curTime - log.time;
01433 num --;
01434 }
01435 idx--;
01436 }
01437 }
01438
01439 void TraceProjections::beginExecute(CmiObjId *tid)
01440 {
01441 #if CMK_HAS_COUNTER_PAPI
01442 if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
01443 CmiAbort("PAPI failed to read at begin execute!\n");
01444 }
01445 #endif
01446 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
01447 execEvent = CtvAccess(curThreadEvent);
01448 execEp = (-1);
01449 _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
01450 execEvent,CkMyPe(), 0, tid);
01451 #if CMK_HAS_COUNTER_PAPI
01452 _logPool->addPapi(papiValues);
01453 #endif
01454 inEntry = 1;
01455 }
01456
01457 void TraceProjections::beginExecute(envelope *e)
01458 {
01459 if(e==0) {
01460 #if CMK_HAS_COUNTER_PAPI
01461 if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
01462 CmiAbort("PAPI failed to read at begin execute!\n");
01463 }
01464 #endif
01465 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
01466 execEvent = CtvAccess(curThreadEvent);
01467 execEp = (-1);
01468 _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
01469 execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
01470 #if CMK_HAS_COUNTER_PAPI
01471 _logPool->addPapi(papiValues);
01472 #endif
01473 inEntry = 1;
01474 } else {
01475 beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
01476 e->getSrcPe(),e->getTotalsize());
01477 }
01478 }
01479
01480 void TraceProjections::beginExecute(char *msg){
01481 #if CMK_SMP_TRACE_COMMTHREAD
01482
01483 envelope *e = (envelope *)msg;
01484 int ep = e->getEpIdx();
01485 if(_entryTable[ep]->traceEnabled)
01486 beginExecute(e);
01487 #endif
01488 }
01489
01490 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
01491 int mlen, CmiObjId *idx)
01492 {
01493 if (traceNestedEvents) {
01494 if (! nestedEvents.isEmpty()) {
01495 endExecuteLocal();
01496 }
01497 nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
01498 }
01499 beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
01500 }
01501
01502 void TraceProjections::changeLastEntryTimestamp(double ts)
01503 {
01504 _logPool->modLastEntryTimestamp(ts);
01505 }
01506
01507 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
01508 int mlen, CmiObjId *idx)
01509 {
01510 #if CMK_HAS_COUNTER_PAPI
01511 if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
01512 CmiAbort("PAPI failed to read at begin execute!\n");
01513 }
01514 #endif
01515 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
01516 execEvent=event;
01517 execEp=ep;
01518 execPe=srcPe;
01519 _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
01520 srcPe, mlen, idx, 0.0, TraceCpuTimer());
01521 #if CMK_HAS_COUNTER_PAPI
01522 _logPool->addPapi(papiValues);
01523 #endif
01524 inEntry = 1;
01525 }
01526
01527 void TraceProjections::endExecute(void)
01528 {
01529 if (traceNestedEvents) nestedEvents.deq();
01530 endExecuteLocal();
01531 if (traceNestedEvents) {
01532 if (! nestedEvents.isEmpty()) {
01533 NestedEvent &ne = nestedEvents.peek();
01534 beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
01535 }
01536 }
01537 }
01538
01539 void TraceProjections::endExecute(char *msg)
01540 {
01541 #if CMK_SMP_TRACE_COMMTHREAD
01542
01543 envelope *e = (envelope *)msg;
01544 int ep = e->getEpIdx();
01545 if(_entryTable[ep]->traceEnabled)
01546 endExecute();
01547 #endif
01548 }
01549
01550 void TraceProjections::endExecuteLocal(void)
01551 {
01552 #if CMK_HAS_COUNTER_PAPI
01553 if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
01554 CmiAbort("PAPI failed to read at end execute!\n");
01555 }
01556 #endif
01557 if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
01558 double cputime = TraceCpuTimer();
01559 if(execEp == (-1)) {
01560 _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
01561 execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
01562 } else {
01563 _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
01564 execEvent, execPe, 0, NULL, 0.0, cputime);
01565 }
01566 #if CMK_HAS_COUNTER_PAPI
01567 _logPool->addPapi(papiValues);
01568 #endif
01569 inEntry = 0;
01570 }
01571
01572 void TraceProjections::messageRecv(char *env, int pe)
01573 {
01574 #if 0
01575 envelope *e = (envelope *)env;
01576 int msgType = e->getMsgtype();
01577 int ep = e->getEpIdx();
01578 #if 0
01579 if (msgType==NewChareMsg || msgType==NewVChareMsg
01580 || msgType==ForChareMsg || msgType==ForVidMsg
01581 || msgType==BocInitMsg || msgType==NodeBocInitMsg
01582 || msgType==ForBocMsg || msgType==ForNodeBocMsg)
01583 ep = e->getEpIdx();
01584 else
01585 ep = _threadEP;
01586 #endif
01587 _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
01588 curevent++, e->getSrcPe(), e->getTotalsize());
01589 #endif
01590 }
01591
01592 void TraceProjections::beginIdle(double curWallTime)
01593 {
01594 _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
01595 }
01596
01597 void TraceProjections::endIdle(double curWallTime)
01598 {
01599 _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
01600 }
01601
01602 void TraceProjections::beginPack(void)
01603 {
01604 _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
01605 }
01606
01607 void TraceProjections::endPack(void)
01608 {
01609 _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
01610 }
01611
01612 void TraceProjections::beginUnpack(void)
01613 {
01614 _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
01615 }
01616
01617 void TraceProjections::endUnpack(void)
01618 {
01619 _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
01620 }
01621
01622 void TraceProjections::enqueue(envelope *) {}
01623
01624 void TraceProjections::dequeue(envelope *) {}
01625
01626 void TraceProjections::beginComputation(void)
01627 {
01628 computationStarted = 1;
01629
01630
01631
01632
01633 if (CkpvAccess(traceOnPe) != 0) {
01634 void (*ptr)() = registerMachineUserEvents();
01635 if (ptr != NULL) {
01636 ptr();
01637 }
01638 }
01639
01640
01641 _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
01642 #if CMK_HAS_COUNTER_PAPI
01643
01644 if (PAPI_start(papiEventSet) != PAPI_OK) {
01645 CmiAbort("PAPI failed to start designated counters!\n");
01646 }
01647 #endif
01648 }
01649
01650 void TraceProjections::endComputation(void)
01651 {
01652 #if CMK_HAS_COUNTER_PAPI
01653
01654
01655 if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
01656 CkPrintf("Warning: PAPI failed to stop correctly!\n");
01657 }
01658
01659
01660 #endif
01661 endTime = TraceTimer();
01662 _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
01663
01664
01665
01666
01667 }
01668
01669 int TraceProjections::idxRegistered(int idx)
01670 {
01671 int idxVecLen = idxVec.size();
01672 for(int i=0; i<idxVecLen; i++)
01673 {
01674 if(idx == idxVec[i])
01675 return 1;
01676 }
01677 return 0;
01678 }
01679
01680 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
01681 StrKey k((char*)name,strlen(name));
01682 int num = funcHashtable.get(k);
01683
01684 if(num!=0) {
01685 return;
01686
01687
01688 }
01689
01690 int isIdxExisting=0;
01691 if(idxSpecifiedByUser)
01692 isIdxExisting=idxRegistered(idx);
01693 if(isIdxExisting){
01694 return;
01695
01696
01697 }
01698
01699 if(idxSpecifiedByUser) {
01700 char *st = new char[strlen(name)+1];
01701 memcpy(st,name,strlen(name)+1);
01702 StrKey *newKey = new StrKey(st,strlen(st));
01703 int &ref = funcHashtable.put(*newKey);
01704 ref=idx;
01705 funcCount++;
01706 idxVec.push_back(idx);
01707 } else {
01708 char *st = new char[strlen(name)+1];
01709 memcpy(st,name,strlen(name)+1);
01710 StrKey *newKey = new StrKey(st,strlen(st));
01711 int &ref = funcHashtable.put(*newKey);
01712 ref=funcCount;
01713 num = funcCount;
01714 funcCount++;
01715 idx = num;
01716 idxVec.push_back(idx);
01717 }
01718 }
01719
01720 void TraceProjections::beginFunc(char *name,char *file,int line){
01721 StrKey k(name,strlen(name));
01722 unsigned short num = (unsigned short)funcHashtable.get(k);
01723 beginFunc(num,file,line);
01724 }
01725
01726 void TraceProjections::beginFunc(int idx,char *file,int line){
01727 if(idx <= 0){
01728 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
01729 }
01730 _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
01731 }
01732
01733 void TraceProjections::endFunc(char *name){
01734 StrKey k(name,strlen(name));
01735 int num = funcHashtable.get(k);
01736 endFunc(num);
01737 }
01738
01739 void TraceProjections::endFunc(int num){
01740 if(num <= 0){
01741 printf("endFunc without start :O\n");
01742 }
01743 _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
01744 }
01745
01746
01747 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
01748 {
01749 for (int i=0;i<n;i++)
01750 switch(t) {
01751 case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
01752 case Tuchar:
01753 case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
01754 case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
01755 case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
01756 case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
01757 case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
01758 case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
01759 case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
01760 case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
01761 case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
01762 #ifdef CMK_PUP_LONG_LONG
01763 case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
01764 case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
01765 #endif
01766 default: CmiAbort("Unrecognized pup type code!");
01767 };
01768 }
01769
01770 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
01771 {
01772 for (int i=0;i<n;i++)
01773 switch(t) {
01774 case Tchar: {
01775 char c = fgetc(f);
01776 if (c==EOF)
01777 parseError("Could not match character");
01778 else
01779 ((char *)p)[i] = c;
01780 break;
01781 }
01782 case Tuchar:
01783 case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
01784 case Tshort:((short *)p)[i]=(short)readInt(); break;
01785 case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
01786 case Tint: ((int *)p)[i]=readInt(); break;
01787 case Tuint: ((unsigned int *)p)[i]=readUint(); break;
01788 case Tlong: ((long *)p)[i]=readInt(); break;
01789 case Tulong:((unsigned long *)p)[i]=readUint(); break;
01790 case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
01791 case Tdouble:((double *)p)[i]=readDouble(); break;
01792 #ifdef CMK_PUP_LONG_LONG
01793 case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
01794 case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
01795 #endif
01796 default: CmiAbort("Unrecognized pup type code!");
01797 };
01798 }
01799
01800 #if CMK_PROJECTIONS_USE_ZLIB
01801 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
01802 {
01803 for (int i=0;i<n;i++)
01804 switch(t) {
01805 case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
01806 case Tuchar:
01807 case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
01808 case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
01809 case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
01810 case Tint: gzprintf(f," %d",((int *)p)[i]); break;
01811 case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
01812 case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
01813 case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
01814 case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
01815 case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
01816 #ifdef CMK_PUP_LONG_LONG
01817 case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
01818 case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
01819 #endif
01820 default: CmiAbort("Unrecognized pup type code!");
01821 };
01822 }
01823 #endif
01824
01825 void TraceProjections::endPhase() {
01826 double currentPhaseTime = TraceTimer();
01827 if (lastPhaseEvent != NULL) {
01828 } else {
01829 if (_logPool->pool != NULL) {
01830
01831 } else {
01832 CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
01833 _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
01834 }
01835 }
01836
01837
01838
01839
01840
01841
01842 lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
01843 _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
01844 currentPhaseID++;
01845 }
01846
01847 #ifdef PROJ_ANALYSIS
01848
01849
01850
01851
01852
01853 void registerOutlierReduction() {
01854 outlierReductionType =
01855 CkReduction::addReducer(outlierReduction);
01856 minMaxReductionType =
01857 CkReduction::addReducer(minMaxReduction);
01858 }
01859
01872
01873 extern "C" void TraceProjectionsExitHandler()
01874 {
01875 #if CMK_TRACE_ENABLED
01876
01877 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01878 bocProxy.traceProjectionsParallelShutdown(CkMyPe());
01879 #else
01880 CkExit();
01881 #endif
01882 }
01883
01884
01885
01886
01887
01888 void initTraceProjectionsBOC()
01889 {
01890
01891 #ifdef __BIGSIM__
01892 if (BgNodeRank() == 0) {
01893 #else
01894 if (CkMyRank() == 0) {
01895 #endif
01896 registerExitFn(TraceProjectionsExitHandler);
01897 }
01898 #if 0
01899 }
01900 #endif
01901 }
01902
01903
01904
01905
01906
01907
01908
01909
01910
01911 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
01913
01914 bool findOutliers = false;
01915 bool outlierAutomatic = true;
01916 int numKSeeds = 10;
01917
01918 int peNumKeep = CkNumPes();
01919 double entryThreshold = 0.0;
01920 bool outlierUsePhases = false;
01921 if (outlierAutomatic) {
01922 CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
01923 "Number of cluster seeds to apply at outlier analysis.");
01924 CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep",
01925 &peNumKeep, "Number of Processors to retain data");
01926 CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
01927 "Minimum significance of entry points to be considered for clustering (%).");
01928 findOutliers =
01929 CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
01930 outlierUsePhases =
01931 CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
01932 "Apply automatic outlier analysis to any available phases.");
01933 if (outlierUsePhases) {
01934
01935
01936 findOutliers = true;
01937 }
01938 }
01939 bool findStartTime = (CmiTimerAbsolute()==1);
01940 traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
01941 if (findOutliers) {
01942 kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
01943 numKSeeds,
01944 peNumKeep,
01945 entryThreshold,
01946 outlierUsePhases);
01947 }
01948 }
01949
01950
01951 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
01952
01953 endPe = pe;
01954 if (CkMyPe() == 0) {
01955 analysisStartTime = CmiWallTimer();
01956 }
01957 CkpvAccess(_trace)->endComputation();
01958
01959
01960 CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
01961 CkpvAccess(_traces)->clearTrace();
01962
01963
01964
01965
01966
01967
01968
01969
01970 CProxy_KMeansBOC kMeansProxy(kMeansGID);
01971 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
01972 if (findOutliers) {
01973 parModulesRemaining++;
01974 kMeansProxy[CkMyPe()].startKMeansAnalysis();
01975 }
01976 parModulesRemaining++;
01977 if (findStartTime)
01978 bocProxy[CkMyPe()].startTimeAnalysis();
01979 else
01980 bocProxy[CkMyPe()].startEndTimeAnalysis();
01981 }
01982
01983
01984 void KMeansBOC::startKMeansAnalysis() {
01985
01986 LogPool *pool = CkpvAccess(_trace)->_logPool;
01987
01988 if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
01989 int flushInt = 0;
01990 if (pool->hasFlushed) {
01991 flushInt = 1;
01992 }
01993
01994 CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL),
01995 0, thisProxy);
01996 contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);
01997 }
01998
01999
02000 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
02001 int someFlush = *((int *)msg->getData());
02002
02003
02004
02005 if (someFlush == 0) {
02006
02007 CProxy_KMeansBOC kMeansProxy(kMeansGID);
02008 kMeansProxy.flushCheckDone();
02009 } else {
02010
02011 CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
02012
02013 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
02014 bocProxy[0].kMeansDone();
02015 }
02016 }
02017
02018
02019 void KMeansBOC::flushCheckDone() {
02020
02021
02022 LogPool *pool = CkpvAccess(_trace)->_logPool;
02023
02024
02025
02026 numEntryMethods = _entryTable.size();
02027 numMetrics = numEntryMethods + 2;
02028
02029
02030 markedBegin = false;
02031 markedIdle = false;
02032 beginBlockTime = 0.0;
02033 beginIdleBlockTime = 0.0;
02034 lastBeginEPIdx = -1;
02035
02036 lastPhaseIdx = 0;
02037 currentExecTimes = NULL;
02038 currentPhase = 0;
02039 selected = false;
02040
02041 pool->initializePhases();
02042
02043
02044 incKSeeds = new double[numK*numMetrics];
02045 keepMetric = new bool[numMetrics];
02046
02047
02048
02049
02050 thisProxy[CkMyPe()].getNextPhaseMetrics();
02051 }
02052
02053
02054 void KMeansBOC::getNextPhaseMetrics() {
02055
02056
02057
02058
02059
02060
02061
02062
02063
02064 if (usePhases) {
02065 DEBUGF("[%d] Using Phases\n", CkMyPe());
02066 } else {
02067 DEBUGF("[%d] NOT using Phases\n", CkMyPe());
02068 }
02069
02070 if (currentExecTimes != NULL) {
02071 delete [] currentExecTimes;
02072 }
02073 currentExecTimes = new double[numMetrics];
02074 for (int i=0; i<numMetrics; i++) {
02075 currentExecTimes[i] = 0.0;
02076 }
02077
02078 int numEventMethods = _entryTable.size();
02079 LogPool *pool = CkpvAccess(_trace)->_logPool;
02080
02081 CkAssert(pool->numEntries > lastPhaseIdx);
02082 double totalPhaseTime = 0.0;
02083 double totalActiveTime = 0.0;
02084
02085 for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
02086 if (pool->pool[i].type == BEGIN_PROCESSING) {
02087
02088 if (!markedBegin) {
02089 markedBegin = true;
02090 }
02091 beginBlockTime = pool->pool[i].time;
02092 lastBeginEPIdx = pool->pool[i].eIdx;
02093 } else if (pool->pool[i].type == END_PROCESSING) {
02094
02095
02096
02097
02098
02099 if (markedBegin) {
02100 markedBegin = false;
02101 if (pool->pool[i].event < 0)
02102 {
02103
02104 continue;
02105 }
02106 currentExecTimes[pool->pool[i].eIdx] +=
02107 pool->pool[i].time - beginBlockTime;
02108 totalActiveTime += pool->pool[i].time - beginBlockTime;
02109 lastBeginEPIdx = -1;
02110 }
02111 } else if (pool->pool[i].type == BEGIN_IDLE) {
02112
02113 if (!markedIdle) {
02114 markedIdle = true;
02115 }
02116 beginIdleBlockTime = pool->pool[i].time;
02117 } else if (pool->pool[i].type == END_IDLE) {
02118
02119 if (markedIdle) {
02120 markedIdle = false;
02121 currentExecTimes[numEventMethods] +=
02122 pool->pool[i].time - beginIdleBlockTime;
02123 totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
02124 }
02125 } else if (pool->pool[i].type == END_PHASE) {
02126
02127 if (usePhases) {
02128
02129 if (i != lastPhaseIdx) {
02130 totalPhaseTime =
02131 pool->pool[i].time - pool->pool[lastPhaseIdx].time;
02132
02133
02134
02135 if (markedBegin) {
02136 CkAssert(lastBeginEPIdx >= 0);
02137 currentExecTimes[lastBeginEPIdx] +=
02138 pool->pool[i].time - beginBlockTime;
02139 totalActiveTime += pool->pool[i].time - beginBlockTime;
02140
02141 beginBlockTime = pool->pool[i].time;
02142 }
02143
02144 if (markedIdle) {
02145 currentExecTimes[numEventMethods] +=
02146 pool->pool[i].time - beginIdleBlockTime;
02147 totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
02148
02149 beginIdleBlockTime = pool->pool[i].time;
02150 }
02151 if (totalActiveTime <= totalPhaseTime) {
02152 currentExecTimes[numEventMethods+1] =
02153 totalPhaseTime - totalActiveTime;
02154 } else {
02155 currentExecTimes[numEventMethods+1] = 0.0;
02156 CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
02157 CkMyPe(), currentPhase);
02158 }
02159 collectKMeansData();
02160
02161 lastPhaseIdx = i;
02162 break;
02163 }
02164 }
02165 } else if (pool->pool[i].type == END_COMPUTATION) {
02166 if (markedBegin) {
02167 CkAssert(lastBeginEPIdx >= 0);
02168 currentExecTimes[lastBeginEPIdx] +=
02169 pool->pool[i].time - beginBlockTime;
02170 totalActiveTime += pool->pool[i].time - beginBlockTime;
02171 }
02172 if (markedIdle) {
02173 currentExecTimes[numEventMethods] +=
02174 pool->pool[i].time - beginIdleBlockTime;
02175 totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
02176 }
02177 totalPhaseTime =
02178 pool->pool[i].time - pool->pool[lastPhaseIdx].time;
02179 if (totalActiveTime <= totalPhaseTime) {
02180 currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
02181 } else {
02182 currentExecTimes[numEventMethods+1] = 0.0;
02183 CkPrintf("[%d] Warning: Overhead found to be negative!\n",
02184 CkMyPe());
02185 }
02186 collectKMeansData();
02187 }
02188 }
02189 }
02190
02204 void KMeansBOC::collectKMeansData() {
02205 int minOffset = numMetrics;
02206 int maxOffset = 2*numMetrics;
02207 int sosOffset = 3*numMetrics;
02208
02209
02210
02211 double *reductionMsg = new double[numMetrics*4];
02212
02213 for (int i=0; i<numMetrics; i++) {
02214 reductionMsg[i] = currentExecTimes[i];
02215
02216 reductionMsg[minOffset + i] = currentExecTimes[i];
02217 reductionMsg[maxOffset + i] = currentExecTimes[i];
02218
02219 reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
02220 }
02221
02222 CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL),
02223 0, thisProxy);
02224 contribute((numMetrics*4)*sizeof(double), reductionMsg,
02225 outlierReductionType, cb);
02226 }
02227
02228
02229
02230
02231
02232
02233 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
02234 CkAssert(CkMyPe() == 0);
02235
02236
02237
02238 int sumOffset = 0;
02239 int minOffset = numMetrics;
02240 int maxOffset = 2*numMetrics;
02241 int sosOffset = 3*numMetrics;
02242
02243
02244 KMeansStatsMessage *outmsg =
02245 new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
02246 outmsg->numMetrics = numMetrics;
02247 outmsg->numKPos = numK*numMetrics;
02248 outmsg->numStats = numMetrics*4;
02249
02250
02251 double *totalExecTimes = (double *)msg->getData();
02252 double totalTime = 0.0;
02253
02254 for (int i=0; i<numMetrics; i++) {
02255 DEBUGN("%lf\n", totalExecTimes[i]);
02256 totalTime += totalExecTimes[i];
02257
02258
02259 outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
02260
02261
02262
02263
02264 outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
02265 outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
02266 totalExecTimes[minOffset + i];
02267
02268
02269 outmsg->stats[sosOffset + i] =
02270 sqrt((totalExecTimes[sosOffset + i] -
02271 2*(outmsg->stats[i])*totalExecTimes[i] +
02272 (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
02273 CkNumPes());
02274 }
02275
02276 for (int i=0; i<numMetrics; i++) {
02277
02278
02279
02280
02281
02282
02283
02284 keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
02285 entryThreshold) &&
02286 (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
02287 if (keepMetric[i]) {
02288 DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
02289 totalExecTimes[maxOffset + i], totalTime/CkNumPes());
02290 } else {
02291 DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
02292 }
02293 outmsg->filter[i] = keepMetric[i];
02294 }
02295
02296 delete msg;
02297
02298
02299 kSeeds = new double[numK*numMetrics];
02300
02301 numKReported = 0;
02302 kNumMembers = new int[numK];
02303
02304
02305
02306 srand(11337);
02307 for (int k=0; k<numK; k++) {
02308 DEBUGF("Seed %d | ", k);
02309 for (int m=0; m<numMetrics; m++) {
02310 double factor = totalExecTimes[maxOffset + m] -
02311 totalExecTimes[minOffset + m];
02312
02313
02314
02315
02316 kSeeds[numMetrics*k + m] =
02317 ((rand()*1.0)/RAND_MAX)*factor;
02318 if (keepMetric[m]) {
02319 DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
02320 }
02321 outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
02322 }
02323 DEBUGF("\n");
02324 kNumMembers[k] = 0;
02325 }
02326
02327
02328 thisProxy.findInitialClusters(outmsg);
02329 }
02330
02331
02332
02333
02334 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
02335
02336 if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
02337
02338 phaseIter = 0;
02339
02340
02341 CkAssert(numMetrics == msg->numMetrics);
02342 for (int i=0; i<numMetrics; i++) {
02343 keepMetric[i] = msg->filter[i];
02344 }
02345
02346
02347
02348
02349
02350
02351 CkAssert(numMetrics*4 == msg->numStats);
02352 for (int i=0; i<numMetrics; i++) {
02353 currentExecTimes[i] -= msg->stats[numMetrics + i];
02354
02355
02356
02357
02358 }
02359
02360
02361
02362 CkAssert(numK*numMetrics == msg->numKPos);
02363 for (int i=0; i<msg->numKPos; i++) {
02364 incKSeeds[i] = msg->kSeedsPos[i];
02365 }
02366
02367
02368 minDistance = calculateDistance(0);
02369 DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(),
02370 currentPhase, phaseIter, minDistance);
02371 minK = 0;
02372 for (int i=1; i<numK; i++) {
02373 double distance = calculateDistance(i);
02374 DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(),
02375 currentPhase, phaseIter, i, distance);
02376 if (distance < minDistance) {
02377 minDistance = distance;
02378 minK = i;
02379 }
02380 }
02381
02382
02383
02384
02385
02386
02387
02388
02389
02390
02391
02392
02393
02394
02395
02396
02397
02398 double *modVector = new double[numK*(numMetrics+1)];
02399 for (int i=0; i<numK; i++) {
02400 for (int j=0; j<numMetrics+1; j++) {
02401 modVector[i*(numMetrics+1) + j] = 0.0;
02402 }
02403 }
02404 for (int i=0; i<numMetrics; i++) {
02405
02406 modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
02407 }
02408 modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
02409
02410 CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL),
02411 0, thisProxy);
02412 contribute(numK*(numMetrics+1)*sizeof(double), modVector,
02413 CkReduction::sum_double, cb);
02414 }
02415
02416 double KMeansBOC::calculateDistance(int k) {
02417 double ret = 0.0;
02418 for (int i=0; i<numMetrics; i++) {
02419 if (keepMetric[i]) {
02420 DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n",
02421 CkMyPe(), currentPhase, phaseIter, i,
02422 currentExecTimes[i], incKSeeds[k*numMetrics + i]);
02423 ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
02424 }
02425 }
02426 return sqrt(ret);
02427 }
02428
02429 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
02430 CkAssert(CkMyPe() == 0);
02431
02432
02433
02434 double *modVector = (double *)msg->getData();
02435
02436 CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
02437
02438
02439 bool hasChanges = false;
02440 for (int i=0; i<numK; i++) {
02441 hasChanges = hasChanges ||
02442 (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
02443 }
02444 if (!hasChanges) {
02445 delete msg;
02446 findRepresentatives();
02447 } else {
02448 int overallChange = 0;
02449 for (int i=0; i<numK; i++) {
02450 int change = (int)modVector[i*(numMetrics+1) + numMetrics];
02451 if (change != 0) {
02452 overallChange += change;
02453
02454
02455
02456
02457
02458
02459
02460
02461
02462 CkAssert((kNumMembers[i] + change >= 0) &&
02463 (kNumMembers[i] + change <= CkNumPes()));
02464 if (kNumMembers[i] == 0) {
02465 CkAssert(change > 0);
02466 for (int j=0; j<numMetrics; j++) {
02467 kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
02468 }
02469 } else if (kNumMembers[i] + change == 0) {
02470
02471 } else {
02472 for (int j=0; j<numMetrics; j++) {
02473 kSeeds[i*numMetrics + j] *= kNumMembers[i];
02474 kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
02475 kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
02476 }
02477 }
02478 kNumMembers[i] += change;
02479 }
02480 DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
02481 CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
02482 }
02483 delete msg;
02484
02485
02486 KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
02487 outmsg->numKPos = numK*numMetrics;
02488 for (int i=0; i<numK*numMetrics; i++) {
02489 outmsg->kSeedsPos[i] = kSeeds[i];
02490 }
02491
02492 thisProxy.updateSeedMembership(outmsg);
02493 }
02494 }
02495
02496
02497 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
02498
02499
02500
02501 phaseIter++;
02502
02503
02504
02505 CkAssert(numK*numMetrics == msg->numKPos);
02506 for (int i=0; i<msg->numKPos; i++) {
02507 incKSeeds[i] = msg->kSeedsPos[i];
02508 }
02509
02510
02511 lastMinK = minK;
02512 minDistance = calculateDistance(0);
02513 DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(),
02514 currentPhase, phaseIter, minDistance);
02515
02516 minK = 0;
02517 for (int i=1; i<numK; i++) {
02518 double distance = calculateDistance(i);
02519 DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(),
02520 currentPhase, phaseIter, i, distance);
02521 if (distance < minDistance) {
02522 minDistance = distance;
02523 minK = i;
02524 }
02525 }
02526
02527 double *modVector = new double[numK*(numMetrics+1)];
02528 for (int i=0; i<numK; i++) {
02529 for (int j=0; j<numMetrics+1; j++) {
02530 modVector[i*(numMetrics+1) + j] = 0.0;
02531 }
02532 }
02533
02534 if (minK != lastMinK) {
02535 for (int i=0; i<numMetrics; i++) {
02536 modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
02537 modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
02538 }
02539 modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
02540 modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
02541 }
02542
02543 CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL),
02544 0, thisProxy);
02545 contribute(numK*(numMetrics+1)*sizeof(double), modVector,
02546 CkReduction::sum_double, cb);
02547 }
02548
02549 void KMeansBOC::findRepresentatives() {
02550
02551
02552
02553 int numNonEmptyClusters = 0;
02554 for (int i=0; i<numK; i++) {
02555 if (kNumMembers[i] > 0) {
02556 numNonEmptyClusters++;
02557 }
02558 }
02559
02560 int numRepresentatives = peNumKeep;
02561
02562
02563
02564 if (numRepresentatives < numNonEmptyClusters) {
02565 numRepresentatives = numNonEmptyClusters;
02566 }
02567
02568 int slotsRemaining = numRepresentatives;
02569
02570 DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining,
02571 numNonEmptyClusters);
02572
02573
02574
02575
02576
02577 int exemplarsPerCluster = 1;
02578 slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
02579
02580 int numCandidateOutliers = CkNumPes() -
02581 exemplarsPerCluster*numNonEmptyClusters;
02582
02583 double *remainders = new double[numK];
02584 int *assigned = new int[numK];
02585 exemplarChoicesLeft = new int[numK];
02586 outlierChoicesLeft = new int[numK];
02587
02588 for (int i=0; i<numK; i++) {
02589 assigned[i] = 0;
02590 remainders[i] =
02591 (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
02592 slotsRemaining / numCandidateOutliers;
02593 if (remainders[i] >= 0.0) {
02594 assigned[i] = (int)floor(remainders[i]);
02595 remainders[i] -= assigned[i];
02596 } else {
02597 remainders[i] = 0.0;
02598 }
02599 }
02600
02601 for (int i=0; i<numK; i++) {
02602 slotsRemaining -= assigned[i];
02603 }
02604 CkAssert(slotsRemaining >= 0);
02605
02606
02607
02608 while (slotsRemaining > 0) {
02609 double max = 0.0;
02610 int winner = 0;
02611 for (int i=0; i<numK; i++) {
02612 if (remainders[i] > max) {
02613 max = remainders[i];
02614 winner = i;
02615 }
02616 }
02617 assigned[winner]++;
02618 remainders[winner] = 0.0;
02619 slotsRemaining--;
02620 }
02621
02622
02623
02624 numSelectionIter = exemplarsPerCluster;
02625 for (int i=0; i<numK; i++) {
02626 if (assigned[i] > numSelectionIter) {
02627 numSelectionIter = assigned[i];
02628 }
02629 }
02630 DEBUGF("Selection Iterations = %d\n", numSelectionIter);
02631
02632 for (int i=0; i<numK; i++) {
02633 if (kNumMembers[i] > 0) {
02634 exemplarChoicesLeft[i] = exemplarsPerCluster;
02635 outlierChoicesLeft[i] = assigned[i];
02636 } else {
02637 exemplarChoicesLeft[i] = 0;
02638 outlierChoicesLeft[i] = 0;
02639 }
02640 DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
02641 outlierChoicesLeft[i]);
02642 }
02643
02644 delete [] assigned;
02645 delete [] remainders;
02646
02647
02648 KSelectionMessage *outmsg = NULL;
02649 if (numSelectionIter > 0) {
02650 outmsg = new (numK, numK, numK) KSelectionMessage;
02651 outmsg->numKMinIDs = numK;
02652 outmsg->numKMaxIDs = numK;
02653 for (int i=0; i<numK; i++) {
02654 outmsg->minIDs[i] = -1;
02655 outmsg->maxIDs[i] = -1;
02656 }
02657 thisProxy.collectDistances(outmsg);
02658 } else {
02659 CkPrintf("Warning: No selection iteration from the start!\n");
02660
02661 thisProxy.phaseDone();
02662 }
02663 }
02664
02665
02666
02667
02668
02669
02670
02671
02672
02673
02674 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
02675
02676
02677
02678 DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
02679 lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
02680 if ((CkMyPe() == msg->minIDs[lastMinK]) ||
02681 (CkMyPe() == msg->maxIDs[lastMinK])) {
02682 CkAssert(!selected);
02683 selected = true;
02684 }
02685
02686
02687
02688 double *minMaxAndIDs = NULL;
02689
02690 minMaxAndIDs = new double[numK*4];
02691
02692 for (int i=0; i<numK; i++) {
02693 minMaxAndIDs[i*4] = -1.0;
02694 minMaxAndIDs[i*4+1] = -1.0;
02695 minMaxAndIDs[i*4+2] = -1.0;
02696 minMaxAndIDs[i*4+3] = -1.0;
02697 }
02698
02699 if (!selected) {
02700 DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
02701 minMaxAndIDs[lastMinK*4] = minDistance;
02702 minMaxAndIDs[lastMinK*4+1] = CkMyPe();
02703 minMaxAndIDs[lastMinK*4+2] = minDistance;
02704 minMaxAndIDs[lastMinK*4+3] = CkMyPe();
02705 }
02706 delete msg;
02707
02708 CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL),
02709 0, thisProxy);
02710 contribute(numK*4*sizeof(double), minMaxAndIDs,
02711 minMaxReductionType, cb);
02712 }
02713
02714 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
02715
02716
02717
02718
02719
02720 if (numSelectionIter > 0) {
02721 double *incInfo = (double *)msg->getData();
02722
02723 KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
02724 outmsg->numKMinIDs = numK;
02725 outmsg->numKMaxIDs = numK;
02726
02727 for (int i=0; i<numK; i++) {
02728 DEBUGF("%d | %lf %d %lf %d \n", i,
02729 incInfo[i*4], (int)incInfo[i*4+1],
02730 incInfo[i*4+2], (int)incInfo[i*4+3]);
02731 }
02732
02733 for (int i=0; i<numK; i++) {
02734 if (exemplarChoicesLeft[i] > 0) {
02735 outmsg->minIDs[i] = (int)incInfo[i*4+1];
02736 exemplarChoicesLeft[i]--;
02737 } else {
02738 outmsg->minIDs[i] = -1;
02739 }
02740 if (outlierChoicesLeft[i] > 0) {
02741 outmsg->maxIDs[i] = (int)incInfo[i*4+3];
02742 outlierChoicesLeft[i]--;
02743 } else {
02744 outmsg->maxIDs[i] = -1;
02745 }
02746 }
02747 thisProxy.collectDistances(outmsg);
02748 numSelectionIter--;
02749 } else {
02750
02751 thisProxy.phaseDone();
02752 }
02753 }
02754
02761 void KMeansBOC::phaseDone() {
02762
02763
02764
02765 LogPool *pool = CkpvAccess(_trace)->_logPool;
02766 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
02767
02768
02769 if (!selected) {
02770 if (usePhases) {
02771 pool->keepPhase[currentPhase] = false;
02772 } else {
02773
02774 pool->setAllPhases(false);
02775 }
02776 }
02777
02778
02779
02780 if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
02781
02782 int dummy = 0;
02783 CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL),
02784 0, bocProxy);
02785 contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
02786 } else {
02787
02788
02789
02790
02791
02792 currentPhase++;
02793 thisProxy[CkMyPe()].getNextPhaseMetrics();
02794 }
02795 }
02796
02797 void TraceProjectionsBOC::startTimeAnalysis()
02798 {
02799 double startTime = 0.0;
02800 if (CkpvAccess(_trace)->_logPool->numEntries>0)
02801 startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
02802 CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
02803 contribute(sizeof(double), &startTime, CkReduction::min_double, cb);
02804 }
02805
02806 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
02807 {
02808
02809
02810 if (CkpvAccess(_trace) != NULL) {
02811 CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
02812 CkpvAccess(_trace)->_logPool->setNewStartTime();
02813
02814 }
02815 delete msg;
02816 thisProxy[CkMyPe()].startEndTimeAnalysis();
02817 }
02818
02819 void TraceProjectionsBOC::startEndTimeAnalysis()
02820 {
02821
02822
02823 endTime = CkpvAccess(_trace)->endTime;
02824
02825
02826 CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL),
02827 0, thisProxy);
02828 contribute(sizeof(double), &endTime, CkReduction::max_double, cb);
02829 }
02830
02831 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
02832 {
02833
02834
02835 CkAssert(CkMyPe() == 0);
02836 parModulesRemaining--;
02837 if (CkpvAccess(_trace) != NULL) {
02838 CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
02839
02840
02841 }
02842 delete msg;
02843 if (parModulesRemaining == 0) {
02844 thisProxy[CkMyPe()].finalize();
02845 }
02846 }
02847
02848 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
02849
02850 if(CkMyPe()==0) CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
02851
02852 CkAssert(CkMyPe() == 0);
02853 parModulesRemaining--;
02854 CkPrintf("K-Means Analysis Time = %lf seconds\n",
02855 CmiWallTimer()-analysisStartTime);
02856 delete msg;
02857 if (parModulesRemaining == 0) {
02858 thisProxy[CkMyPe()].finalize();
02859 }
02860 }
02861
02867 void TraceProjectionsBOC::kMeansDone() {
02868 CkAssert(CkMyPe() == 0);
02869 parModulesRemaining--;
02870 CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
02871 CmiWallTimer()-analysisStartTime);
02872 if (parModulesRemaining == 0) {
02873 thisProxy[CkMyPe()].finalize();
02874 }
02875 }
02876
02877 void TraceProjectionsBOC::finalize()
02878 {
02879 CkAssert(CkMyPe() == 0);
02880
02881
02882 thisProxy.closingTraces();
02883 }
02884
02885
02886 void TraceProjectionsBOC::closingTraces() {
02887 CkpvAccess(_trace)->closeTrace();
02888
02889
02890 int pe = 0;
02891 if (endPe != -1) pe = endPe;
02892 CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL),
02893 pe, thisProxy);
02894 contribute(0, NULL, CkReduction::sum_int, cb);
02895 }
02896
02897
02898
02899
02900 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
02901 CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
02902 delete msg;
02903
02904 if (!CkpvAccess(_trace)->converseExit) {
02905 CkExit();
02906 }
02907 }
02908
02909
02910
02911
02912 CkReductionMsg *outlierReduction(int nMsgs,
02913 CkReductionMsg **msgs) {
02914 int numBytes = 0;
02915 int numMetrics = 0;
02916 double *ret = NULL;
02917
02918 if (nMsgs == 1) {
02919
02920 return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
02921 }
02922
02923 if (nMsgs > 1) {
02924 numBytes = msgs[0]->getSize();
02925
02926 if (numBytes%sizeof(double) != 0) {
02927 CkAbort("Outlier Reduction Size incompatible with doubles!\n");
02928 }
02929 if ((numBytes/sizeof(double))%4 != 0) {
02930 CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
02931 }
02932 numMetrics = (numBytes/sizeof(double))/4;
02933 ret = new double[numMetrics*4];
02934
02935
02936 for (int i=0; i<numMetrics*4; i++) {
02937 ret[i] = ((double *)msgs[0]->getData())[i];
02938 }
02939
02940
02941 for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
02942 for (int i=0; i<numMetrics; i++) {
02943
02944 ret[i] += ((double *)msgs[msgIdx]->getData())[i];
02945
02946 ret[numMetrics + i] =
02947 (ret[numMetrics + i] <
02948 ((double *)msgs[msgIdx]->getData())[numMetrics + i])
02949 ? ret[numMetrics + i] :
02950 ((double *)msgs[msgIdx]->getData())[numMetrics + i];
02951
02952 ret[2*numMetrics + i] =
02953 (ret[2*numMetrics + i] >
02954 ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
02955 ? ret[2*numMetrics + i] :
02956 ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
02957
02958 ret[3*numMetrics + i] +=
02959 ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
02960 }
02961 }
02962 }
02963
02964
02965 return CkReductionMsg::buildNew(numBytes,ret);
02966 }
02967
02968
02969
02970
02971
02972
02973
02974 CkReductionMsg *minMaxReduction(int nMsgs,
02975 CkReductionMsg **msgs) {
02976 CkAssert(nMsgs > 0);
02977
02978 int numBytes = msgs[0]->getSize();
02979 CkAssert(numBytes%sizeof(double) == 0);
02980 int numK = (numBytes/sizeof(double))/4;
02981
02982 double *ret = new double[numK*4];
02983
02984 for (int i=0; i<numK; i++) {
02985 ret[i*4] = -1.0;
02986 ret[i*4+1] = -1.0;
02987 ret[i*4+2] = -1.0;
02988 ret[i*4+3] = -1.0;
02989 }
02990
02991
02992 for (int i=0; i<nMsgs; i++) {
02993 double *temp = (double *)msgs[i]->getData();
02994 for (int j=0; j<numK; j++) {
02995
02996 if (ret[j*4+1] < 0) {
02997
02998 if (temp[j*4+1] >= 0) {
02999 ret[j*4] = temp[j*4];
03000 ret[j*4+1] = temp[j*4+1];
03001 }
03002 } else {
03003
03004 if (temp[j*4+1] >= 0) {
03005 if (temp[j*4] < ret[j*4]) {
03006 ret[j*4] = temp[j*4];
03007 ret[j*4+1] = temp[j*4+1];
03008 }
03009 }
03010 }
03011
03012 if (ret[j*4+3] < 0) {
03013
03014 if (temp[j*4+3] >= 0) {
03015 ret[j*4+2] = temp[j*4+2];
03016 ret[j*4+3] = temp[j*4+3];
03017 }
03018 } else {
03019
03020 if (temp[j*4+3] >= 0) {
03021 if (temp[j*4+2] > ret[j*4+2]) {
03022 ret[j*4+2] = temp[j*4+2];
03023 ret[j*4+3] = temp[j*4+3];
03024 }
03025 }
03026 }
03027 }
03028 }
03029 CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
03030 delete [] ret;
03031 return redmsg;
03032 }
03033
03034 #include "TraceProjections.def.h"
03035 #endif //PROJ_ANALYSIS
03036