00001
00008 #include "charm.h"
00009 #include "ck.h"
00010 #include "ckcausalmlog.h"
00011 #include "queueing.h"
00012 #include <sys/types.h>
00013 #include <signal.h>
00014 #include "CentralLB.h"
00015
00016 #ifdef _FAULT_CAUSAL_
00017
00018
00019
00020
// Statistics-collection switches. Each non-zero flag compiles in the matching
// block of counters (declared below and updated in the send/store paths).
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_DETS 0
#define COLLECT_STATS_DETS_DUP 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_TEAM 0

// Recovery-phase labels; not referenced in the visible part of this file.
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug wrappers. A replacement list consisting only of a comment ("//x")
// expands to nothing, so the wrapped statement is compiled out.
// DEBUG_NOW is the only wrapper currently active (it expands to its argument).
// DEBUG_CHECKPOINT is tested with #if and enables the checkpoint-start message.
#define DEBUG_MEM(x) //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x) //x
#define DEBUGLB(x) // x
#define DEBUG_TEAM(x) // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x
00042
// Helpers defined in another translation unit that render array indices/elements.
extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

// Forward declarations for functions used before their definitions.
bool fault_aware(CkObjID &recver);
void sendCheckpointData(int mode);
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(TProcessedLog *log);

// Restart bookkeeping.
int _restartFlag=0;
int _numRestartResponses=0;


int countHashRefs=0;
int countHashCollisions=0;

char *checkpointDirectory=".";
int unAckedCheckpoint=0;

int countLocal=0,countBuffered=0;
int countPiggy=0;
int countClearBufferedLocalCalls=0;

int countUpdateHomeAcks=0;

// Configuration owned by another translation unit.
// teamSize: PEs are grouped into teams of teamSize consecutive PEs (isTeamLocal).
// chkptPeriod: checkpoint period used by checkpointAlarm; presumably seconds,
// since it is multiplied by 1000 before being passed to CcdCallFnAfter.
extern int teamSize;
extern int chkptPeriod;
extern bool fastRecovery;
extern int parallelRecovery;

// Fault injection. killFile holds "<pe> <seconds>" records read by
// readKillFile; faultFile holds one such record read by readFaultFile.
// killTime is the absolute wall time after which killLocal sends SIGKILL.
char *killFile;
char *faultFile;
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
double killTime=0.0;
double faultMean;
int checkpointCount=0;

// Processor-private state; all of it is initialized in _messageLoggingInit.
// _currentObj: object on whose behalf messages are sent (sendCommonMsg falls
// back to a plain enqueue when it is NULL).
CpvDeclare(Chare *,_currentObj);
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
// Messages whose receiver is not found here, or whose TN is ahead of the
// receiver's tProcessed (see preProcessReceivedMessage).
CpvDeclare(Queue, _outOfOrderMessageQueue);
// Messages for which _getTicket could not hand out a ticket.
CpvDeclare(Queue, _delayedRemoteMessageQueue);
// One entry per PE (arrays of CkNumPes() elements).
CpvDeclare(char **,_bufferedTicketRequests);
CpvDeclare(int *,_numBufferedTicketRequests);


// Local determinant buffer: Determinant records stored as raw bytes and grown
// by doubling in addBufferedDeterminant.
CpvDeclare(char *, _localDets);

// Number of trailing records in _localDets not yet acknowledged by a
// _removeDeterminantsHandler message.
int _numBufferedDets;

// Next free record slot in _localDets.
int _indexBufferedDets;

// Phase value compared with the phase carried by acknowledgements.
int _phaseBufferedDets;


// Determinants stored on behalf of other PEs, keyed by receiver object.
CpvDeclare(CkDeterminantHashtableT *, _remoteDets);

// Current capacity of _localDets, in records.
int _maxBufferedDets;


// Per-PE incarnation number, indexed by PE; compared with env->incarnation
// to drop messages from an older incarnation.
CpvDeclare(char *, _incarnation);




CpvDeclare(CkVec<LocationID *> *, _emigrantRecObjs);
CpvDeclare(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
00129
00130
// Optional statistics (see the COLLECT_STATS_* switches). The message counters
// are updated in sendMsg, either per destination PE or as totals.
#if COLLECT_STATS_MSGS
int *numMsgsTarget;
int *sizeMsgsTarget;
int totalMsgsTarget;
float totalMsgsSize;
#endif
#if COLLECT_STATS_DETS
int numPiggyDets;
int numDets;
int numDupDets;
#endif
// msgLogSize: bytes logged in sendMsg; bufferedDetsSize: determinants added
// locally; storedDetsSize: determinants stored for other PEs.
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif

#if COLLECT_STATS_TEAM
float MLOGFT_totalLogSize = 0.0;
float MLOGFT_totalMessages = 0.0;
#endif

// Not referenced in the visible part of this file.
static double adjustChkptPeriod=0.0;
static double nextCheckpointTime=0.0;
static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);

// Converse handler indices, assigned with CkRegisterHandler in _messageLoggingInit.
int _pingHandlerIdx;

char objString[100];
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _removeProcessedLogHandlerIdx;

int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;


int _getGlobalStepHandlerIdx;
int _recvGlobalStepHandlerIdx;

int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _sendDetsHandlerIdx;
int _sendDetsReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _storeDeterminantsHandlerIdx;
int _removeDeterminantsHandlerIdx;


int _restartHandlerIdx;
int _getRestartCheckpointHandlerIdx;
int _recvRestartCheckpointHandlerIdx;
void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);

int verifyAckTotal;
int verifyAckCount;

int verifyAckedRequests=0;

RestartRequest *storedRequest;

int _falseRestart =0;

// Load-balancing / migration state.
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;

CkVec<MigrationRecord> migratedNoticeList;
CkVec<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;

// Wall-clock timestamps, set in _messageLoggingInit. checkpointAlarm compares
// the current time against lastCompletedAlarm.
double lastCompletedAlarm=0;
double lastRestart=0;


int _receiveLocationHandlerIdx;
00237 void _messageLoggingInit(){
00238
00239 CpvInitialize(Chare *,_currentObj);
00240
00241
00242 _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
00243
00244
00245 _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
00246 _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
00247 _removeProcessedLogHandlerIdx = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
00248 _checkpointRequestHandlerIdx = CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
00249
00250
00251 _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
00252 _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
00253 _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
00254 _updateHomeAckHandlerIdx = CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
00255 _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
00256 _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
00257 _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
00258 _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
00259 _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
00260 _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
00261 _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
00262 _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
00263 _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
00264
00265
00266 _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
00267 _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
00268 _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
00269
00270
00271 _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
00272 _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
00273
00274
00275 _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
00276 _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
00277 _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
00278 _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
00279 _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
00280 _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
00281 _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
00282
00283
00284 _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
00285
00286
00287 CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
00288 CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
00289 CpvInitialize(Queue, _outOfOrderMessageQueue);
00290 CpvInitialize(Queue, _delayedRemoteMessageQueue);
00291 CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
00292 CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
00293
00294 CpvInitialize(char **,_bufferedTicketRequests);
00295 CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
00296 CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
00297 for(int i=0;i<CkNumPes();i++){
00298 CpvAccess(_bufferedTicketRequests)[i]=NULL;
00299 CpvAccess(_numBufferedTicketRequests)[i]=0;
00300 }
00301
00302
00303 _numBufferedDets = 0;
00304 _indexBufferedDets = 0;
00305 _phaseBufferedDets = 0;
00306 _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
00307 CpvInitialize(char *, _localDets);
00308 CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
00309 CpvInitialize(char *, _incarnation);
00310 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
00311 CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
00312 CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
00313 for(int i=0; i<CmiNumPes(); i++){
00314 CpvAccess(_incarnation)[i] = 0;
00315 }
00316
00317
00318 CpvInitialize(CkVec<LocationID *> *, _emigrantRecObjs);
00319 CpvAccess(_emigrantRecObjs) = new CkVec<LocationID *>;
00320 CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
00321 CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
00322
00323
00324 CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
00325 CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
00326
00327
00328 traceRegisterUserEvent("Remove Logs", 20);
00329 traceRegisterUserEvent("Ticket Request Handler", 21);
00330 traceRegisterUserEvent("Ticket Handler", 22);
00331 traceRegisterUserEvent("Local Message Copy Handler", 23);
00332 traceRegisterUserEvent("Local Message Ack Handler", 24);
00333 traceRegisterUserEvent("Preprocess current message",25);
00334 traceRegisterUserEvent("Preprocess past message",26);
00335 traceRegisterUserEvent("Preprocess future message",27);
00336 traceRegisterUserEvent("Checkpoint",28);
00337 traceRegisterUserEvent("Checkpoint Store",29);
00338 traceRegisterUserEvent("Checkpoint Ack",30);
00339 traceRegisterUserEvent("Send Ticket Request",31);
00340 traceRegisterUserEvent("Generalticketrequest1",32);
00341 traceRegisterUserEvent("TicketLogLocal",33);
00342 traceRegisterUserEvent("next_ticket and SN",34);
00343 traceRegisterUserEvent("Timeout for buffered remote messages",35);
00344 traceRegisterUserEvent("Timeout for buffered local messages",36);
00345 traceRegisterUserEvent("Inform Location Home",37);
00346 traceRegisterUserEvent("Receive Location Handler",38);
00347
00348 lastCompletedAlarm=CmiWallTimer();
00349 lastRestart = CmiWallTimer();
00350
00351 #if COLLECT_STATS_MSGS
00352 #if COLLECT_STATS_MSGS_TOTAL
00353 totalMsgsTarget = 0;
00354 totalMsgsSize = 0.0;
00355 #else
00356 numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00357 sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00358 for(int i=0; i<CmiNumPes(); i++){
00359 numMsgsTarget[i] = 0;
00360 sizeMsgsTarget[i] = 0;
00361 }
00362 #endif
00363 #endif
00364 #if COLLECT_STATS_DETS
00365 numPiggyDets = 0;
00366 numDets = 0;
00367 numDupDets = 0;
00368 #endif
00369 #if COLLECT_STATS_MEMORY
00370 msgLogSize = 0;
00371 bufferedDetsSize = 0;
00372 storedDetsSize = 0;
00373 #endif
00374
00375
00376 }
00377
00378 void killLocal(void *_dummy,double curWallTime);
00379
00380 void readKillFile(){
00381 FILE *fp=fopen(killFile,"r");
00382 if(!fp){
00383 return;
00384 }
00385 int proc;
00386 double sec;
00387 while(fscanf(fp,"%d %lf",&proc,&sec)==2){
00388 if(proc == CkMyPe()){
00389 killTime = CmiWallTimer()+sec;
00390 printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
00391 CcdCallFnAfter(killLocal,NULL,sec*1000);
00392 }
00393 }
00394 fclose(fp);
00395 }
00396
00401 void readFaultFile(){
00402 FILE *fp=fopen(faultFile,"r");
00403 if(!fp){
00404 return;
00405 }
00406 int proc;
00407 double sec;
00408 fscanf(fp,"%d %lf",&proc,&sec);
00409 faultMean = sec;
00410 if(proc == CkMyPe()){
00411 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
00412 CcdCallFnAfter(killLocal,NULL,sec*1000);
00413 }
00414 fclose(fp);
00415 }
00416
00417 void killLocal(void *_dummy,double curWallTime){
00418 printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
00419 if(CmiWallTimer()<killTime-1){
00420 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);
00421 }else{
00422 kill(getpid(),SIGKILL);
00423 }
00424 }
00425
00426
00427
00432 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
00433 Determinant *det, *auxDet;
00434 char *aux;
00435
00436 DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
00437
00438
00439 if(_indexBufferedDets >= _maxBufferedDets){
00440 aux = CpvAccess(_localDets);
00441 _maxBufferedDets *= 2;
00442 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
00443 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
00444 CmiFree(aux);
00445 }
00446
00447
00448 det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
00449 det->sender = sender;
00450 det->receiver = receiver;
00451 det->SN = SN;
00452 det->TN = TN;
00453 _numBufferedDets++;
00454 _indexBufferedDets++;
00455
00456 #if COLLECT_STATS_MEMORY
00457 bufferedDetsSize++;
00458 #endif
00459 #if COLLECT_STATS_DETS
00460 numDets++;
00461 #endif
00462 }
00463
00464
00465
00469 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
00470 if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
00471 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
00472 void *origMsg = EnvToUsr(env);
00473 for(int i=0;i<CmiNumPes();i++){
00474 if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
00475 void *copyMsg = CkCopyMsg(&origMsg);
00476 envelope *copyEnv = UsrToEnv(copyMsg);
00477 copyEnv->SN=0;
00478 copyEnv->TN=0;
00479 copyEnv->sender.type = TypeInvalid;
00480 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
00481 sendGroupMsg(copyEnv,i,_infoIdx);
00482 }
00483 }
00484 return;
00485 }
00486
00487
00488 env->SN=0;
00489 env->TN=0;
00490 env->sender.type = TypeInvalid;
00491
00492 CkObjID recver;
00493 recver.type = TypeGroup;
00494 recver.data.group.id = env->getGroupNum();
00495 recver.data.group.onPE = destPE;
00496 sendCommonMsg(recver,env,destPE,_infoIdx);
00497 }
00498
00502 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
00503 if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
00504 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
00505 void *origMsg = EnvToUsr(env);
00506 for(int i=0;i<CmiNumNodes();i++){
00507 if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
00508 void *copyMsg = CkCopyMsg(&origMsg);
00509 envelope *copyEnv = UsrToEnv(copyMsg);
00510 copyEnv->SN=0;
00511 copyEnv->TN=0;
00512 copyEnv->sender.type = TypeInvalid;
00513 sendNodeGroupMsg(copyEnv,i,_infoIdx);
00514 }
00515 }
00516 return;
00517 }
00518
00519
00520 env->SN=0;
00521 env->TN=0;
00522 env->sender.type = TypeInvalid;
00523
00524 CkObjID recver;
00525 recver.type = TypeNodeGroup;
00526 recver.data.group.id = env->getGroupNum();
00527 recver.data.group.onPE = destNode;
00528 sendCommonMsg(recver,env,destNode,_infoIdx);
00529 }
00530
00534 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
00535 CkObjID recver;
00536 recver.type = TypeArray;
00537 recver.data.array.id = env->getsetArrayMgr();
00538 recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
00539
00540 if(CpvAccess(_currentObj)!=NULL && CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00541 char recverString[100],senderString[100];
00542
00543 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00544 }
00545
00546
00547 env->SN = 0;
00548 env->TN = 0;
00549
00550 sendCommonMsg(recver,env,destPE,_infoIdx);
00551 };
00552
00556 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
00557 CkObjID recver;
00558 recver.type = TypeChare;
00559 recver.data.chare.id = *pCid;
00560
00561 if(CpvAccess(_currentObj)!=NULL && CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00562 char recverString[100],senderString[100];
00563
00564 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00565 }
00566
00567
00568 env->SN = 0;
00569 env->TN = 0;
00570
00571 sendCommonMsg(recver,env,destPE,_infoIdx);
00572 };
00573
/*
 * Common send path for all fault-aware message types.
 *
 * If no current object is set, the message is enqueued directly with
 * generalCldEnqueue and nothing is logged.
 *
 * Otherwise:
 * 1. Stamp the envelope with this PE's incarnation and the current object as
 *    sender. If the envelope already carried a valid sender type, a copy made
 *    by copyEnvelope is used instead, and its SN is reset; the caller's _env
 *    is left untouched (presumably still referenced elsewhere, e.g. a message
 *    log — confirm).
 * 2. Assign a fresh SN from the sender's nextSN when SN is 0. Otherwise the
 *    send is marked as a resend; this only happens when the caller passed an
 *    envelope with an invalid sender type and a non-zero SN.
 * 3. Wrap the envelope in a new MlogEntry. Local destinations go through
 *    sendLocalMsg. Remote destinations go through sendMsg, reusing a ticket
 *    from the sender's teamTable when the destination is in the same team.
 */
