00001
#include "charm.h"
#include "ck.h"
#include "ckmessagelogging.h"
#include "queueing.h"
#include <sys/types.h>
#include <signal.h>
#include <unistd.h>
#include "CentralLB.h"
00016
00017 #ifdef _FAULT_MLOG_
00018
00019
// Compile-time switches for optional statistics collection (0 = disabled).
// They guard the matching #if blocks in _messageLoggingInit, sendRemoteMsg
// and the checkpoint routines.
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_TEAM 0

// Recovery-phase labels; not referenced in the visible part of this file.
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug-output macros. The "//" in a replacement list is a comment, so the
// macros defined that way expand to nothing; replace "//x" with "x" to
// enable one. DEBUG_NOW and DEBUG_PE_NOW are always active, and
// DEBUG_CHECKPOINT (1) turns on the PE-0 checkpoint timing prints.
#define DEBUG_MEM(x) //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x) //x
#define DEBUGLB(x) // x
#define DEBUG_TEAM(x) // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_PE_NOW(x,y) if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x
00040
// Heartbeat silence (in seconds of wall time) after which heartBeatCheckHandler
// declares the buddy PE dead.
#define FAIL_DET_THRESHOLD 10

extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

// Forward declarations for helpers defined later in this file (or elsewhere).
bool fault_aware(CkObjID &recver);
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(CkObjID *log);

// Restart bookkeeping: CkMlogRestart sets the flag and resets the counter.
int _restartFlag=false;
int _numRestartResponses=0;

// NOTE(review): binding a string literal to a non-const char* is deprecated
// (ill-formed since C++11); the type is kept in case a header declares it extern.
char *checkpointDirectory=".";
// 1 while a checkpoint sent to the buddy has not been acknowledged yet.
int unAckedCheckpoint=0;
int countUpdateHomeAcks=0;

extern int teamSize;
// Checkpoint period in seconds (checkpointAlarm compares it to a wall-time
// difference and multiplies by 1000 before calling CcdCallFnAfter).
extern int chkptPeriod;
extern bool fastRecovery;
extern int parallelRecovery;

// Fault injection: files read by readKillFile / readFaultFile
// (NULL unless configured).
char *killFile;
char *faultFile;
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
// Wall-clock time at which killLocal actually kills this process.
double killTime=0.0;
double faultMean;
// Number of checkpoints taken so far; packed at the head of every checkpoint.
int checkpointCount=0;
// When non-zero, stored checkpoints are written to the scratch file fName.
int diskCkptFlag = 0;
static char fName[100];
00078
00079
// Object whose entry method is currently running on this PE; it is used as
// the logical sender of outgoing messages (NULL outside an object context).
CpvDeclare(Chare *,_currentObj);

// Checkpoint of the partner PE held in memory by this PE.
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);

// Incarnation number of every PE (one char per PE); bumped on restart and
// used to discard stale messages in preProcessReceivedMessage.
CpvDeclare(char *, _incarnation);

// Counters and list of recovered objects migrating out of / into this PE.
CpvDeclare(int, _numEmigrantRecObjs);
CpvDeclare(int, _numImmigrantRecObjs);
CpvDeclare(std::vector<CkLocation *> *, _immigrantRecObjs);

// Per-destination or total message statistics (see COLLECT_STATS_MSGS*).
#if COLLECT_STATS_MSGS
int *numMsgsTarget;
int *sizeMsgsTarget;
int totalMsgsTarget;
float totalMsgsSize;
#endif
// Memory-usage statistics, printed and reset at each checkpoint.
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif
00104
00105
// Converse handler indices. Most are filled in by _messageLoggingInit;
// _updateHomeRequestHandlerIdx, _updateHomeAckHandlerIdx and
// _receivedDetDataHandlerIdx are not registered there.
int _pingHandlerIdx;
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _dummyMigrationHandlerIdx;
int _getGlobalStepHandlerIdx;
int _recvGlobalStepHandlerIdx;
int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _sendBackLocationHandlerIdx;

void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
int _falseRestart =0;

// Load-balancing / checkpoint-barrier state. onGoingLoadBalancing makes
// _checkpointAckHandler finish a load-balancing checkpoint instead of
// starting the end-of-checkpoint reduction.
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
int _receiveMlogLocationHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;
int _startCheckpointIdx;
int _endCheckpointIdx;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;
// Wall time of the last completed periodic checkpoint and of the last restart.
double lastCompletedAlarm=0;
double lastRestart=0;
// User callback fired by _endCheckpointHandler once a checkpoint completes.
CkCallback ckptCallback;

int _receiveLocationHandlerIdx;
00154
// MPI-only heartbeat failure detector: each PE pings its checkpoint buddy
// every 100 ms and checks every 5 s that its own pinger is still alive.
#if CMK_CONVERSE_MPI
static int heartBeatHandlerIdx;
static int heartBeatCheckHandlerIdx;
static int partnerFailureHandlerIdx;
// Wall time of the most recent heartbeat received; -1 until the first one.
static double lastPingTime = -1;

void mpi_restart_crashed(int pe, int rank);
int find_spare_mpirank(int pe, int partition);

void heartBeatPartner();
void heartBeatHandler(void *msg);
void heartBeatCheckHandler();
void partnerFailureHandler(char *msg);
int getReverseCheckPointPE();
// Non-zero while a checkpoint is in flight; suppresses failure detection.
int inCkptFlag = 0;
#endif
00171
/**
 * Merge function for the end-of-checkpoint CmiReduce: the reduction only acts
 * as a barrier, so the local contribution is passed through untouched and the
 * remote contributions are ignored.
 *
 * @return `data`, unchanged.
 */
