00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include "unistd.h"
00043
00044 #include "charm++.h"
00045 #include "ck.h"
00046 #include "register.h"
00047 #include "conv-ccs.h"
00048 #include <signal.h>
00049
/// Sink for debug traces: accepts a printf-style format plus arguments and
/// discards everything. DEBUGF is defined to this function in this file, so
/// debug output compiles down to an empty call.
void noopck(const char* /*fmt*/, ...)
{
  // Intentionally does nothing.
}
00052
00053
00054
// DEBUGF: debug-trace macro for this file; routed to noopck(), so traces are discarded.
00055 #define DEBUGF noopck
00056
00057
// NODE_CHECKPOINT: when nonzero, BuddyPE()/isMaster() pick buddies on a
// different physical node instead of the next PE.
00058 #define NODE_CHECKPOINT 0
00059
// Converse handler indices; presumably registered with CmiRegisterHandler
// later in the file (not visible here) -- confirm.
00060 static int replicaDieHandlerIdx;
00061 static int replicaDieBcastHandlerIdx;
00062 static int changePhaseHandlerIdx;
00063
00064
// CK_NO_PROC_POOL is defined as 0 on both branches. When nonzero, the code
// below marks the dead PE as permanently failed and re-homes its work
// instead of restarting it in place.
00065 #if CMK_CONVERSE_MPI
00066 #define CK_NO_PROC_POOL 0
// MPI-only liveness pinging between buddies (pingBuddy/pingCheckHandler are
// registered as periodic Ccd callbacks in the constructor and pup()).
00067 static int pingHandlerIdx;
00068 static int pingCheckHandlerIdx;
00069 static int buddyDieHandlerIdx;
// Time of the last ping seen; reset in finishUp() after a restart.
00070 static double lastPingTime = -1;
00071 void pingBuddy();
00072 void pingCheckHandler();
00073 #else
00074 #define CK_NO_PROC_POOL 0
00075 #endif
00076
// CMK_CHKP_ALL: checkpoint all array elements of a PE in one message
// (startArrayCheckpoint/pupAllElements) rather than one entry per element.
00077 #define CMK_CHKP_ALL 1
// CMK_USE_BARRIER: selects the restartBeginHandler variant.
00078 #define CMK_USE_BARRIER 0
00079
// Not referenced in the visible part of this file; presumably a failure
// detection threshold used elsewhere -- confirm.
00080 #define FAIL_DET_THRESHOLD 10
00081
00082
// STREAMING_INFORMHOME: batch location updates to home PEs during recovery
// (only takes effect together with CK_NO_PROC_POOL).
00083 #define STREAMING_INFORMHOME 1
// Per-PE id of the crashed node; reset to -1 at the end of recoverArrayElements().
00084 CpvDeclare(int, _crashedNode);
00085
00086
// Static state of the checkpoint manager shared by all branches on a PE.
00087 bool CkMemCheckPT::inRestarting = false;
00088 bool CkMemCheckPT::inCheckpointing = false;
00089 bool CkMemCheckPT::inLoadbalancing = false;
// Start time of the current checkpoint/restart stage (used for timing prints).
00090 double CkMemCheckPT::startTime;
// Name of the current restart stage, printed in progress messages.
00091 char *CkMemCheckPT::stage;
// User callback sent when a checkpoint or restart completes.
00092 CkCallback CkMemCheckPT::cpCallback;
00093
// Global on/off switch; cleared when there are too few nodes to hold buddies.
00094 bool _memChkptOn = true;
00095
// Group ID of the CkMemCheckPT group.
00096 CkGroupID ckCheckPTGroupID;
00097
// Set to true by doItNow() once a checkpoint has been started.
00098 static bool checkpointed = false;
00099
00101
00102
// NOTE(review): this uses #ifdef while the rest of the file uses
// `#if CMK_MEM_CHECKPOINT`; the two differ if the macro is defined as 0.
// killFile/killFlag/killTime are not used in the visible part of the file;
// presumably fault-injection settings -- confirm.
00103 #ifdef CMK_MEM_CHECKPOINT
00104
00105 char *killFile;
00106
00107 int killFlag=0;
00108
00109 double killTime=0.0;
00110 #endif
00111
// Replace any earlier definition of CKLOCMGR_LOOP with this file's version.
00112 #ifdef CKLOCMGR_LOOP
00113 #undef CKLOCMGR_LOOP
00114 #endif
00115
// CKLOCMGR_LOOP(code): for every group on this PE whose isLocMgr() is true,
// run `code` with `mgr` bound to that group as a CkLocMgr*.
// The expansion also declares `numGroups`, `i` and `obj`; `code` must not
// rely on outer variables with those names.
// NOTE(review): obj is dereferenced without a null check -- assumes every
// registered group ID maps to a live object on this PE.
00116 #define CKLOCMGR_LOOP(code) { \
00117 int numGroups = CkpvAccess(_groupIDTable)->size(); \
00118 for(int i=0;i<numGroups;i++) { \
00119 IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj(); \
00120 if(obj->isLocMgr()) { \
00121 CkLocMgr *mgr = (CkLocMgr*)obj; \
00122 code \
00123 } \
00124 } \
00125 }
00126
00128
// Two-slot buffer of process-level checkpoint messages received from another
// PE. New data goes to slot chkpPointer^1 (recvProcData); report() then flips
// chkpPointer so it names the last complete checkpoint.
00129 CpvDeclare(CkProcCheckPTMessage**, procChkptBuf);
00130
// Index (0 or 1) of the slot holding the last completed checkpoint.
00131 CpvDeclare(int, chkpPointer);
// Count of checkpoint rounds for which both array and process data arrived.
00132 CpvDeclare(int, chkpNum);
00133
00134
00135
00136 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
00137
00138 inline int CkMemCheckPT::BuddyPE(int pe)
00139 {
00140 int budpe;
00141 #if NODE_CHECKPOINT
00142
00143 int r1 = CmiPhysicalRank(pe);
00144 int budnode = CmiPhysicalNodeID(pe);
00145 do {
00146 budnode = (budnode+1)%CmiNumPhysicalNodes();
00147 int *pelist;
00148 int num;
00149 CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
00150 budpe = pelist[r1 % num];
00151 } while (isFailed(budpe));
00152 if (budpe == pe) {
00153 CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
00154 CmiAbort("Failed to find a buddy processor");
00155 }
00156 #else
00157 budpe = pe;
00158 while (budpe == pe || isFailed(budpe))
00159 budpe = (budpe+1)%CkNumPes();
00160 #endif
00161 return budpe;
00162 }
00163
00164
00165
#if CMK_MEM_CHECKPOINT
/// Registers this array element with the in-memory checkpoint manager.
/// budPEs[0] is this PE and budPEs[1] its BuddyPE(). Both PEs are asked to
/// create a table entry that points at the other.
void ArrayElement::init_checkpt() {
  if (!_memChkptOn) return;
  if (CkInRestarting()) {
    // The format has a %d; the PE number was previously never passed (UB).
    CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n", CkMyPe());
  }
  // Only proceed when this array is the first one in its location manager's
  // `managers` map (presumably so arrays bound to one loc mgr register once).
  if (thisArray->getLocMgr()->managers.begin()->second != thisArray) return;

  CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
  budPEs[0] = CkMyPe();
  budPEs[1] = checkptMgr.ckLocalBranch()->BuddyPE(CkMyPe());
  CmiAssert(budPEs[0] != budPEs[1]);

  const CkGroupID locMgrID = thisArray->getLocMgr()->getGroupID();
  checkptMgr[budPEs[0]].createEntry(thisArrayID, locMgrID, thisIndexMax, budPEs[1]);
  checkptMgr[budPEs[1]].createEntry(thisArrayID, locMgrID, thisIndexMax, budPEs[0]);
}
#endif
00185
00186
00187 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
00188 #if CMK_MEM_CHECKPOINT
00189
00190
00191
00192 CkLocMgr *locMgr = thisArray->getLocMgr();
00193 CmiAssert(myRec!=NULL);
00194 size_t size;
00195 {
00196 PUP::sizer p;
00197 locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
00198 size = p.size();
00199 }
00200 size_t packSize = size/sizeof(double) +1;
00201 CkArrayCheckPTMessage *msg =
00202 new (packSize, 0) CkArrayCheckPTMessage;
00203 msg->len = size;
00204 msg->index =thisIndexMax;
00205 msg->aid = thisArrayID;
00206 msg->locMgr = locMgr->getGroupID();
00207 msg->cp_flag = true;
00208 {
00209 PUP::toMem p(msg->packData);
00210 locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
00211 }
00212
00213 CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
00214 checkptMgr.recvData(msg, 2, budPEs);
00215 delete m;
00216 #endif
00217 }
00218
00219
00220 class CkMemCheckPTInfo: public CkCheckPTInfo
00221 {
00222 CkArrayCheckPTMessage *ckBuffer;
00223 public:
00224 CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno):
00225 CkCheckPTInfo(a, loc, idx, pno)
00226 {
00227 ckBuffer = NULL;
00228 }
00229 ~CkMemCheckPTInfo()
00230 {
00231 if (ckBuffer) delete ckBuffer;
00232 }
00233 inline void updateBuffer(CkArrayCheckPTMessage *data)
00234 {
00235 CmiAssert(data!=NULL);
00236 if (ckBuffer) delete ckBuffer;
00237 ckBuffer = data;
00238 }
00239 inline CkArrayCheckPTMessage * getCopy()
00240 {
00241 if (ckBuffer == NULL) {
00242 CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
00243 CmiAbort("Abort!");
00244 }
00245 return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
00246 }
00247 inline void updateBuddy(int b1, int b2) {
00248 CmiAssert(ckBuffer);
00249 ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
00250 pNo = b1; if (pNo == CkMyPe()) pNo = b2;
00251 CmiAssert(pNo != CkMyPe());
00252 }
00253 inline size_t getSize() {
00254 CmiAssert(ckBuffer);
00255 return ckBuffer->len;
00256 }
00257 };
00258
00259
00260 class CkDiskCheckPTInfo: public CkCheckPTInfo
00261 {
00262 std::string fname;
00263 int bud1, bud2;
00264 size_t len;
00265 public:
00266 CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
00267 {
00268 #if CMK_USE_MKSTEMP
00269 #if CMK_CONVERSE_MPI
00270 fname = "/tmp/ckpt" + std::to_string(CmiMyPartition()) + "-" + std::to_string(CkMyPe()) + "-" + std::to_string(myidx) + "-XXXXXX";
00271 #else
00272 fname = "/tmp/ckpt" + std::to_string(CkMyPe()) + "-" + std::to_string(myidx) + "-XXXXXX";
00273 #endif
00274 if(mkstemp(&fname[0]) < 0)
00275 {
00276 CmiAbort("mkstemp fail in checkpoint");
00277 }
00278 #else
00279 fname = tmpnam(NULL);
00280 #endif
00281 bud1 = bud2 = -1;
00282 len = 0;
00283 }
00284 ~CkDiskCheckPTInfo()
00285 {
00286 remove(fname.c_str());
00287 }
00288 inline void updateBuffer(CkArrayCheckPTMessage *data)
00289 {
00290 double t = CmiWallTimer();
00291
00292 envelope *env = UsrToEnv(data);
00293 CkUnpackMessage(&env);
00294 data = (CkArrayCheckPTMessage *)EnvToUsr(env);
00295 FILE *f = fopen(fname.c_str(),"wb");
00296 PUP::toDisk p(f);
00297 CkPupMessage(p, (void **)&data);
00298
00299
00300 fclose(f);
00301 bud1 = data->bud1;
00302 bud2 = data->bud2;
00303 len = data->len;
00304 delete data;
00305
00306 }
00307 inline CkArrayCheckPTMessage * getCopy()
00308 {
00309 CkArrayCheckPTMessage *data;
00310 FILE *f = fopen(fname.c_str(),"rb");
00311 PUP::fromDisk p(f);
00312 CkPupMessage(p, (void **)&data);
00313 fclose(f);
00314 data->bud1 = bud1;
00315 data->bud2 = bud2;
00316 return data;
00317 }
00318 inline void updateBuddy(int b1, int b2) {
00319 bud1 = b1; bud2 = b2;
00320 pNo = b1; if (pNo == CkMyPe()) pNo = b2;
00321 CmiAssert(pNo != CkMyPe());
00322 }
00323 inline size_t getSize() {
00324 return len;
00325 }
00326 };
00327
// Constructs the per-PE checkpoint manager.
// w: storage scheme, compared against CkCheckPoint_inMEM / CkCheckPoint_inDISK
// elsewhere in this file.
// Disables checkpointing (_memChkptOn = false) when there are too few nodes
// or PEs to place a buddy.
00328 CkMemCheckPT::CkMemCheckPT(int w)
00329 {
00330 int numnodes = 0;
00331 #if NODE_CHECKPOINT
00332 numnodes = CmiNumPhysicalNodes();
00333 #else
00334 numnodes = CkNumPes();
00335 #endif
// Without a spare-processor pool, more than two units are required.
00336 #if CK_NO_PROC_POOL
00337 if (numnodes <= 2)
00338 #else
00339 if (numnodes == 1)
00340 #endif
00341 {
00342 if (CkMyPe() == 0) CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
00343 _memChkptOn = false;
00344 }
00345 inRestarting = false;
00346 recvCount = peCount = 0;
00347 recvChkpCount = 0;
00348 ackCount = 0;
00349 expectCount = -1;
00350 where = w;
00351
// MPI builds: register the periodic buddy ping and ping check.
00352 #if CMK_CONVERSE_MPI
00353 if(CkNumPes() > 1) {
00354 void pingBuddy();
00355 void pingCheckHandler();
00356 CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
00357 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
00358 }
00359 #endif
00360 #if CMK_CHKP_ALL
00361 initEntry();
00362 #endif
00363 }
00364
00365 void CkMemCheckPT::initEntry()
00366 {
00367 #if CMK_CHKP_ALL
00368 chkpTable[0].init(where, 0);
00369 chkpTable[1].init(where, 1);
00370 #endif
00371 }
00372
00373 CkMemCheckPT::~CkMemCheckPT()
00374 {
00375 for (CkCheckPTInfo* it : ckTable) {
00376 delete it;
00377 }
00378 }
00379
// Serialises the manager's persistent fields. The order of p| calls defines
// the wire format and must not change.
// ckTable and chkpTable are not pupped here.
// On unpacking, all counters (including the just-unpacked peCount) are reset,
// inCheckpointing is cleared, and, in MPI builds, the ping callbacks are
// re-registered as in the constructor.
00380 void CkMemCheckPT::pup(PUP::er& p)
00381 {
00382 p|cpStarter;
00383 p|thisFailedPe;
00384 p|failedPes;
00385 p|ckCheckPTGroupID;
00386 p|cpCallback;
00387 p|where;
00388 p|peCount;
00389 if (p.isUnpacking()) {
00390 recvCount = peCount = 0;
00391 recvChkpCount = 0;
00392 ackCount = 0;
00393 expectCount = -1;
00394 inCheckpointing = false;
00395 #if CMK_CONVERSE_MPI
00396 if(CkNumPes() > 1) {
00397 void pingBuddy();
00398 void pingCheckHandler();
00399 CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
00400 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
00401 }
00402 #endif
00403 }
00404 }
00405
00406
// Recreates one array element on this PE from checkpoint message m. Then, for
// every migratable sharing its location record, it resets the buddy PEs
// (budPEs) and the reduction numbers in listenerData.
// Takes ownership of m: it is deleted at the end in every build.
00407 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m)
00408 {
00409 #if CMK_MEM_CHECKPOINT
// NOTE(review): m->locMgr (a CkGroupID) is passed for %d; harmless while
// DEBUGF is a no-op, wrong if DEBUGF is ever mapped to a real printf.
00410 DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);
00411
00412 PUP::fromMem p(m->packData);
00413 CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
00414 CmiAssert(mgr);
00415 CmiUInt8 id = mgr->lookupID(m->index);
// The 4th resume() argument is true only for !STREAMING_INFORMHOME &&
// CK_NO_PROC_POOL; presumably it controls informing the home PE -- confirm.
00416 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
00417 mgr->resume(m->index, id, p, true);
00418 #else
00419 mgr->resume(m->index, id, p, false);
00420 #endif
00421
00422
00423 CkArray *arrmgr = m->aid.ckLocalBranch();
00424 CmiAssert(arrmgr);
00425 ArrayElement *elt = arrmgr->lookup(m->index);
00426 CmiAssert(elt);
00427 CkLocRec *rec = elt->myRec;
00428 std::vector<CkMigratable *> list;
00429 mgr->migratableList(rec, list);
00430 CmiAssert(!list.empty());
00431 for (int l=0; l<list.size(); l++) {
00432 elt = (ArrayElement *)list[l];
00433 elt->budPEs[0] = m->bud1;
00434 elt->budPEs[1] = m->bud2;
00435
00436
// NOTE(review): `c` is the address of an array slot and is never NULL, so the
// check is a no-op. The loop treats each listenerData slot as a
// contributorInfo; this assumes that layout -- confirm against the
// listenerData declaration.
00437 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
00438 contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
00439 if (c) c->redNo = 0;
00440 }
00441 }
00442 #endif
00443 delete m;
00444 }
00445
00446
00447 bool CkMemCheckPT::isFailed(int pe)
00448 {
00449 for (int i=0; i<failedPes.size(); i++)
00450 if (failedPes[i] == pe) return true;
00451 return false;
00452 }
00453
00454
00455 void CkMemCheckPT::failed(int pe)
00456 {
00457 if (isFailed(pe)) return;
00458 failedPes.push_back(pe);
00459 }
00460
00461 int CkMemCheckPT::totalFailed()
00462 {
00463 return failedPes.size();
00464 }
00465
00466
00467 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
00468 {
00469
00470 int idx, len = ckTable.size();
00471 for (idx=0; idx<len; idx++) {
00472 CkCheckPTInfo *entry = ckTable[idx];
00473 if (index == entry->index) {
00474 if (loc == entry->locMgr) {
00475
00476 return;
00477 }
00478
00479
00480 if (aid == entry->aid) {
00481 CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
00482 CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
00483 }
00484 }
00485 }
00486 CkCheckPTInfo *newEntry;
00487 if (where == CkCheckPoint_inMEM)
00488 newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
00489 else
00490 newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
00491 ckTable.push_back(newEntry);
00492
00493 }
00494
// Entry method run on the PE that should take over a lost checkpoint copy
// (sent from recoverBuddies). It stores msg and acknowledges with gotData()
// to the sender.
// Per-element mode: recreate the table entry, store msg via recvData, and ack
// the buddy. CMK_CHKP_ALL mode: re-initialise the slots, ack msg->bud2 (the
// sender, per recoverBuddies), and store msg via recvArrayCheckpoint.
00495 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
00496 {
00497 #if !CMK_CHKP_ALL
00498 int buddy = msg->bud1;
00499 if (buddy == CkMyPe()) buddy = msg->bud2;
00500 createEntry(msg->aid, msg->locMgr, msg->index, buddy);
00501 recvData(msg);
00502
00503 thisProxy[buddy].gotData();
00504 #else
00505 initEntry();
00506 thisProxy[msg->bud2].gotData();
00507 recvArrayCheckpoint(msg);
00508 #endif
00509 }
00510
00511
00512
// Entry method, broadcast to every PE by CkStartMemCheckpoint(), that starts
// one checkpoint round.
// starter: PE that collects the completion reductions (cpFinish).
// cb: user callback sent once the checkpoint finishes.
// Sends this PE's array data (per element, or all at once under CMK_CHKP_ALL)
// and then its process-level data.
00513 void CkMemCheckPT::doItNow(int starter, CkCallback &&cb)
00514 {
00515 checkpointed = true;
00516 cpCallback = cb;
00517 inCheckpointing = true;
00518 cpStarter = starter;
00519 if (CkMyPe() == cpStarter) {
00520 startTime = CmiWallTimer();
00521 CkPrintf("[%d] Start checkpointing starter: %d... \n", CkMyPe(), cpStarter);
00522 }
00523 #if !CMK_CHKP_ALL
00524 int len = ckTable.size();
00525 for (int i=0; i<len; i++) {
00526 CkCheckPTInfo *entry = ckTable[i];
00527
00528
00529
// Only one of the two buddies asks the element to checkpoint itself.
00530 if (!isMaster(entry->pNo)) continue;
00531
00532
00533 CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
00534 CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
00535 }
00536
// With no entries, no recvData will arrive, so contribute immediately.
00537 if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
00538 #else
00539 startArrayCheckpoint();
00540 #endif
00541
00542 sendProcData();
00543 }
00544
00545 class MemElementPacker : public CkLocIterator{
00546 private:
00547 CkLocMgr *locMgr;
00548 PUP::er &p;
00549 public:
00550 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
00551 void addLocation(CkLocation &loc){
00552 CkArrayIndexMax idx = loc.getIndex();
00553 CkGroupID gID = locMgr->ckGetGroupID();
00554 CmiUInt8 id = loc.getID();
00555 ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
00556 CmiAssert(elt);
00557
00558 p|gID;
00559 p|idx;
00560 p|id;
00561 locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
00562 }
00563 };
00564
00565 void CkMemCheckPT::pupAllElements(PUP::er &p){
00566 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
00567 int numElements;
00568 if(!p.isUnpacking()){
00569 numElements = CkCountArrayElements();
00570 }
00571
00572 p | numElements;
00573 if(!p.isUnpacking()){
00574 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
00575 }
00576 #endif
00577 }
00578
00579 void CkMemCheckPT::startArrayCheckpoint(){
00580 #if CMK_CHKP_ALL
00581 size_t size;
00582 {
00583 PUP::sizer psizer;
00584 pupAllElements(psizer);
00585 size = psizer.size();
00586 }
00587 size_t packSize = size/sizeof(double)+1;
00588
00589 CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
00590 msg->len = size;
00591 msg->cp_flag = true;
00592 int budPEs[2];
00593 msg->bud1=CkMyPe();
00594 msg->bud2=ChkptOnPe(CkMyPe());
00595 {
00596 PUP::toMem p(msg->packData);
00597 pupAllElements(p);
00598 }
00599 thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
00600 chkpTable[0].updateBuffer(CpvAccess(chkpPointer)^1,msg);
00601 recvCount++;
00602 #endif
00603 }
00604
// CMK_CHKP_ALL mode: stores a whole-PE array checkpoint.
// Slot 0 if this PE produced it (bud1 == me), otherwise slot 1.
// During a checkpoint (cp_flag) it goes to the "next" buffer (chkpPointer^1);
// during recovery it overwrites the current one.
// When both copies (local + remote) have been counted, this PE contributes to
// cpFinish (in-memory) or to syncFiles first (on-disk).
00605 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
00606 {
00607 #if CMK_CHKP_ALL
00608 int idx = 1;
00609 if(msg->bud1 == CkMyPe()){
00610 idx = 0;
00611 }
00612
00613 bool isChkpting = msg->cp_flag;
00614 int pointer;
00615 if(isChkpting)
00616 pointer = CpvAccess(chkpPointer)^1;
00617 else
00618 pointer = CpvAccess(chkpPointer);
00619
00620 chkpTable[idx].updateBuffer(pointer,msg);
00621
00622 if(isChkpting){
00623 recvCount++;
00624
// chkpNum advances once both array and process data of this round arrived.
00625 recvChkpCount++;
00626 if(recvChkpCount==2)
00627 {
00628 CpvAccess(chkpNum)++;
00629 recvChkpCount=0;
00630 }
00631
00632 if(recvCount == 2){
00633 if (where == CkCheckPoint_inMEM) {
00634 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
00635 }
00636 else if (where == CkCheckPoint_inDISK) {
00637
00638 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, syncFiles), thisgroup));
00639 }
00640 else
00641 CmiAbort("Unknown checkpoint scheme");
00642 recvCount = 0;
00643 }
00644 }
00645 #endif
00646 }
00647
00648
// Sizes, packs or unpacks this PE's process-level state: readonlies, main
// chare (PE 0 only), chares (unless CMK_CHARE_USE_PTR is defined), groups, and
// node groups (rank 0 only). The call order is the stream format; keep it
// unchanged.
00649 static inline void _handleProcData(PUP::er &p)
00650 {
00651
00652 CkPupROData(p);
00653
00654
00655 if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
00656
00657 #ifndef CMK_CHARE_USE_PTR
00658
00659 CkPupChareData(p);
00660 #endif
00661
00662
00663 CkPupGroupData(p);
00664
00665
00666 if(CkMyRank()==0) CkPupNodeGroupData(p);
00667 }
00668
00669 void CkMemCheckPT::sendProcData()
00670 {
00671
00672 size_t size;
00673 {
00674 PUP::sizer p;
00675 _handleProcData(p);
00676 size = p.size();
00677 }
00678 size_t packSize = size;
00679 CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
00680 DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %ld to %d\n", CkMyPe(), (CmiUInt8)size, ChkptOnPe(CkMyPe()));
00681 {
00682 PUP::toMem p(msg->packData);
00683 _handleProcData(p);
00684 }
00685 msg->pe = CkMyPe();
00686 msg->len = size;
00687 msg->reportPe = cpStarter;
00688 thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
00689 }
00690
00691 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
00692 {
00693 int pointer = CpvAccess(chkpPointer)^1;
00694 if (CpvAccess(procChkptBuf)[pointer]) delete CpvAccess(procChkptBuf)[pointer];
00695 CpvAccess(procChkptBuf)[pointer] = msg;
00696 DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
00697
00698 recvChkpCount++;
00699 if(recvChkpCount==2)
00700 {
00701 CpvAccess(chkpNum)++;
00702 recvChkpCount=0;
00703 }
00704
00705 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
00706 }
00707
00708
00709 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
00710 {
00711 int len = ckTable.size();
00712 int idx;
00713 for (idx=0; idx<len; idx++) {
00714 CkCheckPTInfo *entry = ckTable[idx];
00715 if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
00716 }
00717 CkAssert(idx < len);
00718 bool isChkpting = msg->cp_flag;
00719 ckTable[idx]->updateBuffer(msg);
00720 if (isChkpting) {
00721
00722
00723 recvCount ++;
00724 if (recvCount == ckTable.size()) {
00725 if (where == CkCheckPoint_inMEM) {
00726 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
00727 }
00728 else if (where == CkCheckPoint_inDISK) {
00729
00730 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, syncFiles), thisgroup));
00731 }
00732 else
00733 CmiAbort("Unknown checkpoint scheme");
00734 recvCount = 0;
00735 }
00736 }
00737 }
00738
00739
// On-disk mode reduction target: runs `sync` (when available and not
// disabled) so checkpoint files reach the disk, then contributes to cpFinish.
00740 void CkMemCheckPT::syncFiles()
00741 {
00742 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
00743 if(system("sync")< 0)
00744 {
00745 CmiAbort("sync file failed");
00746 }
00747 #endif
00748 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
00749 }
00750
00751
00752 void CkMemCheckPT::cpFinish()
00753 {
00754 CmiAssert(CkMyPe() == cpStarter);
00755 peCount++;
00756
00757 if (peCount == 2)
00758 {
00759 CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
00760 cpCallback.send();
00761 peCount = 0;
00762 thisProxy.report();
00763 }
00764 }
00765
00766
// Broadcast from cpFinish at the end of a checkpoint: clears inCheckpointing.
// Under CMK_CHKP_ALL it flips chkpPointer so the just-written buffers become
// current, and PE 0 prints the size of its stored process data.
// NOTE(review): in the !CMK_CHKP_ALL branch objsize is computed but never
// used, and chkpPointer is not flipped even though recvProcData always writes
// to chkpPointer^1 -- confirm intended.
00767 void CkMemCheckPT::report()
00768 {
00769 inCheckpointing = false;
00770 #if !CMK_CHKP_ALL
00771 int objsize = 0;
00772 int len = ckTable.size();
00773 for (int i=0; i<len; i++) {
00774 CkCheckPTInfo *entry = ckTable[i];
00775 CmiAssert(entry);
00776 objsize += entry->getSize();
00777 }
00778 #else
00779
00780 CpvAccess(chkpPointer) = CpvAccess(chkpPointer)^1;
00781 if(CkMyPe()==0)
00782 CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)[CpvAccess(chkpPointer)]->len);
00783 #endif
00784 }
00785
00786
00787
00788
00789
00790
// Decides which of two buddies acts for a shared entry.
// With exactly two live PEs, the lower-numbered one is master.
// Otherwise this PE is master iff buddype is the first non-failed PE after
// it, stepping by 1 (or by the physical node size under NODE_CHECKPOINT) and
// wrapping around.
// The `#if 0` branch is an older, disabled variant kept for reference.
00791 inline bool CkMemCheckPT::isMaster(int buddype)
00792 {
00793 #if 0
00794 int mype = CkMyPe();
00795
00796 if (CkNumPes() - totalFailed() == 2) {
00797 return mype > buddype;
00798 }
00799 for (int i=1; i<CkNumPes(); i++) {
00800 int me = (buddype+i)%CkNumPes();
00801 if (isFailed(me)) continue;
00802 if (me == mype) return 1;
00803 else return 0;
00804 }
00805 return 0;
00806 #else
00807
00808 int mype = CkMyPe();
00809
00810 if (CkNumPes() - totalFailed() == 2) {
00811 return mype < buddype;
00812 }
00813 #if NODE_CHECKPOINT
00814 int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
00815 for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
00816 #else
00817 for (int i=1; i<CkNumPes(); i++) {
00818 #endif
00819 int me = (mype+i)%CkNumPes();
00820 if (isFailed(me)) continue;
00821 if (me == buddype) return 1;
00822 else return 0;
00823 }
00824 return 0;
00825 #endif
00826 }
00827
00828
00829
// Disabled (#if 0): a location iterator that printed and destroyed every
// visited location. Kept for reference only; it is not compiled.
00830 #if 0
00831
00832 class ElementDestroyer : public CkLocIterator {
00833 private:
00834 CkLocMgr *locMgr;
00835 public:
00836 ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
00837 void addLocation(CkLocation &loc) {
00838 CkArrayIndex idx=loc.getIndex();
00839 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
00840 loc.destroy();
00841 }
00842 };
00843 #endif
00844
00845
// Load-balancer bookkeeping after a failure (only when CMK_LBDB_ON).
// Builds a PE availability bitmap with all known failed PEs and diepe marked
// unavailable, and installs it only when CK_NO_PROC_POOL.
// NOTE(review): the final block compares CkMyNode() (a node id) with diepe (a
// PE id); restart() only calls this on PEs other than diePe -- confirm this
// comparison is intended.
00846 void CkMemCheckPT::resetLB(int diepe)
00847 {
00848 #if CMK_LBDB_ON
00849 int i;
00850 std::vector<char> bitmap(CkNumPes());
00851
00852 get_avail_vector(bitmap.data());
00853
00854 for (i=0; i<failedPes.size(); i++)
00855 bitmap[failedPes[i]] = 0;
00856 bitmap[diepe] = 0;
00857
00858 #if CK_NO_PROC_POOL
00859 set_avail_vector(bitmap.data());
00860 #endif
00861
00862
00863 if (CkMyNode() == diepe)
00864 for (i=0; i<CkNumPes(); i++)
00865 if (bitmap[i]==0) failed(i);
00866 #endif
00867 }
00868
00869
00870
00871
00872
00873
// Restart step 1, broadcast by CkRestartCheckPoint(): records the failed PE,
// sets inRestarting, resets load-balancer state on surviving PEs, and puts
// every location manager back into insertion mode.
// Then barriers into removeArrayElements(). The restarted PE must have an
// empty ckTable.
00874 void CkMemCheckPT::restart(int diePe)
00875 {
00876 #if CMK_MEM_CHECKPOINT
00877 double curTime = CmiWallTimer();
00878 if (CkMyPe() == diePe)
00879 CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
00880 stage = (char*)"resetLB";
00881 startTime = curTime;
00882 if (CkMyPe() == diePe)
00883 CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
00884
00885 #if CK_NO_PROC_POOL
00886 failed(diePe);
00887 #endif
00888 thisFailedPe = diePe;
00889
00890 if (CkMyPe() == diePe) CmiAssert(ckTable.empty());
00891
00892 inRestarting = true;
00893
00894
00895 if (CkMyPe() != diePe) resetLB(diePe);
00896
00897 CKLOCMGR_LOOP(mgr->startInserting(););
00898
00899
00900 barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
00901
00902
00903
00904
00905 #endif
00906 }
00907
00908
// Restart step 2: drops current array element records from every location
// manager, so they can be rebuilt from checkpoint data. All records are
// dropped with CK_NO_PROC_POOL, only local ones otherwise. Then barriers into
// resetReductionMgr().
// Aborts if no restart callback (cpCallback) has been set.
00909 void CkMemCheckPT::removeArrayElements()
00910 {
00911 #if CMK_MEM_CHECKPOINT
00912 int len = ckTable.size();
00913 double curTime = CmiWallTimer();
00914 if (CkMyPe() == thisFailedPe)
00915 CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
00916 stage = (char*)"removeArrayElements";
00917 startTime = curTime;
00918
00919 if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
00920 if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
00921
00922
00923
00924 #if CK_NO_PROC_POOL
00925 CKLOCMGR_LOOP(mgr->flushAllRecs(););
00926 #else
00927 CKLOCMGR_LOOP(mgr->flushLocalRecs(););
00928 #endif
00929
00930
00931
00932 barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
00933 #endif
00934 }
00935
00936
// Restart step 3: calls flushStates() and ckJustMigrated() on every group on
// this PE (presumably resetting reduction state -- confirm), then barriers
// into recoverBuddies(). The #else branch is a disabled quiescence-based
// alternative.
00937 void CkMemCheckPT::resetReductionMgr()
00938 {
00939
00940 int numGroups = CkpvAccess(_groupIDTable)->size();
00941 for(int i=0;i<numGroups;i++) {
00942 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
00943 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
00944 obj->flushStates();
00945 obj->ckJustMigrated();
00946 }
00947
00948
00949
00950 #if 1
00951
00952 barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
00953 #else
00954 if (CkMyPe() == 0)
00955 CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
00956 #endif
00957 }
00958
00959
// Restart step 4: every PE holding a checkpoint copy that belonged to the
// failed PE sends that copy to the replacement PE via recoverEntry(). The
// replacement is thisFailedPe itself, or BuddyPE(me) under CK_NO_PROC_POOL.
// The number of copies sent is recorded in expectCount; gotData() acks are
// matched against it. A PE that sent nothing contributes to
// recoverArrayElements immediately.
// NOTE(review): idx and len are only used in the !CMK_CHKP_ALL branch.
00960 void CkMemCheckPT::recoverBuddies()
00961 {
00962 int idx;
00963 int len = ckTable.size();
00964
00965
00966 double curTime = CmiWallTimer();
00967 if (CkMyPe() == thisFailedPe)
00968 CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, curTime-startTime);
00969 stage = (char *)"recoverBuddies";
00970 if (CkMyPe() == thisFailedPe)
00971 CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f\n",CkMyPe(), stage, curTime);
00972 startTime = curTime;
00973
00974
00975 expectCount = 0;
00976 #if !CMK_CHKP_ALL
00977 for (idx=0; idx<len; idx++) {
00978 CkCheckPTInfo *entry = ckTable[idx];
00979 if (entry->pNo == thisFailedPe) {
00980 #if CK_NO_PROC_POOL
00981
00982 int budPe = BuddyPE(CkMyPe());
00983 #else
00984 int budPe = thisFailedPe;
00985 #endif
00986 CkArrayCheckPTMessage *msg = entry->getCopy();
00987 msg->bud1 = budPe;
00988 msg->bud2 = CkMyPe();
00989 msg->cp_flag = 0;
00990 thisProxy[budPe].recoverEntry(msg);
00991 expectCount ++;
00992 }
00993 }
00994 #else
00995
// Slot 1 holds the whole-PE copy received from another PE; resend it if that
// PE is the one that failed.
00996 if(CkMyPe()!=thisFailedPe&&chkpTable[1].bud1==thisFailedPe){
00997 #if CK_NO_PROC_POOL
00998
00999 int budPe = BuddyPE(CkMyPe());
01000 #else
01001 int budPe = thisFailedPe;
01002 #endif
01003 CkArrayCheckPTMessage *msg = chkpTable[1].getCopy(CpvAccess(chkpPointer));
01004 CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
01005 msg->cp_flag = 0;
01006 msg->bud1 = budPe;
01007 msg->bud2 = CkMyPe();
01008 thisProxy[budPe].recoverEntry(msg);
01009 expectCount ++;
01010 }
01011 #endif
01012
01013 #if 1
01014 if (expectCount == 0) {
01015 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
01016
01017 }
01018 #else
01019 if (CkMyPe() == 0) {
01020 CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
01021 }
01022 #endif
01023
01024
01025 }
01026
01027 void CkMemCheckPT::gotData()
01028 {
01029 ackCount ++;
01030 if (ackCount == expectCount) {
01031 ackCount = 0;
01032 expectCount = -1;
01033
01034 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
01035 }
01036 }
01037
01038 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx, CmiUInt8 *id, int nowOnPe)
01039 {
01040
01041 for (int i=0; i<n; i++) {
01042 CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
01043 mgr->updateLocation(idx[i], id[i], nowOnPe);
01044 }
01045 thisProxy[nowOnPe].gotReply();
01046 }
01047
01048
01049 void CkMemCheckPT::recoverArrayElements()
01050 {
01051 double curTime = CmiWallTimer();
01052 int len = ckTable.size();
01053
01054 stage = (char *)"recoverArrayElements";
01055 if (CkMyPe() == thisFailedPe)
01056 CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
01057 startTime = curTime;
01058 int flag = 0;
01059
01060 int count = 0;
01061
01062 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
01063 std::vector<CkGroupID> * gmap = new std::vector<CkGroupID>[CkNumPes()];
01064 std::vector<CkArrayIndex> * imap = new std::vector<CkArrayIndex>[CkNumPes()];
01065 #endif
01066
01067 #if !CMK_CHKP_ALL
01068 for (int idx=0; idx<len; idx++)
01069 {
01070 CkCheckPTInfo *entry = ckTable[idx];
01071 #if CK_NO_PROC_POOL
01072
01073
01074 if (!isMaster(entry->pNo)) continue;
01075 #else
01076
01077 if (CkMyPe() == entry->pNo+1 ||
01078 CkMyPe()+CkNumPes() == entry->pNo+1) continue;
01079 #endif
01080
01081
01082 entry->updateBuddy(CkMyPe(), entry->pNo);
01083 CkArrayCheckPTMessage *msg = entry->getCopy();
01084
01085
01086 inmem_restore(msg);
01087 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
01088 CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
01089 int homePe = mgr->homePe(msg->index);
01090 if (homePe != CkMyPe()) {
01091 gmap[homePe].push_back(msg->locMgr);
01092 imap[homePe].push_back(msg->index);
01093 CkAbort("Missing element IDs");
01094 }
01095 #endif
01096 CkFreeMsg(msg);
01097 count ++;
01098 }
01099 #else
01100 CkArrayCheckPTMessage * msg = chkpTable[0].getCopy(CpvAccess(chkpPointer));
01101 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
01102 recoverAll(msg,gmap,imap);
01103 #else
01104 recoverAll(msg);
01105 #endif
01106 CkFreeMsg(msg);
01107 #endif
01108 curTime = CmiWallTimer();
01109 if (CkMyPe() == thisFailedPe)
01110 CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
01111 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
01112 for (int i=0; i<CkNumPes(); i++) {
01113 if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
01114 thisProxy[i].updateLocations(gmap[i].size(), gmap[i].data(), imap[i].data(), CkMyPe());
01115 flag++;
01116 }
01117 }
01118 delete [] imap;
01119 delete [] gmap;
01120 #endif
01121 DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
01122
01123 CKLOCMGR_LOOP(mgr->doneInserting(););
01124
01125
01126 CpvAccess(_crashedNode) = -1;
01127 inRestarting = false;
01128 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
01129 if (CkMyPe() == 0)
01130 CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
01131 #else
01132 if(flag == 0)
01133 {
01134 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
01135 }
01136 #endif
01137 }
01138
01139 void CkMemCheckPT::gotReply(){
01140 contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
01141 }
01142
// CMK_CHKP_ALL mode: recreates every array element stored in a whole-PE
// checkpoint message. Records use the layout written by MemElementPacker:
// group ID, index, element ID, element state. Does not free msg.
// gmap/imap: per-PE batches of elements whose home PE is elsewhere, filled
// only under STREAMING_INFORMHOME && CK_NO_PROC_POOL.
// NOTE(review): p is a PUP::fromMem, so isUnpacking() is always true here.
// homePe is also computed before resume() and then recomputed.
01143 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,std::vector<CkGroupID> * gmap, std::vector<CkArrayIndex> * imap){
01144 #if CMK_CHKP_ALL
01145 PUP::fromMem p(msg->packData);
01146 int numElements = 0;
01147 p|numElements;
01148 if(p.isUnpacking()){
01149 for(int i=0;i<numElements;i++){
01150 CkGroupID gID;
01151 CkArrayIndex idx;
01152 CmiUInt8 id;
01153 p|gID;
01154 p|idx;
01155 p|id;
01156 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
01157 int homePe = mgr->homePe(idx);
01158 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
01159 mgr->resume(idx, id, p, true, true);
01160 #else
01161 mgr->resume(idx, id, p, false, true);
01162 #endif
01163 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
01164 homePe = mgr->homePe(idx);
01165 if (homePe != CkMyPe()) {
01166 gmap[homePe].push_back(gID);
01167 imap[homePe].push_back(idx);
01168 }
01169 #endif
01170 }
01171 }
01172 if(CkMyPe()==thisFailedPe)
01173 CkPrintf("recover all ends\n");
01174 #endif
01175 }
01176
// Restart start time, read by finishUp() to report total restart duration.
// Not assigned in the visible part of this file; presumably set when a
// restart begins -- confirm.
01177 static double restartT;
01178
01179
01180
// Final restart step: resets the checkpoint counters. The restarted PE sends
// the user callback (cpCallback) and prints timing.
// In MPI builds, the failed PE's buddy restarts its ping check. Under
// CK_NO_PROC_POOL, checkpointing is disabled once too few healthy nodes
// remain.
01181 void CkMemCheckPT::finishUp()
01182 {
01183
01184
01185 recvCount = peCount = 0;
01186 recvChkpCount = 0;
01187 if (CkMyPe() == thisFailedPe)
01188 {
01189 CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
01190
01191 cpCallback.send();
01192 CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
01193 }
01194 #if CMK_CONVERSE_MPI
01195 if (CmiMyPe() == BuddyPE(thisFailedPe)) {
01196 lastPingTime = CmiWallTimer();
01197 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
01198 }
01199 #endif
01200
01201 #if CK_NO_PROC_POOL
01202 #if NODE_CHECKPOINT
01203 int numnodes = CmiNumPhysicalNodes();
01204 #else
01205 int numnodes = CkNumPes();
01206 #endif
01207 if (numnodes-totalFailed() <=2) {
01208 if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
01209 _memChkptOn = false;
01210 }
01211 #endif
01212 }
01213
01214
01215 void CkMemCheckPT::quiescence(CkCallback &&cb)
01216 {
01217 static int pe_count = 0;
01218 pe_count ++;
01219 CmiAssert(CkMyPe() == 0);
01220
01221 if (pe_count == CkNumPes()) {
01222 pe_count = 0;
01223 cb.send();
01224 }
01225 }
01226
01227
01228
// Public entry point: starts an in-memory checkpoint, with `cb` sent when it
// completes. Aborts if cb is invalid.
// If checkpointing is disabled, a restart is in progress, or the runtime was
// built without CMK_MEM_CHECKPOINT, cb is sent immediately instead.
// Otherwise doItNow() is broadcast to every PE, with this PE as starter.
01229 void CkStartMemCheckpoint(CkCallback &cb)
01230 {
01231 #if CMK_MEM_CHECKPOINT
01232 if(cb.isInvalid())
01233 CkAbort("callback after checkpoint is not set properly");
01234 if (!_memChkptOn) {
01235 CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
01236 cb.send();
01237 return;
01238 }
01239 if (CkInRestarting()) {
01240
01241 cb.send();
01242 return;
01243 }
01244
01245 CkMemCheckPT::cpCallback = cb;
01246
01247
01248 CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
01249 checkptMgr.doItNow(CkMyPe(), cb);
01250 #else
01251
01252 CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
01253 cb.send();
01254 #endif
01255 }
01256
01257 void CkRestartCheckPoint(int diePe)
01258 {
01259 CkPrintf("CkRestartCheckPoint CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
01260 CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
01261
01262 checkptMgr.restart(diePe);
01263 }
01264
01265 static int _diePE = -1;
01266
01267
01268 static void CkRestartCheckPointCallback(void *ignore, void *msg)
01269 {
01270 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
01271 CkRestartCheckPoint(_diePE);
01272 }
01273
01274
01275 static int askPhaseHandlerIdx;
01276 static int recvPhaseHandlerIdx;
01277 static int askProcDataHandlerIdx;
01278 static int restartBcastHandlerIdx;
01279 static int recoverProcDataHandlerIdx;
01280 static int restartBeginHandlerIdx;
01281 static int notifyHandlerIdx;
01282 static int reportChkpSeqHandlerIdx;
01283 static int getChkpSeqHandlerIdx;
01284
01285
// Converse handler for the (payload-less) "restart begin" message sent by
// restartBcastHandler. The message is always freed first. Then, when built
// with CMK_MEM_CHECKPOINT:
//  - CMK_USE_BARRIER: a PE other than _diePE forwards a fresh empty message of
//    the same kind to _diePE; on _diePE the restart callback runs directly.
//  - otherwise: runs only on _diePE (asserted) and counts arrivals; once one
//    message per PE (CkNumPes()) has arrived, it runs
//    CkRestartCheckPointCallback and resets the counter for the next restart.
static void restartBeginHandler(char *msg)
{
  CmiFree(msg);
#if CMK_MEM_CHECKPOINT
#if CMK_USE_BARRIER
  if(CkMyPe()!=_diePE){
    printf("restar begin on %d\n",CkMyPe());
    char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
    CmiSetHandler(restartmsg, restartBeginHandlerIdx);
    CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
  }else{
    CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
    CkRestartCheckPointCallback(NULL, NULL);
  }
#else
  // Arrival counter; persists across calls and is reset after each restart.
  static int count = 0;
  CmiAssert(CkMyPe() == _diePE);
  count ++;
  if (count == CkNumPes()) {
    CkRestartCheckPointCallback(NULL, NULL);
    count = 0;
  }
#endif
#endif
}
01311
01312 extern void _discard_charm_message();
01313 extern void _resume_charm_message();
01314
// CmiReduce merge function that ignores all remote contributions and returns
// the local buffer untouched, so the reduction serves only as a barrier.
static void * doNothingMsg(int * size, void * data, void ** remote, int count)
{
  (void)size;
  (void)remote;
  (void)count;
  void *merged = data;
  return merged;
}
01318
01319 static void * minChkpNumMsg(int * size, void * data, void ** remote, int count)
01320 {
01321 int minNum = *(int *)((char *)data+CmiMsgHeaderSizeBytes);
01322 for(int i = 0; i < count;i++)
01323 {
01324 int num = *(int *)((char *)(remote[i])+CmiMsgHeaderSizeBytes);
01325 if(num != -1 && (num < minNum || minNum == -1))
01326 {
01327 minNum = num;
01328 }
01329 }
01330 *(int *)((char *)data+CmiMsgHeaderSizeBytes) = minNum;
01331 return data;
01332 }
01333
// Broadcast handler (sent to all PEs by recoverProcDataHandler).
// Payload after the Converse header: { restarted PE id, checkpoint number }.
// Sets inRestarting, adopts the PE id as _diePE and the checkpoint number
// (and its slot chkpNum % 2), frees the message, and calls
// _resume_charm_message(). It then notifies _diePE with a payload-less
// restartBeginHandler message: through CmiReduce when CMK_USE_BARRIER,
// otherwise by direct send. Finally it clears `checkpointed`.
static void restartBcastHandler(char *msg)
{
#if CMK_MEM_CHECKPOINT

  CkMemCheckPT::inRestarting = true;
  _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
  CpvAccess(chkpNum) = *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int));
  CpvAccess(chkpPointer) = CpvAccess(chkpNum)%2;

  if (CkMyPe()==_diePE)
    CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());

  // Payload fully read above; the message can be released now.
  CmiFree(msg);

  _resume_charm_message();


  char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
  CmiSetHandler(restartmsg, restartBeginHandlerIdx);
