00001 #include <charm++.h>
00002 #include <iostream>
00003 #include <fstream>
00004 #include <string>
00005 #include <map>
00006 #include <set>
00007 #include <vector>
00008 #include <utility>
00009
00010
00011 #include "PathHistory.decl.h"
00012 #include "LBDatabase.h"
00013 #include "pathHistory.h"
00014 #include <register.h>
00015
00016 #include "trace-projections.h"
00017
00024 CProxy_pathHistoryManager pathHistoryManagerProxy;
00025
00026 CkpvDeclare(int, traceLastHop);
00027
00028 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath);
00029 CkpvDeclare(double, timeEntryMethodStarted);
00030
00032 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
00034 CkpvDeclare(int, pathHistoryTableLastIdx);
00035
00036
00038 class pathHistoryMain : public CBase_pathHistoryMain {
00039 public:
00040 pathHistoryMain(CkArgMsg* args){
00041 #if USE_CRITICAL_PATH_HEADER_ARRAY
00042 pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
00043 #endif
00044 delete args;
00045 }
00046 ~pathHistoryMain(){}
00047 };
00048
00049 pathHistoryManager::pathHistoryManager(){ }
00050
00058 void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
00059 int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
00060
00061 #if DEBUGPRINT > 2
00062 CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
00063 #endif
00064 CkAssert(count==0 || count==1);
00065
00066 if(count > 0){
00067 PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
00068 int idx = path.sender_history_table_idx;
00069 int pe = path.sender_pe;
00070
00071 #if DEBUG
00072 CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d history size %d \n", msg->table_idx, CkMyPe(), pe, idx, msg->historySize);
00073 #endif
00074
00075
00076 pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
00077 for(int i=0;i<msg->historySize;i++){
00078 newmsg->history[i] = msg->history[i];
00079 }
00080 newmsg->history[msg->historySize] = path;
00081 newmsg->historySize = msg->historySize+1;
00082 newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
00083 newmsg->cb = msg->cb;
00084 newmsg->hops = msg->hops -1;
00085 newmsg->table_idx = idx;
00086
00087 if(msg->hops > 0 ){
00088
00089 CkAssert(pe < CkNumPes() && pe >= 0);
00090 thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
00091 } else {
00092 if(msg->saveAsProjectionsUserEvents){
00093
00094
00095 pathForUser = new(msg->historySize+1) pathInformationMsg;
00096 for(int i=0;i<msg->historySize;i++){
00097 pathForUser->history[i] = msg->history[i];
00098 }
00099 pathForUser->history[msg->historySize] = path;
00100 pathForUser->historySize = msg->historySize+1;
00101 pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
00102 pathForUser->cb = msg->cb;
00103 pathForUser->table_idx = idx;
00104
00105 CkPrintf("Broadcasting it to all PE\n");
00106 thisProxy.broadcastCriticalPathProjections(newmsg);
00107 } else {
00108 newmsg->cb.send(newmsg);
00109 }
00110
00111 }
00112 } else {
00113 CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
00114 }
00115
00116 delete msg;
00117 }
00118
00119
00120 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
00121
00122 CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
00123 int me = CkMyPe();
00124 int intersectsLocalPE = false;
00125
00126
00127
00128 for(int i=msg->historySize-1;i>=0;i--){
00129 if(CkMyPe() == msg->history[i].local_pe){
00130
00131
00132
00133 traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
00134
00135 intersectsLocalPE = true;
00136 }
00137
00138 }
00139
00140 traceRegisterUserEvent("Critical Path", 32000);
00141
00142
00143 #define PRUNE_CRITICAL_PATH_LOGS 0
00144
00145 #if PRUNE_CRITICAL_PATH_LOGS
00146
00147
00148 enableTraceLogOutput();
00149 if(! intersectsLocalPE){
00150 disableTraceLogOutput();
00151 CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
00152 }
00153 #endif
00154
00155 #if TRACE_ALL_PATH_TABLE_ENTRIES
00156
00157 std::map< int, PathHistoryTableEntry >::iterator iter;
00158 for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
00159 double startTime = iter->second.get_start_time();
00160 double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
00161 traceUserBracketEvent(32001, startTime, endTime);
00162 }
00163 #endif
00164
00165 int data=1;
00166 CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]);
00167 contribute(sizeof(int), &data, CkReduction::sum_int, cb);
00168
00169
00170 }
00171
00172 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
00173 CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
00174 pathForUser->cb.send(pathForUser);
00175 pathForUser = NULL;
00176 }
00177
00179 void useThisCriticalPathForPriorities(){
00180 pathHistoryManagerProxy.ckLocalBranch()->useCriticalPathForPriories();
00181 }
00182
00183
00185 void automaticallySetMessagePriority(envelope *env){
00186 int ep = env->getEpIdx();
00187 if (ep==CkIndex_CkArray::recvBroadcast(0))
00188 ep = env->getsetArrayBcastEp();
00189 #if DEBUG
00190 CkPrintf("----------- ep = %d %s \n", ep, _entryTable[ep]->name);
00191 if(env->getPriobits() == 8*sizeof(int)){
00192 CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
00193 } else if(env->getPriobits() == 0) {
00194 CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
00195 } else {
00196 CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
00197 }
00198 #endif
00199
00200
00201 const std::map< int, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
00202
00203 if(criticalPathForPriorityCounts.size() > 0 && env->getPriobits() == 8*sizeof(int)) {
00204 switch(env->getMsgtype()) {
00205 case ForArrayEltMsg:
00206 case ForIDedObjMsg:
00207 case ForChareMsg:
00208 case ForNodeBocMsg:
00209 case ForBocMsg:
00210 case ArrayEltInitMsg:
00211 {
00212 const int arr = env->getArrayMgrIdx();
00213 const int count = criticalPathForPriorityCounts.count(ep);
00214 #if DEBUG
00215 CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
00216 #endif
00217
00218 if(count > 0){
00219
00220 #if DEBUG
00221 CkPrintf("Prio auto high %d %s \n", ep, _entryTable[ep]->name);
00222 #endif
00223 *(int*)(env->getPrioPtr()) = -5;
00224 } else {
00225
00226 #if DEBUG
00227 CkPrintf("Prio auto low: %d,%d\n", arr, ep);
00228 #endif
00229 *(int*)(env->getPrioPtr()) = 0;
00230 }
00231
00232 }
00233 break;
00234
00235 default:
00236
00237 break;
00238 }
00239
00240 }
00241 }
00242
00243 void pathHistoryManager::useCriticalPathForPriories(){
00244
00245
00246 CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy);
00247 traceCriticalPathBack(cb, false);
00248 }
00249
00250 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
00251
00252 fflush(stdout);
00253
00254 criticalPathForPriorityCounts.clear();
00255
00256
00257 for(int i=msg->historySize-1;i>=0;i--){
00258
00259 PathHistoryTableEntry &e = msg->history[i];
00260
00261
00262 #if DEBUG
00263 if(CkMyPe() == 0){
00264 char name[100];
00265 if(e.local_ep >=0)
00266 strcpy(name, _entryTable[e.local_ep]->name);
00267 else
00268 strcpy(name, "unknow");
00269
00270 CkPrintf("\t[%d] Path Step %d: local_path_time=%lf ep=%d starttime=%lf preceding path time=%lf pe=%d name=%s\n",CkMyPe(), i, e.get_local_path_time(), e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe, name);
00271 }
00272 #endif
00273
00274 if(criticalPathForPriorityCounts.count(e.local_ep) == 1)
00275 criticalPathForPriorityCounts[e.local_ep]++;
00276 else
00277 criticalPathForPriorityCounts[e.local_ep] = 1;
00278 }
00279
00280
00281 if(CkMyPe() == 0){
00282 std::map< int, int>::iterator iter;
00283 for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
00284 int epidx = iter->first;
00285 const int c = iter->second;
00286
00287 #if DEBUG
00288 CkPrintf("[%d] On critical path EP %d occurs %d times\n", CkMyPe(), epidx, c);
00289 #endif
00290
00291 }
00292 }
00293 }
00294
00296 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
00297
00298 int new_idx = addToTable();
00299
00300 #if USE_CRITICAL_PATH_HEADER_ARRAY
00301
00302 CkAssert(env != NULL);
00303 env->pathHistory.set_sender_history_table_idx(new_idx);
00304 env->pathHistory.setTime(local_path_time + preceding_path_time);
00305 #if DEBUG
00306 if(local_path_time > 0.1)
00307 CkPrintf("------########## %d generating new msg env critical path length %f:%f:%f app time %f id:%d\n", CkMyPe(), local_path_time , preceding_path_time, local_path_time+preceding_path_time, CkWallTimer(), new_idx);
00308 #endif
00309 #endif
00310
00311 return new_idx;
00312 }
00313
00315 int PathHistoryTableEntry::addToTable(){
00316 int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
00317 CkpvAccess(pathHistoryTable)[new_idx] = *this;
00318 #if DEBUG
00319 CkPrintf("-------- add to entry %d ----- %d %d %d \n", new_idx, local_ep, local_pe, sender_history_table_idx);
00320 #endif
00321 return new_idx;
00322 }
00323
00324 void initializeCriticalPath(void){
00325 CkpvInitialize(MergeablePathHistory, currentlyExecutingPath);
00326 CkpvInitialize(double, timeEntryMethodStarted);
00327 CkpvAccess(timeEntryMethodStarted) = 0.0;
00328 CkpvInitialize(PathHistoryTableType, pathHistoryTable);
00329 CkpvInitialize(int, pathHistoryTableLastIdx);
00330 CkpvAccess(pathHistoryTableLastIdx) = 0;
00331 CkpvInitialize(int, traceLastHop);
00332 CkpvAccess(traceLastHop) = 0;
00333 }
00334
00335
00336 void resetThisEntryPath(void) {
00337 CkpvAccess(currentlyExecutingPath).reset();
00338 }
00339
00340
00342 void saveCurrentPathAsUserEvent(const char* prefix){
00343 if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
00344
00345
00346 #if 0
00347 char *note = new char[4096];
00348 sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
00349 CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
00350 traceUserSuppliedNote(note);
00351 delete[] note;
00352 #endif
00353
00354 } else {
00355 #if 0
00356 traceUserEvent(5010);
00357 #endif
00358 }
00359
00360 }
00361
00362
00363 void setCurrentlyExecutingPathTo100(void){
00364 CkpvAccess(currentlyExecutingPath).setDebug100();
00365 }
00366
00368 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
00369 pathInformationMsg *newmsg = new(0) pathInformationMsg;
00370 newmsg->historySize = 0;
00371 newmsg->cb = cb;
00372 newmsg->hops = CkpvAccess(currentlyExecutingPath).hops - CkpvAccess(traceLastHop) -1;
00373 CkpvAccess(traceLastHop) = CkpvAccess(currentlyExecutingPath).hops;
00374 newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
00375 newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
00376 int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
00377
00378 CkAssert(pe < CkNumPes() && pe >= 0);
00379 pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
00380 }
00381
00383 void printEPInfo(){
00384 CkPrintf("printEPInfo():\n");
00385 CkPrintf("There are %d EPs\n", (int)_entryTable.size());
00386 for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
00387 CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
00388 }
00389
00390 #if USE_CRITICAL_PATH_HEADER_ARRAY
00391
00392 void criticalPath_setep(int epIdx){
00393 CkpvAccess(currentlyExecutingPath).local_ep = epIdx;
00394 #if DEBUG
00395 if(epIdx >= 0)
00396 CkPrintf(" setting current method name %d %s",epIdx, _entryTable[epIdx]->name);
00397 CkPrintf("\n");
00398 #endif
00399 }
00400
00402 void criticalPath_start(envelope * env){
00403 CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
00404 CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
00405 CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
00406 CkpvAccess(currentlyExecutingPath).hops = env->pathHistory.getHops() + 1;
00407
00408 CkpvAccess(currentlyExecutingPath).sanity_check();
00409
00410 CkpvAccess(currentlyExecutingPath).local_ep = env->getEpIdx();
00411
00412 double now = CmiWallTimer();
00413 CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
00414 CkpvAccess(timeEntryMethodStarted) = now;
00415
00416 switch(env->getMsgtype()) {
00417 case ForArrayEltMsg:
00418 case ArrayEltInitMsg:
00419 CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
00420 break;
00421
00422 case ForNodeBocMsg:
00423 break;
00424
00425 case ForBocMsg:
00426
00427 break;
00428
00429 case ForChareMsg:
00430
00431 break;
00432
00433 default:
00434 break;
00435
00436 }
00437
00438 if(CkpvAccess(currentlyExecutingPath).local_ep == CkIndex_CkArray::recvBroadcast(0))
00439 CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayBcastEp();
00440
00441 #if DEBUG
00442 CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d time=%lf current ep=%d ", env->getSrcPe(), env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime(), env->getEpIdx() );
00443 if(env->getEpIdx() >= 0)
00444 CkPrintf(" current method name %d %s", CkpvAccess(currentlyExecutingPath).local_ep, _entryTable[CkpvAccess(currentlyExecutingPath).local_ep]->name);
00445 CkPrintf("\n");
00446 #endif
00447
00448 saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
00449 }
00450
00451
00453 void criticalPath_send(envelope * sendingEnv){
00454 #if DEBUG
00455 CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
00456 #endif
00457 double now = CmiWallTimer();
00458 PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
00459 entry.addToTableAndEnvelope(sendingEnv);
00460 sendingEnv->pathHistory.setHops(CkpvAccess(currentlyExecutingPath).hops);
00461 automaticallySetMessagePriority(sendingEnv);
00462 }
00463
00465 void criticalPath_end(){
00466 saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
00467
00468 CkpvAccess(currentlyExecutingPath).reset();
00469
00470 #if DEBUG
00471 CkPrintf("criticalPath_end()\n");
00472 #endif
00473 }
00474
00475 #endif
00476
00479 void criticalPath_split(){
00480 saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
00481
00482
00483 double now = CmiWallTimer();
00484 PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
00485 int tableidx = entry.addToTable();
00486
00487
00488 CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
00489 CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
00490 CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
00491 CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
00492 CkpvAccess(timeEntryMethodStarted) = now;
00493 }
00494
00495 MergeablePathHistory* saveCurrentPath()
00496 {
00497 MergeablePathHistory *savedPath = new MergeablePathHistory(CkpvAccess(currentlyExecutingPath));
00498 return savedPath;
00499 }
00500
00501
00502 void mergePathHistory(MergeablePathHistory* tmp)
00503 {
00504 CkpvAccess(currentlyExecutingPath).updateMax(*tmp);
00505 }
00506
00507 #include "PathHistory.def.h"
00508