00001 #ifndef _CKMESSAGELOGGING_H_
00002 #define _CKMESSAGELOGGING_H_
00003
00004 #include "ckobjid.h"
00005
/*
 * bzero() portability shim.
 *
 * <string.h> is included unconditionally: the fallback macro below expands to
 * memset(), and code later in this header calls memcpy(), so the header must
 * not rely on some other file having pulled <string.h> in first.
 *
 * When <strings.h> is available it supplies bzero(); otherwise bzero(s,n) is
 * mapped onto memset((s),0,(n)). The macro arguments are parenthesized so that
 * expression arguments expand safely.
 */
#include <string.h>

#if CMK_HAS_STRINGS_H
#include <strings.h>
#else
#define bzero(s,n) memset((s),0,(n))
#endif
00011
00012 CpvExtern(Chare *,_currentObj);
00013
00014
00015 #define NEW_TICKET 1
00016 #define OLD_TICKET 2
00017 #define FORWARDED_TICKET 0x8000
00018
00019
00020 #define MLOG_RESTARTED 0
00021 #define MLOG_CRASHED 1
00022 #define MEGABYTE 1048576
00023
00024
00025 extern char objString[100];
00026
00027
00028 #define INITIAL_BUFFERED_DETERMINANTS 1024
00029
00030
00031 #define SYNCHRONIZED_CHECKPOINT 1
00032
00039 typedef struct {
00040
00041 CkObjID sender;
00042
00043 CkObjID receiver;
00044
00045 MCount SN;
00046
00047 MCount TN;
00048 } Determinant;
00049
00053 typedef CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> CkDeterminantHashtableT;
00054
00058 typedef struct {
00059 char header[CmiMsgHeaderSizeBytes];
00060 int phase;
00061 int index;
00062 } RemoveDeterminantsHeader;
00063
00067 typedef struct {
00068 char header[CmiMsgHeaderSizeBytes];
00069 int number;
00070 int index;
00071 int phase;
00072 int PE;
00073 } StoreDeterminantsHeader;
00074
00078 class Ticket {
00079 public:
00080 MCount TN;
00081 int state;
00082 Ticket(){
00083 TN = 0;
00084 state = 0;
00085 }
00086 Ticket(int x){
00087 TN = x;
00088 state = 0;
00089 }
00090 };
00091 PUPbytes(Ticket)
/* Forward declarations; both classes are defined further down in this header. */
class MlogEntry;

class RestoredLocalMap;

