00001
00008 #include "charm.h"
00009 #include "ck.h"
00010 #include "ckcausalmlog.h"
00011 #include "queueing.h"
00012 #include <sys/types.h>
00013 #include <signal.h>
00014 #include "CentralLB.h"
00015
00016 #ifdef _FAULT_CAUSAL_
00017
00018
00019
00020
// ---------------------------------------------------------------------------
// Compile-time switches for optional statistics collection. All are 0 here,
// so every `#if COLLECT_STATS_*` section in this file is compiled out.
// ---------------------------------------------------------------------------
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_DETS 0
#define COLLECT_STATS_DETS_DUP 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_LOGGING 0

// Recovery-phase labels (not referenced in the visible part of this file).
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug-output macros. A macro whose replacement is a comment (e.g. `//x`)
// expands to nothing, so its argument is discarded. DEBUG_NOW and
// DEBUG_PE_NOW always expand their argument.
// NOTE(review): DEBUG_PE_NOW expands to an unbraced `if`, so using it
// directly before an `else` would bind that `else` to the macro's `if`.
#define DEBUG_MEM(x) //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x) //x
#define DEBUGLB(x) // x
#define DEBUG_TEAM(x) // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_PE_NOW(x,y) if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x

// Heartbeat silence, in CmiWallTimer() units (seconds), after which
// heartBeatCheckHandler() declares the partner PE dead.
#define FAIL_DET_THRESHOLD 10

// Helpers defined in other translation units.
extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

// Forward declarations for functions defined later in this file (or
// elsewhere in the message-logging implementation).
bool fault_aware(CkObjID &recver);
void sendCheckpointData(int mode);
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(TProcessedLog *log);
00057
// ---------------------------------------------------------------------------
// Restart / recovery bookkeeping. These are not referenced in the visible
// part of this file; presumably the restart handlers drive them.
// ---------------------------------------------------------------------------
bool _recoveryFlag=false;
bool _restartFlag=false;
int _numRestartResponses=0;


// Hash-table instrumentation counters (not updated in the visible code).
int countHashRefs=0;
int countHashCollisions=0;

// NOTE(review): binding a string literal to a non-const `char *` is
// ill-formed in ISO C++11 and later (most compilers accept it with a
// warning). Changing it to `const char *` requires updating every extern
// declaration of this symbol at the same time.
char *checkpointDirectory=".";
int unAckedCheckpoint=0;

// Miscellaneous message counters (not updated in the visible code).
int countLocal=0,countBuffered=0;
int countPiggy=0;
int countClearBufferedLocalCalls=0;

int countUpdateHomeAcks=0;

// Configuration defined in another translation unit. teamSize is read by
// isTeamLocal() and sendCommonMsg(): PEs are grouped in blocks of teamSize.
extern int teamSize;
extern int chkptPeriod;
extern bool fastRecovery;
extern int parallelRecovery;

// Fault-injection support:
//   killFile  - path read by readKillFile(): lines of "<pe> <seconds>".
//   faultFile - path read by readFaultFile(): one "<pe> <seconds>" pair.
//   killTime  - absolute CmiWallTimer() value at/after which killLocal()
//               terminates this PE (set by readKillFile() / CkDieNow()).
//   faultMean - the seconds value read from faultFile.
char *killFile;
char *faultFile;
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
double killTime=0.0;
double faultMean;
int checkpointCount=0;
// When nonzero at _messageLoggingInit() time, a per-PE scratch file name for
// disk checkpoints is generated into fName.
int diskCkptFlag = 0;
static char fName[100];

// Per-PE (Cpv) state.
//   _currentObj - object used as the sender in sendCommonMsg(); when NULL,
//                 sends bypass message logging entirely.
//   _delayedLocalMsgs, _outOfOrderMessageQueue, _delayedRemoteMessageQueue
//               - queues created in _messageLoggingInit().
//   _bufferedTicketRequests / _numBufferedTicketRequests - arrays with one
//                 slot per PE, allocated in _messageLoggingInit().
CpvDeclare(Chare *,_currentObj);
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
CpvDeclare(Queue, _outOfOrderMessageQueue);
CpvDeclare(Queue, _delayedRemoteMessageQueue);
CpvDeclare(char **,_bufferedTicketRequests);
CpvDeclare(int *,_numBufferedTicketRequests);



// Local determinant buffer: a CmiAlloc'd byte array holding up to
// _maxBufferedDets Determinant records, appended by addBufferedDeterminant().
CpvDeclare(char *, _localDets);

// Number of trailing records in _localDets not yet acknowledged. sendMsg()
// ships the last _numBufferedDets records; _removeDeterminantsHandler()
// shrinks this count when an acknowledgement arrives.
int _numBufferedDets;

// Number of records appended to _localDets so far (next free slot).
int _indexBufferedDets;

// Tag sent with each determinant batch and echoed back in acknowledgements;
// an acknowledgement is applied only when its phase matches this value.
int _phaseBufferedDets;


// Determinants received from other PEs, keyed by the receiving object
// (filled by _storeDeterminantsHandler()).
CpvDeclare(CkDeterminantHashtableT *, _remoteDets);

// Capacity of _localDets, in Determinant records.
int _maxBufferedDets;


// One char-sized incarnation counter per PE, indexed by PE number; the value
// for this PE is stamped into each outgoing envelope by sendCommonMsg().
CpvDeclare(char *, _incarnation);


// Reusable send buffers for the determinant store/acknowledge exchange.
// _storeDetsSizes and _storeDetsPtrs are two-element arrays used as the
// (header, payload) vector for CmiSyncVectorSend() in sendMsg().
CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);

CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);

CpvDeclare(int *, _storeDetsSizes);

CpvDeclare(char **, _storeDetsPtrs);




// Counters/list of objects migrating during recovery (initialized in
// _messageLoggingInit(); not otherwise used in the visible code).
CpvDeclare(int, _numEmigrantRecObjs);
CpvDeclare(int, _numImmigrantRecObjs);
CpvDeclare(std::vector<CkLocation *> *, _immigrantRecObjs);
00145
00146
// Optional statistics; compiled only when the matching COLLECT_STATS_* switch
// is nonzero.
#if COLLECT_STATS_MSGS
int *numMsgsTarget;
int *sizeMsgsTarget;
int totalMsgsTarget;
float totalMsgsSize;
#endif
#if COLLECT_STATS_DETS
int numPiggyDets;
int numDets;
int numDupDets;
#endif
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif

#if COLLECT_STATS_LOGGING
float MLOGFT_totalLogSize = 0.0;
float MLOGFT_totalMessages = 0.0;
float MLOGFT_totalMcastLogSize = 0.0;
float MLOGFT_totalReductionLogSize = 0.0;
#endif

static double adjustChkptPeriod=0.0;
static double nextCheckpointTime=0.0;
// Two-level table: CkObjID -> (CkObjID -> SNToTicket *). Constructed with
// initial size 1000 and load factor 0.3; not used in the visible code.
static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);

// Converse handler indices. Each is assigned by CkRegisterHandler() in
// _messageLoggingInit() and later passed to CmiSetHandler().
int _pingHandlerIdx;

char objString[100];
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _removeProcessedLogHandlerIdx;

int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;


int _getGlobalStepHandlerIdx;
int _recvGlobalStepHandlerIdx;

int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _sendDetsHandlerIdx;
int _sendDetsReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _sendBackLocationHandlerIdx;
int _storeDeterminantsHandlerIdx;
int _removeDeterminantsHandlerIdx;


int _restartHandlerIdx;
int _getRestartCheckpointHandlerIdx;
int _recvRestartCheckpointHandlerIdx;
void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);

int verifyAckTotal;
int verifyAckCount;

int verifyAckedRequests=0;

RestartRequest *storedRequest;

int _falseRestart =0;

// Load-balancing / migration coordination state (not used in the visible
// part of this file).
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;

std::vector<MigrationRecord> migratedNoticeList;
std::vector<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;

// Both are set to CmiWallTimer() in _messageLoggingInit().
double lastCompletedAlarm=0;
double lastRestart=0;


int _receiveLocationHandlerIdx;

#if CMK_CONVERSE_MPI
// MPI-only heartbeat failure detector.
//   lastPingTime - CmiWallTimer() value of the last heartbeat received; -1
//                  until the first one arrives, which keeps
//                  heartBeatCheckHandler() from declaring a failure early.
//   inCkptFlag   - while nonzero, heartBeatCheckHandler() does not declare
//                  failures.
static int heartBeatHandlerIdx;
static int heartBeatCheckHandlerIdx;
static int partnerFailureHandlerIdx;
static double lastPingTime = -1;

void mpi_restart_crashed(int pe, int rank);
int find_spare_mpirank(int pe, int partition);

void heartBeatPartner();
void heartBeatHandler(void *msg);
void heartBeatCheckHandler();
void partnerFailureHandler(char *msg);
int getReverseCheckPointPE();
int inCkptFlag = 0;
#endif
00269
/**
 * Identity message callback: returns the payload pointer it was handed and
 * ignores the size, remote-pointer and count arguments.
 */