void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
	envelope *env = _env;
	MCount ticketNumber = 0;
	int resend=0;
	// recverName/senderString/_startTime are only consumed by (disabled) debug code.
	char recverName[100];
	double _startTime=CkWallTimer();

	DEBUG_MEM(CmiMemoryCheck());

	if(CpvAccess(_currentObj) == NULL){

		DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
		generalCldEnqueue(destPE,env,_infoIdx);
		return;
	}


	env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
	if(env->sender.type == TypeInvalid){
		env->sender = CpvAccess(_currentObj)->mlogData->objID;
	}else{
		// Sender already set: send a copy with a fresh SN instead of the original.
		envelope *copyEnv = copyEnvelope(env);
		env = copyEnv;
		env->sender = CpvAccess(_currentObj)->mlogData->objID;
		env->SN = 0;
	}

	DEBUG_MEM(CmiMemoryCheck());

	// Reference into the (possibly copied) envelope.
	CkObjID &sender = env->sender;
	env->recver = recver;

	Chare *obj = (Chare *)env->sender.getObject();

	if(env->SN == 0){
		DEBUG_MEM(CmiMemoryCheck());
		env->SN = obj->mlogData->nextSN(recver);
	}else{
		resend = 1;
	}

	char senderString[100];

	DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));


	MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);


	_startTime = CkWallTimer();


	if(isLocal(destPE)){
		sendLocalMsg(mEntry);
	}else{
		if((teamSize > 1) && isTeamLocal(destPE)){

			// Same team: reuse a ticket already recorded for (recver, SN), if any.
			Chare *senderObj = (Chare *)sender.getObject();
			SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
			if(ticketRow != NULL){
				Ticket ticket = ticketRow->get(env->SN);
				if(ticket.TN != 0){
					ticketNumber = ticket.TN;
					DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
				}
			}
		}


		sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);

	}
}
00655
00660 inline bool isLocal(int destPE){
00661
00662 if(destPE == CkMyPe())
00663 return true;
00664
00665 return false;
00666 }
00667
00672 inline bool isTeamLocal(int destPE){
00673
00674
00675 if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
00676 return true;
00677
00678 return false;
00679 }
00680
/*
 * Send a logged message to a remote PE.
 *
 * The envelope in entry gets the final recver/SN/TN. For a first send (not a
 * resend):
 * - if the destination is outside this PE's team, the entry is added to the
 *   sender's message log;
 * - otherwise the envelope is flagged with freeMsg instead.
 *
 * If there are unacknowledged local determinants, they are first shipped to
 * destPE in a vectored message handled by _storeDeterminantsHandler. The
 * message is a StoreDeterminantsHeader plus the records in
 * [_indexBufferedDets - _numBufferedDets, _indexBufferedDets). The entry
 * remembers the buffer index/count at send time, and the message is then
 * enqueued with generalCldEnqueue.
 */
