00001 #ifndef _CKMESSAGELOGGING_H_
00002 #define _CKMESSAGELOGGING_H_
00003
00004 #include "ckobjid.h"
00005
// Variables declared through CpvExtern; their definitions are not in this header.
00006 CpvExtern(Chare *,_currentObj);
00007 CpvExtern(int, _numImmigrantRecObjs);
00008
00009
// Ticket state codes. NOTE(review): presumably the values held in Ticket::state —
// confirm against the implementation file.
00010 #define NEW_TICKET 1
00011 #define OLD_TICKET 2
// 0x8000 is a single bit (bit 15), so it cannot collide with the two small codes above.
00012 #define FORWARDED_TICKET 0x8000
00013
00014
// Status codes; NOTE(review): their consumers are not visible in this header.
00015 #define MLOG_RESTARTED 0
00016 #define MLOG_CRASHED 1
// 1024 * 1024.
00017 #define MEGABYTE 1048576
00018
00019
// 100-character buffer defined elsewhere.
// NOTE(review): presumably scratch space for printing object IDs — confirm.
00020 extern char objString[100];
00021
00022
// Sizing constant for buffered determinants (per its name); the buffer itself is not declared here.
00023 #define INITIAL_BUFFERED_DETERMINANTS 1024
00024
00025
// Compile-time switch, set to 1. NOTE(review): presumably enables synchronized checkpointing — confirm.
00026 #define SYNCHRONIZED_CHECKPOINT 1
00027
00034 typedef struct {
00035
00036 CkObjID sender;
00037
00038 CkObjID receiver;
00039
00040 MCount SN;
00041
00042 MCount TN;
00043 } Determinant;
00044
// Hash table keyed by CkObjID whose values are pointers to vectors of Determinant.
00048 typedef CkHashtableT<CkHashtableAdaptorT<CkObjID>, std::vector<Determinant> *> CkDeterminantHashtableT;
00049
00053 typedef struct {
00054 char header[CmiMsgHeaderSizeBytes];
00055 int phase;
00056 int index;
00057 } RemoveDeterminantsHeader;
00058
00062 typedef struct {
00063 char header[CmiMsgHeaderSizeBytes];
00064 int number;
00065 int index;
00066 int phase;
00067 int PE;
00068 } StoreDeterminantsHeader;
00069
00073 class Ticket {
00074 public:
00075 MCount TN;
00076 int state;
00077 Ticket(){
00078 TN = 0;
00079 state = 0;
00080 }
00081 Ticket(int x){
00082 TN = x;
00083 state = 0;
00084 }
00085 };
00086 PUPbytes(Ticket)
// Forward declarations; both classes are defined later in this header.
00087 class MlogEntry;
00088
00089 class RestoredLocalMap;
00090
// Number of Ticket slots stored inline inside every SNToTicket before it falls back to the heap.
00091 #define INITSIZE_SNTOTICKET 100
00092
00097 class SNToTicket{
00098 private:
00099 Ticket initial[INITSIZE_SNTOTICKET];
00100 Ticket *ticketVec;
00101 MCount startSN;
00102 int currentSize;
00103 MCount finishSN;
00104 public:
00105 SNToTicket(){
00106 currentSize = INITSIZE_SNTOTICKET;
00107 ticketVec = &initial[0];
00108 memset(ticketVec,0,sizeof(Ticket)*currentSize);
00109 startSN = 0;
00110 finishSN = 0;
00111 }
00115 inline MCount getFinishSN(){
00116 return finishSN;
00117 }
00121 inline MCount getStartSN(){
00122 return startSN;
00123 }
00124
00125 inline Ticket &put(MCount SN){
00126 if(SN > finishSN) finishSN = SN;
00127 if(startSN == 0){
00128 startSN = SN;
00129 }
00130 int index = SN-startSN;
00131 if(index >= currentSize){
00132 int oldSize = currentSize;
00133 Ticket *old = ticketVec;
00134
00135 currentSize = index*2;
00136 ticketVec = new Ticket[currentSize];
00137 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
00138 if(old != &initial[0]){
00139 delete [] old;
00140 }
00141 }
00142 return ticketVec[index];
00143 }
00144
00145 inline Ticket get(MCount SN){
00146 int index = SN-startSN;
00147 CmiAssert(index >= 0);
00148 if(index >= currentSize){
00149 Ticket tn;
00150 return tn;
00151 }else{
00152 return ticketVec[index];
00153 }
00154 }
00155
00156 inline void pup(PUP::er &p){
00157 p | startSN;
00158 p | currentSize;
00159 if(p.isUnpacking()){
00160 if(currentSize > INITSIZE_SNTOTICKET){
00161 ticketVec = new Ticket[currentSize];
00162 }
00163 }
00164 for(int i=0;i<currentSize;i++){
00165 p | ticketVec[i];
00166 }
00167 }
00168 };
00169
00170
00180 class ChareMlogData{
00181 public:
00182
00183 CkObjID objID;
00184
00185 MCount tCount;
00186
00187 MCount tProcessed;
00188
00189
00190 std::vector<MCount> *receivedTNs;
00191 MCount *ticketHoles;
00192 int numberHoles;
00193 int currentHoles;
00194
00195 int resendReplyRecvd;
00196
00197 bool restartFlag;
00198
00199 bool teamRecoveryFlag;
00200
00201 CkHashtableT<CkHashtableAdaptorT<CkObjID>, SNToTicket *> teamTable;
00202
00203 bool toResumeOrNot;
00204 int resumeCount;
00205 bool immigrantRecFlag;
00206 int immigrantSourcePE;
00207
00208 private:
00209
00210
00211 CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
00212
00213 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
00214
00215 CkQ<MlogEntry *> mlog;
00216
00217
00218 inline MCount newTN();
00219
00220 public:
00224 ChareMlogData():teamTable(100,0.4),snTable(100,0.4),ticketTable(1000,0.3){
00225 tCount = 0;
00226 tProcessed = 0;
00227 numberHoles = 0;
00228 ticketHoles = NULL;
00229 currentHoles = 0;
00230 restartFlag=false;
00231 teamRecoveryFlag=0;
00232 receivedTNs = NULL;
00233 resendReplyRecvd=0;
00234 toResumeOrNot=false;
00235 resumeCount=0;
00236 immigrantRecFlag = false;
00237 };
00238 inline MCount nextSN(const CkObjID &recver);
00239 inline Ticket next_ticket(CkObjID &sender,MCount SN);
00240 inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
00241 inline Ticket getTicket(CkObjID &sender, MCount SN);
00242 void addLogEntry(MlogEntry *entry);
00243 virtual void pup(PUP::er &p);
00244 CkQ<MlogEntry *> *getMlog(){ return &mlog;};
00245 MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
00246 };
00247
00255 class MlogEntry{
00256 public:
00257 envelope *env;
00258 int destPE;
00259 int _infoIdx;
00260 int indexBufDets;
00261 int numBufDets;
00262
00263 MlogEntry(envelope *_env,int _destPE,int __infoIdx){
00264 env = _env;
00265 destPE = _destPE;
00266 _infoIdx = __infoIdx;
00267 }
00268 MlogEntry(){
00269 env = 0;
00270 destPE = -1;
00271 _infoIdx = 0;
00272 }
00273 ~MlogEntry(){
00274 if(env){
00275 CmiFree(env);
00276 }
00277 }
00278 virtual void pup(PUP::er &p);
00279 };
00280
/**
 * Description of a stored checkpoint: a byte buffer, its size and a PE number.
 * A default-constructed object has no buffer, size 0 and PE -1.
 * The class does not free `buf`.
 */