static void *doNothingMsg(int *size, void *data, void **remote, int count)
{
    (void)size;
    (void)remote;
    (void)count;
    void *unchanged = data;
    return unchanged;
}
00273
00277 void _messageLoggingInit(){
00278 if(CkMyPe() == 0)
00279 CkPrintf("[%d] Causal Message Logging Support\n",CkMyPe());
00280
00281
00282 CpvInitialize(Chare *,_currentObj);
00283
00284
00285 _pingHandlerIdx = CkRegisterHandler(_pingHandler);
00286
00287
00288 _storeCheckpointHandlerIdx = CkRegisterHandler(_storeCheckpointHandler);
00289 _checkpointAckHandlerIdx = CkRegisterHandler( _checkpointAckHandler);
00290 _removeProcessedLogHandlerIdx = CkRegisterHandler(_removeProcessedLogHandler);
00291 _checkpointRequestHandlerIdx = CkRegisterHandler(_checkpointRequestHandler);
00292
00293
00294 _getCheckpointHandlerIdx = CkRegisterHandler(_getCheckpointHandler);
00295 _recvCheckpointHandlerIdx = CkRegisterHandler(_recvCheckpointHandler);
00296 _updateHomeRequestHandlerIdx =CkRegisterHandler(_updateHomeRequestHandler);
00297 _updateHomeAckHandlerIdx = CkRegisterHandler( _updateHomeAckHandler);
00298 _resendMessagesHandlerIdx = CkRegisterHandler(_resendMessagesHandler);
00299 _sendDetsHandlerIdx = CkRegisterHandler(_sendDetsHandler);
00300 _sendDetsReplyHandlerIdx = CkRegisterHandler(_sendDetsReplyHandler);
00301 _receivedTNDataHandlerIdx=CkRegisterHandler(_receivedTNDataHandler);
00302 _receivedDetDataHandlerIdx = CkRegisterHandler(_receivedDetDataHandler);
00303 _distributedLocationHandlerIdx=CkRegisterHandler(_distributedLocationHandler);
00304 _sendBackLocationHandlerIdx=CkRegisterHandler(_sendBackLocationHandler);
00305 _verifyAckRequestHandlerIdx = CkRegisterHandler(_verifyAckRequestHandler);
00306 _verifyAckHandlerIdx = CkRegisterHandler(_verifyAckHandler);
00307 _dummyMigrationHandlerIdx = CkRegisterHandler(_dummyMigrationHandler);
00308
00309
00310 _restartHandlerIdx = CkRegisterHandler(_restartHandler);
00311 _getRestartCheckpointHandlerIdx = CkRegisterHandler(_getRestartCheckpointHandler);
00312 _recvRestartCheckpointHandlerIdx = CkRegisterHandler(_recvRestartCheckpointHandler);
00313
00314
00315 _storeDeterminantsHandlerIdx = CkRegisterHandler(_storeDeterminantsHandler);
00316 _removeDeterminantsHandlerIdx = CkRegisterHandler(_removeDeterminantsHandler);
00317
00318
00319 _receiveMlogLocationHandlerIdx=CkRegisterHandler(_receiveMlogLocationHandler);
00320 _receiveMigrationNoticeHandlerIdx=CkRegisterHandler(_receiveMigrationNoticeHandler);
00321 _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler(_receiveMigrationNoticeAckHandler);
00322 _getGlobalStepHandlerIdx=CkRegisterHandler(_getGlobalStepHandler);
00323 _recvGlobalStepHandlerIdx=CkRegisterHandler(_recvGlobalStepHandler);
00324 _checkpointBarrierHandlerIdx=CkRegisterHandler(_checkpointBarrierHandler);
00325 _checkpointBarrierAckHandlerIdx=CkRegisterHandler(_checkpointBarrierAckHandler);
00326
00327
00328 _receiveLocationHandlerIdx=CkRegisterHandler(_receiveLocationHandler);
00329
00330
00331 #if CMK_CONVERSE_MPI
00332 heartBeatHandlerIdx = CkRegisterHandler(heartBeatHandler);
00333 heartBeatCheckHandlerIdx = CkRegisterHandler(heartBeatCheckHandler);
00334 partnerFailureHandlerIdx = CkRegisterHandler(partnerFailureHandler);
00335 #endif
00336
00337
00338 CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
00339 CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
00340 CpvInitialize(Queue, _outOfOrderMessageQueue);
00341 CpvInitialize(Queue, _delayedRemoteMessageQueue);
00342 CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
00343 CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
00344
00345 CpvInitialize(char **,_bufferedTicketRequests);
00346 CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
00347 CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
00348 for(int i=0;i<CkNumPes();i++){
00349 CpvAccess(_bufferedTicketRequests)[i]=NULL;
00350 CpvAccess(_numBufferedTicketRequests)[i]=0;
00351 }
00352
00353
00354 _numBufferedDets = 0;
00355 _indexBufferedDets = 0;
00356 _phaseBufferedDets = 0;
00357 _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
00358 CpvInitialize(char *, _localDets);
00359 CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, std::vector<Determinant> *> *),_remoteDets);
00360 CpvInitialize(char *, _incarnation);
00361 CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
00362 CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
00363 CpvInitialize(int *, _storeDetsSizes);
00364 CpvInitialize(char **, _storeDetsPtrs);
00365 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
00366 CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, std::vector<Determinant> *>(100, 0.4);
00367 CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
00368 for(int i=0; i<CmiNumPes(); i++){
00369 CpvAccess(_incarnation)[i] = 0;
00370 }
00371 CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
00372 CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
00373 CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
00374 CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
00375
00376
00377 CpvInitialize(int, _numEmigrantRecObjs);
00378 CpvAccess(_numEmigrantRecObjs) = 0;
00379 CpvInitialize(int, _numImmigrantRecObjs);
00380 CpvAccess(_numImmigrantRecObjs) = 0;
00381
00382 CpvInitialize(std::vector<CkLocation *> *, _immigrantRecObjs);
00383 CpvAccess(_immigrantRecObjs) = new std::vector<CkLocation *>;
00384
00385
00386 CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
00387 CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
00388
00389
00390 if(diskCkptFlag){
00391 #if CMK_USE_MKSTEMP
00392 sprintf(fName, "/tmp/ckpt%d-XXXXXX", CkMyPe());
00393 mkstemp(fName);
00394 #else
00395 fName=tmpnam(NULL);
00396 #endif
00397 }
00398
00399
00400 traceRegisterUserEvent("Remove Logs", 20);
00401 traceRegisterUserEvent("Ticket Request Handler", 21);
00402 traceRegisterUserEvent("Ticket Handler", 22);
00403 traceRegisterUserEvent("Local Message Copy Handler", 23);
00404 traceRegisterUserEvent("Local Message Ack Handler", 24);
00405 traceRegisterUserEvent("Preprocess current message",25);
00406 traceRegisterUserEvent("Preprocess past message",26);
00407 traceRegisterUserEvent("Preprocess future message",27);
00408 traceRegisterUserEvent("Checkpoint",28);
00409 traceRegisterUserEvent("Checkpoint Store",29);
00410 traceRegisterUserEvent("Checkpoint Ack",30);
00411 traceRegisterUserEvent("Send Ticket Request",31);
00412 traceRegisterUserEvent("Generalticketrequest1",32);
00413 traceRegisterUserEvent("TicketLogLocal",33);
00414 traceRegisterUserEvent("next_ticket and SN",34);
00415 traceRegisterUserEvent("Timeout for buffered remote messages",35);
00416 traceRegisterUserEvent("Timeout for buffered local messages",36);
00417 traceRegisterUserEvent("Inform Location Home",37);
00418 traceRegisterUserEvent("Receive Location Handler",38);
00419
00420 lastCompletedAlarm=CmiWallTimer();
00421 lastRestart = CmiWallTimer();
00422
00423
00424 #if CMK_CONVERSE_MPI
00425 void heartBeatPartner();
00426 void heartBeatCheckHandler();
00427 CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
00428 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00429 #endif
00430
00431 #if COLLECT_STATS_MSGS
00432 #if COLLECT_STATS_MSGS_TOTAL
00433 totalMsgsTarget = 0;
00434 totalMsgsSize = 0.0;
00435 #else
00436 numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00437 sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00438 for(int i=0; i<CmiNumPes(); i++){
00439 numMsgsTarget[i] = 0;
00440 sizeMsgsTarget[i] = 0;
00441 }
00442 #endif
00443 #endif
00444 #if COLLECT_STATS_DETS
00445 numPiggyDets = 0;
00446 numDets = 0;
00447 numDupDets = 0;
00448 #endif
00449 #if COLLECT_STATS_MEMORY
00450 msgLogSize = 0;
00451 bufferedDetsSize = 0;
00452 storedDetsSize = 0;
00453 #endif
00454
00455 }
00456
00457 #if CMK_CONVERSE_MPI
00458
00462 void partnerFailureHandler(char *msg)
00463 {
00464 int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
00465
00466
00467 int newrank = find_spare_mpirank(diepe, CmiMyPartition());
00468 int buddy = getReverseCheckPointPE();
00469 if (buddy == diepe) {
00470 mpi_restart_crashed(diepe, newrank);
00471 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00472 }
00473 }
00474
00478 void heartBeatHandler(void *msg)
00479 {
00480 lastPingTime = CmiWallTimer();
00481 CmiFree(msg);
00482 }
00483
00487 void heartBeatCheckHandler()
00488 {
00489 double now = CmiWallTimer();
00490 if (lastPingTime > 0 && now - lastPingTime > FAIL_DET_THRESHOLD && !inCkptFlag) {
00491 int i, pe, buddy;
00492
00493 buddy = getReverseCheckPointPE();
00494 CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
00495
00496 for (int pe = 0; pe < CmiNumPes(); pe++) {
00497 if (pe == buddy) continue;
00498 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
00499 *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
00500 CmiSetHandler(msg, partnerFailureHandlerIdx);
00501 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
00502 }
00503 }
00504 else
00505 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00506 }
00507
00511 void heartBeatPartner()
00512 {
00513 int buddy = getCheckPointPE();
00514
00515 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
00516 *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
00517 CmiSetHandler(msg, heartBeatHandlerIdx);
00518 CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
00519 CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
00520 }
00521 #endif
00522
00523 void killLocal(void *_dummy,double curWallTime);
00524
00525 void readKillFile(){
00526 FILE *fp=fopen(killFile,"r");
00527 if(!fp){
00528 return;
00529 }
00530 int proc;
00531 double sec;
00532 while(fscanf(fp,"%d %lf",&proc,&sec)==2){
00533 if(proc == CkMyPe()){
00534 killTime = CmiWallTimer()+sec;
00535 printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
00536 CcdCallFnAfter(killLocal,NULL,sec*1000);
00537 }
00538 }
00539 fclose(fp);
00540 }
00541
00546 void readFaultFile(){
00547 FILE *fp=fopen(faultFile,"r");
00548 if(!fp){
00549 return;
00550 }
00551 int proc;
00552 double sec;
00553 fscanf(fp,"%d %lf",&proc,&sec);
00554 faultMean = sec;
00555 if(proc == CkMyPe()){
00556 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
00557 CcdCallFnAfter(killLocal,NULL,sec*1000);
00558 }
00559 fclose(fp);
00560 }
00561
00562 void killLocal(void *_dummy,double curWallTime){
00563 printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
00564 if(CmiWallTimer()<killTime-1){
00565 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);
00566 }else{
00567 #if CMK_CONVERSE_MPI
00568 CkDieNow();
00569 #else
00570 kill(getpid(),SIGKILL);
00571 #endif
00572 }
00573 }
00574
00575 #if ! CMK_CONVERSE_MPI
00576 void CkDieNow()
00577 {
00578
00579 CmiPrintf("[%d] die now.\n", CmiMyPe());
00580 killTime = CmiWallTimer()+0.001;
00581 CcdCallFnAfter(killLocal,NULL,1);
00582 }
00583 #endif
00584
00585
00586
00591 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
00592 Determinant *det, *auxDet;
00593 char *aux;
00594
00595 DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
00596
00597
00598 if(_indexBufferedDets >= _maxBufferedDets){
00599 aux = CpvAccess(_localDets);
00600 _maxBufferedDets *= 2;
00601 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
00602 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
00603 CmiFree(aux);
00604 }
00605
00606
00607 det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
00608 det->sender = sender;
00609 det->receiver = receiver;
00610 det->SN = SN;
00611 det->TN = TN;
00612 _numBufferedDets++;
00613 _indexBufferedDets++;
00614
00615 #if COLLECT_STATS_MEMORY
00616 bufferedDetsSize++;
00617 #endif
00618 #if COLLECT_STATS_DETS
00619 numDets++;
00620 #endif
00621 }
00622
00623
00624
00628 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
00629 if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
00630 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
00631 void *origMsg = EnvToUsr(env);
00632 for(int i=0;i<CmiNumPes();i++){
00633 if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
00634 void *copyMsg = CkCopyMsg(&origMsg);
00635 envelope *copyEnv = UsrToEnv(copyMsg);
00636 copyEnv->SN=0;
00637 copyEnv->TN=0;
00638 copyEnv->sender.type = TypeInvalid;
00639 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
00640 sendGroupMsg(copyEnv,i,_infoIdx);
00641 }
00642 }
00643 return;
00644 }
00645
00646
00647 env->SN=0;
00648 env->TN=0;
00649 env->sender.type = TypeInvalid;
00650
00651 CkObjID recver;
00652 recver.type = TypeGroup;
00653 recver.data.group.id = env->getGroupNum();
00654 recver.data.group.onPE = destPE;
00655 sendCommonMsg(recver,env,destPE,_infoIdx);
00656 }
00657
00661 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
00662 if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
00663 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
00664 void *origMsg = EnvToUsr(env);
00665 for(int i=0;i<CmiNumNodes();i++){
00666 if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
00667 void *copyMsg = CkCopyMsg(&origMsg);
00668 envelope *copyEnv = UsrToEnv(copyMsg);
00669 copyEnv->SN=0;
00670 copyEnv->TN=0;
00671 copyEnv->sender.type = TypeInvalid;
00672 sendNodeGroupMsg(copyEnv,i,_infoIdx);
00673 }
00674 }
00675 return;
00676 }
00677
00678
00679 env->SN=0;
00680 env->TN=0;
00681 env->sender.type = TypeInvalid;
00682
00683 CkObjID recver;
00684 recver.type = TypeNodeGroup;
00685 recver.data.group.id = env->getGroupNum();
00686 recver.data.group.onPE = destNode;
00687 sendCommonMsg(recver,env,destNode,_infoIdx);
00688 }
00689
00693 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
00694 CkObjID recver;
00695 recver.type = TypeArray;
00696 recver.data.array.id = env->getArrayMgr();
00697 recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
00698
00699 if(CpvAccess(_currentObj)!=NULL && CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00700 char recverString[100],senderString[100];
00701
00702 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00703 }
00704
00705
00706 env->SN = 0;
00707 env->TN = 0;
00708
00709 sendCommonMsg(recver,env,destPE,_infoIdx);
00710 };
00711
00715 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
00716 CkObjID recver;
00717 recver.type = TypeChare;
00718 recver.data.chare.id = *pCid;
00719
00720 if(CpvAccess(_currentObj)!=NULL && CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00721 char recverString[100],senderString[100];
00722
00723 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00724 }
00725
00726
00727 env->SN = 0;
00728 env->TN = 0;
00729
00730 sendCommonMsg(recver,env,destPE,_infoIdx);
00731 };
00732
00736 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
00737 envelope *env = _env;
00738 MCount ticketNumber = 0;
00739 int resend=0;
00740 char recverName[100];
00741 char senderString[100];
00742 double _startTime=CkWallTimer();
00743
00744 DEBUG_MEM(CmiMemoryCheck());
00745
00746 if(CpvAccess(_currentObj) == NULL){
00747
00748 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
00749 generalCldEnqueue(destPE,env,_infoIdx);
00750 return;
00751 }
00752
00753
00754 if(env->flags & CK_BYPASS_DET_MLOG){
00755 env->sender = CpvAccess(_currentObj)->mlogData->objID;
00756 env->recver = recver;
00757 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
00758 generalCldEnqueue(destPE,env,_infoIdx);
00759 return;
00760 }
00761
00762
00763 env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
00764 if(env->sender.type == TypeInvalid){
00765 env->sender = CpvAccess(_currentObj)->mlogData->objID;
00766 }else{
00767
00768
00769 env->sender = CpvAccess(_currentObj)->mlogData->objID;
00770 env->SN = 0;
00771 }
00772
00773 DEBUG_MEM(CmiMemoryCheck());
00774
00775 CkObjID &sender = env->sender;
00776 env->recver = recver;
00777
00778 Chare *obj = (Chare *)env->sender.getObject();
00779
00780 if(env->SN == 0){
00781 DEBUG_MEM(CmiMemoryCheck());
00782 env->SN = obj->mlogData->nextSN(recver);
00783 }else{
00784 resend = 1;
00785 }
00786
00787
00788 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
00789
00790
00791
00792
00793
00794
00795
00796
00797 _startTime = CkWallTimer();
00798
00799
00800 if(isLocal(destPE)){
00801 sendLocalMsg(env, _infoIdx);
00802 }else{
00803 if((teamSize > 1) && isTeamLocal(destPE)){
00804
00805
00806 Chare *senderObj = (Chare *)sender.getObject();
00807 SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
00808 if(ticketRow != NULL){
00809 Ticket ticket = ticketRow->get(env->SN);
00810 if(ticket.TN != 0){
00811 ticketNumber = ticket.TN;
00812 DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
00813 }
00814 }
00815 }
00816
00817
00818 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
00819 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
00820
00821 }
00822 }
00823
00828 inline bool isLocal(int destPE){
00829
00830 if(destPE == CkMyPe())
00831 return true;
00832
00833 return false;
00834 }
00835
00840 inline bool isTeamLocal(int destPE){
00841
00842
00843 if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
00844 return true;
00845
00846 return false;
00847 }
00848
00852 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
00853 DEBUG_NOW(char recverString[100]);
00854 DEBUG_NOW(char senderString[100]);
00855
00856 int totalSize;
00857
00858 envelope *env = entry->env;
00859 DEBUG(printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
00860
00861
00862 Chare *obj = (Chare *)entry->env->sender.getObject();
00863 entry->env->recver = recver;
00864 entry->env->SN = SN;
00865 entry->env->TN = TN;
00866 if(!resend){
00867
00868 if(!isTeamLocal(entry->destPE)){
00869 obj->mlogData->addLogEntry(entry);
00870 #if COLLECT_STATS_LOGGING
00871 MLOGFT_totalMessages += 1.0;
00872 MLOGFT_totalLogSize += entry->env->getTotalsize();
00873 if(entry->env->flags & CK_MULTICAST_MSG_MLOG){
00874 MLOGFT_totalMcastLogSize += entry->env->getTotalsize();
00875 }
00876 if(entry->env->flags & CK_REDUCTION_MSG_MLOG){
00877 MLOGFT_totalReductionLogSize += entry->env->getTotalsize();
00878 }
00879
00880 #endif
00881 }else{
00882
00883 entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
00884 }
00885 }
00886
00887
00888 if(_numBufferedDets > 0){
00889
00890
00891 CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
00892 CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
00893 CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
00894 CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
00895
00896
00897 CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
00898 CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
00899 CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
00900 CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
00901 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
00902 CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
00903 CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
00904 }
00905
00906
00907 entry->indexBufDets = _indexBufferedDets;
00908 entry->numBufDets = _numBufferedDets;
00909
00910
00911 generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
00912
00913 DEBUG_MEM(CmiMemoryCheck());
00914 #if COLLECT_STATS_MSGS
00915 #if COLLECT_STATS_MSGS_TOTAL
00916 totalMsgsTarget++;
00917 totalMsgsSize += (float)env->getTotalsize();
00918 #else
00919 numMsgsTarget[destPE]++;
00920 sizeMsgsTarget[destPE] += env->getTotalsize();
00921 #endif
00922 #endif
00923 #if COLLECT_STATS_DETS
00924 numPiggyDets += _numBufferedDets;
00925 #endif
00926 #if COLLECT_STATS_MEMORY
00927 msgLogSize += env->getTotalsize();
00928 #endif
00929 };
00930
00931
00937 void sendLocalMsg(envelope *env, int _infoIdx){
00938 DEBUG_PERF(double _startTime=CkWallTimer());
00939 DEBUG_MEM(CmiMemoryCheck());
00940 DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
00941 DEBUG(char senderString[100]);
00942 DEBUG(char recverString[100]);
00943 Ticket ticket;
00944
00945 DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
00946
00947
00948 Chare *recverObj = (Chare *)env->recver.getObject();
00949
00950
00951 if(recverObj){
00952
00953
00954
00955
00956
00957
00958
00959
00960
00961
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);
00989
00990 DEBUG_MEM(CmiMemoryCheck());
00991 }else{
00992 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
00993 }
00994
00995 };
00996
00997
00998
00999
01000
01001
01002
01006 void _removeDeterminantsHandler(char *buffer){
01007 RemoveDeterminantsHeader *header;
01008 int index, phase;
01009
01010
01011 header = (RemoveDeterminantsHeader *)buffer;
01012 index = header->index;
01013 phase = header->phase;
01014
01015
01016
01017
01018 if(phase == _phaseBufferedDets){
01019 if(index > _indexBufferedDets)
01020 CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
01021 CmiAssert(index <= _indexBufferedDets);
01022 _numBufferedDets = _indexBufferedDets - index;
01023 }
01024
01025
01026 CmiFree(buffer);
01027
01028 }
01029
01033 void _storeDeterminantsHandler(char *buffer){
01034 StoreDeterminantsHeader *header;
01035 Determinant *detPtr, det;
01036 int i, n, index, phase, destPE;
01037 std::vector<Determinant> *vec;
01038
01039
01040 header = (StoreDeterminantsHeader *)buffer;
01041 n = header->number;
01042 index = header->index;
01043 phase = header->phase;
01044 destPE = header->PE;
01045 detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
01046
01047 DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
01048 DEBUG_MEM(CmiMemoryCheck());
01049
01050
01051 for(i = 0; i < n; i++){
01052 det.sender = detPtr->sender;
01053 det.receiver = detPtr->receiver;
01054 det.SN = detPtr->SN;
01055 det.TN = detPtr->TN;
01056
01057
01058 vec = CpvAccess(_remoteDets)->get(det.receiver);
01059 if(vec == NULL){
01060 vec = new std::vector<Determinant>();
01061 CpvAccess(_remoteDets)->put(det.receiver) = vec;
01062 }
01063 #if COLLECT_STATS_DETS
01064 #if COLLECT_STATS_DETS_DUP
01065 for(int j=0; j<vec->size(); j++){
01066 if(isSameDet(&(*vec)[j],&det)){
01067 numDupDets++;
01068 break;
01069 }
01070 }
01071 #endif
01072 #endif
01073 #if COLLECT_STATS_MEMORY
01074 storedDetsSize++;
01075 #endif
01076 vec->push_back(det);
01077 detPtr = detPtr++;
01078 }
01079
01080 DEBUG_MEM(CmiMemoryCheck());
01081
01082
01083 CmiFree(buffer);
01084
01085
01086 CpvAccess(_removeDetsHeader)->index = index;
01087 CpvAccess(_removeDetsHeader)->phase = phase;
01088 CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
01089 CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
01090
01091 }
01092
01097 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
01098 DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
01099 double _startTime = CkWallTimer();
01100 CmiFree(ticketRequest);
01101
01102 }
01103
01104
01110 inline bool _getTicket(envelope *env, int *flag){
01111 DEBUG_MEM(CmiMemoryCheck());
01112 DEBUG(char recverName[100]);
01113 DEBUG(char senderString[100]);
01114 Ticket ticket;
01115
01116
01117 CkObjID sender = env->sender;
01118 CkObjID recver = env->recver;
01119 MCount SN = env->SN;
01120 MCount TN = env->TN;
01121 Chare *recverObj = (Chare *)recver.getObject();
01122
01123 DEBUG(recver.toString(recverName);)
01124
01125
01126 if(recverObj->mlogData->restartFlag)
01127 return false;
01128
01129
01130 ticket = recverObj->mlogData->getTicket(sender, SN);
01131 TN = ticket.TN;
01132
01133
01134 if(teamSize > 1 && TN != 0){
01135 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
01136 recverObj->mlogData->verifyTicket(sender,SN,TN);
01137 }
01138
01139
01140
01141 if(TN == 0){
01142 ticket = recverObj->mlogData->next_ticket(sender,SN);
01143 *flag = NEW_TICKET;
01144 } else {
01145 *flag = OLD_TICKET;
01146 }
01147 if(ticket.TN > recverObj->mlogData->tProcessed){
01148 ticket.state = NEW_TICKET;
01149 } else {
01150 ticket.state = OLD_TICKET;
01151 }
01152
01153 if(ticket.TN == 0){
01154 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
01155 return false;
01156 }
01157
01158
01159 env->TN = ticket.TN;
01160 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
01161 return true;
01162
01163 };
01164
01165 bool fault_aware(CkObjID &recver){
01166 switch(recver.type){
01167 case TypeChare:
01168 return true;
01169 case TypeMainChare:
01170 return false;
01171 case TypeGroup:
01172 case TypeNodeGroup:
01173 case TypeArray:
01174 return true;
01175 default:
01176 return false;
01177 }
01178 };
01179
01180
/**
 * Decides what happens to a received message under causal message logging.
 *
 * @param env              received envelope.
 * @param objPointer       out: set to the receiver object (possibly NULL). It is
 *                         NOT written on the bypass and non-fault-aware paths.
 * @param logEntryPointer  not used by this function.
 * @return 1 when this function leaves env to the caller. This happens for
 *         bypassed messages, receivers that are not fault aware, a missing
 *         receiver outside recovery, and an in-order ticket.
 *         (NOTE(review): presumably the caller then delivers it — confirm.)
 *         0 when env has been consumed here: freed, forwarded to another PE,
 *         or placed on the out-of-order or delayed-remote queue.
 */