void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
	DEBUG_NOW(char recverString[100]);
	DEBUG_NOW(char senderString[100]);

	int totalSize;
	StoreDeterminantsHeader header;
	int sizes[2];
	char *ptrs[2];

	envelope *env = entry->env;
	DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));


	Chare *obj = (Chare *)entry->env->sender.getObject();
	entry->env->recver = recver;
	entry->env->SN = SN;
	entry->env->TN = TN;
	if(!resend){

		if(!isTeamLocal(entry->destPE)){
			obj->mlogData->addLogEntry(entry);
#if COLLECT_STATS_TEAM
			MLOGFT_totalMessages += 1.0;
			MLOGFT_totalLogSize += entry->env->getTotalsize();
#endif
		}else{

			entry->env->freeMsg = true;
		}
	}


	// Piggyback-free delivery of buffered determinants to the destination PE.
	if(_numBufferedDets > 0){


		header.number = _numBufferedDets;
		header.index = _indexBufferedDets;
		header.phase = _phaseBufferedDets;
		header.PE = CmiMyPe();


		sizes[0] = sizeof(StoreDeterminantsHeader);
		sizes[1] = _numBufferedDets * sizeof(Determinant);
		ptrs[0] = (char *) &header;
		// Start of the unacknowledged tail of the local determinant buffer.
		ptrs[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
		DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
		CmiSetHandler(&header, _storeDeterminantsHandlerIdx);
		CmiSyncVectorSend(destPE, 2, &sizes[0], &ptrs[0]);
	}


	entry->indexBufDets = _indexBufferedDets;
	entry->numBufDets = _numBufferedDets;


	generalCldEnqueue(destPE, entry->env, entry->_infoIdx);

	DEBUG_MEM(CmiMemoryCheck());
#if COLLECT_STATS_MSGS
#if COLLECT_STATS_MSGS_TOTAL
	totalMsgsTarget++;
	totalMsgsSize += (float)env->getTotalsize();
#else
	numMsgsTarget[destPE]++;
	sizeMsgsTarget[destPE] += env->getTotalsize();
#endif
#endif
#if COLLECT_STATS_DETS
	numPiggyDets += _numBufferedDets;
#endif
#if COLLECT_STATS_MEMORY
	msgLogSize += env->getTotalsize();
#endif
};
00758
00759
00765 void sendLocalMsg(MlogEntry *entry){
00766 DEBUG_PERF(double _startTime=CkWallTimer());
00767 DEBUG_MEM(CmiMemoryCheck());
00768 DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
00769 DEBUG(char senderString[100]);
00770 DEBUG(char recverString[100]);
00771 Ticket ticket;
00772
00773 DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
00774
00775
00776 Chare *recverObj = (Chare *)entry->env->recver.getObject();
00777
00778
00779 if(recverObj){
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792
00793
00794
00795
00796
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807
00808
00809
00810
00811
00812
00813
00814
00815
00816 _skipCldEnqueue(CmiMyPe(),entry->env,entry->_infoIdx);
00817
00818 DEBUG_MEM(CmiMemoryCheck());
00819 }else{
00820 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
00821 }
00822
00823 };
00824
00825
00826
00827
00828
00829
00830
00834 void _removeDeterminantsHandler(char *buffer){
00835 RemoveDeterminantsHeader *header;
00836 int index, phase;
00837
00838
00839 header = (RemoveDeterminantsHeader *)buffer;
00840 index = header->index;
00841 phase = header->phase;
00842
00843
00844
00845
00846 if(phase == _phaseBufferedDets){
00847 if(index > _indexBufferedDets)
00848 CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
00849 CmiAssert(index <= _indexBufferedDets);
00850 _numBufferedDets = _indexBufferedDets - index;
00851 }
00852
00853
00854 CmiFree(buffer);
00855
00856 }
00857
00861 void _storeDeterminantsHandler(char *buffer){
00862 StoreDeterminantsHeader *header;
00863 RemoveDeterminantsHeader removeHeader;
00864 Determinant *detPtr, *det;
00865 int i, n, index, phase, destPE;
00866 CkVec<Determinant> *vec;
00867
00868
00869
00870 header = (StoreDeterminantsHeader *)buffer;
00871 n = header->number;
00872 index = header->index;
00873 phase = header->phase;
00874 destPE = header->PE;
00875 detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
00876
00877 DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
00878 DEBUG_MEM(CmiMemoryCheck());
00879
00880
00881 for(i = 0; i < n; i++){
00882 det = new Determinant();
00883 det->sender = detPtr->sender;
00884 det->receiver = detPtr->receiver;
00885 det->SN = detPtr->SN;
00886 det->TN = detPtr->TN;
00887
00888
00889 vec = CpvAccess(_remoteDets)->get(det->receiver);
00890 if(vec == NULL){
00891 vec = new CkVec<Determinant>();
00892 CpvAccess(_remoteDets)->put(det->receiver) = vec;
00893 }
00894 #if COLLECT_STATS_DETS
00895 #if COLLECT_STATS_DETS_DUP
00896 for(int j=0; j<vec->size(); j++){
00897 if(isSameDet(&(*vec)[j],det)){
00898 numDupDets++;
00899 break;
00900 }
00901 }
00902 #endif
00903 #endif
00904 #if COLLECT_STATS_MEMORY
00905 storedDetsSize++;
00906 #endif
00907 vec->push_back(*det);
00908 detPtr = detPtr++;
00909 }
00910
00911 DEBUG_MEM(CmiMemoryCheck());
00912
00913
00914 CmiFree(buffer);
00915
00916
00917 removeHeader.index = index;
00918 removeHeader.phase = phase;
00919 CmiSetHandler(&removeHeader,_removeDeterminantsHandlerIdx);
00920 CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)&removeHeader);
00921
00922 }
00923
00928 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
00929 DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
00930 double _startTime = CkWallTimer();
00931 CmiFree(ticketRequest);
00932
00933 }
00934
00935
00941 inline bool _getTicket(envelope *env, int *flag){
00942 DEBUG_MEM(CmiMemoryCheck());
00943 DEBUG(char recverName[100]);
00944 DEBUG(char senderString[100]);
00945 Ticket ticket;
00946
00947
00948 CkObjID sender = env->sender;
00949 CkObjID recver = env->recver;
00950 MCount SN = env->SN;
00951 MCount TN = env->TN;
00952 Chare *recverObj = (Chare *)recver.getObject();
00953
00954 DEBUG(recver.toString(recverName);)
00955
00956
00957 if(recverObj->mlogData->restartFlag)
00958 return false;
00959
00960
00961 ticket = recverObj->mlogData->getTicket(sender, SN);
00962 TN = ticket.TN;
00963
00964
00965 if(teamSize > 1 && TN != 0){
00966 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
00967 recverObj->mlogData->verifyTicket(sender,SN,TN);
00968 }
00969
00970
00971
00972 if(TN == 0){
00973 ticket = recverObj->mlogData->next_ticket(sender,SN);
00974 *flag = NEW_TICKET;
00975 } else {
00976 *flag = OLD_TICKET;
00977 }
00978 if(ticket.TN > recverObj->mlogData->tProcessed){
00979 ticket.state = NEW_TICKET;
00980 } else {
00981 ticket.state = OLD_TICKET;
00982 }
00983
00984 if(ticket.TN == 0){
00985 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
00986 return false;
00987 }
00988
00989
00990 env->TN = ticket.TN;
00991 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
00992 return true;
00993
00994 };
00995
00996 bool fault_aware(CkObjID &recver){
00997 switch(recver.type){
00998 case TypeChare:
00999 return true;
01000 case TypeMainChare:
01001 return false;
01002 case TypeGroup:
01003 case TypeNodeGroup:
01004 case TypeArray:
01005 return true;
01006 default:
01007 return false;
01008 }
01009 };
01010
01011
01012 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
01013 DEBUG_NOW(char recverString[100]);
01014 DEBUG_NOW(char senderString[100]);
01015 DEBUG_MEM(CmiMemoryCheck());
01016 int flag;
01017 bool ticketSuccess;
01018
01019
01020 CkObjID recver = env->recver;
01021 if(!fault_aware(recver)){
01022 return 1;
01023 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
01024 }
01025
01026 Chare *obj = (Chare *)recver.getObject();
01027 *objPointer = obj;
01028 if(obj == NULL){
01029 int possiblePE = recver.guessPE();
01030 if(possiblePE != CkMyPe()){
01031 int totalSize = env->getTotalsize();
01032 CmiSyncSend(possiblePE,totalSize,(char *)env);
01033
01034 DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
01035 }else{
01036
01037
01038 CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
01039
01040 DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
01041 }
01042 return 0;
01043 }
01044
01045
01046
01047 if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
01048 CmiFree(env);
01049 return 0;
01050 }
01051
01052 DEBUG_MEM(CmiMemoryCheck());
01053 DEBUG_PE(0,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
01054
01055
01056 ticketSuccess = _getTicket(env,&flag);
01057
01058
01059 if(!ticketSuccess){
01060
01061
01062 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
01063 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
01064
01065 return 0;
01066 }
01067
01068
01069
01070
01071 if(flag == NEW_TICKET){
01072
01073 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
01074 }
01075
01076 DEBUG_MEM(CmiMemoryCheck());
01077
01078 double _startTime = CkWallTimer();
01079
01080 if(env->TN == obj->mlogData->tProcessed+1){
01081
01082 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
01083
01084
01085 DEBUG_MEM(CmiMemoryCheck());
01086 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
01087 void *qMsgPtr;
01088 CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
01089 envelope *qEnv = (envelope *)qMsgPtr;
01090 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
01091 DEBUG_MEM(CmiMemoryCheck());
01092 }
01093
01094
01095
01096 DEBUG_MEM(CmiMemoryCheck());
01097 return 1;
01098 }
01099
01100
01101
01102 if(env->TN <= obj->mlogData->tProcessed){
01103 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
01104
01105 CmiFree(env);
01106 return 0;
01107 }
01108
01109
01110 DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
01111
01112
01113 CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
01114
01115 DEBUG_MEM(CmiMemoryCheck());
01116
01117 return 0;
01118 }
01119
01123 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
01124 DEBUG(char senderString[100]);
01125 if(obj){
01126 if(sender.guessPE() == CkMyPe()){
01127 if(entry != NULL){
01128 entry->env = NULL;
01129 }
01130 }
01131 obj->mlogData->tProcessed++;
01132
01133
01134
01135 }
01136 DEBUG_MEM(CmiMemoryCheck());
01137 }
01138
01139
01140
01141
01142
01143 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
01144
01145 if(env->recver.type != TypeNodeGroup){
01146
01147
01148
01149
01150
01151 _skipCldEnqueue(destPE,env,_infoIdx);
01152 }else{
01153 _noCldNodeEnqueue(destPE,env);
01154 }
01155
01156 }
01157
01162 int calledRetryTicketRequest=0;
01163
01164 void _pingHandler(CkPingMsg *msg){
01165 printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
01166 CmiFree(msg);
01167 }
01168
01169
01170
01171
01172
01173
01174
// Snapshot of (recver, tProcessed) for every local chare, rebuilt by
// startMlogCheckpoint through buildProcessedTicketLog and shipped to all
// PEs by sendRemoveLogRequests.
CkVec<TProcessedLog> processedTicketLog;
// Forward declarations for functions defined further down in this file.
void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
void clearUpMigratedRetainedLists(int PE);
01179 void checkpointAlarm(void *_dummy,double curWallTime){
01180 double diff = curWallTime-lastCompletedAlarm;
01181 DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
01182
01183
01184
01185
01186 if(diff < ((chkptPeriod) - 2)){
01187 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
01188 return;
01189 }
01190 CheckpointRequest request;
01191 request.PE = CkMyPe();
01192 CmiSetHandler(&request,_checkpointRequestHandlerIdx);
01193 CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
01194 };
01195
01196 void _checkpointRequestHandler(CheckpointRequest *request){
01197 startMlogCheckpoint(NULL,CmiWallTimer());
01198 }
01199
01203 void startMlogCheckpoint(void *_dummy, double curWallTime){
01204 double _startTime = CkWallTimer();
01205
01206
01207 checkpointCount++;
01208
01209 #if DEBUG_CHECKPOINT
01210 if(CmiMyPe() == 0){
01211 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
01212 }
01213 #endif
01214
01215 DEBUG_MEM(CmiMemoryCheck());
01216
01217 PUP::sizer psizer;
01218 psizer | checkpointCount;
01219 for(int i=0; i<CmiNumPes(); i++){
01220 psizer | CpvAccess(_incarnation)[i];
01221 }
01222 CkPupROData(psizer);
01223 DEBUG_MEM(CmiMemoryCheck());
01224 CkPupGroupData(psizer,CmiTrue);
01225 DEBUG_MEM(CmiMemoryCheck());
01226 CkPupNodeGroupData(psizer,CmiTrue);
01227 DEBUG_MEM(CmiMemoryCheck());
01228 pupArrayElementsSkip(psizer,CmiTrue,NULL);
01229 DEBUG_MEM(CmiMemoryCheck());
01230
01231 int dataSize = psizer.size();
01232 int totalSize = sizeof(CheckPointDataMsg)+dataSize;
01233 char *msg = (char *)CmiAlloc(totalSize);
01234 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
01235 chkMsg->PE = CkMyPe();
01236 chkMsg->dataSize = dataSize;
01237
01238 char *buf = &msg[sizeof(CheckPointDataMsg)];
01239 PUP::toMem pBuf(buf);
01240
01241 pBuf | checkpointCount;
01242 for(int i=0; i<CmiNumPes(); i++){
01243 pBuf | CpvAccess(_incarnation)[i];
01244 }
01245 CkPupROData(pBuf);
01246 CkPupGroupData(pBuf,CmiTrue);
01247 CkPupNodeGroupData(pBuf,CmiTrue);
01248 pupArrayElementsSkip(pBuf,CmiTrue,NULL);
01249
01250 unAckedCheckpoint=1;
01251 CmiSetHandler(msg,_storeCheckpointHandlerIdx);
01252 CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
01253
01254
01255
01256
01257 processedTicketLog.removeAll();
01258 forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
01259
01260 #if DEBUG_CHECKPOINT
01261 if(CmiMyPe() == 0){
01262 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
01263 }
01264 #endif
01265
01266 #if COLLECT_STATS_MEMORY
01267 CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
01268 msgLogSize = 0;
01269 bufferedDetsSize = 0;
01270 storedDetsSize = 0;
01271 #endif
01272
01273 if(CkMyPe() == 0 && onGoingLoadBalancing==0 ){
01274 lastCompletedAlarm = curWallTime;
01275 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
01276 }
01277 traceUserBracketEvent(28,_startTime,CkWallTimer());
01278 };
01279
01283 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
01284 DEBUG(char objString[100]);
01285
01286 CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
01287 TProcessedLog logEntry;
01288 logEntry.recver = mlogData->objID;
01289 logEntry.tProcessed = mlogData->tProcessed;
01290 log->push_back(logEntry);
01291
01292 DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
01293 }
01294
01295 class ElementPacker : public CkLocIterator {
01296 private:
01297 CkLocMgr *locMgr;
01298 PUP::er &p;
01299 public:
01300 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
01301 void addLocation(CkLocation &loc) {
01302 CkArrayIndexMax idx=loc.getIndex();
01303 CkGroupID gID = locMgr->ckGetGroupID();
01304 p|gID;
01305 p|idx;
01306 p|loc;
01307 }
01308 };
01309
01313 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
01314 int numElements,i;
01315 int numGroups = CkpvAccess(_groupIDTable)->size();
01316 if(!p.isUnpacking()){
01317 numElements = CkCountArrayElements();
01318 }
01319 p | numElements;
01320 DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
01321 if(!p.isUnpacking()){
01322 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
01323 }else{
01324
01325
01326 for(int j=0;j<listsize;j++){
01327 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01328 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
01329 listToSkip[j].idx.print();
01330 }
01331 }
01332
01333 printf("numElements = %d\n",numElements);
01334
01335 for (int i=0; i<numElements; i++) {
01336 CkGroupID gID;
01337 CkArrayIndexMax idx;
01338 p|gID;
01339 p|idx;
01340 int flag=0;
01341 int matchedIdx=0;
01342 for(int j=0;j<listsize;j++){
01343 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01344 if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
01345 matchedIdx = j;
01346 flag = 1;
01347 break;
01348 }
01349 }
01350 }
01351 if(flag == 1){
01352 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01353 }else{
01354 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01355 }
01356
01357 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01358 CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
01359 mgr->resume(idx,p,create,flag);
01360 if(flag == 1){
01361 int homePE = mgr->homePe(idx);
01362 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
01363 }
01364 }
01365 }
01366 };
01367
01368
01369 void writeCheckpointToDisk(int size,char *chkpt){
01370 char fNameTemp[100];
01371 sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
01372 int fd = creat(fNameTemp,S_IRWXU);
01373 int ret = write(fd,chkpt,size);
01374 CkAssert(ret == size);
01375 close(fd);
01376
01377 char fName[100];
01378 sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
01379 unlink(fName);
01380
01381 rename(fNameTemp,fName);
01382 }
01383
01384
01385
01386 void _storeCheckpointHandler(char *msg){
01387 double _startTime=CkWallTimer();
01388
01389 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
01390 DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
01391
01392 char *chkpt = &msg[sizeof(CheckPointDataMsg)];
01393
01394 char *oldChkpt = CpvAccess(_storedCheckpointData)->buf;
01395 if(oldChkpt != NULL){
01396 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
01397 CmiFree(oldmsg);
01398 }
01399
01400
01401 int sendingPE = chkMsg->PE;
01402
01403 CpvAccess(_storedCheckpointData)->buf = chkpt;
01404 CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
01405 CpvAccess(_storedCheckpointData)->PE = sendingPE;
01406
01407 int count=0;
01408 for(int j=migratedNoticeList.size()-1;j>=0;j--){
01409 if(migratedNoticeList[j].fromPE == sendingPE){
01410 migratedNoticeList[j].ackFrom = 1;
01411 }else{
01412 CmiAssert("migratedNoticeList entry for processor other than buddy");
01413 }
01414 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
01415 migratedNoticeList.remove(j);
01416 count++;
01417 }
01418
01419 }
01420 DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
01421
01422 CheckPointAck ackMsg;
01423 ackMsg.PE = CkMyPe();
01424 ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
01425 CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
01426 CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
01427
01428 traceUserBracketEvent(29,_startTime,CkWallTimer());
01429 };
01430
01431
01437 void sendRemoveLogRequests(){
01438 #if SYNCHRONIZED_CHECKPOINT
01439 CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
01440 #endif
01441 double _startTime = CkWallTimer();
01442
01443
01444 int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
01445 char *requestMsg = (char *)CmiAlloc(totalSize);
01446
01447
01448 RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01449 request->PE = CkMyPe();
01450 request->numberObjects = processedTicketLog.size();
01451 char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
01452 memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
01453 char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
01454 int *numDeterminants = (int *)listDeterminants;
01455 numDeterminants[0] = 0;
01456 listDeterminants = (char *)&numDeterminants[1];
01457
01458
01459 CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
01460
01461 DEBUG_MEM(CmiMemoryCheck());
01462 for(int i=0;i<CkNumPes();i++){
01463 CmiSyncSend(i,totalSize,requestMsg);
01464 }
01465 CmiFree(requestMsg);
01466
01467 clearUpMigratedRetainedLists(CmiMyPe());
01468
01469
01470 traceUserBracketEvent(30,_startTime,CkWallTimer());
01471
01472 DEBUG_MEM(CmiMemoryCheck());
01473 }
01474
01475
01476 void _checkpointAckHandler(CheckPointAck *ackMsg){
01477 DEBUG_MEM(CmiMemoryCheck());
01478 unAckedCheckpoint=0;
01479 DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
01480 DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));
01481 if(onGoingLoadBalancing){
01482 onGoingLoadBalancing = 0;
01483 finishedCheckpointLoadBalancing();
01484 }else{
01485 sendRemoveLogRequests();
01486 }
01487 CmiFree(ackMsg);
01488
01489 };
01490
01491
01495 inline void populateDeterminantTable(char *data){
01496 DEBUG(char recverString[100]);
01497 DEBUG(char senderString[100]);
01498 int numDets, *numDetsPtr;
01499 Determinant *detList;
01500 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
01501 SNToTicket *tickets;
01502 Ticket ticket;
01503
01504 RemoveLogRequest *request = (RemoveLogRequest *)data;
01505 TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
01506
01507 numDetsPtr = (int *)&list[request->numberObjects];
01508 numDets = numDetsPtr[0];
01509 detList = (Determinant *)&numDetsPtr[1];
01510
01511
01512 for(int i=0; i<numDets; i++){
01513 table = detTable.get(detList[i].sender);
01514 if(table == NULL){
01515 table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
01516 detTable.put(detList[i].sender) = table;
01517 }
01518 tickets = table->get(detList[i].receiver);
01519 if(tickets == NULL){
01520 tickets = new SNToTicket();
01521 table->put(detList[i].receiver) = tickets;
01522 }
01523 ticket.TN = detList[i].TN;
01524 tickets->put(detList[i].SN) = ticket;
01525 }
01526
01527 DEBUG_MEM(CmiMemoryCheck());
01528
01529 }
01530
01531 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
01532 int total;
01533 DEBUG(char nameString[100]);
01534 DEBUG_MEM(CmiMemoryCheck());
01535 CmiMemoryCheck();
01536 char *data = (char *)_data;
01537 RemoveLogRequest *request = (RemoveLogRequest *)data;
01538 TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
01539 CkQ<MlogEntry *> *mlog = mlogData->getMlog();
01540 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
01541 SNToTicket *tickets;
01542 MCount TN;
01543
01544 int count=0;
01545 total = mlog->length();
01546 for(int i=0; i<total; i++){
01547 MlogEntry *logEntry = mlog->deq();
01548
01549
01550 table = detTable.get(logEntry->env->sender);
01551 if(table != NULL){
01552 tickets = table->get(logEntry->env->recver);
01553 if(tickets != NULL){
01554 TN = tickets->get(logEntry->env->SN).TN;
01555 if(TN != 0){
01556 logEntry->env->TN = TN;
01557 }
01558 }
01559 }
01560
01561 int match=0;
01562 for(int j=0;j<request->numberObjects;j++){
01563 if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
01564
01565 match = 1;
01566 break;
01567 }
01568 }
01569 char senderString[100],recverString[100];
01570
01571 if(match){
01572 count++;
01573 delete logEntry;
01574 }else{
01575 mlog->enq(logEntry);
01576 }
01577 }
01578 if(count > 0){
01579 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
01580 }
01581 DEBUG_MEM(CmiMemoryCheck());
01582 CmiMemoryCheck();
01583 }
01584
01588 void _removeProcessedLogHandler(char *requestMsg){
01589 double start = CkWallTimer();
01590
01591
01592 populateDeterminantTable(requestMsg);
01593
01594
01595 forAllCharesDo(removeProcessedLogs,requestMsg);
01596
01597
01598
01599
01600 RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01601 DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
01602
01603
01604
01605
01606 DEBUG_MEM(CmiMemoryCheck());
01607 clearUpMigratedRetainedLists(request->PE);
01608
01609 traceUserBracketEvent(20,start,CkWallTimer());
01610 CmiFree(requestMsg);
01611 };
01612
01613
01614 void clearUpMigratedRetainedLists(int PE){
01615 int count=0;
01616 CmiMemoryCheck();
01617
01618 for(int j=migratedNoticeList.size()-1;j>=0;j--){
01619 if(migratedNoticeList[j].toPE == PE){
01620 migratedNoticeList[j].ackTo = 1;
01621 }
01622 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
01623 migratedNoticeList.remove(j);
01624 count++;
01625 }
01626 }
01627 DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
01628
01629 for(int j=retainedObjectList.size()-1;j>=0;j--){
01630 if(retainedObjectList[j]->migRecord.toPE == PE){
01631 RetainedMigratedObject *obj = retainedObjectList[j];
01632 DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
01633 retainedObjectList.remove(j);
01634 if(obj->msg != NULL){
01635 CmiMemoryCheck();
01636 CmiFree(obj->msg);
01637 }
01638 delete obj;
01639 }
01640 }
01641 }
01642
01643
01644
01645
01646
01652 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
01653 RestartRequest msg;
01654
01655 fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
01656
01657
01658 _restartFlag = 1;
01659 _numRestartResponses = 0;
01660
01661
01662 if(teamSize > 1){
01663 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
01664 if(i != CkMyPe() && i < CkNumPes()){
01665
01666 msg.PE = CkMyPe();
01667 CmiSetHandler(&msg,_restartHandlerIdx);
01668 CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
01669 }
01670 }
01671 }
01672
01673
01674 msg.PE = CkMyPe();
01675 CmiSetHandler(&msg,_getCheckpointHandlerIdx);
01676 CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01677 };
01678
01683 void _restartHandler(RestartRequest *restartMsg){
01684 int i;
01685 int numGroups = CkpvAccess(_groupIDTable)->size();
01686 RestartRequest msg;
01687
01688 fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
01689
01690
01691 _restartFlag = 1;
01692 _numRestartResponses = 0;
01693
01694
01695
01696
01697
01698
01699
01700
01701
01702
01703
01704
01705
01706 msg.PE = CkMyPe();
01707 CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
01708 CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01709 }
01710
01711
01715 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
01716
01717
01718 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
01719
01720
01721 CkAssert(restartMsg->PE == storedChkpt->PE);
01722
01723 storedRequest = restartMsg;
01724 verifyAckTotal = 0;
01725
01726 for(int i=0;i<migratedNoticeList.size();i++){
01727 if(migratedNoticeList[i].fromPE == restartMsg->PE){
01728
01729 if(migratedNoticeList[i].ackFrom == 0){
01730
01731
01732 VerifyAckMsg msg;
01733 msg.migRecord = migratedNoticeList[i];
01734 msg.index = i;
01735 msg.fromPE = CmiMyPe();
01736 CmiPrintf("[%d] Verify gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
01737 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
01738 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
01739 verifyAckTotal++;
01740 }
01741 }
01742 }
01743
01744
01745 if(verifyAckTotal == 0){
01746 sendCheckpointData(MLOG_RESTARTED);
01747 }
01748 verifyAckCount = 0;
01749 }
01750
01754 void _recvRestartCheckpointHandler(char *_restartData){
01755 RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
01756 MigrationRecord *migratedAwayElements;
01757
01758 globalLBID = restartData->lbGroupID;
01759
01760 restartData->restartWallTime *= 1000;
01761 adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
01762 adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
01763 if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
01764
01765
01766 printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
01767 char *buf = &_restartData[sizeof(RestartProcessorData)];
01768
01769 if(restartData->numMigratedAwayElements != 0){
01770 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
01771 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
01772 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
01773 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
01774 }
01775
01776
01777 forAllCharesDo(setTeamRecovery,NULL);
01778
01779 PUP::fromMem pBuf(buf);
01780 pBuf | checkpointCount;
01781 for(int i=0; i<CmiNumPes(); i++){
01782 pBuf | CpvAccess(_incarnation)[i];
01783 }
01784 CkPupROData(pBuf);
01785 CkPupGroupData(pBuf,CmiFalse);
01786 CkPupNodeGroupData(pBuf,CmiFalse);
01787 pupArrayElementsSkip(pBuf,CmiFalse,NULL);
01788 CkAssert(pBuf.size() == restartData->checkPointSize);
01789 printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
01790
01791
01792 for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
01793 CpvAccess(_incarnation)[i]++;
01794 }
01795
01796
01797 forAllCharesDo(unsetTeamRecovery,NULL);
01798
01799
01800 forAllCharesDo(initializeRestart,NULL);
01801
01802 CmiFree(_restartData);
01803
01804
01805
01806
01807
01808
01809
01810
01811
01812
01813
01814
01815
01816
01817
01818
01819
01820
01821 CkVec<TProcessedLog> objectVec;
01822 forAllCharesDo(createObjIDList, (void *)&objectVec);
01823 int numberObjects = objectVec.size();
01824
01825
01826
01827
01828 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
01829 char *resendMsg = (char *)CmiAlloc(totalSize);
01830
01831 ResendRequest *resendReq = (ResendRequest *)resendMsg;
01832 resendReq->PE =CkMyPe();
01833 resendReq->numberObjects = numberObjects;
01834 char *objList = &resendMsg[sizeof(ResendRequest)];
01835 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
01850 CpvAccess(_currentObj) = lb;
01851 lb->ReceiveDummyMigration(restartDecisionNumber);
01852
01853 sleep(10);
01854
01855 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
01856 for(int i=0;i<CkNumPes();i++){
01857 if(i != CkMyPe()){
01858 CmiSyncSend(i,totalSize,resendMsg);
01859 }
01860 }
01861 _resendMessagesHandler(resendMsg);
01862
01863 }
01864
01865
01866 void CkMlogRestartDouble(void *,double){
01867 CkMlogRestart(NULL,NULL);
01868 };
01869
01870
01871 void CkMlogRestartLocal(){
01872 CkMlogRestart(NULL,NULL);
01873 };
01874
01878 void _getCheckpointHandler(RestartRequest *restartMsg){
01879
01880
01881 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
01882
01883
01884 CkAssert(restartMsg->PE == storedChkpt->PE);
01885
01886 storedRequest = restartMsg;
01887 verifyAckTotal = 0;
01888
01889 for(int i=0;i<migratedNoticeList.size();i++){
01890 if(migratedNoticeList[i].fromPE == restartMsg->PE){
01891
01892 if(migratedNoticeList[i].ackFrom == 0){
01893
01894
01895 VerifyAckMsg msg;
01896 msg.migRecord = migratedNoticeList[i];
01897 msg.index = i;
01898 msg.fromPE = CmiMyPe();
01899 CmiPrintf("[%d] Verify gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
01900 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
01901 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
01902 verifyAckTotal++;
01903 }
01904 }
01905 }
01906
01907
01908 if(verifyAckTotal == 0){
01909 sendCheckpointData(MLOG_CRASHED);
01910 }
01911 verifyAckCount = 0;
01912 }
01913
01914
01915 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
01916 CkLocMgr *locMgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
01917 CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
01918 if(rec != NULL && rec->type() == CkLocRec::local){
01919
01920
01921 CkLocRec_local *localRec = (CkLocRec_local *) rec;
01922 CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
01923
01924 int localIdx = localRec->getLocalIndex();
01925 LBDatabase *lbdb = localRec->getLBDB();
01926 LDObjHandle ldHandle = localRec->getLdHandle();
01927
01928 locMgr->setDuringMigration(true);
01929
01930 locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
01931 lbdb->UnregisterObj(ldHandle);
01932
01933 locMgr->setDuringMigration(false);
01934
01935 verifyAckedRequests++;
01936
01937 }
01938 CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
01939 CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
01940 };
01941
01942
01943 void _verifyAckHandler(VerifyAckMsg *verifyReply){
01944 int index = verifyReply->index;
01945 migratedNoticeList[index] = verifyReply->migRecord;
01946 verifyAckCount++;
01947 CmiPrintf("[%d] VerifyReply received %d for gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
01948 if(verifyAckCount == verifyAckTotal){
01949 sendCheckpointData(MLOG_CRASHED);
01950 }
01951 }
01952
01953
01959 void sendCheckpointData(int mode){
01960 RestartRequest *restartMsg = storedRequest;
01961 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
01962 int numMigratedAwayElements = migratedNoticeList.size();
01963 if(migratedNoticeList.size() != 0){
01964 printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
01965
01966 }
01967
01968
01969 int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
01970
01971 DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
01972 CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
01973
01974 totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
01975
01976 char *msg = (char *)CmiAlloc(totalSize);
01977
01978 RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
01979 dataMsg->PE = CkMyPe();
01980 dataMsg->restartWallTime = CmiTimer();
01981 dataMsg->checkPointSize = storedChkpt->bufSize;
01982
01983 dataMsg->numMigratedAwayElements = numMigratedAwayElements;
01984
01985
01986 dataMsg->numMigratedInElements = 0;
01987 dataMsg->migratedElementSize = 0;
01988 dataMsg->lbGroupID = globalLBID;
01989
01990
01991
01992
01993
01994 char *buf = &msg[sizeof(RestartProcessorData)];
01995
01996 if(dataMsg->numMigratedAwayElements != 0){
01997 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
01998 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
01999 }
02000
02001
02002 memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
02003 buf = &buf[storedChkpt->bufSize];
02004
02005 if(mode == MLOG_RESTARTED){
02006 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
02007 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02008 CmiFree(restartMsg);
02009 }else{
02010 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
02011 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02012 CmiFree(restartMsg);
02013 }
02014 };
02015
02016
02017
02018
02019
02020 void createObjIDList(void *data,ChareMlogData *mlogData){
02021 CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
02022 TProcessedLog entry;
02023 entry.recver = mlogData->objID;
02024 entry.tProcessed = mlogData->tProcessed;
02025 list->push_back(entry);
02026 DEBUG_TEAM(char objString[100]);
02027 DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
02028 DEBUG_RECOVERY(printLog(&entry));
02029 }
02030
02031
02036 void _recvCheckpointHandler(char *_restartData){
02037 RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
02038 MigrationRecord *migratedAwayElements;
02039
02040 globalLBID = restartData->lbGroupID;
02041
02042 restartData->restartWallTime *= 1000;
02043 adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
02044 adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
02045 if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
02046
02047
02048 printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
02049 char *buf = &_restartData[sizeof(RestartProcessorData)];
02050
02051 if(restartData->numMigratedAwayElements != 0){
02052 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
02053 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
02054 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
02055 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
02056 }
02057
02058 PUP::fromMem pBuf(buf);
02059
02060 pBuf | checkpointCount;
02061 for(int i=0; i<CmiNumPes(); i++){
02062 pBuf | CpvAccess(_incarnation)[i];
02063 }
02064 CkPupROData(pBuf);
02065 CkPupGroupData(pBuf,CmiTrue);
02066 CkPupNodeGroupData(pBuf,CmiTrue);
02067 pupArrayElementsSkip(pBuf,CmiTrue,NULL);
02068 CkAssert(pBuf.size() == restartData->checkPointSize);
02069 printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
02070
02071
02072 CpvAccess(_incarnation)[CmiMyPe()]++;
02073
02074 forAllCharesDo(initializeRestart,NULL);
02075
02076 CmiFree(_restartData);
02077
02078 _initDone();
02079
02080 getGlobalStep(globalLBID);
02081
02082
02083 _numRestartResponses = 0;
02084
02085
02086 CkVec<TProcessedLog> objectVec;
02087 forAllCharesDo(createObjIDList, (void *)&objectVec);
02088 int numberObjects = objectVec.size();
02089
02090
02091 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02092 char *resendMsg = (char *)CmiAlloc(totalSize);
02093
02094 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02095 resendReq->PE =CkMyPe();
02096 resendReq->numberObjects = numberObjects;
02097 char *objList = &resendMsg[sizeof(ResendRequest)];
02098 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02099
02100 CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
02101 for(int i=0;i<CkNumPes();i++){
02102 if(i != CkMyPe()){
02103 CmiSyncSend(i,totalSize,resendMsg);
02104 }
02105 }
02106 CmiFree(resendMsg);
02107
02108 }
02109
02114 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
02115 countUpdateHomeAcks++;
02116 CmiFree(updateHomeAck);
02117
02118 if(countUpdateHomeAcks != CmiNumPes()){
02119 return;
02120 }
02121
02122
02123 CkVec<TProcessedLog> objectVec;
02124 forAllCharesDo(createObjIDList, (void *)&objectVec);
02125 int numberObjects = objectVec.size();
02126
02127
02128 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02129 char *resendMsg = (char *)CmiAlloc(totalSize);
02130
02131 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02132 resendReq->PE =CkMyPe();
02133 resendReq->numberObjects = numberObjects;
02134 char *objList = &resendMsg[sizeof(ResendRequest)];
02135 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02136
02137
02138 if(fastRecovery){
02139 distributeRestartedObjects();
02140 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02141 }
02142
02143
02144
02145
02146
02147
02148 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02149 CpvAccess(_currentObj) = lb;
02150 lb->ReceiveDummyMigration(restartDecisionNumber);
02151
02152
02153
02154 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02155 for(int i=0;i<CkNumPes();i++){
02156 if(i != CkMyPe()){
02157 CmiSyncSend(i,totalSize,resendMsg);
02158 }
02159 }
02160 _resendMessagesHandler(resendMsg);
02161 CmiFree(resendMsg);
02162
02163 };
02164
02168 void initializeRestart(void *data, ChareMlogData *mlogData){
02169 mlogData->resendReplyRecvd = 0;
02170 mlogData->receivedTNs = new CkVec<MCount>;
02171 mlogData->restartFlag = 1;
02172 };
02173
02177 void updateHomePE(void *data,ChareMlogData *mlogData){
02178 RestartRequest *updateRequest = (RestartRequest *)data;
02179 int PE = updateRequest->PE;
02180
02181
02182 if(mlogData->objID.type == TypeArray){
02183
02184 CkGroupID myGID = mlogData->objID.data.array.id;
02185 CkArrayIndexMax myIdx = mlogData->objID.data.array.idx.asChild();
02186 CkArrayID aid(mlogData->objID.data.array.id);
02187
02188 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
02189 if(locMgr->homePe(myIdx) == PE){
02190 DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
02191 DEBUG_RESTART(myIdx.print());
02192 informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
02193 }
02194 }
02195 };
02196
02197
02201 void _updateHomeRequestHandler(RestartRequest *updateRequest){
02202 int sender = updateRequest->PE;
02203
02204 forAllCharesDo(updateHomePE,updateRequest);
02205
02206 updateRequest->PE = CmiMyPe();
02207 CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
02208 CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
02209 if(sender == getCheckPointPE() && unAckedCheckpoint==1){
02210 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
02211 checkpointCount--;
02212 startMlogCheckpoint(NULL,0);
02213 }
02214 if(sender == getCheckPointPE()){
02215 for(int i=0;i<retainedObjectList.size();i++){
02216 if(retainedObjectList[i]->acked == 0){
02217 MigrationNotice migMsg;
02218 migMsg.migRecord = retainedObjectList[i]->migRecord;
02219 migMsg.record = retainedObjectList[i];
02220 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
02221 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
02222 }
02223 }
02224 }
02225 }
02226
02230 void fillTicketForChare(void *data, ChareMlogData *mlogData){
02231 DEBUG(char name[100]);
02232 ResendData *resendData = (ResendData *)data;
02233 int PE = resendData->PE;
02234 int count=0;
02235 CkHashtableIterator *iterator;
02236 void *objp;
02237 void *objkey;
02238 CkObjID *objID;
02239 SNToTicket *snToTicket;
02240 Ticket ticket;
02241
02242
02243 iterator = mlogData->teamTable.iterator();
02244 while( (objp = iterator->next(&objkey)) != NULL ){
02245 objID = (CkObjID *)objkey;
02246
02247
02248 for(int j=0;j<resendData->numberObjects;j++){
02249 if((*objID) == (resendData->listObjects)[j].recver){
02250 snToTicket = *(SNToTicket **)objp;
02251
02252 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
02253 ticket = snToTicket->get(snIndex);
02254 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
02255
02256 resendData->ticketVecs[j].push_back(ticket.TN);
02257 }
02258 }
02259 }
02260 }
02261 }
02262
02263
02264 delete iterator;
02265 }
02266
02267
02272 void setTeamRecovery(void *data, ChareMlogData *mlogData){
02273 char name[100];
02274 mlogData->teamRecoveryFlag = 1;
02275 }
02276
02280 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
02281 mlogData->teamRecoveryFlag = 0;
02282 }
02283
02287 void printLog(TProcessedLog *log){
02288 char recverString[100];
02289 CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
02290 }
02291
02295 void printMsg(envelope *env, const char* par){
02296 char senderString[100];
02297 char recverString[100];
02298 CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
02299 }
02300
02304 void printDet(Determinant *det, const char* par){
02305 char senderString[100];
02306 char recverString[100];
02307 CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
02308 }
02309
/**
 * forAllCharesDo callback used during restart: walks the visited chare's message
 * log and resends every logged message whose receiver appears in the ResendData
 * object list to resendData->PE. If that PE is this PE, a copy of the envelope is
 * enqueued on the local scheduler queue instead of being sent.
 *
 * @param data      ResendData* describing the requesting PE and its objects.
 * @param mlogData  message-logging state of the visited chare.
 */
