00001 
00008 #include "charm.h"
00009 #include "ck.h"
00010 #include "ckcausalmlog.h"
00011 #include "queueing.h"
00012 #include <sys/types.h>
00013 #include <signal.h>
00014 #include "CentralLB.h"
00015 
00016 #ifdef _FAULT_CAUSAL_
00017 
00018 
00019 
00020 
// Compile-time switches for the optional statistics counters used further
// down in this file (0 = compiled out, nonzero = compiled in).
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_DETS 0
#define COLLECT_STATS_DETS_DUP 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_LOGGING 0

// String labels RECOVERY_SEND / RECOVERY_PROCESS; their users are outside
// this excerpt.
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug-output wrappers. Comments are removed before macro definitions are
// processed, so a macro whose replacement text is commented out (e.g. `//x`)
// expands to nothing and its argument is compiled away. DEBUG_NOW and
// DEBUG_PE_NOW are active: they expand to their argument.
#define DEBUG_MEM(x)  //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x)  //x
#define DEBUGLB(x)   // x
#define DEBUG_TEAM(x)  // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x

// Time without a heartbeat from the buddy PE, in CmiWallTimer() units
// (seconds), after which heartBeatCheckHandler declares the buddy failed.
#define FAIL_DET_THRESHOLD 10

// Forward declarations for helpers defined elsewhere or later in this file.
extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

bool fault_aware(CkObjID &recver);
void sendCheckpointData(int mode);
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(TProcessedLog *log);
00057 
// Recovery / restart state flags and counters.
bool _recoveryFlag=false;
bool _restartFlag=false;
int _numRestartResponses=0;

// Hash-table usage counters.
int countHashRefs=0; 
int countHashCollisions=0;

// Checkpoint location and count of checkpoints not yet acknowledged.
// NOTE(review): a string literal is bound to a non-const char*; this is
// deprecated/ill-formed in modern C++. Check the header declaration before
// changing the type.
char *checkpointDirectory=".";
int unAckedCheckpoint=0;

// Message-handling counters.
int countLocal=0,countBuffered=0;
int countPiggy=0;
int countClearBufferedLocalCalls=0;

int countUpdateHomeAcks=0;

// Runtime configuration defined in another translation unit.
extern int teamSize;
extern int chkptPeriod;
extern bool fastRecovery;
extern int parallelRecovery;

// Fault-injection configuration: killFile is read by readKillFile and
// faultFile by readFaultFile. killTime is the wall-clock time (seconds) at
// which killLocal terminates this PE. faultMean is set by readFaultFile.
char *killFile;
char *faultFile;
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
double killTime=0.0;
double faultMean;
int checkpointCount=0;
// When nonzero, _messageLoggingInit chooses a temporary file name in fName.
int diskCkptFlag = 0;
static char fName[100];

// Per-PE (Cpv) state for the message-logging protocol.
CpvDeclare(Chare *,_currentObj);
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
CpvDeclare(Queue, _outOfOrderMessageQueue);
CpvDeclare(Queue, _delayedRemoteMessageQueue);
// One buffer pointer and one count per destination PE.
CpvDeclare(char **,_bufferedTicketRequests);
CpvDeclare(int *,_numBufferedTicketRequests);

// Local determinant buffer: raw array of Determinant records holding
// _maxBufferedDets entries (grown by addBufferedDeterminant).
CpvDeclare(char *, _localDets);

// Number of determinants at the tail of _localDets that are still pending.
// sendMsg piggybacks them; _removeDeterminantsHandler shrinks the count.
int _numBufferedDets;

// Next free slot in _localDets.
int _indexBufferedDets;

// Phase tag sent with determinants and compared against removal acks.
int _phaseBufferedDets;

// Determinants received from other PEs, keyed by receiver object id.
CpvDeclare(CkDeterminantHashtableT *, _remoteDets);

// Capacity of _localDets, in Determinant entries.
int _maxBufferedDets;

// Incarnation number per PE, indexed by PE; stamped into outgoing envelopes.
CpvDeclare(char *, _incarnation);

// Scratch headers, sizes and pointers reused for determinant messages.
CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);

CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);

CpvDeclare(int *, _storeDetsSizes);

CpvDeclare(char **, _storeDetsPtrs);

// Bookkeeping for objects migrating during recovery.
CpvDeclare(int, _numEmigrantRecObjs);
CpvDeclare(int, _numImmigrantRecObjs);
CpvDeclare(std::vector<CkLocation *> *, _immigrantRecObjs);
00145 
00146 
// Optional statistics counters. Each group exists only when its
// COLLECT_STATS_* switch is nonzero.
#if COLLECT_STATS_MSGS
int *numMsgsTarget;
int *sizeMsgsTarget;
int totalMsgsTarget;
float totalMsgsSize;
#endif
#if COLLECT_STATS_DETS
int numPiggyDets;
int numDets;
int numDupDets;
#endif
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif

#if COLLECT_STATS_LOGGING
float MLOGFT_totalLogSize = 0.0;
float MLOGFT_totalMessages = 0.0;
float MLOGFT_totalMcastLogSize = 0.0;
float MLOGFT_totalReductionLogSize = 0.0;
#endif

// Checkpoint scheduling state and a two-level table
// (object id -> object id -> SNToTicket*).
static double adjustChkptPeriod=0.0; 
static double nextCheckpointTime=0.0;
static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);

// Converse handler indices. Each is assigned by CkRegisterHandler in
// _messageLoggingInit.
int _pingHandlerIdx;

char objString[100];
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _removeProcessedLogHandlerIdx;

int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;


int _getGlobalStepHandlerIdx;
int _recvGlobalStepHandlerIdx;

int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _sendDetsHandlerIdx;
int _sendDetsReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _sendBackLocationHandlerIdx;
int _storeDeterminantsHandlerIdx;
int _removeDeterminantsHandlerIdx;


int _restartHandlerIdx;
int _getRestartCheckpointHandlerIdx;
int _recvRestartCheckpointHandlerIdx;
void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);

// Acknowledgement-verification counters.
int verifyAckTotal;
int verifyAckCount;

int verifyAckedRequests=0;

RestartRequest *storedRequest;

int _falseRestart =0; 

// Load-balancing / migration state.
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;

std::vector<MigrationRecord> migratedNoticeList;
std::vector<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;

// Wall-clock timestamps, both initialized in _messageLoggingInit.
double lastCompletedAlarm=0;
double lastRestart=0;


int _receiveLocationHandlerIdx;

// MPI-only failure detection via heartbeats between buddy PEs.
// lastPingTime stays -1 until the first heartbeat arrives.
#if CMK_CONVERSE_MPI
static int heartBeatHandlerIdx;
static int heartBeatCheckHandlerIdx;
static int partnerFailureHandlerIdx;
static double lastPingTime = -1;

void mpi_restart_crashed(int pe, int rank);
int  find_spare_mpirank(int pe, int partition);

void heartBeatPartner();
void heartBeatHandler(void *msg);
void heartBeatCheckHandler();
void partnerFailureHandler(char *msg);
int getReverseCheckPointPE();
// While nonzero, heartBeatCheckHandler does not declare failures.
int inCkptFlag = 0;
#endif
00269 
/**
 * Identity message transform.
 *
 * Ignores size, remote and count and returns data unchanged. Any callers are
 * outside this excerpt.
 */