#if CMK_USE_BARRIER

  CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);

#else
  CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
#endif
  checkpointed = false;
#endif
}
01363
01364 extern void _initDone();
01365
01366
// Runs on the restarted PE when its processor checkpoint arrives (sent by
// askProcDataHandler as a packed CkProcCheckPTMessage). It:
//  - restores chkpNum / chkpPointer and the restart phase from the message,
//  - replays the packed processor data through _handleProcData,
//  - resets load-balancer state and calls startInserting() on every location
//    manager,
//  - broadcasts { this PE, chkpNum } to restartBcastHandler on all PEs, and
//  - calls _initDone().
// NOTE(review): the received message is not freed here; confirm whether
// ownership is kept intentionally.
static void recoverProcDataHandler(char *msg)
{
#if CMK_MEM_CHECKPOINT
  envelope *env = (envelope *)msg;
  CkUnpackMessage(&env);
  CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
  CpvAccess(chkpNum) = procMsg->pointer;
  CpvAccess(chkpPointer) = CpvAccess(chkpNum)%2;
  CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
  CmiPrintf("[%d] ----- recoverProcDataHandler cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
  PUP::fromMem p(procMsg->packData);
  _handleProcData(p);

  CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());

  CKLOCMGR_LOOP(mgr->startInserting(););

  // Tell every PE (including this one) which PE restarted and which
  // checkpoint number is being restored.
  char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int)*2);
  *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
  *(int *)(reqmsg+CmiMsgHeaderSizeBytes+sizeof(int)) = CpvAccess(chkpNum);
  CmiSetHandler(reqmsg, restartBcastHandlerIdx);
  CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int)*2, (char *)reqmsg);

  _initDone();

  CmiPrintf("[%d] ----- recoverProcDataHandler done at %f\n", CkMyPe(), CkWallTimer());