void resendMessageForChare(void *data, ChareMlogData *mlogData){
	DEBUG_RESTART(char nameString[100]);
	DEBUG_RESTART(char recverString[100]);
	DEBUG_RESTART(char senderString[100]);

	ResendData *resendData = (ResendData *)data;
	int PE = resendData->PE;
	int count=0;
	// Only reported by the final debug message; nothing in this function increments it.
	int ticketRequests=0;
	CkQ<MlogEntry *> *log = mlogData->getMlog();

	DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)


	for(int i=0;i<log->length();i++){
		MlogEntry *logEntry = (*log)[i];



		// Entries without an envelope have nothing to resend.
		envelope *env = logEntry->env;
		if(env == NULL){
			continue;
		}


		if(env->recver.type != TypeInvalid){
			// No break after a match: a receiver listed more than once is resent once per listing.
			for(int j=0;j<resendData->numberObjects;j++){
				if(env->recver == (resendData->listObjects)[j].recver){
					if(PE != CkMyPe()){
						DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
						if(env->recver.type == TypeNodeGroup){
							// NOTE(review): CmiSyncNodeSend expects a node number but PE is a PE number — confirm they coincide here.
							CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
						}else{
							// Reinstall the handler saved in the X-handler slot before sending (assumed to hold the real handler — confirm).
							CmiSetHandler(env,CmiGetXHandler(env));
							CmiSyncSend(PE,env->getTotalsize(),(char *)env);
						}
					}else{
						// Local request: enqueue a copy; the original envelope remains referenced by the log entry.
						envelope *copyEnv = copyEnvelope(env);
						CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
					}
					DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
					count++;
				}
			}

		}
	}
	DEBUG_RESTART(printf("[%d] Resent %d/%d (%d) messages from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)
}
02364
/**
 * Answers a restarting PE's request for ticket numbers and determinants.
 *
 * The request is a ResendRequest header followed by numberObjects
 * TProcessedLog entries. For each object, every determinant stored in
 * CpvAccess(_remoteDets) whose TN is greater than that object's tProcessed is
 * collected, both as a bare TN and as a full Determinant. The reply sent to
 * resendReq->PE (handler _sendDetsReplyHandlerIdx) is laid out as:
 *   ResendRequest | CkObjID[numberObjects]
 *   | per object: int count, MCount[count]
 *   | per object: int count, Determinant[count]
 *
 * @param msg request buffer; freed here unless the request originated on this PE.
 */
void _sendDetsHandler(char *msg){
	ResendData d;
	CkVec<Determinant> *detVec;
	ResendRequest *resendReq = (ResendRequest *)msg;




	// The object list immediately follows the request header.
	char *listObjects = &msg[sizeof(ResendRequest)];
	d.numberObjects = resendReq->numberObjects;
	d.PE = resendReq->PE;
	d.listObjects = (TProcessedLog *)listObjects;
	d.ticketVecs = new CkVec<MCount>[d.numberObjects];
	detVec = new CkVec<Determinant>[d.numberObjects];



	// Gather determinants newer than what each requested object has already processed.
	CkVec<Determinant> *vec;
	for(int i=0; i<d.numberObjects; i++){
		vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
		if(vec != NULL){
			for(int j=0; j<vec->size(); j++){


				// NOTE(review): strict '>' here, while fillTicketForChare uses '>=' against tProcessed — confirm which is intended.
				if((*vec)[j].TN > d.listObjects[i].tProcessed){


					d.ticketVecs[i].push_back((*vec)[j].TN);


					detVec[i].push_back((*vec)[j]);

					DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
				}
			}
		}
	}

	int totalDetStored = 0;
	for(int i=0;i<d.numberObjects;i++){
		totalDetStored += detVec[i].size();
	}





	int totalTNStored=0;
	for(int i=0;i<d.numberObjects;i++){
		totalTNStored += d.ticketVecs[i].size();
	}

	// Header + per object (CkObjID + TN count + det count) + all TNs + all determinants.
	int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
	char *resendReplyMsg = (char *)CmiAlloc(totalSize);

	ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
	resendReply->PE = CkMyPe();
	resendReply->numberObjects = d.numberObjects;

	// Section 1: the object ids, in request order.
	char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
	CkObjID *replyObjects = (CkObjID *)replyListObjects;
	for(int i=0;i<d.numberObjects;i++){
		replyObjects[i] = d.listObjects[i].recver;
	}

	// Section 2: per object, a count followed by that many TNs.
	char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
	for(int i=0;i<d.numberObjects;i++){
		int vecsize = d.ticketVecs[i].size();
		memcpy(ticketList,&vecsize,sizeof(int));
		ticketList = &ticketList[sizeof(int)];
		memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
		ticketList = &ticketList[sizeof(MCount)*vecsize];
	}

	// Section 3: per object, a count followed by that many Determinants.
	for(int i=0;i<d.numberObjects;i++){
		int vecsize = detVec[i].size();
		memcpy(ticketList,&vecsize,sizeof(int));
		ticketList = &ticketList[sizeof(int)];
		memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
		ticketList = &ticketList[sizeof(Determinant)*vecsize];
	}

	CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
	CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);

	delete [] detVec;
	delete [] d.ticketVecs;

	DEBUG_MEM(CmiMemoryCheck());

	// A request that originated on this PE is freed by its caller.
	if(resendReq->PE != CkMyPe()){
		CmiFree(msg);
	}

	lastRestart = CmiWallTimer();

}
02467
02472 void _resendMessagesHandler(char *msg){
02473 ResendData d;
02474 ResendRequest *resendReq = (ResendRequest *)msg;
02475
02476
02477
02478
02479 char *listObjects = &msg[sizeof(ResendRequest)];
02480 d.numberObjects = resendReq->numberObjects;
02481 d.PE = resendReq->PE;
02482 d.listObjects = (TProcessedLog *)listObjects;
02483
02484 DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
02485
02486
02487
02488 if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
02489 forAllCharesDo(fillTicketForChare,&d);
02490 else
02491 forAllCharesDo(resendMessageForChare,&d);
02492
02493 DEBUG_MEM(CmiMemoryCheck());
02494
02495 if(resendReq->PE != CkMyPe()){
02496 CmiFree(msg);
02497 }
02498
02499 lastRestart = CmiWallTimer();
02500 }
02501
02502 MCount maxVec(CkVec<MCount> *TNvec);
02503 void sortVec(CkVec<MCount> *TNvec);
02504 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
02505
02509 void processDelayedRemoteMsgQueue(){
02510 DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
02511
02512 while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
02513 void *qMsgPtr;
02514 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
02515 envelope *qEnv = (envelope *)qMsgPtr;
02516 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
02517 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
02518 DEBUG_MEM(CmiMemoryCheck());
02519 }
02520
02521 }
02522
02528 void _sendDetsReplyHandler(char *msg){
02529 ResendRequest *resendReply = (ResendRequest *)msg;
02530 CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
02531 char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
02532
02533
02534 DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
02535
02536 for(int i =0; i< resendReply->numberObjects;i++){
02537 Chare *obj = (Chare *)listObjects[i].getObject();
02538
02539 int vecsize;
02540 memcpy(&vecsize,listTickets,sizeof(int));
02541 listTickets = &listTickets[sizeof(int)];
02542 MCount *listTNs = (MCount *)listTickets;
02543 listTickets = &listTickets[vecsize*sizeof(MCount)];
02544
02545 if(obj != NULL){
02546
02547 processReceivedTN(obj,vecsize,listTNs);
02548 }else{
02549
02550 int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
02551 char *TNMsg = (char *)CmiAlloc(totalSize);
02552 ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
02553 receivedTNData->recver = listObjects[i];
02554 receivedTNData->numTNs = vecsize;
02555 char *tnList = &TNMsg[sizeof(ReceivedTNData)];
02556 memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
02557 CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
02558 CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
02559 }
02560
02561 }
02562
02563
02564 for(int i = 0; i < resendReply->numberObjects; i++){
02565 Chare *obj = (Chare *)listObjects[i].getObject();
02566
02567 int vecsize;
02568 memcpy(&vecsize,listTickets,sizeof(int));
02569 listTickets = &listTickets[sizeof(int)];
02570 Determinant *listDets = (Determinant *)listTickets;
02571 listTickets = &listTickets[vecsize*sizeof(Determinant)];
02572
02573 if(obj != NULL){
02574
02575 processReceivedDet(obj,vecsize,listDets);
02576 } else {
02577
02578
02579 int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
02580 char *detMsg = (char *)CmiAlloc(totalSize);
02581 ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
02582 receivedDetData->recver = listObjects[i];
02583 receivedDetData->numDets = vecsize;
02584 char *detList = &detMsg[sizeof(ReceivedDetData)];
02585 memcpy(detList,listDets,sizeof(Determinant)*vecsize);
02586 CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
02587 CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
02588 }
02589
02590 }
02591
02592
02593 _numRestartResponses++;
02594 if(_numRestartResponses != CkNumPes())
02595 return;
02596 else
02597 _numRestartResponses = 0;
02598
02599
02600 CkVec<TProcessedLog> objectVec;
02601 forAllCharesDo(createObjIDList, (void *)&objectVec);
02602 int numberObjects = objectVec.size();
02603
02604
02605 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02606 char *resendMsg = (char *)CmiAlloc(totalSize);
02607
02608 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02609 resendReq->PE =CkMyPe();
02610 resendReq->numberObjects = numberObjects;
02611 char *objList = &resendMsg[sizeof(ResendRequest)];
02612 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02613
02614 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02615 CpvAccess(_currentObj) = lb;
02616 lb->ReceiveDummyMigration(restartDecisionNumber);
02617
02618
02619
02620
02621 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02622 for(int i=0;i<CkNumPes();i++){
02623 if(i != CkMyPe()){
02624 CmiSyncSend(i,totalSize,resendMsg);
02625 }
02626 }
02627 _resendMessagesHandler(resendMsg);
02628 CmiFree(resendMsg);
02629
02630
02631 if(fastRecovery){
02632 distributeRestartedObjects();
02633 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02634 }
02635
02636
02637
02638 };
02639
02643 void _receivedDetDataHandler(ReceivedDetData *msg){
02644 DEBUG_NOW(char objName[100]);
02645 Chare *obj = (Chare *) msg->recver.getObject();
02646 if(obj){
02647 char *_msg = (char *)msg;
02648 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
02649 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
02650 processReceivedDet(obj,msg->numDets,listDets);
02651 CmiFree(msg);
02652 }else{
02653 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
02654 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
02655 }
02656 }
02657
02661 void _receivedTNDataHandler(ReceivedTNData *msg){
02662 DEBUG_NOW(char objName[100]);
02663 Chare *obj = (Chare *) msg->recver.getObject();
02664 if(obj){
02665 char *_msg = (char *)msg;
02666 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
02667 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
02668 processReceivedTN(obj,msg->numTNs,listTNs);
02669 CmiFree(msg);
02670 }else{
02671 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
02672 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
02673 }
02674 };
02675
02679 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
02680 Determinant *det;
02681
02682
02683 for(int i=0; i<listSize; i++){
02684 det = &listDets[i];
02685 if(CkMyPe() == 4) printDet(det,"RECOVERY");
02686 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
02687 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
02688 }
02689
02690 DEBUG_MEM(CmiMemoryCheck());
02691 }
02692
/**
 * Records ticket numbers (TNs) reported for a local chare by one responding PE
 * during restart.
 *
 * Each call counts as one reply (mlogData->resendReplyRecvd) and appends the
 * given TNs to mlogData->receivedTNs. When CkNumPes()-1 replies have been
 * counted, the chare's ticket counter (tCount) is derived from the collected
 * TNs, receivedTNs is deleted and set to NULL, and restartFlag is cleared.
 *
 * With VERIFY_DETS, the TNs are sorted and deduplicated, and the highest TN
 * sets tCount (it can only raise tCount when teamSize > 1). Missing TNs above
 * tProcessed are recorded in ticketHoles. Without VERIFY_DETS, tCount is set
 * to the maximum received TN.
 *
 * NOTE(review): receivedTNs must already be allocated (e.g. by
 * initializeRestart). After completion it is NULL, so a further call with
 * listSize > 0 would dereference NULL. Confirm that exactly CkNumPes()-1 calls
 * arrive per restart.
 *
 * @param obj       local chare the TNs belong to.
 * @param listSize  number of entries in listTNs.
 * @param listTNs   TNs to record (values are copied).
 */