static void *doNothingMsg(int * size, void * data, void ** remote, int count){
	(void)size;
	(void)remote;
	(void)count;
	void *result = data;
	return result;
}
00175
00176
00177
00181 void _messageLoggingInit(){
00182 if(CkMyPe() == 0)
00183 CkPrintf("[%d] Fast Message Logging Support \n",CkMyPe());
00184
00185
00186 CpvInitialize(Chare *,_currentObj);
00187
00188
00189 _pingHandlerIdx = CkRegisterHandler(_pingHandler);
00190
00191
00192 _storeCheckpointHandlerIdx = CkRegisterHandler(_storeCheckpointHandler);
00193 _checkpointAckHandlerIdx = CkRegisterHandler( _checkpointAckHandler);
00194 _checkpointRequestHandlerIdx = CkRegisterHandler(_checkpointRequestHandler);
00195
00196
00197 _getCheckpointHandlerIdx = CkRegisterHandler(_getCheckpointHandler);
00198 _recvCheckpointHandlerIdx = CkRegisterHandler(_recvCheckpointHandler);
00199 _resendMessagesHandlerIdx = CkRegisterHandler(_resendMessagesHandler);
00200 _distributedLocationHandlerIdx=CkRegisterHandler(_distributedLocationHandler);
00201 _sendBackLocationHandlerIdx=CkRegisterHandler(_sendBackLocationHandler);
00202 _dummyMigrationHandlerIdx = CkRegisterHandler(_dummyMigrationHandler);
00203
00204
00205 _receiveMlogLocationHandlerIdx=CkRegisterHandler(_receiveMlogLocationHandler);
00206 _getGlobalStepHandlerIdx=CkRegisterHandler(_getGlobalStepHandler);
00207 _recvGlobalStepHandlerIdx=CkRegisterHandler(_recvGlobalStepHandler);
00208 _checkpointBarrierHandlerIdx=CkRegisterHandler(_checkpointBarrierHandler);
00209 _checkpointBarrierAckHandlerIdx=CkRegisterHandler(_checkpointBarrierAckHandler);
00210 _startCheckpointIdx = CkRegisterHandler(_startCheckpointHandler);
00211 _endCheckpointIdx = CkRegisterHandler(_endCheckpointHandler);
00212
00213
00214 _receiveLocationHandlerIdx=CkRegisterHandler(_receiveLocationHandler);
00215
00216
00217 #if CMK_CONVERSE_MPI
00218 heartBeatHandlerIdx = CkRegisterHandler(heartBeatHandler);
00219 heartBeatCheckHandlerIdx = CkRegisterHandler(heartBeatCheckHandler);
00220 partnerFailureHandlerIdx = CkRegisterHandler(partnerFailureHandler);
00221 #endif
00222
00223
00224 CpvInitialize(char *, _incarnation);
00225 CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
00226 for(int i=0; i<CmiNumPes(); i++){
00227 CpvAccess(_incarnation)[i] = 0;
00228 }
00229
00230
00231 CpvInitialize(int, _numEmigrantRecObjs);
00232 CpvAccess(_numEmigrantRecObjs) = 0;
00233 CpvInitialize(int, _numImmigrantRecObjs);
00234 CpvAccess(_numImmigrantRecObjs) = 0;
00235
00236 CpvInitialize(std::vector<CkLocation *> *, _immigrantRecObjs);
00237 CpvAccess(_immigrantRecObjs) = new std::vector<CkLocation *>;
00238
00239
00240 CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
00241 CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
00242
00243
00244 if(diskCkptFlag){
00245 #if CMK_USE_MKSTEMP
00246 sprintf(fName, "/tmp/ckpt%d-XXXXXX", CkMyPe());
00247 mkstemp(fName);
00248 #else
00249 fName=tmpnam(NULL);
00250 #endif
00251 }
00252
00253
00254 traceRegisterUserEvent("Remove Logs", 20);
00255 traceRegisterUserEvent("Ticket Request Handler", 21);
00256 traceRegisterUserEvent("Ticket Handler", 22);
00257 traceRegisterUserEvent("Local Message Copy Handler", 23);
00258 traceRegisterUserEvent("Local Message Ack Handler", 24);
00259 traceRegisterUserEvent("Preprocess current message",25);
00260 traceRegisterUserEvent("Preprocess past message",26);
00261 traceRegisterUserEvent("Preprocess future message",27);
00262 traceRegisterUserEvent("Checkpoint",28);
00263 traceRegisterUserEvent("Checkpoint Store",29);
00264 traceRegisterUserEvent("Checkpoint Ack",30);
00265 traceRegisterUserEvent("Send Ticket Request",31);
00266 traceRegisterUserEvent("Generalticketrequest1",32);
00267 traceRegisterUserEvent("TicketLogLocal",33);
00268 traceRegisterUserEvent("next_ticket and SN",34);
00269 traceRegisterUserEvent("Timeout for buffered remote messages",35);
00270 traceRegisterUserEvent("Timeout for buffered local messages",36);
00271 traceRegisterUserEvent("Inform Location Home",37);
00272 traceRegisterUserEvent("Receive Location Handler",38);
00273
00274 lastCompletedAlarm=CmiWallTimer();
00275 lastRestart = CmiWallTimer();
00276
00277
00278 #if CMK_CONVERSE_MPI
00279 void heartBeatPartner();
00280 void heartBeatCheckHandler();
00281 CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
00282 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00283 #endif
00284
00285 #if COLLECT_STATS_MSGS
00286 #if COLLECT_STATS_MSGS_TOTAL
00287 totalMsgsTarget = 0;
00288 totalMsgsSize = 0.0;
00289 #else
00290 numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00291 sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00292 for(int i=0; i<CmiNumPes(); i++){
00293 numMsgsTarget[i] = 0;
00294 sizeMsgsTarget[i] = 0;
00295 }
00296 #endif
00297 #endif
00298 #if COLLECT_STATS_MEMORY
00299 msgLogSize = 0;
00300 bufferedDetsSize = 0;
00301 storedDetsSize = 0;
00302 #endif
00303
00304 }
00305
00306 #if CMK_CONVERSE_MPI
00307
00311 void partnerFailureHandler(char *msg)
00312 {
00313 int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
00314
00315
00316 int newrank = find_spare_mpirank(diepe, CmiMyPartition());
00317 int buddy = getReverseCheckPointPE();
00318 if (buddy == diepe) {
00319 mpi_restart_crashed(diepe, newrank);
00320 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00321 }
00322 }
00323
00327 void heartBeatHandler(void *msg)
00328 {
00329 lastPingTime = CmiWallTimer();
00330 CmiFree(msg);
00331 }
00332
00336 void heartBeatCheckHandler()
00337 {
00338 double now = CmiWallTimer();
00339 if (lastPingTime > 0 && now - lastPingTime > FAIL_DET_THRESHOLD && !inCkptFlag) {
00340 int i, pe, buddy;
00341
00342 buddy = getReverseCheckPointPE();
00343 CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
00344
00345 for (int pe = 0; pe < CmiNumPes(); pe++) {
00346 if (pe == buddy) continue;
00347 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
00348 *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
00349 CmiSetHandler(msg, partnerFailureHandlerIdx);
00350 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
00351 }
00352 }
00353 else
00354 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00355 }
00356
00360 void heartBeatPartner()
00361 {
00362 int buddy = getCheckPointPE();
00363
00364 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
00365 *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
00366 CmiSetHandler(msg, heartBeatHandlerIdx);
00367 CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
00368 CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
00369 }
00370 #endif
00371
00372 void killLocal(void *_dummy,double curWallTime);
00373
00374 void readKillFile(){
00375 FILE *fp=fopen(killFile,"r");
00376 if(!fp){
00377 return;
00378 }
00379 int proc;
00380 double sec;
00381 while(fscanf(fp,"%d %lf",&proc,&sec)==2){
00382 if(proc == CkMyPe()){
00383 killTime = CmiWallTimer()+sec;
00384 printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
00385 CcdCallFnAfter(killLocal,NULL,sec*1000);
00386 }
00387 }
00388 fclose(fp);
00389 }
00390
00395 void readFaultFile(){
00396 FILE *fp=fopen(faultFile,"r");
00397 if(!fp){
00398 return;
00399 }
00400 int proc;
00401 double sec;
00402 fscanf(fp,"%d %lf",&proc,&sec);
00403 faultMean = sec;
00404 if(proc == CkMyPe()){
00405 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
00406 CcdCallFnAfter(killLocal,NULL,sec*1000);
00407 }
00408 fclose(fp);
00409 }
00410
00411 void killLocal(void *_dummy,double curWallTime){
00412 printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
00413 if(CmiWallTimer()<killTime-1){
00414 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);
00415 }else{
00416 #if CMK_CONVERSE_MPI
00417 CkDieNow();
00418 #else
00419 kill(getpid(),SIGKILL);
00420 #endif
00421 }
00422 }
00423
00424 #if ! CMK_CONVERSE_MPI
00425 void CkDieNow()
00426 {
00427
00428 CmiPrintf("[%d] die now.\n", CmiMyPe());
00429 killTime = CmiWallTimer()+0.001;
00430 CcdCallFnAfter(killLocal,NULL,1);
00431 }
00432 #endif
00433
00434
00435
00436
00437
00438
00442 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
00443 if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
00444 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
00445 void *origMsg = EnvToUsr(env);
00446 for(int i=0;i<CmiNumPes();i++){
00447 if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
00448 void *copyMsg = CkCopyMsg(&origMsg);
00449 envelope *copyEnv = UsrToEnv(copyMsg);
00450 copyEnv->SN=0;
00451 copyEnv->sender.type = TypeInvalid;
00452 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
00453 sendGroupMsg(copyEnv,i,_infoIdx);
00454 }
00455 }
00456 return;
00457 }
00458
00459
00460 env->SN=0;
00461 env->sender.type = TypeInvalid;
00462
00463 CkObjID recver;
00464 recver.type = TypeGroup;
00465 recver.data.group.id = env->getGroupNum();
00466 recver.data.group.onPE = destPE;
00467 sendCommonMsg(recver,env,destPE,_infoIdx);
00468 }
00469
00473 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
00474 if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
00475 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
00476 void *origMsg = EnvToUsr(env);
00477 for(int i=0;i<CmiNumNodes();i++){
00478 if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
00479 void *copyMsg = CkCopyMsg(&origMsg);
00480 envelope *copyEnv = UsrToEnv(copyMsg);
00481 copyEnv->SN=0;
00482 copyEnv->sender.type = TypeInvalid;
00483 sendNodeGroupMsg(copyEnv,i,_infoIdx);
00484 }
00485 }
00486 return;
00487 }
00488
00489
00490 env->SN=0;
00491 env->sender.type = TypeInvalid;
00492
00493 CkObjID recver;
00494 recver.type = TypeNodeGroup;
00495 recver.data.group.id = env->getGroupNum();
00496 recver.data.group.onPE = destNode;
00497 sendCommonMsg(recver,env,destNode,_infoIdx);
00498 }
00499
00503 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
00504 CkObjID recver;
00505 recver.type = TypeArray;
00506 recver.data.array.id = env->getArrayMgr();
00507 recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
00508
00509 if(CpvAccess(_currentObj)!=NULL && CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00510 char recverString[100],senderString[100];
00511
00512 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00513 }
00514
00515
00516 env->SN = 0;
00517
00518 sendCommonMsg(recver,env,destPE,_infoIdx);
00519 };
00520
00524 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
00525 CkObjID recver;
00526 recver.type = TypeChare;
00527 recver.data.chare.id = *pCid;
00528
00529 if(CpvAccess(_currentObj)!=NULL && CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00530 char recverString[100],senderString[100];
00531
00532 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00533 }
00534
00535
00536 env->SN = 0;
00537
00538 sendCommonMsg(recver,env,destPE,_infoIdx);
00539 };
00540
00544 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
00545 envelope *env = _env;
00546 int resend=0;
00547 DEBUG(char recverName[100]);
00548 DEBUG(char senderString[100]);
00549
00550 DEBUG_MEM(CmiMemoryCheck());
00551
00552 if(CpvAccess(_currentObj) == NULL){
00553
00554 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
00555 generalCldEnqueue(destPE,env,_infoIdx);
00556 return;
00557 }
00558
00559
00560 if(env->flags & CK_BYPASS_DET_MLOG){
00561 env->sender = CpvAccess(_currentObj)->mlogData->objID;
00562 env->recver = recver;
00563 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
00564 generalCldEnqueue(destPE,env,_infoIdx);
00565 return;
00566 }
00567
00568
00569 env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
00570 env->sender = CpvAccess(_currentObj)->mlogData->objID;
00571 env->SN = 0;
00572
00573 DEBUG_MEM(CmiMemoryCheck());
00574
00575 CkObjID &sender = env->sender;
00576 env->recver = recver;
00577
00578 Chare *obj = (Chare *)env->sender.getObject();
00579
00580 if(env->SN == 0){
00581 DEBUG_MEM(CmiMemoryCheck());
00582 env->SN = obj->mlogData->nextSN(recver);
00583 }else{
00584 resend = 1;
00585 }
00586
00587
00588 if(isLocal(destPE)){
00589 sendLocalMsg(env, _infoIdx);
00590 }else{
00591 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
00592 sendRemoteMsg(sender,recver,destPE,mEntry,env->SN,resend);
00593 }
00594 }
00595
00600 inline bool isLocal(int destPE){
00601
00602 if(destPE == CkMyPe())
00603 return true;
00604
00605 return false;
00606 }
00607
00612 inline bool isTeamLocal(int destPE){
00613
00614
00615 if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
00616 return true;
00617
00618 return false;
00619 }
00620
00624 void sendRemoteMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
00625 DEBUG(char recverString[100]);
00626 DEBUG(char senderString[100]);
00627
00628 int totalSize;
00629
00630 envelope *env = entry->env;
00631 DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
00632
00633
00634 Chare *obj = (Chare *)entry->env->sender.getObject();
00635 entry->env->recver = recver;
00636 entry->env->SN = SN;
00637 if(!resend){
00638 obj->mlogData->addLogEntry(entry);
00639 #if COLLECT_STATS_TEAM
00640 MLOGFT_totalMessages += 1.0;
00641 MLOGFT_totalLogSize += entry->env->getTotalsize();
00642 #endif
00643 }
00644
00645
00646 generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
00647
00648 DEBUG_MEM(CmiMemoryCheck());
00649 #if COLLECT_STATS_MSGS
00650 #if COLLECT_STATS_MSGS_TOTAL
00651 totalMsgsTarget++;
00652 totalMsgsSize += (float)env->getTotalsize();
00653 #else
00654 numMsgsTarget[destPE]++;
00655 sizeMsgsTarget[destPE] += env->getTotalsize();
00656 #endif
00657 #endif
00658 #if COLLECT_STATS_MEMORY
00659 msgLogSize += env->getTotalsize();
00660 #endif
00661 };
00662
00663
00667 void sendLocalMsg(envelope *env, int _infoIdx){
00668 DEBUG_PERF(double _startTime=CkWallTimer());
00669 DEBUG_MEM(CmiMemoryCheck());
00670 DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
00671 DEBUG(char senderString[100]);
00672 DEBUG(char recverString[100]);
00673
00674 DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
00675
00676
00677
00678
00679
00680 Chare *recverObj = (Chare *)env->recver.getObject();
00681
00682
00683 if(recverObj){
00684
00685
00686 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);
00687
00688 DEBUG_MEM(CmiMemoryCheck());
00689 }else{
00690 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
00691 }
00692 };
00693
00694
00695
00696
00697
00698 bool fault_aware(CkObjID &recver){
00699 switch(recver.type){
00700 case TypeChare:
00701 return true;
00702 case TypeMainChare:
00703 return false;
00704 case TypeGroup:
00705 case TypeNodeGroup:
00706 case TypeArray:
00707 return true;
00708 default:
00709 return false;
00710 }
00711 };
00712
00713
00714 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
00715 DEBUG(char recverString[100]);
00716 DEBUG(char senderString[100]);
00717 DEBUG_MEM(CmiMemoryCheck());
00718 int flag;
00719 bool ticketSuccess;
00720
00721
00722 CkObjID recver = env->recver;
00723
00724
00725 if(env->flags & CK_BYPASS_DET_MLOG){
00726 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
00727 return 1;
00728 }
00729
00730
00731 if(!fault_aware(recver)){
00732 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
00733 return 1;
00734 }
00735
00736
00737 Chare *obj = (Chare *)recver.getObject();
00738 *objPointer = obj;
00739 if(obj == NULL){
00740
00741 DEBUG(printf("[%d] Message SN %d sender %s for NULL recver %s\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
00742 return 1;
00743 }
00744
00745
00746 if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
00747 DEBUG(printf("[%d] Stale message SN %d sender %s for recver %s being ignored\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
00748 CmiFree(env);
00749 return 0;
00750 }
00751
00752 DEBUG_MEM(CmiMemoryCheck());
00753 DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
00754
00755
00756 if(obj->mlogData->checkAndStoreSsn(env->sender,env->SN)){
00757 DEBUG(printf("[%d] Duplicate message SN %d sender %s for recver %s being ignored\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
00758 CmiFree(env);
00759 return 0;
00760 }
00761
00762
00763 DEBUG(printf("[%d] Message SN %d sender %s for recver %s being delivered\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
00764 return 1;
00765 }
00766
00770 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
00771 }
00772
00773
00774
00775
00776
00777 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
00778
00779 if(env->recver.type != TypeNodeGroup){
00780
00781
00782
00783
00784
00785 _skipCldEnqueue(destPE,env,_infoIdx);
00786 }else{
00787 _noCldNodeEnqueue(destPE,env);
00788 }
00789
00790 }
00791
00792
00793 void _pingHandler(CkPingMsg *msg){
00794 printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
00795 CmiFree(msg);
00796 }
00797
00798
00799
00800
00801
00802
00803
00804 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
00805 void clearUpMigratedRetainedLists(int PE);
00806
00807 void checkpointAlarm(void *_dummy,double curWallTime){
00808 double diff = curWallTime-lastCompletedAlarm;
00809 DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
00810
00811
00812
00813
00814 if(diff < ((chkptPeriod) - 2)){
00815 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
00816 return;
00817 }
00818 CheckpointRequest request;
00819 CmiInitMsgHeader(request.header, sizeof(CheckpointRequest));
00820 request.PE = CkMyPe();
00821 CmiSetHandler(&request,_checkpointRequestHandlerIdx);
00822 CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
00823 };
00824
00825 void _checkpointRequestHandler(CheckpointRequest *request){
00826 startMlogCheckpoint(NULL,CmiWallTimer());
00827 }
00828
00832 void CkStartMlogCheckpoint(CkCallback &cb){
00833
00834
00835 ckptCallback = cb;
00836
00837
00838 CheckpointBarrierMsg *msg = (CheckpointBarrierMsg *)CmiAlloc(sizeof(CheckpointBarrierMsg));
00839 CmiSetHandler(msg,_startCheckpointIdx);
00840 CmiSyncBroadcastAllAndFree(sizeof(CheckpointBarrierMsg),(char *)msg);
00841 }
00842
/**
 * Handler for the start message broadcast by CkStartMlogCheckpoint: builds
 * this PE's checkpoint and sends it to the buddy PE (getCheckPointPE()),
 * where _storeCheckpointHandler stores it.
 *
 * Checkpoint contents, in order: checkpointCount, the incarnation table (one
 * entry per PE), read-only data, group data, node-group data, and all local
 * array elements. The sizing pass (PUP::sizer) and the packing pass
 * (PUP::toMem) must visit exactly the same fields in the same order.
 * The message layout is [CheckPointDataMsg header | packed data].
 *
 * @param startMsg the (empty) start message; freed immediately
 */
void _startCheckpointHandler(CheckpointBarrierMsg *startMsg){
	CmiFree(startMsg);

	// Defined elsewhere; run before the state is packed.
	garbageCollectMlog();

	checkpointCount++;

	// NOTE(review): resets globalLBID's index; the reason is not visible here.
	globalLBID.idx = 0;

#if CMK_CONVERSE_MPI
	// Suppress heartbeat failure detection until the checkpoint is acked.
	inCkptFlag = 1;
#endif

#if DEBUG_CHECKPOINT
	if(CmiMyPe() == 0){
		printf("[%d] starting checkpoint handler at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
	}
#endif

	DEBUG_MEM(CmiMemoryCheck());

	// Pass 1: compute the packed size.
	PUP::sizer psizer;
	psizer | checkpointCount;
	for(int i=0; i<CmiNumPes(); i++){
		psizer | CpvAccess(_incarnation)[i];
	}
	CkPupROData(psizer);
	DEBUG_MEM(CmiMemoryCheck());
	CkPupGroupData(psizer,true);
	DEBUG_MEM(CmiMemoryCheck());
	CkPupNodeGroupData(psizer,true);
	DEBUG_MEM(CmiMemoryCheck());
	pupArrayElementsSkip(psizer,true,NULL);
	DEBUG_MEM(CmiMemoryCheck());

	// Allocate the message: header followed by the packed checkpoint.
	int dataSize = psizer.size();
	int totalSize = sizeof(CheckPointDataMsg)+dataSize;
	char *msg = (char *)CmiAlloc(totalSize);
	CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
	chkMsg->PE = CkMyPe();
	chkMsg->dataSize = dataSize;

	char *buf = &msg[sizeof(CheckPointDataMsg)];
	PUP::toMem pBuf(buf);

	// Pass 2: pack the same fields, in the same order, into the buffer.
	pBuf | checkpointCount;
	for(int i=0; i<CmiNumPes(); i++){
		pBuf | CpvAccess(_incarnation)[i];
	}
	CkPupROData(pBuf);
	CkPupGroupData(pBuf,true);
	CkPupNodeGroupData(pBuf,true);
	pupArrayElementsSkip(pBuf,true,NULL);

	// Cleared by _checkpointAckHandler when the buddy confirms storage.
	unAckedCheckpoint=1;
	CmiSetHandler(msg,_storeCheckpointHandlerIdx);
	CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);

#if DEBUG_CHECKPOINT
	if(CmiMyPe() == 0){
		printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
	}
#endif

#if COLLECT_STATS_MEMORY
	CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
	msgLogSize = 0;
	bufferedDetsSize = 0;
	storedDetsSize = 0;
#endif

};
00922
00926 void _endCheckpointHandler(char *msg){
00927 CmiFree(msg);
00928
00929
00930 ckptCallback.send();
00931 }
00932
00936 void startMlogCheckpoint(void *_dummy, double curWallTime){
00937 double _startTime = CkWallTimer();
00938
00939
00940 checkpointCount++;
00941
00942 #if CMK_CONVERSE_MPI
00943 inCkptFlag = 1;
00944 #endif
00945
00946 #if DEBUG_CHECKPOINT
00947 if(CmiMyPe() == 0){
00948 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
00949 }
00950 #endif
00951
00952 DEBUG_MEM(CmiMemoryCheck());
00953
00954 PUP::sizer psizer;
00955 psizer | checkpointCount;
00956 for(int i=0; i<CmiNumPes(); i++){
00957 psizer | CpvAccess(_incarnation)[i];
00958 }
00959 CkPupROData(psizer);
00960 DEBUG_MEM(CmiMemoryCheck());
00961 CkPupGroupData(psizer,true);
00962 DEBUG_MEM(CmiMemoryCheck());
00963 CkPupNodeGroupData(psizer,true);
00964 DEBUG_MEM(CmiMemoryCheck());
00965 pupArrayElementsSkip(psizer,true,NULL);
00966 DEBUG_MEM(CmiMemoryCheck());
00967
00968 int dataSize = psizer.size();
00969 int totalSize = sizeof(CheckPointDataMsg)+dataSize;
00970 char *msg = (char *)CmiAlloc(totalSize);
00971 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
00972 chkMsg->PE = CkMyPe();
00973 chkMsg->dataSize = dataSize;
00974
00975 char *buf = &msg[sizeof(CheckPointDataMsg)];
00976 PUP::toMem pBuf(buf);
00977
00978 pBuf | checkpointCount;
00979 for(int i=0; i<CmiNumPes(); i++){
00980 pBuf | CpvAccess(_incarnation)[i];
00981 }
00982 CkPupROData(pBuf);
00983 CkPupGroupData(pBuf,true);
00984 CkPupNodeGroupData(pBuf,true);
00985 pupArrayElementsSkip(pBuf,true,NULL);
00986
00987 unAckedCheckpoint=1;
00988 CmiSetHandler(msg,_storeCheckpointHandlerIdx);
00989 CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
00990
00991 #if DEBUG_CHECKPOINT
00992 if(CmiMyPe() == 0){
00993 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
00994 }
00995 #endif
00996
00997 #if COLLECT_STATS_MEMORY
00998 CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
00999 msgLogSize = 0;
01000 bufferedDetsSize = 0;
01001 storedDetsSize = 0;
01002 #endif
01003
01004 if(CkMyPe() == 0 && onGoingLoadBalancing==0 ){
01005 lastCompletedAlarm = curWallTime;
01006 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
01007 }
01008 traceUserBracketEvent(28,_startTime,CkWallTimer());
01009 };
01010
01011
01012 class ElementPacker : public CkLocIterator {
01013 private:
01014 CkLocMgr *locMgr;
01015 PUP::er &p;
01016 public:
01017 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
01018 void addLocation(CkLocation &loc) {
01019 CkArrayIndexMax idx=loc.getIndex();
01020 CkGroupID gID = locMgr->ckGetGroupID();
01021 p|gID;
01022 p|idx;
01023 p|loc;
01024 }
01025 };
01026
01030 void pupArrayElementsSkip(PUP::er &p, bool create, MigrationRecord *listToSkip,int listsize){
01031 int numElements,i;
01032 int numGroups = CkpvAccess(_groupIDTable)->size();
01033 if(!p.isUnpacking()){
01034 numElements = CkCountArrayElements();
01035 }
01036
01037 p | numElements;
01038 DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
01039 if(!p.isUnpacking()){
01040 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
01041 }else{
01042
01043
01044 for(int j=0;j<listsize;j++){
01045 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01046 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
01047 listToSkip[j].idx.print();
01048 }
01049 }
01050
01051 printf("numElements = %d\n",numElements);
01052
01053 for (int i=0; i<numElements; i++) {
01054 CkGroupID gID;
01055 CkArrayIndexMax idx;
01056 p|gID;
01057 p|idx;
01058 int flag=0;
01059 int matchedIdx=0;
01060 for(int j=0;j<listsize;j++){
01061 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01062 if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
01063 matchedIdx = j;
01064 flag = 1;
01065 break;
01066 }
01067 }
01068 }
01069 if(flag == 1){
01070 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01071 }else{
01072 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01073 }
01074
01075 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01076 CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
01077 mgr->resume(idx,p,create,flag);
01078 if(flag == 1){
01079 int homePE = mgr->homePe(idx);
01080 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
01081 }
01082 }
01083 }
01084 };
01085
01089 void readCheckpointFromDisk(int size, char *data){
01090 FILE *f = fopen(fName,"rb");
01091 fread(data,1,size,f);
01092 fclose(f);
01093 }
01094
01098 void writeCheckpointToDisk(int size, char *data){
01099 FILE *f = fopen(fName,"wb");
01100 fwrite(data, 1, size, f);
01101 fclose(f);
01102 }
01103
01104
01105
01106 void _storeCheckpointHandler(char *msg){
01107 double _startTime=CkWallTimer();
01108
01109 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
01110 DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
01111
01112 char *chkpt = &msg[sizeof(CheckPointDataMsg)];
01113
01114 char *oldChkpt = CpvAccess(_storedCheckpointData)->buf;
01115 if(oldChkpt != NULL){
01116 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
01117 CmiFree(oldmsg);
01118 }
01119
01120 int sendingPE = chkMsg->PE;
01121
01122 CpvAccess(_storedCheckpointData)->buf = chkpt;
01123 CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
01124 CpvAccess(_storedCheckpointData)->PE = sendingPE;
01125
01126
01127 if(diskCkptFlag){
01128 writeCheckpointToDisk(chkMsg->dataSize,chkpt);
01129 CpvAccess(_storedCheckpointData)->buf = NULL;
01130 CmiFree(msg);
01131 }
01132
01133 CheckPointAck ackMsg;
01134 CmiInitMsgHeader(ackMsg.header, sizeof(CheckPointAck));
01135 ackMsg.PE = CkMyPe();
01136 ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
01137 CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
01138 CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
01139
01140 traceUserBracketEvent(29,_startTime,CkWallTimer());
01141 };
01142
01143
01144 void _checkpointAckHandler(CheckPointAck *ackMsg){
01145 DEBUG_MEM(CmiMemoryCheck());
01146 unAckedCheckpoint=0;
01147 DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
01148 DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));
01149 if(onGoingLoadBalancing) {
01150 onGoingLoadBalancing = 0;
01151 finishedCheckpointLoadBalancing();
01152 } else {
01153 #if CMK_CONVERSE_MPI
01154 inCkptFlag = 0;
01155 #endif
01156 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
01157 CmiSetHandler(msg, _endCheckpointIdx);
01158 CmiReduce(msg,CmiMsgHeaderSizeBytes,doNothingMsg);
01159 }
01160 CmiFree(ackMsg);
01161 };
01162
01163
01164
01165
01166
01172 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
01173 RestartRequest msg;
01174 CmiInitMsgHeader(msg.header, sizeof(RestartRequest));
01175
01176 fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
01177
01178
01179 _restartFlag = true;
01180 _numRestartResponses = 0;
01181
01182
01183 msg.PE = CkMyPe();
01184 CmiSetHandler(&msg,_getCheckpointHandlerIdx);
01185 CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01186 };
01187
01188 void CkMlogRestartDouble(void *,double){
01189 CkMlogRestart(NULL,NULL);
01190 };
01191
01195 void _getCheckpointHandler(RestartRequest *restartMsg){
01196
01197
01198 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
01199
01200
01201 CkAssert(restartMsg->PE == storedChkpt->PE);
01202
01203 int totalSize = sizeof(RestartProcessorData) + storedChkpt->bufSize;
01204
01205 DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
01206 CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
01207
01208 char *msg = (char *)CmiAlloc(totalSize);
01209
01210 RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
01211 dataMsg->PE = CkMyPe();
01212 dataMsg->restartWallTime = CmiTimer();
01213 dataMsg->checkPointSize = storedChkpt->bufSize;
01214 dataMsg->lbGroupID = globalLBID;
01215
01216
01217 char *buf = &msg[sizeof(RestartProcessorData)];
01218
01219 if(diskCkptFlag){
01220 readCheckpointFromDisk(storedChkpt->bufSize,buf);
01221 } else {
01222 memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
01223 }
01224
01225 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
01226 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
01227 CmiFree(restartMsg);
01228 }
01229
01230
01231
01232
01233 void createObjIDList(void *data, ChareMlogData *mlogData){
01234 std::vector<CkObjID> *list = (std::vector<CkObjID> *)data;
01235 CkObjID entry;
01236 entry = mlogData->objID;
01237 list->push_back(entry);
01238 DEBUG_RECOVERY(printLog(&entry));
01239 }
01240
01241
/**
 * Restarting-PE handler for the checkpoint returned by _getCheckpointHandler.
 * It unpacks, in this order:
 *   1. checkpointCount
 *   2. the per-PE incarnation table
 *   3. readonly data
 *   4. group data
 *   5. nodegroup data
 *   6. array elements
 * It then bumps our incarnation, marks all local chares as restarting and
 * frees the message. Finally it asks PE 0 for the global LB step; recovery
 * continues in _recvGlobalStepHandler.
 * NOTE(review): the packing side is not in this chunk; the order above must match it.
 */