static void *doNothingMsg(int * size, void * data, void ** remote, int count){
    void *unchanged = data;
    return unchanged;
}
00273 
00277 void _messageLoggingInit(){
00278     if(CkMyPe() == 0)
00279         CkPrintf("[%d] Causal Message Logging Support\n",CkMyPe());
00280 
00281     
00282     CpvInitialize(Chare *,_currentObj);
00283     
00284     
00285     _pingHandlerIdx = CkRegisterHandler(_pingHandler);
00286         
00287     
00288     _storeCheckpointHandlerIdx = CkRegisterHandler(_storeCheckpointHandler);
00289     _checkpointAckHandlerIdx = CkRegisterHandler( _checkpointAckHandler);
00290     _removeProcessedLogHandlerIdx  = CkRegisterHandler(_removeProcessedLogHandler);
00291     _checkpointRequestHandlerIdx =  CkRegisterHandler(_checkpointRequestHandler);
00292 
00293     
00294     _getCheckpointHandlerIdx = CkRegisterHandler(_getCheckpointHandler);
00295     _recvCheckpointHandlerIdx = CkRegisterHandler(_recvCheckpointHandler);
00296     _updateHomeRequestHandlerIdx =CkRegisterHandler(_updateHomeRequestHandler);
00297     _updateHomeAckHandlerIdx =  CkRegisterHandler( _updateHomeAckHandler);
00298     _resendMessagesHandlerIdx = CkRegisterHandler(_resendMessagesHandler);
00299     _sendDetsHandlerIdx = CkRegisterHandler(_sendDetsHandler);
00300     _sendDetsReplyHandlerIdx = CkRegisterHandler(_sendDetsReplyHandler);
00301     _receivedTNDataHandlerIdx=CkRegisterHandler(_receivedTNDataHandler);
00302     _receivedDetDataHandlerIdx = CkRegisterHandler(_receivedDetDataHandler);
00303     _distributedLocationHandlerIdx=CkRegisterHandler(_distributedLocationHandler);
00304     _sendBackLocationHandlerIdx=CkRegisterHandler(_sendBackLocationHandler);
00305     _verifyAckRequestHandlerIdx = CkRegisterHandler(_verifyAckRequestHandler);
00306     _verifyAckHandlerIdx = CkRegisterHandler(_verifyAckHandler);
00307     _dummyMigrationHandlerIdx = CkRegisterHandler(_dummyMigrationHandler);
00308 
00309     
00310     _restartHandlerIdx = CkRegisterHandler(_restartHandler);
00311     _getRestartCheckpointHandlerIdx = CkRegisterHandler(_getRestartCheckpointHandler);
00312     _recvRestartCheckpointHandlerIdx = CkRegisterHandler(_recvRestartCheckpointHandler);
00313 
00314     
00315     _storeDeterminantsHandlerIdx = CkRegisterHandler(_storeDeterminantsHandler);
00316     _removeDeterminantsHandlerIdx = CkRegisterHandler(_removeDeterminantsHandler);
00317     
00318     
00319     _receiveMlogLocationHandlerIdx=CkRegisterHandler(_receiveMlogLocationHandler);
00320     _receiveMigrationNoticeHandlerIdx=CkRegisterHandler(_receiveMigrationNoticeHandler);
00321     _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler(_receiveMigrationNoticeAckHandler);
00322     _getGlobalStepHandlerIdx=CkRegisterHandler(_getGlobalStepHandler);
00323     _recvGlobalStepHandlerIdx=CkRegisterHandler(_recvGlobalStepHandler);
00324     _checkpointBarrierHandlerIdx=CkRegisterHandler(_checkpointBarrierHandler);
00325     _checkpointBarrierAckHandlerIdx=CkRegisterHandler(_checkpointBarrierAckHandler);
00326     
00327     
00328     _receiveLocationHandlerIdx=CkRegisterHandler(_receiveLocationHandler);
00329 
00330    
00331 #if CMK_CONVERSE_MPI
00332     heartBeatHandlerIdx = CkRegisterHandler(heartBeatHandler);
00333     heartBeatCheckHandlerIdx = CkRegisterHandler(heartBeatCheckHandler);
00334     partnerFailureHandlerIdx = CkRegisterHandler(partnerFailureHandler);
00335 #endif
00336 
00337     
00338     CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
00339     CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
00340     CpvInitialize(Queue, _outOfOrderMessageQueue);
00341     CpvInitialize(Queue, _delayedRemoteMessageQueue);
00342     CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
00343     CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
00344     
00345     CpvInitialize(char **,_bufferedTicketRequests);
00346     CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
00347     CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
00348     for(int i=0;i<CkNumPes();i++){
00349         CpvAccess(_bufferedTicketRequests)[i]=NULL;
00350         CpvAccess(_numBufferedTicketRequests)[i]=0;
00351     }
00352  
00353     
00354     _numBufferedDets = 0;
00355     _indexBufferedDets = 0;
00356     _phaseBufferedDets = 0;
00357     _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
00358     CpvInitialize(char *, _localDets);
00359     CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, std::vector<Determinant> *> *),_remoteDets);
00360     CpvInitialize(char *, _incarnation);
00361     CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
00362     CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
00363     CpvInitialize(int *, _storeDetsSizes);
00364     CpvInitialize(char **, _storeDetsPtrs);
00365     CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
00366     CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, std::vector<Determinant> *>(100, 0.4);
00367     CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
00368     for(int i=0; i<CmiNumPes(); i++){
00369         CpvAccess(_incarnation)[i] = 0;
00370     }
00371     CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
00372     CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
00373     CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
00374     CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
00375 
00376     
00377     CpvInitialize(int, _numEmigrantRecObjs);
00378     CpvAccess(_numEmigrantRecObjs) = 0;
00379     CpvInitialize(int, _numImmigrantRecObjs);
00380     CpvAccess(_numImmigrantRecObjs) = 0;
00381 
00382     CpvInitialize(std::vector<CkLocation *> *, _immigrantRecObjs);
00383     CpvAccess(_immigrantRecObjs) = new std::vector<CkLocation *>;
00384 
00385     
00386     CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
00387     CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
00388 
00389     
00390     if(diskCkptFlag){
00391 #if CMK_USE_MKSTEMP
00392         sprintf(fName, "/tmp/ckpt%d-XXXXXX", CkMyPe());
00393         mkstemp(fName);
00394 #else
00395         fName=tmpnam(NULL);
00396 #endif
00397     }
00398 
00399     
00400     traceRegisterUserEvent("Remove Logs", 20);
00401     traceRegisterUserEvent("Ticket Request Handler", 21);
00402     traceRegisterUserEvent("Ticket Handler", 22);
00403     traceRegisterUserEvent("Local Message Copy Handler", 23);
00404     traceRegisterUserEvent("Local Message Ack Handler", 24);    
00405     traceRegisterUserEvent("Preprocess current message",25);
00406     traceRegisterUserEvent("Preprocess past message",26);
00407     traceRegisterUserEvent("Preprocess future message",27);
00408     traceRegisterUserEvent("Checkpoint",28);
00409     traceRegisterUserEvent("Checkpoint Store",29);
00410     traceRegisterUserEvent("Checkpoint Ack",30);
00411     traceRegisterUserEvent("Send Ticket Request",31);
00412     traceRegisterUserEvent("Generalticketrequest1",32);
00413     traceRegisterUserEvent("TicketLogLocal",33);
00414     traceRegisterUserEvent("next_ticket and SN",34);
00415     traceRegisterUserEvent("Timeout for buffered remote messages",35);
00416     traceRegisterUserEvent("Timeout for buffered local messages",36);
00417     traceRegisterUserEvent("Inform Location Home",37);
00418     traceRegisterUserEvent("Receive Location Handler",38);
00419     
00420     lastCompletedAlarm=CmiWallTimer();
00421     lastRestart = CmiWallTimer();
00422 
00423     
00424 #if CMK_CONVERSE_MPI
00425     void heartBeatPartner();
00426     void heartBeatCheckHandler();
00427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
00428     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00429 #endif
00430 
00431 #if COLLECT_STATS_MSGS
00432 #if COLLECT_STATS_MSGS_TOTAL
00433     totalMsgsTarget = 0;
00434     totalMsgsSize = 0.0;
00435 #else
00436     numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00437     sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
00438     for(int i=0; i<CmiNumPes(); i++){
00439         numMsgsTarget[i] = 0;
00440         sizeMsgsTarget[i] = 0;
00441     }
00442 #endif
00443 #endif
00444 #if COLLECT_STATS_DETS
00445     numPiggyDets = 0;
00446     numDets = 0;
00447     numDupDets = 0;
00448 #endif
00449 #if COLLECT_STATS_MEMORY
00450     msgLogSize = 0;
00451     bufferedDetsSize = 0;
00452     storedDetsSize = 0;
00453 #endif
00454 
00455 }
00456 
00457 #if CMK_CONVERSE_MPI
00458 
00462 void partnerFailureHandler(char *msg)
00463 {
00464    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
00465 
00466    
00467    int newrank = find_spare_mpirank(diepe, CmiMyPartition());
00468    int buddy = getReverseCheckPointPE();
00469    if (buddy == diepe)  {
00470      mpi_restart_crashed(diepe, newrank);
00471      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00472    }
00473 }
00474 
00478 void heartBeatHandler(void *msg)
00479 {
00480     lastPingTime = CmiWallTimer();
00481     CmiFree(msg);
00482 }
00483 
00487 void heartBeatCheckHandler()
00488 {
00489     double now = CmiWallTimer();
00490     if (lastPingTime > 0 && now - lastPingTime > FAIL_DET_THRESHOLD && !inCkptFlag) {
00491         int i, pe, buddy;
00492         
00493         buddy = getReverseCheckPointPE();
00494         CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
00495         
00496         for (int pe = 0; pe < CmiNumPes(); pe++) {
00497             if (pe == buddy) continue;
00498             char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
00499             *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
00500             CmiSetHandler(msg, partnerFailureHandlerIdx);
00501             CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
00502         }
00503     }
00504     else 
00505         CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
00506 }
00507 
00511 void heartBeatPartner()
00512 {
00513     int buddy = getCheckPointPE();
00514     
00515     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
00516     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
00517     CmiSetHandler(msg, heartBeatHandlerIdx);
00518     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
00519     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
00520 }
00521 #endif
00522 
00523 void killLocal(void *_dummy,double curWallTime);    
00524 
00525 void readKillFile(){
00526     FILE *fp=fopen(killFile,"r");
00527     if(!fp){
00528         return;
00529     }
00530     int proc;
00531     double sec;
00532     while(fscanf(fp,"%d %lf",&proc,&sec)==2){
00533         if(proc == CkMyPe()){
00534             killTime = CmiWallTimer()+sec;
00535             printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
00536             CcdCallFnAfter(killLocal,NULL,sec*1000);    
00537         }
00538     }
00539     fclose(fp);
00540 }
00541 
00546 void readFaultFile(){
00547         FILE *fp=fopen(faultFile,"r");
00548         if(!fp){
00549                 return;
00550         }
00551         int proc;
00552         double sec;
00553         fscanf(fp,"%d %lf",&proc,&sec);
00554     faultMean = sec;
00555     if(proc == CkMyPe()){
00556             printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
00557             CcdCallFnAfter(killLocal,NULL,sec*1000);
00558     }
00559         fclose(fp);
00560 }
00561 
00562 void killLocal(void *_dummy,double curWallTime){
00563     printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
00564     if(CmiWallTimer()<killTime-1){
00565         CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
00566     }else{
00567 #if CMK_CONVERSE_MPI
00568         CkDieNow(); 
00569 #else
00570         kill(getpid(),SIGKILL);
00571 #endif
00572     }
00573 }
00574 
00575 #if ! CMK_CONVERSE_MPI
00576 void CkDieNow()
00577 {
00578     
00579     CmiPrintf("[%d] die now.\n", CmiMyPe());
00580     killTime = CmiWallTimer()+0.001;
00581     CcdCallFnAfter(killLocal,NULL,1);
00582 }
00583 #endif
00584 
00585 
00586 
00591 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
00592     Determinant *det, *auxDet;
00593     char *aux;
00594 
00595     DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
00596 
00597     
00598     if(_indexBufferedDets >= _maxBufferedDets){
00599         aux = CpvAccess(_localDets);
00600         _maxBufferedDets *= 2;
00601         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
00602         memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
00603         CmiFree(aux);
00604     }
00605 
00606     
00607     det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
00608     det->sender = sender;
00609     det->receiver = receiver;
00610     det->SN = SN;
00611     det->TN = TN;
00612     _numBufferedDets++;
00613     _indexBufferedDets++;
00614 
00615 #if COLLECT_STATS_MEMORY
00616     bufferedDetsSize++;
00617 #endif
00618 #if COLLECT_STATS_DETS
00619     numDets++;
00620 #endif
00621 }
00622 
00623 
00624 
00628 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
00629     if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
00630         DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
00631         void *origMsg = EnvToUsr(env);
00632         for(int i=0;i<CmiNumPes();i++){
00633             if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
00634                 void *copyMsg = CkCopyMsg(&origMsg);
00635                 envelope *copyEnv = UsrToEnv(copyMsg);
00636                 copyEnv->SN=0;
00637                 copyEnv->TN=0;
00638                 copyEnv->sender.type = TypeInvalid;
00639                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
00640                 sendGroupMsg(copyEnv,i,_infoIdx);
00641             }
00642         }
00643         return;
00644     }
00645 
00646     
00647     env->SN=0;
00648     env->TN=0;
00649     env->sender.type = TypeInvalid;
00650 
00651     CkObjID recver;
00652     recver.type = TypeGroup;
00653     recver.data.group.id = env->getGroupNum();
00654     recver.data.group.onPE = destPE;
00655     sendCommonMsg(recver,env,destPE,_infoIdx);
00656 }
00657 
00661 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
00662     if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
00663         DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
00664         void *origMsg = EnvToUsr(env);
00665         for(int i=0;i<CmiNumNodes();i++){
00666             if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
00667                 void *copyMsg = CkCopyMsg(&origMsg);
00668                 envelope *copyEnv = UsrToEnv(copyMsg);
00669                 copyEnv->SN=0;
00670                 copyEnv->TN=0;
00671                 copyEnv->sender.type = TypeInvalid;
00672                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
00673             }
00674         }
00675         return;
00676     }
00677 
00678     
00679     env->SN=0;
00680     env->TN=0;
00681     env->sender.type = TypeInvalid;
00682 
00683     CkObjID recver;
00684     recver.type = TypeNodeGroup;
00685     recver.data.group.id = env->getGroupNum();
00686     recver.data.group.onPE = destNode;
00687     sendCommonMsg(recver,env,destNode,_infoIdx);
00688 }
00689 
00693 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
00694     CkObjID recver;
00695     recver.type = TypeArray;
00696     recver.data.array.id = env->getArrayMgr();
00697     recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
00698 
00699     if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00700         char recverString[100],senderString[100];
00701         
00702         DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00703     }
00704 
00705     
00706     env->SN = 0;
00707     env->TN = 0;
00708 
00709     sendCommonMsg(recver,env,destPE,_infoIdx);
00710 };
00711 
00715 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
00716     CkObjID recver;
00717     recver.type = TypeChare;
00718     recver.data.chare.id = *pCid;
00719 
00720     if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
00721         char recverString[100],senderString[100];
00722         
00723         DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
00724     }
00725 
00726     
00727     env->SN = 0;
00728     env->TN = 0;
00729 
00730     sendCommonMsg(recver,env,destPE,_infoIdx);
00731 };
00732 
00736 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
00737     envelope *env = _env;
00738     MCount ticketNumber = 0;
00739     int resend=0; 
00740     char recverName[100];
00741     char senderString[100];
00742     double _startTime=CkWallTimer();
00743     
00744     DEBUG_MEM(CmiMemoryCheck());
00745 
00746     if(CpvAccess(_currentObj) == NULL){
00747 
00748         DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
00749         generalCldEnqueue(destPE,env,_infoIdx);
00750         return;
00751     }
00752 
00753     
00754     if(env->flags & CK_BYPASS_DET_MLOG){
00755         env->sender = CpvAccess(_currentObj)->mlogData->objID;
00756         env->recver = recver;
00757         DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
00758         generalCldEnqueue(destPE,env,_infoIdx);
00759         return;
00760     }
00761     
00762     
00763     env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
00764     if(env->sender.type == TypeInvalid){
00765         env->sender = CpvAccess(_currentObj)->mlogData->objID;
00766     }else{
00767 
00768 
00769         env->sender = CpvAccess(_currentObj)->mlogData->objID;
00770         env->SN = 0;
00771     }
00772     
00773     DEBUG_MEM(CmiMemoryCheck());
00774 
00775     CkObjID &sender = env->sender;
00776     env->recver = recver;
00777 
00778     Chare *obj = (Chare *)env->sender.getObject();
00779       
00780     if(env->SN == 0){
00781         DEBUG_MEM(CmiMemoryCheck());
00782         env->SN = obj->mlogData->nextSN(recver);
00783     }else{
00784         resend = 1;
00785     }
00786     
00787 
00788         DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
00789     
00790 
00791 
00792 
00793         
00794 
00795 
00796     
00797     _startTime = CkWallTimer();
00798 
00799     
00800     if(isLocal(destPE)){
00801         sendLocalMsg(env, _infoIdx);
00802     }else{
00803         if((teamSize > 1) && isTeamLocal(destPE)){
00804 
00805             
00806             Chare *senderObj = (Chare *)sender.getObject();
00807             SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
00808             if(ticketRow != NULL){
00809                 Ticket ticket = ticketRow->get(env->SN);
00810                 if(ticket.TN != 0){
00811                     ticketNumber = ticket.TN;
00812                     DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
00813                 }
00814             }
00815         }
00816         
00817         
00818         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
00819         sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
00820         
00821     }
00822 }
00823 
00828 inline bool isLocal(int destPE){
00829     
00830     if(destPE == CkMyPe())
00831         return true;
00832 
00833     return false;
00834 }
00835 
00840 inline bool isTeamLocal(int destPE){
00841 
00842     
00843     if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
00844         return true;
00845 
00846     return false;
00847 }
00848 
00852 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
00853     DEBUG_NOW(char recverString[100]);
00854     DEBUG_NOW(char senderString[100]);
00855 
00856     int totalSize;
00857 
00858     envelope *env = entry->env;
00859     DEBUG(printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
00860 
00861     
00862     Chare *obj = (Chare *)entry->env->sender.getObject();
00863     entry->env->recver = recver;
00864     entry->env->SN = SN;
00865     entry->env->TN = TN;
00866     if(!resend){
00867         
00868         if(!isTeamLocal(entry->destPE)){
00869             obj->mlogData->addLogEntry(entry);
00870 #if COLLECT_STATS_LOGGING
00871             MLOGFT_totalMessages += 1.0;
00872             MLOGFT_totalLogSize += entry->env->getTotalsize();
00873             if(entry->env->flags & CK_MULTICAST_MSG_MLOG){
00874                 MLOGFT_totalMcastLogSize += entry->env->getTotalsize(); 
00875             }
00876             if(entry->env->flags & CK_REDUCTION_MSG_MLOG){
00877                 MLOGFT_totalReductionLogSize += entry->env->getTotalsize(); 
00878             }
00879 
00880 #endif
00881         }else{
00882             
00883             entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
00884         }
00885     }
00886 
00887     
00888     if(_numBufferedDets > 0){
00889 
00890         
00891         CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
00892         CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
00893         CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
00894         CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
00895     
00896         
00897         CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
00898         CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
00899         CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
00900         CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
00901         DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
00902         CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
00903         CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
00904     }
00905 
00906     
00907     entry->indexBufDets = _indexBufferedDets;
00908     entry->numBufDets = _numBufferedDets;
00909 
00910     
00911     generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
00912 
00913     DEBUG_MEM(CmiMemoryCheck());
00914 #if COLLECT_STATS_MSGS
00915 #if COLLECT_STATS_MSGS_TOTAL
00916     totalMsgsTarget++;
00917     totalMsgsSize += (float)env->getTotalsize();
00918 #else
00919     numMsgsTarget[destPE]++;
00920     sizeMsgsTarget[destPE] += env->getTotalsize();
00921 #endif
00922 #endif
00923 #if COLLECT_STATS_DETS
00924     numPiggyDets += _numBufferedDets;
00925 #endif
00926 #if COLLECT_STATS_MEMORY
00927     msgLogSize += env->getTotalsize();
00928 #endif
00929 };
00930 
00931 
00937 void sendLocalMsg(envelope *env, int _infoIdx){
00938     DEBUG_PERF(double _startTime=CkWallTimer());
00939     DEBUG_MEM(CmiMemoryCheck());
00940     DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
00941     DEBUG(char senderString[100]);
00942     DEBUG(char recverString[100]);
00943     Ticket ticket;
00944 
00945     DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
00946 
00947     
00948     Chare *recverObj = (Chare *)env->recver.getObject();
00949 
00950     
00951     if(recverObj){
00952 
00953 
00954 
00955 
00956 
00957 
00958 
00959 
00960 
00961 
00962 
00963 
00964 
00965 
00966 
00967 
00968 
00969 
00970 
00971 
00972 
00973 
00974 
00975 
00976 
00977 
00978 
00979 
00980 
00981 
00982 
00983 
00984 
00985 
00986 
00987         
00988         _skipCldEnqueue(CmiMyPe(),env,_infoIdx);    
00989 
00990         DEBUG_MEM(CmiMemoryCheck());
00991     }else{
00992         DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
00993     }
00994 
00995 };
00996 
00997 
00998 
00999 
01000 
01001 
01002 
01006 void _removeDeterminantsHandler(char *buffer){
01007     RemoveDeterminantsHeader *header;
01008     int index, phase;
01009 
01010     
01011     header = (RemoveDeterminantsHeader *)buffer;
01012     index = header->index;
01013     phase = header->phase;
01014 
01015     
01016 
01017     
01018     if(phase == _phaseBufferedDets){
01019         if(index > _indexBufferedDets)
01020             CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
01021         CmiAssert(index <= _indexBufferedDets);
01022         _numBufferedDets = _indexBufferedDets - index;
01023     }
01024 
01025     
01026     CmiFree(buffer);
01027 
01028 }
01029 
01033 void _storeDeterminantsHandler(char *buffer){
01034     StoreDeterminantsHeader *header;
01035     Determinant *detPtr, det;
01036     int i, n, index, phase, destPE;
01037     std::vector<Determinant> *vec;
01038 
01039     
01040     header = (StoreDeterminantsHeader *)buffer;
01041     n = header->number;
01042     index = header->index;
01043     phase = header->phase;
01044     destPE = header->PE;
01045     detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
01046 
01047     DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
01048     DEBUG_MEM(CmiMemoryCheck());
01049 
01050     
01051     for(i = 0; i < n; i++){
01052         det.sender = detPtr->sender;
01053         det.receiver = detPtr->receiver;
01054         det.SN = detPtr->SN;
01055         det.TN = detPtr->TN;
01056 
01057         
01058         vec = CpvAccess(_remoteDets)->get(det.receiver);
01059         if(vec == NULL){
01060             vec = new std::vector<Determinant>();
01061             CpvAccess(_remoteDets)->put(det.receiver) = vec;
01062         }
01063 #if COLLECT_STATS_DETS
01064 #if COLLECT_STATS_DETS_DUP
01065         for(int j=0; j<vec->size(); j++){
01066             if(isSameDet(&(*vec)[j],&det)){
01067                 numDupDets++;
01068                 break;
01069             }
01070         }
01071 #endif
01072 #endif
01073 #if COLLECT_STATS_MEMORY
01074         storedDetsSize++;
01075 #endif
01076         vec->push_back(det);
01077         detPtr = detPtr++;
01078     }
01079 
01080     DEBUG_MEM(CmiMemoryCheck());
01081 
01082     
01083     CmiFree(buffer);
01084 
01085     
01086     CpvAccess(_removeDetsHeader)->index = index;
01087     CpvAccess(_removeDetsHeader)->phase = phase;
01088     CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
01089     CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
01090     
01091 }
01092 
01097 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
01098     DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
01099     double  _startTime = CkWallTimer();
01100     CmiFree(ticketRequest);
01101 
01102 }
01103 
01104 
01110 inline bool _getTicket(envelope *env, int *flag){
01111     DEBUG_MEM(CmiMemoryCheck());
01112     DEBUG(char recverName[100]);
01113     DEBUG(char senderString[100]);
01114     Ticket ticket;
01115     
01116     
01117     CkObjID sender = env->sender;
01118     CkObjID recver = env->recver;
01119     MCount SN = env->SN;
01120     MCount TN = env->TN;
01121     Chare *recverObj = (Chare *)recver.getObject();
01122     
01123     DEBUG(recver.toString(recverName);)
01124 
01125     
01126     if(recverObj->mlogData->restartFlag)
01127         return false;
01128 
01129     
01130     ticket = recverObj->mlogData->getTicket(sender, SN);
01131     TN = ticket.TN;
01132     
01133     
01134     if(teamSize > 1 && TN != 0){
01135         DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
01136         recverObj->mlogData->verifyTicket(sender,SN,TN);
01137     }
01138 
01139     
01140     
01141     if(TN == 0){
01142         ticket = recverObj->mlogData->next_ticket(sender,SN);
01143         *flag = NEW_TICKET;
01144     } else {
01145         *flag = OLD_TICKET;
01146     }
01147     if(ticket.TN > recverObj->mlogData->tProcessed){
01148         ticket.state = NEW_TICKET;
01149     } else {
01150         ticket.state = OLD_TICKET;
01151     }
01152     
01153     if(ticket.TN == 0){
01154         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
01155         return false;
01156     }
01157         
01158     
01159     env->TN = ticket.TN;
01160     DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
01161     return true;
01162 
01163 };
01164 
01165 bool fault_aware(CkObjID &recver){
01166     switch(recver.type){
01167         case TypeChare:
01168             return true;
01169         case TypeMainChare:
01170             return false;
01171         case TypeGroup:
01172         case TypeNodeGroup:
01173         case TypeArray:
01174             return true;
01175         default:
01176             return false;
01177     }
01178 };
01179 
01180 
/*
 * Decides what to do with a message that has just arrived on this PE under
 * causal message logging.
 *
 * Returns 1 when the caller should deliver env now. Returns 0 when env has
 * already been disposed of here: forwarded to another PE, parked in
 * _outOfOrderMessageQueue or _delayedRemoteMessageQueue, or freed as a stale
 * or already-processed message.
 *
 * @param env             incoming message envelope
 * @param objPointer      set to the local receiver object (possibly NULL) once
 *                        the bypass and fault-awareness checks have passed;
 *                        left untouched on those two early returns
 * @param logEntryPointer not read or written by this function
 */
