00001 #ifndef _CKMESSAGELOGGING_H_
00002 #define _CKMESSAGELOGGING_H_
00003
00004 #include "ckobjid.h"
00005
// Per-processor (Cpv) globals defined elsewhere in the message-logging code.
// NOTE(review): _currentObj presumably points at the chare whose entry method
// is executing; _numImmigrantRecObjs presumably counts objects recovered as
// immigrants on this PE — confirm in the .C file.
00006 CpvExtern(Chare *,_currentObj);
00007 CpvExtern(int, _numImmigrantRecObjs);
00008
00009
// Ticket status values. FORWARDED_TICKET is a single high bit (0x8000), which
// suggests it is combined with other values — confirm against its users.
00010 #define NEW_TICKET 1
00011 #define OLD_TICKET 2
00012 #define FORWARDED_TICKET 0x8000
00013
00014
// Restart status codes.
00015 #define MLOG_RESTARTED 0
00016 #define MLOG_CRASHED 1
// 2^20 bytes.
00017 #define MEGABYTE 1048576
00018
00019
// Shared 100-char buffer defined elsewhere. NOTE(review): presumably used to
// format object ids for debug output — confirm.
00020 extern char objString[100];
00021
00022
// Initial capacity constant (not referenced in this header).
00023 #define INITIAL_BUFFERED_DETERMINANTS 1024
00024
00025
// Compile-time switch set to 1 (not referenced in this header).
00026 #define SYNCHRONIZED_CHECKPOINT 1
00027
// DEBUGGING(x) expands to nothing: the "// x" in its body is a comment, which
// is removed before macro definitions are processed, so wrapped debug
// statements are compiled out. DEBUGGING_NOW(x) always expands to x.
00028 #define DEBUGGING(x) // x
00029 #define DEBUGGING_NOW(x) x
00030
// Forward declarations; MlogEntry is defined later in this header.
00031 class MlogEntry;
00032
00033 class RestoredLocalMap;
00034
// Number of slots in a newly constructed RSSN buffer.
00035 #define RSSN_INITIAL_SIZE 16
00036
00040 class RSSN{
00041 private:
00042 MCount *data;
00043 int currentSize, start, end;
00044 public:
00045
00046
00047 RSSN(){
00048 currentSize = RSSN_INITIAL_SIZE;
00049 start = 0;
00050 end = 0;
00051 data = new MCount[RSSN_INITIAL_SIZE];
00052 memset(data,0,sizeof(MCount)*currentSize);
00053 }
00054
00055 ~RSSN()
00056 {
00057 if(data != NULL)
00058 {
00059 delete []data;
00060 data = NULL;
00061 }
00062 }
00063
00064
00065
00066 int checkAndStore(MCount ssn){
00067 int index, oldCS, num, i;
00068 MCount *old;
00069
00070
00071 if((start == end && ssn == (data[start] + 1)) || data[start] == 0){
00072 data[start] = ssn;
00073 return 0;
00074 }
00075
00076
00077 if(ssn <= data[start]){
00078 DEBUGGING(CkPrintf("[%d] Repeated ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
00079 return 1;
00080 }
00081
00082
00083 if(ssn-data[start] >= currentSize){
00084 DEBUGGING(CkPrintf("[%d] Extending Data %d %d %d\n",CkMyPe(),ssn,data[start],currentSize));
00085
00086
00087 data[0] = ssn;
00088 start = end = 0;
00089 return 0;
00090
00091 old = data;
00092 oldCS = currentSize;
00093 currentSize *= 2;
00094 data = new MCount[currentSize];
00095 memset(data,0,sizeof(MCount)*currentSize);
00096 for(i=start, num=0; i!=end; i=(i+1)%oldCS,num++){
00097 data[num] = old[i];
00098 }
00099 start = 0;
00100 end = num-1;
00101 delete[] old;
00102 }
00103
00104 DEBUGGING(CkPrintf("[%d] Ahead ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
00105
00106
00107 num = end - start;
00108 if(num < 0) num += currentSize;
00109 num++;
00110 index = (start+ssn-data[start])%currentSize;
00111 data[index] = ssn;
00112 if((ssn-data[start]) >= num) end = index;
00113
00114
00115 index = start + 1;
00116 while(data[index]){
00117 data[start] = 0;
00118 start = index;
00119 index = (index + 1)%currentSize;
00120 if(index == end) break;
00121 }
00122 return 0;
00123 }
00124
00125
00126 inline void pup(PUP::er &p){
00127 p | start;
00128 p | end;
00129 p | currentSize;
00130 if(p.isUnpacking()){
00131 if(currentSize > RSSN_INITIAL_SIZE){
00132 delete[] data;
00133 data = new MCount[currentSize];
00134 }
00135 }
00136 for(int i=0;i<currentSize;i++){
00137 p | data[i];
00138 }
00139 }
00140
00141 };
00142
00143
00153 class ChareMlogData{
00154 public:
00155
00156 CkObjID objID;
00157
00158 int resendReplyRecvd;
00159
00160 bool restartFlag;
00161
00162 bool teamRecoveryFlag;
00163 bool toResumeOrNot;
00164 int resumeCount;
00165 bool immigrantRecFlag;
00166 int immigrantSourcePE;
00167
00168 private:
00169
00170 CkHashtableT<CkHashtableAdaptorT<CkObjID>, MCount> ssnTable;
00171
00172 CkHashtableT<CkHashtableAdaptorT<CkObjID>, RSSN *> receivedSsnTable;
00173
00174 CkQ<MlogEntry *> mlog;
00175
00176 public:
00180 ChareMlogData():ssnTable(100,0.4),receivedSsnTable(100,0.4){
00181 restartFlag=false;
00182 teamRecoveryFlag=false;
00183 resendReplyRecvd=0;
00184 toResumeOrNot=false;
00185 resumeCount=0;
00186 immigrantRecFlag = false;
00187 };
00188 inline MCount nextSN(const CkObjID &recver);
00189 int checkAndStoreSsn(const CkObjID &sender, MCount ssn);
00190 void addLogEntry(MlogEntry *entry);
00191 virtual void pup(PUP::er &p);
00192 CkQ<MlogEntry *> *getMlog(){ return &mlog;};
00193 };
00194
00202 class MlogEntry{
00203 public:
00204 envelope *env;
00205 int destPE;
00206 int _infoIdx;
00207
00208 MlogEntry(envelope *_env,int _destPE,int __infoIdx){
00209 env = _env;
00210 destPE = _destPE;
00211 _infoIdx = __infoIdx;
00212 }
00213 MlogEntry(){
00214 env = 0;
00215 destPE = -1;
00216 _infoIdx = 0;
00217 }
00218 ~MlogEntry(){
00219 if(env){
00220 CmiFree(env);
00221 }
00222 }
00223 virtual void pup(PUP::er &p);
00224 };
00225
/**
 * Record of a stored checkpoint buffer. A default-constructed record is
 * empty: no buffer (NULL), size 0, and PE -1.
 * NOTE(review): presumably PE is the processor whose checkpoint `buf` holds;
 * the class never frees `buf`, so its users manage that memory.
 */
class StoredCheckpoint{
public:
  char *buf;    // checkpoint contents, NULL when empty
  int bufSize;  // presumably the number of bytes in buf
  int PE;
  StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
00240
// Message header (CmiMsgHeaderSizeBytes bytes) followed by a PE number and a
// size field. NOTE(review): presumably `dataSize` bytes of checkpoint data are
// appended after this struct in the same buffer — confirm in the .C file.
00241 typedef struct{
00242 char header[CmiMsgHeaderSizeBytes];
00243 int PE;
00244 int dataSize;
00245 } CheckPointDataMsg;
00246
// Message consisting only of a header and a PE number.
00247 typedef struct{
00248 char header[CmiMsgHeaderSizeBytes];
00249 int PE;
00250 } DistributeObjectMsg;
00251
00252
00253
00254
00255
00256
00257
00258
// Checkpoint acknowledgements reuse the CheckPointDataMsg layout
// (_checkpointAckHandler takes a CheckPointAck *).
00259 typedef CheckPointDataMsg CheckPointAck;
00260
00261
// Minimal request message: header plus a PE number. The same layout is reused
// for ping messages (CkPingMsg) and checkpoint requests (CheckpointRequest).
00265 typedef struct{
00266 char header[CmiMsgHeaderSizeBytes];
00267 int PE;
00268 } RestartRequest;
00269
00270 typedef RestartRequest CkPingMsg;
00271 typedef RestartRequest CheckpointRequest;
00272
// Restart data header: a PE, a wall-clock time, a checkpoint size, counts and
// a size for migrated-away/migrated-in elements, a count of local messages,
// and a load-balancer group id. NOTE(review): presumably the checkpoint and
// element payloads follow this struct in the same buffer — confirm.
00273 typedef struct{
00274 char header[CmiMsgHeaderSizeBytes];
00275 int PE;
00276 double restartWallTime;
00277 int checkPointSize;
00278 int numMigratedAwayElements;
00279 int numMigratedInElements;
00280 int migratedElementSize;
00281 int numLocalMessages;
00282 CkGroupID lbGroupID;
00283 } RestartProcessorData;
00284
// Request naming a PE and a number of objects; reused as RemoveLogRequest.
// NOTE(review): presumably followed by `numberObjects` object ids — confirm.
00285 typedef struct{
00286 char header[CmiMsgHeaderSizeBytes];
00287 int PE;
00288 int numberObjects;
00289 } ResendRequest;
00290
00291 typedef ResendRequest RemoveLogRequest;
00292
// Header for a batch of `numTNs` items for receiver `recver`.
// NOTE(review): TN presumably means ticket number, with the entries following
// the struct — confirm.
00293 typedef struct {
00294 char header[CmiMsgHeaderSizeBytes];
00295 CkObjID recver;
00296 int numTNs;
00297 } ReceivedTNData;
00298
00299
// Header for a batch of `numDets` items for receiver `recver`.
// NOTE(review): Dets presumably means determinants, with the entries following
// the struct — confirm.
00300 typedef struct {
00301 char header[CmiMsgHeaderSizeBytes];
00302 CkObjID recver;
00303 int numDets;
00304 } ReceivedDetData;
00305
// Plain struct (no message header): a PE and a pointer to `numberObjects`
// object ids.
00306 typedef struct{
00307 int PE;
00308 int numberObjects;
00309 CkObjID *listObjects;
00310 } ResendData;
00311
// One array element's migration: group id and element index, source and
// destination PEs, and two one-byte fields ackFrom/ackTo (presumably
// acknowledgement flags for each side — confirm).
00312 typedef struct {
00313 CkGroupID gID;
00314 CkArrayIndexMax idx;
00315 int fromPE,toPE;
00316 char ackFrom,ackTo;
00317 } MigrationRecord;
00318
// Migration notice message: a MigrationRecord plus an opaque pointer.
// NOTE(review): `record` presumably lets the sender match the
// MigrationNoticeAck, which carries the same pointer back — confirm.
00319 typedef struct {
00320 char header[CmiMsgHeaderSizeBytes];
00321 MigrationRecord migRecord;
00322 void *record;
00323 } MigrationNotice;
00324
// Acknowledgement message carrying only the opaque `record` pointer.
00325 typedef struct {
00326 char header[CmiMsgHeaderSizeBytes];
00327 void *record;
00328 } MigrationNoticeAck;
00329
// Plain struct (no message header), stored via retainedObjectList: a
// migration record, a message buffer with its size, and a one-byte flag.
00330 typedef struct {
00331 MigrationRecord migRecord;
00332 void *msg;
00333 int size;
00334 char acked;
00335 } RetainedMigratedObject;
00336
// Message handled by _verifyAckRequestHandler/_verifyAckHandler: a migration
// record, an index, and the sending PE.
00337 typedef struct {
00338 char header[CmiMsgHeaderSizeBytes];
00339 MigrationRecord migRecord;
00340 int index;
00341 int fromPE;
00342 } VerifyAckMsg;
00343
// Checkpoint barrier message: a checkpoint counter and the sending PE.
00344 typedef struct {
00345 char header[CmiMsgHeaderSizeBytes];
00346 int checkpointCount;
00347 int fromPE;
00348 } CheckpointBarrierMsg;
00349
00350
00351
// Location message: an array manager group id, an element index, the PE
// holding the element, and the sending PE.
00352 typedef struct {
00353 char header[CmiMsgHeaderSizeBytes];
00354 CkGroupID mgrID;
00355 CkArrayIndexMax idx;
00356 int locationPE;
00357 int fromPE;
00358 } CurrentLocationMsg;
00359
// Load-balancing step message: a load-balancer group id, the sending PE, and
// a step number.
00360 typedef struct {
00361 char header[CmiMsgHeaderSizeBytes];
00362 CkGroupID lbID;
00363 int fromPE;
00364 int step;
00365 } LBStepMsg;
00366
00367
// Values for DummyMigrationMsg::flag. NOTE(review): presumably MLOG_OBJECT
// marks a message describing one element (mgrID/idx/locationPE) and
// MLOG_COUNT one carrying only `count` — confirm in the .C file.
00368 #define MLOG_OBJECT 1
00369 #define MLOG_COUNT 2
00370
// Message handled by _dummyMigrationHandler. Its fields mirror the parameters
// of sendDummyMigration (presumably its sender — confirm).
00371 typedef struct {
00372 char header[CmiMsgHeaderSizeBytes];
00373 int flag;
00374 CkGroupID lbID;
00375 int count;
00377 CkGroupID mgrID;
00378 CkArrayIndexMax idx;
00379 int locationPE;
00380 } DummyMigrationMsg;
00381
00382
00383
00384
00385
// Callback type taken by forAllCharesDo(): receives the caller's void * data
// and one object's ChareMlogData.
00386 typedef void (*MlogFn)(void *,ChareMlogData *);
00387
// Module initialization (defined outside this header).
00388 void _messageLoggingInit();
00389
00390
// Send entry points:
// - one each for group, array, chare and node-group targets;
// - a common path taking an explicit receiver CkObjID;
// - a remote path taking sender/receiver ids, a log entry, a sequence number
//   and an int `resend`;
// - a local path.
00391 void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
00392 void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
00393 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid);
00394 void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
00395 void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
00396 void sendRemoteMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend);
00397 void sendLocalMsg(envelope *env, int _infoIdx);
00398
00399
// Handler for CkPingMsg (RestartRequest layout) messages.
00400 void _pingHandler(CkPingMsg *msg);
00401
00402
// Enqueue helpers; the first two are declared extern (defined elsewhere).
00403 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
00404 extern void _noCldNodeEnqueue(int node, envelope *env);
00405 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
00406
00407
// Hooks around handling of a received message. preProcessReceivedMessage
// takes Chare ** and MlogEntry ** (presumably out-parameters) and returns an
// int. NOTE(review): presumably nonzero means "process the message" — confirm.
00408 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
00409 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
00410
00411
00412
// Per-PE pointer to StoredCheckpoint data (defined elsewhere).
00413 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
00414
00415
// Checkpoint entry points. checkpointAlarm/startMlogCheckpoint take
// (void *, double curWallTime). pupArrayElementsSkip takes a list of
// MigrationRecords and its length (default 0). NOTE(review): presumably it
// skips the listed elements while pupping — confirm.
00416 void CkStartMlogCheckpoint(CkCallback &cb);
00417 void checkpointAlarm(void *_dummy,double curWallTime);
00418 void startMlogCheckpoint(void *_dummy,double curWallTime);
00419 void pupArrayElementsSkip(PUP::er &p, bool create, MigrationRecord *listToSkip,int listSize=0);
00420
00421
// Message handlers for the checkpoint protocol, plus message-log garbage
// collection.
00422 void _checkpointRequestHandler(CheckpointRequest *request);
00423 void _storeCheckpointHandler(char *msg);
00424 void _checkpointAckHandler(CheckPointAck *ackMsg);
00425 void _removeProcessedLogHandler(char *requestMsg);
00426 void garbageCollectMlog();
00427 void _startCheckpointHandler(CheckpointBarrierMsg *msg);
00428 void _endCheckpointHandler(char *msg);
00429
00430
// Handler indices (defined elsewhere). NOTE(review): presumably the registered
// ids of the same-named handlers above — confirm.
00431 extern int _checkpointRequestHandlerIdx;
00432 extern int _storeCheckpointHandlerIdx;
00433 extern int _checkpointAckHandlerIdx;
00434 extern int _removeProcessedLogHandlerIdx;
00435
00436
00437
00438
00439
// Restart entry points. initializeRestart matches the MlogFn signature.
00440 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
00441 void CkMlogRestartDouble(void *,double);
00442 void initializeRestart(void *data,ChareMlogData *mlogData);
00443 void distributeRestartedObjects();
00444 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
00445
00446
00447 void CkMlogRestartLocal();
00448
00449
// Message handlers used during recovery.
00450 void _getCheckpointHandler(RestartRequest *restartMsg);
00451 void _recvCheckpointHandler(char *_restartData);
00452 void _resendMessagesHandler(char *msg);
00453 void _sendDetsHandler(char *msg);
00454 void _sendDetsReplyHandler(char *msg);
00455 void _receivedTNDataHandler(ReceivedTNData *msg);
00456 void _receivedDetDataHandler(ReceivedDetData *msg);
00457 void _distributedLocationHandler(char *receivedMsg);
00458 void _sendBackLocationHandler(char *receivedMsg);
00459 void _updateHomeRequestHandler(RestartRequest *updateRequest);
00460 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
00461 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
00462 void _verifyAckHandler(VerifyAckMsg *verifyReply);
00463 void _dummyMigrationHandler(DummyMigrationMsg *msg);
00464
00465
// Handlers for restarting from a stored checkpoint.
00466 void _restartHandler(RestartRequest *restartMsg);
00467 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
00468 void _recvRestartCheckpointHandler(char *_restartData);
00469
00470
// Handler indices (defined elsewhere). NOTE(review): presumably the registered
// ids of the same-named handlers above — confirm.
00471 extern int _getCheckpointHandlerIdx;
00472 extern int _recvCheckpointHandlerIdx;
00473 extern int _resendMessagesHandlerIdx;
00474 extern int _sendDetsHandlerIdx;
00475 extern int _sendDetsReplyHandlerIdx;
00476 extern int _receivedTNDataHandlerIdx;
00477 extern int _receivedDetDataHandlerIdx;
00478 extern int _distributedLocationHandlerIdx;
00479 extern int _updateHomeRequestHandlerIdx;
00480 extern int _updateHomeAckHandlerIdx;
00481 extern int _verifyAckRequestHandlerIdx;
00482 extern int _verifyAckHandlerIdx;
00483 extern int _dummyMigrationHandlerIdx;
00484
00486
00487
// Load-balancing integration. startLoadBalancingMlog and
// restoreParallelRecovery take a void(*)(void *) callback and a load-balancer
// pointer; resumeFromSyncRestart matches the MlogFn signature.
00488 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
00489 void finishedCheckpointLoadBalancing();
00490 void sendMlogLocation(int targetPE,envelope *env);
00491 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
00492 void restoreParallelRecovery(void (*fnPtr)(void *),void *_centralLb);
00493
00494
// Message handlers for migration notices, global LB steps and checkpoint
// barriers.
00495 void _receiveMlogLocationHandler(void *buf);
00496 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
00497 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
00498 void _getGlobalStepHandler(LBStepMsg *msg);
00499 void _recvGlobalStepHandler(LBStepMsg *msg);
00500 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
00501 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
00502
00503
// Load-balancing state (defined elsewhere). NOTE(review): resumeLbFnPtr and
// centralLb presumably store the fnPtr/_centralLb passed above — confirm.
00504 extern int onGoingLoadBalancing;
00505 extern void *centralLb;
00506 extern void (*resumeLbFnPtr)(void *) ;
// Handler indices (defined elsewhere) for the same-named handlers.
00507 extern int _receiveMlogLocationHandlerIdx;
00508 extern int _receiveMigrationNoticeHandlerIdx;
00509 extern int _receiveMigrationNoticeAckHandlerIdx;
00510 extern int _getGlobalStepHandlerIdx;
00511 extern int _recvGlobalStepHandlerIdx;
00512 extern int _checkpointBarrierHandlerIdx;
00513 extern int _checkpointBarrierAckHandlerIdx;
00514
00515
// Migration bookkeeping lists (defined elsewhere).
00516 extern std::vector<MigrationRecord> migratedNoticeList;
00517 extern std::vector<RetainedMigratedObject *> retainedObjectList;
00518
// Utilities. NOTE(review), all inferred from names — confirm in the .C file:
// - getCheckPointPE() presumably returns the PE holding this PE's checkpoint;
// - forAllCharesDo() presumably applies fnPointer(data, ...) to every local
//   object's ChareMlogData;
// - copyEnvelope() presumably returns a copy of env.
00519 int getCheckPointPE();
00520 void forAllCharesDo(MlogFn fnPointer,void *data);
00521 envelope *copyEnvelope(envelope *env);
00522 extern void _initDone(void);
00523
00524
00525 extern void _resetNodeBocInitVec(void);
00526
00527
// Reports an element's current PE for manager mgrID to its home PE.
00528 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
00529
00530
// Handler for CurrentLocationMsg and its index (defined elsewhere).
00531 void _receiveLocationHandler(CurrentLocationMsg *data);
00532
00533
00534 extern int _receiveLocationHandlerIdx;
00535
00536
// C-linkage runtime function taking a lower and higher handler id.
00537 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
00538
00539 #endif