void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){

	obj->mlogData->resendReplyRecvd++;

	DEBUG(char objName[100]);
	DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));






	for(int j=0;j<listSize;j++){
		obj->mlogData->receivedTNs->push_back(listTNs[j]);
	}




	// All expected replies are in: derive tCount and finish the restart for this chare.
	if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
		obj->mlogData->resendReplyRecvd = 0;

#if VERIFY_DETS

		sortVec(obj->mlogData->receivedTNs);



		if(obj->mlogData->receivedTNs->size() > 0){
			// NOTE(review): if tProcessed is not among the received TNs, tProcessedIndex is -1 and the hole scan below starts at k=0, reading index -1 — confirm this cannot happen.
			int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
			int vecsize = obj->mlogData->receivedTNs->size();
			// Missing TNs = span from tProcessed to the highest TN, minus the TNs actually present above tProcessed.
			int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);


			if(teamSize > 1){
				if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
					obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
			}else{
				obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
			}

			if(numberHoles == 0){
			}else{
				char objName[100];
				printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
				obj->mlogData->numberHoles = numberHoles;
				obj->mlogData->ticketHoles = new MCount[numberHoles];
				int countHoles=0;
				// Record every TN skipped between consecutive received TNs above tProcessed.
				for(int k=tProcessedIndex+1;k<vecsize;k++){
					if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){

						for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
							DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
							obj->mlogData->ticketHoles[countHoles] = newTN;
							countHoles++;
						}
					}
				}

				if(countHoles != numberHoles){
					char str[100];
					printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
				}
				CkAssert(countHoles == numberHoles);
				obj->mlogData->currentHoles = numberHoles;
			}
		}