int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
    DEBUG_NOW(char recverString[100]);
    DEBUG_NOW(char senderString[100]);
    DEBUG_MEM(CmiMemoryCheck());
    int flag;
    bool ticketSuccess;

    // Receiver named in the envelope.
    CkObjID recver = env->recver;

    // Messages flagged CK_BYPASS_DET_MLOG skip the protocol entirely.
    if(env->flags & CK_BYPASS_DET_MLOG){
        DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
        return 1;   
    }

    // Receivers that fault_aware() rejects are delivered without logging.
    if(!fault_aware(recver)){
        CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
        return 1;
    }

    // Resolve the receiver object on this PE.
    Chare *obj = (Chare *)recver.getObject();
    *objPointer = obj;
    if(obj == NULL){
        if(_recoveryFlag){
            int possiblePE = recver.guessPE();
            if(possiblePE != CkMyPe()){
                int totalSize = env->getTotalsize();
                CmiSyncSendAndFree(possiblePE,totalSize,(char *)env);
                // NOTE(review): env was released by CmiSyncSendAndFree above;
                // enabling this DEBUG_PE would read freed memory.
                DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
            }else{
                // guessPE() points here but the object is not present (yet):
                // park the message until an in-order message drains the queue.
                CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
                DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
            }
            return 0;
        } else {
            return 1;
        }
    } 

    // Discard messages produced by an older incarnation of the source PE.
    if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
        CmiFree(env);
        return 0;
    }

    DEBUG_MEM(CmiMemoryCheck());
    DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));

    // Obtain (or reuse) the ticket number for this message; sets env->TN.
    ticketSuccess = _getTicket(env,&flag);

    // No ticket yet: hold the message in the delayed queue
    // (presumably retried elsewhere -- confirm).
    if(!ticketSuccess){
        
        
        CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
        DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));

        return 0;
    }
    
    // A freshly issued ticket produces a new determinant (sender, receiver,
    // SN, TN), which is buffered.
    
    
    if(flag == NEW_TICKET){
        
        addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
    }

    DEBUG_MEM(CmiMemoryCheck());

    // NOTE(review): _startTime is never read in this function.
    double _startTime = CkWallTimer();

    // This is the next message in ticket order: deliver it, and move every
    // parked out-of-order message back onto the scheduler queue (FIFO) so it
    // is examined again.
    if(env->TN == obj->mlogData->tProcessed+1){
        
        DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
        
        
    DEBUG_MEM(CmiMemoryCheck());
        while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
            void *qMsgPtr;
            CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
            envelope *qEnv = (envelope *)qMsgPtr;
            CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
    DEBUG_MEM(CmiMemoryCheck());
        }

        

    DEBUG_MEM(CmiMemoryCheck());
        return 1;
    }

    // Ticket at or below tProcessed: this message was already executed, so
    // the copy is dropped.
    
    if(env->TN <= obj->mlogData->tProcessed){
        DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
        
        CmiFree(env);
        return 0;
    }
    

    DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
    
    // Ticket is ahead of tProcessed+1: park it until the gap is filled.
    CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);

    DEBUG_MEM(CmiMemoryCheck());
    
    return 0;
}
01298 
01302 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
01303     DEBUG(char senderString[100]);
01304     if(obj){
01305         if(sender.guessPE() == CkMyPe()){
01306             if(entry != NULL){
01307                 entry->env = NULL;
01308             }
01309         }
01310         obj->mlogData->tProcessed++;
01311 
01312 
01313 
01314     }
01315     DEBUG_MEM(CmiMemoryCheck());
01316 }
01317 
01318 
01319 
01320 
01321 
01322 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
01323 
01324     if(env->recver.type != TypeNodeGroup){
01325     
01326     
01327     
01328     
01329 
01330         _skipCldEnqueue(destPE,env,_infoIdx);
01331     }else{
01332         _noCldNodeEnqueue(destPE,env);
01333     }
01334 
01335 }
01336 
01341 int calledRetryTicketRequest=0;
01342 
01343 void _pingHandler(CkPingMsg *msg){
01344     printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
01345     CmiFree(msg);
01346 }
01347 
01348 
01349 
01350 
01351 
01352 
01353 
01354 std::vector<TProcessedLog> processedTicketLog;
01355 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
01356 void clearUpMigratedRetainedLists(int PE);
01357 
01358 void checkpointAlarm(void *_dummy,double curWallTime){
01359     double diff = curWallTime-lastCompletedAlarm;
01360     DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
01361 
01362 
01363 
01364 
01365     if(diff < ((chkptPeriod) - 2)){
01366         CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
01367         return;
01368     }
01369     CheckpointRequest request;
01370     CmiInitMsgHeader(request.header, sizeof(CheckpointRequest));
01371     request.PE = CkMyPe();
01372     CmiSetHandler(&request,_checkpointRequestHandlerIdx);
01373     CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
01374 };
01375 
01376 void _checkpointRequestHandler(CheckpointRequest *request){
01377     startMlogCheckpoint(NULL,CmiWallTimer());
01378 }
01379 
/*
 * Takes this PE's part of a checkpoint.
 *
 * The checkpoint image consists of checkpointCount, the incarnation number of
 * every PE, read-only data, group data, node-group data and all array
 * elements. It is pupped twice: first through a PUP::sizer to learn its size,
 * then into a CmiAlloc'd CheckPointDataMsg. That message is sent (and freed)
 * to getCheckPointPE() with the _storeCheckpointHandlerIdx handler.
 * unAckedCheckpoint is set to 1 here (_checkpointAckHandler clears it).
 *
 * Afterwards, (objID, tProcessed) of every local object is snapshotted into
 * processedTicketLog. On PE 0, when no load balancing is in progress,
 * checkpointAlarm is re-armed.
 *
 * @param _dummy       unused
 * @param curWallTime  recorded as lastCompletedAlarm on PE 0
 */
void startMlogCheckpoint(void *_dummy, double curWallTime){
    double _startTime = CkWallTimer();

    // New checkpoint epoch; any recovery in progress is considered over.
    checkpointCount++;
    _recoveryFlag = false;

#if CMK_CONVERSE_MPI
    inCkptFlag = 1;
#endif
    
#if DEBUG_CHECKPOINT
    if(CmiMyPe() == 0){
        printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
    }
#endif

    DEBUG_MEM(CmiMemoryCheck());

    // Sizing pass. It must pup exactly the same items, in the same order, as
    // the packing pass below, or the buffer size will be wrong.
    PUP::sizer psizer;
    psizer | checkpointCount;
    for(int i=0; i<CmiNumPes(); i++){
        psizer | CpvAccess(_incarnation)[i];
    }
    CkPupROData(psizer);
    DEBUG_MEM(CmiMemoryCheck());
    CkPupGroupData(psizer,true);
    DEBUG_MEM(CmiMemoryCheck());
    CkPupNodeGroupData(psizer,true);
    DEBUG_MEM(CmiMemoryCheck());
    pupArrayElementsSkip(psizer,true,NULL);
    DEBUG_MEM(CmiMemoryCheck());

    // Message = CheckPointDataMsg header followed by dataSize bytes of image.
    int dataSize = psizer.size();
    int totalSize = sizeof(CheckPointDataMsg)+dataSize;
    char *msg = (char *)CmiAlloc(totalSize);
    CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
    chkMsg->PE = CkMyPe();
    chkMsg->dataSize = dataSize;
    
    char *buf = &msg[sizeof(CheckPointDataMsg)];
    PUP::toMem pBuf(buf);

    // Packing pass: mirrors the sizing pass above.
    pBuf | checkpointCount;
    for(int i=0; i<CmiNumPes(); i++){
        pBuf | CpvAccess(_incarnation)[i];
    }
    CkPupROData(pBuf);
    CkPupGroupData(pBuf,true);
    CkPupNodeGroupData(pBuf,true);
    pupArrayElementsSkip(pBuf,true,NULL);

    // Ship the image to the checkpoint buddy; msg is freed by the send.
    unAckedCheckpoint=1;
    CmiSetHandler(msg,_storeCheckpointHandlerIdx);
    CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
    
    

    // Record how far each local object has progressed at this checkpoint.
    processedTicketLog.clear();
    forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);

#if DEBUG_CHECKPOINT
    if(CmiMyPe() == 0){
        printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
    }
#endif

    // NOTE(review): the two sizeof products below are size_t but are printed
    // with %d; fix the format if COLLECT_STATS_MEMORY is ever enabled.
#if COLLECT_STATS_MEMORY
    CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
    msgLogSize = 0;
    bufferedDetsSize = 0;
    storedDetsSize = 0;
#endif

    // PE 0 drives the periodic checkpoint timer.
    if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
        lastCompletedAlarm = curWallTime;
        CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
    }
    traceUserBracketEvent(28,_startTime,CkWallTimer());
};
01464 
01468 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
01469     DEBUG(char objString[100]);
01470 
01471     std::vector<TProcessedLog> *log = (std::vector<TProcessedLog> *)data;
01472     TProcessedLog logEntry;
01473     logEntry.recver = mlogData->objID;
01474     logEntry.tProcessed = mlogData->tProcessed;
01475     log->push_back(logEntry);
01476 
01477     DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
01478 }
01479 
01480 class ElementPacker : public CkLocIterator {
01481 private:
01482     CkLocMgr *locMgr;
01483     PUP::er &p;
01484 public:
01485     ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
01486     void addLocation(CkLocation &loc) {
01487         CkArrayIndexMax idx=loc.getIndex();
01488         CkGroupID gID = locMgr->ckGetGroupID();
01489         p|gID;      
01490         p|idx;
01491         p|loc;
01492     }
01493 };
01494 
01498 void pupArrayElementsSkip(PUP::er &p, bool create, MigrationRecord *listToSkip,int listsize){
01499     int numElements,i;
01500     int numGroups = CkpvAccess(_groupIDTable)->size();  
01501     if(!p.isUnpacking()){
01502         numElements = CkCountArrayElements();
01503     }   
01504     
01505     p | numElements;
01506     DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
01507     if(!p.isUnpacking()){
01508         CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
01509     }else{
01510         
01511 
01512         for(int j=0;j<listsize;j++){
01513             if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01514                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
01515                 listToSkip[j].idx.print();
01516             }
01517         }
01518         
01519         printf("numElements = %d\n",numElements);
01520     
01521         for (int i=0; i<numElements; i++) {
01522             CkGroupID gID;
01523             CkArrayIndexMax idx;
01524             p|gID;
01525             p|idx;
01526             int flag=0;
01527             int matchedIdx=0;
01528             for(int j=0;j<listsize;j++){
01529                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
01530                     if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
01531                         matchedIdx = j;
01532                         flag = 1;
01533                         break;
01534                     }
01535                 }
01536             }
01537             if(flag == 1){
01538                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01539             }else{
01540                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
01541             }
01542                 
01543             CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
01544             CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
01545             mgr->resume(idx,p,create,flag);
01546             if(flag == 1){
01547                 int homePE = mgr->homePe(idx);
01548                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
01549             }
01550         }
01551     }
01552 };
01553 
01557 void readCheckpointFromDisk(int size, char *data){
01558     FILE *f = fopen(fName,"rb");
01559     fread(data,1,size,f);
01560     fclose(f);
01561 }
01562 
01566 void writeCheckpointToDisk(int size, char *data){
01567     FILE *f = fopen(fName,"wb");
01568     fwrite(data, 1, size, f);
01569     fclose(f);
01570 }
01571 
01572 
01573 
/*
 * Handler on the checkpoint buddy: stores a checkpoint received from another PE.
 *
 * msg is a CheckPointDataMsg header followed by dataSize bytes of checkpoint
 * image. The previously stored message (if any) is freed. The new message is
 * then kept as _storedCheckpointData: buf points just past the header, so it
 * is NOT freed here. With diskCkptFlag set, the image is written to disk
 * instead and msg is freed.
 *
 * Migration notices that came from the sending PE are marked ackFrom = 1.
 * Notices acknowledged on both sides are removed. Finally a CheckPointAck
 * carrying the stored size is sent back to the sender.
 */