void _recvCheckpointHandler(char *_restartData){
	RestartProcessorData *restartData = (RestartProcessorData *)_restartData;

	// Adopt the LB group id recorded by the PE that held our checkpoint.
	globalLBID = restartData->lbGroupID;

	printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
	// Checkpoint payload follows the RestartProcessorData header.
	char *buf = &_restartData[sizeof(RestartProcessorData)];

	PUP::fromMem pBuf(buf);

	pBuf | checkpointCount;
	for(int i=0; i<CmiNumPes(); i++){
		pBuf | CpvAccess(_incarnation)[i];
	}
	CkPupROData(pBuf);
	CkPupGroupData(pBuf,true);
	CkPupNodeGroupData(pBuf,true);
	pupArrayElementsSkip(pBuf,true,NULL);
	// The whole payload must have been consumed.
	CkAssert(pBuf.size() == restartData->checkPointSize);
	printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());

	// This PE is now one incarnation newer.
	CpvAccess(_incarnation)[CmiMyPe()]++;

	// Set restartFlag / reset resendReplyRecvd on every local chare.
	forAllCharesDo(initializeRestart,NULL);

	CmiFree(_restartData);

	_initDone();

	// Sends an LBStepMsg to PE 0; the reply is handled by _recvGlobalStepHandler.
	getGlobalStep(globalLBID);
}
01278
01279
01283 void initializeRestart(void *data, ChareMlogData *mlogData){
01284 mlogData->resendReplyRecvd = 0;
01285 mlogData->restartFlag = true;
01286 };
01287
01291 void updateHomePE(void *data,ChareMlogData *mlogData){
01292 RestartRequest *updateRequest = (RestartRequest *)data;
01293 int PE = updateRequest->PE;
01294
01295
01296 if(mlogData->objID.type == TypeArray){
01297
01298 CkGroupID myGID = mlogData->objID.data.array.id;
01299 CkArrayIndexMax myIdx = mlogData->objID.data.array.idx.asChild();
01300 CkArrayID aid(mlogData->objID.data.array.id);
01301
01302 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
01303 if(locMgr->homePe(myIdx) == PE){
01304 DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
01305 DEBUG_RESTART(myIdx.print());
01306 informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
01307 }
01308 }
01309 };
01310
01314 void printLog(CkObjID &recver){
01315 char recverString[100];
01316 CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" \n",CkMyPe(),recver.toString(recverString));
01317 }
01318
01322 void printMsg(envelope *env, const char* par){
01323 char senderString[100];
01324 char recverString[100];
01325 CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
01326 }
01327
01333 void resendMessageForChare(void *data, ChareMlogData *mlogData){
01334 DEBUG_RESTART(char nameString[100]);
01335 DEBUG_RESTART(char recverString[100]);
01336 DEBUG_RESTART(char senderString[100]);
01337
01338 ResendData *resendData = (ResendData *)data;
01339 int PE = resendData->PE;
01340 int count=0;
01341 int ticketRequests=0;
01342 CkQ<MlogEntry *> *log = mlogData->getMlog();
01343
01344 DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
01345
01346
01347 for(int i=0;i<log->length();i++){
01348 MlogEntry *logEntry = (*log)[i];
01349
01350
01351
01352 envelope *env = logEntry->env;
01353 if(env == NULL){
01354 continue;
01355 }
01356
01357
01358 if(env->recver.type != TypeInvalid){
01359 for(int j=0;j<resendData->numberObjects;j++){
01360 if(env->recver == (resendData->listObjects)[j]){
01361 if(PE != CkMyPe()){
01362 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
01363 if(env->recver.type == TypeNodeGroup){
01364 CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
01365 }else{
01366 CmiSetHandler(env,CmiGetXHandler(env));
01367 CmiSyncSend(PE,env->getTotalsize(),(char *)env);
01368 }
01369 }else{
01370 envelope *copyEnv = copyEnvelope(env);
01371 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
01372 }
01373 DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
01374 count++;
01375 }
01376 }
01377
01378 }
01379 }
01380 DEBUG_RESTART(printf("[%d] Resent %d/%d (%d) messages from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)
01381 }
01382
01387 void _resendMessagesHandler(char *msg){
01388 ResendData d;
01389 ResendRequest *resendReq = (ResendRequest *)msg;
01390
01391
01392 CmiResetGlobalReduceSeqID();
01393
01394
01395 char *listObjects = &msg[sizeof(ResendRequest)];
01396 d.numberObjects = resendReq->numberObjects;
01397 d.PE = resendReq->PE;
01398 d.listObjects = (CkObjID *)listObjects;
01399
01400 DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
01401
01402
01403 forAllCharesDo(resendMessageForChare,&d);
01404
01405 DEBUG_MEM(CmiMemoryCheck());
01406
01407 CmiFree(msg);
01408 }
01409
01410
01411
01412
01413
01414
01415
01416
01417
01418
01419 class ElementDistributor: public CkLocIterator{
01420 CkLocMgr *locMgr;
01421 int *targetPE;
01422
01423 void pupLocation(CkLocation &loc,PUP::er &p){
01424 CkArrayIndexMax idx=loc.getIndex();
01425 CkGroupID gID = locMgr->ckGetGroupID();
01426 p|gID;
01427 p|idx;
01428 p|loc;
01429 };
01430 public:
01431 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
01432
01433 void addLocation(CkLocation &loc){
01434
01435
01436 if(*targetPE == CkMyPe()){
01437 *targetPE = (*targetPE +1)%CkNumPes();
01438 return;
01439 }
01440
01441 CkArrayIndexMax idx = loc.getIndex();
01442 CkLocRec *rec = loc.getLocalRecord();
01443 CkLocMgr *locMgr = loc.getManager();
01444 std::vector<CkMigratable *> eltList;
01445
01446 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
01447 idx.print();
01448
01449
01450 CpvAccess(_numEmigrantRecObjs)++;
01451 locMgr->migratableList(rec,eltList);
01452 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
01453
01454
01455 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
01456 reductionMgr->incNumEmigrantRecObjs();
01457
01458
01459 PUP::sizer psizer;
01460 pupLocation(loc,psizer);
01461 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
01462 char *msg = (char *)CmiAlloc(totalSize);
01463 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
01464 distributeMsg->PE = CkMyPe();
01465 char *buf = &msg[sizeof(DistributeObjectMsg)];
01466 PUP::toMem pmem(buf);
01467 pmem.becomeDeleting();
01468 pupLocation(loc,pmem);
01469
01470 locMgr->setDuringMigration(true);
01471 delete rec;
01472 locMgr->setDuringMigration(false);
01473 locMgr->inform(idx,*targetPE);
01474
01475 CmiSetHandler(msg,_distributedLocationHandlerIdx);
01476 CmiSyncSendAndFree(*targetPE,totalSize,msg);
01477
01478 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
01479
01480
01481 *targetPE = *targetPE + 1;
01482 if(*targetPE > (CkMyPe() + parallelRecovery)){
01483 *targetPE = CkMyPe() + 1;
01484 }
01485 }
01486
01487 };
01488
/**
 * Fast-recovery step run on the restarted PE. Iterates every location
 * manager with an ElementDistributor, which may send each local array
 * element to another PE. The round-robin destination cursor starts at
 * CkMyPe()+1.
 */
void distributeRestartedObjects(){
	// NOTE(review): numGroups and i look unused, but CKLOCMGR_LOOP is a macro that
	// presumably references them; confirm against its definition before removing.
	int numGroups = CkpvAccess(_groupIDTable)->size();
	int i;
	int targetPE=CkMyPe()+1;
	CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
01498
01502 void _sendBackLocationHandler(char *receivedMsg){
01503 printf("Array element received at processor %d after recovery\n",CkMyPe());
01504 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
01505 int sourcePE = distributeMsg->PE;
01506 char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
01507 PUP::fromMem pmem(buf);
01508 CkGroupID gID;
01509 CkArrayIndexMax idx;
01510 pmem |gID;
01511 pmem |idx;
01512 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01513 donotCountMigration=1;
01514 mgr->resume(idx,pmem,true);
01515 donotCountMigration=0;
01516 informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
01517 printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
01518 idx.print();
01519
01520
01521 std::vector<CkMigratable *> eltList;
01522 CkLocRec *rec = mgr->elementRec(idx);
01523 mgr->migratableList(rec,eltList);
01524 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
01525 reductionMgr->decNumEmigrantRecObjs();
01526 reductionMgr->decGCount();
01527
01528
01529 CpvAccess(_numEmigrantRecObjs)--;
01530 if(CpvAccess(_numEmigrantRecObjs) == 0){
01531 (*resumeLbFnPtr)(centralLb);
01532 }
01533
01534 }
01535
01539 void _distributedLocationHandler(char *receivedMsg){
01540 printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
01541 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
01542 int sourcePE = distributeMsg->PE;
01543 char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
01544 PUP::fromMem pmem(buf);
01545 CkGroupID gID;
01546 CkArrayIndexMax idx;
01547 pmem |gID;
01548 pmem |idx;
01549 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01550 donotCountMigration=1;
01551 mgr->resume(idx,pmem,true);
01552 donotCountMigration=0;
01553 informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
01554 printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
01555 idx.print();
01556
01557 CkLocRec *rec = mgr->elementRec(idx);
01558 CmiAssert(rec != NULL);
01559
01560
01561 CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,rec));
01562 CpvAccess(_numImmigrantRecObjs)++;
01563
01564 std::vector<CkMigratable *> eltList;
01565 mgr->migratableList((CkLocRec *)rec,eltList);
01566 for(int i=0;i<eltList.size();i++){
01567 if(eltList[i]->mlogData->toResumeOrNot && eltList[i]->mlogData->resumeCount < globalResumeCount){
01568 CpvAccess(_currentObj) = eltList[i];
01569 eltList[i]->mlogData->immigrantRecFlag = true;
01570 eltList[i]->mlogData->immigrantSourcePE = sourcePE;
01571
01572
01573 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
01574 reductionMgr->incNumImmigrantRecObjs();
01575 reductionMgr->decGCount();
01576
01577 eltList[i]->ResumeFromSync();
01578 }
01579 }
01580 }
01581
01582
01585 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
01586 DummyMigrationMsg buf;
01587 CmiInitMsgHeader(buf.header, sizeof(DummyMigrationMsg));
01588 buf.flag = MLOG_OBJECT;
01589 buf.lbID = lbID;
01590 buf.mgrID = locMgrID;
01591 buf.idx = idx;
01592 buf.locationPE = locationPE;
01593 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
01594 CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
01595 };
01596
01597
01602 void sendDummyMigrationCounts(int *dummyCounts){
01603 DummyMigrationMsg buf;
01604 CmiInitMsgHeader(buf.header, sizeof(DummyMigrationMsg));
01605 buf.flag = MLOG_COUNT;
01606 buf.lbID = globalLBID;
01607 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
01608 for(int i=0;i<CmiNumPes();i++){
01609 if(i != CmiMyPe() && dummyCounts[i] != 0){
01610 buf.count = dummyCounts[i];
01611 CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
01612 }
01613 }
01614 }
01615
01616
01620 void _dummyMigrationHandler(DummyMigrationMsg *msg){
01621 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
01622 if(msg->flag == MLOG_OBJECT){
01623 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
01624 LDObjHandle h;
01625 lb->Migrated(h,1);
01626 }
01627 if(msg->flag == MLOG_COUNT){
01628 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
01629 for(int i=0;i<msg->count;i++){
01630 LDObjHandle h;
01631 lb->Migrated(h,1);
01632 }
01633 }
01634 CmiFree(msg);
01635 };
01636
01637
01638
01639
01640
01641
01642
01643
01644 class ElementCaller : public CkLocIterator {
01645 private:
01646 CkLocMgr *locMgr;
01647 MlogFn fnPointer;
01648 void *data;
01649 public:
01650 ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
01651 locMgr = _locMgr;
01652 fnPointer = _fnPointer;
01653 data = _data;
01654 };
01655 void addLocation(CkLocation &loc){
01656 std::vector<CkMigratable *> list;
01657 CkLocRec *local = loc.getLocalRecord();
01658 locMgr->migratableList (local,list);
01659 for(int i=0;i<list.size();i++){
01660 CkMigratable *migratableElement = list[i];
01661 fnPointer(data,migratableElement->mlogData);
01662 }
01663 }
01664 };
01665
/**
 * Apply fnPointer(data, mlogData) to every local chare with message-logging
 * data, in this order: every group, every nodegroup, then every array
 * element (via ElementCaller over each location manager).
 */