#else
		if(obj->mlogData->receivedTNs->size() > 0){
			obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
		}
#endif

		delete obj->mlogData->receivedTNs;
		DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
		obj->mlogData->receivedTNs = NULL;
		obj->mlogData->restartFlag = 0;



		DEBUG_RESTART(char objString[100]);
		DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
	}

}
02782
02784 MCount maxVec(CkVec<MCount> *TNvec){
02785 MCount max = 0;
02786 for(int i=0; i<TNvec->size(); i++){
02787 if((*TNvec)[i] > max)
02788 max = (*TNvec)[i];
02789 }
02790 return max;
02791 }
02792
02793 void sortVec(CkVec<MCount> *TNvec){
02794
02795
02796 for(int i=0;i<TNvec->size();i++){
02797 for(int j=i+1;j<TNvec->size();j++){
02798 if((*TNvec)[j] < (*TNvec)[i]){
02799 MCount temp;
02800 temp = (*TNvec)[i];
02801 (*TNvec)[i] = (*TNvec)[j];
02802 (*TNvec)[j] = temp;
02803 }
02804 }
02805 }
02806
02807 MCount *tempArray = new MCount[TNvec->size()];
02808 int uniqueCount=-1;
02809 for(int i=0;i<TNvec->size();i++){
02810 tempArray[i] = 0;
02811 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
02812 uniqueCount++;
02813 tempArray[uniqueCount] = (*TNvec)[i];
02814 }
02815 }
02816 uniqueCount++;
02817 TNvec->removeAll();
02818 for(int i=0;i<uniqueCount;i++){
02819 TNvec->push_back(tempArray[i]);
02820 }
02821 delete [] tempArray;
02822 }
02823
02824 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
02825 if(TNVec->size() == 0){
02826 return -1;
02827 }
02828
02829 int left=0;
02830 int right = TNVec->size();
02831 int mid = (left +right)/2;
02832 while(searchTN != (*TNVec)[mid] && left < right){
02833 if((*TNVec)[mid] > searchTN){
02834 right = mid-1;
02835 }else{
02836 left = mid+1;
02837 }
02838 mid = (left + right)/2;
02839 }
02840 if(left < right){
02841
02842 return mid;
02843 }else{
02844 if(mid < TNVec->size() && mid >=0){
02845 if((*TNVec)[mid] == searchTN){
02846 return mid;
02847 }else{
02848 return -1;
02849 }
02850 }else{
02851 return -1;
02852 }
02853 }
02854 };
02855
02856
02857
02858
02859
02860
02861
02862
02863
02864
02865
02866 class ElementDistributor: public CkLocIterator{
02867 CkLocMgr *locMgr;
02868 int *targetPE;
02869
02870 void pupLocation(CkLocation &loc,PUP::er &p){
02871 CkArrayIndexMax idx=loc.getIndex();
02872 CkGroupID gID = locMgr->ckGetGroupID();
02873 p|gID;
02874 p|idx;
02875 p|loc;
02876 };
02877 public:
02878 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
02879
02880 void addLocation(CkLocation &loc){
02881
02882
02883 if(*targetPE == CkMyPe()){
02884 *targetPE = (*targetPE +1)%CkNumPes();
02885 return;
02886 }
02887
02888 CkArrayIndexMax idx = loc.getIndex();
02889 CkLocRec_local *rec = loc.getLocalRecord();
02890
02891 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
02892 idx.print();
02893
02894
02895
02896 LocationID *location = new LocationID();
02897 location->idx = idx;
02898 location->gid = loc.getManager()->getGroupID();
02899 location->PE = *targetPE;
02900 CpvAccess(_emigrantRecObjs)->push_back(location);
02901
02902
02903 PUP::sizer psizer;
02904 pupLocation(loc,psizer);
02905 int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
02906 char *msg = (char *)CmiAlloc(totalSize);
02907 char *buf = &msg[CmiMsgHeaderSizeBytes];
02908 PUP::toMem pmem(buf);
02909 pmem.becomeDeleting();
02910 pupLocation(loc,pmem);
02911
02912 locMgr->setDuringMigration(CmiTrue);
02913 delete rec;
02914 locMgr->setDuringMigration(CmiFalse);
02915 locMgr->inform(idx,*targetPE);
02916
02917 CmiSetHandler(msg,_distributedLocationHandlerIdx);
02918 CmiSyncSendAndFree(*targetPE,totalSize,msg);
02919
02920 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
02921
02922
02923 *targetPE = *targetPE + 1;
02924 if(*targetPE > (CkMyPe() + parallelRecovery)){
02925 *targetPE = CkMyPe() + 1;
02926 }
02927 }
02928
02929 };
02930
/**
 * Hands the array elements held on this PE out to the following PEs
 * round-robin, using one ElementDistributor per location manager. The target
 * counter starts at CkMyPe()+1 and is shared across all location managers.
 * _sendDetsReplyHandler calls this when fastRecovery is set.
 */