void _storeCheckpointHandler(char *msg){
    double _startTime=CkWallTimer();
        
    CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
    DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
    
    char *chkpt = &msg[sizeof(CheckPointDataMsg)];  

    // The stored buf points inside the old message; step back over the
    // header to recover the pointer that was CmiAlloc'd.
    char *oldChkpt =    CpvAccess(_storedCheckpointData)->buf;
    if(oldChkpt != NULL){
        char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
        CmiFree(oldmsg);
    }
    
    int sendingPE = chkMsg->PE;
    
    CpvAccess(_storedCheckpointData)->buf = chkpt;
    CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
    CpvAccess(_storedCheckpointData)->PE = sendingPE;

    // Disk mode: persist the image and release the message. Only locals and
    // _storedCheckpointData are used below, never msg/chkMsg.
    if(diskCkptFlag){
        writeCheckpointToDisk(chkMsg->dataSize,chkpt);
        CpvAccess(_storedCheckpointData)->buf = NULL;
        CmiFree(msg);
    }

    // Iterate backwards so erase() does not disturb unvisited indices.
    int count=0;
    for(int j=migratedNoticeList.size()-1;j>=0;j--){
        if(migratedNoticeList[j].fromPE == sendingPE){
            migratedNoticeList[j].ackFrom = 1;
        }else{
            // NOTE(review): a string literal is always non-null, so this
            // assertion can never fire; if this branch is truly unexpected,
            // it needs a real failing condition or a CmiAbort.
            CmiAssert("migratedNoticeList entry for processor other than buddy");
        }
        if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
            migratedNoticeList.erase(migratedNoticeList.begin() + j);
            count++;
        }
        
    }
    DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
    
    // Acknowledge the checkpoint to its owner.
    CheckPointAck ackMsg;
    CmiInitMsgHeader(ackMsg.header, sizeof(CheckPointAck));
    ackMsg.PE = CkMyPe();
    ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
    CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
    CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
    
    traceUserBracketEvent(29,_startTime,CkWallTimer());
};
01625 
01626 
01632 void sendRemoveLogRequests(){
01633 #if SYNCHRONIZED_CHECKPOINT
01634     CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
01635 #endif
01636     double _startTime = CkWallTimer();  
01637 
01638     
01639     int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
01640     char *requestMsg = (char *)CmiAlloc(totalSize);
01641 
01642     
01643     RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01644     request->PE = CkMyPe();
01645     request->numberObjects = processedTicketLog.size();
01646     char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
01647     memcpy(listProcessedLogs,(char *)processedTicketLog.data(),processedTicketLog.size()*sizeof(TProcessedLog));
01648     char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
01649     int *numDeterminants = (int *)listDeterminants;
01650     numDeterminants[0] = 0; 
01651     listDeterminants = (char *)&numDeterminants[1];
01652 
01653     
01654     CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
01655     
01656     DEBUG_MEM(CmiMemoryCheck());
01657     for(int i=0;i<CkNumPes();i++){
01658         CmiSyncSend(i,totalSize,requestMsg);  
01659     }
01660     CmiFree(requestMsg);
01661 
01662     clearUpMigratedRetainedLists(CmiMyPe());
01663     
01664     
01665     traceUserBracketEvent(30,_startTime,CkWallTimer());
01666 
01667     DEBUG_MEM(CmiMemoryCheck());
01668 }
01669 
01670 
01671 void _checkpointAckHandler(CheckPointAck *ackMsg){
01672     DEBUG_MEM(CmiMemoryCheck());
01673     unAckedCheckpoint=0;
01674     DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
01675     DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
01676     if(onGoingLoadBalancing){
01677         onGoingLoadBalancing = 0;
01678         finishedCheckpointLoadBalancing();
01679     }else{
01680         sendRemoveLogRequests();
01681     }
01682     CmiFree(ackMsg);
01683     
01684 };
01685 
01686 
01690 inline void populateDeterminantTable(char *data){
01691     DEBUG(char recverString[100]);
01692     DEBUG(char senderString[100]);
01693     int numDets, *numDetsPtr;
01694     Determinant *detList;
01695     CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
01696     SNToTicket *tickets;
01697     Ticket ticket;
01698 
01699     RemoveLogRequest *request = (RemoveLogRequest *)data;
01700     TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
01701     
01702     numDetsPtr = (int *)&list[request->numberObjects];
01703     numDets = numDetsPtr[0];
01704     detList = (Determinant *)&numDetsPtr[1];
01705 
01706     
01707     for(int i=0; i<numDets; i++){
01708         table = detTable.get(detList[i].sender);
01709         if(table == NULL){
01710             table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
01711             detTable.put(detList[i].sender) = table;
01712         }
01713         tickets = table->get(detList[i].receiver);
01714         if(tickets == NULL){
01715             tickets = new SNToTicket();
01716             table->put(detList[i].receiver) = tickets; 
01717         }
01718         ticket.TN = detList[i].TN;
01719         tickets->put(detList[i].SN) = ticket;
01720     }
01721 
01722     DEBUG_MEM(CmiMemoryCheck());    
01723 
01724 }
01725 
01726 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
01727     int total;
01728     DEBUG(char nameString[100]);
01729     DEBUG_MEM(CmiMemoryCheck());
01730     CmiMemoryCheck();
01731     char *data = (char *)_data;
01732     RemoveLogRequest *request = (RemoveLogRequest *)data;
01733     TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
01734     CkQ<MlogEntry *> *mlog = mlogData->getMlog();
01735     CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
01736     SNToTicket *tickets;
01737     MCount TN;
01738 
01739     int count=0;
01740     total = mlog->length();
01741     for(int i=0; i<total; i++){
01742         MlogEntry *logEntry = mlog->deq();
01743 
01744         
01745         table = detTable.get(logEntry->env->sender);
01746         if(table != NULL){
01747             tickets = table->get(logEntry->env->recver);
01748             if(tickets != NULL){
01749                 TN = tickets->get(logEntry->env->SN).TN;
01750                 if(TN != 0){
01751                     logEntry->env->TN = TN;
01752                 }
01753             }
01754         }
01755     
01756         int match=0;
01757         for(int j=0;j<request->numberObjects;j++){
01758             if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
01759                 
01760                 match = 1;
01761                 break;
01762             }
01763         }
01764         char senderString[100],recverString[100];
01765 
01766         if(match){
01767             count++;
01768             delete logEntry;
01769         }else{
01770             mlog->enq(logEntry);
01771         }
01772     }
01773     if(count > 0){
01774         DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
01775     }
01776     DEBUG_MEM(CmiMemoryCheck());
01777     CmiMemoryCheck();
01778 }
01779 
01783 void _removeProcessedLogHandler(char *requestMsg){
01784     double start = CkWallTimer();
01785 
01786     
01787     populateDeterminantTable(requestMsg);
01788 
01789     
01790     forAllCharesDo(removeProcessedLogs,requestMsg);
01791 
01792     
01793     
01794     
01795     RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
01796     DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
01797     
01798     
01799 
01800 
01801     DEBUG_MEM(CmiMemoryCheck());
01802     clearUpMigratedRetainedLists(request->PE);
01803     
01804     traceUserBracketEvent(20,start,CkWallTimer());
01805     CmiFree(requestMsg);    
01806 };
01807 
01808 
01809 void clearUpMigratedRetainedLists(int PE){
01810     int count=0;
01811     CmiMemoryCheck();
01812     
01813     for(int j=migratedNoticeList.size()-1;j>=0;j--){
01814         if(migratedNoticeList[j].toPE == PE){
01815             migratedNoticeList[j].ackTo = 1;
01816         }
01817         if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
01818             migratedNoticeList.erase(migratedNoticeList.begin() + j);
01819             count++;
01820         }
01821     }
01822     DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
01823     
01824     for(int j=retainedObjectList.size()-1;j>=0;j--){
01825         if(retainedObjectList[j]->migRecord.toPE == PE){
01826             RetainedMigratedObject *obj = retainedObjectList[j];
01827             DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
01828             retainedObjectList.erase(retainedObjectList.begin() + j);
01829             if(obj->msg != NULL){
01830                 CmiMemoryCheck();
01831                 CmiFree(obj->msg);
01832             }
01833             delete obj;
01834         }
01835     }
01836 }
01837 
01838 
01839 
01840     
01841 
01847 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
01848     RestartRequest msg;
01849     CmiInitMsgHeader(msg.header, sizeof(RestartRequest));
01850     fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
01851 
01852     
01853     _restartFlag = true;
01854     _recoveryFlag = true;
01855     _numRestartResponses = 0;
01856 
01857     
01858     if(teamSize > 1){
01859         for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
01860             if(i != CkMyPe() && i < CkNumPes()){
01861                 
01862                 msg.PE = CkMyPe();
01863                 CmiSetHandler(&msg,_restartHandlerIdx);
01864                 CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
01865             }
01866         }
01867     }
01868 
01869     
01870     msg.PE = CkMyPe();
01871     CmiSetHandler(&msg,_getCheckpointHandlerIdx);
01872     CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01873 };
01874 
01879 void _restartHandler(RestartRequest *restartMsg){
01880     int i;
01881     int numGroups = CkpvAccess(_groupIDTable)->size();
01882     RestartRequest msg;
01883     CmiInitMsgHeader(msg.header, sizeof(RestartRequest));
01884     fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
01885 
01886     
01887     _restartFlag = true;
01888     _numRestartResponses = 0;
01889 
01890     
01891     
01892 
01893 
01894 
01895 
01896 
01897 
01898 
01899 
01900 
01901     
01902     msg.PE = CkMyPe();
01903     CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
01904     CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
01905 }
01906 
01907 
01911 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
01912 
01913     
01914     StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
01915 
01916     
01917     CkAssert(restartMsg->PE == storedChkpt->PE);
01918 
01919     storedRequest = restartMsg;
01920     verifyAckTotal = 0;
01921 
01922     for(int i=0;i<migratedNoticeList.size();i++){
01923         if(migratedNoticeList[i].fromPE == restartMsg->PE){
01924 
01925             if(migratedNoticeList[i].ackFrom == 0){
01926                 
01927                 
01928                 VerifyAckMsg msg;
01929                 CmiInitMsgHeader(msg.header, sizeof(VerifyAckMsg));
01930                 msg.migRecord = migratedNoticeList[i];
01931                 msg.index = i;
01932                 msg.fromPE = CmiMyPe();
01933                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
01934                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
01935                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
01936                 verifyAckTotal++;
01937             }
01938         }
01939     }
01940 
01941     
01942     if(verifyAckTotal == 0){
01943         sendCheckpointData(MLOG_RESTARTED);
01944     }
01945     verifyAckCount = 0;
01946 }
01947 
01951 void _recvRestartCheckpointHandler(char *_restartData){
01952     RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
01953     MigrationRecord *migratedAwayElements;
01954 
01955     globalLBID = restartData->lbGroupID;
01956     
01957     restartData->restartWallTime *= 1000;
01958     adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
01959     adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
01960     if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
01961 
01962     
01963     printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
01964     char *buf = &_restartData[sizeof(RestartProcessorData)];
01965     
01966     if(restartData->numMigratedAwayElements != 0){
01967         migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
01968         memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
01969         printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
01970         buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
01971     }
01972 
01973     
01974     forAllCharesDo(setTeamRecovery,NULL);
01975     
01976     PUP::fromMem pBuf(buf);
01977     pBuf | checkpointCount;
01978     for(int i=0; i<CmiNumPes(); i++){
01979         pBuf | CpvAccess(_incarnation)[i];
01980     }
01981     CkPupROData(pBuf);
01982     CkPupGroupData(pBuf,false);
01983     CkPupNodeGroupData(pBuf,false);
01984     pupArrayElementsSkip(pBuf,false,NULL);
01985     CkAssert(pBuf.size() == restartData->checkPointSize);
01986     printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
01987 
01988     
01989     for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
01990         CpvAccess(_incarnation)[i]++;
01991     }
01992     
01993     
01994     forAllCharesDo(unsetTeamRecovery,NULL);
01995 
01996     
01997     forAllCharesDo(initializeRestart,NULL);
01998     
01999     CmiFree(_restartData);  
02000 
02001     
02002 
02003 
02004 
02005 
02006 
02007 
02008 
02009 
02010 
02011 
02012 
02013 
02014 
02015 
02016 
02017     
02018     std::vector<TProcessedLog> objectVec;
02019     forAllCharesDo(createObjIDList, (void *)&objectVec);
02020     int numberObjects = objectVec.size();
02021     
02022     
02023 
02024 
02025     int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02026     char *resendMsg = (char *)CmiAlloc(totalSize);  
02027 
02028     ResendRequest *resendReq = (ResendRequest *)resendMsg;
02029     resendReq->PE =CkMyPe(); 
02030     resendReq->numberObjects = numberObjects;
02031     char *objList = &resendMsg[sizeof(ResendRequest)];
02032     memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
02033     
02034 
02035     
02036 
02037 
02038 
02039 
02040     
02041     
02042 
02043 
02044 
02045 
02046     CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02047     CpvAccess(_currentObj) = lb;
02048     lb->ReceiveDummyMigration(restartDecisionNumber);
02049 
02050     sleep(10);
02051     
02052     CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02053     for(int i=0;i<CkNumPes();i++){
02054         if(i != CkMyPe()){
02055             CmiSyncSend(i,totalSize,resendMsg); 
02056         }   
02057     }
02058     _resendMessagesHandler(resendMsg);
02059 
02060 }
02061 
02062 
02063 void CkMlogRestartDouble(void *,double){
02064     CkMlogRestart(NULL,NULL);
02065 };
02066 
02067 
02068 void CkMlogRestartLocal(){
02069     CkMlogRestart(NULL,NULL);
02070 };
02071 
02075 void _getCheckpointHandler(RestartRequest *restartMsg){
02076 
02077     
02078     StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
02079 
02080     
02081     CkAssert(restartMsg->PE == storedChkpt->PE);
02082 
02083     storedRequest = restartMsg;
02084     verifyAckTotal = 0;
02085 
02086     for(int i=0;i<migratedNoticeList.size();i++){
02087         if(migratedNoticeList[i].fromPE == restartMsg->PE){
02088 
02089             if(migratedNoticeList[i].ackFrom == 0){
02090                 
02091                 
02092                 VerifyAckMsg msg;
02093                 CmiInitMsgHeader(msg.header, sizeof(VerifyAckMsg));
02094                 msg.migRecord = migratedNoticeList[i];
02095                 msg.index = i;
02096                 msg.fromPE = CmiMyPe();
02097                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
02098                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
02099                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
02100                 verifyAckTotal++;
02101             }
02102         }
02103     }
02104 
02105     
02106     if(verifyAckTotal == 0){
02107         sendCheckpointData(MLOG_CRASHED);
02108     }
02109     verifyAckCount = 0;
02110 }
02111 
02112 
02113 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
02114     CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
02115     CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
02116     if(rec != NULL) {
02117             
02118             
02119             CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
02120             
02121             LBDatabase *lbdb = rec->getLBDB();
02122             LDObjHandle ldHandle = rec->getLdHandle();
02123                 
02124             locMgr->setDuringMigration(true);
02125             
02126             locMgr->reclaim(verifyRequest->migRecord.idx);
02127             lbdb->UnregisterObj(ldHandle);
02128             
02129             locMgr->setDuringMigration(false);
02130             
02131             verifyAckedRequests++;
02132 
02133     }
02134     CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
02135     CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
02136 };
02137 
02138 
02139 void _verifyAckHandler(VerifyAckMsg *verifyReply){
02140     int index =     verifyReply->index;
02141     migratedNoticeList[index] = verifyReply->migRecord;
02142     verifyAckCount++;
02143     CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
02144     if(verifyAckCount == verifyAckTotal){
02145         sendCheckpointData(MLOG_CRASHED);
02146     }
02147 }
02148 
02149 
02155 void sendCheckpointData(int mode){  
02156     RestartRequest *restartMsg = storedRequest;
02157     StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
02158     int numMigratedAwayElements = migratedNoticeList.size();
02159     if(migratedNoticeList.size() != 0){
02160             printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
02161 
02162     }
02163         
02164     int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
02165     
02166     DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
02167     CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
02168     
02169     totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
02170     
02171     char *msg = (char *)CmiAlloc(totalSize);
02172     
02173     RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
02174     dataMsg->PE = CkMyPe();
02175     dataMsg->restartWallTime = CmiTimer();
02176     dataMsg->checkPointSize = storedChkpt->bufSize;
02177     
02178     dataMsg->numMigratedAwayElements = numMigratedAwayElements;
02179 
02180     
02181     dataMsg->numMigratedInElements = 0;
02182     dataMsg->migratedElementSize = 0;
02183     dataMsg->lbGroupID = globalLBID;
02184     
02185 
02186 
02187 
02188     
02189     char *buf = &msg[sizeof(RestartProcessorData)];
02190 
02191     if(dataMsg->numMigratedAwayElements != 0){
02192         memcpy(buf,migratedNoticeList.data(),migratedNoticeList.size()*sizeof(MigrationRecord));
02193         buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
02194     }
02195 
02196     if(diskCkptFlag){
02197         readCheckpointFromDisk(storedChkpt->bufSize,buf);
02198     } else {    
02199         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
02200     }
02201     buf = &buf[storedChkpt->bufSize];
02202 
02203     if(mode == MLOG_RESTARTED){
02204         CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
02205         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02206         CmiFree(restartMsg);
02207     }else{
02208         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
02209         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
02210         CmiFree(restartMsg);
02211     }
02212 };
02213 
02214 
02215 
02216 
02217 
02218 void createObjIDList(void *data,ChareMlogData *mlogData){
02219     std::vector<TProcessedLog> *list = (std::vector<TProcessedLog> *)data;
02220     TProcessedLog entry;
02221     entry.recver = mlogData->objID;
02222     entry.tProcessed = mlogData->tProcessed;
02223     list->push_back(entry);
02224     DEBUG_TEAM(char objString[100]);
02225     DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
02226     DEBUG_RECOVERY(printLog(&entry));
02227 }
02228 
02229 
02234 void _recvCheckpointHandler(char *_restartData){
02235     RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
02236     MigrationRecord *migratedAwayElements;
02237 
02238     globalLBID = restartData->lbGroupID;
02239     
02240     restartData->restartWallTime *= 1000;
02241     adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
02242     adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
02243     if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
02244 
02245     
02246     printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
02247     char *buf = &_restartData[sizeof(RestartProcessorData)];
02248     
02249     if(restartData->numMigratedAwayElements != 0){
02250         migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
02251         memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
02252         printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
02253         buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
02254     }
02255     
02256     PUP::fromMem pBuf(buf);
02257 
02258     pBuf | checkpointCount;
02259     for(int i=0; i<CmiNumPes(); i++){
02260         pBuf | CpvAccess(_incarnation)[i];
02261     }
02262     CkPupROData(pBuf);
02263     CkPupGroupData(pBuf,true);
02264     CkPupNodeGroupData(pBuf,true);
02265     pupArrayElementsSkip(pBuf,true,NULL);
02266     CkAssert(pBuf.size() == restartData->checkPointSize);
02267     printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
02268 
02269     
02270     CpvAccess(_incarnation)[CmiMyPe()]++;
02271     
02272     forAllCharesDo(initializeRestart,NULL);
02273     
02274     CmiFree(_restartData);
02275     
02276     _initDone();
02277 
02278     getGlobalStep(globalLBID);
02279 
02280     
02281     _numRestartResponses = 0;
02282     
02283     
02284     std::vector<TProcessedLog> objectVec;
02285     forAllCharesDo(createObjIDList, (void *)&objectVec);
02286     int numberObjects = objectVec.size();
02287     
02288     
02289     int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02290     char *resendMsg = (char *)CmiAlloc(totalSize);  
02291 
02292     ResendRequest *resendReq = (ResendRequest *)resendMsg;
02293     resendReq->PE =CkMyPe(); 
02294     resendReq->numberObjects = numberObjects;
02295     char *objList = &resendMsg[sizeof(ResendRequest)];
02296     memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
02297 
02298     CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
02299     CmiSyncBroadcastAndFree(totalSize,resendMsg);
02300     
02301 }
02302 
02307 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
02308     countUpdateHomeAcks++;
02309     CmiFree(updateHomeAck);
02310     
02311     if(countUpdateHomeAcks != CmiNumPes()){
02312         return;
02313     }
02314 
02315     
02316     std::vector<TProcessedLog> objectVec;
02317     forAllCharesDo(createObjIDList, (void *)&objectVec);
02318     int numberObjects = objectVec.size();
02319     
02320     
02321     int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02322     char *resendMsg = (char *)CmiAlloc(totalSize);  
02323 
02324     ResendRequest *resendReq = (ResendRequest *)resendMsg;
02325     resendReq->PE =CkMyPe(); 
02326     resendReq->numberObjects = numberObjects;
02327     char *objList = &resendMsg[sizeof(ResendRequest)];
02328     memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
02329 
02330     
02331     if(fastRecovery){
02332         distributeRestartedObjects();
02333         printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02334     }
02335     
02336     
02337 
02338 
02339 
02340 
02341     CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02342     CpvAccess(_currentObj) = lb;
02343     lb->ReceiveDummyMigration(restartDecisionNumber);
02344 
02345 
02346     
02347     CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02348     CmiSyncBroadcastAllAndFree(totalSize, resendMsg);
02349 
02350 };
02351 
02355 void initializeRestart(void *data, ChareMlogData *mlogData){
02356     mlogData->resendReplyRecvd = 0;
02357     mlogData->receivedTNs = new std::vector<MCount>;
02358     mlogData->restartFlag = true;
02359 };
02360 
02364 void updateHomePE(void *data,ChareMlogData *mlogData){
02365     RestartRequest *updateRequest = (RestartRequest *)data;
02366     int PE = updateRequest->PE; 
02367     
02368     
02369     if(mlogData->objID.type == TypeArray){
02370         
02371         CkGroupID myGID = mlogData->objID.data.array.id;
02372         CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
02373         CkArrayID aid(mlogData->objID.data.array.id);       
02374         
02375         CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
02376         if(locMgr->homePe(myIdx) == PE){
02377             DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
02378             DEBUG_RESTART(myIdx.print());
02379             informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
02380         }
02381     }
02382 };
02383 
02384 
/**
 * Handles a restarted processor's request to update home locations. Every
 * local chare is visited with updateHomePE, and the request buffer is then
 * reused as the ack (_updateHomeAckHandler) sent back to the requester.
 *
 * If the requester is the PE returned by getCheckPointPE():
 * - while unAckedCheckpoint == 1, checkpointCount is decremented and a new
 *   checkpoint is started;
 * - every entry of retainedObjectList with acked == 0 is re-sent to it as a
 *   MigrationNotice.
 */