void forAllCharesDo(MlogFn fnPointer, void *data){
	int numGroups = CkpvAccess(_groupIDTable)->size();
	for(int i=0;i<numGroups;i++){
		Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
		fnPointer(data,obj->mlogData);
	}
	int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
	for(int i=0;i<numNodeGroups;i++){
		Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
		fnPointer(data,obj->mlogData);
	}
	// NOTE(review): `i` (and numGroups) appear to be required by the CKLOCMGR_LOOP
	// macro; confirm against its definition before removing.
	int i;
	CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
};
01683
01684
01685
01686
01687
01688
01694 void initMlogLBStep(CkGroupID gid){
01695 DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
01696 countLBMigratedAway = 0;
01697 countLBToMigrate=0;
01698 onGoingLoadBalancing=1;
01699 migrationDoneCalled=0;
01700 checkpointBarrierCount=0;
01701 if(globalLBID.idx != 0){
01702 CmiAssert(globalLBID.idx == gid.idx);
01703 }
01704 globalLBID = gid;
01705 #if SYNCHRONIZED_CHECKPOINT
01706 garbageCollectMlog();
01707 #endif
01708 }
01709
01713 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
01714 CkArrayIndexMax idx = loc->getIndex();
01715 CkGroupID gID = locMgr->ckGetGroupID();
01716 p|gID;
01717 p|idx;
01718 p|*loc;
01719 };
01720
01724 void sendBackImmigrantRecObjs(){
01725 CkLocation *loc;
01726 CkLocMgr *locMgr;
01727 CkArrayIndexMax idx;
01728 CkLocRec *rec;
01729 PUP::sizer psizer;
01730 int targetPE;
01731 std::vector<CkMigratable *> eltList;
01732 CkReductionMgr *reductionMgr;
01733
01734
01735 for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
01736
01737
01738 loc = (*CpvAccess(_immigrantRecObjs))[i];
01739 idx = loc->getIndex();
01740 rec = loc->getLocalRecord();
01741 locMgr = loc->getManager();
01742 locMgr->migratableList(rec,eltList);
01743 targetPE = eltList[i]->mlogData->immigrantSourcePE;
01744
01745
01746 reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
01747 reductionMgr->decNumImmigrantRecObjs();
01748
01749 CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
01750 idx.print();
01751
01752
01753 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
01754
01755
01756 pupLocation(loc,locMgr,psizer);
01757 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
01758 char *msg = (char *)CmiAlloc(totalSize);
01759 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
01760 distributeMsg->PE = CkMyPe();
01761 char *buf = &msg[sizeof(DistributeObjectMsg)];
01762 PUP::toMem pmem(buf);
01763 pmem.becomeDeleting();
01764 pupLocation(loc,locMgr,pmem);
01765
01766 locMgr->setDuringMigration(true);
01767 delete rec;
01768 locMgr->setDuringMigration(false);
01769 locMgr->inform(idx,targetPE);
01770
01771
01772 CmiSetHandler(msg,_sendBackLocationHandlerIdx);
01773 CmiSyncSendAndFree(targetPE,totalSize,msg);
01774
01775
01776 delete loc;
01777
01778 CmiAssert(locMgr->lastKnown(idx) == targetPE);
01779
01780 }
01781
01782
01783 CpvAccess(_immigrantRecObjs)->clear();
01784 CpvAccess(_numImmigrantRecObjs) = 0;
01785
01786 }
01787
01792 void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
01793 resumeLbFnPtr = _fnPtr;
01794 centralLb = _centralLb;
01795
01796
01797 if(CpvAccess(_numImmigrantRecObjs) > 0){
01798 sendBackImmigrantRecObjs();
01799 }
01800
01801
01802 if(CpvAccess(_numEmigrantRecObjs) > 0)
01803 return;
01804
01805
01806 (*resumeLbFnPtr)(centralLb);
01807 }
01808
01809 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
01810 DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
01811 DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
01812
01813 resumeLbFnPtr = _fnPtr;
01814 centralLb = _centralLb;
01815 migrationDoneCalled = 1;
01816 if(countLBToMigrate == countLBMigratedAway){
01817 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
01818 startMlogCheckpoint(NULL,CmiWallTimer());
01819 }
01820 };
01821
01822 void finishedCheckpointLoadBalancing(){
01823 DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
01824
01825 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
01826 CmiSetHandler(msg,_checkpointBarrierHandlerIdx);
01827 CmiReduce(msg,CmiMsgHeaderSizeBytes,doNothingMsg);
01828 };
01829
01830 void _receiveMlogLocationHandler(void *buf){
01831 envelope *env = (envelope *)buf;
01832 DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
01833 CkUnpackMessage(&env);
01834 void *_msg = EnvToUsr(env);
01835 CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
01836 CkGroupID gID= msg->gid;
01837 DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
01838 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01839 CpvAccess(_currentObj)=mgr;
01840 mgr->immigrate(msg);
01841 };
01842
01843 void _checkpointBarrierHandler(CheckpointBarrierMsg *barrierMsg){
01844
01845
01846 CmiFree(barrierMsg);
01847
01848
01849 CheckpointBarrierMsg *msg = (CheckpointBarrierMsg *)CmiAlloc(sizeof(CheckpointBarrierMsg));
01850 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
01851 CmiSyncBroadcastAllAndFree(sizeof(CheckpointBarrierMsg),(char *)msg);
01852 }
01853
01854 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
01855 DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
01856 DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
01857
01858 #if CMK_CONVERSE_MPI
01859 inCkptFlag = 0;
01860 #endif
01861
01862
01863 (*resumeLbFnPtr)(centralLb);
01864
01865
01866 CmiFree(msg);
01867 }
01868
01872 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
01873 int total;
01874 MlogEntry *logEntry;
01875 CkQ<MlogEntry *> *mlog = mlogData->getMlog();
01876
01877
01878 total = mlog->length();
01879 for(int i=0; i<total; i++){
01880 logEntry = mlog->deq();
01881 delete logEntry;
01882 }
01883
01884 }
01885
01889 void garbageCollectMlog(){
01890 DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
01891
01892
01893 forAllCharesDo(garbageCollectMlogForChare, NULL);
01894 }
01895
01901 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
01902 double _startTime = CmiWallTimer();
01903 CurrentLocationMsg msg;
01904 CmiInitMsgHeader(msg.header, sizeof(CurrentLocationMsg));
01905 msg.mgrID = locMgrID;
01906 msg.idx = idx;
01907 msg.locationPE = currentPE;
01908 msg.fromPE = CkMyPe();
01909
01910 DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
01911 CmiSetHandler(&msg,_receiveLocationHandlerIdx);
01912 CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
01913 traceUserBracketEvent(37,_startTime,CmiWallTimer());
01914 }
01915
01916
01917 void _receiveLocationHandler(CurrentLocationMsg *data){
01918 double _startTime = CmiWallTimer();
01919 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
01920 if(mgr == NULL){
01921 CmiFree(data);
01922 return;
01923 }
01924 CkLocRec *rec = mgr->elementNrec(data->idx);
01925 DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
01926 if(rec != NULL){
01927 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe()){
01928 if(data->fromPE == data->locationPE){
01929 CmiAbort("Another processor has the same object");
01930 }
01931 }
01932 }
01933 if(rec!= NULL && data->fromPE != CmiMyPe()){
01934 int targetPE = data->fromPE;
01935 data->fromPE = CmiMyPe();
01936 data->locationPE = CmiMyPe();
01937 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
01938 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
01939 }else{
01940 mgr->inform(data->idx,data->locationPE);
01941 }
01942 CmiFree(data);
01943 traceUserBracketEvent(38,_startTime,CmiWallTimer());
01944 }
01945
01946
01947
01948 void getGlobalStep(CkGroupID gID){
01949 LBStepMsg msg;
01950 CmiInitMsgHeader(msg.header, sizeof(LBStepMsg));
01951 int destPE = 0;
01952 msg.lbID = gID;
01953 msg.fromPE = CmiMyPe();
01954 msg.step = -1;
01955 CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
01956 CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
01957 };
01958
01959 void _getGlobalStepHandler(LBStepMsg *msg){
01960 CentralLB *lb;
01961 if(msg->lbID.idx != 0){
01962 lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
01963 msg->step = lb->step();
01964 } else {
01965 msg->step = -1;
01966 }
01967 CmiAssert(msg->fromPE != CmiMyPe());
01968 CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,msg->step,msg->lbID.idx);
01969 CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
01970 CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
01971 };
01972
01976 void _recvGlobalStepHandler(LBStepMsg *msg){
01977
01978
01979 restartDecisionNumber = msg->step;
01980 CmiFree(msg);
01981
01982 CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
01983
01984
01985 std::vector<CkObjID> objectVec;
01986 forAllCharesDo(createObjIDList, (void *)&objectVec);
01987 int numberObjects = objectVec.size();
01988
01989
01990 int totalSize = sizeof(ResendRequest) + numberObjects * sizeof(CkObjID);
01991 char *resendMsg = (char *)CmiAlloc(totalSize);
01992
01993 ResendRequest *resendReq = (ResendRequest *)resendMsg;
01994 resendReq->PE = CkMyPe();
01995 resendReq->numberObjects = numberObjects;
01996 char *objList = &resendMsg[sizeof(ResendRequest)];
01997 memcpy(objList,objectVec.getVec(),numberObjects * sizeof(CkObjID));
01998
01999 if(restartDecisionNumber != -1){
02000 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02001 CpvAccess(_currentObj) = lb;
02002 lb->ReceiveDummyMigration(restartDecisionNumber);
02003 }
02004
02005 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02006 CmiSyncBroadcastAllAndFree(totalSize, resendMsg);
02007
02008
02009 if(fastRecovery){
02010 distributeRestartedObjects();
02011 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02012 }
02013
02014 };
02015
/**
 * Exit hook for message logging. PE 0 prints a banner. When
 * COLLECT_STATS_MSGS is enabled, per-PE message statistics are also printed:
 *   - with COLLECT_STATS_MSGS_TOTAL: total count and size;
 *   - otherwise: one value per target PE (counts or sizes, depending on
 *     COLLECT_STATS_MSG_COUNT).
 */