01181 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
// DEBUG_NOW expands its argument, so these buffers are always declared.
01182 DEBUG_NOW(char recverString[100]);
01183 DEBUG_NOW(char senderString[100]);
01184 DEBUG_MEM(CmiMemoryCheck());
01185 int flag;
01186 bool ticketSuccess;
01187
01188
01189 CkObjID recver = env->recver;
01190
01191
// Messages flagged CK_BYPASS_DET_MLOG skip ticketing and determinant logging.
01192 if(env->flags & CK_BYPASS_DET_MLOG){
01193 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
01194 return 1;
01195 }
01196
01197
// Receivers for which fault_aware() is false (e.g. main chares) are not ticketed.
01198 if(!fault_aware(recver)){
01199 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
01200 return 1;
01201 }
01202
01203
01204 Chare *obj = (Chare *)recver.getObject();
01205 *objPointer = obj;
// Receiver not present here. During recovery, forward to the PE guessed by
// guessPE(). If that guess is this PE, park the message in the out-of-order
// queue. Outside recovery, return 1 with *objPointer == NULL.
01206 if(obj == NULL){
01207 if(_recoveryFlag){
01208 int possiblePE = recver.guessPE();
01209 if(possiblePE != CkMyPe()){
01210 int totalSize = env->getTotalsize();
01211 CmiSyncSendAndFree(possiblePE,totalSize,(char *)env);
// NOTE(review): if DEBUG_PE is enabled, this reads env after CmiSyncSendAndFree released it.
01212 DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
01213 }else{
01214
01215
01216 CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
01217 DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
01218 }
01219 return 0;
01220 } else {
01221 return 1;
01222 }
01223 }
01224
01225
// Message stamped with an older incarnation of its source PE: discard it.
01226 if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
01227 CmiFree(env);
01228 return 0;
01229 }
01230
01231 DEBUG_MEM(CmiMemoryCheck());
01232 DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
01233
01234
// On success, _getTicket stores the ticket in env->TN and reports NEW_TICKET/OLD_TICKET in flag.
01235 ticketSuccess = _getTicket(env,&flag);
01236
01237
// No ticket available yet (receiver restarting or TN == 0): hold the message in the delayed queue.
01238 if(!ticketSuccess){
01239
01240
01241 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
01242 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
01243
01244 return 0;
01245 }
01246
01247
01248
01249
// Only a freshly issued ticket creates a new determinant to buffer.
01250 if(flag == NEW_TICKET){
01251
01252 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
01253 }
01254
01255 DEBUG_MEM(CmiMemoryCheck());
01256
// NOTE(review): _startTime is never used.
01257 double _startTime = CkWallTimer();
01258
// Next expected ticket: push every parked out-of-order message back onto the
// scheduler queue, then return 1 for this message.
01259 if(env->TN == obj->mlogData->tProcessed+1){
01260
01261 DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
01262
01263
01264 DEBUG_MEM(CmiMemoryCheck());
01265 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
01266 void *qMsgPtr;
01267 CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
01268 envelope *qEnv = (envelope *)qMsgPtr;
01269 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
01270 DEBUG_MEM(CmiMemoryCheck());
01271 }
01272
01273
01274
01275 DEBUG_MEM(CmiMemoryCheck());
01276 return 1;
01277 }
01278
01279
01280
// Ticket already processed (duplicate or replay): drop it.
01281 if(env->TN <= obj->mlogData->tProcessed){
01282 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
01283
01284 CmiFree(env);
01285 return 0;
01286 }
01287
01288
01289 DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
01290
01291
// Ticket is ahead of the next expected one: park it until its predecessors are processed.
01292 CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
01293
01294 DEBUG_MEM(CmiMemoryCheck());
01295
01296 return 0;
01297 }
01298
01302 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
01303 DEBUG(char senderString[100]);
01304 if(obj){
01305 if(sender.guessPE() == CkMyPe()){
01306 if(entry != NULL){
01307 entry->env = NULL;
01308 }
01309 }
01310 obj->mlogData->tProcessed++;
01311
01312
01313
01314 }
01315 DEBUG_MEM(CmiMemoryCheck());
01316 }
01317
01318
01319
01320
01321
01322 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
01323
01324 if(env->recver.type != TypeNodeGroup){
01325
01326
01327
01328
01329
01330 _skipCldEnqueue(destPE,env,_infoIdx);
01331 }else{
01332 _noCldNodeEnqueue(destPE,env);
01333 }
01334
01335 }
01336
01341 int calledRetryTicketRequest=0;
01342
01343 void _pingHandler(CkPingMsg *msg){
01344 printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
01345 CmiFree(msg);
01346 }
01347
01348
01349
01350
01351
01352
01353
// (receiver object, tProcessed) pairs captured by startMlogCheckpoint() via
// buildProcessedTicketLog(); copied into the request built by sendRemoveLogRequests().
01354 std::vector<TProcessedLog> processedTicketLog;
01355 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
01356 void clearUpMigratedRetainedLists(int PE);
01357
/**
 * Checkpoint timer callback, re-armed through CcdCallFnAfter.
 * If fewer than (chkptPeriod - 2) units have elapsed since lastCompletedAlarm,
 * it re-arms itself for the remainder and returns. Otherwise it broadcasts a
 * CheckpointRequest to all PEs, including this one. Its handler,
 * _checkpointRequestHandler, starts the checkpoint.
 * NOTE(review): the units look inconsistent. diff is a difference of
 * curWallTime values, yet the re-arm delay multiplies (chkptPeriod - diff) by
 * 1000, while startMlogCheckpoint re-arms with chkptPeriod unscaled.
 * Confirm whether chkptPeriod is in seconds or milliseconds.
 */
01358 void checkpointAlarm(void *_dummy,double curWallTime){
01359 double diff = curWallTime-lastCompletedAlarm;
01360 DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
01361
01362
01363
01364
01365 if(diff < ((chkptPeriod) - 2)){
01366 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
01367 return;
01368 }
// The message lives on the stack; CmiSyncBroadcastAll sends copies to every PE.
01369 CheckpointRequest request;
01370 CmiInitMsgHeader(request.header, sizeof(CheckpointRequest));
01371 request.PE = CkMyPe();
01372 CmiSetHandler(&request,_checkpointRequestHandlerIdx);
01373 CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
01374 };
01375
01376 void _checkpointRequestHandler(CheckpointRequest *request){
01377 startMlogCheckpoint(NULL,CmiWallTimer());
01378 }
01379
/**
 * Takes this PE's checkpoint.
 * It increments checkpointCount and clears _recoveryFlag. It then serializes,
 * in this order, checkpointCount, the per-PE incarnation vector, readonly data,
 * group data, node-group data and array elements. The result goes into one
 * CheckPointDataMsg sent to getCheckPointPE(), whose handler is
 * _storeCheckpointHandler. Next it rebuilds processedTicketLog from every local
 * chare. On PE 0, when no load balancing is in progress, it records curWallTime
 * as lastCompletedAlarm and re-arms checkpointAlarm.
 *
 * @param _dummy      unused (callback-compatible signature).
 * @param curWallTime time stamp stored in lastCompletedAlarm on PE 0.
 */
01383 void startMlogCheckpoint(void *_dummy, double curWallTime){
01384 double _startTime = CkWallTimer();
01385
01386
01387 checkpointCount++;
01388 _recoveryFlag = false;
01389
01390 #if CMK_CONVERSE_MPI
01391 inCkptFlag = 1;
01392 #endif
01393
01394 #if DEBUG_CHECKPOINT
01395 if(CmiMyPe() == 0){
01396 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
01397 }
01398 #endif
01399
01400 DEBUG_MEM(CmiMemoryCheck());
01401
// Pass 1: size the checkpoint image. The order here must match pass 2 below
// and the unpacking order in the restart handlers.
01402 PUP::sizer psizer;
01403 psizer | checkpointCount;
01404 for(int i=0; i<CmiNumPes(); i++){
01405 psizer | CpvAccess(_incarnation)[i];
01406 }
01407 CkPupROData(psizer);
01408 DEBUG_MEM(CmiMemoryCheck());
01409 CkPupGroupData(psizer,true);
01410 DEBUG_MEM(CmiMemoryCheck());
01411 CkPupNodeGroupData(psizer,true);
01412 DEBUG_MEM(CmiMemoryCheck());
01413 pupArrayElementsSkip(psizer,true,NULL);
01414 DEBUG_MEM(CmiMemoryCheck());
01415
// Message layout: [CheckPointDataMsg header][dataSize bytes of PUP data].
01416 int dataSize = psizer.size();
01417 int totalSize = sizeof(CheckPointDataMsg)+dataSize;
01418 char *msg = (char *)CmiAlloc(totalSize);
01419 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
01420 chkMsg->PE = CkMyPe();
01421 chkMsg->dataSize = dataSize;
01422
// Pass 2: serialize into the message, in exactly the same order as pass 1.
01423 char *buf = &msg[sizeof(CheckPointDataMsg)];
01424 PUP::toMem pBuf(buf);
01425
01426 pBuf | checkpointCount;
01427 for(int i=0; i<CmiNumPes(); i++){
01428 pBuf | CpvAccess(_incarnation)[i];
01429 }
01430 CkPupROData(pBuf);
01431 CkPupGroupData(pBuf,true);
01432 CkPupNodeGroupData(pBuf,true);
01433 pupArrayElementsSkip(pBuf,true,NULL);
01434
// Cleared again by _checkpointAckHandler when the checkpoint PE acknowledges.
01435 unAckedCheckpoint=1;
01436 CmiSetHandler(msg,_storeCheckpointHandlerIdx);
01437 CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
01438
01439
01440
01441
// Snapshot each chare's tProcessed; sendRemoveLogRequests() ships it later.
01442 processedTicketLog.clear();
01443 forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
01444
01445 #if DEBUG_CHECKPOINT
01446 if(CmiMyPe() == 0){
01447 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
01448 }
01449 #endif
01450
// NOTE(review): when enabled, this printf passes size_t products to %d.
01451 #if COLLECT_STATS_MEMORY
01452 CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
01453 msgLogSize = 0;
01454 bufferedDetsSize = 0;
01455 storedDetsSize = 0;
01456 #endif
01457
// PE 0 drives the periodic alarm, except while load balancing is in progress.
01458 if(CkMyPe() == 0 && onGoingLoadBalancing==0 ){
01459 lastCompletedAlarm = curWallTime;
01460 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
01461 }
01462 traceUserBracketEvent(28,_startTime,CkWallTimer());
01463 };
01464
01468 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
01469 DEBUG(char objString[100]);
01470
01471 std::vector<TProcessedLog> *log = (std::vector<TProcessedLog> *)data;
01472 TProcessedLog logEntry;
01473 logEntry.recver = mlogData->objID;
01474 logEntry.tProcessed = mlogData->tProcessed;
01475 log->push_back(logEntry);
01476
01477 DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
01478 }
01479
01480 class ElementPacker : public CkLocIterator {
01481 private:
01482 CkLocMgr *locMgr;
01483 PUP::er &p;
01484 public:
01485 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
01486 void addLocation(CkLocation &loc) {
01487 CkArrayIndexMax idx=loc.getIndex();
01488 CkGroupID gID = locMgr->ckGetGroupID();
01489 p|gID;
01490 p|idx;
01491 p|loc;
01492 }
01493 };
01494
/**
 * Sizes, packs or unpacks all local array elements.
 *
 * Sizing and packing: writes CkCountArrayElements(), then (gID, idx, CkLocation)
 * for each element through ElementPacker.
 * Unpacking: reads the count, then each (gID, idx) and resumes the element through
 * its CkLocMgr. An element that matches an entry of listToSkip with
 * ackFrom == 0 && ackTo == 1 is resumed with the skip flag set, and its home
 * PE is told that the element now lives at that entry's toPE.
 *
 * @param p          PUP::er; its isUnpacking() selects the direction.
 * @param create     forwarded to CkLocMgr::resume().
 * @param listToSkip migration records to check; callers here pass NULL.
 * @param listsize   number of entries in listToSkip. The callers visible here
 *                   pass only three arguments, so this presumably defaults to 0
 *                   in the declaration — confirm.
 */