void _updateHomeRequestHandler(RestartRequest *updateRequest){
    // Save the requester now: the buffer is overwritten and handed off below.
    int sender = updateRequest->PE;
    
    forAllCharesDo(updateHomePE,updateRequest);
    
    // Reuse the request as the ack; CmiSyncSendAndFree takes ownership of the
    // buffer, so updateRequest must not be touched after this send.
    updateRequest->PE = CmiMyPe();
    CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
    CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
    if(sender == getCheckPointPE() && unAckedCheckpoint==1){
        CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
        // Presumably undoes the count of the unacknowledged checkpoint so the
        // retake reuses its number — confirm against startMlogCheckpoint.
        checkpointCount--;
        startMlogCheckpoint(NULL,0);
    }
    if(sender == getCheckPointPE()){
        // Re-announce migrations the requester has not acknowledged yet.
        for(int i=0;i<retainedObjectList.size();i++){
            if(retainedObjectList[i]->acked == 0){
                MigrationNotice migMsg;
                CmiInitMsgHeader(migMsg.header, sizeof(MigrationNotice));
                migMsg.migRecord = retainedObjectList[i]->migRecord;
                migMsg.record = retainedObjectList[i];
                CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
                CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
            }
        }
    }
}
02414 
02418 void fillTicketForChare(void *data, ChareMlogData *mlogData){
02419     DEBUG(char name[100]);
02420     ResendData *resendData = (ResendData *)data;
02421     int PE = resendData->PE; 
02422     int count=0;
02423     CkHashtableIterator *iterator;
02424     void *objp;
02425     void *objkey;
02426     CkObjID *objID;
02427     SNToTicket *snToTicket;
02428     Ticket ticket;
02429     
02430     
02431     iterator = mlogData->teamTable.iterator();
02432     while( (objp = iterator->next(&objkey)) != NULL ){
02433         objID = (CkObjID *)objkey;
02434     
02435         
02436         for(int j=0;j<resendData->numberObjects;j++){
02437             if((*objID) == (resendData->listObjects)[j].recver){
02438                 snToTicket = *(SNToTicket **)objp;
02439 
02440                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
02441                     ticket = snToTicket->get(snIndex);  
02442                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
02443                         
02444                         resendData->ticketVecs[j].push_back(ticket.TN);
02445                     }
02446                 }
02447             }
02448         }
02449     }
02450 
02451     
02452     delete iterator;
02453 }
02454 
02455 
02460 void setTeamRecovery(void *data, ChareMlogData *mlogData){
02461     char name[100];
02462     mlogData->teamRecoveryFlag = true;
02463 }
02464 
02468 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
02469     mlogData->teamRecoveryFlag = false;
02470 }
02471 
02475 void printLog(TProcessedLog *log){
02476     char recverString[100];
02477     CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
02478 }
02479 
02483 void printMsg(envelope *env, const char* par){
02484     char senderString[100];
02485     char recverString[100];
02486     CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
02487 }
02488 
02492 void printDet(Determinant *det, const char* par){
02493     char senderString[100];
02494     char recverString[100];
02495     CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
02496 }
02497 
02503 void resendMessageForChare(void *data, ChareMlogData *mlogData){
02504     DEBUG_RESTART(char nameString[100]);
02505     DEBUG_RESTART(char recverString[100]);
02506     DEBUG_RESTART(char senderString[100]);
02507 
02508     ResendData *resendData = (ResendData *)data;
02509     int PE = resendData->PE; 
02510     int count=0;
02511     int ticketRequests=0;
02512     CkQ<MlogEntry *> *log = mlogData->getMlog();
02513 
02514     DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
02515 
02516     
02517     for(int i=0;i<log->length();i++){
02518         MlogEntry *logEntry = (*log)[i];
02519         
02520         
02521         
02522         envelope *env = logEntry->env;
02523         if(env == NULL){
02524             continue;
02525         }
02526     
02527         
02528         if(env->recver.type != TypeInvalid){
02529             for(int j=0;j<resendData->numberObjects;j++){
02530                 if(env->recver == (resendData->listObjects)[j].recver){
02531                     if(PE != CkMyPe()){
02532                         DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
02533                         if(env->recver.type == TypeNodeGroup){
02534                             CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
02535                         }else{
02536                             CmiSetHandler(env,CmiGetXHandler(env));
02537                             CmiSyncSend(PE,env->getTotalsize(),(char *)env);
02538                         }
02539                     }else{
02540                         envelope *copyEnv = copyEnvelope(env);
02541                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
02542                     }
02543                     DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
02544                     count++;
02545                 }
02546             }
02547             
02548         }   
02549     }
02550     DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)  
02551 }
02552 
02557 void _sendDetsHandler(char *msg){
02558     ResendData d;
02559     std::vector<Determinant> *detVec;
02560     ResendRequest *resendReq = (ResendRequest *)msg;
02561 
02562     
02563     
02564     
02565     CmiResetGlobalReduceSeqID();
02566 
02567     
02568     char *listObjects = &msg[sizeof(ResendRequest)];
02569     d.numberObjects = resendReq->numberObjects;
02570     d.PE = resendReq->PE;
02571     d.listObjects = (TProcessedLog *)listObjects;
02572     d.ticketVecs = new std::vector<MCount>[d.numberObjects];
02573     detVec = new std::vector<Determinant>[d.numberObjects];
02574 
02575     
02576     
02577     std::vector<Determinant> *vec;
02578     for(int i=0; i<d.numberObjects; i++){
02579         vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
02580         if(vec != NULL){
02581             for(int j=0; j<vec->size(); j++){
02582 
02583                 
02584                 if((*vec)[j].TN > d.listObjects[i].tProcessed){
02585 
02586                     
02587                     d.ticketVecs[i].push_back((*vec)[j].TN);
02588 
02589                     
02590                     detVec[i].push_back((*vec)[j]);
02591 
02592                     DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
02593                 }
02594             }
02595         }
02596     }
02597 
02598     int totalDetStored = 0;
02599     for(int i=0;i<d.numberObjects;i++){
02600         totalDetStored += detVec[i].size();
02601     }
02602 
02603     
02604     
02605     
02606     
02607     int totalTNStored=0;
02608     for(int i=0;i<d.numberObjects;i++){
02609         totalTNStored += d.ticketVecs[i].size();
02610     }
02611     
02612     int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
02613     char *resendReplyMsg = (char *)CmiAlloc(totalSize);
02614     
02615     ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
02616     resendReply->PE = CkMyPe();
02617     resendReply->numberObjects = d.numberObjects;
02618     
02619     char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
02620     CkObjID *replyObjects = (CkObjID *)replyListObjects;
02621     for(int i=0;i<d.numberObjects;i++){
02622         replyObjects[i] = d.listObjects[i].recver;
02623     }
02624     
02625     char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
02626     for(int i=0;i<d.numberObjects;i++){
02627         int vecsize = d.ticketVecs[i].size();
02628         memcpy(ticketList,&vecsize,sizeof(int));
02629         ticketList = &ticketList[sizeof(int)];
02630         memcpy(ticketList,d.ticketVecs[i].data(),sizeof(MCount)*vecsize);
02631         ticketList = &ticketList[sizeof(MCount)*vecsize];
02632     }
02633 
02634     
02635     for(int i=0;i<d.numberObjects;i++){
02636         int vecsize = detVec[i].size();
02637         memcpy(ticketList,&vecsize,sizeof(int));
02638         ticketList = &ticketList[sizeof(int)];
02639         memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
02640         ticketList = &ticketList[sizeof(Determinant)*vecsize];
02641     }
02642 
02643     CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
02644     CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
02645 
02646     delete [] detVec;
02647     delete [] d.ticketVecs;
02648 
02649     DEBUG_MEM(CmiMemoryCheck());
02650 
02651     if(resendReq->PE != CkMyPe()){
02652         CmiFree(msg);
02653     }   
02654 
02655     lastRestart = CmiWallTimer();
02656 
02657 }
02658 
02663 void _resendMessagesHandler(char *msg){
02664     ResendData d;
02665     ResendRequest *resendReq = (ResendRequest *)msg;
02666 
02667     
02668 
02669     
02670     char *listObjects = &msg[sizeof(ResendRequest)];
02671     d.numberObjects = resendReq->numberObjects;
02672     d.PE = resendReq->PE;
02673     d.listObjects = (TProcessedLog *)listObjects;
02674     
02675     DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
02676 
02677     
02678     
02679     if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
02680         forAllCharesDo(fillTicketForChare,&d);
02681     else
02682         forAllCharesDo(resendMessageForChare,&d);
02683 
02684     DEBUG_MEM(CmiMemoryCheck());
02685 
02686     
02687         CmiFree(msg);
02688     
02689 
02690     lastRestart = CmiWallTimer();
02691 }
02692 
02693 MCount maxVec(std::vector<MCount> *TNvec);
02694 void sortVec(std::vector<MCount> *TNvec);
02695 int searchVec(std::vector<MCount> *TNVec,MCount searchTN);
02696 
02700 void processDelayedRemoteMsgQueue(){
02701     DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
02702     
02703     while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
02704         void *qMsgPtr;
02705         CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
02706         envelope *qEnv = (envelope *)qMsgPtr;
02707         DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
02708         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
02709         DEBUG_MEM(CmiMemoryCheck());
02710     }
02711 
02712 }
02713 
02719 void _sendDetsReplyHandler(char *msg){
02720     ResendRequest *resendReply = (ResendRequest *)msg;
02721     CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
02722     char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
02723     
02724 
02725     DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
02726     
02727     for(int i =0; i< resendReply->numberObjects;i++){
02728         Chare *obj = (Chare *)listObjects[i].getObject();
02729         
02730         int vecsize;
02731         memcpy(&vecsize,listTickets,sizeof(int));
02732         listTickets = &listTickets[sizeof(int)];
02733         MCount *listTNs = (MCount *)listTickets;
02734         listTickets = &listTickets[vecsize*sizeof(MCount)];
02735     
02736         if(obj != NULL){
02737             
02738             processReceivedTN(obj,vecsize,listTNs);
02739         }else{
02740             
02741             int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
02742             char *TNMsg = (char *)CmiAlloc(totalSize);
02743             ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
02744             receivedTNData->recver = listObjects[i];
02745             receivedTNData->numTNs = vecsize;
02746             char *tnList = &TNMsg[sizeof(ReceivedTNData)];
02747             memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
02748             CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
02749             CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
02750         }
02751 
02752     }
02753 
02754     
02755     for(int i = 0; i < resendReply->numberObjects; i++){
02756         Chare *obj = (Chare *)listObjects[i].getObject();
02757         
02758         int vecsize;
02759         memcpy(&vecsize,listTickets,sizeof(int));
02760         listTickets = &listTickets[sizeof(int)];
02761         Determinant *listDets = (Determinant *)listTickets; 
02762         listTickets = &listTickets[vecsize*sizeof(Determinant)];
02763     
02764         if(obj != NULL){
02765             
02766             processReceivedDet(obj,vecsize,listDets);
02767         } else {
02768             
02769             
02770             int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
02771             char *detMsg = (char *)CmiAlloc(totalSize);
02772             ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
02773             receivedDetData->recver = listObjects[i];
02774             receivedDetData->numDets = vecsize;
02775             char *detList = &detMsg[sizeof(ReceivedDetData)];
02776             memcpy(detList,listDets,sizeof(Determinant)*vecsize);
02777             CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
02778             CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
02779         }
02780 
02781     }
02782 
02783     
02784     _numRestartResponses++;
02785     if(_numRestartResponses != CkNumPes())
02786         return;
02787     else 
02788         _numRestartResponses = 0;
02789 
02790     
02791     std::vector<TProcessedLog> objectVec;
02792     forAllCharesDo(createObjIDList, (void *)&objectVec);
02793     int numberObjects = objectVec.size();
02794     
02795     
02796     int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
02797     char *resendMsg = (char *)CmiAlloc(totalSize);  
02798 
02799     ResendRequest *resendReq = (ResendRequest *)resendMsg;
02800     resendReq->PE =CkMyPe(); 
02801     resendReq->numberObjects = numberObjects;
02802     char *objList = &resendMsg[sizeof(ResendRequest)];
02803     memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
02804 
02805     CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
02806     CpvAccess(_currentObj) = lb;
02807     lb->ReceiveDummyMigration(restartDecisionNumber);
02808 
02809 
02810 
02811 
02812     CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
02813     CmiSyncBroadcastAllAndFree(totalSize, resendMsg);
02814 
02815     
02816     if(fastRecovery){
02817         distributeRestartedObjects();
02818         printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
02819     }
02820 
02821 
02822 
02823 };
02824 
02828 void _receivedDetDataHandler(ReceivedDetData *msg){
02829     DEBUG_NOW(char objName[100]);
02830     Chare *obj = (Chare *) msg->recver.getObject();
02831     if(obj){        
02832         char *_msg = (char *)msg;
02833         DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
02834         Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
02835         processReceivedDet(obj,msg->numDets,listDets);
02836         CmiFree(msg);
02837     }else{
02838         int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
02839         CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
02840     }
02841 }
02842 
02846 void _receivedTNDataHandler(ReceivedTNData *msg){
02847     DEBUG_NOW(char objName[100]);
02848     Chare *obj = (Chare *) msg->recver.getObject();
02849     if(obj){        
02850         char *_msg = (char *)msg;
02851         DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
02852         MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
02853         processReceivedTN(obj,msg->numTNs,listTNs);
02854         CmiFree(msg);
02855     }else{
02856         int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
02857         CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
02858     }
02859 };
02860 
02864 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
02865     Determinant *det;
02866 
02867     
02868     for(int i=0; i<listSize; i++){
02869         det = &listDets[i];
02870         if(CkMyPe() == 4) printDet(det,"RECOVERY");
02871         obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
02872         DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
02873     }
02874     
02875     DEBUG_MEM(CmiMemoryCheck());
02876 }
02877     
02881 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
02882     
02883     obj->mlogData->resendReplyRecvd++;
02884 
02885     DEBUG(char objName[100]);
02886     DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
02887     
02888     
02889     
02890     
02891 
02892     
02893     for(int j=0;j<listSize;j++){
02894         obj->mlogData->receivedTNs->push_back(listTNs[j]);
02895     }
02896     
02897     
02898     
02899     
02900     
02901     if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
02902         obj->mlogData->resendReplyRecvd = 0;
02903 
02904 #if VERIFY_DETS
02905         
02906         sortVec(obj->mlogData->receivedTNs);
02907     
02908         
02909         
02910         if(obj->mlogData->receivedTNs->size() > 0){
02911             int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
02912             int vecsize = obj->mlogData->receivedTNs->size();
02913             int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
02914             
02915             
02916             if(teamSize > 1){
02917                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
02918                     obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
02919             }else{
02920                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
02921             }
02922             
02923             if(numberHoles == 0){
02924             }else{
02925                 char objName[100];                  
02926                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
02927                 obj->mlogData->numberHoles = numberHoles;
02928                 obj->mlogData->ticketHoles = new MCount[numberHoles];
02929                 int countHoles=0;
02930                 for(int k=tProcessedIndex+1;k<vecsize;k++){
02931                     if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
02932                         
02933                         for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
02934                             DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
02935                             obj->mlogData->ticketHoles[countHoles] = newTN;
02936                             countHoles++;
02937                         }   
02938                     }
02939                 }
02940                 
02941                 if(countHoles != numberHoles){
02942                     char str[100];
02943                     printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
02944                 }
02945                 CkAssert(countHoles == numberHoles);                    
02946                 obj->mlogData->currentHoles = numberHoles;
02947             }
02948         }
02949 #else
02950         if(obj->mlogData->receivedTNs->size() > 0){
02951             obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
02952         }
02953 #endif
02954         
02955         delete obj->mlogData->receivedTNs;
02956         DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
02957         obj->mlogData->receivedTNs = NULL;
02958         obj->mlogData->restartFlag = false;
02959 
02960         
02961 
02962         DEBUG_RESTART(char objString[100]);
02963         DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
02964     }
02965 
02966 }
02967 
02969 MCount maxVec(std::vector<MCount> *TNvec){
02970     MCount max = 0;
02971     for(int i=0; i<TNvec->size(); i++){
02972         if((*TNvec)[i] > max)
02973             max = (*TNvec)[i];
02974     }
02975     return max;
02976 }
02977 
02978 void sortVec(std::vector<MCount> *TNvec){
02979     std::sort(TNvec->begin(), TNvec->end());
02980     std::erase(std::unique(TNvec->begin(), TNvec->end()), TNvec->end());
02981 }   
02982 
02983 int searchVec(std::vector<MCount> *TNVec,MCount searchTN){
02984     
02985     auto it = std::lower_bound(TNvec->begin(), TNvec->end(), searchTN);
02986     if (it == TNvec.end() || *it != searchTN)
02987         return -1;
02988     else
02989         return it - TNvec->begin();
02990 };
02991 
02992 
02993 
02994 
02995 
02996 
02997 
02998 
02999 
03000 
03001 
/**
 * CkLocIterator used at restart (fastRecovery) to spread this PE's array
 * elements over other PEs. For each visited location it either keeps the
 * element here (when the current target is this PE) or packs the location
 * behind a DistributeObjectMsg header, deletes the local record, informs the
 * location manager of the new PE and sends the data with
 * _distributedLocationHandlerIdx. targetPE points at a caller-owned counter so
 * the rotation continues across location managers.
 */
