00001
00006 #include "charm.h"
00007 #include "ck.h"
00008 #include "ckmessagelogging.h"
00009 #include "queueing.h"
00010 #include <sys/types.h>
00011 #include <signal.h>
00012 #include "CentralLB.h"
00013
00014 #ifdef _FAULT_MLOG_
00015
00016
/*
 * Diagnostic hooks.  Every macro below has an empty replacement list, so the
 * argument passed at a call site is discarded by the preprocessor.  Replace
 * the trailing comment with a plain `x` to enable one family of diagnostics.
 */
#define DEBUG_MEM(x)     /* x */
#define DEBUG(x)         /* x */
#define DEBUGRESTART(x)  /* x */
#define DEBUGLB(x)       /* x */
#define DEBUG_TEAM(x)    /* x */

/*
 * Compile-time switches: BUFFERED_LOCAL selects the batched branch of
 * sendLocalMessageCopy(), BUFFERED_REMOTE the batched branch of
 * sendTicketRequest().
 */
#define BUFFERED_LOCAL
#define BUFFERED_REMOTE
00025
/*
 * Index-to-string helpers: two overload declarations followed by a
 * definition.
 *
 * NOTE(review): the definition below has exactly the same signature as the
 * first declaration, and its body calls idx2str() with an argument of that
 * same type.  Overload resolution therefore picks this very function, so a
 * call never returns.  Presumably the parameter was meant to be a different
 * index class that forwards to the CkArrayIndex overload, or the definition
 * should be dropped in favour of one provided elsewhere -- TODO confirm
 * against the project headers before changing it.
 */