01498 void pupArrayElementsSkip(PUP::er &p, bool create, MigrationRecord *listToSkip,int listsize){
// NOTE(review): numGroups and the outer i are unused.
01499 int numElements,i;
01500 int numGroups = CkpvAccess(_groupIDTable)->size();
01501 if(!p.isUnpacking()){
01502 numElements = CkCountArrayElements();
01503 }
01504
// Packing writes numElements; unpacking fills it from the buffer.
01505 p | numElements;
01506 DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
01507 if(!p.isUnpacking()){
01508 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
01509 }else{
01510
01511
// Log which elements (if any) are going to be skipped.
01512 for(int j=0;j<listsize;j++){
01513 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01514 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
01515 listToSkip[j].idx.print();
01516 }
01517 }
01518
01519 printf("numElements = %d\n",numElements);
01520
01521 for (int i=0; i<numElements; i++) {
01522 CkGroupID gID;
01523 CkArrayIndexMax idx;
01524 p|gID;
01525 p|idx;
// flag becomes 1 when this element matches a skip-list entry; matchedIdx records which one.
01526 int flag=0;
01527 int matchedIdx=0;
01528 for(int j=0;j<listsize;j++){
01529 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01530 if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
01531 matchedIdx = j;
01532 flag = 1;
01533 break;
01534 }
01535 }
01536 }
01537 if(flag == 1){
01538 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01539 }else{
01540 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01541 }
01542
// resume() consumes the element's CkLocation data from p; flag is passed through as its skip argument.
01543 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01544 CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
01545 mgr->resume(idx,p,create,flag);
01546 if(flag == 1){
01547 int homePE = mgr->homePe(idx);
01548 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
01549 }
01550 }
01551 }
01552 };
01553
01557 void readCheckpointFromDisk(int size, char *data){
01558 FILE *f = fopen(fName,"rb");
01559 fread(data,1,size,f);
01560 fclose(f);
01561 }
01562
01566 void writeCheckpointToDisk(int size, char *data){
01567 FILE *f = fopen(fName,"wb");
01568 fwrite(data, 1, size, f);
01569 fclose(f);
01570 }
01571
01572
01573
/**
 * Runs on the checkpoint PE: stores the CheckPointDataMsg sent by startMlogCheckpoint.
 *
 * In memory mode the message itself is kept. _storedCheckpointData->buf points
 * just past its header, and the previously kept message is freed by stepping
 * back over the header. With diskCkptFlag, the image is written to disk, the
 * message is freed, and buf is set to NULL so the next call does not free it again.
 * Every migratedNoticeList entry whose fromPE is the sender gets ackFrom = 1;
 * entries with both ackFrom and ackTo set are removed. Finally a CheckPointAck
 * (handled by _checkpointAckHandler) is sent back to the sender.
 */