/* Number of Ticket slots embedded directly inside every SNToTicket. */
#define INITSIZE_SNTOTICKET 100
00097
00102 class SNToTicket{
00103 private:
00104 Ticket initial[INITSIZE_SNTOTICKET];
00105 Ticket *ticketVec;
00106 MCount startSN;
00107 int currentSize;
00108 MCount finishSN;
00109 public:
00110 SNToTicket(){
00111 currentSize = INITSIZE_SNTOTICKET;
00112 ticketVec = &initial[0];
00113 bzero(ticketVec,sizeof(Ticket)*currentSize);
00114 startSN = 0;
00115 finishSN = 0;
00116 }
00120 inline MCount getFinishSN(){
00121 return finishSN;
00122 }
00126 inline MCount getStartSN(){
00127 return startSN;
00128 }
00129
00130 inline Ticket &put(MCount SN){
00131 if(SN > finishSN) finishSN = SN;
00132 if(startSN == 0){
00133 startSN = SN;
00134 }
00135 int index = SN-startSN;
00136 if(index >= currentSize){
00137 int oldSize = currentSize;
00138 Ticket *old = ticketVec;
00139
00140 currentSize = index*2;
00141 ticketVec = new Ticket[currentSize];
00142 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
00143 if(old != &initial[0]){
00144 delete [] old;
00145 }
00146 }
00147 return ticketVec[index];
00148 }
00149
00150 inline Ticket get(MCount SN){
00151 int index = SN-startSN;
00152 CmiAssert(index >= 0);
00153 if(index >= currentSize){
00154 Ticket tn;
00155 return tn;
00156 }else{
00157 return ticketVec[index];
00158 }
00159 }
00160
00161 inline void pup(PUP::er &p){
00162 p | startSN;
00163 p | currentSize;
00164 if(p.isUnpacking()){
00165 if(currentSize > INITSIZE_SNTOTICKET){
00166 ticketVec = new Ticket[currentSize];
00167 }
00168 }
00169 for(int i=0;i<currentSize;i++){
00170 p | ticketVec[i];
00171 }
00172 }
00173 };
00174
00175
00185 class ChareMlogData{
00186 public:
00187
00188 CkObjID objID;
00189
00190 MCount tCount;
00191
00192 MCount tProcessed;
00193
00194
00195 CkVec<MCount> *receivedTNs;
00196 MCount *ticketHoles;
00197 int numberHoles;
00198 int currentHoles;
00199
00200 int resendReplyRecvd;
00201
00202 int restartFlag;
00203
00204 int teamRecoveryFlag;
00205
00206 CkHashtableT<CkHashtableAdaptorT<CkObjID>, SNToTicket *> teamTable;
00207
00208 int toResumeOrNot;
00209 int resumeCount;
00210 int immigrantRecFlag;
00211
00212 private:
00213
00214
00215 CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
00216
00217 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
00218
00219 CkQ<MlogEntry *> mlog;
00220
00221
00222 inline MCount newTN();
00223
00224 public:
00228 ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
00229 tCount = 0;
00230 tProcessed = 0;
00231 numberHoles = 0;
00232 ticketHoles = NULL;
00233 currentHoles = 0;
00234 restartFlag=0;
00235 teamRecoveryFlag=0;
00236 receivedTNs = NULL;
00237 resendReplyRecvd=0;
00238 toResumeOrNot=0;
00239 resumeCount=0;
00240 immigrantRecFlag = 0;
00241 };
00242 inline MCount nextSN(const CkObjID &recver);
00243 inline Ticket next_ticket(CkObjID &sender,MCount SN);
00244 inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
00245 inline Ticket getTicket(CkObjID &sender, MCount SN);
00246 void addLogEntry(MlogEntry *entry);
00247 virtual void pup(PUP::er &p);
00248 CkQ<MlogEntry *> *getMlog(){ return &mlog;};
00249 MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
00250 };
00251
00259 class MlogEntry{
00260 public:
00261 envelope *env;
00262 int destPE;
00263 int _infoIdx;
00264 int indexBufDets;
00265 int numBufDets;
00266
00267 MlogEntry(envelope *_env,int _destPE,int __infoIdx){
00268 env = _env;
00269 destPE = _destPE;
00270 _infoIdx = __infoIdx;
00271 }
00272 MlogEntry(){
00273 env = 0;
00274 destPE = -1;
00275 _infoIdx = 0;
00276 }
00277 ~MlogEntry(){
00278 if(env){
00279 CmiFree(env);
00280 }
00281 }
00282 virtual void pup(PUP::er &p);
00283 };
00284
00288 class LocationID{
00289 public:
00290 CkArrayIndexMax idx;
00291 CkGroupID gid;
00292 int PE;
00293 };
00294
/**
 * Descriptor of a checkpoint buffer: pointer, size and a PE number.
 * Starts out empty (NULL buffer, size 0, PE -1). The class does not allocate
 * or free `buf` itself.
 */
class StoredCheckpoint{
public:
	char *buf;
	int bufSize;
	int PE;

	StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
00309
00315 class RestoredLocalMap {
00316 public:
00317 MCount minSN,maxSN,count;
00318 MCount *TNArray;
00319 RestoredLocalMap(){
00320 minSN=maxSN=count=0;
00321 TNArray=NULL;
00322 };
00323 RestoredLocalMap(int i){
00324 minSN=maxSN=count=0;
00325 TNArray=NULL;
00326 };
00327
00328 virtual void pup(PUP::er &p);
00329 };
00330
00331
00332 typedef struct {
00333 char header[CmiMsgHeaderSizeBytes];
00334 CkObjID sender;
00335 CkObjID recver;
00336 MlogEntry *logEntry;
00337 MCount SN;
00338 MCount TN;
00339 int senderPE;
00340 } TicketRequest;
00341 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
00342 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
00343
00344 typedef struct{
00345 TicketRequest request;
00346 Ticket ticket;
00347 int recverPE;
00348 } TicketReply;
00349
00350 CpvExtern(char**,_bufferedTicketRequests);
00351 extern int _maxBufferedTicketRequests;
00352
00353
00354
00355 typedef struct {
00356 char header[CmiMsgHeaderSizeBytes];
00357 int numberLogs;
00358 } BufferedLocalLogHeader;
00359
00360 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
00361
00362 typedef struct{
00363 char header[CmiMsgHeaderSizeBytes];
00364 int PE;
00365 int dataSize;
00366 } CheckPointDataMsg;
00367
00368
00369
00370
00371
00372
00373
00374 typedef CheckPointDataMsg CheckPointAck;
00375
00376 typedef struct{
00377 CkObjID recver;
00378 MCount tProcessed;
00379 } TProcessedLog;
00380
00381
00385 typedef struct{
00386 char header[CmiMsgHeaderSizeBytes];
00387 int PE;
00388 } RestartRequest;
00389
00390 typedef RestartRequest CkPingMsg;
00391 typedef RestartRequest CheckpointRequest;
00392
00393 typedef struct{
00394 char header[CmiMsgHeaderSizeBytes];
00395 int PE;
00396 double restartWallTime;
00397 int checkPointSize;
00398 int numMigratedAwayElements;
00399 int numMigratedInElements;
00400 int migratedElementSize;
00401 int numLocalMessages;
00402 CkGroupID lbGroupID;
00403 } RestartProcessorData;
00404
00405 typedef struct{
00406 char header[CmiMsgHeaderSizeBytes];
00407 int PE;
00408 int numberObjects;
00409 } ResendRequest;
00410
00411 typedef ResendRequest RemoveLogRequest;
00412
00413 typedef struct {
00414 char header[CmiMsgHeaderSizeBytes];
00415 CkObjID recver;
00416 int numTNs;
00417 } ReceivedTNData;
00418
00419
00420 typedef struct {
00421 char header[CmiMsgHeaderSizeBytes];
00422 CkObjID recver;
00423 int numDets;
00424 } ReceivedDetData;
00425
00426 typedef struct{
00427 int PE;
00428 int numberObjects;
00429 TProcessedLog *listObjects;
00430 CkVec<MCount> *ticketVecs;
00431 } ResendData;
00432
00433 typedef struct {
00434 CkGroupID gID;
00435 CkArrayIndexMax idx;
00436 int fromPE,toPE;
00437 char ackFrom,ackTo;
00438 } MigrationRecord;
00439
00440 typedef struct {
00441 char header[CmiMsgHeaderSizeBytes];
00442 MigrationRecord migRecord;
00443 void *record;
00444 } MigrationNotice;
00445
00446 typedef struct {
00447 char header[CmiMsgHeaderSizeBytes];
00448 void *record;
00449 } MigrationNoticeAck;
00450
00451 typedef struct {
00452 MigrationRecord migRecord;
00453 void *msg;
00454 int size;
00455 char acked;
00456 } RetainedMigratedObject;
00457
00458 typedef struct {
00459 char header[CmiMsgHeaderSizeBytes];
00460 MigrationRecord migRecord;
00461 int index;
00462 int fromPE;
00463 } VerifyAckMsg;
00464
00465 typedef struct {
00466 char header[CmiMsgHeaderSizeBytes];
00467 int checkpointCount;
00468 int fromPE;
00469 } CheckpointBarrierMsg;
00470
00471
00472
00473 typedef struct {
00474 char header[CmiMsgHeaderSizeBytes];
00475 CkGroupID mgrID;
00476 CkArrayIndexMax idx;
00477 int locationPE;
00478 int fromPE;
00479 } CurrentLocationMsg;
00480
00481 typedef struct {
00482 char header[CmiMsgHeaderSizeBytes];
00483 CkGroupID lbID;
00484 int fromPE;
00485 int step;
00486 } LBStepMsg;
00487
00488
00489 #define MLOG_OBJECT 1
00490 #define MLOG_COUNT 2
00491
00492 typedef struct {
00493 char header[CmiMsgHeaderSizeBytes];
00494 int flag;
00495 CkGroupID lbID;
00496 int count;
00498 CkGroupID mgrID;
00499 CkArrayIndexMax idx;
00500 int locationPE;
00501 } DummyMigrationMsg;
00502
00503
00504
00505
00506
00507 typedef void (*MlogFn)(void *,ChareMlogData *);
00508
00509 void _messageLoggingInit();
00510
00511
00512 void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
00513 void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
00514 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid);
00515 void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
00516 void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
00517 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
00518 void sendLocalMsg(MlogEntry *entry);
00519
00520
00521 void _ticketRequestHandler(TicketRequest *);
00522 void _ticketHandler(TicketReply *);
00523 void _pingHandler(CkPingMsg *msg);
00524 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
00525 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
00526 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
00527 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
00528 void _storeDeterminantsHandler(char *buffer);
00529 void _removeDeterminantsHandler(char *buffer);
00530
00531
00532
00533 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
00534 extern void _noCldNodeEnqueue(int node, envelope *env);
00535 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
00536 void retryTicketRequest(void *_ticketRequest,double curWallTime);
00537
00538
00539 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
00540 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
00541
00542
00543
00544 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
00545
00546
00547 void checkpointAlarm(void *_dummy,double curWallTime);
00548 void startMlogCheckpoint(void *_dummy,double curWallTime);
00549 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listSize=0);
00550
00551
00552 void _checkpointRequestHandler(CheckpointRequest *request);
00553 void _storeCheckpointHandler(char *msg);
00554 void _checkpointAckHandler(CheckPointAck *ackMsg);
00555 void _removeProcessedLogHandler(char *requestMsg);
00556 void garbageCollectMlog();
00557
00558
00559 extern int _checkpointRequestHandlerIdx;
00560 extern int _storeCheckpointHandlerIdx;
00561 extern int _checkpointAckHandlerIdx;
00562 extern int _removeProcessedLogHandlerIdx;
00563
00564
00565
00566
00567
00568 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
00569 void CkMlogRestartDouble(void *,double);
00570 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
00571 void processReceivedDet(Chare *obj,int vecsize, Determinant *listDets);
00572 void initializeRestart(void *data,ChareMlogData *mlogData);
00573 void distributeRestartedObjects();
00574 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
00575
00576
00577 void CkMlogRestartLocal();
00578
00579
00580 void _getCheckpointHandler(RestartRequest *restartMsg);
00581 void _recvCheckpointHandler(char *_restartData);
00582 void _resendMessagesHandler(char *msg);
00583 void _sendDetsHandler(char *msg);
00584 void _sendDetsReplyHandler(char *msg);
00585 void _receivedTNDataHandler(ReceivedTNData *msg);
00586 void _receivedDetDataHandler(ReceivedDetData *msg);
00587 void _distributedLocationHandler(char *receivedMsg);
00588 void _updateHomeRequestHandler(RestartRequest *updateRequest);
00589 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
00590 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
00591 void _verifyAckHandler(VerifyAckMsg *verifyReply);
00592 void _dummyMigrationHandler(DummyMigrationMsg *msg);
00593
00594
00595 void _restartHandler(RestartRequest *restartMsg);
00596 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
00597 void _recvRestartCheckpointHandler(char *_restartData);
00598
00599
00600 extern int _getCheckpointHandlerIdx;
00601 extern int _recvCheckpointHandlerIdx;
00602 extern int _resendMessagesHandlerIdx;
00603 extern int _sendDetsHandlerIdx;
00604 extern int _sendDetsReplyHandlerIdx;
00605 extern int _receivedTNDataHandlerIdx;
00606 extern int _receivedDetDataHandlerIdx;
00607 extern int _distributedLocationHandlerIdx;
00608 extern int _updateHomeRequestHandlerIdx;
00609 extern int _updateHomeAckHandlerIdx;
00610 extern int _verifyAckRequestHandlerIdx;
00611 extern int _verifyAckHandlerIdx;
00612 extern int _dummyMigrationHandlerIdx;
00613
00615
00616
00617 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
00618 void finishedCheckpointLoadBalancing();
00619 void sendMlogLocation(int targetPE,envelope *env);
00620 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
00621
00622
00623 void _receiveMlogLocationHandler(void *buf);
00624 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
00625 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
00626 void _getGlobalStepHandler(LBStepMsg *msg);
00627 void _recvGlobalStepHandler(LBStepMsg *msg);
00628 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
00629 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
00630
00631
00632 extern int onGoingLoadBalancing;
00633 extern void *centralLb;
00634 extern void (*resumeLbFnPtr)(void *) ;
00635 extern int _receiveMlogLocationHandlerIdx;
00636 extern int _receiveMigrationNoticeHandlerIdx;
00637 extern int _receiveMigrationNoticeAckHandlerIdx;
00638 extern int _getGlobalStepHandlerIdx;
00639 extern int _recvGlobalStepHandlerIdx;
00640 extern int _checkpointBarrierHandlerIdx;
00641 extern int _checkpointBarrierAckHandlerIdx;
00642
00643
00644 extern CkVec<MigrationRecord> migratedNoticeList;
00645 extern CkVec<RetainedMigratedObject *> retainedObjectList;
00646
00647 int getCheckPointPE();
00648 inline int isSameDet(Determinant *first, Determinant *second);
00649 void forAllCharesDo(MlogFn fnPointer,void *data);
00650 envelope *copyEnvelope(envelope *env);
00651 extern void _initDone(void);
00652
00653
00654 extern void _resetNodeBocInitVec(void);
00655
00656
00657 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
00658
00659
00660 void _receiveLocationHandler(CurrentLocationMsg *data);
00661
00662
00663 extern int _receiveLocationHandlerIdx;
00664
00665
00666 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
00667
00668 #endif