00001 #include <charm++.h>
00002 #include <iostream>
00003 #include <fstream>
00004 #include <string>
00005 #include <map>
00006 #include <set>
00007 #include <vector>
00008 #include <utility>
00009
00010
00011 #if CMK_WITH_CONTROLPOINT
00012
00013
00014 #include "PathHistory.decl.h"
00015 #include "LBDatabase.h"
00016 #include "pathHistory.h"
00017
00018
00019 #include <register.h>
00020
00021 #include "trace-projections.h"
00022
00023
00032 CProxy_pathHistoryManager pathHistoryManagerProxy;
00033
00034 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath);
00035 CkpvDeclare(double, timeEntryMethodStarted);
00036
00037 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00038
00039 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
00041 CkpvDeclare(int, pathHistoryTableLastIdx);
00042 #endif
00043
00044
00046 class pathHistoryMain : public CBase_pathHistoryMain {
00047 public:
00048 pathHistoryMain(CkArgMsg* args){
00049 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00050 pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
00051 #endif
00052 }
00053 ~pathHistoryMain(){}
00054 };
00055
00056
00057 pathHistoryManager::pathHistoryManager(){
00058
00059 }
00060
00061
00068 void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
00069 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00070 int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
00071
00072 #if DEBUGPRINT > 2
00073 CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
00074 #endif
00075 CkAssert(count==0 || count==1);
00076
00077 if(count > 0){
00078 PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
00079 int idx = path.sender_history_table_idx;
00080 int pe = path.sender_pe;
00081
00082 #if DEBUGPRINT > 2
00083 CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d\n", msg->table_idx, CkMyPe(), pe, idx);
00084 #endif
00085
00086
00087 pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
00088 for(int i=0;i<msg->historySize;i++){
00089 newmsg->history[i] = msg->history[i];
00090 }
00091 newmsg->history[msg->historySize] = path;
00092 newmsg->historySize = msg->historySize+1;
00093 newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
00094 newmsg->cb = msg->cb;
00095 newmsg->table_idx = idx;
00096
00097 if(idx > -1 && pe > -1){
00098
00099 CkAssert(pe < CkNumPes() && pe >= 0);
00100 thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
00101 } else {
00102 CkPrintf("Traced critical path back to its origin.\n");
00103 if(msg->saveAsProjectionsUserEvents){
00104
00105
00106 pathForUser = new(msg->historySize+1) pathInformationMsg;
00107 for(int i=0;i<msg->historySize;i++){
00108 pathForUser->history[i] = msg->history[i];
00109 }
00110 pathForUser->history[msg->historySize] = path;
00111 pathForUser->historySize = msg->historySize+1;
00112 pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
00113 pathForUser->cb = msg->cb;
00114 pathForUser->table_idx = idx;
00115
00116 CkPrintf("Broadcasting it to all PE\n");
00117 thisProxy.broadcastCriticalPathProjections(newmsg);
00118 } else {
00119 newmsg->cb.send(newmsg);
00120 }
00121
00122 }
00123 } else {
00124 CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
00125 }
00126
00127 delete msg;
00128 #else
00129 CkAbort("Shouldn't call pathHistoryManager::traceCriticalPathBack when critical path detection is not enabled");
00130 #endif
00131 }
00132
00133
00134 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
00135
00136 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00137 CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
00138 int me = CkMyPe();
00139 int intersectsLocalPE = false;
00140
00141
00142
00143 for(int i=msg->historySize-1;i>=0;i--){
00144 if(CkMyPe() == msg->history[i].local_pe){
00145
00146
00147
00148
00149 traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
00150
00151 intersectsLocalPE = true;
00152 }
00153
00154 }
00155
00156 traceRegisterUserEvent("Critical Path", 32000);
00157
00158
00159 #define PRUNE_CRITICAL_PATH_LOGS 0
00160
00161 #if PRUNE_CRITICAL_PATH_LOGS
00162
00163
00164 enableTraceLogOutput();
00165 if(! intersectsLocalPE){
00166 disableTraceLogOutput();
00167 CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
00168 }
00169 #endif
00170
00171 #if TRACE_ALL_PATH_TABLE_ENTRIES
00172
00173 std::map< int, PathHistoryTableEntry >::iterator iter;
00174 for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
00175 double startTime = iter->second.get_start_time();
00176 double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
00177 traceUserBracketEvent(32001, startTime, endTime);
00178 }
00179 #endif
00180
00181 int data=1;
00182 CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]);
00183 contribute(sizeof(int), &data, CkReduction::sum_int, cb);
00184
00185 #endif
00186
00187 }
00188
00189 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
00190 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00191 CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
00192 pathForUser->cb.send(pathForUser);
00193 pathForUser = NULL;
00194 #endif
00195 }
00196
00197
00198
00199
/**
 * Ask the local pathHistoryManager branch to trace the current critical path
 * and use it for automatic message prioritization.
 * Does nothing unless USE_CRITICAL_PATH_HEADER_ARRAY is defined.
 */