01574 void _storeCheckpointHandler(char *msg){
01575 double _startTime=CkWallTimer();
01576
01577 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
01578 DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
01579
01580 char *chkpt = &msg[sizeof(CheckPointDataMsg)];
01581
// buf points into a previously kept message; step back over its header to free the whole buffer.
01582 char *oldChkpt = CpvAccess(_storedCheckpointData)->buf;
01583 if(oldChkpt != NULL){
01584 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
01585 CmiFree(oldmsg);
01586 }
01587
// Copied before msg may be freed below.
01588 int sendingPE = chkMsg->PE;
01589
01590 CpvAccess(_storedCheckpointData)->buf = chkpt;
01591 CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
01592 CpvAccess(_storedCheckpointData)->PE = sendingPE;
01593
01594
01595 if(diskCkptFlag){
01596 writeCheckpointToDisk(chkMsg->dataSize,chkpt);
01597 CpvAccess(_storedCheckpointData)->buf = NULL;
01598 CmiFree(msg);
01599 }
01600
// Walk backwards so erase() does not shift entries not yet visited.
01601 int count=0;
01602 for(int j=migratedNoticeList.size()-1;j>=0;j--){
01603 if(migratedNoticeList[j].fromPE == sendingPE){
01604 migratedNoticeList[j].ackFrom = 1;
01605 }else{
// NOTE(review): CmiAssert on a non-null string literal is always true, so this
// branch does nothing. If entries from other PEs are a real error, this should
// assert on a condition — confirm the intended invariant before changing it.
01606 CmiAssert("migratedNoticeList entry for processor other than buddy");
01607 }
01608 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
01609 migratedNoticeList.erase(migratedNoticeList.begin() + j);
01610 count++;
01611 }
01612
01613 }
01614 DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
01615
// Stack-allocated ack; CmiSyncSend copies it.
01616 CheckPointAck ackMsg;
01617 CmiInitMsgHeader(ackMsg.header, sizeof(CheckPointAck));
01618 ackMsg.PE = CkMyPe();
01619 ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
01620 CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
01621 CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
01622
01623 traceUserBracketEvent(29,_startTime,CkWallTimer());
01624 };
01625
01626
01632 void sendRemoveLogRequests(){
01633 #if SYNCHRONIZED_CHECKPOINT
01634 CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
01635 #endif
01636 double _startTime = CkWallTimer();
01637
01638
01639 int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
01640 char *requestMsg = (char *)CmiAlloc(totalSize);
01641
01642
01643 RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01644 request->PE = CkMyPe();
01645 request->numberObjects = processedTicketLog.size();
01646 char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
01647 memcpy(listProcessedLogs,(char *)processedTicketLog.data(),processedTicketLog.size()*sizeof(TProcessedLog));
01648 char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
01649 int *numDeterminants = (int *)listDeterminants;
01650 numDeterminants[0] = 0;
01651 listDeterminants = (char *)&numDeterminants[1];
01652
01653
01654 CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
01655
01656 DEBUG_MEM(CmiMemoryCheck());
01657 for(int i=0;i<CkNumPes();i++){
01658 CmiSyncSend(i,totalSize,requestMsg);
01659 }
01660 CmiFree(requestMsg);
01661
01662 clearUpMigratedRetainedLists(CmiMyPe());
01663
01664
01665 traceUserBracketEvent(30,_startTime,CkWallTimer());
01666
01667 DEBUG_MEM(CmiMemoryCheck());
01668 }
01669
01670
01671 void _checkpointAckHandler(CheckPointAck *ackMsg){
01672 DEBUG_MEM(CmiMemoryCheck());
01673 unAckedCheckpoint=0;
01674 DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
01675 DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));
01676 if(onGoingLoadBalancing){
01677 onGoingLoadBalancing = 0;
01678 finishedCheckpointLoadBalancing();
01679 }else{
01680 sendRemoveLogRequests();
01681 }
01682 CmiFree(ackMsg);
01683
01684 };
01685
01686
01690 inline void populateDeterminantTable(char *data){
01691 DEBUG(char recverString[100]);
01692 DEBUG(char senderString[100]);
01693 int numDets, *numDetsPtr;
01694 Determinant *detList;
01695 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
01696 SNToTicket *tickets;
01697 Ticket ticket;
01698
01699 RemoveLogRequest *request = (RemoveLogRequest *)data;
01700 TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
01701
01702 numDetsPtr = (int *)&list[request->numberObjects];
01703 numDets = numDetsPtr[0];
01704 detList = (Determinant *)&numDetsPtr[1];
01705
01706
01707 for(int i=0; i<numDets; i++){
01708 table = detTable.get(detList[i].sender);
01709 if(table == NULL){
01710 table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
01711 detTable.put(detList[i].sender) = table;
01712 }
01713 tickets = table->get(detList[i].receiver);
01714 if(tickets == NULL){
01715 tickets = new SNToTicket();
01716 table->put(detList[i].receiver) = tickets;
01717 }
01718 ticket.TN = detList[i].TN;
01719 tickets->put(detList[i].SN) = ticket;
01720 }
01721
01722 DEBUG_MEM(CmiMemoryCheck());
01723
01724 }
01725
01726 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
01727 int total;
01728 DEBUG(char nameString[100]);
01729 DEBUG_MEM(CmiMemoryCheck());
01730 CmiMemoryCheck();
01731 char *data = (char *)_data;
01732 RemoveLogRequest *request = (RemoveLogRequest *)data;
01733 TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
01734 CkQ<MlogEntry *> *mlog = mlogData->getMlog();
01735 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
01736 SNToTicket *tickets;
01737 MCount TN;
01738
01739 int count=0;
01740 total = mlog->length();
01741 for(int i=0; i<total; i++){
01742 MlogEntry *logEntry = mlog->deq();
01743
01744
01745 table = detTable.get(logEntry->env->sender);
01746 if(table != NULL){
01747 tickets = table->get(logEntry->env->recver);
01748 if(tickets != NULL){
01749 TN = tickets->get(logEntry->env->SN).TN;
01750 if(TN != 0){
01751 logEntry->env->TN = TN;
01752 }
01753 }
01754 }
01755
01756 int match=0;
01757 for(int j=0;j<request->numberObjects;j++){
01758 if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
01759
01760 match = 1;
01761 break;
01762 }
01763 }
01764 char senderString[100],recverString[100];
01765
01766 if(match){
01767 count++;
01768 delete logEntry;
01769 }else{
01770 mlog->enq(logEntry);
01771 }
01772 }
01773 if(count > 0){
01774 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
01775 }
01776 DEBUG_MEM(CmiMemoryCheck());
01777 CmiMemoryCheck();
01778 }
01779
01783 void _removeProcessedLogHandler(char *requestMsg){
01784 double start = CkWallTimer();
01785
01786
01787 populateDeterminantTable(requestMsg);
01788
01789
01790 forAllCharesDo(removeProcessedLogs,requestMsg);
01791
01792
01793
01794
01795 RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01796 DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
01797
01798
01799
01800
01801 DEBUG_MEM(CmiMemoryCheck());
01802 clearUpMigratedRetainedLists(request->PE);
01803
01804 traceUserBracketEvent(20,start,CkWallTimer());
01805 CmiFree(requestMsg);
01806 };
01807
01808
01809 void clearUpMigratedRetainedLists(int PE){
01810 int count=0;
01811 CmiMemoryCheck();
01812
01813 for(int j=migratedNoticeList.size()-1;j>=0;j--){
01814 if(migratedNoticeList[j].toPE == PE){
01815 migratedNoticeList[j].ackTo = 1;
01816 }
01817 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
01818 migratedNoticeList.erase(migratedNoticeList.begin() + j);
01819 count++;
01820 }
01821 }
01822 DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
01823
01824 for(int j=retainedObjectList.size()-1;j>=0;j--){
01825 if(retainedObjectList[j]->migRecord.toPE == PE){
01826 RetainedMigratedObject *obj = retainedObjectList[j];
01827 DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
01828 retainedObjectList.erase(retainedObjectList.begin() + j);
01829 if(obj->msg != NULL){
01830 CmiMemoryCheck();
01831 CmiFree(obj->msg);
01832 }
01833 delete obj;
01834 }
01835 }
01836 }
01837
01838
01839
01840
01841
01847 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
01848 RestartRequest msg;
01849 CmiInitMsgHeader(msg.header, sizeof(RestartRequest));
01850 fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
01851
01852
01853 _restartFlag = true;
01854 _recoveryFlag = true;
01855 _numRestartResponses = 0;
01856
01857
01858 if(teamSize > 1){
01859 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
01860 if(i != CkMyPe() && i < CkNumPes()){
01861
01862 msg.PE = CkMyPe();
01863 CmiSetHandler(&msg,_restartHandlerIdx);
01864 CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
01865 }
01866 }
01867 }
01868
01869
01870 msg.PE = CkMyPe();
01871 CmiSetHandler(&msg,_getCheckpointHandlerIdx);
01872 CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01873 };
01874
01879 void _restartHandler(RestartRequest *restartMsg){
01880 int i;
01881 int numGroups = CkpvAccess(_groupIDTable)->size();
01882 RestartRequest msg;
01883 CmiInitMsgHeader(msg.header, sizeof(RestartRequest));
01884 fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
01885
01886
01887 _restartFlag = true;
01888 _numRestartResponses = 0;
01889
01890
01891
01892
01893
01894
01895
01896
01897
01898
01899
01900
01901
01902 msg.PE = CkMyPe();
01903 CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
01904 CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01905 }
01906
01907
01911 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
01912
01913
01914 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
01915
01916
01917 CkAssert(restartMsg->PE == storedChkpt->PE);
01918
01919 storedRequest = restartMsg;
01920 verifyAckTotal = 0;
01921
01922 for(int i=0;i<migratedNoticeList.size();i++){
01923 if(migratedNoticeList[i].fromPE == restartMsg->PE){
01924
01925 if(migratedNoticeList[i].ackFrom == 0){
01926
01927
01928 VerifyAckMsg msg;
01929 CmiInitMsgHeader(msg.header, sizeof(VerifyAckMsg));
01930 msg.migRecord = migratedNoticeList[i];
01931 msg.index = i;
01932 msg.fromPE = CmiMyPe();
01933 CmiPrintf("[%d] Verify gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
01934 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
01935 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
01936 verifyAckTotal++;
01937 }
01938 }
01939 }
01940
01941
01942 if(verifyAckTotal == 0){
01943 sendCheckpointData(MLOG_RESTARTED);
01944 }
01945 verifyAckCount = 0;
01946 }
01947
01951 void _recvRestartCheckpointHandler(char *_restartData){
01952 RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
01953 MigrationRecord *migratedAwayElements;
01954
01955 globalLBID = restartData->lbGroupID;
01956
01957 restartData->restartWallTime *= 1000;
01958 adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
01959 adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
01960 if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
01961
01962
01963 printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
01964 char *buf = &_restartData[sizeof(RestartProcessorData)];
01965
01966 if(restartData->numMigratedAwayElements != 0){
01967 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
01968 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
01969 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
01970 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
01971 }
01972
01973
01974 forAllCharesDo(setTeamRecovery,NULL);
01975
01976 PUP::fromMem pBuf(buf);
01977 pBuf | checkpointCount;
01978 for(int i=0; i<CmiNumPes(); i++){
01979 pBuf | CpvAccess(_incarnation)[i];
01980 }
01981 CkPupROData(pBuf);
01982 CkPupGroupData(pBuf,false);
01983 CkPupNodeGroupData(pBuf,false);
01984 pupArrayElementsSkip(pBuf,false,NULL);
01985 CkAssert(pBuf.size() == restartData->checkPointSize);
01986 printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
01987
01988
01989 for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
01990 CpvAccess(_incarnation)[i]++;
01991 }
01992
01993
01994 forAllCharesDo(unsetTeamRecovery,NULL);
01995
01996
01997 forAllCharesDo(initializeRestart,NULL);
01998
01999 CmiFree(_restartData);
02000
02001
02002
02003
02004
02005
02006
02007
02008
02009
02010
02011
02012
02013
02014
02015
02016
02017
02018 std::vector<TProcessedLog> objectVec;
02019 forAllCharesDo(createObjIDList, (void *)&objectVec);
02020 int numberObjects = objectVec.size();
02021
02022
02023
02024
02025 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02026 char *resendMsg = (char *)CmiAlloc(totalSize);
02027
02028 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02029 resendReq->PE =CkMyPe();
02030 resendReq->numberObjects = numberObjects;
02031 char *objList = &resendMsg[sizeof(ResendRequest)];
02032 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02033
02034
02035
02036
02037
02038
02039
02040
02041
02042
02043
02044
02045
02046 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02047 CpvAccess(_currentObj) = lb;
02048 lb->ReceiveDummyMigration(restartDecisionNumber);
02049
02050 sleep(10);
02051
02052 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02053 for(int i=0;i<CkNumPes();i++){
02054 if(i != CkMyPe()){
02055 CmiSyncSend(i,totalSize,resendMsg);
02056 }
02057 }
02058 _resendMessagesHandler(resendMsg);
02059
02060 }
02061
02062
02063 void CkMlogRestartDouble(void *,double){
02064 CkMlogRestart(NULL,NULL);
02065 };
02066
02067
02068 void CkMlogRestartLocal(){
02069 CkMlogRestart(NULL,NULL);
02070 };
02071
02075 void _getCheckpointHandler(RestartRequest *restartMsg){
02076
02077
02078 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
02079
02080
02081 CkAssert(restartMsg->PE == storedChkpt->PE);
02082
02083 storedRequest = restartMsg;
02084 verifyAckTotal = 0;
02085
02086 for(int i=0;i<migratedNoticeList.size();i++){
02087 if(migratedNoticeList[i].fromPE == restartMsg->PE){
02088
02089 if(migratedNoticeList[i].ackFrom == 0){
02090
02091
02092 VerifyAckMsg msg;
02093 CmiInitMsgHeader(msg.header, sizeof(VerifyAckMsg));
02094 msg.migRecord = migratedNoticeList[i];
02095 msg.index = i;
02096 msg.fromPE = CmiMyPe();
02097 CmiPrintf("[%d] Verify gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
02098 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
02099 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
02100 verifyAckTotal++;
02101 }
02102 }
02103 }
02104
02105
02106 if(verifyAckTotal == 0){
02107 sendCheckpointData(MLOG_CRASHED);
02108 }
02109 verifyAckCount = 0;
02110 }
02111
02112
/**
 * Answers a VerifyAckMsg sent by the checkpoint PE of a restarting PE.
 * If an element with migRecord's gID/idx is present here, it is reclaimed from
 * its CkLocMgr and unregistered from the load-balancer database, with
 * setDuringMigration(true) around the removal, and verifyAckedRequests is
 * incremented. In every case the same message is sent back to fromPE with
 * handler _verifyAckHandlerIdx.
 * NOTE(review): migRecord is returned unchanged; _verifyAckHandler copies it
 * over its migratedNoticeList entry.
 */
02113 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
02114 CkLocMgr *locMgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
02115 CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
02116 if(rec != NULL) {
02117
02118
02119 CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
02120
// The LB database and handle are fetched before reclaim(); presumably
// reclaim() invalidates rec — keep this order.
02121 LBDatabase *lbdb = rec->getLBDB();
02122 LDObjHandle ldHandle = rec->getLdHandle();
02123
02124 locMgr->setDuringMigration(true);
02125
02126 locMgr->reclaim(verifyRequest->migRecord.idx);
02127 lbdb->UnregisterObj(ldHandle);
02128
02129 locMgr->setDuringMigration(false);
02130
02131 verifyAckedRequests++;
02132
02133 }
// Reuse the request buffer as the reply; CmiSyncSendAndFree takes ownership.
02134 CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
02135 CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
02136 };
02137
02138
02139 void _verifyAckHandler(VerifyAckMsg *verifyReply){
02140 int index = verifyReply->index;
02141 migratedNoticeList[index] = verifyReply->migRecord;
02142 verifyAckCount++;
02143 CmiPrintf("[%d] VerifyReply received %d for gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
02144 if(verifyAckCount == verifyAckTotal){
02145 sendCheckpointData(MLOG_CRASHED);
02146 }
02147 }
02148
02149
02155 void sendCheckpointData(int mode){
02156 RestartRequest *restartMsg = storedRequest;
02157 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
02158 int numMigratedAwayElements = migratedNoticeList.size();
02159 if(migratedNoticeList.size() != 0){
02160 printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
02161
02162 }
02163
02164 int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
02165
02166 DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
02167 CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
02168
02169 totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
02170
02171 char *msg = (char *)CmiAlloc(totalSize);
02172
02173 RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
02174 dataMsg->PE = CkMyPe();
02175 dataMsg->restartWallTime = CmiTimer();
02176 dataMsg->checkPointSize = storedChkpt->bufSize;
02177
02178 dataMsg->numMigratedAwayElements = numMigratedAwayElements;
02179
02180
02181 dataMsg->numMigratedInElements = 0;
02182 dataMsg->migratedElementSize = 0;
02183 dataMsg->lbGroupID = globalLBID;
02184
02185
02186
02187
02188
02189 char *buf = &msg[sizeof(RestartProcessorData)];
02190
02191 if(dataMsg->numMigratedAwayElements != 0){
02192 memcpy(buf,migratedNoticeList.data(),migratedNoticeList.size()*sizeof(MigrationRecord));
02193 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
02194 }
02195
02196 if(diskCkptFlag){
02197 readCheckpointFromDisk(storedChkpt->bufSize,buf);
02198 } else {
02199 memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
02200 }
02201 buf = &buf[storedChkpt->bufSize];
02202
02203 if(mode == MLOG_RESTARTED){
02204 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
02205 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02206 CmiFree(restartMsg);
02207 }else{
02208 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
02209 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02210 CmiFree(restartMsg);
02211 }
02212 };
02213
02214
02215
02216
02217
02218 void createObjIDList(void *data,ChareMlogData *mlogData){
02219 std::vector<TProcessedLog> *list = (std::vector<TProcessedLog> *)data;
02220 TProcessedLog entry;
02221 entry.recver = mlogData->objID;
02222 entry.tProcessed = mlogData->tProcessed;
02223 list->push_back(entry);
02224 DEBUG_TEAM(char objString[100]);
02225 DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
02226 DEBUG_RECOVERY(printLog(&entry));
02227 }
02228
02229
02234 void _recvCheckpointHandler(char *_restartData){
02235 RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
02236 MigrationRecord *migratedAwayElements;
02237
02238 globalLBID = restartData->lbGroupID;
02239
02240 restartData->restartWallTime *= 1000;
02241 adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
02242 adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
02243 if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
02244
02245
02246 printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
02247 char *buf = &_restartData[sizeof(RestartProcessorData)];
02248
02249 if(restartData->numMigratedAwayElements != 0){
02250 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
02251 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
02252 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
02253 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
02254 }
02255
02256 PUP::fromMem pBuf(buf);
02257
02258 pBuf | checkpointCount;
02259 for(int i=0; i<CmiNumPes(); i++){
02260 pBuf | CpvAccess(_incarnation)[i];
02261 }
02262 CkPupROData(pBuf);
02263 CkPupGroupData(pBuf,true);
02264 CkPupNodeGroupData(pBuf,true);
02265 pupArrayElementsSkip(pBuf,true,NULL);
02266 CkAssert(pBuf.size() == restartData->checkPointSize);
02267 printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
02268
02269
02270 CpvAccess(_incarnation)[CmiMyPe()]++;
02271
02272 forAllCharesDo(initializeRestart,NULL);
02273
02274 CmiFree(_restartData);
02275
02276 _initDone();
02277
02278 getGlobalStep(globalLBID);
02279
02280
02281 _numRestartResponses = 0;
02282
02283
02284 std::vector<TProcessedLog> objectVec;
02285 forAllCharesDo(createObjIDList, (void *)&objectVec);
02286 int numberObjects = objectVec.size();
02287
02288
02289 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02290 char *resendMsg = (char *)CmiAlloc(totalSize);
02291
02292 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02293 resendReq->PE =CkMyPe();
02294 resendReq->numberObjects = numberObjects;
02295 char *objList = &resendMsg[sizeof(ResendRequest)];
02296 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02297
02298 CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
02299 CmiSyncBroadcastAndFree(totalSize,resendMsg);
02300
02301 }
02302
02307 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
02308 countUpdateHomeAcks++;
02309 CmiFree(updateHomeAck);
02310
02311 if(countUpdateHomeAcks != CmiNumPes()){
02312 return;
02313 }
02314
02315
02316 std::vector<TProcessedLog> objectVec;
02317 forAllCharesDo(createObjIDList, (void *)&objectVec);
02318 int numberObjects = objectVec.size();
02319
02320
02321 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02322 char *resendMsg = (char *)CmiAlloc(totalSize);
02323
02324 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02325 resendReq->PE =CkMyPe();
02326 resendReq->numberObjects = numberObjects;
02327 char *objList = &resendMsg[sizeof(ResendRequest)];
02328 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02329
02330
02331 if(fastRecovery){
02332 distributeRestartedObjects();
02333 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02334 }
02335
02336
02337
02338
02339
02340
02341 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02342 CpvAccess(_currentObj) = lb;
02343 lb->ReceiveDummyMigration(restartDecisionNumber);
02344
02345
02346
02347 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02348 CmiSyncBroadcastAllAndFree(totalSize, resendMsg);
02349
02350 };
02351
02355 void initializeRestart(void *data, ChareMlogData *mlogData){
02356 mlogData->resendReplyRecvd = 0;
02357 mlogData->receivedTNs = new std::vector<MCount>;
02358 mlogData->restartFlag = true;
02359 };
02360
02364 void updateHomePE(void *data,ChareMlogData *mlogData){
02365 RestartRequest *updateRequest = (RestartRequest *)data;
02366 int PE = updateRequest->PE;
02367
02368
02369 if(mlogData->objID.type == TypeArray){
02370
02371 CkGroupID myGID = mlogData->objID.data.array.id;
02372 CkArrayIndexMax myIdx = mlogData->objID.data.array.idx.asChild();
02373 CkArrayID aid(mlogData->objID.data.array.id);
02374
02375 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
02376 if(locMgr->homePe(myIdx) == PE){
02377 DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
02378 DEBUG_RESTART(myIdx.print());
02379 informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
02380 }
02381 }
02382 };
02383
02384
/**
 * Handler run when a restarted PE (updateRequest->PE) asks this PE to refresh
 * home-PE location information.
 *
 * Visits every local chare with updateHomePE. It then recycles the same
 * message as the ack (PE field set to this PE, handler _updateHomeAckHandler)
 * and sends it back to the requester.
 *
 * If the requester is this PE's checkpoint PE:
 *  - When unAckedCheckpoint==1, checkpointCount is decremented and a new
 *    checkpoint is started (per the printed message, the crashed PE never
 *    acked the last one).
 *  - Every retained object record with acked == 0 is re-announced to the
 *    checkpoint PE with a MigrationNotice.
 */