class ElementDistributor: public CkLocIterator{
    CkLocMgr *locMgr;   // manager whose group ID is packed with each location
    int *targetPE;      // next destination PE; owned by the caller

    // Packs (group ID, index, location) in that order; the receiving handlers
    // read the group ID and the index first.
    void pupLocation(CkLocation &loc,PUP::er &p){
        CkArrayIndexMax idx=loc.getIndex();
        CkGroupID gID = locMgr->ckGetGroupID();
        p|gID;      
        p|idx;
        p|loc;
    };
public:
    ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};

    void addLocation(CkLocation &loc){

        // Target is this PE: keep the element and advance the target,
        // wrapping modulo CkNumPes().
        if(*targetPE == CkMyPe()){
            *targetPE = (*targetPE +1)%CkNumPes();
            return;
        }
            
        CkArrayIndexMax idx = loc.getIndex();
        CkLocRec *rec = loc.getLocalRecord();
        // NOTE(review): this local shadows the member locMgr; pupLocation()
        // still uses the member. Presumably both are the same manager — confirm.
        CkLocMgr *locMgr = loc.getManager();
        std::vector<CkMigratable *> eltList;
            
        CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
        idx.print();

        // Count the element as emigrated (per PE here, per reduction manager below).
        CpvAccess(_numEmigrantRecObjs)++;
        locMgr->migratableList((CkLocRec *)rec,eltList);
        // NOTE(review): assumes eltList is non-empty.
        CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
        
        
        locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
        reductionMgr->incNumEmigrantRecObjs();
    
        // Size the packed location, then pack it right after the header.
        PUP::sizer psizer;
        pupLocation(loc,psizer);
        int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
        char *msg = (char *)CmiAlloc(totalSize);
        DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
        distributeMsg->PE = CkMyPe();
        char *buf = &msg[sizeof(DistributeObjectMsg)];
        PUP::toMem pmem(buf);
        pmem.becomeDeleting();
        pupLocation(loc,pmem);
            
        // Drop the local record and point the manager at the destination.
        locMgr->setDuringMigration(true);
        delete rec;
        locMgr->setDuringMigration(false);
        locMgr->inform(idx,*targetPE);

        CmiSetHandler(msg,_distributedLocationHandlerIdx);
        CmiSyncSendAndFree(*targetPE,totalSize,msg);

        CmiAssert(locMgr->lastKnown(idx) == *targetPE);

        // Round-robin over CkMyPe()+1 .. CkMyPe()+parallelRecovery.
        // NOTE(review): unlike the early-return branch above, this does not
        // wrap modulo CkNumPes(), so a target past the last PE looks possible
        // when CkMyPe() is near the end — confirm against parallelRecovery.
        *targetPE = *targetPE + 1;
        if(*targetPE > (CkMyPe() + parallelRecovery)){
            *targetPE = CkMyPe() + 1;
        }
    }

};
03071 
/**
 * Hands this restarted PE's array elements out to other PEs by running an
 * ElementDistributor over every location manager (CKLOCMGR_LOOP). The
 * rotation starts at CkMyPe()+1 and is shared across managers.
 */