void distributeRestartedObjects(){
	// NOTE(review): numGroups and i are not referenced directly; presumably CKLOCMGR_LOOP relies on them — confirm before removing.
	int numGroups = CkpvAccess(_groupIDTable)->size();
	int i;
	int targetPE=CkMyPe()+1;
	CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
02940
/**
 * Receives an array element shipped by ElementDistributor during restart.
 *
 * The handler unpacks the group id and index, then resumes the element in its
 * location manager (with donotCountMigration set). It informs the element's
 * home PE of the new location and records the local record in
 * _immigrantRecObjs. Finally it calls ResumeFromSync() on each migratable that
 * is marked toResumeOrNot and whose resumeCount is below globalResumeCount.
 *
 * @param receivedMsg Converse message: CmiMsgHeaderSizeBytes header followed by the PUP'd data.
 *
 * NOTE(review): receivedMsg is never freed here; confirm whether mgr->resume
 * keeps references into it, otherwise this leaks one message per element.
 */
void _distributedLocationHandler(char *receivedMsg){
	printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
	char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
	PUP::fromMem pmem(buf);
	CkGroupID gID;
	CkArrayIndexMax idx;
	pmem |gID;
	pmem |idx;
	CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
	// Flag set around resume; presumably keeps this insertion out of migration counts (name-based — confirm).
	donotCountMigration=1;
	mgr->resume(idx,pmem,CmiTrue);
	donotCountMigration=0;
	informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
	printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
	idx.print();

	CkLocRec *rec = mgr->elementRec(idx);
	CmiAssert(rec->type() == CkLocRec::local);


	CpvAccess(_immigrantRecObjs)->push_back((CkLocRec_local *)rec);

	// Resume elements that were waiting on a ResumeFromSync they have not yet received.
	CkVec<CkMigratable *> eltList;
	mgr->migratableList((CkLocRec_local *)rec,eltList);
	for(int i=0;i<eltList.size();i++){
		if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
			CpvAccess(_currentObj) = eltList[i];
			eltList[i]->mlogData->immigrantRecFlag = 1;
			eltList[i]->ResumeFromSync();
		}
	}
}
02976
02977
02980 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
02981 DummyMigrationMsg buf;
02982 buf.flag = MLOG_OBJECT;
02983 buf.lbID = lbID;
02984 buf.mgrID = locMgrID;
02985 buf.idx = idx;
02986 buf.locationPE = locationPE;
02987 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
02988 CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
02989 };
02990
02991
02996 void sendDummyMigrationCounts(int *dummyCounts){
02997 DummyMigrationMsg buf;
02998 buf.flag = MLOG_COUNT;
02999 buf.lbID = globalLBID;
03000 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
03001 for(int i=0;i<CmiNumPes();i++){
03002 if(i != CmiMyPe() && dummyCounts[i] != 0){
03003 buf.count = dummyCounts[i];
03004 CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
03005 }
03006 }
03007 }
03008
03009
03013 void _dummyMigrationHandler(DummyMigrationMsg *msg){
03014 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03015 if(msg->flag == MLOG_OBJECT){
03016 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
03017 LDObjHandle h;
03018 lb->Migrated(h,1);
03019 }
03020 if(msg->flag == MLOG_COUNT){
03021 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
03022 msg->count -= verifyAckedRequests;
03023 for(int i=0;i<msg->count;i++){
03024 LDObjHandle h;
03025 lb->Migrated(h,1);
03026 }
03027 }
03028 verifyAckedRequests=0;
03029 CmiFree(msg);
03030 };
03031
03032
03033
03034
03035
03036
03037
03038
03039 class ElementCaller : public CkLocIterator {
03040 private:
03041 CkLocMgr *locMgr;
03042 MlogFn fnPointer;
03043 void *data;
03044 public:
03045 ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
03046 locMgr = _locMgr;
03047 fnPointer = _fnPointer;
03048 data = _data;
03049 };
03050 void addLocation(CkLocation &loc){
03051 CkVec<CkMigratable *> list;
03052 CkLocRec_local *local = loc.getLocalRecord();
03053 locMgr->migratableList (local,list);
03054 for(int i=0;i<list.size();i++){
03055 CkMigratable *migratableElement = list[i];
03056 fnPointer(data,migratableElement->mlogData);
03057 }
03058 }
03059 };
03060
/**
 * Applies fnPointer(data, mlogData) to every chare on this PE that message
 * logging tracks: each group branch, each node-group branch, and each array
 * element reached through ElementCaller inside CKLOCMGR_LOOP (presumably one
 * iteration per location manager).
 *
 * @param fnPointer callback invoked once per chare.
 * @param data      opaque argument passed through to the callback.
 */
void forAllCharesDo(MlogFn fnPointer, void *data){
	int numGroups = CkpvAccess(_groupIDTable)->size();
	for(int i=0;i<numGroups;i++){
		Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
		fnPointer(data,obj->mlogData);
	}
	int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
	for(int i=0;i<numNodeGroups;i++){
		Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
		fnPointer(data,obj->mlogData);
	}
	// NOTE(review): i (and numGroups) appear to be used implicitly by CKLOCMGR_LOOP — confirm before removing.
	int i;
	CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
};
03078
03079
03080
03081
03082
03083
03089 void initMlogLBStep(CkGroupID gid){
03090 DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
03091 countLBMigratedAway = 0;
03092 countLBToMigrate=0;
03093 onGoingLoadBalancing=1;
03094 migrationDoneCalled=0;
03095 checkpointBarrierCount=0;
03096 if(globalLBID.idx != 0){
03097 CmiAssert(globalLBID.idx == gid.idx);
03098 }
03099 globalLBID = gid;
03100 #if SYNCHRONIZED_CHECKPOINT
03101 garbageCollectMlog();
03102 #endif
03103 }
03104
03105 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
03106 DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03107 DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03108
03109 resumeLbFnPtr = _fnPtr;
03110 centralLb = _centralLb;
03111 migrationDoneCalled = 1;
03112 if(countLBToMigrate == countLBMigratedAway){
03113 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03114 startMlogCheckpoint(NULL,CmiWallTimer());
03115 }
03116 };
03117
03118 void finishedCheckpointLoadBalancing(){
03119 DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
03120 CheckpointBarrierMsg msg;
03121 msg.fromPE = CmiMyPe();
03122 msg.checkpointCount = checkpointCount;
03123
03124 CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
03125 CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
03126
03127 };
03128
03129
/**
 * Intercepts an array element migrating from this PE to targetPE. This only
 * applies when SYNCHRONIZED_CHECKPOINT is off; otherwise the body is empty.
 *
 * If the element is already in retainedObjectList, nothing is done. Otherwise
 * the steps are:
 * - pack the migration message;
 * - retain it in a RetainedMigratedObject (acked = 0);
 * - increment countLBToMigrate;
 * - send a MigrationNotice to getCheckPointPE().
 * The element itself is sent to targetPE only when that notice is
 * acknowledged (see _receiveMigrationNoticeAckHandler).
 *
 * NOTE(review): CkPackMessage takes envelope** and may replace env; only the
 * local copy of the pointer is updated. Confirm the caller does not use its
 * env afterwards.
 */
void sendMlogLocation(int targetPE, envelope *env){
#if !SYNCHRONIZED_CHECKPOINT
	void *_msg = EnvToUsr(env);
	CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;

	int existing = 0;



	// Skip elements whose migration is already being tracked.
	for(int i=0;i<retainedObjectList.size();i++){
		MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
		if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
			DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
			existing = 1;
			break;
		}
	}

	if(existing){
		return;
	}

	countLBToMigrate++;

	MigrationNotice migMsg;
	migMsg.migRecord.gID = msg->gid;
	migMsg.migRecord.idx = msg->idx;
	migMsg.migRecord.fromPE = CkMyPe();
	migMsg.migRecord.toPE = targetPE;

	DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));

	RetainedMigratedObject *retainedObject = new RetainedMigratedObject;
	retainedObject->migRecord = migMsg.migRecord;
	retainedObject->acked = 0;

	CkPackMessage(&env);

	// The notice carries a pointer back to the retained record so the ack can find it.
	migMsg.record = retainedObject;
	retainedObject->msg = env;
	int size = retainedObject->size = env->getTotalsize();

	retainedObjectList.push_back(retainedObject);

	CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
	CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);

	DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