void _updateHomeRequestHandler(RestartRequest *updateRequest){
	// Read the requester before the message is recycled as the ack below.
	int sender = updateRequest->PE;

	forAllCharesDo(updateHomePE,updateRequest);

	updateRequest->PE = CmiMyPe();
	CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
	// From here on updateRequest belongs to the send call and must not be touched.
	CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
	if(sender == getCheckPointPE() && unAckedCheckpoint==1){
		CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
		checkpointCount--;
		startMlogCheckpoint(NULL,0);
	}
	if(sender == getCheckPointPE()){
		for(int i=0;i<retainedObjectList.size();i++){
			if(retainedObjectList[i]->acked == 0){
				// Stack-allocated message: CmiSyncSend copies it before returning.
				MigrationNotice migMsg;
				CmiInitMsgHeader(migMsg.header, sizeof(MigrationNotice));
				migMsg.migRecord = retainedObjectList[i]->migRecord;
				migMsg.record = retainedObjectList[i];
				CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
				CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
			}
		}
	}
}
02414
02418 void fillTicketForChare(void *data, ChareMlogData *mlogData){
02419 DEBUG(char name[100]);
02420 ResendData *resendData = (ResendData *)data;
02421 int PE = resendData->PE;
02422 int count=0;
02423 CkHashtableIterator *iterator;
02424 void *objp;
02425 void *objkey;
02426 CkObjID *objID;
02427 SNToTicket *snToTicket;
02428 Ticket ticket;
02429
02430
02431 iterator = mlogData->teamTable.iterator();
02432 while( (objp = iterator->next(&objkey)) != NULL ){
02433 objID = (CkObjID *)objkey;
02434
02435
02436 for(int j=0;j<resendData->numberObjects;j++){
02437 if((*objID) == (resendData->listObjects)[j].recver){
02438 snToTicket = *(SNToTicket **)objp;
02439
02440 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
02441 ticket = snToTicket->get(snIndex);
02442 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
02443
02444 resendData->ticketVecs[j].push_back(ticket.TN);
02445 }
02446 }
02447 }
02448 }
02449 }
02450
02451
02452 delete iterator;
02453 }
02454
02455
02460 void setTeamRecovery(void *data, ChareMlogData *mlogData){
02461 char name[100];
02462 mlogData->teamRecoveryFlag = true;
02463 }
02464
02468 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
02469 mlogData->teamRecoveryFlag = false;
02470 }
02471
02475 void printLog(TProcessedLog *log){
02476 char recverString[100];
02477 CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
02478 }
02479
02483 void printMsg(envelope *env, const char* par){
02484 char senderString[100];
02485 char recverString[100];
02486 CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
02487 }
02488
02492 void printDet(Determinant *det, const char* par){
02493 char senderString[100];
02494 char recverString[100];
02495 CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
02496 }
02497
02503 void resendMessageForChare(void *data, ChareMlogData *mlogData){
02504 DEBUG_RESTART(char nameString[100]);
02505 DEBUG_RESTART(char recverString[100]);
02506 DEBUG_RESTART(char senderString[100]);
02507
02508 ResendData *resendData = (ResendData *)data;
02509 int PE = resendData->PE;
02510 int count=0;
02511 int ticketRequests=0;
02512 CkQ<MlogEntry *> *log = mlogData->getMlog();
02513
02514 DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
02515
02516
02517 for(int i=0;i<log->length();i++){
02518 MlogEntry *logEntry = (*log)[i];
02519
02520
02521
02522 envelope *env = logEntry->env;
02523 if(env == NULL){
02524 continue;
02525 }
02526
02527
02528 if(env->recver.type != TypeInvalid){
02529 for(int j=0;j<resendData->numberObjects;j++){
02530 if(env->recver == (resendData->listObjects)[j].recver){
02531 if(PE != CkMyPe()){
02532 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
02533 if(env->recver.type == TypeNodeGroup){
02534 CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
02535 }else{
02536 CmiSetHandler(env,CmiGetXHandler(env));
02537 CmiSyncSend(PE,env->getTotalsize(),(char *)env);
02538 }
02539 }else{
02540 envelope *copyEnv = copyEnvelope(env);
02541 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
02542 }
02543 DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
02544 count++;
02545 }
02546 }
02547
02548 }
02549 }
02550 DEBUG_RESTART(printf("[%d] Resent %d/%d (%d) messages from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)
02551 }
02552
02557 void _sendDetsHandler(char *msg){
02558 ResendData d;
02559 std::vector<Determinant> *detVec;
02560 ResendRequest *resendReq = (ResendRequest *)msg;
02561
02562
02563
02564
02565 CmiResetGlobalReduceSeqID();
02566
02567
02568 char *listObjects = &msg[sizeof(ResendRequest)];
02569 d.numberObjects = resendReq->numberObjects;
02570 d.PE = resendReq->PE;
02571 d.listObjects = (TProcessedLog *)listObjects;
02572 d.ticketVecs = new std::vector<MCount>[d.numberObjects];
02573 detVec = new std::vector<Determinant>[d.numberObjects];
02574
02575
02576
02577 std::vector<Determinant> *vec;
02578 for(int i=0; i<d.numberObjects; i++){
02579 vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
02580 if(vec != NULL){
02581 for(int j=0; j<vec->size(); j++){
02582
02583
02584 if((*vec)[j].TN > d.listObjects[i].tProcessed){
02585
02586
02587 d.ticketVecs[i].push_back((*vec)[j].TN);
02588
02589
02590 detVec[i].push_back((*vec)[j]);
02591
02592 DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
02593 }
02594 }
02595 }
02596 }
02597
02598 int totalDetStored = 0;
02599 for(int i=0;i<d.numberObjects;i++){
02600 totalDetStored += detVec[i].size();
02601 }
02602
02603
02604
02605
02606
02607 int totalTNStored=0;
02608 for(int i=0;i<d.numberObjects;i++){
02609 totalTNStored += d.ticketVecs[i].size();
02610 }
02611
02612 int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
02613 char *resendReplyMsg = (char *)CmiAlloc(totalSize);
02614
02615 ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
02616 resendReply->PE = CkMyPe();
02617 resendReply->numberObjects = d.numberObjects;
02618
02619 char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
02620 CkObjID *replyObjects = (CkObjID *)replyListObjects;
02621 for(int i=0;i<d.numberObjects;i++){
02622 replyObjects[i] = d.listObjects[i].recver;
02623 }
02624
02625 char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
02626 for(int i=0;i<d.numberObjects;i++){
02627 int vecsize = d.ticketVecs[i].size();
02628 memcpy(ticketList,&vecsize,sizeof(int));
02629 ticketList = &ticketList[sizeof(int)];
02630 memcpy(ticketList,d.ticketVecs[i].data(),sizeof(MCount)*vecsize);
02631 ticketList = &ticketList[sizeof(MCount)*vecsize];
02632 }
02633
02634
02635 for(int i=0;i<d.numberObjects;i++){
02636 int vecsize = detVec[i].size();
02637 memcpy(ticketList,&vecsize,sizeof(int));
02638 ticketList = &ticketList[sizeof(int)];
02639 memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
02640 ticketList = &ticketList[sizeof(Determinant)*vecsize];
02641 }
02642
02643 CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
02644 CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
02645
02646 delete [] detVec;
02647 delete [] d.ticketVecs;
02648
02649 DEBUG_MEM(CmiMemoryCheck());
02650
02651 if(resendReq->PE != CkMyPe()){
02652 CmiFree(msg);
02653 }
02654
02655 lastRestart = CmiWallTimer();
02656
02657 }
02658
02663 void _resendMessagesHandler(char *msg){
02664 ResendData d;
02665 ResendRequest *resendReq = (ResendRequest *)msg;
02666
02667
02668
02669
02670 char *listObjects = &msg[sizeof(ResendRequest)];
02671 d.numberObjects = resendReq->numberObjects;
02672 d.PE = resendReq->PE;
02673 d.listObjects = (TProcessedLog *)listObjects;
02674
02675 DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
02676
02677
02678
02679 if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
02680 forAllCharesDo(fillTicketForChare,&d);
02681 else
02682 forAllCharesDo(resendMessageForChare,&d);
02683
02684 DEBUG_MEM(CmiMemoryCheck());
02685
02686
02687 CmiFree(msg);
02688
02689
02690 lastRestart = CmiWallTimer();
02691 }
02692
// Helpers over TN vectors collected during recovery (defined after processReceivedTN).
// maxVec: largest TN (0 if none). sortVec: sort + dedupe in place.
// searchVec: index of searchTN in a sorted vector, or -1.
MCount maxVec(std::vector<MCount> *TNvec);
void sortVec(std::vector<MCount> *TNvec);
int searchVec(std::vector<MCount> *TNVec,MCount searchTN);
02696
02700 void processDelayedRemoteMsgQueue(){
02701 DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
02702
02703 while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
02704 void *qMsgPtr;
02705 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
02706 envelope *qEnv = (envelope *)qMsgPtr;
02707 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
02708 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
02709 DEBUG_MEM(CmiMemoryCheck());
02710 }
02711
02712 }
02713
02719 void _sendDetsReplyHandler(char *msg){
02720 ResendRequest *resendReply = (ResendRequest *)msg;
02721 CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
02722 char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
02723
02724
02725 DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
02726
02727 for(int i =0; i< resendReply->numberObjects;i++){
02728 Chare *obj = (Chare *)listObjects[i].getObject();
02729
02730 int vecsize;
02731 memcpy(&vecsize,listTickets,sizeof(int));
02732 listTickets = &listTickets[sizeof(int)];
02733 MCount *listTNs = (MCount *)listTickets;
02734 listTickets = &listTickets[vecsize*sizeof(MCount)];
02735
02736 if(obj != NULL){
02737
02738 processReceivedTN(obj,vecsize,listTNs);
02739 }else{
02740
02741 int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
02742 char *TNMsg = (char *)CmiAlloc(totalSize);
02743 ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
02744 receivedTNData->recver = listObjects[i];
02745 receivedTNData->numTNs = vecsize;
02746 char *tnList = &TNMsg[sizeof(ReceivedTNData)];
02747 memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
02748 CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
02749 CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
02750 }
02751
02752 }
02753
02754
02755 for(int i = 0; i < resendReply->numberObjects; i++){
02756 Chare *obj = (Chare *)listObjects[i].getObject();
02757
02758 int vecsize;
02759 memcpy(&vecsize,listTickets,sizeof(int));
02760 listTickets = &listTickets[sizeof(int)];
02761 Determinant *listDets = (Determinant *)listTickets;
02762 listTickets = &listTickets[vecsize*sizeof(Determinant)];
02763
02764 if(obj != NULL){
02765
02766 processReceivedDet(obj,vecsize,listDets);
02767 } else {
02768
02769
02770 int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
02771 char *detMsg = (char *)CmiAlloc(totalSize);
02772 ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
02773 receivedDetData->recver = listObjects[i];
02774 receivedDetData->numDets = vecsize;
02775 char *detList = &detMsg[sizeof(ReceivedDetData)];
02776 memcpy(detList,listDets,sizeof(Determinant)*vecsize);
02777 CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
02778 CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
02779 }
02780
02781 }
02782
02783
02784 _numRestartResponses++;
02785 if(_numRestartResponses != CkNumPes())
02786 return;
02787 else
02788 _numRestartResponses = 0;
02789
02790
02791 std::vector<TProcessedLog> objectVec;
02792 forAllCharesDo(createObjIDList, (void *)&objectVec);
02793 int numberObjects = objectVec.size();
02794
02795
02796 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02797 char *resendMsg = (char *)CmiAlloc(totalSize);
02798
02799 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02800 resendReq->PE =CkMyPe();
02801 resendReq->numberObjects = numberObjects;
02802 char *objList = &resendMsg[sizeof(ResendRequest)];
02803 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02804
02805 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02806 CpvAccess(_currentObj) = lb;
02807 lb->ReceiveDummyMigration(restartDecisionNumber);
02808
02809
02810
02811
02812 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02813 CmiSyncBroadcastAllAndFree(totalSize, resendMsg);
02814
02815
02816 if(fastRecovery){
02817 distributeRestartedObjects();
02818 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02819 }
02820
02821
02822
02823 };
02824
02828 void _receivedDetDataHandler(ReceivedDetData *msg){
02829 DEBUG_NOW(char objName[100]);
02830 Chare *obj = (Chare *) msg->recver.getObject();
02831 if(obj){
02832 char *_msg = (char *)msg;
02833 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
02834 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
02835 processReceivedDet(obj,msg->numDets,listDets);
02836 CmiFree(msg);
02837 }else{
02838 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
02839 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
02840 }
02841 }
02842
02846 void _receivedTNDataHandler(ReceivedTNData *msg){
02847 DEBUG_NOW(char objName[100]);
02848 Chare *obj = (Chare *) msg->recver.getObject();
02849 if(obj){
02850 char *_msg = (char *)msg;
02851 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
02852 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
02853 processReceivedTN(obj,msg->numTNs,listTNs);
02854 CmiFree(msg);
02855 }else{
02856 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
02857 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
02858 }
02859 };
02860
02864 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
02865 Determinant *det;
02866
02867
02868 for(int i=0; i<listSize; i++){
02869 det = &listDets[i];
02870 if(CkMyPe() == 4) printDet(det,"RECOVERY");
02871 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
02872 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
02873 }
02874
02875 DEBUG_MEM(CmiMemoryCheck());
02876 }
02877
02881 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
02882
02883 obj->mlogData->resendReplyRecvd++;
02884
02885 DEBUG(char objName[100]);
02886 DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
02887
02888
02889
02890
02891
02892
02893 for(int j=0;j<listSize;j++){
02894 obj->mlogData->receivedTNs->push_back(listTNs[j]);
02895 }
02896
02897
02898
02899
02900
02901 if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
02902 obj->mlogData->resendReplyRecvd = 0;
02903
02904 #if VERIFY_DETS
02905
02906 sortVec(obj->mlogData->receivedTNs);
02907
02908
02909
02910 if(obj->mlogData->receivedTNs->size() > 0){
02911 int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
02912 int vecsize = obj->mlogData->receivedTNs->size();
02913 int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
02914
02915
02916 if(teamSize > 1){
02917 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
02918 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
02919 }else{
02920 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
02921 }
02922
02923 if(numberHoles == 0){
02924 }else{
02925 char objName[100];
02926 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
02927 obj->mlogData->numberHoles = numberHoles;
02928 obj->mlogData->ticketHoles = new MCount[numberHoles];
02929 int countHoles=0;
02930 for(int k=tProcessedIndex+1;k<vecsize;k++){
02931 if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
02932
02933 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
02934 DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
02935 obj->mlogData->ticketHoles[countHoles] = newTN;
02936 countHoles++;
02937 }
02938 }
02939 }
02940
02941 if(countHoles != numberHoles){
02942 char str[100];
02943 printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
02944 }
02945 CkAssert(countHoles == numberHoles);
02946 obj->mlogData->currentHoles = numberHoles;
02947 }
02948 }
02949 #else
02950 if(obj->mlogData->receivedTNs->size() > 0){
02951 obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
02952 }
02953 #endif
02954
02955 delete obj->mlogData->receivedTNs;
02956 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
02957 obj->mlogData->receivedTNs = NULL;
02958 obj->mlogData->restartFlag = false;
02959
02960
02961
02962 DEBUG_RESTART(char objString[100]);
02963 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
02964 }
02965
02966 }
02967
02969 MCount maxVec(std::vector<MCount> *TNvec){
02970 MCount max = 0;
02971 for(int i=0; i<TNvec->size(); i++){
02972 if((*TNvec)[i] > max)
02973 max = (*TNvec)[i];
02974 }
02975 return max;
02976 }
02977
02978 void sortVec(std::vector<MCount> *TNvec){
02979 std::sort(TNvec->begin(), TNvec->end());
02980 std::erase(std::unique(TNvec->begin(), TNvec->end()), TNvec->end());
02981 }
02982
02983 int searchVec(std::vector<MCount> *TNVec,MCount searchTN){
02984
02985 auto it = std::lower_bound(TNvec->begin(), TNvec->end(), searchTN);
02986 if (it == TNvec.end() || *it != searchTN)
02987 return -1;
02988 else
02989 return it - TNvec->begin();
02990 };
02991
02992
02993
02994
02995
02996
02997
02998
02999
03000
03001
/**
 * CkLocIterator used by distributeRestartedObjects (fast recovery) to hand the
 * local array elements of one location manager out to other PEs, round robin.
 * *targetPE is a cursor owned by the caller and shared across managers.
 *
 * NOTE(review): the cursor runs from CkMyPe()+1 to CkMyPe()+parallelRecovery
 * and is not reduced modulo CkNumPes(). On high-numbered PEs it can name a PE
 * that does not exist; confirm callers rule this out.
 */
class ElementDistributor: public CkLocIterator{
	CkLocMgr *locMgr;
	int *targetPE;