void useThisCriticalPathForPriorities(){
#ifdef USE_CRITICAL_PATH_HEADER_ARRAY
  pathHistoryManager * const localManager = pathHistoryManagerProxy.ckLocalBranch();
  localManager->useCriticalPathForPriories();
#endif
}
00206
00207
00209 void automaticallySetMessagePriority(envelope *env){
00210 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00211
00212 #if DEBUG
00213 if(env->getPriobits() == 8*sizeof(int)){
00214 CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
00215 } else if(env->getPriobits() == 0) {
00216 CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
00217 } else {
00218 CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
00219 }
00220 #endif
00221
00222
00223 const std::map< std::pair<int,int>, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
00224
00225 if(criticalPathForPriorityCounts.size() > 0 && env->getPriobits() == 8*sizeof(int)) {
00226
00227 switch(env->getMsgtype()) {
00228 case ForArrayEltMsg:
00229 case ForChareMsg:
00230 {
00231 const int ep = env->getsetArrayEp();
00232 const int arr = env->getArrayMgrIdx();
00233
00234 const std::pair<int,int> k = std::make_pair(arr, ep);
00235 const int count = criticalPathForPriorityCounts.count(k);
00236
00237 #if DEBUG
00238 CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
00239 #endif
00240
00241 if(count > 0){
00242
00243 #if DEBUG
00244 CkPrintf("Prio auto high\n");
00245 #endif
00246 *(int*)(env->getPrioPtr()) = -5;
00247 } else {
00248
00249 #if DEBUG
00250 CkPrintf("Prio auto low: %d,%d\n", arr, ep);
00251 #endif
00252 *(int*)(env->getPrioPtr()) = 5;
00253 }
00254
00255 }
00256 break;
00257
00258 case ForNodeBocMsg:
00259 CkPrintf("Can't Critical Path Autoprioritize a ForNodeBocMsg\n");
00260 break;
00261
00262 case ForBocMsg:
00263 CkPrintf("Can't Critical Path Autoprioritize a ForBocMsg\n");
00264 break;
00265
00266 case ArrayEltInitMsg:
00267
00268 CkPrintf("Can't Critical Path Autoprioritize a ArrayEltInitMsg\n");
00269 break;
00270
00271 default:
00272 CkPrintf("Can't Critical Path Autoprioritize messages of [unknown type]\n");
00273 break;
00274 }
00275
00276 }
00277
00278 #endif
00279 }
00280
00281
00282
00283 void pathHistoryManager::useCriticalPathForPriories(){
00284 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00285
00286
00287
00288
00289 CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy);
00290 traceCriticalPathBack(cb, false);
00291 #endif
00292 }
00293
00294
00295
00296 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
00297 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00298
00299 CkPrintf("[%d] saveCriticalPathForPriorities() Receiving critical paths\n", CkMyPe());
00300 fflush(stdout);
00301
00302 criticalPathForPriorityCounts.clear();
00303
00304
00305 for(int i=msg->historySize-1;i>=0;i--){
00306
00307 PathHistoryTableEntry &e = msg->history[i];
00308
00309 #if DEBUG
00310 if(CkMyPe() == 0){
00311 CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, e.get_local_path_time(), msg-> history[i].local_arr, e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe);
00312 }
00313 #endif
00314
00315 const std::pair<int,int> k = std::make_pair(e.local_arr, e.local_ep);
00316 if(criticalPathForPriorityCounts.count(k) == 1)
00317 criticalPathForPriorityCounts[k]++;
00318 else
00319 criticalPathForPriorityCounts[k] = 1;
00320 }
00321
00322
00323
00324
00325
00326 if(CkMyPe() == 0){
00327 std::map< std::pair<int,int>, int>::iterator iter;
00328 for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
00329 const std::pair<int,int> k = iter->first;
00330 const int c = iter->second;
00331
00332 CkPrintf("[%d] On critical path EP %d,%d occurs %d times\n", CkMyPe(), k.first, k.second, c);
00333
00334 }
00335 }
00336
00337 #endif
00338 }
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
/**
 * Initialize the processor-private variables used by critical-path tracking.
 *
 * The entry-method start time is set to 0.0 and the next table index to 0.
 * Does nothing unless USE_CRITICAL_PATH_HEADER_ARRAY is defined.
 */