#endif
}
01396
01397
01398
// Runs on the PE that stores a crashed PE's processor checkpoint.
// Payload after the Converse header: { crashed PE id, checkpoint number }.
// It selects slot chkpNum % 2 and aborts if that slot is empty. It then stamps
// the stored message with the checkpoint number and the current restart phase,
// packs it, and sends it to the crashed PE's recoverProcDataHandler. The send
// takes ownership of the stored message, so the slot is cleared afterwards.
static void askProcDataHandler(char *msg)
{
#if CMK_MEM_CHECKPOINT
  int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
  CpvAccess(chkpNum) = *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int));
  CmiFree(msg);
  int pointer = CpvAccess(chkpNum)%2;
  CpvAccess(chkpPointer) = pointer;
  CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
  CkProcCheckPTMessage *procMsg = CpvAccess(procChkptBuf)[pointer];
  if (procMsg == NULL) {
    CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
    CkAbort("no checkpoint found");
  }
  procMsg->pointer = CpvAccess(chkpNum);
  CmiAssert(procMsg->pe == diePe);
  procMsg->cur_restart_phase = CpvAccess(_curRestartPhase);

  // Read the destination before packing: CkPackMessage takes the envelope by
  // pointer and may update it, after which procMsg must not be dereferenced.
  const int destPe = procMsg->pe;
  envelope *env = (envelope *)(UsrToEnv(procMsg));
  CkPackMessage(&env);
  CmiSetHandler(env, recoverProcDataHandlerIdx);
  CmiSyncSendAndFree(destPe, env->getTotalsize(), (char *)env);
  CpvAccess(procChkptBuf)[pointer] = NULL;
  CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