	// Serializes (group ID, index, location), the same order in which
	// _distributedLocationHandler reads gID, then idx, then resumes the location.
	void pupLocation(CkLocation &loc,PUP::er &p){
		CkArrayIndexMax idx=loc.getIndex();
		CkGroupID gID = locMgr->ckGetGroupID();
		p|gID;
		p|idx;
		p|loc;
	};
public:
	ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};

	// Ships one location to *targetPE and advances the cursor.
	void addLocation(CkLocation &loc){

		// Never "send" to ourselves: advance the cursor and keep this element here.
		if(*targetPE == CkMyPe()){
			*targetPE = (*targetPE +1)%CkNumPes();
			return;
		}

		CkArrayIndexMax idx = loc.getIndex();
		CkLocRec *rec = loc.getLocalRecord();
		// Shadows the member locMgr (the member is still used by pupLocation above).
		CkLocMgr *locMgr = loc.getManager();
		std::vector<CkMigratable *> eltList;

		CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
		idx.print();

		// Bookkeeping: one more element emigrated during recovery.
		CpvAccess(_numEmigrantRecObjs)++;
		locMgr->migratableList((CkLocRec *)rec,eltList);
		CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();

		locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
		reductionMgr->incNumEmigrantRecObjs();

		// Size first, then pack into DistributeObjectMsg | PUP'd location.
		PUP::sizer psizer;
		pupLocation(loc,psizer);
		int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
		char *msg = (char *)CmiAlloc(totalSize);
		DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
		distributeMsg->PE = CkMyPe();
		char *buf = &msg[sizeof(DistributeObjectMsg)];
		PUP::toMem pmem(buf);
		// Presumably lets the packed objects tear themselves down while being packed — TODO confirm.
		pmem.becomeDeleting();
		pupLocation(loc,pmem);

		// Drop the local record, then point the location manager at the new PE.
		locMgr->setDuringMigration(true);
		delete rec;
		locMgr->setDuringMigration(false);
		locMgr->inform(idx,*targetPE);

		CmiSetHandler(msg,_distributedLocationHandlerIdx);
		CmiSyncSendAndFree(*targetPE,totalSize,msg);

		CmiAssert(locMgr->lastKnown(idx) == *targetPE);

		// Round robin over CkMyPe()+1 .. CkMyPe()+parallelRecovery.
		*targetPE = *targetPE + 1;
		if(*targetPE > (CkMyPe() + parallelRecovery)){
			*targetPE = CkMyPe() + 1;
		}
	}

};
03071
/**
 * Fast-recovery helper: runs an ElementDistributor over every location manager
 * on this PE. Local array elements are thereby spread round robin starting at
 * PE CkMyPe()+1.
 */