void initializeCriticalPath(void){
#ifdef USE_CRITICAL_PATH_HEADER_ARRAY
  // State describing the entry method currently running.
  CkpvInitialize(MergeablePathHistory, currentlyExecutingPath);
  CkpvInitialize(double, timeEntryMethodStarted);
  CkpvAccess(timeEntryMethodStarted) = 0.0;

  // Table of recorded path entries and the index its next entry will get.
  CkpvInitialize(PathHistoryTableType, pathHistoryTable);
  CkpvInitialize(int, pathHistoryTableLastIdx);
  CkpvAccess(pathHistoryTableLastIdx) = 0;
#endif
}
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00373
00374 void PathHistoryEnvelope::reset(){
00375 totalTime = 0.0;
00376 sender_history_table_idx = -1;
00377 }
00378
00379
00380 void PathHistoryEnvelope::print() const {
00381 CkPrintf("print() is not implemented\n");
00382 }
00383
00385
00386
00387 void PathHistoryEnvelope::incrementTotalTime(double time){
00388 totalTime += time;
00389 }
00390
00391
00392
00393 void PathHistoryEnvelope::setDebug100(){
00394 totalTime = 100.0;
00395 }
00396
00397
00398
00400 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
00401
00402 int new_idx = addToTable();
00403
00404
00405 CkAssert(env != NULL);
00406 env->pathHistory.set_sender_history_table_idx(new_idx);
00407 env->pathHistory.setTime(local_path_time + preceding_path_time);
00408
00409 #if 0
00410
00411 char *note = new char[4096];
00412 sprintf(note, "addToTableAndEnvelope<br> ");
00413 env->pathHistory.printHTMLToString(note+strlen(note));
00414 traceUserSuppliedNote(note);
00415 delete[] note;
00416 #endif
00417
00418 return new_idx;
00419 }
00420
00421
00423 int PathHistoryTableEntry::addToTable(){
00424 int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
00425 CkpvAccess(pathHistoryTable)[new_idx] = *this;
00426 return new_idx;
00427 }
00428 #endif
00429
00430
00431
00432
00433 void resetThisEntryPath(void) {
00434 CkpvAccess(currentlyExecutingPath).reset();
00435 }
00436
00437
00439 void saveCurrentPathAsUserEvent(const char* prefix){
00440 if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
00441
00442
00443 #if 0
00444 char *note = new char[4096];
00445 sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
00446 CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
00447 traceUserSuppliedNote(note);
00448 delete[] note;
00449 #endif
00450
00451 } else {
00452 #if 0
00453 traceUserEvent(5010);
00454 #endif
00455 }
00456
00457 }
00458
00459
00460 void setCurrentlyExecutingPathTo100(void){
00461 CkpvAccess(currentlyExecutingPath).setDebug100();
00462 }
00463
00464
00466 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
00467 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00468 pathInformationMsg *newmsg = new(0) pathInformationMsg;
00469 newmsg->historySize = 0;
00470 newmsg->cb = cb;
00471 newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
00472 newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
00473 int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
00474 CkPrintf("Starting tracing of critical path from pe=%d table_idx=%d\n", pe, CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
00475 CkAssert(pe < CkNumPes() && pe >= 0);
00476 pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
00477 #else
00478 pathInformationMsg * pathForUser = new(0) pathInformationMsg;
00479 pathForUser->historySize = 0;
00480 pathForUser->cb = CkCallback();
00481 pathForUser->table_idx = -1;
00482 cb.send(pathForUser);
00483 #endif
00484 }
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
/**
 * Print the number of registered entry points followed by the index and name
 * of each one. Does nothing unless USE_CRITICAL_PATH_HEADER_ARRAY is defined.
 */