00026 extern const char *idx2str(const CkArrayIndex &ind);
00027 extern const char *idx2str(const ArrayElement *el);
00028 const char *idx2str(const CkArrayIndex &ind){
00029 return idx2str((const CkArrayIndex &)ind);
00030 };
00031
00032 void getGlobalStep(CkGroupID gID);
00033
00034 bool fault_aware(CkObjID &recver);
00035 void sendCheckpointData(int mode);
00036 void createObjIDList(void *data,ChareMlogData *mlogData);
00037 inline bool isLocal(int destPE);
00038 inline bool isTeamLocal(int destPE);
00039
00040 int _restartFlag=0;
00041
00042
00043
00044 float MLOGFT_totalLogSize = 0.0;
00045 float MLOGFT_totalMessages = 0.0;
00046 float MLOGFT_totalObjects = 0.0;
00047
00048
00049 int countHashRefs=0;
00050 int countHashCollisions=0;
00051
00052
00053 char *checkpointDirectory=".";
00054 int unAckedCheckpoint=0;
00055
00056 int countLocal=0,countBuffered=0;
00057 int countPiggy=0;
00058 int countClearBufferedLocalCalls=0;
00059
00060 int countUpdateHomeAcks=0;
00061
00062 extern int teamSize;
00063 extern int chkptPeriod;
00064 extern bool parallelRestart;
00065
00066 char *killFile;
00067 int killFlag=0;
00068 int restartingMlogFlag=0;
00069 void readKillFile();
00070 double killTime=0.0;
00071 int checkpointCount=0;
00072
00073
00074 CpvDeclare(Chare *,_currentObj);
00075 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
00076 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
00077 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
00078 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
00079 CpvDeclare(Queue, _outOfOrderMessageQueue);
00080 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
00081
00082 CpvDeclare(char **,_bufferedTicketRequests);
00083 CpvDeclare(int *,_numBufferedTicketRequests);
00084 CpvDeclare(char *,_bufferTicketReply);
00085
00086
00087
00088 static double adjustChkptPeriod=0.0;
00089 static double nextCheckpointTime=0.0;
00090
00091 double lastBufferedLocalMessageCopyTime;
00092
00093 int _maxBufferedMessages;
00094 int _maxBufferedTicketRequests;
00095 int BUFFER_TIME=2;
00096
00097
00098 int _ticketRequestHandlerIdx;
00099 int _ticketHandlerIdx;
00100 int _localMessageCopyHandlerIdx;
00101 int _localMessageAckHandlerIdx;
00102 int _pingHandlerIdx;
00103 int _bufferedLocalMessageCopyHandlerIdx;
00104 int _bufferedLocalMessageAckHandlerIdx;
00105 int _bufferedTicketRequestHandlerIdx;
00106 int _bufferedTicketHandlerIdx;
00107
00108
00109 char objString[100];
00110 int _checkpointRequestHandlerIdx;
00111 int _storeCheckpointHandlerIdx;
00112 int _checkpointAckHandlerIdx;
00113 int _getCheckpointHandlerIdx;
00114 int _recvCheckpointHandlerIdx;
00115 int _removeProcessedLogHandlerIdx;
00116
00117 int _verifyAckRequestHandlerIdx;
00118 int _verifyAckHandlerIdx;
00119 int _dummyMigrationHandlerIdx;
00120
00121
00122 int _getGlobalStepHandlerIdx;
00123 int _recvGlobalStepHandlerIdx;
00124
00125 int _updateHomeRequestHandlerIdx;
00126 int _updateHomeAckHandlerIdx;
00127 int _resendMessagesHandlerIdx;
00128 int _resendReplyHandlerIdx;
00129 int _receivedTNDataHandlerIdx;
00130 int _distributedLocationHandlerIdx;
00131
00132
00133 int _restartHandlerIdx;
00134 int _getRestartCheckpointHandlerIdx;
00135 int _recvRestartCheckpointHandlerIdx;
00136 void setTeamRecovery(void *data, ChareMlogData *mlogData);
00137 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
00138
00139 int verifyAckTotal;
00140 int verifyAckCount;
00141
00142 int verifyAckedRequests=0;
00143
00144 RestartRequest *storedRequest;
00145
00146
00147
00148 int _falseRestart =0;
00155
00156 int _lockNewTicket=0;
00157
00158
00159
00160 int onGoingLoadBalancing=0;
00161 void *centralLb;
00162 void (*resumeLbFnPtr)(void *);
00163 int _receiveMlogLocationHandlerIdx;
00164 int _receiveMigrationNoticeHandlerIdx;
00165 int _receiveMigrationNoticeAckHandlerIdx;
00166 int _checkpointBarrierHandlerIdx;
00167 int _checkpointBarrierAckHandlerIdx;
00168
00169 CkVec<MigrationRecord> migratedNoticeList;
00170 CkVec<RetainedMigratedObject *> retainedObjectList;
00171 int donotCountMigration=0;
00172 int countLBMigratedAway=0;
00173 int countLBToMigrate=0;
00174 int migrationDoneCalled=0;
00175 int checkpointBarrierCount=0;
00176 int globalResumeCount=0;
00177 CkGroupID globalLBID;
00178 int restartDecisionNumber=-1;
00179
00180
00181 double lastCompletedAlarm=0;
00182 double lastRestart=0;
00183
00184
00185
00186 int _receiveLocationHandlerIdx;
00187
00188
00189
00190
00191 void _messageLoggingInit(){
00192
00193 CpvInitialize(Chare *,_currentObj);
00194
00195
00196 _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
00197 _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
00198 _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
00199 _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
00200 _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
00201 _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
00202 _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
00203 _bufferedTicketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
00204 _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
00205
00206
00207
00208 _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
00209 _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
00210 _removeProcessedLogHandlerIdx = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
00211 _checkpointRequestHandlerIdx = CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
00212
00213
00214
00215 _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
00216 _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
00217 _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
00218 _updateHomeAckHandlerIdx = CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
00219 _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
00220 _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
00221 _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
00222 _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
00223 _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
00224 _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
00225 _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
00226
00227
00228 _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
00229 _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
00230 _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
00231
00232
00233
00234 _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
00235 _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
00236 _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
00237 _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
00238 _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
00239 _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
00240 _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
00241 _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
00242 _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
00243
00244
00245
00246 _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
00247
00248
00249 CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
00250 CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
00251 CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
00252 CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
00253 CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
00254 CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
00255 CpvInitialize(Queue, _outOfOrderMessageQueue);
00256 CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
00257 CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
00258 CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
00259
00260 CpvInitialize(char **,_bufferedTicketRequests);
00261 CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
00262 CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
00263 for(int i=0;i<CkNumPes();i++){
00264 CpvAccess(_bufferedTicketRequests)[i]=NULL;
00265 CpvAccess(_numBufferedTicketRequests)[i]=0;
00266 }
00267 CpvInitialize(char *,_bufferTicketReply);
00268 CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
00269
00270
00271 CcdCallFnAfter(retryTicketRequest,NULL,100);
00272
00273
00274
00275 CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
00276 CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
00277
00278
00279
00280
00281 if(CkMyPe() == 0){
00282
00283 #ifdef BUFFERED_LOCAL
00284 if(CmiMyPe() == 0){
00285 printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
00286 }
00287 #endif
00288 }
00289 #ifdef BUFFERED_REMOTE
00290 if(CmiMyPe() == 0){
00291 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
00292 }
00293 #endif
00294
00295 traceRegisterUserEvent("Remove Logs", 20);
00296 traceRegisterUserEvent("Ticket Request Handler", 21);
00297 traceRegisterUserEvent("Ticket Handler", 22);
00298 traceRegisterUserEvent("Local Message Copy Handler", 23);
00299 traceRegisterUserEvent("Local Message Ack Handler", 24);
00300 traceRegisterUserEvent("Preprocess current message",25);
00301 traceRegisterUserEvent("Preprocess past message",26);
00302 traceRegisterUserEvent("Preprocess future message",27);
00303 traceRegisterUserEvent("Checkpoint",28);
00304 traceRegisterUserEvent("Checkpoint Store",29);
00305 traceRegisterUserEvent("Checkpoint Ack",30);
00306
00307 traceRegisterUserEvent("Send Ticket Request",31);
00308 traceRegisterUserEvent("Generalticketrequest1",32);
00309 traceRegisterUserEvent("TicketLogLocal",33);
00310 traceRegisterUserEvent("next_ticket and SN",34);
00311 traceRegisterUserEvent("Timeout for buffered remote messages",35);
00312 traceRegisterUserEvent("Timeout for buffered local messages",36);
00313 traceRegisterUserEvent("Inform Location Home",37);
00314 traceRegisterUserEvent("Receive Location Handler",38);
00315
00316 lastCompletedAlarm=CmiWallTimer();
00317 lastRestart = CmiWallTimer();
00318
00319 CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
00320 }
00321
00322 void killLocal(void *_dummy,double curWallTime);
00323
00324 void readKillFile(){
00325 FILE *fp=fopen(killFile,"r");
00326 if(!fp){
00327 return;
00328 }
00329 int proc;
00330 double sec;
00331 while(fscanf(fp,"%d %lf",&proc,&sec)==2){
00332 if(proc == CkMyPe()){
00333 killTime = CmiWallTimer()+sec;
00334 printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
00335 CcdCallFnAfter(killLocal,NULL,sec*1000);
00336 }
00337 }
00338 fclose(fp);
00339 }
00340
00341 void killLocal(void *_dummy,double curWallTime){
00342 printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
00343 if(CmiWallTimer()<killTime-1){
00344 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);
00345 }else{
00346 kill(getpid(),SIGKILL);
00347 }
00348 }
00349
00350
00351
00352
00353
00354
00355 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
00356 if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
00357 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
00358 void *origMsg = EnvToUsr(env);
00359 for(int i=0;i<CmiNumPes();i++){
00360 if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
00361 void *copyMsg = CkCopyMsg(&origMsg);
00362 envelope *copyEnv = UsrToEnv(copyMsg);
00363 copyEnv->SN=0;
00364 copyEnv->TN=0;
00365 copyEnv->sender.type = TypeInvalid;
00366 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
00367 sendTicketGroupRequest(copyEnv,i,_infoIdx);
00368 }
00369 }
00370 return;
00371 }
00372 CkObjID recver;
00373 recver.type = TypeGroup;
00374 recver.data.group.id = env->getGroupNum();
00375 recver.data.group.onPE = destPE;
00376
00377
00378
00379 generateCommonTicketRequest(recver,env,destPE,_infoIdx);
00380 }
00381
00382
00383 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
00384 if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
00385 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
00386 void *origMsg = EnvToUsr(env);
00387 for(int i=0;i<CmiNumNodes();i++){
00388 if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
00389 void *copyMsg = CkCopyMsg(&origMsg);
00390 envelope *copyEnv = UsrToEnv(copyMsg);
00391 copyEnv->SN=0;
00392 copyEnv->TN=0;
00393 copyEnv->sender.type = TypeInvalid;
00394 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
00395 }
00396 }
00397 return;
00398 }
00399 CkObjID recver;
00400 recver.type = TypeNodeGroup;
00401 recver.data.group.id = env->getGroupNum();
00402 recver.data.group.onPE = destNode;
00403 generateCommonTicketRequest(recver,env,destNode,_infoIdx);
00404 }
00405
00406
00407 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
00408 CkObjID recver;
00409 recver.type = TypeArray;
00410 recver.data.array.id = env->getsetArrayMgr();
00411 recver.data.array.idx = *(&env->getsetArrayIndex());
00412
00413 if(CpvAccess(_currentObj)!=NULL && CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00414 char recverString[100],senderString[100];
00415
00416 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00417 }
00418
00419 generateCommonTicketRequest(recver,env,destPE,_infoIdx);
00420 };
00421
00425 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
00426 envelope *env = _env;
00427 MCount ticketNumber = 0;
00428 int resend=0;
00429 char recverName[100];
00430 double _startTime=CkWallTimer();
00431
00432 if(CpvAccess(_currentObj) == NULL){
00433
00434 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
00435 generalCldEnqueue(destPE,env,_infoIdx);
00436 return;
00437 }
00438
00439 if(env->sender.type == TypeInvalid){
00440 env->sender = CpvAccess(_currentObj)->mlogData->objID;
00441
00442 }else{
00443 envelope *copyEnv = copyEnvelope(env);
00444 env = copyEnv;
00445 env->sender = CpvAccess(_currentObj)->mlogData->objID;
00446 env->SN = 0;
00447 }
00448
00449 CkObjID &sender = env->sender;
00450 env->recver = recver;
00451
00452 Chare *obj = (Chare *)env->sender.getObject();
00453
00454 if(env->SN == 0){
00455 env->SN = obj->mlogData->nextSN(recver);
00456 }else{
00457 resend = 1;
00458 }
00459
00460 char senderString[100];
00461
00462 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
00463
00464
00465
00466
00467
00468 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
00469
00470
00471
00472 _startTime = CkWallTimer();
00473
00474
00475 if(isLocal(destPE)){
00476 ticketLogLocalMessage(mEntry);
00477 }else{
00478 if((teamSize > 1) && isTeamLocal(destPE)){
00479
00480
00481 Chare *senderObj = (Chare *)sender.getObject();
00482 SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
00483 if(ticketRow != NULL){
00484 Ticket ticket = ticketRow->get(env->SN);
00485 if(ticket.TN != 0){
00486 ticketNumber = ticket.TN;
00487 DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
00488 }
00489 }
00490 }
00491
00492
00493 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
00494
00495 }
00496 }
00497
00502 inline bool isLocal(int destPE){
00503
00504 if(destPE == CkMyPe())
00505 return true;
00506
00507 return false;
00508 }
00509
00514 inline bool isTeamLocal(int destPE){
00515
00516
00517 if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
00518 return true;
00519
00520 return false;
00521 }
00522
00523
00524
00528 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
00529 char recverString[100],senderString[100];
00530 envelope *env = entry->env;
00531 DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
00532
00533
00534
00535 Chare *obj = (Chare *)entry->env->sender.getObject();
00536 if(!resend){
00537
00538 if(!isTeamLocal(entry->destPE)){
00539 obj->mlogData->addLogEntry(entry);
00540 MLOGFT_totalMessages += 1.0;
00541 MLOGFT_totalLogSize += entry->env->getTotalsize();
00542 }else{
00543
00544 entry->env->freeMsg = true;
00545 }
00546 }
00547
00548 #ifdef BUFFERED_REMOTE
00549
00550 if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
00551
00552 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
00553 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
00554 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
00555 }
00556
00557
00558 TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
00559 ticketRequest->sender = sender;
00560 ticketRequest->recver = recver;
00561 ticketRequest->logEntry = entry;
00562 ticketRequest->SN = SN;
00563 ticketRequest->TN = TN;
00564 ticketRequest->senderPE = CkMyPe();
00565
00566 CpvAccess(_numBufferedTicketRequests)[destPE]++;
00567
00568
00569 if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
00570 sendBufferedTicketRequests(destPE);
00571 }else{
00572 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
00573 int *checkPE = new int;
00574 *checkPE = destPE;
00575 CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);
00576 }
00577 }
00578 #else
00579
00580 TicketRequest ticketRequest;
00581 ticketRequest.sender = sender;
00582 ticketRequest.recver = recver;
00583 ticketRequest.logEntry = entry;
00584 ticketRequest.SN = SN;
00585 ticketRequest.TN = TN;
00586 ticketRequest.senderPE = CkMyPe();
00587
00588 CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
00589
00590 CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
00591 #endif
00592 DEBUG_MEM(CmiMemoryCheck());
00593 };
00594
00598 void sendBufferedTicketRequests(int destPE){
00599 DEBUG_MEM(CmiMemoryCheck());
00600 int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
00601 if(numberRequests == 0){
00602 return;
00603 }
00604 DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
00605 int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
00606 void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
00607 BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
00608 header->numberLogs = numberRequests;
00609
00610 CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
00611 CmiSyncSend(destPE,totalSize,(char *)buf);
00612
00613 CpvAccess(_numBufferedTicketRequests)[destPE]=0;
00614 DEBUG_MEM(CmiMemoryCheck());
00615 };
00616
00617 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
00618 int destPE = *(int *)_destPE;
00619 if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
00620 sendBufferedTicketRequests(destPE);
00621
00622 }
00623 delete (int *)_destPE;
00624 DEBUG_MEM(CmiMemoryCheck());
00625 };
00626
00632 void ticketLogLocalMessage(MlogEntry *entry){
00633 double _startTime=CkWallTimer();
00634 DEBUG_MEM(CmiMemoryCheck());
00635
00636 Chare *recverObj = (Chare *)entry->env->recver.getObject();
00637 DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
00638 if(recverObj){
00639
00640
00641 Ticket ticket;
00642 if(recverObj->mlogData->mapTable.numObjects() > 0){
00643 ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
00644 }else{
00645 ticket.TN = 0;
00646 }
00647
00648 char senderString[100], recverString[100] ;
00649
00650 if(ticket.TN == 0){
00651 ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
00652
00653 if(ticket.TN == 0){
00654 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
00655 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
00656
00657
00658
00659 return;
00660 }
00661 }
00662
00663
00664 entry->env->TN = ticket.TN;
00665 CkAssert(entry->env->TN > 0);
00666 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
00667
00668
00669 sendLocalMessageCopy(entry);
00670
00671 DEBUG_MEM(CmiMemoryCheck());
00672
00673
00674 entry->unackedLocal = 1;
00675 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
00676
00677 DEBUG_MEM(CmiMemoryCheck());
00678 }else{
00679 CkPrintf("[%d] Local message in team-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
00680 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
00681 }
00682 _lockNewTicket=0;
00683
00684 };
00685
00689 void sendLocalMessageCopy(MlogEntry *entry){
00690 LocalMessageLog msgLog;
00691 msgLog.sender = entry->env->sender;
00692 msgLog.recver = entry->env->recver;
00693 msgLog.SN = entry->env->SN;
00694 msgLog.TN = entry->env->TN;
00695 msgLog.entry = entry;
00696 msgLog.senderPE = CkMyPe();
00697
00698 char recvString[100];
00699 char senderString[100];
00700 DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN, msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
00701
00702 #ifdef BUFFERED_LOCAL
00703 countLocal++;
00704 CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
00705 if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
00706 sendBufferedLocalMessageCopy();
00707 }else{
00708 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
00709 lastBufferedLocalMessageCopyTime = CkWallTimer();
00710 CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
00711 countClearBufferedLocalCalls++;
00712 }
00713 }
00714 #else
00715 CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
00716
00717 CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
00718 #endif
00719 DEBUG_MEM(CmiMemoryCheck());
00720 };
00721
00722
00723 void sendBufferedLocalMessageCopy(){
00724 int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
00725 if(numberLogs == 0){
00726 return;
00727 }
00728 countBuffered++;
00729 int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
00730 void *buf=CmiAlloc(totalSize);
00731 BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
00732 header->numberLogs=numberLogs;
00733
00734 DEBUG_MEM(CmiMemoryCheck());
00735 DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
00736
00737 char *ptr = (char *)buf;
00738 ptr = &ptr[sizeof(BufferedLocalLogHeader)];
00739
00740 for(int i=0;i<numberLogs;i++){
00741 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
00742 memcpy(ptr,&log,sizeof(LocalMessageLog));
00743 ptr = &ptr[sizeof(LocalMessageLog)];
00744 }
00745
00746 CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
00747
00748 CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
00749 DEBUG_MEM(CmiMemoryCheck());
00750 };
00751
00752 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
00753 countClearBufferedLocalCalls--;
00754 if(countClearBufferedLocalCalls > 10){
00755 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
00756 }
00757 DEBUG_MEM(CmiMemoryCheck());
00758 DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
00759 if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
00760 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
00761 sendBufferedLocalMessageCopy();
00762
00763 }
00764 }
00765 DEBUG_MEM(CmiMemoryCheck());
00766 }
00767
00768
00769
00770
00771
00772
00773 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
00778 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
00779 DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
00780 double _startTime = CkWallTimer();
00781 if(CpvAccess(_delayedTicketRequests)->length() > 0){
00782 retryTicketRequest(NULL,_startTime);
00783 }
00784 _processTicketRequest(ticketRequest);
00785 CmiFree(ticketRequest);
00786
00787 }
00792 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
00793 DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
00794 DEBUG_MEM(CmiMemoryCheck());
00795 double _startTime = CkWallTimer();
00796 if(CpvAccess(_delayedTicketRequests)->length() > 0){
00797 retryTicketRequest(NULL,_startTime);
00798 }
00799 DEBUG_MEM(CmiMemoryCheck());
00800 int numRequests = recvdHeader->numberLogs;
00801 char *msg = (char *)recvdHeader;
00802 msg = &msg[sizeof(BufferedTicketRequestHeader)];
00803 int senderPE=((TicketRequest *)msg)->senderPE;
00804
00805
00806 int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
00807 void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
00808
00809 char *ptr = (char *)buf;
00810 BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
00811 header->numberLogs = 0;
00812
00813 DEBUG_MEM(CmiMemoryCheck());
00814
00815 ptr = &ptr[sizeof(BufferedTicketRequestHeader)];
00816
00817 for(int i=0;i<numRequests;i++){
00818 TicketRequest *request = (TicketRequest *)msg;
00819 msg = &msg[sizeof(TicketRequest)];
00820 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
00821
00822 if(replied){
00823
00824
00825 header->numberLogs++;
00826 ptr = &ptr[sizeof(TicketReply)];
00827 }
00828 }
00829
00830
00831
00832
00833 CmiSetHandler(buf,_bufferedTicketHandlerIdx);
00834 CmiSyncSend(senderPE,totalSize,(char *)buf);
00835 CmiFree(recvdHeader);
00836
00837 DEBUG_MEM(CmiMemoryCheck());
00838 };
00839
00847 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
00848
00849
00850
00851
00852
00853
00854
00855
00856
00857
00858
00859
00860 DEBUG_MEM(CmiMemoryCheck());
00861
00862
00863 CkObjID sender = ticketRequest->sender;
00864 CkObjID recver = ticketRequest->recver;
00865 MCount SN = ticketRequest->SN;
00866 MCount TN = ticketRequest->TN;
00867 Chare *recverObj = (Chare *)recver.getObject();
00868
00869 DEBUG(char recverName[100]);
00870 DEBUG(recver.toString(recverName);)
00871
00872 if(recverObj == NULL){
00873 int estPE = recver.guessPE();
00874 if(estPE == CkMyPe() || estPE == -1){
00875
00876 char senderString[100];
00877 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
00878 if(estPE == CkMyPe() && recver.type == TypeArray){
00879 CkArrayID aid(recver.data.array.id);
00880 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
00881 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx)));
00882 }
00883 TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
00884 *delayed = *ticketRequest;
00885 CpvAccess(_delayedTicketRequests)->enq(delayed);
00886
00887 }else{
00888 DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
00889 TicketRequest forward = *ticketRequest;
00890 CmiSetHandler(&forward,_ticketRequestHandlerIdx);
00891 CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);
00892 }
00893 DEBUG_MEM(CmiMemoryCheck());
00894 return false;
00895
00896 }else{
00897 char senderString[100];
00898
00899 Ticket ticket;
00900
00901
00902 if(teamSize > 1 && TN != 0){
00903 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
00904 ticket.TN = TN;
00905 recverObj->mlogData->verifyTicket(sender,SN,TN);
00906 }
00907
00908
00909
00910 if(recverObj->mlogData->mapTable.numObjects() > 0){
00911
00912 ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
00913 }
00914
00915 if(ticket.TN == 0){
00916 ticket = recverObj->mlogData->next_ticket(sender,SN);
00917 }
00918 if(ticket.TN > recverObj->mlogData->tProcessed){
00919 ticket.state = NEW_TICKET;
00920 }else{
00921 ticket.state = OLD_TICKET;
00922 }
00923
00924 if(ticket.TN == 0){
00925 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
00926 TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
00927 *delayed = *ticketRequest;
00928 CpvAccess(_delayedTicketRequests)->enq(delayed);
00929 return false;
00930 }
00931
00932
00933
00934
00935
00936 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
00937
00938 if(reply == NULL){
00939
00940
00941 TicketReply ticketReply;
00942 ticketReply.request = *ticketRequest;
00943 ticketReply.ticket = ticket;
00944 ticketReply.recverPE = CkMyPe();
00945 CmiSetHandler(&ticketReply,_ticketHandlerIdx);
00946
00947 CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
00948 }else{
00949 reply->request = *ticketRequest;
00950 reply->ticket = ticket;
00951 reply->recverPE = CkMyPe();
00952 CmiSetHandler(reply,_ticketHandlerIdx);
00953
00954 }
00955 DEBUG_MEM(CmiMemoryCheck());
00956 return true;
00957 }
00958
00959 };
00960
00961
00965 inline void _ticketHandler(TicketReply *ticketReply){
00966
00967 double _startTime = CkWallTimer();
00968 DEBUG_MEM(CmiMemoryCheck());
00969
00970 char senderString[100];
00971 CkObjID sender = ticketReply->request.sender;
00972 CkObjID recver = ticketReply->request.recver;
00973
00974 if(sender.guessPE() != CkMyPe()){
00975 DEBUG(CkAssert(sender.guessPE()>= 0));
00976 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
00977
00978 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
00979 CmiSetHandler(ticketReply,_ticketHandlerIdx);
00980 #ifdef BUFFERED_REMOTE
00981
00982
00983
00984 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
00985 #else
00986 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
00987 #endif
00988 }else{
00989 char recverName[100];
00990 DEBUG(printf("[%d] TN %d received for %s SN %d from %s time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
00991 MlogEntry *logEntry=NULL;
00992 if(ticketReply->ticket.state & FORWARDED_TICKET){
00993
00994 DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
00995 Chare *senderObj = (Chare *)sender.getObject();
00996 if(senderObj){
00997 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
00998 for(int i=0;i<mlog->length();i++){
00999 MlogEntry *tempEntry = (*mlog)[i];
01000 if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
01001 logEntry = tempEntry;
01002 break;
01003 }
01004 }
01005 if(logEntry == NULL){
01006 #ifdef BUFFERED_REMOTE
01007 #else
01008 CmiFree(ticketReply);
01009 #endif
01010 return;
01011 }
01012 }else{
01013 CmiAbort("This processor thinks it should have the sender\n");
01014 }
01015 ticketReply->ticket.state ^= FORWARDED_TICKET;
01016 }else{
01017 logEntry = ticketReply->request.logEntry;
01018 }
01019 if(logEntry->env->TN <= 0){
01020
01021 char recverString[100];
01022 logEntry->env->TN = ticketReply->ticket.TN;
01023 logEntry->env->setSrcPe(CkMyPe());
01024 if(ticketReply->ticket.state == NEW_TICKET){
01025
01026
01027 if(isTeamLocal(ticketReply->recverPE)){
01028
01029 Chare *senderObj = (Chare *)sender.getObject();
01030 SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
01031 if(ticketRow == NULL){
01032 ticketRow = new SNToTicket();
01033 senderObj->mlogData->teamTable.put(recver) = ticketRow;
01034 }
01035 ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
01036 }
01037
01038 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
01039 if(ticketReply->recverPE != CkMyPe()){
01040 generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
01041 }else{
01042
01043 sendLocalMessageCopy(logEntry);
01044 }
01045 }
01046 }else{
01047 DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
01048 }
01049 recver.updatePosition(ticketReply->recverPE);
01050 #ifdef BUFFERED_REMOTE
01051 #else
01052 CmiFree(ticketReply);
01053 #endif
01054 }
01055 CmiMemoryCheck();
01056
01057
01058 };
01059
01066 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
01067 double _startTime=CmiWallTimer();
01068 int numTickets = recvdHeader->numberLogs;
01069 char *msg = (char *)recvdHeader;
01070 msg = &msg[sizeof(BufferedTicketRequestHeader)];
01071 DEBUG_MEM(CmiMemoryCheck());
01072
01073 TicketReply *_reply = (TicketReply *)msg;
01074
01075
01076 for(int i=0;i<numTickets;i++){
01077 TicketReply *reply = (TicketReply *)msg;
01078 _ticketHandler(reply);
01079
01080 msg = &msg[sizeof(TicketReply)];
01081 }
01082
01083 CmiFree(recvdHeader);
01084
01085 DEBUG_MEM(CmiMemoryCheck());
01086 };
01087
01091 void _localMessageCopyHandler(LocalMessageLog *msgLog){
01092 double _startTime = CkWallTimer();
01093
01094 char senderString[100],recverString[100];
01095 DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
01096
01097
01098
01099 CpvAccess(_localMessageLog)->enq(*msgLog);
01100
01101 LocalMessageLogAck ack;
01102 ack.entry = msgLog->entry;
01103 DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
01104 CmiSetHandler(&ack,_localMessageAckHandlerIdx);
01105 CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
01106
01107
01108 };
01109
01110 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
01111 double _startTime = CkWallTimer();
01112 DEBUG_MEM(CmiMemoryCheck());
01113
01114 int numLogs = recvdHeader->numberLogs;
01115 char *msg = (char *)recvdHeader;
01116
01117
01118 int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
01119 numPiggyLogs=0;
01120
01121
01122
01123
01124
01125 DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
01126
01127 int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
01128 void *buf = CmiAlloc(totalSize);
01129 char *ptr = (char *)buf;
01130 memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
01131
01132 msg = &msg[sizeof(BufferedLocalLogHeader)];
01133 ptr = &ptr[sizeof(BufferedLocalLogHeader)];
01134
01135 DEBUG_MEM(CmiMemoryCheck());
01136 int PE;
01137 for(int i=0;i<numLogs;i++){
01138 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
01139 CpvAccess(_localMessageLog)->enq(*msgLog);
01140 PE = msgLog->senderPE;
01141 DEBUG(CmiAssert( PE == getCheckPointPE()));
01142
01143 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
01144 ack->entry = msgLog->entry;
01145
01146 msg = &msg[sizeof(LocalMessageLog)];
01147 ptr = &ptr[sizeof(LocalMessageLogAck)];
01148 }
01149 DEBUG_MEM(CmiMemoryCheck());
01150
01151 BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
01152 piggyHeader->numberLogs = numPiggyLogs;
01153 ptr = &ptr[sizeof(BufferedLocalLogHeader)];
01154 if(numPiggyLogs > 0){
01155 countPiggy++;
01156 }
01157
01158 for(int i=0;i<numPiggyLogs;i++){
01159 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
01160 memcpy(ptr,&log,sizeof(LocalMessageLog));
01161 ptr = &ptr[sizeof(LocalMessageLog)];
01162 }
01163 DEBUG_MEM(CmiMemoryCheck());
01164
01165 CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
01166 CmiSyncSendAndFree(PE,totalSize,(char *)buf);
01167
01168
01169
01170
01171
01172
01173
01174 if(freeHeader){
01175 CmiFree(recvdHeader);
01176 }
01177 DEBUG_MEM(CmiMemoryCheck());
01178
01179 }
01180
01181
01182 void _localMessageAckHandler(LocalMessageLogAck *ack){
01183
01184 double _startTime = CkWallTimer();
01185
01186 MlogEntry *entry = ack->entry;
01187 if(entry == NULL){
01188 CkExit();
01189 }
01190 entry->unackedLocal = 0;
01191 envelope *env = entry->env;
01192 char recverName[100];
01193 char senderString[100];
01194 DEBUG_MEM(CmiMemoryCheck());
01195
01196 DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
01197 if(env == NULL)
01198 return;
01199 CkAssert(env->SN > 0);
01200 CkAssert(env->TN > 0);
01201 env->sender.toString(senderString);
01202 DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
01203 env->recver.toString(recverName);
01204
01205 DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
01206
01207
01208
01209
01210
01211
01212
01213
01214
01215 envelope *copyEnv = env;
01216 copyEnv->localMlogEntry = entry;
01217
01218 DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
01219 if(CmiMyPe() != entry->destPE){
01220 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
01221 }
01222
01223 _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);
01224
01225
01226 #ifdef BUFFERED_LOCAL
01227 #else
01228 CmiFree(ack);
01229
01230 #endif
01231
01232 DEBUG_MEM(CmiMemoryCheck());
01233 DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
01234 }
01235
01236
01237 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
01238
01239 double _startTime=CkWallTimer();
01240 DEBUG_MEM(CmiMemoryCheck());
01241
01242 int numLogs = recvdHeader->numberLogs;
01243 char *msg = (char *)recvdHeader;
01244 msg = &msg[sizeof(BufferedLocalLogHeader)];
01245
01246 DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
01247
01248 for(int i=0;i<numLogs;i++){
01249 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
01250 _localMessageAckHandler(ack);
01251
01252 msg = &msg[sizeof(LocalMessageLogAck)];
01253 }
01254
01255
01256 BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
01257
01258 if(piggyHeader->numberLogs > 0){
01259 _bufferedLocalMessageCopyHandler(piggyHeader,0);
01260 }
01261
01262 CmiFree(recvdHeader);
01263 DEBUG_MEM(CmiMemoryCheck());
01264
01265 }
01266
01267 bool fault_aware(CkObjID &recver){
01268 switch(recver.type){
01269 case TypeChare:
01270 return false;
01271 case TypeMainChare:
01272 return false;
01273 case TypeGroup:
01274 case TypeNodeGroup:
01275 case TypeArray:
01276 return true;
01277 default:
01278 return false;
01279 }
01280 };
01281
01282 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
01283 char recverString[100];
01284 char senderString[100];
01285
01286 DEBUG_MEM(CmiMemoryCheck());
01287 CkObjID recver = env->recver;
01288 if(!fault_aware(recver))
01289 return 1;
01290
01291
01292 Chare *obj = (Chare *)recver.getObject();
01293 *objPointer = obj;
01294 if(obj == NULL){
01295 int possiblePE = recver.guessPE();
01296 if(possiblePE != CkMyPe()){
01297 int totalSize = env->getTotalsize();
01298 CmiSyncSend(possiblePE,totalSize,(char *)env);
01299 }
01300 return 0;
01301 }
01302
01303
01304 double _startTime = CkWallTimer();
01305
01306 if(env->TN == obj->mlogData->tProcessed+1){
01307
01308 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
01309
01310
01311 if(env->sender.guessPE() == CkMyPe()){
01312 *logEntryPointer = env->localMlogEntry;
01313 }
01314 DEBUG_MEM(CmiMemoryCheck());
01315 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
01316 void *qMsgPtr;
01317 CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
01318 envelope *qEnv = (envelope *)qMsgPtr;
01319 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
01320 DEBUG_MEM(CmiMemoryCheck());
01321 }
01322
01323
01324
01325 DEBUG_MEM(CmiMemoryCheck());
01326 return 1;
01327 }
01328 if(env->TN <= obj->mlogData->tProcessed){
01329
01330 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
01331
01332 DEBUG_MEM(CmiMemoryCheck());
01333 return 0;
01334 }
01335
01336
01337
01338
01339
01340 CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
01341
01342 DEBUG_MEM(CmiMemoryCheck());
01343
01344 return 0;
01345 }
01346
01350 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
01351 DEBUG(char senderString[100]);
01352 if(obj){
01353 if(sender.guessPE() == CkMyPe()){
01354 if(entry != NULL){
01355 entry->env = NULL;
01356 }
01357 }
01358 obj->mlogData->tProcessed++;
01359
01360
01361
01362 }
01363 DEBUG_MEM(CmiMemoryCheck());
01364 }
01365
01366
01367
01368
01369
01370 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
01371
01372 if(env->recver.type != TypeNodeGroup){
01373
01374
01375
01376
01377
01378 _skipCldEnqueue(destPE,env,_infoIdx);
01379 }else{
01380 _noCldNodeEnqueue(destPE,env);
01381 }
01382
01383 }
01384
01389 int calledRetryTicketRequest=0;
01390
01391 void retryTicketRequestTimer(void *_dummy,double _time){
01392 calledRetryTicketRequest=0;
01393 retryTicketRequest(_dummy,_time);
01394 }
01395
01396 void retryTicketRequest(void *_dummy,double curWallTime){
01397 double start = CkWallTimer();
01398 DEBUG_MEM(CmiMemoryCheck());
01399 int length = CpvAccess(_delayedTicketRequests)->length();
01400 for(int i=0;i<length;i++){
01401 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
01402 if(ticketRequest){
01403 char senderString[100],recverString[100];
01404 DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
01405 DEBUG_MEM(CmiMemoryCheck());
01406 _processTicketRequest(ticketRequest);
01407 CmiFree(ticketRequest);
01408 DEBUG_MEM(CmiMemoryCheck());
01409 }
01410 }
01411 for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
01412 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
01413 ticketLogLocalMessage(entry);
01414 }
01415 int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
01416
01417
01418
01419
01420
01421
01422
01423
01424
01425
01426
01427
01428
01429
01430
01431 if(calledRetryTicketRequest == 0){
01432 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);
01433 calledRetryTicketRequest =1;
01434 }
01435 DEBUG_MEM(CmiMemoryCheck());
01436 }
01437
01438 void _pingHandler(CkPingMsg *msg){
01439 printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
01440 CmiFree(msg);
01441 }
01442
01443
01444
01445
01446
01447
01448
01449 CkVec<TProcessedLog> processedTicketLog;
01450 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
01451 void clearUpMigratedRetainedLists(int PE);
01452
01453 void checkpointAlarm(void *_dummy,double curWallTime){
01454 double diff = curWallTime-lastCompletedAlarm;
01455 DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
01456
01457
01458
01459
01460 if(diff < ((chkptPeriod) - 2)){
01461 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
01462 return;
01463 }
01464 CheckpointRequest request;
01465 request.PE = CkMyPe();
01466 CmiSetHandler(&request,_checkpointRequestHandlerIdx);
01467 CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
01468 };
01469
01470 void _checkpointRequestHandler(CheckpointRequest *request){
01471 startMlogCheckpoint(NULL,CmiWallTimer());
01472 }
01473
01474 void startMlogCheckpoint(void *_dummy,double curWallTime){
01475 double _startTime = CkWallTimer();
01476 checkpointCount++;
01477
01478
01479
01480 if(CmiNumPes() < 256 || CmiMyPe() == 0){
01481 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
01482 }
01483 PUP::sizer psizer;
01484 DEBUG_MEM(CmiMemoryCheck());
01485
01486 psizer | checkpointCount;
01487
01488 CkPupROData(psizer);
01489 DEBUG_MEM(CmiMemoryCheck());
01490 CkPupGroupData(psizer,CmiTrue);
01491 DEBUG_MEM(CmiMemoryCheck());
01492 CkPupNodeGroupData(psizer,CmiTrue);
01493 DEBUG_MEM(CmiMemoryCheck());
01494 pupArrayElementsSkip(psizer,CmiTrue,NULL);
01495 DEBUG_MEM(CmiMemoryCheck());
01496
01497 int dataSize = psizer.size();
01498 int totalSize = sizeof(CheckPointDataMsg)+dataSize;
01499 char *msg = (char *)CmiAlloc(totalSize);
01500 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
01501 chkMsg->PE = CkMyPe();
01502 chkMsg->dataSize = dataSize;
01503
01504
01505 char *buf = &msg[sizeof(CheckPointDataMsg)];
01506 PUP::toMem pBuf(buf);
01507
01508 pBuf | checkpointCount;
01509
01510 CkPupROData(pBuf);
01511 CkPupGroupData(pBuf,CmiTrue);
01512 CkPupNodeGroupData(pBuf,CmiTrue);
01513 pupArrayElementsSkip(pBuf,CmiTrue,NULL);
01514
01515 unAckedCheckpoint=1;
01516 CmiSetHandler(msg,_storeCheckpointHandlerIdx);
01517 CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
01518
01519
01520
01521
01522 processedTicketLog.removeAll();
01523 forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
01524 if(CmiNumPes() < 256 || CmiMyPe() == 0){
01525 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
01526 }
01527
01528 if(CkMyPe() == 0 && onGoingLoadBalancing==0 ){
01529 lastCompletedAlarm = curWallTime;
01530 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
01531 }
01532 traceUserBracketEvent(28,_startTime,CkWallTimer());
01533 };
01534
01535 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
01536 CkVec<TProcessedLog> *log = ( CkVec<TProcessedLog> *)data;
01537 TProcessedLog logEntry;
01538 logEntry.recver = mlogData->objID;
01539 logEntry.tProcessed = mlogData->tProcessed;
01540 log->push_back(logEntry);
01541 char objString[100];
01542 DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
01543 }
01544
01545 class ElementPacker : public CkLocIterator {
01546 private:
01547 CkLocMgr *locMgr;
01548 PUP::er &p;
01549 public:
01550 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
01551 void addLocation(CkLocation &loc) {
01552 CkArrayIndex idx=loc.getIndex();
01553 CkGroupID gID = locMgr->ckGetGroupID();
01554 p|gID;
01555 p|idx;
01556 p|loc;
01557 }
01558 };
01559
01563 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
01564 int numElements,i;
01565 int numGroups = CkpvAccess(_groupIDTable)->size();
01566 if(!p.isUnpacking()){
01567 numElements = CkCountArrayElements();
01568 }
01569 p | numElements;
01570 DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
01571 if(!p.isUnpacking()){
01572 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
01573 }else{
01574
01575
01576 for(int j=0;j<listsize;j++){
01577 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01578 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
01579 listToSkip[j].idx.print();
01580 }
01581 }
01582
01583 printf("numElements = %d\n",numElements);
01584
01585 for (int i=0; i<numElements; i++) {
01586 CkGroupID gID;
01587 CkArrayIndex idx;
01588 p|gID;
01589 p|idx;
01590 int flag=0;
01591 int matchedIdx=0;
01592 for(int j=0;j<listsize;j++){
01593 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01594 if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
01595 matchedIdx = j;
01596 flag = 1;
01597 break;
01598 }
01599 }
01600 }
01601 if(flag == 1){
01602 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01603 }else{
01604 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01605 }
01606
01607 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01608 CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
01609 mgr->resume(idx,p,create,flag);
01610 if(flag == 1){
01611 int homePE = mgr->homePe(idx);
01612 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
01613 }
01614 }
01615 }
01616 };
01617
01618
01619 void writeCheckpointToDisk(int size,char *chkpt){
01620 char fNameTemp[100];
01621 sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
01622 int fd = creat(fNameTemp,S_IRWXU);
01623 int ret = write(fd,chkpt,size);
01624 CkAssert(ret == size);
01625 close(fd);
01626
01627 char fName[100];
01628 sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
01629 unlink(fName);
01630
01631 rename(fNameTemp,fName);
01632
01633 }
01634
01635
01636
01637 void _storeCheckpointHandler(char *msg){
01638
01639 double _startTime=CkWallTimer();
01640
01641 CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
01642 DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
01643
01644 char *chkpt = &msg[sizeof(CheckPointDataMsg)];
01645
01646 char *oldChkpt = CpvAccess(_storedCheckpointData)->buf;
01647 if(oldChkpt != NULL){
01648 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
01649 CmiFree(oldmsg);
01650 }
01651
01652
01653 int sendingPE = chkMsg->PE;
01654
01655 CpvAccess(_storedCheckpointData)->buf = chkpt;
01656 CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
01657 CpvAccess(_storedCheckpointData)->PE = sendingPE;
01658
01659 #ifdef CHECKPOINT_DISK
01660
01661 writeCheckpointToDisk(chkMsg->dataSize,chkpt);
01662 CpvAccess(_storedCheckpointData)->buf = NULL;
01663 CmiFree(msg);
01664 #endif
01665
01666 int count=0;
01667 for(int j=migratedNoticeList.size()-1;j>=0;j--){
01668 if(migratedNoticeList[j].fromPE == sendingPE){
01669 migratedNoticeList[j].ackFrom = 1;
01670 }else{
01671 CmiAssert("migratedNoticeList entry for processor other than buddy");
01672 }
01673 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
01674 migratedNoticeList.remove(j);
01675 count++;
01676 }
01677
01678 }
01679 DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
01680
01681 CheckPointAck ackMsg;
01682 ackMsg.PE = CkMyPe();
01683 ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
01684 CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
01685 CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
01686
01687
01688
01689 traceUserBracketEvent(29,_startTime,CkWallTimer());
01690 };
01691
01692
01693 void sendRemoveLogRequests(){
01694 double _startTime = CkWallTimer();
01695
01696
01697
01698
01699
01700 int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
01701 char *requestMsg = (char *)CmiAlloc(totalSize);
01702 RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01703 request->PE = CkMyPe();
01704 request->numberObjects = processedTicketLog.size();
01705 char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
01706 memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
01707 CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
01708
01709 DEBUG_MEM(CmiMemoryCheck());
01710 for(int i=0;i<CkNumPes();i++){
01711 CmiSyncSend(i,totalSize,requestMsg);
01712 }
01713 CmiFree(requestMsg);
01714
01715 clearUpMigratedRetainedLists(CmiMyPe());
01716
01717
01718 traceUserBracketEvent(30,_startTime,CkWallTimer());
01719 DEBUG_MEM(CmiMemoryCheck());
01720 }
01721
01722
01723 void _checkpointAckHandler(CheckPointAck *ackMsg){
01724 DEBUG_MEM(CmiMemoryCheck());
01725 unAckedCheckpoint=0;
01726 DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
01727 DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));
01728 if(onGoingLoadBalancing){
01729 onGoingLoadBalancing = 0;
01730 finishedCheckpointLoadBalancing();
01731 }else{
01732 sendRemoveLogRequests();
01733 }
01734 CmiFree(ackMsg);
01735
01736 };
01737
01738 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
01739 DEBUG_MEM(CmiMemoryCheck());
01740 CmiMemoryCheck();
01741 char *data = (char *)_data;
01742 RemoveLogRequest *request = (RemoveLogRequest *)data;
01743 TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
01744 CkQ<MlogEntry *> *mlog = mlogData->getMlog();
01745
01746 int count=0;
01747 for(int i=0;i<mlog->length();i++){
01748 MlogEntry *logEntry = mlog->deq();
01749 int match=0;
01750 for(int j=0;j<request->numberObjects;j++){
01751 if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
01752
01753 match = 1;
01754 break;
01755 }
01756 }
01757 char senderString[100],recverString[100];
01758
01759 if(match){
01760 count++;
01761 delete logEntry;
01762 }else{
01763 mlog->enq(logEntry);
01764 }
01765 }
01766 if(count > 0){
01767 char nameString[100];
01768 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
01769 }
01770 DEBUG_MEM(CmiMemoryCheck());
01771 CmiMemoryCheck();
01772 }
01773
01774 void _removeProcessedLogHandler(char *requestMsg){
01775 double start = CkWallTimer();
01776 forAllCharesDo(removeProcessedLogs,requestMsg);
01777
01778 RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01779 DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
01780
01781 if(request->PE == getCheckPointPE()){
01782 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
01783 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
01784 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
01785 int count=0;
01786
01787
01788
01789
01790 for(int i=0;i<localQ->length();i++){
01791 LocalMessageLog localLogEntry = (*localQ)[i];
01792 if(!fault_aware(localLogEntry.recver)){
01793 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
01794 }
01795 bool keep = true;
01796 for(int j=0;j<request->numberObjects;j++){
01797 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
01798 keep = false;
01799 break;
01800 }
01801 }
01802 if(keep){
01803 tempQ->enq(localLogEntry);
01804 }else{
01805 count++;
01806 }
01807 }
01808 delete localQ;
01809 CpvAccess(_localMessageLog) = tempQ;
01810 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
01811 }
01812
01813
01814
01815
01816 CmiMemoryCheck();
01817 clearUpMigratedRetainedLists(request->PE);
01818
01819 traceUserBracketEvent(20,start,CkWallTimer());
01820 CmiFree(requestMsg);
01821 };
01822
01823
01824 void clearUpMigratedRetainedLists(int PE){
01825 int count=0;
01826 CmiMemoryCheck();
01827
01828 for(int j=migratedNoticeList.size()-1;j>=0;j--){
01829 if(migratedNoticeList[j].toPE == PE){
01830 migratedNoticeList[j].ackTo = 1;
01831 }
01832 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
01833 migratedNoticeList.remove(j);
01834 count++;
01835 }
01836 }
01837 DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
01838
01839 for(int j=retainedObjectList.size()-1;j>=0;j--){
01840 if(retainedObjectList[j]->migRecord.toPE == PE){
01841 RetainedMigratedObject *obj = retainedObjectList[j];
01842 DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
01843 retainedObjectList.remove(j);
01844 if(obj->msg != NULL){
01845 CmiMemoryCheck();
01846 CmiFree(obj->msg);
01847 }
01848 delete obj;
01849 }
01850 }
01851 }
01852
01853
01854
01855
01856
01862 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
01863 RestartRequest msg;
01864
01865 fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
01866
01867
01868 _restartFlag = 1;
01869
01870
01871 if(teamSize > 1){
01872 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
01873 if(i != CkMyPe() && i < CkNumPes()){
01874
01875 msg.PE = CkMyPe();
01876 CmiSetHandler(&msg,_restartHandlerIdx);
01877 CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
01878 }
01879 }
01880 }
01881
01882
01883 msg.PE = CkMyPe();
01884 CmiSetHandler(&msg,_getCheckpointHandlerIdx);
01885 CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01886 };
01887
01892 void _restartHandler(RestartRequest *restartMsg){
01893 int i;
01894 int numGroups = CkpvAccess(_groupIDTable)->size();
01895 RestartRequest msg;
01896
01897 fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
01898
01899
01900 _restartFlag = 1;
01901
01902
01903
01904
01905
01906
01907
01908
01909
01910
01911
01912
01913
01914 msg.PE = CkMyPe();
01915 CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
01916 CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01917 }
01918
01919
01923 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
01924
01925
01926 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
01927
01928
01929 CkAssert(restartMsg->PE == storedChkpt->PE);
01930
01931 storedRequest = restartMsg;
01932 verifyAckTotal = 0;
01933
01934 for(int i=0;i<migratedNoticeList.size();i++){
01935 if(migratedNoticeList[i].fromPE == restartMsg->PE){
01936
01937 if(migratedNoticeList[i].ackFrom == 0){
01938
01939
01940 VerifyAckMsg msg;
01941 msg.migRecord = migratedNoticeList[i];
01942 msg.index = i;
01943 msg.fromPE = CmiMyPe();
01944 CmiPrintf("[%d] Verify gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
01945 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
01946 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
01947 verifyAckTotal++;
01948 }
01949 }
01950 }
01951
01952
01953 if(verifyAckTotal == 0){
01954 sendCheckpointData(MLOG_RESTARTED);
01955 }
01956 verifyAckCount = 0;
01957 }
01958
01962 void _recvRestartCheckpointHandler(char *_restartData){
01963 RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
01964 MigrationRecord *migratedAwayElements;
01965
01966 globalLBID = restartData->lbGroupID;
01967
01968 restartData->restartWallTime *= 1000;
01969 adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
01970 adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
01971 if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
01972
01973
01974 printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
01975 char *buf = &_restartData[sizeof(RestartProcessorData)];
01976
01977 if(restartData->numMigratedAwayElements != 0){
01978 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
01979 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
01980 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
01981 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
01982 }
01983
01984
01985 forAllCharesDo(setTeamRecovery,NULL);
01986
01987 PUP::fromMem pBuf(buf);
01988 pBuf | checkpointCount;
01989 CkPupROData(pBuf);
01990 CkPupGroupData(pBuf,CmiFalse);
01991 CkPupNodeGroupData(pBuf,CmiFalse);
01992 pupArrayElementsSkip(pBuf,CmiFalse,NULL);
01993 CkAssert(pBuf.size() == restartData->checkPointSize);
01994 printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
01995
01996
01997 forAllCharesDo(unsetTeamRecovery,NULL);
01998
01999
02000 forAllCharesDo(initializeRestart,NULL);
02001
02002
02003 buf = &buf[restartData->checkPointSize];
02004 for(int i=0;i<restartData->numLocalMessages;i++){
02005 LocalMessageLog logEntry;
02006 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
02007
02008 Chare *recverObj = (Chare *)logEntry.recver.getObject();
02009 if(recverObj!=NULL){
02010 recverObj->mlogData->addToRestoredLocalQ(&logEntry);
02011 recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
02012 char senderString[100];
02013 char recverString[100];
02014 DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
02015 }else{
02016
02017 }
02018 buf = &buf[sizeof(LocalMessageLog)];
02019 }
02020
02021 forAllCharesDo(sortRestoredLocalMsgLog,NULL);
02022 CmiFree(_restartData);
02023
02024
02025
02026
02027
02028
02029
02030
02031
02032
02033
02034
02035
02036
02037
02038
02039
02040
02041 CkVec<TProcessedLog> objectVec;
02042 forAllCharesDo(createObjIDList, (void *)&objectVec);
02043 int numberObjects = objectVec.size();
02044
02045
02046
02047
02048 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02049 char *resendMsg = (char *)CmiAlloc(totalSize);
02050
02051 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02052 resendReq->PE =CkMyPe();
02053 resendReq->numberObjects = numberObjects;
02054 char *objList = &resendMsg[sizeof(ResendRequest)];
02055 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02056
02057
02058
02059
02060
02061
02062
02063
02064
02065
02066
02067
02068
02069 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02070 CpvAccess(_currentObj) = lb;
02071 lb->ReceiveDummyMigration(restartDecisionNumber);
02072
02073 sleep(10);
02074
02075 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02076 for(int i=0;i<CkNumPes();i++){
02077 if(i != CkMyPe()){
02078 CmiSyncSend(i,totalSize,resendMsg);
02079 }
02080 }
02081 _resendMessagesHandler(resendMsg);
02082
02083 }
02084
02085
02086 void CkMlogRestartDouble(void *,double){
02087 CkMlogRestart(NULL,NULL);
02088 };
02089
02090
02091 void CkMlogRestartLocal(){
02092 CkMlogRestart(NULL,NULL);
02093 };
02094
02095
02096 void readCheckpointFromDisk(int size,char *buf){
02097 char fName[100];
02098 sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
02099
02100 int fd = open(fName,O_RDONLY);
02101 int count=0;
02102 while(count < size){
02103 count += read(fd,&buf[count],size-count);
02104 }
02105 close(fd);
02106
02107 };
02108
02109
02110
02114 void _getCheckpointHandler(RestartRequest *restartMsg){
02115
02116
02117 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
02118
02119
02120 CkAssert(restartMsg->PE == storedChkpt->PE);
02121
02122 storedRequest = restartMsg;
02123 verifyAckTotal = 0;
02124
02125 for(int i=0;i<migratedNoticeList.size();i++){
02126 if(migratedNoticeList[i].fromPE == restartMsg->PE){
02127
02128 if(migratedNoticeList[i].ackFrom == 0){
02129
02130
02131 VerifyAckMsg msg;
02132 msg.migRecord = migratedNoticeList[i];
02133 msg.index = i;
02134 msg.fromPE = CmiMyPe();
02135 CmiPrintf("[%d] Verify gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
02136 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
02137 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
02138 verifyAckTotal++;
02139 }
02140 }
02141 }
02142
02143
02144 if(verifyAckTotal == 0){
02145 sendCheckpointData(MLOG_CRASHED);
02146 }
02147 verifyAckCount = 0;
02148 }
02149
02150
02151 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
02152 CkLocMgr *locMgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
02153 CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
02154 if(rec != NULL && rec->type() == CkLocRec::local){
02155
02156
02157 CkLocRec_local *localRec = (CkLocRec_local *) rec;
02158 CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
02159
02160 int localIdx = localRec->getLocalIndex();
02161 LBDatabase *lbdb = localRec->getLBDB();
02162 LDObjHandle ldHandle = localRec->getLdHandle();
02163
02164 locMgr->setDuringMigration(true);
02165
02166 locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
02167 lbdb->UnregisterObj(ldHandle);
02168
02169 locMgr->setDuringMigration(false);
02170
02171 verifyAckedRequests++;
02172
02173 }
02174 CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
02175 CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
02176 };
02177
02178
02179 void _verifyAckHandler(VerifyAckMsg *verifyReply){
02180 int index = verifyReply->index;
02181 migratedNoticeList[index] = verifyReply->migRecord;
02182 verifyAckCount++;
02183 CmiPrintf("[%d] VerifyReply received %d for gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
02184 if(verifyAckCount == verifyAckTotal){
02185 sendCheckpointData(MLOG_CRASHED);
02186 }
02187 }
02188
02189
02195 void sendCheckpointData(int mode){
02196 RestartRequest *restartMsg = storedRequest;
02197 StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
02198 int numMigratedAwayElements = migratedNoticeList.size();
02199 if(migratedNoticeList.size() != 0){
02200 printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
02201
02202 }
02203
02204
02205 int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
02206
02207 DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
02208 CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
02209
02210 CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
02211 totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
02212 totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
02213
02214 char *msg = (char *)CmiAlloc(totalSize);
02215
02216 RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
02217 dataMsg->PE = CkMyPe();
02218 dataMsg->restartWallTime = CmiTimer();
02219 dataMsg->checkPointSize = storedChkpt->bufSize;
02220
02221 dataMsg->numMigratedAwayElements = numMigratedAwayElements;
02222
02223
02224 dataMsg->numMigratedInElements = 0;
02225 dataMsg->migratedElementSize = 0;
02226 dataMsg->lbGroupID = globalLBID;
02227
02228
02229
02230
02231
02232 char *buf = &msg[sizeof(RestartProcessorData)];
02233
02234 if(dataMsg->numMigratedAwayElements != 0){
02235 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
02236 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
02237 }
02238
02239
02240 #ifdef CHECKPOINT_DISK
02241 readCheckpointFromDisk(storedChkpt->bufSize,buf);
02242 #else
02243 memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
02244 #endif
02245 buf = &buf[storedChkpt->bufSize];
02246
02247
02248
02249 dataMsg->numLocalMessages = localMsgQ->length();
02250 for(int i=0;i<localMsgQ->length();i++){
02251 if(!fault_aware(((*localMsgQ)[i]).recver )){
02252 CmiAbort("Non fault aware localMsgQ");
02253 }
02254 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
02255 buf = &buf[sizeof(LocalMessageLog)];
02256 }
02257
02258 if(mode == MLOG_RESTARTED){
02259 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
02260 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02261 CmiFree(restartMsg);
02262 }else{
02263 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
02264 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02265 CmiFree(restartMsg);
02266 }
02267 };
02268
02269
02270
02271
02272
02273 void createObjIDList(void *data,ChareMlogData *mlogData){
02274 CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
02275 TProcessedLog entry;
02276 entry.recver = mlogData->objID;
02277 entry.tProcessed = mlogData->tProcessed;
02278 list->push_back(entry);
02279 DEBUG_TEAM(char objString[100]);
02280 DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
02281 }
02282
02283
02288 void _recvCheckpointHandler(char *_restartData){
02289 RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
02290 MigrationRecord *migratedAwayElements;
02291
02292 globalLBID = restartData->lbGroupID;
02293
02294 restartData->restartWallTime *= 1000;
02295 adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
02296 adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
02297 if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
02298
02299
02300 printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
02301 char *buf = &_restartData[sizeof(RestartProcessorData)];
02302
02303 if(restartData->numMigratedAwayElements != 0){
02304 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
02305 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
02306 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
02307 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
02308 }
02309
02310 PUP::fromMem pBuf(buf);
02311
02312 pBuf | checkpointCount;
02313
02314 CkPupROData(pBuf);
02315 CkPupGroupData(pBuf,CmiTrue);
02316 CkPupNodeGroupData(pBuf,CmiTrue);
02317 pupArrayElementsSkip(pBuf,CmiTrue,NULL);
02318 CkAssert(pBuf.size() == restartData->checkPointSize);
02319 printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
02320
02321 forAllCharesDo(initializeRestart,NULL);
02322
02323
02324 buf = &buf[restartData->checkPointSize];
02325 for(int i=0;i<restartData->numLocalMessages;i++){
02326 LocalMessageLog logEntry;
02327 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
02328
02329 Chare *recverObj = (Chare *)logEntry.recver.getObject();
02330 if(recverObj!=NULL){
02331 recverObj->mlogData->addToRestoredLocalQ(&logEntry);
02332 recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
02333 char senderString[100];
02334 char recverString[100];
02335 DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
02336 }else{
02337
02338 }
02339 buf = &buf[sizeof(LocalMessageLog)];
02340 }
02341
02342 forAllCharesDo(sortRestoredLocalMsgLog,NULL);
02343
02344 CmiFree(_restartData);
02345
02346
02347 _initDone();
02348
02349 getGlobalStep(globalLBID);
02350
02351 countUpdateHomeAcks = 0;
02352 RestartRequest updateHomeRequest;
02353 updateHomeRequest.PE = CmiMyPe();
02354 CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
02355 for(int i=0;i<CmiNumPes();i++){
02356 if(i != CmiMyPe()){
02357 CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
02358 }
02359 }
02360
02361 }
02362
02367 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
02368 countUpdateHomeAcks++;
02369 CmiFree(updateHomeAck);
02370
02371 if(countUpdateHomeAcks != CmiNumPes()){
02372 return;
02373 }
02374
02375
02376 CkVec<TProcessedLog> objectVec;
02377 forAllCharesDo(createObjIDList, (void *)&objectVec);
02378 int numberObjects = objectVec.size();
02379
02380
02381 int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02382 char *resendMsg = (char *)CmiAlloc(totalSize);
02383
02384 ResendRequest *resendReq = (ResendRequest *)resendMsg;
02385 resendReq->PE =CkMyPe();
02386 resendReq->numberObjects = numberObjects;
02387 char *objList = &resendMsg[sizeof(ResendRequest)];
02388 memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02389
02390
02391 if(parallelRestart){
02392 distributeRestartedObjects();
02393 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02394 }
02395
02396
02397
02398
02399
02400
02401 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02402 CpvAccess(_currentObj) = lb;
02403 lb->ReceiveDummyMigration(restartDecisionNumber);
02404
02405 sleep(10);
02406
02407 CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02408 for(int i=0;i<CkNumPes();i++){
02409 if(i != CkMyPe()){
02410 CmiSyncSend(i,totalSize,resendMsg);
02411 }
02412 }
02413 _resendMessagesHandler(resendMsg);
02414 CmiFree(resendMsg);
02415 };
02416
02420 void initializeRestart(void *data, ChareMlogData *mlogData){
02421 mlogData->resendReplyRecvd = 0;
02422 mlogData->receivedTNs = new CkVec<MCount>;
02423 mlogData->restartFlag = 1;
02424 mlogData->restoredLocalMsgLog.removeAll();
02425 mlogData->mapTable.empty();
02426 };
02427
02431 void updateHomePE(void *data,ChareMlogData *mlogData){
02432 RestartRequest *updateRequest = (RestartRequest *)data;
02433 int PE = updateRequest->PE;
02434
02435
02436 if(mlogData->objID.type == TypeArray){
02437
02438 CkGroupID myGID = mlogData->objID.data.array.id;
02439 CkArrayIndex myIdx = mlogData->objID.data.array.idx;
02440 CkArrayID aid(mlogData->objID.data.array.id);
02441
02442 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
02443 if(locMgr->homePe(myIdx) == PE){
02444 DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
02445 DEBUGRESTART(myIdx.print());
02446 informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
02447 }
02448 }
02449 };
02450
02451
02455 void _updateHomeRequestHandler(RestartRequest *updateRequest){
02456 int sender = updateRequest->PE;
02457
02458 forAllCharesDo(updateHomePE,updateRequest);
02459
02460 updateRequest->PE = CmiMyPe();
02461 CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
02462 CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
02463 if(sender == getCheckPointPE() && unAckedCheckpoint==1){
02464 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
02465 checkpointCount--;
02466 startMlogCheckpoint(NULL,0);
02467 }
02468 if(sender == getCheckPointPE()){
02469 for(int i=0;i<retainedObjectList.size();i++){
02470 if(retainedObjectList[i]->acked == 0){
02471 MigrationNotice migMsg;
02472 migMsg.migRecord = retainedObjectList[i]->migRecord;
02473 migMsg.record = retainedObjectList[i];
02474 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
02475 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
02476 }
02477 }
02478 }
02479 }
02480
02484 void fillTicketForChare(void *data, ChareMlogData *mlogData){
02485 ResendData *resendData = (ResendData *)data;
02486 int PE = resendData->PE;
02487 int count=0;
02488 CkHashtableIterator *iterator;
02489 void *objp;
02490 void *objkey;
02491 CkObjID *objID;
02492 SNToTicket *snToTicket;
02493 Ticket ticket;
02494
02495
02496 iterator = mlogData->teamTable.iterator();
02497 while( (objp = iterator->next(&objkey)) != NULL ){
02498 objID = (CkObjID *)objkey;
02499
02500
02501 for(int j=0;j<resendData->numberObjects;j++){
02502 if((*objID) == (resendData->listObjects)[j].recver){
02503 char name[100];
02504 snToTicket = *(SNToTicket **)objp;
02505
02506 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
02507 ticket = snToTicket->get(snIndex);
02508 if(ticket.TN > resendData->maxTickets[j]){
02509 resendData->maxTickets[j] = ticket.TN;
02510 }
02511 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
02512
02513 resendData->ticketVecs[j].push_back(ticket.TN);
02514 }
02515 }
02516 }
02517 }
02518 }
02519
02520
02521 delete iterator;
02522 }
02523
02524
02529 void setTeamRecovery(void *data, ChareMlogData *mlogData){
02530 char name[100];
02531 mlogData->teamRecoveryFlag = 1;
02532 }
02533
02537 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
02538 mlogData->teamRecoveryFlag = 0;
02539 }
02540
02541
02542
02543
02544
02545
02546 void resendMessageForChare(void *data,ChareMlogData *mlogData){
02547 char nameString[100];
02548 ResendData *resendData = (ResendData *)data;
02549 int PE = resendData->PE;
02550 DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
02551 int count=0;
02552 int ticketRequests=0;
02553 CkQ<MlogEntry *> *log = mlogData->getMlog();
02554
02555 for(int i=0;i<log->length();i++){
02556 MlogEntry *logEntry = (*log)[i];
02557
02558
02559
02560 envelope *env = logEntry->env;
02561 if(env == NULL){
02562 continue;
02563 }
02564 if(logEntry->unackedLocal){
02565 char recverString[100];
02566 DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
02567 sendLocalMessageCopy(logEntry);
02568 }
02569
02570 if(env->TN <= 0){
02571
02572 sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,0,1);
02573 }
02574
02575 if(env->recver.type != TypeInvalid){
02576 int flag = 0;
02577 for(int j=0;j<resendData->numberObjects;j++){
02578 if(env->recver == (resendData->listObjects)[j].recver){
02579 flag = 1;
02580
02581 if(env->TN > 0){
02582
02583 if(env->TN > resendData->maxTickets[j]){
02584 resendData->maxTickets[j] = env->TN;
02585 }
02586
02587 if(env->TN >= (resendData->listObjects)[j].tProcessed){
02588
02589 resendData->ticketVecs[j].push_back(env->TN);
02590
02591 if(PE != CkMyPe()){
02592 if(env->recver.type == TypeNodeGroup){
02593 CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
02594 }else{
02595 CmiSetHandler(env,CmiGetXHandler(env));
02596 CmiSyncSend(PE,env->getTotalsize(),(char *)env);
02597 }
02598 }else{
02599 envelope *copyEnv = copyEnvelope(env);
02600 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
02601 }
02602 char senderString[100];
02603 DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
02604 count++;
02605 }
02606 }else{
02607
02608
02609
02610
02611
02612
02613
02614
02615 }
02616 }
02617 }
02618
02619 }
02620 }
02621 DEBUGRESTART(printf("[%d] Resent %d/%d (%d) messages from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)
02622 }
02623
02628 void _resendMessagesHandler(char *msg){
02629 ResendData d;
02630 ResendRequest *resendReq = (ResendRequest *)msg;
02631
02632
02633 char *listObjects = &msg[sizeof(ResendRequest)];
02634 d.numberObjects = resendReq->numberObjects;
02635 d.PE = resendReq->PE;
02636 d.listObjects = (TProcessedLog *)listObjects;
02637 d.maxTickets = new MCount[d.numberObjects];
02638 d.ticketVecs = new CkVec<MCount>[d.numberObjects];
02639 for(int i=0;i<d.numberObjects;i++){
02640 d.maxTickets[i] = 0;
02641 }
02642
02643
02644
02645
02646 int count=0;
02647 for(int i=0;i<retainedObjectList.size();i++){
02648 if(retainedObjectList[i]->migRecord.toPE == d.PE){
02649 count++;
02650 int recreate=1;
02651 for(int j=0;j<d.numberObjects;j++){
02652 if(d.listObjects[j].recver.type != TypeArray ){
02653 continue;
02654 }
02655 CkArrayID aid(d.listObjects[j].recver.data.array.id);
02656 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
02657 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
02658 if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx){
02659 recreate = 0;
02660 break;
02661 }
02662 }
02663 }
02664 CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
02665 if(recreate){
02666 donotCountMigration=1;
02667 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
02668 donotCountMigration=0;
02669 CkLocMgr *locMgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
02670 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
02671 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
02672 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
02673 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
02674 CmiAssert(rec->type() == CkLocRec::local);
02675 CkVec<CkMigratable *> eltList;
02676 locMgr->migratableList((CkLocRec_local *)rec,eltList);
02677 for(int j=0;j<eltList.size();j++){
02678 if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
02679 CpvAccess(_currentObj) = eltList[j];
02680 eltList[j]->ResumeFromSync();
02681 }
02682 }
02683 retainedObjectList[i]->msg=NULL;
02684 }
02685 }
02686 }
02687
02688 if(count > 0){
02689
02690 }
02691
02692 DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
02693
02694
02695
02696
02697 if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
02698 forAllCharesDo(fillTicketForChare,&d);
02699 else
02700 forAllCharesDo(resendMessageForChare,&d);
02701
02702
02703
02704
02705
02706 int totalTNStored=0;
02707 for(int i=0;i<d.numberObjects;i++){
02708 totalTNStored += d.ticketVecs[i].size();
02709 }
02710
02711 int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
02712 char *resendReplyMsg = (char *)CmiAlloc(totalSize);
02713
02714 ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
02715 resendReply->PE = CkMyPe();
02716 resendReply->numberObjects = d.numberObjects;
02717
02718 char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
02719 CkObjID *replyObjects = (CkObjID *)replyListObjects;
02720 for(int i=0;i<d.numberObjects;i++){
02721 replyObjects[i] = d.listObjects[i].recver;
02722 }
02723
02724 char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
02725 for(int i=0;i<d.numberObjects;i++){
02726 int vecsize = d.ticketVecs[i].size();
02727 memcpy(ticketList,&vecsize,sizeof(int));
02728 ticketList = &ticketList[sizeof(int)];
02729 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
02730 ticketList = &ticketList[sizeof(MCount)*vecsize];
02731 }
02732
02733 CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
02734 CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
02735
02736
02737
02738
02739
02740
02741
02742
02743
02744
02745
02746
02747
02748 delete [] d.maxTickets;
02749 delete [] d.ticketVecs;
02750 if(resendReq->PE != CkMyPe()){
02751 CmiFree(msg);
02752 }
02753
02754 lastRestart = CmiWallTimer();
02755 }
02756
02757 void sortVec(CkVec<MCount> *TNvec);
02758 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
02759
02763 void _resendReplyHandler(char *msg){
02767 ResendRequest *resendReply = (ResendRequest *)msg;
02768 CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
02769
02770 char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
02771
02772
02773 DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
02774 for(int i =0; i< resendReply->numberObjects;i++){
02775 Chare *obj = (Chare *)listObjects[i].getObject();
02776
02777 int vecsize;
02778 memcpy(&vecsize,listTickets,sizeof(int));
02779 listTickets = &listTickets[sizeof(int)];
02780 MCount *listTNs = (MCount *)listTickets;
02781 listTickets = &listTickets[vecsize*sizeof(MCount)];
02782
02783 if(obj != NULL){
02784
02785 processReceivedTN(obj,vecsize,listTNs);
02786 }else{
02787
02788 int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
02789 char *TNMsg = (char *)CmiAlloc(totalSize);
02790 ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
02791 receivedTNData->recver = listObjects[i];
02792 receivedTNData->numTNs = vecsize;
02793 char *tnList = &TNMsg[sizeof(ReceivedTNData)];
02794 memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
02795
02796 CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
02797 CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
02798 }
02799 }
02800 };
02801
02802 void _receivedTNDataHandler(ReceivedTNData *msg){
02803 char objName[100];
02804 Chare *obj = (Chare *) msg->recver.getObject();
02805 if(obj){
02806 char *_msg = (char *)msg;
02807 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
02808 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
02809 processReceivedTN(obj,msg->numTNs,listTNs);
02810 }else{
02811 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
02812 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
02813 }
02814 };
02815
02819 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
02820
02821 obj->mlogData->resendReplyRecvd++;
02822
02823 DEBUG(char objName[100]);
02824 DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
02825
02826
02827
02828
02829
02830
02831 for(int j=0;j<listSize;j++){
02832 obj->mlogData->receivedTNs->push_back(listTNs[j]);
02833 }
02834
02835
02836
02837
02838
02839 if(obj->mlogData->resendReplyRecvd == CkNumPes()){
02840 obj->mlogData->resendReplyRecvd = 0;
02841
02842 sortVec(obj->mlogData->receivedTNs);
02843
02844
02845
02846 if(obj->mlogData->receivedTNs->size() > 0){
02847 int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
02848 int vecsize = obj->mlogData->receivedTNs->size();
02849 int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
02850
02851
02852 if(teamSize > 1){
02853 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
02854 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
02855 }else{
02856 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
02857 }
02858
02859 if(numberHoles == 0){
02860 }else{
02861 char objName[100];
02862 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
02863 obj->mlogData->numberHoles = numberHoles;
02864 obj->mlogData->ticketHoles = new MCount[numberHoles];
02865 int countHoles=0;
02866 for(int k=tProcessedIndex+1;k<vecsize;k++){
02867 if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
02868
02869 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
02870 DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
02871 obj->mlogData->ticketHoles[countHoles] = newTN;
02872 countHoles++;
02873 }
02874 }
02875 }
02876
02877 if(countHoles != numberHoles){
02878 char str[100];
02879 printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
02880 }
02881 CkAssert(countHoles == numberHoles);
02882 obj->mlogData->currentHoles = numberHoles;
02883 }
02884 }
02885
02886
02887 delete obj->mlogData->receivedTNs;
02888 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
02889 obj->mlogData->receivedTNs = NULL;
02890 obj->mlogData->restartFlag = 0;
02891
02892 DEBUGRESTART(char objString[100]);
02893 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
02894 }
02895
02896 }
02897
02898
02899 void sortVec(CkVec<MCount> *TNvec){
02900
02901
02902 for(int i=0;i<TNvec->size();i++){
02903 for(int j=i+1;j<TNvec->size();j++){
02904 if((*TNvec)[j] < (*TNvec)[i]){
02905 MCount temp;
02906 temp = (*TNvec)[i];
02907 (*TNvec)[i] = (*TNvec)[j];
02908 (*TNvec)[j] = temp;
02909 }
02910 }
02911 }
02912
02913 MCount *tempArray = new MCount[TNvec->size()];
02914 int uniqueCount=-1;
02915 for(int i=0;i<TNvec->size();i++){
02916 tempArray[i] = 0;
02917 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
02918 uniqueCount++;
02919 tempArray[uniqueCount] = (*TNvec)[i];
02920 }
02921 }
02922 uniqueCount++;
02923 TNvec->removeAll();
02924 for(int i=0;i<uniqueCount;i++){
02925 TNvec->push_back(tempArray[i]);
02926 }
02927 delete [] tempArray;
02928 }
02929
02930 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
02931 if(TNVec->size() == 0){
02932 return -1;
02933 }
02934
02935 int left=0;
02936 int right = TNVec->size();
02937 int mid = (left +right)/2;
02938 while(searchTN != (*TNVec)[mid] && left < right){
02939 if((*TNVec)[mid] > searchTN){
02940 right = mid-1;
02941 }else{
02942 left = mid+1;
02943 }
02944 mid = (left + right)/2;
02945 }
02946 if(left < right){
02947
02948 return mid;
02949 }else{
02950 if(mid < TNVec->size() && mid >=0){
02951 if((*TNVec)[mid] == searchTN){
02952 return mid;
02953 }else{
02954 return -1;
02955 }
02956 }else{
02957 return -1;
02958 }
02959 }
02960 };
02961
02962
02963
02964
02965
02966
02967
02968
02969 class ElementDistributor: public CkLocIterator{
02970 CkLocMgr *locMgr;
02971 int *targetPE;
02972 void pupLocation(CkLocation &loc,PUP::er &p){
02973 CkArrayIndex idx=loc.getIndex();
02974 CkGroupID gID = locMgr->ckGetGroupID();
02975 p|gID;
02976 p|idx;
02977 p|loc;
02978 };
02979 public:
02980 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
02981 void addLocation(CkLocation &loc){
02982 if(*targetPE == CkMyPe()){
02983 *targetPE = (*targetPE +1)%CkNumPes();
02984 return;
02985 }
02986
02987 CkArrayIndex idx=loc.getIndex();
02988 CkLocRec_local *rec = loc.getLocalRecord();
02989
02990 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
02991 idx.print();
02992
02993
02994
02995
02996
02997
02998 PUP::sizer psizer;
02999 pupLocation(loc,psizer);
03000 int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
03001 char *msg = (char *)CmiAlloc(totalSize);
03002 char *buf = &msg[CmiMsgHeaderSizeBytes];
03003 PUP::toMem pmem(buf);
03004 pmem.becomeDeleting();
03005 pupLocation(loc,pmem);
03006
03007 locMgr->setDuringMigration(CmiTrue);
03008 delete rec;
03009 locMgr->setDuringMigration(CmiFalse);
03010 locMgr->inform(idx,*targetPE);
03011
03012 CmiSetHandler(msg,_distributedLocationHandlerIdx);
03013 CmiSyncSendAndFree(*targetPE,totalSize,msg);
03014
03015 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
03016
03017 *targetPE = (*targetPE +1)%CkNumPes();
03018 }
03019
03020 };
03021
03022 void distributeRestartedObjects(){
03023 int numGroups = CkpvAccess(_groupIDTable)->size();
03024 int i;
03025 int targetPE=CkMyPe();
03026 CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
03027 };
03028
03029 void _distributedLocationHandler(char *receivedMsg){
03030 printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
03031 char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
03032 PUP::fromMem pmem(buf);
03033 CkGroupID gID;
03034 CkArrayIndex idx;
03035 pmem |gID;
03036 pmem |idx;
03037 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03038 donotCountMigration=1;
03039 mgr->resume(idx,pmem,CmiTrue);
03040 donotCountMigration=0;
03041 informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
03042 printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
03043 idx.print();
03044
03045 CkLocRec *rec = mgr->elementRec(idx);
03046 CmiAssert(rec->type() == CkLocRec::local);
03047
03048 CkVec<CkMigratable *> eltList;
03049 mgr->migratableList((CkLocRec_local *)rec,eltList);
03050 for(int i=0;i<eltList.size();i++){
03051 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
03052 CpvAccess(_currentObj) = eltList[i];
03053 eltList[i]->ResumeFromSync();
03054 }
03055 }
03056
03057
03058 }
03059
03060
03063 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndex &idx,int locationPE){
03064 DummyMigrationMsg buf;
03065 buf.flag = MLOG_OBJECT;
03066 buf.lbID = lbID;
03067 buf.mgrID = locMgrID;
03068 buf.idx = idx;
03069 buf.locationPE = locationPE;
03070 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
03071 CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
03072 };
03073
03074
03079 void sendDummyMigrationCounts(int *dummyCounts){
03080 DummyMigrationMsg buf;
03081 buf.flag = MLOG_COUNT;
03082 buf.lbID = globalLBID;
03083 CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
03084 for(int i=0;i<CmiNumPes();i++){
03085 if(i != CmiMyPe() && dummyCounts[i] != 0){
03086 buf.count = dummyCounts[i];
03087 CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
03088 }
03089 }
03090 }
03091
03092
03096 void _dummyMigrationHandler(DummyMigrationMsg *msg){
03097 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03098 if(msg->flag == MLOG_OBJECT){
03099 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
03100 LDObjHandle h;
03101 lb->Migrated(h,1);
03102 }
03103 if(msg->flag == MLOG_COUNT){
03104 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
03105 msg->count -= verifyAckedRequests;
03106 for(int i=0;i<msg->count;i++){
03107 LDObjHandle h;
03108 lb->Migrated(h,1);
03109 }
03110 }
03111 verifyAckedRequests=0;
03112 CmiFree(msg);
03113 };
03114
03115
03116
03117
03118
03119
03120
03121
03122 class ElementCaller : public CkLocIterator {
03123 private:
03124 CkLocMgr *locMgr;
03125 MlogFn fnPointer;
03126 void *data;
03127 public:
03128 ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
03129 locMgr = _locMgr;
03130 fnPointer = _fnPointer;
03131 data = _data;
03132 };
03133 void addLocation(CkLocation &loc){
03134 CkVec<CkMigratable *> list;
03135 CkLocRec_local *local = loc.getLocalRecord();
03136 locMgr->migratableList (local,list);
03137 for(int i=0;i<list.size();i++){
03138 CkMigratable *migratableElement = list[i];
03139 fnPointer(data,migratableElement->mlogData);
03140 }
03141 }
03142 };
03143
03147 void forAllCharesDo(MlogFn fnPointer,void *data){
03148 int numGroups = CkpvAccess(_groupIDTable)->size();
03149 for(int i=0;i<numGroups;i++){
03150 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
03151 fnPointer(data,obj->mlogData);
03152 }
03153 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
03154 for(int i=0;i<numNodeGroups;i++){
03155 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
03156 fnPointer(data,obj->mlogData);
03157 }
03158 int i;
03159 CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
03160 };
03161
03162
03163
03164
03165
03166
03167 void initMlogLBStep(CkGroupID gid){
03168 DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
03169 countLBMigratedAway = 0;
03170 countLBToMigrate=0;
03171 onGoingLoadBalancing=1;
03172 migrationDoneCalled=0;
03173 checkpointBarrierCount=0;
03174 if(globalLBID.idx != 0){
03175 CmiAssert(globalLBID.idx == gid.idx);
03176 }
03177 globalLBID = gid;
03178 }
03179
03180 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
03181 DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03182 DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03183
03184 resumeLbFnPtr = _fnPtr;
03185 centralLb = _centralLb;
03186 migrationDoneCalled = 1;
03187 if(countLBToMigrate == countLBMigratedAway){
03188 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03189 startMlogCheckpoint(NULL,CmiWallTimer());
03190 }
03191 };
03192
03193 void finishedCheckpointLoadBalancing(){
03194 DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
03195 CheckpointBarrierMsg msg;
03196 msg.fromPE = CmiMyPe();
03197 msg.checkpointCount = checkpointCount;
03198
03199 CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
03200 CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
03201
03202 };
03203
03204
03205 void sendMlogLocation(int targetPE,envelope *env){
03206 void *_msg = EnvToUsr(env);
03207 CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
03208
03209
03210 int existing = 0;
03211
03212
03213
03214 for(int i=0;i<retainedObjectList.size();i++){
03215 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
03216 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
03217 DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
03218 existing = 1;
03219 break;
03220 }
03221 }
03222
03223 if(existing){
03224 return;
03225 }
03226
03227
03228 countLBToMigrate++;
03229
03230 MigrationNotice migMsg;
03231 migMsg.migRecord.gID = msg->gid;
03232 migMsg.migRecord.idx = msg->idx;
03233 migMsg.migRecord.fromPE = CkMyPe();
03234 migMsg.migRecord.toPE = targetPE;
03235
03236 DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
03237
03238 RetainedMigratedObject *retainedObject = new RetainedMigratedObject;
03239 retainedObject->migRecord = migMsg.migRecord;
03240 retainedObject->acked = 0;
03241
03242 CkPackMessage(&env);
03243
03244 migMsg.record = retainedObject;
03245 retainedObject->msg = env;
03246 int size = retainedObject->size = env->getTotalsize();
03247
03248 retainedObjectList.push_back(retainedObject);
03249
03250 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
03251 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
03252
03253 DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
03254
03255 }
03256
03257 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
03258 msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
03259 migratedNoticeList.push_back(msg->migRecord);
03260
03261 MigrationNoticeAck buf;
03262 buf.record = msg->record;
03263 CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
03264 CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
03265 }
03266
03267 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
03268
03269 RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
03270 retainedObject->acked = 1;
03271
03272 CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
03273 CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
03274
03275
03276 CkGroupID gID = retainedObject->migRecord.gID ;
03277 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03278 informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
03279
03280 countLBMigratedAway++;
03281 if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
03282 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03283 startMlogCheckpoint(NULL,CmiWallTimer());
03284 }
03285 };
03286
03287 void _receiveMlogLocationHandler(void *buf){
03288 envelope *env = (envelope *)buf;
03289 DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
03290 CkUnpackMessage(&env);
03291 void *_msg = EnvToUsr(env);
03292 CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
03293 CkGroupID gID= msg->gid;
03294 DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
03295 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03296 CpvAccess(_currentObj)=mgr;
03297 mgr->immigrate(msg);
03298 };
03299
03300
03301 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
03302
03303
03304
03305
03306
03307
03308
03309
03310 }
03311
03312 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
03313 if(checkpointBarrierCount == CmiNumPes()){
03314 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
03315 for(int i=0;i<CmiNumPes();i++){
03316 CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
03317 }
03318 }
03319 }
03320
03321 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
03322 DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
03323 if(msg->checkpointCount == checkpointCount){
03324 checkpointBarrierCount++;
03325 checkAndSendCheckpointBarrierAcks(msg);
03326 }else{
03327 if(msg->checkpointCount-1 == checkpointCount){
03328 checkpointBarrierCount++;
03329 checkAndSendCheckpointBarrierAcks(msg);
03330 }else{
03331 printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
03332 CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
03333 }
03334 }
03335 CmiFree(msg);
03336 }
03337
03338 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
03339 DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
03340 DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
03341 sendRemoveLogRequests();
03342 (*resumeLbFnPtr)(centralLb);
03343 CmiFree(msg);
03344 }
03345
03351 void informLocationHome(CkGroupID locMgrID,CkArrayIndex idx,int homePE,int currentPE){
03352 double _startTime = CmiWallTimer();
03353 CurrentLocationMsg msg;
03354 msg.mgrID = locMgrID;
03355 msg.idx = idx;
03356 msg.locationPE = currentPE;
03357 msg.fromPE = CkMyPe();
03358
03359 DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
03360 CmiSetHandler(&msg,_receiveLocationHandlerIdx);
03361 CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
03362 traceUserBracketEvent(37,_startTime,CmiWallTimer());
03363 }
03364
03365
03366 void _receiveLocationHandler(CurrentLocationMsg *data){
03367 double _startTime = CmiWallTimer();
03368 CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
03369 if(mgr == NULL){
03370 CmiFree(data);
03371 return;
03372 }
03373 CkLocRec *rec = mgr->elementNrec(data->idx);
03374 DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
03375 if(rec != NULL){
03376 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
03377 if(data->fromPE == data->locationPE){
03378 CmiAbort("Another processor has the same object");
03379 }
03380 }
03381 }
03382 if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
03383 int targetPE = data->fromPE;
03384 data->fromPE = CmiMyPe();
03385 data->locationPE = CmiMyPe();
03386 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
03387 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
03388 }else{
03389 mgr->inform(data->idx,data->locationPE);
03390 }
03391 CmiFree(data);
03392 traceUserBracketEvent(38,_startTime,CmiWallTimer());
03393 }
03394
03395
03396
03397 void getGlobalStep(CkGroupID gID){
03398 LBStepMsg msg;
03399 int destPE = 0;
03400 msg.lbID = gID;
03401 msg.fromPE = CmiMyPe();
03402 msg.step = -1;
03403 CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
03404 CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
03405 };
03406
03407 void _getGlobalStepHandler(LBStepMsg *msg){
03408 CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03409 msg->step = lb->step();
03410 CmiAssert(msg->fromPE != CmiMyPe());
03411 CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
03412 CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
03413 CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
03414 };
03415
03416 void _recvGlobalStepHandler(LBStepMsg *msg){
03417
03418 restartDecisionNumber=msg->step;
03419 RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
03420 _updateHomeAckHandler(dummyAck);
03421 };
03422
03426 void _messageLoggingExit(){
03427
03428
03429
03430
03431
03432
03433
03434
03435 printf("[%d] _messageLoggingExit \n",CmiMyPe());
03436
03437
03438
03439 CkPrintf("[%d] Logged messages = %.0f, log size = %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
03440
03441 }
03442
03448 void* CkObjID::getObject(){
03449
03450 switch(type){
03451 case TypeChare:
03452 return CkLocalChare(&data.chare.id);
03453 case TypeMainChare:
03454 return CkLocalChare(&data.chare.id);
03455 case TypeGroup:
03456
03457 CkAssert(data.group.onPE == CkMyPe());
03458 return CkLocalBranch(data.group.id);
03459 case TypeNodeGroup:
03460 CkAssert(data.group.onPE == CkMyNode());
03461
03462 {
03463 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
03464 void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
03465 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
03466
03467 return retval;
03468 }
03469 case TypeArray:
03470 {
03471
03472
03473 CkArrayID aid(data.array.id);
03474
03475 if(aid.ckLocalBranch() == NULL){ return NULL;}
03476
03477 CProxyElement_ArrayBase aProxy(aid,data.array.idx);
03478
03479 return aProxy.ckLocal();
03480 }
03481 default:
03482 CkAssert(0);
03483 }
03484 }
03485
03486
03487 int CkObjID::guessPE(){
03488 switch(type){
03489 case TypeChare:
03490 case TypeMainChare:
03491 return data.chare.id.onPE;
03492 case TypeGroup:
03493 case TypeNodeGroup:
03494 return data.group.onPE;
03495 case TypeArray:
03496 {
03497 CkArrayID aid(data.array.id);
03498 if(aid.ckLocalBranch() == NULL){
03499 return -1;
03500 }
03501 return aid.ckLocalBranch()->lastKnown(data.array.idx);
03502 }
03503 default:
03504 CkAssert(0);
03505 }
03506 };
03507
03508 char *CkObjID::toString(char *buf) const {
03509
03510 switch(type){
03511 case TypeChare:
03512 sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
03513 break;
03514 case TypeMainChare:
03515 sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
03516 break;
03517 case TypeGroup:
03518 sprintf(buf,"Group %d PE %d \0",data.group.id.idx,data.group.onPE);
03519 break;
03520 case TypeNodeGroup:
03521 sprintf(buf,"NodeGroup %d Node %d \0",data.group.id.idx,data.group.onPE);
03522 break;
03523 case TypeArray:
03524 {
03525 const CkArrayIndex &idx = data.array.idx;
03526 const int *indexData = idx.data();
03527 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
03528 break;
03529 }
03530 default:
03531 CkAssert(0);
03532 }
03533
03534 return buf;
03535 };
03536
03537 void CkObjID::updatePosition(int PE){
03538 if(guessPE() == PE){
03539 return;
03540 }
03541 switch(type){
03542 case TypeArray:
03543 {
03544 CkArrayID aid(data.array.id);
03545 if(aid.ckLocalBranch() == NULL){
03546
03547 }else{
03548 char str[100];
03549 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
03550
03551 CkLocRec *rec = mgr->elementNrec(data.array.idx);
03552 if(rec != NULL){
03553 if(rec->type() == CkLocRec::local){
03554 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
03555 return;
03556 }
03557 }
03558 mgr->inform(data.array.idx,PE);
03559 }
03560 }
03561
03562 break;
03563 case TypeChare:
03564 case TypeMainChare:
03565 CkAssert(data.chare.id.onPE == PE);
03566 break;
03567 case TypeGroup:
03568 case TypeNodeGroup:
03569 CkAssert(data.group.onPE == PE);
03570 break;
03571 default:
03572 CkAssert(0);
03573 }
03574 }
03575
03576
03577
03578 void MlogEntry::pup(PUP::er &p){
03579 p | destPE;
03580 p | _infoIdx;
03581 p | unackedLocal;
03582 int size;
03583 if(!p.isUnpacking()){
03584
03585
03586
03587
03588 if(env == NULL){
03589
03590 size = 0;
03591 }else{
03592 size = env->getTotalsize();
03593 }
03594 }
03595 p | size;
03596 if(p.isUnpacking()){
03597 if(size > 0){
03598 env = (envelope *)_allocEnv(ForChareMsg,size);
03599 }else{
03600 env = NULL;
03601 }
03602 }
03603 if(size > 0){
03604 p((char *)env,size);
03605
03606 if(p.isUnpacking()){
03607 env->localMlogEntry = NULL;
03608 }
03609 }
03610 };
03611
03612 void RestoredLocalMap::pup(PUP::er &p){
03613 p | minSN;
03614 p | maxSN;
03615 p | count;
03616 if(p.isUnpacking()){
03617 TNArray = new MCount[count];
03618 }
03619 p(TNArray,count);
03620 };
03621
03622
03623
03624
03625
03626
03627
03628
03629
03630 MCount ChareMlogData::nextSN(const CkObjID &recver){
03631
03632
03633
03634 double _startTime = CmiWallTimer();
03635 MCount *SN = snTable.getPointer(recver);
03636 if(SN==NULL){
03637 snTable.put(recver) = 1;
03638 return 1;
03639 }else{
03640 (*SN)++;
03641 return *SN;
03642 }
03643
03644 };
03645
03646
03647 MCount ChareMlogData::newTN(){
03648 MCount TN;
03649 if(currentHoles > 0){
03650 int holeidx = numberHoles-currentHoles;
03651 TN = ticketHoles[holeidx];
03652 currentHoles--;
03653 if(currentHoles == 0){
03654 delete []ticketHoles;
03655 numberHoles = 0;
03656 }
03657 }else{
03658 TN = ++tCount;
03659 }
03660 return TN;
03661 };
03662
03666 inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
03667 Ticket ticket;
03668
03669 SNToTicket *ticketRow = ticketTable.get(sender);
03670 if(ticketRow != NULL){
03671 Ticket earlierTicket = ticketRow->get(SN);
03672 if(earlierTicket.TN != 0){
03673 CkAssert(earlierTicket.TN == TN);
03674 return;
03675 }
03676 }else{
03677 ticketRow = new SNToTicket();
03678 ticketTable.put(sender) = ticketRow;
03679 }
03680 ticket.TN = TN;
03681 ticketRow->put(SN) = ticket;
03682 }
03683
03687 inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
03688 DEBUG(char senderName[100];)
03689 DEBUG(char recverName[100];)
03690 double _startTime =CmiWallTimer();
03691 Ticket ticket;
03692
03693
03694 if(restartFlag){
03695 ticket.TN = 0;
03696 return ticket;
03697 }
03698
03699
03700
03701
03702
03703
03704
03705
03706
03707
03708
03709 SNToTicket *ticketRow = ticketTable.get(sender);
03710 if(ticketRow != NULL){
03711 Ticket earlierTicket = ticketRow->get(SN);
03712 if(earlierTicket.TN == 0){
03713 ticket.TN = newTN();
03714 ticketRow->put(SN) = ticket;
03715 DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
03716 }else{
03717 ticket.TN = earlierTicket.TN;
03718 if(ticket.TN > tCount){
03719 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03720 }
03721 CmiAssert(ticket.TN <= tCount);
03722 }
03723 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03724 }else{
03725 SNToTicket *newRow = new SNToTicket;
03726 ticket.TN = newTN();
03727 newRow->put(SN) = ticket;
03728 ticketTable.put(sender) = newRow;
03729 DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
03730 }
03731
03732
03733
03734
03735
03736 ticket.state = NEW_TICKET;
03737
03738 return ticket;
03739 };
03740
03744 void ChareMlogData::addLogEntry(MlogEntry *entry){
03745 DEBUG(char nameString[100]);
03746 DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
03747 DEBUG_MEM(CmiMemoryCheck());
03748
03749
03750 mlog.enq(entry);
03751 };
03752
03753 double totalSearchRestoredTime=0;
03754 double totalSearchRestoredCount=0;
03755
03760 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
03761 double start= CkWallTimer();
03762 MCount TN=0;
03763 if(mapTable.numObjects() > 0){
03764 RestoredLocalMap *map = mapTable.get(sender);
03765 if(map){
03766 int index = SN - map->minSN;
03767 if(index < map->count){
03768 TN = map->TNArray[index];
03769 }
03770 }
03771 }
03772
03773 DEBUG(char senderName[100]);
03774 DEBUG(char recverName[100]);
03775 DEBUG(if(TN != 0){ CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN);});
03776
03777 totalSearchRestoredTime += CkWallTimer()-start;
03778 totalSearchRestoredCount++;
03779 return TN;
03780 }
03781
03782 void ChareMlogData::addToRestoredLocalQ(LocalMessageLog *logEntry){
03783 restoredLocalMsgLog.push_back(*logEntry);
03784 }
03785
03786 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData){
03787 mlogData->sortRestoredLocalMsgLog();
03788 }
03789
03790 void ChareMlogData::sortRestoredLocalMsgLog(){
03791
03792
03793 for(int i=0;i<restoredLocalMsgLog.size();i++){
03794 LocalMessageLog &logEntry = restoredLocalMsgLog[i];
03795 RestoredLocalMap *map = mapTable.get(logEntry.sender);
03796 if(map == NULL){
03797 map = new RestoredLocalMap;
03798 mapTable.put(logEntry.sender)=map;
03799 }
03800 map->count++;
03801 if(map->minSN == 0){
03802 map->minSN = logEntry.SN;
03803 }else{
03804 if(logEntry.SN < map->minSN){
03805 map->minSN = logEntry.SN;
03806 }
03807 }
03808 if(logEntry.SN > map->maxSN){
03809 map->maxSN = logEntry.SN;
03810 }
03811
03812 }
03813 for(int i=0;i< restoredLocalMsgLog.size();i++){
03814 LocalMessageLog &logEntry = restoredLocalMsgLog[i];
03815 RestoredLocalMap *map = mapTable.get(logEntry.sender);
03816 CkAssert(map != NULL);
03817 if(map->TNArray == NULL){
03818 map->TNArray = new MCount[map->maxSN-map->minSN+1];
03819 CkAssert(map->count == map->maxSN-map->minSN+1);
03820 map->count = 0;
03821 }
03822 map->TNArray[map->count] = logEntry.TN;
03823 map->count++;
03824 }
03825 restoredLocalMsgLog.free();
03826 }
03827
03833 void ChareMlogData::pup(PUP::er &p){
03834 int tCountAux;
03835 int startSize=0;
03836 char nameStr[100];
03837 if(p.isSizing()){
03838 PUP::sizer *sizep = (PUP::sizer *)&p;
03839 startSize = sizep->size();
03840 }
03841 double _startTime = CkWallTimer();
03842
03843 p | objID;
03844 if(teamRecoveryFlag)
03845 p | tCountAux;
03846 else
03847 p | tCount;
03848 p | tProcessed;
03849 if(p.isUnpacking()){
03850 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
03851 }
03852 p | toResumeOrNot;
03853 p | resumeCount;
03854 DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
03855
03856
03857
03858 int lengthReceivedTNs;
03859 if(!p.isUnpacking()){
03860 if(receivedTNs == NULL){
03861 lengthReceivedTNs = -1;
03862 }else{
03863 lengthReceivedTNs = receivedTNs->size();
03864 }
03865 }
03866 p | lengthReceivedTNs;
03867 if(p.isUnpacking()){
03868 if(lengthReceivedTNs == -1){
03869 receivedTNs = NULL;
03870 }else{
03871 receivedTNs = new CkVec<MCount>;
03872 for(int i=0;i<lengthReceivedTNs;i++){
03873 MCount tempTicket;
03874 p | tempTicket;
03875 CkAssert(tempTicket > 0);
03876 receivedTNs->push_back(tempTicket);
03877 }
03878 }
03879 }else{
03880 for(int i=0;i<lengthReceivedTNs;i++){
03881 p | (*receivedTNs)[i];
03882 }
03883 }
03884
03885
03886 p | currentHoles;
03887 p | numberHoles;
03888 if(p.isUnpacking()){
03889 if(numberHoles > 0){
03890 ticketHoles = new MCount[numberHoles];
03891 }else{
03892 ticketHoles = NULL;
03893 }
03894 }
03895 if(numberHoles > 0){
03896 p(ticketHoles,numberHoles);
03897 }
03898
03899 snTable.pup(p);
03900
03901
03902 int length = 0;
03903 MlogEntry *entry;
03904 if(!p.isUnpacking()){
03905 for(int i=0; i<mlog.length(); i++){
03906 entry = mlog[i];
03907 if(entry->unackedLocal)
03908 length++;
03909 }
03910 }
03911 p | length;
03912 if(p.isUnpacking()){
03913 for(int i=0; i<length; i++){
03914 entry = new MlogEntry();
03915 mlog.enq(entry);
03916 entry->pup(p);
03917 }
03918 }else{
03919 for(int i=0; i<mlog.length(); i++){
03920 entry = mlog[i];
03921 if(entry->unackedLocal){
03922 entry->pup(p);
03923 }
03924 }
03925 }
03926
03927
03928
03929
03930
03931
03932
03933
03934
03935
03936
03937
03938
03939
03940
03941
03942
03943
03944
03945 p | restoredLocalMsgLog;
03946 p | resendReplyRecvd;
03947 p | restartFlag;
03948
03949
03950 int tableSize;
03951 if(!p.isUnpacking()){
03952 tableSize = mapTable.numObjects();
03953 }
03954 p | tableSize;
03955 if(!p.isUnpacking()){
03956 CkHashtableIterator *iter = mapTable.iterator();
03957 while(iter->hasNext()){
03958 CkObjID *objID;
03959 RestoredLocalMap **map = (RestoredLocalMap **) iter->next((void **)&objID);
03960 p | (*objID);
03961 (*map)->pup(p);
03962 }
03963
03964 delete iter;
03965 }else{
03966 for(int i=0;i<tableSize;i++){
03967 CkObjID objID;
03968 p | objID;
03969 RestoredLocalMap *map = new RestoredLocalMap;
03970 map->pup(p);
03971 mapTable.put(objID) = map;
03972 }
03973 }
03974
03975
03976 {
03977 int ticketTableSize;
03978 if(!p.isUnpacking()){
03979 ticketTableSize = ticketTable.numObjects();
03980 }
03981 p | ticketTableSize;
03982 if(!p.isUnpacking()){
03983 CkHashtableIterator *iter = ticketTable.iterator();
03984 while(iter->hasNext()){
03985 CkObjID *objID;
03986 SNToTicket **ticketRow = (SNToTicket **)iter->next((void **)&objID);
03987 p | (*objID);
03988 (*ticketRow)->pup(p);
03989 }
03990
03991 delete iter;
03992 }else{
03993 for(int i=0;i<ticketTableSize;i++){
03994 CkObjID objID;
03995 p | objID;
03996 SNToTicket *ticketRow = new SNToTicket;
03997 ticketRow->pup(p);
03998 if(!teamRecoveryFlag)
03999 ticketTable.put(objID) = ticketRow;
04000 else
04001 delete ticketRow;
04002 }
04003 }
04004 }
04005
04006 if(p.isSizing()){
04007 PUP::sizer *sizep = (PUP::sizer *)&p;
04008 int pupSize = sizep->size()-startSize;
04009 DEBUG(char name[40]);
04010 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
04011
04012 }
04013
04014 double _finTime = CkWallTimer();
04015 DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
04016 };
04017
04018
04019
04020
04024 int getCheckPointPE(){
04025
04026 if(teamSize != 1){
04027 return (CmiMyPe() + teamSize) % CmiNumPes();
04028 }
04029 return (CmiNumPes() -1 - CmiMyPe());
04030 }
04031
04032
04033 envelope *copyEnvelope(envelope *env){
04034 envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());
04035 memcpy(newEnv,env,env->getTotalsize());
04036 return newEnv;
04037 }
04038
04039 #endif