void distributeRestartedObjects(){
	// numGroups and i look unused but are presumably referenced by the
	// CKLOCMGR_LOOP macro (defined elsewhere). Do not remove without checking it.
	int numGroups = CkpvAccess(_groupIDTable)->size();
	int i;
	// Round-robin cursor shared by all distributors created in the loop.
	int targetPE=CkMyPe()+1;
	CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
03081
/**
 * Handler for an array element sent back to this PE after parallel recovery.
 *
 * Message layout: DistributeObjectMsg | PUP'd (gID, idx, location), presumably
 * as written by pupLocation(). The element is resumed in its location manager,
 * with donotCountMigration set around the resume. Its home PE is then informed
 * of the new location.
 *
 * The emigrant-recovery counters are decremented on the array's reduction
 * manager and on this PE. When this PE's count reaches zero, resumeLbFnPtr is
 * called on centralLb.
 *
 * NOTE(review): sourcePE is unused and receivedMsg is never freed here; confirm
 * whether the message should be CmiFree'd once unpacked.
 */
void _sendBackLocationHandler(char *receivedMsg){
	printf("Array element received at processor %d after recovery\n",CkMyPe());
	DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
	int sourcePE = distributeMsg->PE;
	char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
	PUP::fromMem pmem(buf);
	CkGroupID gID;
	CkArrayIndexMax idx;
	pmem |gID;
	pmem |idx;
	CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
	// Resume without counting it as an ordinary migration.
	donotCountMigration=1;
	mgr->resume(idx,pmem,true);
	donotCountMigration=0;
	informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
	printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
	idx.print();

	// The reduction manager is found via the first migratable at this location.
	std::vector<CkMigratable *> eltList;
	CkLocRec *rec = mgr->elementRec(idx);
	mgr->migratableList((CkLocRec *)rec,eltList);
	CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
	reductionMgr->decNumEmigrantRecObjs();
	reductionMgr->decGCount();

	// Last returning element resumes load balancing.
	CpvAccess(_numEmigrantRecObjs)--;
	if(CpvAccess(_numEmigrantRecObjs) == 0){
		(*resumeLbFnPtr)(centralLb);
	}

}
03118
/**
 * Handler for an array element shipped here by ElementDistributor at restart.
 *
 * Message layout: DistributeObjectMsg | PUP'd (gID, idx, location).
 * The handler resumes the element (donotCountMigration set around the resume),
 * informs its home PE, and records the location in _immigrantRecObjs. It then
 * resumes each migratable with toResumeOrNot set and
 * resumeCount < globalResumeCount. Each such migratable is flagged as a
 * recovery immigrant from sourcePE and registered with its reduction manager.
 *
 * NOTE(review): receivedMsg is never freed here; confirm whether it should be
 * CmiFree'd once unpacked.
 */
void _distributedLocationHandler(char *receivedMsg){
	printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
	DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
	int sourcePE = distributeMsg->PE;
	char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
	PUP::fromMem pmem(buf);
	CkGroupID gID;
	CkArrayIndexMax idx;
	pmem |gID;
	pmem |idx;
	CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
	// Resume without counting it as an ordinary migration.
	donotCountMigration=1;
	mgr->resume(idx,pmem,true);
	donotCountMigration=0;
	informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
	printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
	idx.print();

	CkLocRec *rec = mgr->elementRec(idx);
	CmiAssert(rec != NULL);

	// Remember the location so it can be sent back later (see sendBackImmigrantRecObjs).
	CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,rec));
	CpvAccess(_numImmigrantRecObjs)++;

	std::vector<CkMigratable *> eltList;
	mgr->migratableList((CkLocRec *)rec,eltList);
	for(int i=0;i<eltList.size();i++){
		if(eltList[i]->mlogData->toResumeOrNot && eltList[i]->mlogData->resumeCount < globalResumeCount){
			CpvAccess(_currentObj) = eltList[i];
			eltList[i]->mlogData->immigrantRecFlag = true;
			eltList[i]->mlogData->immigrantSourcePE = sourcePE;

			CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
			reductionMgr->incNumImmigrantRecObjs();
			reductionMgr->decGCount();

			eltList[i]->ResumeFromSync();
		}
	}
}
03164
03165
03168 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
03169 DummyMigrationMsg buf;
03170 CmiInitMsgHeader(buf.header, sizeof(DummyMigrationMsg));
03171 buf.flag = MLOG_OBJECT;
03172 buf.lbID = lbID;
03173 buf.mgrID = locMgrID;
03174 buf.idx = idx;
03175 buf.locationPE = locationPE;
03176 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
03177 CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
03178 };
03179
03180
03185 void sendDummyMigrationCounts(int *dummyCounts){
03186 DummyMigrationMsg buf;
03187 CmiInitMsgHeader(buf.header, sizeof(DummyMigrationMsg));
03188 buf.flag = MLOG_COUNT;
03189 buf.lbID = globalLBID;
03190 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
03191 for(int i=0;i<CmiNumPes();i++){
03192 if(i != CmiMyPe() && dummyCounts[i] != 0){
03193 buf.count = dummyCounts[i];
03194 CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
03195 }
03196 }
03197 }
03198
03199
03203 void _dummyMigrationHandler(DummyMigrationMsg *msg){
03204 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03205 if(msg->flag == MLOG_OBJECT){
03206 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
03207 LDObjHandle h;
03208 lb->Migrated(h,1);
03209 }
03210 if(msg->flag == MLOG_COUNT){
03211 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
03212 msg->count -= verifyAckedRequests;
03213 for(int i=0;i<msg->count;i++){
03214 LDObjHandle h;
03215 lb->Migrated(h,1);
03216 }
03217 }
03218 verifyAckedRequests=0;
03219 CmiFree(msg);
03220 };
03221
03222
03223
03224
03225
03226
03227
03228
03229 class ElementCaller : public CkLocIterator {
03230 private:
03231 CkLocMgr *locMgr;
03232 MlogFn fnPointer;
03233 void *data;
03234 public:
03235 ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
03236 locMgr = _locMgr;
03237 fnPointer = _fnPointer;
03238 data = _data;
03239 };
03240 void addLocation(CkLocation &loc){
03241 std::vector<CkMigratable *> list;
03242 CkLocRec *local = loc.getLocalRecord();
03243 locMgr->migratableList (local,list);
03244 for(int i=0;i<list.size();i++){
03245 CkMigratable *migratableElement = list[i];
03246 fnPointer(data,migratableElement->mlogData);
03247 }
03248 }
03249 };
03250
03254 void forAllCharesDo(MlogFn fnPointer, void *data){
03255 int numGroups = CkpvAccess(_groupIDTable)->size();
03256 for(int i=0;i<numGroups;i++){
03257 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
03258 fnPointer(data,obj->mlogData);
03259 }
03260 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
03261 for(int i=0;i<numNodeGroups;i++){
03262 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
03263 fnPointer(data,obj->mlogData);
03264 }
03265 int i;
03266 CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
03267 };
03268
03269
03270
03271
03272
03273
03279 void initMlogLBStep(CkGroupID gid){
03280 DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
03281 countLBMigratedAway = 0;
03282 countLBToMigrate=0;
03283 onGoingLoadBalancing=1;
03284 migrationDoneCalled=0;
03285 checkpointBarrierCount=0;
03286 if(globalLBID.idx != 0){
03287 CmiAssert(globalLBID.idx == gid.idx);
03288 }
03289 globalLBID = gid;
03290 #if SYNCHRONIZED_CHECKPOINT
03291 garbageCollectMlog();
03292 #endif
03293 }
03294
03298 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
03299 CkArrayIndexMax idx = loc->getIndex();
03300 CkGroupID gID = locMgr->ckGetGroupID();
03301 p|gID;
03302 p|idx;
03303 p|*loc;
03304 };
03305
03309 void sendBackImmigrantRecObjs(){
03310 CkLocation *loc;
03311 CkLocMgr *locMgr;
03312 CkArrayIndexMax idx;
03313 CkLocRec *rec;
03314 PUP::sizer psizer;
03315 int targetPE;
03316 std::vector<CkMigratable *> eltList;
03317 CkReductionMgr *reductionMgr;
03318
03319
03320 for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
03321
03322
03323 loc = (*CpvAccess(_immigrantRecObjs))[i];
03324 idx = loc->getIndex();
03325 rec = loc->getLocalRecord();
03326 locMgr = loc->getManager();
03327 locMgr->migratableList(rec,eltList);
03328 targetPE = eltList[i]->mlogData->immigrantSourcePE;
03329
03330
03331 reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
03332 reductionMgr->decNumImmigrantRecObjs();
03333
03334 CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
03335 idx.print();
03336
03337
03338 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
03339
03340
03341 pupLocation(loc,locMgr,psizer);
03342 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
03343 char *msg = (char *)CmiAlloc(totalSize);
03344 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
03345 distributeMsg->PE = CkMyPe();
03346 char *buf = &msg[sizeof(DistributeObjectMsg)];
03347 PUP::toMem pmem(buf);
03348 pmem.becomeDeleting();
03349 pupLocation(loc,locMgr,pmem);
03350
03351 locMgr->setDuringMigration(true);
03352 delete rec;
03353 locMgr->setDuringMigration(false);
03354 locMgr->inform(idx,targetPE);
03355
03356
03357 CmiSetHandler(msg,_sendBackLocationHandlerIdx);
03358 CmiSyncSendAndFree(targetPE,totalSize,msg);
03359
03360
03361 delete loc;
03362
03363 CmiAssert(locMgr->lastKnown(idx) == targetPE);
03364
03365 }
03366
03367
03368 CpvAccess(_immigrantRecObjs)->clear();
03369 CpvAccess(_numImmigrantRecObjs) = 0;
03370
03371 }
03372
03377 void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
03378 resumeLbFnPtr = _fnPtr;
03379 centralLb = _centralLb;
03380
03381
03382 if(CpvAccess(_numImmigrantRecObjs) > 0){
03383 sendBackImmigrantRecObjs();
03384 }
03385
03386
03387 if(CpvAccess(_numEmigrantRecObjs) > 0)
03388 return;
03389
03390
03391 (*resumeLbFnPtr)(centralLb);
03392 }
03393
03394 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
03395 DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03396 DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03397
03398 resumeLbFnPtr = _fnPtr;
03399 centralLb = _centralLb;
03400 migrationDoneCalled = 1;
03401 if(countLBToMigrate == countLBMigratedAway){
03402 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03403 startMlogCheckpoint(NULL,CmiWallTimer());
03404 }
03405 };
03406
03407 void finishedCheckpointLoadBalancing(){
03408 DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
03409
03410 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
03411 CmiSetHandler(msg,_checkpointBarrierHandlerIdx);
03412 CmiReduce(msg,CmiMsgHeaderSizeBytes,doNothingMsg);
03413 };
03414
03415
03416 void sendMlogLocation(int targetPE, envelope *env){
03417 #if !SYNCHRONIZED_CHECKPOINT
03418 void *_msg = EnvToUsr(env);
03419 CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
03420
03421 int existing = 0;
03422
03423
03424
03425 for(int i=0;i<retainedObjectList.size();i++){
03426 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
03427 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
03428 DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
03429 existing = 1;
03430 break;
03431 }
03432 }
03433
03434 if(existing){
03435 return;
03436 }
03437
03438 countLBToMigrate++;
03439
03440 MigrationNotice migMsg;
03441 CmiInitMsgHeader(migMsg.header, sizeof(MigrationNotice));
03442 migMsg.migRecord.gID = msg->gid;
03443 migMsg.migRecord.idx = msg->idx;
03444 migMsg.migRecord.fromPE = CkMyPe();
03445 migMsg.migRecord.toPE = targetPE;
03446
03447 DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
03448
03449 RetainedMigratedObject *retainedObject = new RetainedMigratedObject;
03450 retainedObject->migRecord = migMsg.migRecord;
03451 retainedObject->acked = 0;
03452
03453 CkPackMessage(&env);
03454
03455 migMsg.record = retainedObject;
03456 retainedObject->msg = env;
03457 int size = retainedObject->size = env->getTotalsize();
03458
03459 retainedObjectList.push_back(retainedObject);
03460
03461 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
03462 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
03463
03464 DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
03465 #endif
03466 }
03467
03468 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
03469 msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
03470 migratedNoticeList.push_back(msg->migRecord);
03471
03472 MigrationNoticeAck buf;
03473 CmiInitMsgHeader(buf.header, sizeof(MigrationNoticeAck));
03474 buf.record = msg->record;
03475 CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
03476 CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
03477 }
03478
03479 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
03480
03481 RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
03482 retainedObject->acked = 1;
03483
03484 CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
03485 CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
03486
03487
03488 CkGroupID gID = retainedObject->migRecord.gID ;
03489 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03490 informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
03491
03492 countLBMigratedAway++;
03493 if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
03494 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03495 startMlogCheckpoint(NULL,CmiWallTimer());
03496 }
03497 };
03498
03499 void _receiveMlogLocationHandler(void *buf){
03500 envelope *env = (envelope *)buf;
03501 DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
03502 CkUnpackMessage(&env);
03503 void *_msg = EnvToUsr(env);
03504 CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
03505 CkGroupID gID= msg->gid;
03506 DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
03507 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03508 CpvAccess(_currentObj)=mgr;
03509 mgr->immigrate(msg);
03510 };
03511
03512
03513 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
03514
03515
03516
03517
03518
03519
03520
03521
03522 }
03523
03527 void _checkpointBarrierHandler(CheckpointBarrierMsg *barrierMsg){
03528
03529
03530 CmiFree(barrierMsg);
03531
03532
03533 CheckpointBarrierMsg *msg = (CheckpointBarrierMsg *)CmiAlloc(sizeof(CheckpointBarrierMsg));
03534 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
03535 CmiSyncBroadcastAllAndFree(sizeof(CheckpointBarrierMsg),(char *)msg);
03536 }
03537
03538 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
03539 DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
03540 DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
03541
03542 #if !SYNCHRONIZED_CHECKPOINT
03543
03544 sendRemoveLogRequests();
03545 #endif
03546
03547 #if CMK_CONVERSE_MPI
03548 inCkptFlag = 0;
03549 #endif
03550
03551
03552 (*resumeLbFnPtr)(centralLb);
03553
03554
03555 CmiFree(msg);
03556 }
03557
03561 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
03562 int total;
03563 MlogEntry *logEntry;
03564 CkQ<MlogEntry *> *mlog = mlogData->getMlog();
03565
03566
03567 total = mlog->length();
03568 for(int i=0; i<total; i++){
03569 logEntry = mlog->deq();
03570 delete logEntry;
03571 }
03572
03573 }
03574
03580 void garbageCollectMlog(){
03581 CkHashtableIterator *iterator;
03582 std::vector<Determinant> *detArray;
03583
03584 DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
03585
03586
03587 _indexBufferedDets = 0;
03588 _numBufferedDets = 0;
03589 _phaseBufferedDets++;
03590
03591
03592 iterator = CpvAccess(_remoteDets)->iterator();
03593 while(iterator->hasNext()){
03594 detArray = *(std::vector<Determinant> **)iterator->next();
03595 detArray->clear();
03596 }
03597
03598
03599 delete iterator;
03600
03601
03602 forAllCharesDo(garbageCollectMlogForChare, NULL);
03603 }
03604
03610 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
03611 double _startTime = CmiWallTimer();
03612 CurrentLocationMsg msg;
03613 CmiInitMsgHeader(msg.header, sizeof(CurrentLocationMsg));
03614 msg.mgrID = locMgrID;
03615 msg.idx = idx;
03616 msg.locationPE = currentPE;
03617 msg.fromPE = CkMyPe();
03618
03619 DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
03620 CmiSetHandler(&msg,_receiveLocationHandlerIdx);
03621 CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
03622 traceUserBracketEvent(37,_startTime,CmiWallTimer());
03623 }
03624
03625
03626 void _receiveLocationHandler(CurrentLocationMsg *data){
03627 double _startTime = CmiWallTimer();
03628 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
03629 if(mgr == NULL){
03630 CmiFree(data);
03631 return;
03632 }
03633 CkLocRec *rec = mgr->elementNrec(data->idx);
03634 DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
03635 if(rec != NULL){
03636 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe()){
03637 if(data->fromPE == data->locationPE){
03638 CmiAbort("Another processor has the same object");
03639 }
03640 }
03641 }
03642 if(rec!= NULL && data->fromPE != CmiMyPe()){
03643 int targetPE = data->fromPE;
03644 data->fromPE = CmiMyPe();
03645 data->locationPE = CmiMyPe();
03646 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
03647 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
03648 }else{
03649 mgr->inform(data->idx,data->locationPE);
03650 }
03651 CmiFree(data);
03652 traceUserBracketEvent(38,_startTime,CmiWallTimer());
03653 }
03654
03655
03656
03657 void getGlobalStep(CkGroupID gID){
03658 LBStepMsg msg;
03659 CmiInitMsgHeader(msg.header, sizeof(LBStepMsg));
03660 int destPE = 0;
03661 msg.lbID = gID;
03662 msg.fromPE = CmiMyPe();
03663 msg.step = -1;
03664 CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
03665 CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
03666 };
03667
03668 void _getGlobalStepHandler(LBStepMsg *msg){
03669 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03670 msg->step = lb->step();
03671 CmiAssert(msg->fromPE != CmiMyPe());
03672 CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
03673 CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
03674 CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
03675 };
03676
03680 void _recvGlobalStepHandler(LBStepMsg *msg){
03681
03682
03683 restartDecisionNumber = msg->step;
03684 CmiFree(msg);
03685
03686 CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
03687
03688
03689 ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
03690 resendReplyMsg->PE = CkMyPe();
03691 resendReplyMsg->numberObjects = 0;
03692 _sendDetsReplyHandler((char *)resendReplyMsg);
03693 };
03694
03698 void _messageLoggingExit(){
03699
03700
03701 if(CkMyPe() == 0)
03702 printf("[%d] _causalMessageLoggingExit \n",CmiMyPe());
03703
03704 #if COLLECT_STATS_LOGGING
03705 printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
03706 printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
03707 printf("[%d] MULTICAST MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalMcastLogSize/(float)MEGABYTE);
03708 printf("[%d] REDUCTION MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalReductionLogSize/(float)MEGABYTE);
03709 #endif
03710
03711 #if COLLECT_STATS_MSGS
03712 #if COLLECT_STATS_MSGS_TOTAL
03713 printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
03714 printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
03715 #else
03716 printf("[%d] TARGETS: ",CmiMyPe());
03717 for(int i=0; i<CmiNumPes(); i++){
03718 #if COLLECT_STATS_MSG_COUNT
03719 printf("%d ",numMsgsTarget[i]);
03720 #else
03721 printf("%d ",sizeMsgsTarget[i]);
03722 #endif
03723 }
03724 printf("\n");
03725 #endif
03726 #endif
03727
03728 #if COLLECT_STATS_DETS
03729 printf("\n");
03730 printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
03731 printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
03732 #if COLLECT_STATS_DETS_DUP
03733 printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
03734 #endif
03735 #endif
03736
03737 }
03738
03739 void MlogEntry::pup(PUP::er &p){
03740 p | destPE;
03741 p | _infoIdx;
03742 int size;
03743 if(!p.isUnpacking()){
03744
03745
03746
03747
03748 if(env == NULL){
03749
03750 size = 0;
03751 }else{
03752 size = env->getTotalsize();
03753 }
03754 }
03755
03756 p | size;
03757 if(p.isUnpacking()){
03758 if(size > 0){
03759 env = (envelope *)_allocEnv(ForChareMsg,size);
03760 }else{
03761 env = NULL;
03762 }
03763 }
03764 if(size > 0){
03765 p((char *)env,size);
03766 }
03767 };
03768
03769 void RestoredLocalMap::pup(PUP::er &p){
03770 p | minSN;
03771 p | maxSN;
03772 p | count;
03773 if(p.isUnpacking()){
03774 TNArray = new MCount[count];
03775 }
03776 p(TNArray,count);
03777 };
03778
03779
03780
03781
03782
03783
03784 MCount ChareMlogData::nextSN(const CkObjID &recver){
03785 MCount *SN = snTable.getPointer(recver);
03786 if(SN==NULL){
03787 snTable.put(recver) = 1;
03788 return 1;
03789 }else{
03790 (*SN)++;
03791 return *SN;
03792 }
03793 };
03794
03798 MCount ChareMlogData::newTN(){
03799 MCount TN;
03800 if(currentHoles > 0){
03801 int holeidx = numberHoles-currentHoles;
03802 TN = ticketHoles[holeidx];
03803 currentHoles--;
03804 if(currentHoles == 0){
03805 delete []ticketHoles;
03806 numberHoles = 0;
03807 }
03808 }else{
03809 TN = ++tCount;
03810 }
03811 return TN;
03812 };
03813
03817 inline Ticket ChareMlogData::getTicket(CkObjID &sender, MCount SN){
03818 Ticket ticket;
03819
03820 SNToTicket *ticketRow = ticketTable.get(sender);
03821 if(ticketRow != NULL){
03822 return ticketRow->get(SN);
03823 }
03824 return ticket;
03825 }
03826
03830 inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
03831 Ticket ticket;
03832
03833 SNToTicket *ticketRow = ticketTable.get(sender);
03834 if(ticketRow != NULL){
03835 Ticket earlierTicket = ticketRow->get(SN);
03836 if(earlierTicket.TN != 0){
03837 CkAssert(earlierTicket.TN == TN);
03838 return;
03839 }
03840 }else{
03841 ticketRow = new SNToTicket();
03842 ticketTable.put(sender) = ticketRow;
03843 }
03844 ticket.TN = TN;
03845 ticketRow->put(SN) = ticket;
03846 }
03847
03851 inline Ticket ChareMlogData::next_ticket(CkObjID &sender, MCount SN){
03852 DEBUG(char senderName[100];)
03853 DEBUG(char recverName[100];)
03854 double _startTime = CmiWallTimer();
03855 Ticket ticket;
03856
03857
03858 if(restartFlag){
03859 ticket.TN = 0;
03860 return ticket;
03861 }
03862
03863 SNToTicket *ticketRow = ticketTable.get(sender);
03864 if(ticketRow != NULL){
03865 Ticket earlierTicket = ticketRow->get(SN);
03866 if(earlierTicket.TN == 0){
03867 ticket.TN = newTN();
03868 ticketRow->put(SN) = ticket;
03869 DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
03870 }else{
03871 ticket.TN = earlierTicket.TN;
03872 if(ticket.TN > tCount){
03873 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03874 }
03875 CmiAssert(ticket.TN <= tCount);
03876 }
03877 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03878 }else{
03879 SNToTicket *newRow = new SNToTicket;
03880 ticket.TN = newTN();
03881 newRow->put(SN) = ticket;
03882 ticketTable.put(sender) = newRow;
03883 DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
03884 }
03885 ticket.state = NEW_TICKET;
03886
03887
03888 return ticket;
03889 };
03890
03894 void ChareMlogData::addLogEntry(MlogEntry *entry){
03895 DEBUG(char nameString[100]);
03896 DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
03897 DEBUG_MEM(CmiMemoryCheck());
03898
03899
03900 mlog.enq(entry);
03901 };
03902
// Accumulated wall-clock time spent in ChareMlogData::searchRestoredLocalQ.
double totalSearchRestoredTime = 0.0;
// Number of calls to ChareMlogData::searchRestoredLocalQ (kept as a double).
double totalSearchRestoredCount = 0.0;
03905
03910 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
03911 double start= CkWallTimer();
03912 MCount TN=0;
03913
03914 DEBUG(char senderName[100]);
03915 DEBUG(char recverName[100]);
03916 DEBUG(if(TN != 0){ CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN);});
03917
03918 totalSearchRestoredTime += CkWallTimer()-start;
03919 totalSearchRestoredCount++;
03920 return TN;
03921 }
03922
03928 void ChareMlogData::pup(PUP::er &p){
03929 int tCountAux = 0;
03930 int startSize=0;
03931 char nameStr[100];
03932 if(p.isSizing()){
03933 PUP::sizer *sizep = (PUP::sizer *)&p;
03934 startSize = sizep->size();
03935 }
03936 double _startTime = CkWallTimer();
03937
03938 p | objID;
03939 if(teamRecoveryFlag)
03940 p | tCountAux;
03941 else
03942 p | tCount;
03943 p | tProcessed;
03944 if(p.isUnpacking()){
03945 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
03946 }
03947 p | toResumeOrNot;
03948 p | resumeCount;
03949 DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
03950
03951
03952
03953 int lengthReceivedTNs;
03954 if(!p.isUnpacking()){
03955 if(receivedTNs == NULL){
03956 lengthReceivedTNs = -1;
03957 }else{
03958 lengthReceivedTNs = receivedTNs->size();
03959 }
03960 }
03961
03962 p | lengthReceivedTNs;
03963 if(p.isUnpacking()){
03964 if(lengthReceivedTNs == -1){
03965 receivedTNs = NULL;
03966 }else{
03967 receivedTNs = new std::vector<MCount>;
03968 for(int i=0;i<lengthReceivedTNs;i++){
03969 MCount tempTicket;
03970 p | tempTicket;
03971 CkAssert(tempTicket > 0);
03972 receivedTNs->push_back(tempTicket);
03973 }
03974 }
03975 }else{
03976 for(int i=0;i<lengthReceivedTNs;i++){
03977 p | (*receivedTNs)[i];
03978 }
03979 }
03980
03981 p | currentHoles;
03982 p | numberHoles;
03983 if(p.isUnpacking()){
03984 if(numberHoles > 0){
03985 ticketHoles = new MCount[numberHoles];
03986 }else{
03987 ticketHoles = NULL;
03988 }
03989 }
03990 if(numberHoles > 0){
03991 p(ticketHoles,numberHoles);
03992 }
03993
03994 snTable.pup(p);
03995
03996
03997
03998
03999
04000
04001
04002
04003
04004
04005
04006
04007
04008
04009
04010
04011
04012
04013
04014
04015 p | resendReplyRecvd;
04016 p | restartFlag;
04017
04018
04019
04020
04021
04022
04023
04024
04025
04026
04027
04028
04029
04030
04031
04032
04033
04034
04035
04036
04037
04038
04039
04040
04041
04042
04043
04044
04045 {
04046 int ticketTableSize;
04047 if(!p.isUnpacking()){
04048 ticketTableSize = ticketTable.numObjects();
04049 }
04050
04051 p | ticketTableSize;
04052 if(!p.isUnpacking()){
04053 CkHashtableIterator *iter = ticketTable.iterator();
04054 while(iter->hasNext()){
04055 CkObjID *objID;
04056 SNToTicket **ticketRow = (SNToTicket **)iter->next((void **)&objID);
04057 p | (*objID);
04058 (*ticketRow)->pup(p);
04059 }
04060
04061 delete iter;
04062 }else{
04063 for(int i=0;i<ticketTableSize;i++){
04064 CkObjID objID;
04065 p | objID;
04066 SNToTicket *ticketRow = new SNToTicket;
04067 ticketRow->pup(p);
04068 if(!teamRecoveryFlag)
04069 ticketTable.put(objID) = ticketRow;
04070 else
04071 delete ticketRow;
04072 }
04073 }
04074 }
04075
04076 if(p.isSizing()){
04077 PUP::sizer *sizep = (PUP::sizer *)&p;
04078 int pupSize = sizep->size()-startSize;
04079 DEBUG(char name[40]);
04080 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
04081
04082 }
04083
04084 double _finTime = CkWallTimer();
04085 DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
04086 };
04087
04088
04089
04090
04097 int getCheckPointPE(){
04098 return (CmiMyPe() + 1) % CmiNumPes();
04099 }
04100
04104 int getReverseCheckPointPE(){
04105 return (CmiMyPe() - 1 + CmiNumPes()) % CmiNumPes();
04106 }
04107
04108
04109 envelope *copyEnvelope(envelope *env){
04110 envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());
04111 memcpy(newEnv,env,env->getTotalsize());
04112 return newEnv;
04113 }
04114
04115
04116 inline bool isSameDet(Determinant *first, Determinant *second){
04117 return first->sender == second->sender && first->receiver == second->receiver && first->SN == second->SN && first->TN == second->TN;
04118 }
04119
04120 #endif