void distributeRestartedObjects(){
    // numGroups and i are not referenced directly; presumably the
    // CKLOCMGR_LOOP expansion relies on them — verify before removing.
    int numGroups = CkpvAccess(_groupIDTable)->size();  
    int i;
    // NOTE(review): on the last PE this starts at CkNumPes(), an invalid PE.
    int targetPE=CkMyPe()+1;
    CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
03081 
/**
 * Handler (_sendBackLocationHandlerIdx) for an array element returned by the
 * PE that hosted it during parallel recovery (sent by
 * sendBackImmigrantRecObjs). Unpacks the location that follows the
 * DistributeObjectMsg header, resumes it in its location manager, informs the
 * element's home PE, and decrements the emigrant counters; when the last
 * emigrant is back, load balancing resumes through resumeLbFnPtr.
 */
void _sendBackLocationHandler(char *receivedMsg){
    printf("Array element received at processor %d after recovery\n",CkMyPe());
    DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
    // NOTE(review): sourcePE is read but not used in this handler.
    int sourcePE = distributeMsg->PE;
    // Payload after the header: group ID, index, then the packed location.
    char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
    PUP::fromMem pmem(buf);
    CkGroupID gID;
    CkArrayIndexMax idx;
    pmem |gID;
    pmem |idx;
    CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
    // Presumably keeps this re-insertion out of migration counting (per the
    // flag name) — confirm.
    donotCountMigration=1;
    mgr->resume(idx,pmem,true);
    donotCountMigration=0;
    informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
    printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
    idx.print();

    // NOTE(review): assumes the location has at least one element.
    std::vector<CkMigratable *> eltList;
    CkLocRec *rec = mgr->elementRec(idx);
    mgr->migratableList((CkLocRec *)rec,eltList);
    CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
    reductionMgr->decNumEmigrantRecObjs();
    reductionMgr->decGCount();

    // Last emigrant back home: resume load balancing.
    CpvAccess(_numEmigrantRecObjs)--;
    if(CpvAccess(_numEmigrantRecObjs) == 0){
        (*resumeLbFnPtr)(centralLb);
    }

    // NOTE(review): receivedMsg is never freed here, while most handlers in
    // this file CmiFree their message — confirm whether this leaks.
}
03118 
/**
 * Handler (_distributedLocationHandlerIdx) for an array element sent here by
 * a restarted PE (ElementDistributor). Unpacks and resumes the location,
 * informs its home PE, records it as an immigrant (so
 * sendBackImmigrantRecObjs can return it later), and restarts each element
 * that still needs resuming via ResumeFromSync().
 */
void _distributedLocationHandler(char *receivedMsg){
    printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
    DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
    // PE that sent the element; stored below so the element can be returned.
    int sourcePE = distributeMsg->PE;
    // Payload after the header: group ID, index, then the packed location.
    char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
    PUP::fromMem pmem(buf);
    CkGroupID gID;
    CkArrayIndexMax idx;
    pmem |gID;
    pmem |idx;
    CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
    // Presumably keeps this insertion out of migration counting (per the flag
    // name) — confirm.
    donotCountMigration=1;
    mgr->resume(idx,pmem,true);
    donotCountMigration=0;
    informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
    printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
    idx.print();

    CkLocRec *rec = mgr->elementRec(idx);
    CmiAssert(rec != NULL);

    // Remember the location; the CkLocation is deleted in sendBackImmigrantRecObjs.
    CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,rec));
    CpvAccess(_numImmigrantRecObjs)++;
    
    std::vector<CkMigratable *> eltList;
    mgr->migratableList((CkLocRec *)rec,eltList);
    for(int i=0;i<eltList.size();i++){
        // Resume only elements flagged for it that have not yet reached this
        // resume round.
        if(eltList[i]->mlogData->toResumeOrNot && eltList[i]->mlogData->resumeCount < globalResumeCount){
            CpvAccess(_currentObj) = eltList[i];
            eltList[i]->mlogData->immigrantRecFlag = true;
            eltList[i]->mlogData->immigrantSourcePE = sourcePE;

            // Count the immigrant in its reduction manager.
            CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
            reductionMgr->incNumImmigrantRecObjs();
            reductionMgr->decGCount();

            eltList[i]->ResumeFromSync();
        }
    }
    // NOTE(review): receivedMsg is never freed here — confirm whether this leaks.
}
03164 
03165 
03168 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
03169     DummyMigrationMsg buf;
03170     CmiInitMsgHeader(buf.header, sizeof(DummyMigrationMsg));
03171     buf.flag = MLOG_OBJECT;
03172     buf.lbID = lbID;
03173     buf.mgrID = locMgrID;
03174     buf.idx = idx;
03175     buf.locationPE = locationPE;
03176     CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
03177     CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
03178 };
03179 
03180 
03185 void sendDummyMigrationCounts(int *dummyCounts){
03186     DummyMigrationMsg buf;
03187     CmiInitMsgHeader(buf.header, sizeof(DummyMigrationMsg));
03188     buf.flag = MLOG_COUNT;
03189     buf.lbID = globalLBID;
03190     CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
03191     for(int i=0;i<CmiNumPes();i++){
03192         if(i != CmiMyPe() && dummyCounts[i] != 0){
03193             buf.count = dummyCounts[i];
03194             CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
03195         }
03196     }
03197 }
03198 
03199 
03203 void _dummyMigrationHandler(DummyMigrationMsg *msg){
03204     CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03205     if(msg->flag == MLOG_OBJECT){
03206         DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
03207         LDObjHandle h;
03208         lb->Migrated(h,1);
03209     }
03210     if(msg->flag == MLOG_COUNT){
03211         DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
03212         msg->count -= verifyAckedRequests;
03213         for(int i=0;i<msg->count;i++){
03214             LDObjHandle h;
03215             lb->Migrated(h,1);
03216         }
03217     }
03218     verifyAckedRequests=0;
03219     CmiFree(msg);
03220 };
03221 
03222 
03223 
03224 
03225 
03226 
03227 
03228 
03229 class ElementCaller :  public CkLocIterator {
03230 private:
03231     CkLocMgr *locMgr;
03232     MlogFn fnPointer;
03233     void *data;
03234 public:
03235     ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
03236         locMgr = _locMgr;
03237         fnPointer = _fnPointer;
03238         data = _data;
03239     };
03240     void addLocation(CkLocation &loc){
03241         std::vector<CkMigratable *> list;
03242         CkLocRec *local = loc.getLocalRecord();
03243         locMgr->migratableList (local,list);
03244         for(int i=0;i<list.size();i++){
03245             CkMigratable *migratableElement = list[i];
03246             fnPointer(data,migratableElement->mlogData);
03247         }
03248     }
03249 };
03250 
/**
 * Applies fnPointer(data, mlogData) to every chare on this PE: each group
 * object, then each node-group object, then every array element reached
 * through the location managers (via ElementCaller).
 */
void forAllCharesDo(MlogFn fnPointer, void *data){
    // Groups.
    int numGroups = CkpvAccess(_groupIDTable)->size();
    for(int i=0;i<numGroups;i++){
        Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
        fnPointer(data,obj->mlogData);
    }
    // Node groups.
    int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
    for(int i=0;i<numNodeGroups;i++){
        Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
        fnPointer(data,obj->mlogData);
    }
    // Array elements. i is not referenced directly; presumably the
    // CKLOCMGR_LOOP expansion uses i and numGroups — verify before removing.
    int i;
    CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
};
03268 
03269 
03270 
03271 
03272 
03273 
03279 void initMlogLBStep(CkGroupID gid){
03280     DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
03281     countLBMigratedAway = 0;
03282     countLBToMigrate=0;
03283     onGoingLoadBalancing=1;
03284     migrationDoneCalled=0;
03285     checkpointBarrierCount=0;
03286     if(globalLBID.idx != 0){
03287         CmiAssert(globalLBID.idx == gid.idx);
03288     }
03289     globalLBID = gid;
03290 #if SYNCHRONIZED_CHECKPOINT
03291     garbageCollectMlog();
03292 #endif
03293 }
03294 
03298 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
03299     CkArrayIndexMax idx = loc->getIndex();
03300     CkGroupID gID = locMgr->ckGetGroupID();
03301     p|gID;      
03302     p|idx;
03303     p|*loc;
03304 };
03305 
03309 void sendBackImmigrantRecObjs(){
03310     CkLocation *loc;
03311     CkLocMgr *locMgr;
03312     CkArrayIndexMax idx;
03313     CkLocRec *rec;
03314     PUP::sizer psizer;
03315     int targetPE;
03316     std::vector<CkMigratable *> eltList;
03317     CkReductionMgr *reductionMgr;
03318  
03319     
03320     for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
03321 
03322         
03323         loc = (*CpvAccess(_immigrantRecObjs))[i];
03324         idx = loc->getIndex();
03325         rec = loc->getLocalRecord();
03326         locMgr = loc->getManager();
03327         locMgr->migratableList(rec,eltList);
03328         targetPE = eltList[i]->mlogData->immigrantSourcePE;
03329 
03330         
03331         reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
03332         reductionMgr->decNumImmigrantRecObjs();
03333 
03334         CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
03335         idx.print();
03336 
03337         
03338         locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
03339             
03340         
03341         pupLocation(loc,locMgr,psizer);
03342         int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
03343         char *msg = (char *)CmiAlloc(totalSize);
03344         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
03345         distributeMsg->PE = CkMyPe();
03346         char *buf = &msg[sizeof(DistributeObjectMsg)];
03347         PUP::toMem pmem(buf);
03348         pmem.becomeDeleting();
03349         pupLocation(loc,locMgr,pmem);
03350         
03351         locMgr->setDuringMigration(true);
03352         delete rec;
03353         locMgr->setDuringMigration(false);
03354         locMgr->inform(idx,targetPE);
03355 
03356         
03357         CmiSetHandler(msg,_sendBackLocationHandlerIdx);
03358         CmiSyncSendAndFree(targetPE,totalSize,msg);
03359 
03360         
03361         delete loc;
03362 
03363         CmiAssert(locMgr->lastKnown(idx) == targetPE);
03364         
03365     }
03366 
03367     
03368     CpvAccess(_immigrantRecObjs)->clear();
03369     CpvAccess(_numImmigrantRecObjs) = 0;
03370 
03371 }
03372 
03377 void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
03378     resumeLbFnPtr = _fnPtr;
03379     centralLb = _centralLb;
03380 
03381     
03382     if(CpvAccess(_numImmigrantRecObjs) > 0){
03383         sendBackImmigrantRecObjs(); 
03384     }
03385 
03386     
03387     if(CpvAccess(_numEmigrantRecObjs) > 0)
03388         return;
03389 
03390     
03391     (*resumeLbFnPtr)(centralLb);
03392 }
03393 
03394 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
03395     DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03396     DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
03397 
03398     resumeLbFnPtr = _fnPtr;
03399     centralLb = _centralLb;
03400     migrationDoneCalled = 1;
03401     if(countLBToMigrate == countLBMigratedAway){
03402         DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03403         startMlogCheckpoint(NULL,CmiWallTimer());
03404     }
03405 };
03406 
03407 void finishedCheckpointLoadBalancing(){
03408     DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
03409 
03410     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
03411     CmiSetHandler(msg,_checkpointBarrierHandlerIdx);
03412     CmiReduce(msg,CmiMsgHeaderSizeBytes,doNothingMsg);
03413 };
03414 
03415 
03416 void sendMlogLocation(int targetPE, envelope *env){
03417 #if !SYNCHRONIZED_CHECKPOINT
03418     void *_msg = EnvToUsr(env);
03419     CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
03420 
03421     int existing = 0;
03422     
03423     
03424     
03425     for(int i=0;i<retainedObjectList.size();i++){
03426         MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
03427         if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
03428             DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
03429             existing = 1;
03430             break;
03431         }
03432     }
03433 
03434     if(existing){
03435         return;
03436     }
03437     
03438     countLBToMigrate++;
03439     
03440     MigrationNotice migMsg;
03441     CmiInitMsgHeader(migMsg.header, sizeof(MigrationNotice));
03442     migMsg.migRecord.gID = msg->gid;
03443     migMsg.migRecord.idx = msg->idx;
03444     migMsg.migRecord.fromPE = CkMyPe();
03445     migMsg.migRecord.toPE =  targetPE;
03446     
03447     DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
03448     
03449     RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
03450     retainedObject->migRecord = migMsg.migRecord;
03451     retainedObject->acked  = 0;
03452     
03453     CkPackMessage(&env);
03454     
03455     migMsg.record = retainedObject;
03456     retainedObject->msg = env;
03457     int size = retainedObject->size = env->getTotalsize();
03458     
03459     retainedObjectList.push_back(retainedObject);
03460     
03461     CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
03462     CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
03463     
03464     DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
03465 #endif
03466 }
03467 
03468 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
03469     msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
03470     migratedNoticeList.push_back(msg->migRecord);
03471 
03472     MigrationNoticeAck buf;
03473     CmiInitMsgHeader(buf.header, sizeof(MigrationNoticeAck));
03474     buf.record = msg->record;
03475     CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
03476     CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
03477 }
03478 
03479 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
03480     
03481     RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
03482     retainedObject->acked = 1;
03483 
03484     CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
03485     CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
03486 
03487     
03488     CkGroupID gID = retainedObject->migRecord.gID ;
03489     CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03490     informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
03491     
03492     countLBMigratedAway++;
03493     if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
03494         DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
03495         startMlogCheckpoint(NULL,CmiWallTimer());
03496     }
03497 };
03498 
03499 void _receiveMlogLocationHandler(void *buf){
03500     envelope *env = (envelope *)buf;
03501     DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
03502     CkUnpackMessage(&env);
03503     void *_msg = EnvToUsr(env);
03504     CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
03505     CkGroupID gID= msg->gid;
03506     DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
03507     CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
03508     CpvAccess(_currentObj)=mgr;
03509     mgr->immigrate(msg);
03510 };
03511 
03512 
03513 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
03514 
03515 
03516 
03517 
03518 
03519 
03520 
03521 
03522 }
03523 
03527 void _checkpointBarrierHandler(CheckpointBarrierMsg *barrierMsg){
03528 
03529     
03530     CmiFree(barrierMsg);
03531 
03532     
03533     CheckpointBarrierMsg *msg = (CheckpointBarrierMsg *)CmiAlloc(sizeof(CheckpointBarrierMsg));
03534     CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
03535     CmiSyncBroadcastAllAndFree(sizeof(CheckpointBarrierMsg),(char *)msg); 
03536 }
03537 
/**
 * Handler for the broadcast that completes the post-load-balancing checkpoint
 * barrier. It does the following, in order:
 *  - without SYNCHRONIZED_CHECKPOINT, sends log-removal requests;
 *  - with CMK_CONVERSE_MPI, clears inCkptFlag;
 *  - resumes load balancing through resumeLbFnPtr;
 *  - frees the message.
 */
void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
    DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
    DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));

#if !SYNCHRONIZED_CHECKPOINT
    
    sendRemoveLogRequests();
#endif

#if CMK_CONVERSE_MPI
    inCkptFlag = 0;