class StoredCheckpoint{
    public:
        char *buf;    // checkpoint bytes, NULL when empty
        int bufSize;  // size recorded for buf
        int PE;       // PE number, -1 when unset

        StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
00295
00301 class RestoredLocalMap {
00302 public:
00303 MCount minSN,maxSN,count;
00304 MCount *TNArray;
00305 RestoredLocalMap(){
00306 minSN=maxSN=count=0;
00307 TNArray=NULL;
00308 };
00309 RestoredLocalMap(int i){
00310 minSN=maxSN=count=0;
00311 TNArray=NULL;
00312 };
00313
00314 virtual void pup(PUP::er &p);
00315 };
00316
00317
00318 typedef struct {
00319 char header[CmiMsgHeaderSizeBytes];
00320 CkObjID sender;
00321 CkObjID recver;
00322 MlogEntry *logEntry;
00323 MCount SN;
00324 MCount TN;
00325 int senderPE;
00326 } TicketRequest;
00327 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
00328 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
00329
00330 typedef struct{
00331 TicketRequest request;
00332 Ticket ticket;
00333 int recverPE;
00334 } TicketReply;
00335
00336 CpvExtern(char**,_bufferedTicketRequests);
00337 extern int _maxBufferedTicketRequests;
00338
00339
00340
00341 typedef struct {
00342 char header[CmiMsgHeaderSizeBytes];
00343 int numberLogs;
00344 } BufferedLocalLogHeader;
00345
00346 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
00347
00348 typedef struct{
00349 char header[CmiMsgHeaderSizeBytes];
00350 int PE;
00351 int dataSize;
00352 } CheckPointDataMsg;
00353
00354 typedef struct{
00355 char header[CmiMsgHeaderSizeBytes];
00356 int PE;
00357 } DistributeObjectMsg;
00358
00359
00360
00361
00362
00363
00364
00365
00366 typedef CheckPointDataMsg CheckPointAck;
00367
00368 typedef struct{
00369 CkObjID recver;
00370 MCount tProcessed;
00371 } TProcessedLog;
00372
00373
00377 typedef struct{
00378 char header[CmiMsgHeaderSizeBytes];
00379 int PE;
00380 } RestartRequest;
00381
00382 typedef RestartRequest CkPingMsg;
00383 typedef RestartRequest CheckpointRequest;
00384
00385 typedef struct{
00386 char header[CmiMsgHeaderSizeBytes];
00387 int PE;
00388 double restartWallTime;
00389 int checkPointSize;
00390 int numMigratedAwayElements;
00391 int numMigratedInElements;
00392 int migratedElementSize;
00393 int numLocalMessages;
00394 CkGroupID lbGroupID;
00395 } RestartProcessorData;
00396
00397 typedef struct{
00398 char header[CmiMsgHeaderSizeBytes];
00399 int PE;
00400 int numberObjects;
00401 } ResendRequest;
00402
00403 typedef ResendRequest RemoveLogRequest;
00404
00405 typedef struct {
00406 char header[CmiMsgHeaderSizeBytes];
00407 CkObjID recver;
00408 int numTNs;
00409 } ReceivedTNData;
00410
00411
00412 typedef struct {
00413 char header[CmiMsgHeaderSizeBytes];
00414 CkObjID recver;
00415 int numDets;
00416 } ReceivedDetData;
00417
00418 typedef struct{
00419 int PE;
00420 int numberObjects;
00421 TProcessedLog *listObjects;
00422 std::vector<MCount> *ticketVecs;
00423 } ResendData;
00424
00425 typedef struct {
00426 CkGroupID gID;
00427 CkArrayIndexMax idx;
00428 int fromPE,toPE;
00429 char ackFrom,ackTo;
00430 } MigrationRecord;
00431
00432 typedef struct {
00433 char header[CmiMsgHeaderSizeBytes];
00434 MigrationRecord migRecord;
00435 void *record;
00436 } MigrationNotice;
00437
00438 typedef struct {
00439 char header[CmiMsgHeaderSizeBytes];
00440 void *record;
00441 } MigrationNoticeAck;
00442
00443 typedef struct {
00444 MigrationRecord migRecord;
00445 void *msg;
00446 int size;
00447 char acked;
00448 } RetainedMigratedObject;
00449
00450 typedef struct {
00451 char header[CmiMsgHeaderSizeBytes];
00452 MigrationRecord migRecord;
00453 int index;
00454 int fromPE;
00455 } VerifyAckMsg;
00456
00457 typedef struct {
00458 char header[CmiMsgHeaderSizeBytes];
00459 int checkpointCount;
00460 int fromPE;
00461 } CheckpointBarrierMsg;
00462
00463
00464
00465 typedef struct {
00466 char header[CmiMsgHeaderSizeBytes];
00467 CkGroupID mgrID;
00468 CkArrayIndexMax idx;
00469 int locationPE;
00470 int fromPE;
00471 } CurrentLocationMsg;
00472
00473 typedef struct {
00474 char header[CmiMsgHeaderSizeBytes];
00475 CkGroupID lbID;
00476 int fromPE;
00477 int step;
00478 } LBStepMsg;
00479
00480
00481 #define MLOG_OBJECT 1
00482 #define MLOG_COUNT 2
00483
00484 typedef struct {
00485 char header[CmiMsgHeaderSizeBytes];
00486 int flag;
00487 CkGroupID lbID;
00488 int count;
00490 CkGroupID mgrID;
00491 CkArrayIndexMax idx;
00492 int locationPE;
00493 } DummyMigrationMsg;
00494
00495
00496
00497
00498
// Callback type taking an opaque pointer and a ChareMlogData; this is the
// function type accepted by forAllCharesDo().
00499 typedef void (*MlogFn)(void *,ChareMlogData *);
00500
// Initialisation entry point of the message-logging module (body not visible here).
00501 void _messageLoggingInit();
00502
00503
// Send entry points, one per destination kind (group, array, chare, node group),
// plus the common and local variants.
// NOTE(review): presumably each one logs the message before delivering it —
// confirm in the implementation file.
00504 void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
00505 void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
00506 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid);
00507 void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
00508 void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
00509 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
00510 void sendLocalMsg(envelope *env, int _infoIdx);
00511
00512
// Handlers for the ticket protocol, pings, buffered local messages and determinants.
// Each takes a pointer to the received message (typed where a struct exists,
// raw char* otherwise).
00513 void _ticketRequestHandler(TicketRequest *);
00514 void _ticketHandler(TicketReply *);
00515 void _pingHandler(CkPingMsg *msg);
// freeHeader defaults to 1; NOTE(review): presumably controls whether recvdHeader is freed — confirm.
00516 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
00517 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
00518 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
00519 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
00520 void _storeDeterminantsHandler(char *buffer);
00521 void _removeDeterminantsHandler(char *buffer);
00522
00523
00524
// Enqueue helpers; the two extern ones are defined outside the message-logging module.
00525 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
00526 extern void _noCldNodeEnqueue(int node, envelope *env);
00527 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
// (void*, double) signature. NOTE(review): presumably a timer callback that
// retries a delayed TicketRequest — confirm.
00528 void retryTicketRequest(void *_ticketRequest,double curWallTime);
00529
00530
// Hooks around the processing of a received message; both out-parameters of
// preProcessReceivedMessage are pointers to pointers filled in by the callee.
00531 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
00532 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
00533
00534
00535
// Pointer to the stored checkpoint, declared via CpvExtern.
00536 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
00537
00538
// Checkpoint entry points; the (void*, double) signatures match retryTicketRequest.
00539 void checkpointAlarm(void *_dummy,double curWallTime);
00540 void startMlogCheckpoint(void *_dummy,double curWallTime);
// listSize defaults to 0, i.e. an empty skip list.
00541 void pupArrayElementsSkip(PUP::er &p, bool create, MigrationRecord *listToSkip,int listSize=0);
00542
00543
// Checkpoint-related message handlers and log garbage collection.
00544 void _checkpointRequestHandler(CheckpointRequest *request);
00545 void _storeCheckpointHandler(char *msg);
00546 void _checkpointAckHandler(CheckPointAck *ackMsg);
00547 void _removeProcessedLogHandler(char *requestMsg);
00548 void garbageCollectMlog();
00549
00550
// Integer indices named after the handlers above; defined elsewhere.
00551 extern int _checkpointRequestHandlerIdx;
00552 extern int _storeCheckpointHandlerIdx;
00553 extern int _checkpointAckHandlerIdx;
00554 extern int _removeProcessedLogHandlerIdx;
00555
00556
00557
00558
00559
// Restart entry points and helpers.
00560 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
00561 void CkMlogRestartDouble(void *,double);
00562 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
00563 void processReceivedDet(Chare *obj,int vecsize, Determinant *listDets);
// Matches the MlogFn signature, so it can be passed to forAllCharesDo().
00564 void initializeRestart(void *data,ChareMlogData *mlogData);
00565 void distributeRestartedObjects();
00566 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
00567
00568
00569 void CkMlogRestartLocal();
00570
00571
// Message handlers used during restart.
00572 void _getCheckpointHandler(RestartRequest *restartMsg);
00573 void _recvCheckpointHandler(char *_restartData);
00574 void _resendMessagesHandler(char *msg);
00575 void _sendDetsHandler(char *msg);
00576 void _sendDetsReplyHandler(char *msg);
00577 void _receivedTNDataHandler(ReceivedTNData *msg);
00578 void _receivedDetDataHandler(ReceivedDetData *msg);
00579 void _distributedLocationHandler(char *receivedMsg);
00580 void _sendBackLocationHandler(char *receivedMsg);
00581 void _updateHomeRequestHandler(RestartRequest *updateRequest);
00582 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
00583 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
00584 void _verifyAckHandler(VerifyAckMsg *verifyReply);
00585 void _dummyMigrationHandler(DummyMigrationMsg *msg);
00586
00587
00588 void _restartHandler(RestartRequest *restartMsg);
00589 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
00590 void _recvRestartCheckpointHandler(char *_restartData);
00591
00592
// Integer indices named after the restart handlers above; defined elsewhere.
00593 extern int _getCheckpointHandlerIdx;
00594 extern int _recvCheckpointHandlerIdx;
00595 extern int _resendMessagesHandlerIdx;
00596 extern int _sendDetsHandlerIdx;
00597 extern int _sendDetsReplyHandlerIdx;
00598 extern int _receivedTNDataHandlerIdx;
00599 extern int _receivedDetDataHandlerIdx;
00600 extern int _distributedLocationHandlerIdx;
00601 extern int _updateHomeRequestHandlerIdx;
00602 extern int _updateHomeAckHandlerIdx;
00603 extern int _verifyAckRequestHandlerIdx;
00604 extern int _verifyAckHandlerIdx;
00605 extern int _dummyMigrationHandlerIdx;
00606
00608
00609
// Load-balancing entry points. fnPtr/_centralLb form a callback plus its argument.
// NOTE(review): presumably stored in resumeLbFnPtr/centralLb below — confirm.
00610 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
00611 void finishedCheckpointLoadBalancing();
00612 void sendMlogLocation(int targetPE,envelope *env);
// Matches the MlogFn signature, so it can be passed to forAllCharesDo().
00613 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
00614 void restoreParallelRecovery(void (*fnPtr)(void *),void *_centralLb);
00615
00616
// Message handlers for migration notices, global step exchange and checkpoint barriers.
00617 void _receiveMlogLocationHandler(void *buf);
00618 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
00619 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
00620 void _getGlobalStepHandler(LBStepMsg *msg);
00621 void _recvGlobalStepHandler(LBStepMsg *msg);
00622 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
00623 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
00624
00625
// Load-balancing state and handler indices; all defined elsewhere.
00626 extern int onGoingLoadBalancing;
00627 extern void *centralLb;
00628 extern void (*resumeLbFnPtr)(void *) ;
00629 extern int _receiveMlogLocationHandlerIdx;
00630 extern int _receiveMigrationNoticeHandlerIdx;
00631 extern int _receiveMigrationNoticeAckHandlerIdx;
00632 extern int _getGlobalStepHandlerIdx;
00633 extern int _recvGlobalStepHandlerIdx;
00634 extern int _checkpointBarrierHandlerIdx;
00635 extern int _checkpointBarrierAckHandlerIdx;
00636
00637
// Lists of migration records and retained migrated objects; defined elsewhere.
00638 extern std::vector<MigrationRecord> migratedNoticeList;
00639 extern std::vector<RetainedMigratedObject *> retainedObjectList;
00640
// Utility functions.
00641 int getCheckPointPE();
00642 int getReverseCheckPointPE();
// NOTE(review): declared inline with no definition in this header; the
// definition must be visible in every translation unit that calls it.
00643 inline bool isSameDet(Determinant *first, Determinant *second);
// Takes an MlogFn callback and an opaque data pointer.
// NOTE(review): presumably invokes the callback once per chare — confirm.
00644 void forAllCharesDo(MlogFn fnPointer,void *data);
00645 envelope *copyEnvelope(envelope *env);
00646 extern void _initDone(void);
00647
00648
00649 extern void _resetNodeBocInitVec(void);
00650
00651
// Location reporting: notifier, handler and handler index.
00652 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
00653
00654
00655 void _receiveLocationHandler(CurrentLocationMsg *data);
00656
00657
00658 extern int _receiveLocationHandlerIdx;
00659
00660
// C-linkage function taking a lower and an upper handler index.
00661 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
00662
00663 #endif