void _messageLoggingExit(){

	if(CkMyPe() == 0)
		printf("[%d] FastMessageLoggingExit \n",CmiMyPe());

#if COLLECT_STATS_MSGS
#if COLLECT_STATS_MSGS_TOTAL
	printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
	printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
#else
	printf("[%d] TARGETS: ",CmiMyPe());
	for(int i=0; i<CmiNumPes(); i++){
#if COLLECT_STATS_MSG_COUNT
		printf("%d ",numMsgsTarget[i]);
#else
		printf("%d ",sizeMsgsTarget[i]);
#endif
	}
	printf("\n");
#endif
#endif
}
02042
02043 void MlogEntry::pup(PUP::er &p){
02044 p | destPE;
02045 p | _infoIdx;
02046 int size;
02047 if(!p.isUnpacking()){
02048
02049
02050
02051
02052 if(env == NULL){
02053
02054 size = 0;
02055 }else{
02056 size = env->getTotalsize();
02057 }
02058 }
02059
02060 p | size;
02061 if(p.isUnpacking()){
02062 if(size > 0){
02063 env = (envelope *)_allocEnv(ForChareMsg,size);
02064 }else{
02065 env = NULL;
02066 }
02067 }
02068 if(size > 0){
02069 p((char *)env,size);
02070 }
02071 };
02072
02073
02074
02075
02076
02077
02078
02079 MCount ChareMlogData::nextSN(const CkObjID &recver){
02080 MCount *SN = ssnTable.getPointer(recver);
02081 if(SN==NULL){
02082 ssnTable.put(recver) = 1;
02083 return 1;
02084 }else{
02085 (*SN)++;
02086 return *SN;
02087 }
02088 };
02089
02093 void ChareMlogData::addLogEntry(MlogEntry *entry){
02094 DEBUG(char nameString[100]);
02095 DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
02096 DEBUG_MEM(CmiMemoryCheck());
02097
02098
02099 mlog.enq(entry);
02100 };
02101
02105 int ChareMlogData::checkAndStoreSsn(const CkObjID &sender, MCount ssn){
02106 RSSN *rssn;
02107 rssn = receivedSsnTable.get(sender);
02108 if(rssn == NULL){
02109 rssn = new RSSN();
02110 receivedSsnTable.put(sender) = rssn;
02111 }
02112 return rssn->checkAndStore(ssn);
02113 }
02114
02120 void ChareMlogData::pup(PUP::er &p){
02121 int startSize=0;
02122 char nameStr[100];
02123 if(p.isSizing()){
02124 PUP::sizer *sizep = (PUP::sizer *)&p;
02125 startSize = sizep->size();
02126 }
02127 p | objID;
02128 if(p.isUnpacking()){
02129 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
02130 }
02131 p | toResumeOrNot;
02132 p | resumeCount;
02133 DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
02134
02135 ssnTable.pup(p);
02136
02137
02138 int rssnTableSize;
02139 if(!p.isUnpacking()){
02140 rssnTableSize = receivedSsnTable.numObjects();
02141 }
02142
02143 p | rssnTableSize;
02144 if(!p.isUnpacking()){
02145 CkHashtableIterator *iter = receivedSsnTable.iterator();
02146 while(iter->hasNext()){
02147 CkObjID *objID;
02148 RSSN **row = (RSSN **)iter->next((void **)&objID);
02149 p | (*objID);
02150 (*row)->pup(p);
02151 }
02152 delete iter;
02153 }else{
02154 for(int i=0; i<rssnTableSize; i++){
02155 CkObjID objID;
02156 p | objID;
02157 RSSN *row = new RSSN;
02158 row->pup(p);
02159 receivedSsnTable.put(objID) = row;
02160 }
02161 }
02162
02163 p | resendReplyRecvd;
02164 p | restartFlag;
02165
02166 if(p.isSizing()){
02167 PUP::sizer *sizep = (PUP::sizer *)&p;
02168 int pupSize = sizep->size()-startSize;
02169 DEBUG(char name[40]);
02170 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
02171
02172 }
02173
02174 double _finTime = CkWallTimer();
02175 DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
02176 };
02177
02178
02179
02180
02187 int getCheckPointPE(){
02188 return (CmiMyPe() + 1) % CmiNumPes();
02189 }
02190
02194 int getReverseCheckPointPE(){
02195 return (CmiMyPe() - 1 + CmiNumPes()) % CmiNumPes();
02196 }
02197
02198
02199 envelope *copyEnvelope(envelope *env){
02200 envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());
02201 memcpy(newEnv,env,env->getTotalsize());
02202 return newEnv;
02203 }
02204
02205 #endif