00001 #ifndef _CKMESSAGELOGGING_H_
00002 #define _CKMESSAGELOGGING_H_
00003
#include <string.h>

#include "ckobjid.h"

#if CMK_HAS_STRINGS_H
#include <strings.h>
#else
#define bzero(s,n) memset(s,0,n)
#endif
00011
00012 CpvExtern(Chare *,_currentObj);
00013
00014
00015 #define NEW_TICKET 1
00016 #define OLD_TICKET 2
00017 #define FORWARDED_TICKET 0x8000
00018
00019
00020 #define MLOG_RESTARTED 0
00021 #define MLOG_CRASHED 1
00022 #define MEGABYTE 1048576
00023
00024
00025 extern char objString[100];
00026
00030 class Ticket {
00031 public:
00032 MCount TN;
00033 int state;
00034 Ticket(){
00035 TN = 0;
00036 state = 0;
00037 }
00038 Ticket(int x){
00039 TN = x;
00040 state = 0;
00041 }
00042 };
00043 PUPbytes(Ticket)
00044 class MlogEntry;
00045
00053 typedef struct{
00054 char header[CmiMsgHeaderSizeBytes];
00055 CkObjID sender;
00056 CkObjID recver;
00057 MCount SN;
00058 MCount TN;
00059 MlogEntry *entry;
00060 int senderPE;
00061 int recverPE;
00062 } LocalMessageLog;
00063 PUPbytes(LocalMessageLog)
00064
00065 class MlogEntry;
00066 class RestoredLocalMap;
00067
00068 #define INITSIZE_SNTOTICKET 100
00069
00074 class SNToTicket{
00075 private:
00076 Ticket initial[INITSIZE_SNTOTICKET];
00077 Ticket *ticketVec;
00078 MCount startSN;
00079 int currentSize;
00080 MCount finishSN;
00081 public:
00082 SNToTicket(){
00083 currentSize = INITSIZE_SNTOTICKET;
00084 ticketVec = &initial[0];
00085 bzero(ticketVec,sizeof(Ticket)*currentSize);
00086 startSN = 0;
00087 finishSN = 0;
00088 }
00092 inline MCount getFinishSN(){
00093 return finishSN;
00094 }
00098 inline MCount getStartSN(){
00099 return startSN;
00100 }
00101
00102 inline Ticket &put(MCount SN){
00103 if(SN > finishSN) finishSN = SN;
00104 if(startSN == 0){
00105 startSN = SN;
00106 }
00107 int index = SN-startSN;
00108 if(index >= currentSize){
00109 int oldSize = currentSize;
00110 Ticket *old = ticketVec;
00111
00112 currentSize = index*2;
00113 ticketVec = new Ticket[currentSize];
00114 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
00115 if(old != &initial[0]){
00116 delete [] old;
00117 }
00118 }
00119 return ticketVec[index];
00120 }
00121
00122 inline Ticket get(MCount SN){
00123 int index = SN-startSN;
00124 CmiAssert(index >= 0);
00125 if(index >= currentSize){
00126 Ticket tn;
00127 return tn;
00128 }else{
00129 return ticketVec[index];
00130 }
00131 }
00132
00133 inline void pup(PUP::er &p){
00134 p | startSN;
00135 p | currentSize;
00136 if(p.isUnpacking()){
00137 if(currentSize > INITSIZE_SNTOTICKET){
00138 ticketVec = new Ticket[currentSize];
00139 }
00140 }
00141 for(int i=0;i<currentSize;i++){
00142 p | ticketVec[i];
00143 }
00144 }
00145 };
00146
00147
00157 class ChareMlogData{
00158 public:
00159
00160 CkObjID objID;
00161
00162 MCount tCount;
00163
00164 MCount tProcessed;
00165
00166
00167 CkVec<MCount> *receivedTNs;
00168 MCount *ticketHoles;
00169 int numberHoles;
00170 int currentHoles;
00171 CkVec<LocalMessageLog> restoredLocalMsgLog;
00172 int maxRestoredLocalTN;
00173 int resendReplyRecvd;
00174 int restartFlag;
00175 int teamRecoveryFlag;
00176 CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable;
00177
00178 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> teamTable;
00179
00180 int toResumeOrNot;
00181 int resumeCount;
00182
00183 private:
00184
00185
00186 CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
00187
00188 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
00189
00190 CkQ<MlogEntry *> mlog;
00191
00192
00193 inline MCount newTN();
00194
00195 public:
00199 ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
00200 tCount = 0;
00201 tProcessed = 0;
00202 numberHoles = 0;
00203 ticketHoles = NULL;
00204 currentHoles = 0;
00205 restartFlag=0;
00206 teamRecoveryFlag=0;
00207 receivedTNs = NULL;
00208 resendReplyRecvd=0;
00209 maxRestoredLocalTN=0;
00210 toResumeOrNot=0;
00211 resumeCount=0;
00212 };
00213 inline MCount nextSN(const CkObjID &recver);
00214 inline Ticket next_ticket(CkObjID &sender,MCount SN);
00215 inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
00216 void addLogEntry(MlogEntry *entry);
00217 virtual void pup(PUP::er &p);
00218 CkQ<MlogEntry *> *getMlog(){ return &mlog;};
00219 MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
00220 void addToRestoredLocalQ(LocalMessageLog *logEntry);
00221 void sortRestoredLocalMsgLog();
00222 };
00223
00227 class MlogEntry{
00228 public:
00229 envelope *env;
00230 int destPE;
00231 int _infoIdx;
00232 char unackedLocal;
00233
00234 MlogEntry(envelope *_env,int _destPE,int __infoIdx){
00235 env = _env;
00236 destPE = _destPE;
00237 _infoIdx = __infoIdx;
00238 unackedLocal = 0;
00239 }
00240 MlogEntry(){
00241 env = 0;
00242 destPE = -1;
00243 _infoIdx = 0;
00244 unackedLocal = 0;
00245 }
00246 ~MlogEntry(){
00247 if(env){
00248 CmiFree(env);
00249 }
00250 }
00251 virtual void pup(PUP::er &p);
00252 };
00253
00257 class LocationID{
00258 public:
00259 CkArrayIndex idx;
00260 CkGroupID gid;
00261 };
00262
/**
 * Holder for a checkpoint buffer: the buffer pointer, its size and a PE
 * number. This class neither allocates nor frees buf.
 */