void printEPInfo(){
#ifdef USE_CRITICAL_PATH_HEADER_ARRAY
  const int numEPs = (int)_entryTable.size();

  CkPrintf("printEPInfo():\n");
  CkPrintf("There are %d EPs\n", numEPs);

  int epIdx = 0;
  while(epIdx < numEPs){
    CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
    ++epIdx;
  }
#endif
}
00504
00505
00506
00507
00509 void criticalPath_start(envelope * env){
00510 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00511 #if DEBUG
00512 CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d time=%lf\n", env->getSrcPe(), env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime() );
00513 #endif
00514
00515 CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
00516 CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
00517 CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
00518
00519 CkpvAccess(currentlyExecutingPath).sanity_check();
00520
00521 CkpvAccess(currentlyExecutingPath).local_ep = -1;
00522 CkpvAccess(currentlyExecutingPath).local_arr = -1;
00523
00524 double now = CmiWallTimer();
00525 CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
00526 CkpvAccess(timeEntryMethodStarted) = now;
00527
00528 switch(env->getMsgtype()) {
00529 case ForArrayEltMsg:
00530
00531 CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
00532 CkpvAccess(currentlyExecutingPath).local_arr = env->getArrayMgrIdx();
00533
00534 break;
00535
00536 case ForNodeBocMsg:
00537
00538 break;
00539
00540 case ForChareMsg:
00541
00542 break;
00543
00544 case ForBocMsg:
00545
00546 break;
00547
00548 case ArrayEltInitMsg:
00549
00550 break;
00551
00552 default:
00553 CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
00554 }
00555
00556
00557
00558 saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
00559
00560
00561 #endif
00562 }
00563
00564
00566 void criticalPath_send(envelope * sendingEnv){
00567 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00568 #if DEBUG
00569 CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
00570 #endif
00571 double now = CmiWallTimer();
00572 PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
00573 entry.addToTableAndEnvelope(sendingEnv);
00574
00575 #endif
00576 }
00577
00578
/**
 * Record the end of the currently executing entry method: pass the path to
 * saveCurrentPathAsUserEvent() and then reset it.
 * Does nothing unless USE_CRITICAL_PATH_HEADER_ARRAY is defined.
 */
void criticalPath_end(){
#ifdef USE_CRITICAL_PATH_HEADER_ARRAY
  saveCurrentPathAsUserEvent("criticalPath_end()<br> ");

  MergeablePathHistory &finished = CkpvAccess(currentlyExecutingPath);
  finished.reset();

#if DEBUG
  CkPrintf("criticalPath_end()\n");
#endif

#endif
}
00592
00593
00594
/**
 * Split the currently executing entry method into two path segments.
 *
 * The time elapsed since the segment started is recorded as a new entry in
 * the local table. The current path is then rewritten to look as if it had
 * been sent from this PE by that entry. Its preceding time becomes the
 * entry's total time, and the segment start time is reset to now.
 * Does nothing unless USE_CRITICAL_PATH_HEADER_ARRAY is defined.
 */
void criticalPath_split(){
#ifdef USE_CRITICAL_PATH_HEADER_ARRAY
  saveCurrentPathAsUserEvent("criticalPath_split()<br> ");

  MergeablePathHistory &current = CkpvAccess(currentlyExecutingPath);

  // Close the segment that has been running up to this point.
  double now = CmiWallTimer();
  PathHistoryTableEntry entry(current, now-CkpvAccess(timeEntryMethodStarted) );
  const int tableidx = entry.addToTable();

  // Start the next segment, chained to the entry just recorded on this PE.
  current.sender_pe = CkMyPe();
  current.sender_history_table_idx = tableidx;
  current.preceding_path_time = entry.getTotalTime();
  current.timeEntryMethodStarted = now;
  CkpvAccess(timeEntryMethodStarted) = now;
#endif
}
00617
00618
00619
00620
00621
00622 #include "PathHistory.def.h"
00623
00626 #endif