#endif

    // Continue the load balancer's work.
    (*resumeLbFnPtr)(centralLb);

    
    CmiFree(msg);
}
03557 
03561 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
03562     int total;
03563     MlogEntry *logEntry;
03564     CkQ<MlogEntry *> *mlog = mlogData->getMlog();
03565 
03566     
03567     total = mlog->length();
03568     for(int i=0; i<total; i++){
03569         logEntry = mlog->deq();
03570         delete logEntry;
03571     }
03572 
03573 }
03574 
03580 void garbageCollectMlog(){
03581     CkHashtableIterator *iterator;
03582     std::vector<Determinant> *detArray;
03583 
03584     DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
03585 
03586     
03587     _indexBufferedDets = 0;
03588     _numBufferedDets = 0;
03589     _phaseBufferedDets++;
03590 
03591     
03592     iterator = CpvAccess(_remoteDets)->iterator();
03593     while(iterator->hasNext()){
03594         detArray = *(std::vector<Determinant> **)iterator->next();
03595         detArray->clear();
03596     }
03597     
03598     
03599     delete iterator;
03600 
03601     
03602     forAllCharesDo(garbageCollectMlogForChare, NULL);
03603 }
03604 
03610 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
03611     double _startTime = CmiWallTimer();
03612     CurrentLocationMsg msg;
03613     CmiInitMsgHeader(msg.header, sizeof(CurrentLocationMsg));
03614     msg.mgrID = locMgrID;
03615     msg.idx = idx;
03616     msg.locationPE = currentPE;
03617     msg.fromPE = CkMyPe();
03618 
03619     DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
03620     CmiSetHandler(&msg,_receiveLocationHandlerIdx);
03621     CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
03622     traceUserBracketEvent(37,_startTime,CmiWallTimer());
03623 }
03624 
03625 
03626 void _receiveLocationHandler(CurrentLocationMsg *data){
03627     double _startTime = CmiWallTimer();
03628     CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
03629     if(mgr == NULL){
03630         CmiFree(data);
03631         return;
03632     }
03633     CkLocRec *rec = mgr->elementNrec(data->idx);
03634     DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
03635     if(rec != NULL){
03636         if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe()){
03637             if(data->fromPE == data->locationPE){
03638                 CmiAbort("Another processor has the same object");
03639             }
03640         }
03641     }
03642     if(rec!= NULL && data->fromPE != CmiMyPe()){
03643         int targetPE = data->fromPE;
03644         data->fromPE = CmiMyPe();
03645         data->locationPE = CmiMyPe();
03646         DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
03647         CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
03648     }else{
03649         mgr->inform(data->idx,data->locationPE);
03650     }
03651     CmiFree(data);
03652     traceUserBracketEvent(38,_startTime,CmiWallTimer());
03653 }
03654 
03655 
03656 
03657 void getGlobalStep(CkGroupID gID){
03658     LBStepMsg msg;
03659     CmiInitMsgHeader(msg.header, sizeof(LBStepMsg));
03660     int destPE = 0;
03661     msg.lbID = gID;
03662     msg.fromPE = CmiMyPe();
03663     msg.step = -1;
03664     CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
03665     CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
03666 };
03667 
03668 void _getGlobalStepHandler(LBStepMsg *msg){
03669     CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
03670     msg->step = lb->step();
03671     CmiAssert(msg->fromPE != CmiMyPe());
03672     CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
03673     CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
03674     CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
03675 };
03676 
03680 void _recvGlobalStepHandler(LBStepMsg *msg){
03681     
03682     
03683     restartDecisionNumber = msg->step;
03684     CmiFree(msg);
03685 
03686     CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
03687 
03688     
03689     ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
03690     resendReplyMsg->PE = CkMyPe();
03691     resendReplyMsg->numberObjects = 0;
03692     _sendDetsReplyHandler((char *)resendReplyMsg);
03693 };
03694 
03698 void _messageLoggingExit(){
03699     
03700     
03701     if(CkMyPe() == 0)
03702         printf("[%d] _causalMessageLoggingExit \n",CmiMyPe());
03703 
03704 #if COLLECT_STATS_LOGGING
03705     printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
03706     printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
03707     printf("[%d] MULTICAST MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalMcastLogSize/(float)MEGABYTE);
03708     printf("[%d] REDUCTION MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalReductionLogSize/(float)MEGABYTE);
03709 #endif
03710 
03711 #if COLLECT_STATS_MSGS
03712 #if COLLECT_STATS_MSGS_TOTAL
03713     printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
03714     printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
03715 #else
03716     printf("[%d] TARGETS: ",CmiMyPe());
03717     for(int i=0; i<CmiNumPes(); i++){
03718 #if COLLECT_STATS_MSG_COUNT
03719         printf("%d ",numMsgsTarget[i]);
03720 #else
03721         printf("%d ",sizeMsgsTarget[i]);
03722 #endif
03723     }
03724     printf("\n");
03725 #endif
03726 #endif
03727 
03728 #if COLLECT_STATS_DETS
03729     printf("\n");
03730     printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
03731     printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
03732 #if COLLECT_STATS_DETS_DUP
03733     printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
03734 #endif
03735 #endif
03736 
03737 }
03738 
03739 void MlogEntry::pup(PUP::er &p){
03740     p | destPE;
03741     p | _infoIdx;
03742     int size;
03743     if(!p.isUnpacking()){
03744 
03745 
03746 
03747 
03748         if(env == NULL){
03749             
03750             size = 0;
03751         }else{
03752             size = env->getTotalsize();
03753         }   
03754     }
03755     
03756     p | size;
03757     if(p.isUnpacking()){
03758         if(size > 0){
03759             env = (envelope *)_allocEnv(ForChareMsg,size);
03760         }else{
03761             env = NULL;
03762         }
03763     }
03764     if(size > 0){
03765         p((char *)env,size);
03766     }
03767 };
03768 
03769 void RestoredLocalMap::pup(PUP::er &p){
03770     p | minSN;
03771     p | maxSN;
03772     p | count;
03773     if(p.isUnpacking()){
03774         TNArray = new MCount[count];
03775     }
03776     p(TNArray,count);
03777 };
03778 
03779 
03780 
03781 
03782 
03783 
03784 MCount ChareMlogData::nextSN(const CkObjID &recver){
03785     MCount *SN = snTable.getPointer(recver);
03786     if(SN==NULL){
03787         snTable.put(recver) = 1;
03788         return 1;
03789     }else{
03790         (*SN)++;
03791         return *SN;
03792     }
03793 };
03794  
03798 MCount ChareMlogData::newTN(){
03799     MCount TN;
03800     if(currentHoles > 0){
03801         int holeidx = numberHoles-currentHoles;
03802         TN = ticketHoles[holeidx];
03803         currentHoles--;
03804         if(currentHoles == 0){
03805             delete []ticketHoles;
03806             numberHoles = 0;
03807         }
03808     }else{
03809         TN = ++tCount;
03810     }
03811     return TN;
03812 };
03813 
03817 inline Ticket ChareMlogData::getTicket(CkObjID &sender, MCount SN){
03818     Ticket ticket;
03819 
03820     SNToTicket *ticketRow = ticketTable.get(sender);
03821     if(ticketRow != NULL){
03822         return ticketRow->get(SN);
03823     }
03824     return ticket;
03825 }
03826 
03830 inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
03831     Ticket ticket;
03832 
03833     SNToTicket *ticketRow = ticketTable.get(sender);
03834     if(ticketRow != NULL){
03835         Ticket earlierTicket = ticketRow->get(SN);
03836         if(earlierTicket.TN != 0){
03837             CkAssert(earlierTicket.TN == TN);
03838             return;
03839         }
03840     }else{
03841         ticketRow = new SNToTicket();
03842         ticketTable.put(sender) = ticketRow;
03843     }
03844     ticket.TN = TN;
03845     ticketRow->put(SN) = ticket;
03846 }
03847 
03851 inline Ticket ChareMlogData::next_ticket(CkObjID &sender, MCount SN){
03852     DEBUG(char senderName[100];)
03853     DEBUG(char recverName[100];)
03854     double _startTime = CmiWallTimer();
03855     Ticket ticket;
03856 
03857     
03858     if(restartFlag){
03859         ticket.TN = 0;
03860         return ticket;
03861     }
03862     
03863     SNToTicket *ticketRow = ticketTable.get(sender);
03864     if(ticketRow != NULL){
03865         Ticket earlierTicket = ticketRow->get(SN);
03866         if(earlierTicket.TN == 0){
03867             ticket.TN = newTN();
03868             ticketRow->put(SN) = ticket;
03869             DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
03870         }else{
03871             ticket.TN = earlierTicket.TN;
03872             if(ticket.TN > tCount){
03873                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03874             }
03875                 CmiAssert(ticket.TN <= tCount);
03876         }
03877         DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
03878     }else{
03879         SNToTicket *newRow = new SNToTicket;        
03880         ticket.TN = newTN();
03881         newRow->put(SN) = ticket;
03882         ticketTable.put(sender) = newRow;
03883         DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
03884     }
03885     ticket.state = NEW_TICKET;
03886 
03887 
03888     return ticket;  
03889 };
03890 
03894 void ChareMlogData::addLogEntry(MlogEntry *entry){
03895     DEBUG(char nameString[100]);
03896     DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
03897     DEBUG_MEM(CmiMemoryCheck());
03898 
03899     
03900     mlog.enq(entry);
03901 };
03902 
// Cumulative elapsed time (difference of CkWallTimer() readings) spent inside
// ChareMlogData::searchRestoredLocalQ; accumulated there on every call.
double totalSearchRestoredTime=0;
// Number of calls to ChareMlogData::searchRestoredLocalQ (incremented there).
// NOTE(review): stored as a double although it is a count; left unchanged in
// case other translation units declare it extern double.
double totalSearchRestoredCount=0;
03905 
03910 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
03911     double start= CkWallTimer();
03912     MCount TN=0;    
03913     
03914     DEBUG(char senderName[100]);
03915     DEBUG(char recverName[100]);
03916     DEBUG(if(TN != 0){ CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN);});
03917 
03918     totalSearchRestoredTime += CkWallTimer()-start;
03919     totalSearchRestoredCount++;
03920     return TN;
03921 }
03922 
/**
 * Serializes / deserializes the message-logging state of a chare.
 *
 * The same sequence of pup calls runs for sizing, packing and unpacking, so
 * the ORDER of fields below is the wire format and must not be changed:
 *   objID, tCount (or a dummy), tProcessed, toResumeOrNot, resumeCount,
 *   receivedTNs (length-prefixed; length -1 means NULL),
 *   currentHoles, numberHoles, ticketHoles[numberHoles],
 *   snTable, resendReplyRecvd, restartFlag,
 *   ticketTable (count-prefixed sequence of (CkObjID, SNToTicket row)).
 *
 * When teamRecoveryFlag is set:
 *   - tCount is not pupped; a local dummy (tCountAux, 0) takes its slot, so
 *     packing writes 0 and unpacking leaves the member tCount untouched;
 *   - on unpack, ticket rows are read to advance the stream and then deleted
 *     instead of being inserted into ticketTable.
 *
 * NOTE(review): on unpack, receivedTNs and ticketHoles are overwritten with
 * fresh allocations without freeing any previous ones; presumably pup is only
 * used to unpack into a freshly constructed object — confirm with callers.
 */
void ChareMlogData::pup(PUP::er &p){
    // Placeholder that occupies tCount's slot when teamRecoveryFlag is set.
    int tCountAux = 0;
    // Sizer position at entry; only used to report this object's packed size
    // in the (disabled) debug output at the end.
    int startSize=0;
    char nameStr[100];
    if(p.isSizing()){
        PUP::sizer *sizep = (PUP::sizer *)&p;
        startSize = sizep->size();
    }
    // Timing for the debug message at the end of the function.
    double _startTime = CkWallTimer();
    
    p | objID;
    if(teamRecoveryFlag)
        p | tCountAux;
    else
        p | tCount;
    p | tProcessed;
    if(p.isUnpacking()){
        DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
    }
    p | toResumeOrNot;
    p | resumeCount;
    DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
    
    // receivedTNs: length prefix first (-1 encodes a NULL vector), then the
    // elements. Only computed on the pack/size side; filled by pup on unpack.
    int lengthReceivedTNs;
    if(!p.isUnpacking()){
        if(receivedTNs == NULL){
            lengthReceivedTNs = -1;
        }else{
            lengthReceivedTNs = receivedTNs->size();        
        }
    }
    
    p | lengthReceivedTNs;
    if(p.isUnpacking()){
        if(lengthReceivedTNs == -1){
            receivedTNs = NULL;
        }else{
            receivedTNs = new std::vector<MCount>;
            for(int i=0;i<lengthReceivedTNs;i++){
                MCount tempTicket;
                p | tempTicket;
                // Every stored TN is expected to be positive.
                CkAssert(tempTicket > 0);
                receivedTNs->push_back(tempTicket);
            }
        }
    }else{
        // For length -1 (NULL vector) this loop does not execute.
        for(int i=0;i<lengthReceivedTNs;i++){
            p | (*receivedTNs)[i];
        }
    }
    
    // Ticket holes: counters, then the array itself (absent when empty,
    // in which case ticketHoles is NULL after unpacking).
    p | currentHoles;
    p | numberHoles;
    if(p.isUnpacking()){
        if(numberHoles > 0){
            ticketHoles = new MCount[numberHoles];          
        }else{
            ticketHoles = NULL;
        }
    }
    if(numberHoles > 0){
        p(ticketHoles,numberHoles);
    }
    
    snTable.pup(p);

    p | resendReplyRecvd;
    p | restartFlag;

    // ticketTable: number of rows, then (sender CkObjID, SNToTicket row) pairs.
    {
        int ticketTableSize;
        if(!p.isUnpacking()){
            ticketTableSize = ticketTable.numObjects();
        }
        
        p | ticketTableSize;
        if(!p.isUnpacking()){
            CkHashtableIterator *iter = ticketTable.iterator();
            while(iter->hasNext()){
                // Hash key; this local shadows the member objID.
                CkObjID *objID;
                SNToTicket **ticketRow = (SNToTicket **)iter->next((void **)&objID);
                p | (*objID);
                (*ticketRow)->pup(p);
            }
            
            delete iter;
        }else{
            for(int i=0;i<ticketTableSize;i++){
                CkObjID objID;
                p | objID;
                SNToTicket *ticketRow = new SNToTicket;
                ticketRow->pup(p);
                // Under team recovery the row is read (to keep the stream in
                // sync) but discarded.
                if(!teamRecoveryFlag)
                    ticketTable.put(objID) = ticketRow;
                else
                    delete ticketRow;
            }
        }
    }   
    
    // Debug-only: report how many bytes this object contributed when sizing.
    if(p.isSizing()){
        PUP::sizer *sizep = (PUP::sizer *)&p;
        int pupSize = sizep->size()-startSize;
        DEBUG(char name[40]);
        DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
    
    }
    
    double _finTime = CkWallTimer();
    DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
};
04087 
04088 
04089 
04090 
04097 int getCheckPointPE(){
04098     return (CmiMyPe() + 1) % CmiNumPes();
04099 }
04100 
04104 int getReverseCheckPointPE(){
04105     return (CmiMyPe() - 1 + CmiNumPes()) % CmiNumPes();
04106 }
04107 
04108 
04109 envelope *copyEnvelope(envelope *env){
04110     envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());
04111     memcpy(newEnv,env,env->getTotalsize());
04112     return newEnv;
04113 }
04114 
04115 
04116 inline bool isSameDet(Determinant *first, Determinant *second){
04117     return first->sender == second->sender && first->receiver == second->receiver && first->SN == second->SN && first->TN == second->TN;
04118 }
04119 
04120 #endif