#endif
}
01424
01425
01426 void qd_callback(void *m)
01427 {
01428 CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
01429 CkFreeMsg(m);
01430
01431 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
01432 CmiSetHandler(msg,reportChkpSeqHandlerIdx);
01433 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
01434 }
01435
01436 static void reportChkpSeqHandler(char * m)
01437 {
01438 CmiFree(m);
01439 CmiResetGlobalReduceSeqID();
01440 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
01441 int num = CpvAccess(chkpNum);
01442 if(CkMyNode() == CpvAccess(_crashedNode))
01443 {
01444 num = -1;
01445 }
01446 *(int *)(msg+CmiMsgHeaderSizeBytes) = num;
01447 CmiSetHandler(msg,getChkpSeqHandlerIdx);
01448 CmiReduce(msg,CmiMsgHeaderSizeBytes+sizeof(int),minChkpNumMsg);
01449 }
01450
01451 static void getChkpSeqHandler(char * m)
01452 {
01453 CpvAccess(chkpNum) = *(int *)(m+CmiMsgHeaderSizeBytes);
01454 CpvAccess(chkpPointer) = CpvAccess(chkpNum)%2;
01455 CmiFree(m);
01456 #ifdef CMK_SMP
01457 for(int i=0;i<CmiMyNodeSize();i++){
01458 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int)*2);
01459 *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
01460 *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int)) =CpvAccess(chkpNum);
01461 CmiSetHandler(msg, askProcDataHandlerIdx);
01462 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);
01463 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int)*2, (char *)msg);
01464 }
01465 return;
01466 #endif
01467 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int)*2);
01468 *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
01469 *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int)) =CpvAccess(chkpNum);
01470
01471 CmiSetHandler(msg, askProcDataHandlerIdx);
01472 int pe = ChkptOnPe(CpvAccess(_crashedNode));
01473 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int)*2, (char *)msg);
01474 }
01475
01476 static void changePhaseHandler(char *msg){
01477 #if CMK_MEM_CHECKPOINT
01478 CpvAccess(_curRestartPhase)--;
01479 if(CkMyNode()==CpvAccess(_crashedNode)){
01480 if(CmiMyRank()==0){
01481 CkCallback cb(qd_callback);
01482 CkStartQD(cb);
01483 CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
01484 }
01485 }
01486 #endif
01487 }
01488
01489
// Called on a PE that is restarting after a crash (it logs "I am restarting").
// It records this node as the crashed node, sets inRestarting, and calls
// _discard_charm_message(). It then starts recovery:
//  - more than one partition: broadcast changePhaseHandler (QD on the crashed
//    node, then qd_callback);
//  - single partition: broadcast reportChkpSeqHandler directly, so all PEs
//    agree (by a min-reduction) on which checkpoint number to restore.
// Aborts when built without CMK_MEM_CHECKPOINT.
void CkMemRestart(const char *dummy, CkArgMsg *args)
{
#if CMK_MEM_CHECKPOINT
  // NOTE(review): stores a node id in _diePE; restartBcastHandler later
  // overwrites it with the restarted PE id from its payload.
  _diePE = CmiMyNode();
  CkMemCheckPT::startTime = restartT = CmiWallTimer();
  CmiPrintf("[%d] I am restarting cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
  CkMemCheckPT::inRestarting = true;

  CpvAccess( _crashedNode )= CmiMyNode();

  _discard_charm_message();
  // restartT is re-read here, so the "Restart finished" time printed in
  // finishUp() is measured from after the discard step.
  restartT = CmiWallTimer();
  CmiPrintf("[%d] I am restarting cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);


  if(CmiNumPartitions()>1){
    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
    CmiSetHandler(msg, changePhaseHandlerIdx);
    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
  }else{






    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
    CmiSetHandler(msg,reportChkpSeqHandlerIdx);
    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
  }
#else
  CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
#endif
}
01524
01525
01526
// Returns 1 while a restart is under way on this PE. That is the case when a
// crashed node has been recorded (_crashedNode != -1) or the inRestarting flag
// is set. Always 0 without CMK_MEM_CHECKPOINT.
int CkInRestarting()
{
#if CMK_MEM_CHECKPOINT
  const bool crashRecorded = (CpvAccess(_crashedNode) != -1);
  return (crashRecorded || CkMemCheckPT::inRestarting) ? 1 : 0;
#else
  return 0;
#endif
}
01539
01540 static int CkInCheckpointing()
01541 {
01542 return CkMemCheckPT::inCheckpointing;
01543 }
01544
// Raise the "load balancing in progress" flag (no-op without CMK_MEM_CHECKPOINT).
void CkSetInLdb()
{
#if CMK_MEM_CHECKPOINT
  CkMemCheckPT::inLoadbalancing = true;
#endif
}
01550
// Returns 1 while the load-balancing flag is raised; always 0 without
// CMK_MEM_CHECKPOINT.
int CkInLdb()
{
#if CMK_MEM_CHECKPOINT
  return CkMemCheckPT::inLoadbalancing ? 1 : 0;
#else
  return 0;
#endif
}
01557
// Lower the "load balancing in progress" flag (no-op without CMK_MEM_CHECKPOINT).
void CkResetInLdb()
{
#if CMK_MEM_CHECKPOINT
  CkMemCheckPT::inLoadbalancing = false;
#endif
}
01563
01564
01565
01566
01567
01568 static int arg_where = CkCheckPoint_inMEM;
01569
01570 #if CMK_MEM_CHECKPOINT
01571 void init_memcheckpt(char **argv)
01572 {
01573 if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
01574 arg_where = CkCheckPoint_inDISK;
01575 }
01576
01577
01578 CpvInitialize(int, _crashedNode);
01579 CpvAccess(_crashedNode) = -1;
01580
01581 }
01582 #endif
01583
01584 extern int quietModeRequested;
01585
01586 class CkMemCheckPTInit: public Chare {
01587 public:
01588 CkMemCheckPTInit(CkArgMsg *m) {
01589 delete m;
01590 #if CMK_MEM_CHECKPOINT
01591 if (arg_where == CkCheckPoint_inDISK) {
01592 if (!quietModeRequested) CkPrintf("Charm++> Double-disk Checkpointing. \n");
01593 }
01594 ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
01595 if (!quietModeRequested) CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
01596 #endif
01597 }
01598 };
01599
// Per-PE crash notification sent by notify_crash. It frees the message,
// advances the restart phase, calls flushStates() on the per-PE _qd object,
// and calls _discard_charm_message().
static void notifyHandler(char *msg)
{
#if CMK_MEM_CHECKPOINT
  CmiFree(msg);

  CpvAccess(_curRestartPhase) ++;
  CpvAccess(_qd)->flushStates();
  _discard_charm_message();

#endif
}
01611
// Record that `node` has crashed and put this node into restart mode.
// _crashedNode is set for this PE and, in SMP builds, for every rank of the
// node. inRestarting is raised, and each PE of this node is sent a
// notifyHandler message. Asserts that this node is not the crashed one.
static void notify_crash(int node)
{
// Value-tested guards, matching the `#if CMK_MEM_CHECKPOINT` / `#if CMK_SMP`
// style used elsewhere (a macro defined to 0 would pass an #ifdef).
#if CMK_MEM_CHECKPOINT
  CpvAccess( _crashedNode) = node;
#if CMK_SMP
  for(int i=0;i<CkMyNodeSize();i++){
    CpvAccessOther(_crashedNode,i)=node;
  }
#endif
  CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
  CkMemCheckPT::inRestarting = true;


  int pe = CmiNodeFirst(CkMyNode());
  for(int i=0;i<CkMyNodeSize();i++){
    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
    CmiSetHandler(msg, notifyHandlerIdx);
    CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
  }
#endif
}
01633
01634 extern void (*notify_crash_fn)(int node);
01635
01636 #if CMK_CONVERSE_MPI
01637
01638
01639
01640
01641
01642 void mpi_restart_crashed(int pe, int rank);
01643 int find_spare_mpirank(int pe,int partition);
01644
01645
01646
01647 static void replicaDieHandler(char * msg){
01648 #if CMK_MEM_CHECKPOINT
01649 #if CMK_HAS_PARTITION
01650
01651 CmiSetHandler(msg, replicaDieBcastHandlerIdx);
01652 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
01653 #endif
01654 #endif
01655 }
01656
01657 static void replicaDieBcastHandler(char *msg){
01658 #if CMK_MEM_CHECKPOINT
01659 #if CMK_HAS_PARTITION
01660 int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
01661 int partition = *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int));
01662 find_spare_mpirank(diePe,partition);
01663 CmiFree(msg);
01664 #endif
01665 #endif
01666 }
01667
01668 void buddyDieHandler(char *msg)
01669 {
01670 #if CMK_MEM_CHECKPOINT
01671
01672 int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
01673 notify_crash(diepe);
01674
01675 CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
01676 int newrank;
01677 newrank = find_spare_mpirank(diepe,CmiMyPartition());
01678 int buddy = obj->BuddyPE(CmiMyPe());
01679 if (buddy == diepe) {
01680 mpi_restart_crashed(diepe, newrank);
01681
01682 }
01683 #endif
01684 }
01685
01686 void pingHandler(void *msg)
01687 {
01688 lastPingTime = CmiWallTimer();
01689 CmiFree(msg);
01690 }
01691
01692 void pingCheckHandler()
01693 {
01694 #if CMK_MEM_CHECKPOINT
01695 double now = CmiWallTimer();
01696 if (lastPingTime > 0 && now - lastPingTime > FAIL_DET_THRESHOLD && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
01697 int i, pe, buddy;
01698
01699 CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
01700 for (i = 1; i < CmiNumPes(); i++) {
01701 pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
01702 if (obj->BuddyPE(pe) == CmiMyPe()) break;
01703 }
01704 buddy = pe;
01705 CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
01706
01707
01708
01709
01710
01711
01712
01713 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
01714 *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
01715 CmiSetHandler(msg, buddyDieHandlerIdx);
01716 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
01717 #if CMK_HAS_PARTITION
01718
01719 for(int i=0;i<CmiNumPartitions();i++){
01720 if(i!=CmiMyPartition()){
01721 char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int)*2);
01722 *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
01723 *(int *)(rMsg+CmiMsgHeaderSizeBytes+sizeof(int)) = CmiMyPartition();
01724 CmiSetHandler(rMsg, replicaDieHandlerIdx);
01725 CmiInterSyncSendAndFree(CkMyPe(),i,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
01726 }
01727 }
01728 #endif
01729 }
01730 else
01731 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
01732 #endif
01733 }
01734
// Periodic heartbeat (MPI builds). If the local checkpoint-manager branch
// exists, it sends this PE's id to its buddy PE's pingHandler. It always
// re-arms itself for CcdPERIODIC_100ms.
void pingBuddy()
{
#if CMK_MEM_CHECKPOINT
  CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
  if (obj) {
    int buddy = obj->BuddyPE(CkMyPe());

    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
    *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
    CmiSetHandler(msg, pingHandlerIdx);
    // NOTE(review): 9999 presumably marks the heartbeat so restart-phase
    // filtering does not drop it — confirm against the phase check.
    CmiGetRestartPhase(msg) = 9999;
    CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
  }
  CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
#endif
}
01751 #endif
01752
01753
// Register every Converse handler used by the checkpoint/restart protocol.
// It also initialises the per-PE checkpoint state: the two-slot processor
// checkpoint buffer, chkpPointer and chkpNum. Finally it installs notify_crash
// as notify_crash_fn. Does nothing without CMK_MEM_CHECKPOINT.
void CkRegisterRestartHandler( )
{
#if CMK_MEM_CHECKPOINT
  notifyHandlerIdx = CkRegisterHandler(notifyHandler);
  askProcDataHandlerIdx = CkRegisterHandler(askProcDataHandler);
  recoverProcDataHandlerIdx = CkRegisterHandler(recoverProcDataHandler);
  restartBcastHandlerIdx = CkRegisterHandler(restartBcastHandler);
  restartBeginHandlerIdx = CkRegisterHandler(restartBeginHandler);
  reportChkpSeqHandlerIdx = CkRegisterHandler(reportChkpSeqHandler);
  getChkpSeqHandlerIdx = CkRegisterHandler(getChkpSeqHandler);

#if CMK_CONVERSE_MPI
  // Failure detection (heartbeat / watchdog) and replica notification.
  pingHandlerIdx = CkRegisterHandler(pingHandler);
  pingCheckHandlerIdx = CkRegisterHandler(pingCheckHandler);
  buddyDieHandlerIdx = CkRegisterHandler(buddyDieHandler);
  replicaDieHandlerIdx = CkRegisterHandler(replicaDieHandler);
  replicaDieBcastHandlerIdx = CkRegisterHandler(replicaDieBcastHandler);
#endif
  changePhaseHandlerIdx = CkRegisterHandler(changePhaseHandler);

  // Two slots, indexed by chkpNum % 2 elsewhere in this file.
  CpvInitialize(CkProcCheckPTMessage **, procChkptBuf);
  CpvAccess(procChkptBuf) = new CkProcCheckPTMessage *[2];
  CpvAccess(procChkptBuf)[0] = NULL;
  CpvAccess(procChkptBuf)[1] = NULL;

  CpvInitialize(int, chkpPointer);
  CpvAccess(chkpPointer) = 0;
  CpvInitialize(int, chkpNum);
  CpvAccess(chkpNum) = 0;

  notify_crash_fn = notify_crash;

#if ! CMK_CONVERSE_MPI
  // (currently empty)



#endif
#endif
}
01793
01794
01795 int CkHasCheckpoints()
01796 {
01797 return (int)checkpointed;
01798 }
01799
01801
01802
01806 #ifdef CMK_MEM_CHECKPOINT
#if CMK_HAS_GETPID
// Fault-injection timer callback. If the scheduled killTime is still more than
// one second away, it re-arms itself for the remaining time (in ms). Otherwise
// it terminates this process: CkDieNow() in MPI builds, SIGKILL to itself
// elsewhere.
void killLocal(void *_dummy,double curWallTime){
  printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
  if(CmiWallTimer()<killTime-1){
    CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);
  }else{
#if CMK_CONVERSE_MPI
    CkDieNow();
#else
    kill(getpid(),SIGKILL);
#endif
  }
}
#else
// Platforms without getpid(): fault injection is unsupported, so abort.
void killLocal(void *_dummy,double curWallTime){
  CmiAbort("kill() not supported!");
}
#endif
01825 #endif
01826
01827 #ifdef CMK_MEM_CHECKPOINT
01828
01831 void readKillFile(){
01832 FILE *fp=fopen(killFile,"r");
01833 if(!fp){
01834 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
01835 return;
01836 }
01837 int proc;
01838 double sec;
01839 while(fscanf(fp,"%d %lf",&proc,&sec)==2){
01840 if(proc == CkMyNode() && CkMyRank() == 0){
01841 killTime = CmiWallTimer()+sec;
01842 printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
01843 CcdCallFnAfter(killLocal,NULL,sec*1000);
01844 }
01845 }
01846 fclose(fp);
01847 }
01848
01849 #if ! CMK_CONVERSE_MPI
// Non-MPI CkDieNow. When built with __FAULT__, it prints a message, sets
// killTime 1 ms ahead, and schedules killLocal after 1 ms. Because killTime is
// then less than a second away, killLocal kills the process instead of
// re-arming. Otherwise this is a no-op.
void CkDieNow()
{
#if __FAULT__

  CmiPrintf("[%d] die now.\n", CmiMyPe());
  killTime = CmiWallTimer()+0.001;
  CcdCallFnAfter(killLocal,NULL,1);
#endif
}
01859 #endif
01860
01861 #endif
01862
01863 #include "CkMemCheckpoint.def.h"
01864
01865