class StoredCheckpoint{
public:
	char *buf;	/* checkpoint bytes; NULL when nothing is stored */
	int bufSize;	/* size recorded for buf; 0 when nothing is stored */
	int PE;		/* PE number; -1 until set */

	/** Starts out empty: no buffer, size 0, PE -1. */
	StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
00277
00283 class RestoredLocalMap {
00284 public:
00285 MCount minSN,maxSN,count;
00286 MCount *TNArray;
00287 RestoredLocalMap(){
00288 minSN=maxSN=count=0;
00289 TNArray=NULL;
00290 };
00291 RestoredLocalMap(int i){
00292 minSN=maxSN=count=0;
00293 TNArray=NULL;
00294 };
00295
00296 virtual void pup(PUP::er &p);
00297 };
00298
00299
00300 typedef struct {
00301 char header[CmiMsgHeaderSizeBytes];
00302 CkObjID sender;
00303 CkObjID recver;
00304 MlogEntry *logEntry;
00305 MCount SN;
00306 MCount TN;
00307 int senderPE;
00308 } TicketRequest;
00309 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
00310 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
00311
00312 typedef struct{
00313 TicketRequest request;
00314 Ticket ticket;
00315 int recverPE;
00316 } TicketReply;
00317
00318
00319 CpvExtern(CkQ<LocalMessageLog> *,_localMessageLog);
00320
00321 CpvExtern(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
00322 extern int _maxBufferedMessages;
00323
00324 CpvExtern(char**,_bufferedTicketRequests);
00325 extern int _maxBufferedTicketRequests;
00326
00327
00328
00329 typedef struct {
00330 char header[CmiMsgHeaderSizeBytes];
00331 int numberLogs;
00332 } BufferedLocalLogHeader;
00333
00334 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
00335
00336 typedef struct{
00337 char header[CmiMsgHeaderSizeBytes];
00338 MlogEntry *entry;
00339 } LocalMessageLogAck;
00340
00341 typedef struct{
00342 char header[CmiMsgHeaderSizeBytes];
00343 int PE;
00344 int dataSize;
00345 } CheckPointDataMsg;
00346
00347
00348
00349
00350
00351
00352
00353 typedef CheckPointDataMsg CheckPointAck;
00354
00355 typedef struct{
00356 CkObjID recver;
00357 MCount tProcessed;
00358 } TProcessedLog;
00359
00363 typedef struct{
00364 char header[CmiMsgHeaderSizeBytes];
00365 int PE;
00366 } RestartRequest;
00367
00368 typedef RestartRequest CkPingMsg;
00369 typedef RestartRequest CheckpointRequest;
00370
00371 typedef struct{
00372 char header[CmiMsgHeaderSizeBytes];
00373 int PE;
00374 double restartWallTime;
00375 int checkPointSize;
00376 int numMigratedAwayElements;
00377 int numMigratedInElements;
00378 int migratedElementSize;
00379 int numLocalMessages;
00380 CkGroupID lbGroupID;
00381 } RestartProcessorData;
00382
00383 typedef struct{
00384 char header[CmiMsgHeaderSizeBytes];
00385 int PE;
00386 int numberObjects;
00387 } ResendRequest;
00388
00389 typedef ResendRequest RemoveLogRequest;
00390
00391 typedef struct {
00392 char header[CmiMsgHeaderSizeBytes];
00393 CkObjID recver;
00394 int numTNs;
00395 } ReceivedTNData;
00396
00397 typedef struct{
00398 int PE;
00399 int numberObjects;
00400 TProcessedLog *listObjects;
00401 MCount *maxTickets;
00402 CkVec<MCount> *ticketVecs;
00403 } ResendData;
00404
00405 typedef struct {
00406 CkGroupID gID;
00407 CkArrayIndex idx;
00408 int fromPE,toPE;
00409 char ackFrom,ackTo;
00410 } MigrationRecord;
00411
00412 typedef struct {
00413 char header[CmiMsgHeaderSizeBytes];
00414 MigrationRecord migRecord;
00415 void *record;
00416 } MigrationNotice;
00417
00418 typedef struct {
00419 char header[CmiMsgHeaderSizeBytes];
00420 void *record;
00421 } MigrationNoticeAck;
00422
00423 typedef struct {
00424 MigrationRecord migRecord;
00425 void *msg;
00426 int size;
00427 char acked;
00428 } RetainedMigratedObject;
00429
00430 typedef struct {
00431 char header[CmiMsgHeaderSizeBytes];
00432 MigrationRecord migRecord;
00433 int index;
00434 int fromPE;
00435 } VerifyAckMsg;
00436
00437 typedef struct {
00438 char header[CmiMsgHeaderSizeBytes];
00439 int checkpointCount;
00440 int fromPE;
00441 } CheckpointBarrierMsg;
00442
00443
00444
00445 typedef struct {
00446 char header[CmiMsgHeaderSizeBytes];
00447 CkGroupID mgrID;
00448 CkArrayIndex idx;
00449 int locationPE;
00450 int fromPE;
00451 } CurrentLocationMsg;
00452
00453 typedef struct {
00454 char header[CmiMsgHeaderSizeBytes];
00455 CkGroupID lbID;
00456 int fromPE;
00457 int step;
00458 } LBStepMsg;
00459
00460
00461 #define MLOG_OBJECT 1
00462 #define MLOG_COUNT 2
00463
00464 typedef struct {
00465 char header[CmiMsgHeaderSizeBytes];
00466 int flag;
00467 CkGroupID lbID;
00468 int count;
00470 CkGroupID mgrID;
00471 CkArrayIndex idx;
00472 int locationPE;
00473 } DummyMigrationMsg;
00474
00475
00476
00477
00478
00479 typedef void (*MlogFn)(void *,ChareMlogData *);
00480
00481 void _messageLoggingInit();
00482
00483
00484 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
00485 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
00486 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
00487 void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
00488 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
00489 void ticketLogLocalMessage(MlogEntry *entry);
00490 void sendLocalMessageCopy(MlogEntry *entry);
00491 void sendBufferedLocalMessageCopy();
00492 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime);
00493 void sendBufferedTicketRequests(int destPE);
00494 void checkBufferedTicketRequests(void *_destPE,double curWallTime);
00495
00496
00497
00498
00499
00500 extern int _ticketRequestHandlerIdx;
00501 extern int _ticketHandlerIdx;
00502 extern int _localMessageCopyHandlerIdx;
00503 extern int _localMessageAckHandlerIdx;
00504 extern int _bufferedLocalMessageCopyHandlerIdx;
00505 extern int _bufferedLocalMessageAckHandlerIdx;
00506 extern int _bufferedTicketRequestHandlerIdx;
00507 extern int _bufferedTicketHandlerIdx;
00508
00509
00510 void _ticketRequestHandler(TicketRequest *);
00511 void _ticketHandler(TicketReply *);
00512 void _localMessageCopyHandler(LocalMessageLog *);
00513 void _localMessageAckHandler(LocalMessageLogAck *);
00514 void _pingHandler(CkPingMsg *msg);
00515 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
00516 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
00517 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
00518 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
00519
00520
00521
00522 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
00523 extern void _noCldNodeEnqueue(int node, envelope *env);
00524 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
00525 void retryTicketRequest(void *_ticketRequest,double curWallTime);
00526
00527
00528 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
00529 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
00530
00531
00532
00533 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
00534
00535
00536 void checkpointAlarm(void *_dummy,double curWallTime);
00537 void startMlogCheckpoint(void *_dummy,double curWallTime);
00538 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listSize=0);
00539
00540
00541 void _checkpointRequestHandler(CheckpointRequest *request);
00542 void _storeCheckpointHandler(char *msg);
00543 void _checkpointAckHandler(CheckPointAck *ackMsg);
00544 void _removeProcessedLogHandler(char *requestMsg);
00545
00546
00547 extern int _checkpointRequestHandlerIdx;
00548 extern int _storeCheckpointHandlerIdx;
00549 extern int _checkpointAckHandlerIdx;
00550 extern int _removeProcessedLogHandlerIdx;
00551
00552
00553
00554
00555
00556 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
00557 void CkMlogRestartDouble(void *,double);
00558 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
00559 void initializeRestart(void *data,ChareMlogData *mlogData);
00560 void distributeRestartedObjects();
00561 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData);
00562 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndex &idx,int locationPE);
00563
00564
00565 void CkMlogRestartLocal();
00566
00567
00568 void _getCheckpointHandler(RestartRequest *restartMsg);
00569 void _recvCheckpointHandler(char *_restartData);
00570 void _resendMessagesHandler(char *msg);
00571 void _resendReplyHandler(char *msg);
00572 void _receivedTNDataHandler(ReceivedTNData *msg);
00573 void _distributedLocationHandler(char *receivedMsg);
00574 void _updateHomeRequestHandler(RestartRequest *updateRequest);
00575 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
00576 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
00577 void _verifyAckHandler(VerifyAckMsg *verifyReply);
00578 void _dummyMigrationHandler(DummyMigrationMsg *msg);
00579
00580
00581 void _restartHandler(RestartRequest *restartMsg);
00582 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
00583 void _recvRestartCheckpointHandler(char *_restartData);
00584
00585
00586
00587 extern int _getCheckpointHandlerIdx;
00588 extern int _recvCheckpointHandlerIdx;
00589 extern int _resendMessagesHandlerIdx;
00590 extern int _resendReplyHandlerIdx;
00591 extern int _receivedTNDataHandlerIdx;
00592 extern int _distributedLocationHandlerIdx;
00593 extern int _updateHomeRequestHandlerIdx;
00594 extern int _updateHomeAckHandlerIdx;
00595 extern int _verifyAckRequestHandlerIdx;
00596 extern int _verifyAckHandlerIdx;
00597 extern int _dummyMigrationHandlerIdx;
00598
00599
00601
00602
00603 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
00604 void finishedCheckpointLoadBalancing();
00605 void sendMlogLocation(int targetPE,envelope *env);
00606 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
00607
00608
00609 void _receiveMlogLocationHandler(void *buf);
00610 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
00611 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
00612 void _getGlobalStepHandler(LBStepMsg *msg);
00613 void _recvGlobalStepHandler(LBStepMsg *msg);
00614 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
00615 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
00616
00617
00618
00619 extern int onGoingLoadBalancing;
00620 extern void *centralLb;
00621 extern void (*resumeLbFnPtr)(void *) ;
00622 extern int _receiveMlogLocationHandlerIdx;
00623 extern int _receiveMigrationNoticeHandlerIdx;
00624 extern int _receiveMigrationNoticeAckHandlerIdx;
00625 extern int _getGlobalStepHandlerIdx;
00626 extern int _recvGlobalStepHandlerIdx;
00627 extern int _checkpointBarrierHandlerIdx;
00628 extern int _checkpointBarrierAckHandlerIdx;
00629
00630
00631 extern CkVec<MigrationRecord> migratedNoticeList;
00632 extern CkVec<RetainedMigratedObject *> retainedObjectList;
00633
00634 int getCheckPointPE();
00635 void forAllCharesDo(MlogFn fnPointer,void *data);
00636 envelope *copyEnvelope(envelope *env);
00637 extern void _initDone(void);
00638
00639
00640 extern void _resetNodeBocInitVec(void);
00641
00642
00643 void informLocationHome(CkGroupID mgrID,CkArrayIndex idx,int homePE,int currentPE);
00644
00645
00646 void _receiveLocationHandler(CurrentLocationMsg *data);
00647
00648
00649 extern int _receiveLocationHandlerIdx;
00650
00651
00652 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
00653 inline void processRemoteMlogMessages(){
00654 CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
00655 }
00656
00657 #endif