#endif
}
03180
03181 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
03182 msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
03183 migratedNoticeList.push_back(msg->migRecord);
03184
03185 MigrationNoticeAck buf;
03186 buf.record = msg->record;
03187 CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
03188 CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
03189 }
03190
03191 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
03192
03193 RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
03194 retainedObject->acked = 1;
03195
03196 CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
03197 CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
03198
03199
03200 CkGroupID gID = retainedObject->migRecord.gID ;
03201 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03202 informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
03203
03204 countLBMigratedAway++;
03205 if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
03206 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03207 startMlogCheckpoint(NULL,CmiWallTimer());
03208 }
03209 };
03210
03211 void _receiveMlogLocationHandler(void *buf){
03212 envelope *env = (envelope *)buf;
03213 DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
03214 CkUnpackMessage(&env);
03215 void *_msg = EnvToUsr(env);
03216 CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
03217 CkGroupID gID= msg->gid;
03218 DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
03219 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03220 CpvAccess(_currentObj)=mgr;
03221 mgr->immigrate(msg);
03222 };
03223
03224
03225 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
03226
03227
03228
03229
03230
03231
03232
03233
03234 }
03235
03239 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
03240 if(checkpointBarrierCount == CmiNumPes()){
03241 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
03242 for(int i=0;i<CmiNumPes();i++){
03243 CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
03244 }
03245 }
03246 }
03247
03251 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
03252 DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
03253 if(msg->checkpointCount == checkpointCount){
03254 checkpointBarrierCount++;
03255 checkAndSendCheckpointBarrierAcks(msg);
03256 }else{
03257 if(msg->checkpointCount-1 == checkpointCount){
03258 checkpointBarrierCount++;
03259 checkAndSendCheckpointBarrierAcks(msg);
03260 }else{
03261 printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
03262 CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
03263 }
03264 }
03265
03266
03267 CmiFree(msg);
03268 }
03269
03270 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
03271 DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
03272 DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
03273
03274 #if !SYNCHRONIZED_CHECKPOINT
03275
03276 sendRemoveLogRequests();
03277 #endif
03278
03279
03280 (*resumeLbFnPtr)(centralLb);
03281
03282
03283 CmiFree(msg);
03284 }
03285
03289 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
03290 int total;
03291 MlogEntry *logEntry;
03292 CkQ<MlogEntry *> *mlog = mlogData->getMlog();
03293
03294
03295 total = mlog->length();
03296 for(int i=0; i<total; i++){
03297 logEntry = mlog->deq();
03298 delete logEntry;
03299 }
03300
03301 }
03302
03308 void garbageCollectMlog(){
03309 CkHashtableIterator *iterator;
03310 CkVec<Determinant> *detArray;
03311
03312 DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
03313
03314
03315 _indexBufferedDets = 0;
03316 _numBufferedDets = 0;
03317 _phaseBufferedDets++;
03318
03319
03320 iterator = CpvAccess(_remoteDets)->iterator();
03321 while(iterator->hasNext()){
03322 detArray = *(CkVec<Determinant> **)iterator->next();
03323 detArray->removeAll();
03324 }
03325
03326
03327 delete iterator;
03328
03329
03330 forAllCharesDo(garbageCollectMlogForChare, NULL);
03331 }
03332
03338 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
03339 double _startTime = CmiWallTimer();
03340 CurrentLocationMsg msg;
03341 msg.mgrID = locMgrID;
03342 msg.idx = idx;
03343 msg.locationPE = currentPE;
03344 msg.fromPE = CkMyPe();
03345
03346 DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
03347 CmiSetHandler(&msg,_receiveLocationHandlerIdx);
03348 CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
03349 traceUserBracketEvent(37,_startTime,CmiWallTimer());
03350 }
03351
03352
03353 void _receiveLocationHandler(CurrentLocationMsg *data){
03354 double _startTime = CmiWallTimer();
03355 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
03356 if(mgr == NULL){
03357 CmiFree(data);
03358 return;
03359 }
03360 CkLocRec *rec = mgr->elementNrec(data->idx);
03361 DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
03362 if(rec != NULL){
03363 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
03364 if(data->fromPE == data->locationPE){
03365 CmiAbort("Another processor has the same object");
03366 }
03367 }
03368 }
03369 if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
03370 int targetPE = data->fromPE;
03371 data->fromPE = CmiMyPe();
03372 data->locationPE = CmiMyPe();
03373 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
03374 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
03375 }else{
03376 mgr->inform(data->idx,data->locationPE);
03377 }
03378 CmiFree(data);
03379 traceUserBracketEvent(38,_startTime,CmiWallTimer());
03380 }
03381
03382
03383
03384 void getGlobalStep(CkGroupID gID){
03385 LBStepMsg msg;
03386 int destPE = 0;
03387 msg.lbID = gID;
03388 msg.fromPE = CmiMyPe();
03389 msg.step = -1;
03390 CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
03391 CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
03392 };
03393
03394 void _getGlobalStepHandler(LBStepMsg *msg){
03395 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03396 msg->step = lb->step();
03397 CmiAssert(msg->fromPE != CmiMyPe());
03398 CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
03399 CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
03400 CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
03401 };
03402
03406 void _recvGlobalStepHandler(LBStepMsg *msg){
03407
03408
03409 restartDecisionNumber = msg->step;
03410 CmiFree(msg);
03411
03412 CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
03413
03414
03415 ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
03416 resendReplyMsg->PE = CkMyPe();
03417 resendReplyMsg->numberObjects = 0;
03418 _sendDetsReplyHandler((char *)resendReplyMsg);
03419 };
03420
03424 void _messageLoggingExit(){
03425
03426
03427 if(CkMyPe() == 0)
03428 printf("[%d] _causalMessageLoggingExit \n",CmiMyPe());
03429
03430
03431 #if COLLECT_STATS_TEAM
03432 printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
03433 printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
03434 #endif
03435
03436 #if COLLECT_STATS_MSGS
03437 #if COLLECT_STATS_MSGS_TOTAL
03438 printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
03439 printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
03440 #else
03441 printf("[%d] TARGETS: ",CmiMyPe());
03442 for(int i=0; i<CmiNumPes(); i++){
03443 #if COLLECT_STATS_MSG_COUNT
03444 printf("%d ",numMsgsTarget[i]);
03445 #else
03446 printf("%d ",sizeMsgsTarget[i]);
03447 #endif
03448 }
03449 printf("\n");
03450 #endif
03451 #endif
03452
03453 #if COLLECT_STATS_DETS
03454 printf("\n");
03455 printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
03456 printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
03457 #if COLLECT_STATS_DETS_DUP
03458 printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
03459 #endif
03460 #endif
03461
03462 }
03463
03469 void* CkObjID::getObject(){
03470
03471 switch(type){
03472 case TypeChare:
03473 return CkLocalChare(&data.chare.id);
03474 case TypeMainChare:
03475 return CkLocalChare(&data.chare.id);
03476 case TypeGroup:
03477
03478 CkAssert(data.group.onPE == CkMyPe());
03479 return CkLocalBranch(data.group.id);
03480 case TypeNodeGroup:
03481 CkAssert(data.group.onPE == CkMyNode());
03482
03483 {
03484 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
03485 void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
03486 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
03487
03488 return retval;
03489 }
03490 case TypeArray:
03491 {
03492
03493
03494 CkArrayID aid(data.array.id);
03495
03496 if(aid.ckLocalBranch() == NULL){ return NULL;}
03497
03498 CProxyElement_ArrayBase aProxy(aid,data.array.idx.asChild());
03499
03500 return aProxy.ckLocal();
03501 }
03502 default:
03503 CkAssert(0);
03504 }
03505 }
03506
03507
03508 int CkObjID::guessPE(){
03509 switch(type){
03510 case TypeChare:
03511 case TypeMainChare:
03512 return data.chare.id.onPE;
03513 case TypeGroup:
03514 case TypeNodeGroup:
03515 return data.group.onPE;
03516 case TypeArray:
03517 {
03518 CkArrayID aid(data.array.id);
03519 if(aid.ckLocalBranch() == NULL){
03520 return -1;
03521 }
03522 return aid.ckLocalBranch()->lastKnown(data.array.idx.asChild());
03523 }
03524 default:
03525 CkAssert(0);
03526 }
03527 };
03528
03529 char *CkObjID::toString(char *buf) const {
03530
03531 switch(type){
03532 case TypeChare:
03533 sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
03534 break;
03535 case TypeMainChare:
03536 sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
03537 break;
03538 case TypeGroup:
03539 sprintf(buf,"Group %d PE %d \0",data.group.id.idx,data.group.onPE);
03540 break;
03541 case TypeNodeGroup:
03542 sprintf(buf,"NodeGroup %d Node %d \0",data.group.id.idx,data.group.onPE);
03543 break;
03544 case TypeArray:
03545 {
03546 const CkArrayIndexMax &idx = data.array.idx.asChild();
03547 const int *indexData = idx.data();
03548 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
03549 break;
03550 }
03551 default:
03552 CkAssert(0);
03553 }
03554
03555 return buf;
03556 };
03557
03558 void CkObjID::updatePosition(int PE){
03559 if(guessPE() == PE){
03560 return;
03561 }
03562 switch(type){
03563 case TypeArray:
03564 {
03565 CkArrayID aid(data.array.id);
03566 if(aid.ckLocalBranch() == NULL){
03567
03568 }else{
03569 char str[100];
03570 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
03571
03572 CkLocRec *rec = mgr->elementNrec(data.array.idx.asChild());
03573 if(rec != NULL){
03574 if(rec->type() == CkLocRec::local){
03575 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
03576 return;
03577 }
03578 }
03579 mgr->inform(data.array.idx.asChild(),PE);
03580 }
03581 }
03582
03583 break;
03584 case TypeChare:
03585 case TypeMainChare:
03586 CkAssert(data.chare.id.onPE == PE);
03587 break;
03588 case TypeGroup:
03589 case TypeNodeGroup:
03590 CkAssert(data.group.onPE == PE);
03591 break;
03592 default:
03593 CkAssert(0);
03594 }
03595 }
03596
03597
03598
03599 void MlogEntry::pup(PUP::er &p){
03600 p | destPE;
03601 p | _infoIdx;
03602 int size;
03603 if(!p.isUnpacking()){
03604
03605
03606
03607
03608 if(env == NULL){
03609
03610 size = 0;
03611 }else{
03612 size = env->getTotalsize();
03613 }
03614 }
03615 p | size;
03616 if(p.isUnpacking()){
03617 if(size > 0){
03618 env = (envelope *)_allocEnv(ForChareMsg,size);
03619 }else{
03620 env = NULL;
03621 }
03622 }
03623 if(size > 0){
03624 p((char *)env,size);
03625 }
03626 };
03627
03628 void RestoredLocalMap::pup(PUP::er &p){
03629 p | minSN;
03630 p | maxSN;
03631 p | count;
03632 if(p.isUnpacking()){
03633 TNArray = new MCount[count];
03634 }
03635 p(TNArray,count);
03636 };
03637
03638
03639
03640
03641
03642
03643 MCount ChareMlogData::nextSN(const CkObjID &recver){
03644 MCount *SN = snTable.getPointer(recver);
03645 if(SN==NULL){
03646 snTable.put(recver) = 1;
03647 return 1;
03648 }else{
03649 (*SN)++;
03650 return *SN;
03651 }
03652 };
03653
03657 MCount ChareMlogData::newTN(){
03658 MCount TN;
03659 if(currentHoles > 0){
03660 int holeidx = numberHoles-currentHoles;
03661 TN = ticketHoles[holeidx];
03662 currentHoles--;
03663 if(currentHoles == 0){
03664 delete []ticketHoles;
03665 numberHoles = 0;
03666 }
03667 }else{
03668 TN = ++tCount;
03669 }
03670 return TN;
03671 };
03672
03676 inline Ticket ChareMlogData::getTicket(CkObjID &sender, MCount SN){
03677 Ticket ticket;
03678
03679 SNToTicket *ticketRow = ticketTable.get(sender);
03680 if(ticketRow != NULL){
03681 return ticketRow->get(SN);
03682 }
03683 return ticket;
03684 }
03685
03689 inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
03690 Ticket ticket;
03691
03692 SNToTicket *ticketRow = ticketTable.get(sender);
03693 if(ticketRow != NULL){
03694 Ticket earlierTicket = ticketRow->get(SN);
03695 if(earlierTicket.TN != 0){
03696 CkAssert(earlierTicket.TN == TN);
03697 return;
03698 }
03699 }else{
03700 ticketRow = new SNToTicket();
03701 ticketTable.put(sender) = ticketRow;
03702 }
03703 ticket.TN = TN;
03704 ticketRow->put(SN) = ticket;
03705 }
03706
03710 inline Ticket ChareMlogData::next_ticket(CkObjID &sender, MCount SN){
03711 DEBUG(char senderName[100];)
03712 DEBUG(char recverName[100];)
03713 double _startTime = CmiWallTimer();
03714 Ticket ticket;
03715
03716
03717 if(restartFlag){
03718 ticket.TN = 0;
03719 return ticket;
03720 }
03721
03722 SNToTicket *ticketRow = ticketTable.get(sender);
03723 if(ticketRow != NULL){
03724 Ticket earlierTicket = ticketRow->get(SN);
03725 if(earlierTicket.TN == 0){
03726 ticket.TN = newTN();
03727 ticketRow->put(SN) = ticket;
03728 DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
03729 }else{
03730 ticket.TN = earlierTicket.TN;
03731 if(ticket.TN > tCount){
03732 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03733 }
03734 CmiAssert(ticket.TN <= tCount);
03735 }
03736 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03737 }else{
03738 SNToTicket *newRow = new SNToTicket;
03739 ticket.TN = newTN();
03740 newRow->put(SN) = ticket;
03741 ticketTable.put(sender) = newRow;
03742 DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
03743 }
03744 ticket.state = NEW_TICKET;
03745
03746
03747 return ticket;
03748 };
03749
03753 void ChareMlogData::addLogEntry(MlogEntry *entry){
03754 DEBUG(char nameString[100]);
03755 DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
03756 DEBUG_MEM(CmiMemoryCheck());
03757
03758
03759 mlog.enq(entry);
03760 };
03761
03762 double totalSearchRestoredTime=0;
03763 double totalSearchRestoredCount=0;
03764
03769 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
03770 double start= CkWallTimer();
03771 MCount TN=0;
03772
03773 DEBUG(char senderName[100]);
03774 DEBUG(char recverName[100]);
03775 DEBUG(if(TN != 0){ CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN);});
03776
03777 totalSearchRestoredTime += CkWallTimer()-start;
03778 totalSearchRestoredCount++;
03779 return TN;
03780 }
03781
03787 void ChareMlogData::pup(PUP::er &p){
03788 int tCountAux;
03789 int startSize=0;
03790 char nameStr[100];
03791 if(p.isSizing()){
03792 PUP::sizer *sizep = (PUP::sizer *)&p;
03793 startSize = sizep->size();
03794 }
03795 double _startTime = CkWallTimer();
03796
03797 p | objID;
03798 if(teamRecoveryFlag)
03799 p | tCountAux;
03800 else
03801 p | tCount;
03802 p | tProcessed;
03803 if(p.isUnpacking()){
03804 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
03805 }
03806 p | toResumeOrNot;
03807 p | resumeCount;
03808 DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
03809
03810
03811
03812 int lengthReceivedTNs;
03813 if(!p.isUnpacking()){
03814 if(receivedTNs == NULL){
03815 lengthReceivedTNs = -1;
03816 }else{
03817 lengthReceivedTNs = receivedTNs->size();
03818 }
03819 }
03820 p | lengthReceivedTNs;
03821 if(p.isUnpacking()){
03822 if(lengthReceivedTNs == -1){
03823 receivedTNs = NULL;
03824 }else{
03825 receivedTNs = new CkVec<MCount>;
03826 for(int i=0;i<lengthReceivedTNs;i++){
03827 MCount tempTicket;
03828 p | tempTicket;
03829 CkAssert(tempTicket > 0);
03830 receivedTNs->push_back(tempTicket);
03831 }
03832 }
03833 }else{
03834 for(int i=0;i<lengthReceivedTNs;i++){
03835 p | (*receivedTNs)[i];
03836 }
03837 }
03838
03839 p | currentHoles;
03840 p | numberHoles;
03841 if(p.isUnpacking()){
03842 if(numberHoles > 0){
03843 ticketHoles = new MCount[numberHoles];
03844 }else{
03845 ticketHoles = NULL;
03846 }
03847 }
03848 if(numberHoles > 0){
03849 p(ticketHoles,numberHoles);
03850 }
03851
03852 snTable.pup(p);
03853
03854
03855
03856
03857
03858
03859
03860
03861
03862
03863
03864
03865
03866
03867
03868
03869
03870
03871
03872
03873 p | resendReplyRecvd;
03874 p | restartFlag;
03875
03876
03877
03878
03879
03880
03881
03882
03883
03884
03885
03886
03887
03888
03889
03890
03891
03892
03893
03894
03895
03896
03897
03898
03899
03900
03901
03902
03903 {
03904 int ticketTableSize;
03905 if(!p.isUnpacking()){
03906 ticketTableSize = ticketTable.numObjects();
03907 }
03908 p | ticketTableSize;
03909 if(!p.isUnpacking()){
03910 CkHashtableIterator *iter = ticketTable.iterator();
03911 while(iter->hasNext()){
03912 CkObjID *objID;
03913 SNToTicket **ticketRow = (SNToTicket **)iter->next((void **)&objID);
03914 p | (*objID);
03915 (*ticketRow)->pup(p);
03916 }
03917
03918 delete iter;
03919 }else{
03920 for(int i=0;i<ticketTableSize;i++){
03921 CkObjID objID;
03922 p | objID;
03923 SNToTicket *ticketRow = new SNToTicket;
03924 ticketRow->pup(p);
03925 if(!teamRecoveryFlag)
03926 ticketTable.put(objID) = ticketRow;
03927 else
03928 delete ticketRow;
03929 }
03930 }
03931 }
03932
03933 if(p.isSizing()){
03934 PUP::sizer *sizep = (PUP::sizer *)&p;
03935 int pupSize = sizep->size()-startSize;
03936 DEBUG(char name[40]);
03937 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
03938
03939 }
03940
03941 double _finTime = CkWallTimer();
03942 DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
03943 };
03944
03945
03946
03947
03953 int getCheckPointPE(){
03954 return (CmiMyPe() + 1) % CmiNumPes();
03955 }
03956
03957
03958 envelope *copyEnvelope(envelope *env){
03959 envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());
03960 memcpy(newEnv,env,env->getTotalsize());
03961 return newEnv;
03962 }
03963
03964
03965 inline int isSameDet(Determinant *first, Determinant *second){
03966 return first->sender == second->sender && first->receiver == second->receiver && first->SN == second->SN && first->TN == second->TN;
03967 }
03968
03969 #endif