00001
00005
00006 #include <charm++.h>
00007 #include "ck.h"
00008 #include "envelope.h"
00009 #include "CentralLB.h"
00010 #include "LBDBManager.h"
00011 #include "LBSimulation.h"
00012
// Debug tracing macros. As written, both expand to nothing: everything after
// the macro name is a comment, so the argument is discarded.
#define DEBUGF(x) // CmiPrintf x;
#define DEBUG(x) // x;

// USE_REDUCTION: when 1, ProcessAtSync() first reduces the object and
// communication counts before statistics are sent; when 0, it calls
// SendStats() directly.
// USE_LDB_SPANNING_TREE: when 1, SendStats() may route statistics through
// ReceiveStatsViaTree().
// Both are turned off in the memory-checkpoint and message-logging builds.
#if CMK_MEM_CHECKPOINT

#define USE_REDUCTION 0
#define USE_LDB_SPANNING_TREE 0
#elif defined(_FAULT_MLOG_)

#define USE_REDUCTION 0
#define USE_LDB_SPANNING_TREE 0
#else
#define USE_REDUCTION 1
#define USE_LDB_SPANNING_TREE 1
#endif
00028
00029 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00030 extern int _restartFlag;
00031 extern void getGlobalStep(CkGroupID );
00032 extern void initMlogLBStep(CkGroupID );
00033 extern int globalResumeCount;
00034 extern void sendDummyMigrationCounts(int *);
00035 #endif
00036
00037 #if CMK_GRID_QUEUE_AVAILABLE
00038 CpvExtern(void *, CkGridObject);
00039 #endif
00040
// File-level state. `loadbalancer` and `lb_ptr` are not assigned in the part
// of the file visible here; presumably they are set by code elsewhere.
CkGroupID loadbalancer;
int * lb_ptr;
// Set to 1 at the end of CentralLB::initLB().
int load_balancer_created;

CreateLBFunc_Def(CentralLB, "CentralLB base class")

// Forward declaration; used by LoadBalance() to fill an LBInfo with the load
// predicted after applying a migration message.
static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count,
                                    LBMigrateMsg *, LBInfo &info, int considerComm);
00049
00050
00051
00052
00053
00054
00055
00056
00057 void CentralLB::staticStartLB(void* data)
00058 {
00059 CentralLB *me = (CentralLB*)(data);
00060 me->StartLB();
00061 }
00062
00063 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
00064 {
00065 CentralLB *me = (CentralLB*)(data);
00066 me->Migrated(h, waitBarrier);
00067 }
00068
00069 void CentralLB::staticAtSync(void* data)
00070 {
00071 CentralLB *me = (CentralLB*)(data);
00072 me->AtSync();
00073 }
00074
00075 void CentralLB::initLB(const CkLBOptions &opt)
00076 {
00077 #if CMK_LBDB_ON
00078 lbname = "CentralLB";
00079 thisProxy = CProxy_CentralLB(thisgroup);
00080
00081
00082
00083 receiver = theLbdb->
00084 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
00085 notifier = theLbdb->getLBDB()->
00086 NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
00087 startLbFnHdl = theLbdb->getLBDB()->
00088 AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
00089
00090
00091 if (opt.getSeqNo() > 0) turnOff();
00092
00093 stats_msg_count = 0;
00094 statsMsgsList = NULL;
00095 statsData = NULL;
00096
00097 storedMigrateMsg = NULL;
00098 reduction_started = 0;
00099
00100
00101 if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
00102 else predicted_model=0;
00103
00104 theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
00105
00106 myspeed = theLbdb->ProcessorSpeed();
00107
00108 migrates_completed = 0;
00109 future_migrates_completed = 0;
00110 migrates_expected = -1;
00111 future_migrates_expected = -1;
00112 cur_ld_balancer = _lb_args.central_pe();
00113 lbdone = 0;
00114 count_msgs=0;
00115 statsMsg = NULL;
00116
00117 if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
00118
00119 load_balancer_created = 1;
00120 #endif
00121 }
00122
00123 CentralLB::~CentralLB()
00124 {
00125 #if CMK_LBDB_ON
00126 delete [] statsMsgsList;
00127 delete statsData;
00128 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00129 if (theLbdb) {
00130 theLbdb->getLBDB()->
00131 RemoveNotifyMigrated(notifier);
00132 theLbdb->
00133 RemoveStartLBFn((LDStartLBFn)(staticStartLB));
00134 }
00135 #endif
00136 }
00137
00138 void CentralLB::turnOn()
00139 {
00140 #if CMK_LBDB_ON
00141 theLbdb->getLBDB()->
00142 TurnOnBarrierReceiver(receiver);
00143 theLbdb->getLBDB()->
00144 TurnOnNotifyMigrated(notifier);
00145 theLbdb->getLBDB()->
00146 TurnOnStartLBFn(startLbFnHdl);
00147 #endif
00148 }
00149
00150 void CentralLB::turnOff()
00151 {
00152 #if CMK_LBDB_ON
00153 theLbdb->getLBDB()->
00154 TurnOffBarrierReceiver(receiver);
00155 theLbdb->getLBDB()->
00156 TurnOffNotifyMigrated(notifier);
00157 theLbdb->getLBDB()->
00158 TurnOffStartLBFn(startLbFnHdl);
00159 #endif
00160 }
00161
00162 void CentralLB::AtSync()
00163 {
00164 #if CMK_LBDB_ON
00165 DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
00166
00167 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00168 CpvAccess(_currentObj)=this;
00169 #endif
00170
00171
00172 if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
00173 MigrationDone(0);
00174 return;
00175 }
00176 if(CmiNodeAlive(CkMyPe())){
00177 thisProxy [CkMyPe()].ProcessAtSync();
00178 }
00179 #endif
00180 }
00181
00182 #include "ComlibStrategy.h"
00183
00184 void CentralLB::ProcessAtSync()
00185 {
00186 #if CMK_LBDB_ON
00187 if (reduction_started) return;
00188
00189 CmiAssert(CmiNodeAlive(CkMyPe()));
00190 if (CkMyPe() == cur_ld_balancer) {
00191 start_lb_time = CkWallTimer();
00192 }
00193
00194
00195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00196 initMlogLBStep(thisgroup);
00197 #endif
00198
00199
00200 BuildStatsMsg();
00201
00202 #if USE_REDUCTION
00203
00204
00205 int counts[2];
00206 counts[0] = theLbdb->GetObjDataSz();
00207 counts[1] = theLbdb->GetCommDataSz();
00208
00209 CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL),
00210 thisProxy[0]);
00211 contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
00212 reduction_started = 1;
00213 #else
00214 SendStats();
00215 #endif
00216 #endif
00217 }
00218
00219
00220 void CentralLB::ReceiveCounts(CkReductionMsg *msg)
00221 {
00222 CmiAssert(CkMyPe() == 0);
00223 if (statsData == NULL) statsData = new LDStats;
00224
00225 int *counts = (int *)msg->getData();
00226 int n_objs = counts[0];
00227 int n_comm = counts[1];
00228
00229
00230 statsData->objData.resize(n_objs);
00231 statsData->from_proc.resize(n_objs);
00232 statsData->to_proc.resize(n_objs);
00233 statsData->commData.resize(n_comm);
00234
00235 DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
00236
00237
00238 thisProxy.SendStats();
00239 }
00240
00241 void CentralLB::BuildStatsMsg()
00242 {
00243 #if CMK_LBDB_ON
00244
00245 const int osz = theLbdb->GetObjDataSz();
00246 const int csz = theLbdb->GetCommDataSz();
00247
00248 int npes = CkNumPes();
00249 CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
00250 _MEMCHECK(msg);
00251 msg->from_pe = CkMyPe();
00252 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00253 msg->step = step();
00254 #endif
00255
00256
00257
00258
00259
00260
00261
00262 #if CMK_LB_CPUTIMER
00263 theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
00264 &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
00265 #else
00266 theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
00267 &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
00268 #endif
00269
00270 msg->pe_speed = myspeed;
00271 DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
00272
00273 msg->n_objs = osz;
00274 theLbdb->GetObjData(msg->objData);
00275 msg->n_comm = csz;
00276 theLbdb->GetCommData(msg->commData);
00277
00278 DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
00279
00280 if(CkMyPe() == cur_ld_balancer) {
00281 msg->avail_vector = new char[CkNumPes()];
00282 LBDatabaseObj()->get_avail_vector(msg->avail_vector);
00283 msg->next_lb = LBDatabaseObj()->new_lbbalancer();
00284 }
00285
00286 CmiAssert(statsMsg == NULL);
00287 statsMsg = msg;
00288 #endif
00289 }
00290
00291
00292 void CentralLB::SendStats()
00293 {
00294 #if CMK_LBDB_ON
00295 CmiAssert(statsMsg != NULL);
00296 reduction_started = 0;
00297
00298 #if USE_LDB_SPANNING_TREE
00299 if(CkNumPes()>1024)
00300 {
00301 if (CkMyPe() == cur_ld_balancer)
00302 thisProxy[CkMyPe()].ReceiveStats(statsMsg);
00303 else
00304 thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
00305 }
00306 else
00307 #endif
00308 {
00309 DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
00310 thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
00311 }
00312
00313 statsMsg = NULL;
00314
00315 #ifdef __BIGSIM__
00316 BgEndStreaming();
00317 #endif
00318
00319 {
00320
00321 LDOMHandle h;
00322 h.id.id.idx = 0;
00323 theLbdb->getLBDB()->RegisteringObjects(h);
00324 }
00325 #endif
00326 }
00327
00328 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00329 extern int donotCountMigration;
00330 #endif
00331
00332 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
00333 {
00334 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00335 if(donotCountMigration){
00336 return ;
00337 }
00338 #endif
00339
00340 #if CMK_LBDB_ON
00341 if (waitBarrier) {
00342 migrates_completed++;
00343 DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
00344 if (migrates_completed == migrates_expected) {
00345 MigrationDone(1);
00346 }
00347 }
00348 else {
00349 future_migrates_completed ++;
00350 DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
00351 if (future_migrates_completed == future_migrates_expected) {
00352 CheckMigrationComplete();
00353 }
00354 }
00355 #endif
00356 }
00357
00358 void CentralLB::MissMigrate(int waitForBarrier)
00359 {
00360 LDObjHandle h;
00361 Migrated(h, waitForBarrier);
00362 }
00363
00364
00365
00366 void CentralLB::buildStats()
00367 {
00368 statsData->nprocs() = stats_msg_count;
00369
00370 statsData->objData.resize(statsData->n_objs);
00371 statsData->from_proc.resize(statsData->n_objs);
00372 statsData->to_proc.resize(statsData->n_objs);
00373 statsData->commData.resize(statsData->n_comm);
00374
00375 int nobj = 0;
00376 int ncom = 0;
00377 int nmigobj = 0;
00378
00379 for (int pe=0; pe<CkNumPes(); pe++) {
00380 int i;
00381 CLBStatsMsg *msg = statsMsgsList[pe];
00382 if(msg == NULL) continue;
00383 for (i=0; i<msg->n_objs; i++) {
00384 statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
00385 statsData->objData[nobj] = msg->objData[i];
00386 if (msg->objData[i].migratable) nmigobj++;
00387 nobj++;
00388 }
00389 for (i=0; i<msg->n_comm; i++) {
00390 statsData->commData[ncom] = msg->commData[i];
00391 ncom++;
00392 }
00393
00394 delete msg;
00395 statsMsgsList[pe]=0;
00396 }
00397 statsData->n_migrateobjs = nmigobj;
00398 }
00399
00400
00401
00402
00403 void CentralLB::depositData(CLBStatsMsg *m)
00404 {
00405 int i;
00406 if (m == NULL) return;
00407
00408 const int pe = m->from_pe;
00409 struct ProcStats &procStat = statsData->procs[pe];
00410 procStat.pe = pe;
00411 procStat.total_walltime = m->total_walltime;
00412 procStat.idletime = m->idletime;
00413 procStat.bg_walltime = m->bg_walltime;
00414 #if CMK_LB_CPUTIMER
00415 procStat.total_cputime = m->total_cputime;
00416 procStat.bg_cputime = m->bg_cputime;
00417 #endif
00418 procStat.pe_speed = m->pe_speed;
00419
00420 procStat.available = CmiTrue;
00421 procStat.n_objs = m->n_objs;
00422
00423 int &nobj = statsData->n_objs;
00424 int &nmigobj = statsData->n_migrateobjs;
00425 for (i=0; i<m->n_objs; i++) {
00426 statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
00427 statsData->objData[nobj] = m->objData[i];
00428 if (m->objData[i].migratable) nmigobj++;
00429 nobj++;
00430 CmiAssert(nobj <= statsData->objData.capacity());
00431 }
00432 int &n_comm = statsData->n_comm;
00433 for (i=0; i<m->n_comm; i++) {
00434 statsData->commData[n_comm] = m->commData[i];
00435 n_comm++;
00436 CmiAssert(n_comm <= statsData->commData.capacity());
00437 }
00438 delete m;
00439 }
00440
00441 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
00442 {
00443 #if CMK_LBDB_ON
00444 if (statsMsgsList == NULL) {
00445 statsMsgsList = new CLBStatsMsg*[CkNumPes()];
00446 CmiAssert(statsMsgsList != NULL);
00447 for(int i=0; i < CkNumPes(); i++)
00448 statsMsgsList[i] = 0;
00449 }
00450 if (statsData == NULL) statsData = new LDStats;
00451
00452
00453 int count = msg.getCount();
00454 for (int num = 0; num < count; num++)
00455 {
00456 CLBStatsMsg *m = msg.getMessage(num);
00457 CmiAssert(m!=NULL);
00458 const int pe = m->from_pe;
00459 DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
00460 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00461
00462
00463
00464
00465
00466
00467 #endif
00468
00469 if(!CmiNodeAlive(pe)){
00470 DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
00471 continue;
00472 }
00473
00474 if (m->avail_vector!=NULL) {
00475 LBDatabaseObj()->set_avail_vector(m->avail_vector, m->next_lb);
00476 }
00477
00478 if (statsMsgsList[pe] != 0) {
00479 CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
00480 pe);
00481 } else {
00482 statsMsgsList[pe] = m;
00483 #if USE_REDUCTION
00484 depositData(m);
00485 #else
00486
00487 struct ProcStats &procStat = statsData->procs[pe];
00488 procStat.pe = pe;
00489 procStat.total_walltime = m->total_walltime;
00490 procStat.idletime = m->idletime;
00491 procStat.bg_walltime = m->bg_walltime;
00492 #if CMK_LB_CPUTIMER
00493 procStat.total_cputime = m->total_cputime;
00494 procStat.bg_cputime = m->bg_cputime;
00495 #endif
00496 procStat.pe_speed = m->pe_speed;
00497
00498 procStat.available = CmiTrue;
00499 procStat.n_objs = m->n_objs;
00500
00501 statsData->n_objs += m->n_objs;
00502 statsData->n_comm += m->n_comm;
00503 #endif
00504 stats_msg_count++;
00505 }
00506 }
00507
00508 const int clients = CkNumValidPes();
00509 DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
00510
00511 if (stats_msg_count == clients) {
00512 DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
00513 statsData->nprocs() = stats_msg_count;
00514 thisProxy[CkMyPe()].LoadBalance();
00515 }
00516 #endif
00517 }
00518
00520 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
00521 {
00522 #if CMK_LBDB_ON
00523 CmiAssert(CkMyPe() != 0);
00524 bufMsg.add(msg);
00525 count_msgs++;
00526
00527 if (count_msgs == st.numChildren+1) {
00528 if(st.parent == 0)
00529 {
00530 thisProxy[0].ReceiveStats(bufMsg);
00531
00532 }
00533 else
00534 thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
00535 count_msgs = 0;
00536 bufMsg.free();
00537 }
00538 #endif
00539 }
00540
00541 void CentralLB::LoadBalance()
00542 {
00543 #if CMK_LBDB_ON
00544 int proc;
00545 const int clients = CkNumPes();
00546
00547 #if ! USE_REDUCTION
00548
00549 buildStats();
00550 #else
00551 for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
00552 #endif
00553
00554 if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
00555
00556 if (_lb_args.debug())
00557 CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
00558 lbname, cur_ld_balancer, step(), start_lb_time,
00559 CmiMemoryUsage()/(1024.0*1024.0));
00560
00561
00562 if (LBSimulation::doSimulation) simulationRead();
00563
00564 char *availVector = LBDatabaseObj()->availVector();
00565 for(proc = 0; proc < clients; proc++)
00566 statsData->procs[proc].available = (CmiBool)availVector[proc];
00567
00568 preprocess(statsData);
00569
00570
00571
00572 if (_lb_args.printSummary()) {
00573 LBInfo info(clients);
00574
00575 info.getInfo(statsData, clients, 0);
00576 LBRealType mLoad, mCpuLoad, totalLoad;
00577 info.getSummary(mLoad, mCpuLoad, totalLoad);
00578 int nmsgs, nbytes;
00579 statsData->computeNonlocalComm(nmsgs, nbytes);
00580 CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
00581
00582
00583
00584
00585 }
00586
00587 #if CMK_REPLAYSYSTEM
00588 LDHandle *loadBalancer_pointers;
00589 if (_replaySystem) {
00590 loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
00591 for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
00592 }
00593 #endif
00594
00595 LBMigrateMsg* migrateMsg = Strategy(statsData);
00596 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00597 migrateMsg->step = step();
00598 #endif
00599
00600 #if CMK_REPLAYSYSTEM
00601 CpdHandleLBMessage(&migrateMsg);
00602 if (_replaySystem) {
00603 for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
00604 free(loadBalancer_pointers);
00605 }
00606 #endif
00607
00608 LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
00609 migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
00610
00611
00612 simulationWrite();
00613
00614
00615
00616 if (_lb_args.printSummary()) {
00617 LBInfo info(clients);
00618
00619 getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
00620 LBRealType mLoad, mCpuLoad, totalLoad;
00621 info.getSummary(mLoad, mCpuLoad, totalLoad);
00622 int nmsgs, nbytes;
00623 statsData->computeNonlocalComm(nmsgs, nbytes);
00624 CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
00625 for (int i=0; i<clients; i++)
00626 migrateMsg->expectedLoad[i] = info.peLoads[i];
00627 }
00628
00629 DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
00630 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00631 lbDecisionCount++;
00632 migrateMsg->lbDecisionCount = lbDecisionCount;
00633 #endif
00634
00635 envelope *env = UsrToEnv(migrateMsg);
00636 if (1) {
00637
00638 thisProxy.ReceiveMigration(migrateMsg);
00639 }
00640 else {
00641
00642 for (int p=0; p<CkNumPes(); p++) {
00643 LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
00644 thisProxy[p].ReceiveMigration(m);
00645 }
00646 delete migrateMsg;
00647 }
00648
00649
00650
00651 statsData->clear();
00652 stats_msg_count=0;
00653 #endif
00654 }
00655
00656
00657 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
00658 {
00659 #if CMK_LBDB_ON
00660 for (int pe=0 ; pe<count; pe++)
00661 {
00662 for (int i=0; i<len[pe]; i++)
00663 if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
00664 LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID()))
00665 return 0;
00666 }
00667 #endif
00668 return 1;
00669 }
00670
00671
00672 void CentralLB::removeNonMigratable(LDStats* stats, int count)
00673 {
00674 int i;
00675
00676
00677 int have = 0;
00678 for (i=0; i<stats->n_objs; i++)
00679 {
00680 LDObjData &odata = stats->objData[i];
00681 if (!odata.migratable) {
00682 have = 1; break;
00683 }
00684 }
00685 if (have == 0) return;
00686
00687 CkVec<LDObjData> nonmig;
00688 CkVec<int> new_from_proc, new_to_proc;
00689 nonmig.resize(stats->n_migrateobjs);
00690 new_from_proc.resize(stats->n_migrateobjs);
00691 new_to_proc.resize(stats->n_migrateobjs);
00692 int n_objs = 0;
00693 for (i=0; i<stats->n_objs; i++)
00694 {
00695 LDObjData &odata = stats->objData[i];
00696 if (odata.migratable) {
00697 nonmig[n_objs] = odata;
00698 new_from_proc[n_objs] = stats->from_proc[i];
00699 new_to_proc[n_objs] = stats->to_proc[i];
00700 n_objs ++;
00701 }
00702 else {
00703 stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
00704 #if CMK_LB_CPUTIMER
00705 stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
00706 #endif
00707 }
00708 }
00709 CmiAssert(stats->n_migrateobjs == n_objs);
00710
00711 stats->makeCommHash();
00712
00713 CkVec<LDCommData> newCommData;
00714 newCommData.resize(stats->n_comm);
00715 int n_comm = 0;
00716 for (i=0; i<stats->n_comm; i++)
00717 {
00718 LDCommData& cdata = stats->commData[i];
00719 if (!cdata.from_proc())
00720 {
00721 int idx = stats->getSendHash(cdata);
00722 CmiAssert(idx != -1);
00723 if (!stats->objData[idx].migratable) continue;
00724 }
00725 switch (cdata.receiver.get_type()) {
00726 case LD_PROC_MSG:
00727 break;
00728 case LD_OBJ_MSG: {
00729 int idx = stats->getRecvHash(cdata);
00730 if (stats->complete_flag)
00731 CmiAssert(idx != -1);
00732 else if (idx == -1) continue;
00733 if (!stats->objData[idx].migratable) continue;
00734 break;
00735 }
00736 case LD_OBJLIST_MSG:
00737 break;
00738 }
00739 newCommData[n_comm] = cdata;
00740 n_comm ++;
00741 }
00742
00743 if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
00744
00745
00746 stats->objData = nonmig;
00747 stats->from_proc = new_from_proc;
00748 stats->to_proc = new_to_proc;
00749 stats->n_objs = n_objs;
00750
00751 stats->commData = newCommData;
00752 stats->n_comm = n_comm;
00753
00754 stats->deleteCommHash();
00755 stats->makeCommHash();
00756
00757 }
00758
00759
00760 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00761 extern int restarted;
00762 #endif
00763
00764 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
00765 {
00766 storedMigrateMsg = m;
00767 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00768 ProcessReceiveMigration((CkReductionMsg*)NULL);
00769 #else
00770 CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
00771 thisProxy);
00772 contribute(0, NULL, CkReduction::max_int, cb);
00773 #endif
00774 }
00775
00776 void CentralLB::ProcessReceiveMigration(CkReductionMsg *msg)
00777 {
00778 #if CMK_LBDB_ON
00779 int i;
00780 LBMigrateMsg *m = storedMigrateMsg;
00781 CmiAssert(m!=NULL);
00782 delete msg;
00783
00784 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00785 int *dummyCounts;
00786
00787 DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
00788
00789 if(step() > m->step){
00790 char str[100];
00791 envelope *env = UsrToEnv(m);
00792 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
00793 return;
00794 }
00795 lbDecisionCount = m->lbDecisionCount;
00796 #endif
00797
00798 if (_lb_args.debug() > 1)
00799 if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
00800
00801 for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
00802 CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
00803
00804 if(!CmiNodeAlive(CkMyPe())){
00805 delete m;
00806 return;
00807 }
00808 migrates_expected = 0;
00809 future_migrates_expected = 0;
00810 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00811 int sending=0;
00812 int dummy=0;
00813 LBDB *_myLBDB = theLbdb->getLBDB();
00814 if(_restartFlag){
00815 dummyCounts = new int[CmiNumPes()];
00816 bzero(dummyCounts,sizeof(int)*CmiNumPes());
00817 }
00818 #endif
00819 for(i=0; i < m->n_moves; i++) {
00820 MigrateInfo& move = m->moves[i];
00821 const int me = CkMyPe();
00822 if (move.from_pe == me && move.to_pe != me) {
00823 DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
00824
00825 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
00826 if (theLbdb->Migrate(move.obj,move.to_pe) == 0)
00827 thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
00828 #else
00829 if(_restartFlag == 0){
00830 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
00831 theLbdb->Migrate(move.obj,move.to_pe);
00832 sending++;
00833 }else{
00834 if(_myLBDB->validObjHandle(move.obj)){
00835 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
00836 theLbdb->Migrate(move.obj,move.to_pe);
00837 sending++;
00838 }else{
00839 DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
00840 dummyCounts[move.to_pe]++;
00841 dummy++;
00842 }
00843 }
00844 #endif
00845 } else if (move.from_pe != me && move.to_pe == me) {
00846 DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
00847 if (!move.async_arrival) migrates_expected++;
00848 else future_migrates_expected++;
00849 }
00850 }
00851 DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
00852
00853
00854 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00855 if(_restartFlag){
00856 sendDummyMigrationCounts(dummyCounts);
00857 _restartFlag =0;
00858 delete []dummyCounts;
00859 }
00860 #endif
00861
00862
00863 #if 0
00864 if (m->n_moves ==0) {
00865 theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
00866 }
00867 #endif
00868 cur_ld_balancer = m->next_lb;
00869 if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
00870 LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
00871 }
00872
00873 if (migrates_expected == 0 || migrates_completed == migrates_expected)
00874 MigrationDone(1);
00875 delete m;
00876
00877
00878 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00879
00880
00881 #endif
00882 #endif
00883 }
00884
00885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00886 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
00887 DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
00888
00889
00890
00891
00892 thisProxy[CkMyPe()].ResumeClients(1);
00893 }
00894 #endif
00895
00896 void CentralLB::MigrationDone(int balancing)
00897 {
00898 #if CMK_LBDB_ON
00899 migrates_completed = 0;
00900 migrates_expected = -1;
00901
00902 if (balancing) theLbdb->ClearLoads();
00903
00904 theLbdb->incStep();
00905 DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
00906
00907
00908 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00909 savedBalancing = balancing;
00910 startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
00911 #endif
00912
00913 LBDatabase::Object()->MigrationDone();
00914
00915 LoadbalanceDone(balancing);
00916 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
00917
00918 if (balancing && _lb_args.syncResume()) {
00919 CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
00920 thisProxy);
00921 contribute(0, NULL, CkReduction::sum_int, cb);
00922 }
00923 else{
00924 if(CmiNodeAlive(CkMyPe())){
00925 thisProxy [CkMyPe()].ResumeClients(balancing);
00926 }
00927 }
00928 #if CMK_GRID_QUEUE_AVAILABLE
00929 CmiGridQueueDeregisterAll ();
00930 CpvAccess(CkGridObject) = NULL;
00931 #endif
00932 #endif
00933 #endif
00934 }
00935
00936 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00937 void CentralLB::endMigrationDone(int balancing){
00938 DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
00939
00940
00941 if (balancing && _lb_args.syncResume()) {
00942 CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
00943 thisProxy);
00944 contribute(0, NULL, CkReduction::sum_int, cb);
00945 }
00946 else{
00947 if(CmiNodeAlive(CkMyPe())){
00948 DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
00949 thisProxy [CkMyPe()].ResumeClients(balancing);
00950 }
00951 }
00952
00953 }
00954 #endif
00955
00956 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00957 void resumeCentralLbAfterChkpt(void *_lb){
00958 CentralLB *lb= (CentralLB *)_lb;
00959 CpvAccess(_currentObj)=lb;
00960 lb->endMigrationDone(lb->savedBalancing);
00961 }
00962 #endif
00963
00964
00965 void CentralLB::ResumeClients(CkReductionMsg *msg)
00966 {
00967 ResumeClients(1);
00968 delete msg;
00969 }
00970
00971 void CentralLB::ResumeClients(int balancing)
00972 {
00973 #if CMK_LBDB_ON
00974 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00975 resumeCount++;
00976 globalResumeCount = resumeCount;
00977 #endif
00978 DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
00979 if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
00980 double end_lb_time = CkWallTimer();
00981 }
00982
00983 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
00984 if (balancing) ComlibNotifyMigrationDone();
00985 #endif
00986
00987 theLbdb->ResumeClients();
00988 if (balancing) {
00989 CheckMigrationComplete();
00990 if (future_migrates_expected == 0 ||
00991 future_migrates_expected == future_migrates_completed) {
00992 CheckMigrationComplete();
00993 }
00994 }
00995 #endif
00996 }
00997
00998
00999
01000
01001
01002
01003
01004
01005
01006 void CentralLB::CheckMigrationComplete()
01007 {
01008 #if CMK_LBDB_ON
01009 lbdone ++;
01010 if (lbdone == 2) {
01011 if (_lb_args.debug() && CkMyPe()==0) {
01012 double end_lb_time = CkWallTimer();
01013 CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
01014 lbname, cur_ld_balancer, step()-1, end_lb_time,
01015 end_lb_time-start_lb_time);
01016 }
01017 lbdone = 0;
01018 future_migrates_expected = -1;
01019 future_migrates_completed = 0;
01020 DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
01021
01022 LDOMHandle h;
01023 h.id.id.idx = 0;
01024 theLbdb->getLBDB()->DoneRegisteringObjects(h);
01025
01026
01027 theLbdb->nextLoadbalancer(seqno);
01028 }
01029 #endif
01030 }
01031
01032 void CentralLB::preprocess(LDStats* stats)
01033 {
01034 if (_lb_args.ignoreBgLoad())
01035 stats->clearBgLoad();
01036
01037
01038 if (_lb_predict) FuturePredictor(statsData);
01039 }
01040
01041
01042 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
01043 {
01044 #if CMK_LBDB_ON
01045 double strat_start_time = CkWallTimer();
01046 if (_lb_args.debug())
01047 CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
01048
01049 work(stats);
01050
01051 if (_lb_args.debug()>2) {
01052 CkPrintf("CharmLB> Obj Map:\n");
01053 for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
01054 CkPrintf("\n");
01055 }
01056
01057 LBMigrateMsg *msg = createMigrateMsg(stats);
01058
01059 if (_lb_args.debug()) {
01060 double strat_end_time = CkWallTimer();
01061 envelope *env = UsrToEnv(msg);
01062
01063 double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
01064 CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
01065 lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
01066 CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
01067 CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
01068 lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
01069 }
01070
01071 return msg;
01072 #else
01073 return NULL;
01074 #endif
01075 }
01076
01077 void CentralLB::work(LDStats* stats)
01078 {
01079
01080 stats->print();
01081 }
01082
01083
01084 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
01085 {
01086 int i;
01087 CkVec<MigrateInfo*> migrateInfo;
01088 for (i=0; i<stats->n_objs; i++) {
01089 LDObjData &objData = stats->objData[i];
01090 int frompe = stats->from_proc[i];
01091 int tope = stats->to_proc[i];
01092 if (frompe != tope) {
01093
01094
01095 MigrateInfo *migrateMe = new MigrateInfo;
01096 migrateMe->obj = objData.handle;
01097 migrateMe->from_pe = frompe;
01098 migrateMe->to_pe = tope;
01099 migrateMe->async_arrival = objData.asyncArrival;
01100 migrateInfo.insertAtEnd(migrateMe);
01101 }
01102 }
01103
01104 int migrate_count=migrateInfo.length();
01105 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
01106 msg->n_moves = migrate_count;
01107 for(i=0; i < migrate_count; i++) {
01108 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
01109 msg->moves[i] = *item;
01110 delete item;
01111 migrateInfo[i] = 0;
01112 }
01113 return msg;
01114 }
01115
01116 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
01117 {
01118 int nmoves = 0;
01119 int nunavail = 0;
01120 int i;
01121 for (i=0; i<m->n_moves; i++) {
01122 MigrateInfo* item = (MigrateInfo*) &m->moves[i];
01123 if (item->from_pe == p || item->to_pe == p) nmoves++;
01124 }
01125 for (i=0; i<CkNumPes();i++) {
01126 if (!m->avail_vector[i]) nunavail++;
01127 }
01128 LBMigrateMsg* msg;
01129 if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
01130 else msg = new(nmoves,0,0,0) LBMigrateMsg;
01131 msg->n_moves = nmoves;
01132 msg->level = m->level;
01133 msg->next_lb = m->next_lb;
01134 for (i=0,nmoves=0; i<m->n_moves; i++) {
01135 MigrateInfo* item = (MigrateInfo*) &m->moves[i];
01136 if (item->from_pe == p || item->to_pe == p) {
01137 msg->moves[nmoves] = *item;
01138 nmoves++;
01139 }
01140 }
01141
01142 if (nunavail)
01143 for (i=0; i<CkNumPes();i++) {
01144 msg->avail_vector[i] = m->avail_vector[i];
01145 msg->expectedLoad[i] = m->expectedLoad[i];
01146 }
01147 return msg;
01148 }
01149
01150 void CentralLB::simulationWrite() {
01151 if(step() == LBSimulation::dumpStep)
01152 {
01153
01154 int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
01155 char *dumpFileName = (char *)malloc(dumpFileSize);
01156 while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
01157 free(dumpFileName);
01158 dumpFileSize+=3;
01159 dumpFileName = (char *)malloc(dumpFileSize);
01160 }
01161 writeStatsMsgs(dumpFileName);
01162 free(dumpFileName);
01163 CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
01164 ++LBSimulation::dumpStep;
01165 --LBSimulation::dumpStepSize;
01166 if (LBSimulation::dumpStepSize <= 0) {
01167 CmiPrintf("Charm++> Exiting...\n");
01168 CkExit();
01169 }
01170 return;
01171 }
01172 }
01173
01174 void CentralLB::simulationRead() {
01175 LBSimulation *simResults = NULL, *realResults;
01176 LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
01177 voidMessage->n_moves=0;
01178 for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
01179
01180 int simFileSize = strlen(LBSimulation::dumpFile) + 4;
01181 char *simFileName = (char *)malloc(simFileSize);
01182 while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
01183 free(simFileName);
01184 simFileSize+=3;
01185 simFileName = (char *)malloc(simFileSize);
01186 }
01187 readStatsMsgs(simFileName);
01188
01189
01190 if (simResults == NULL) {
01191 simResults = new LBSimulation(LBSimulation::simProcs);
01192 realResults = new LBSimulation(LBSimulation::simProcs);
01193 }
01194 else {
01195
01196 if (!LBSimulation::procsChanged) {
01197
01198
01199
01200 realResults->reset();
01201
01202 for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
01203 findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
01204 simResults->PrintDifferences(realResults,statsData);
01205 }
01206 simResults->reset();
01207 }
01208
01209
01210 double startT = CkWallTimer();
01211 preprocess(statsData);
01212 CmiPrintf("%s> Strategy starts ... \n", lbname);
01213 LBMigrateMsg* migrateMsg = Strategy(statsData);
01214 CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
01215 lbname, CkWallTimer()-startT, (int)(useMem()/1000));
01216
01217
01218 findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
01219
01220
01221 CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
01222
01223 if (LBSimulation::showDecisionsOnly) {
01224 simResults->PrintDecisions(migrateMsg, simFileName,
01225 LBSimulation::simProcs);
01226 } else {
01227 simResults->PrintSimulationResults();
01228 }
01229
01230 free(simFileName);
01231 delete migrateMsg;
01232 CmiPrintf("Charm++> LBSim: Passing to the next step\n");
01233 }
01234
01235 delete simResults;
01236 CmiPrintf("Charm++> Exiting...\n");
01237 CkExit();
01238 }
01239
01240 void CentralLB::readStatsMsgs(const char* filename)
01241 {
01242 #if CMK_LBDB_ON
01243 int i;
01244 FILE *f = fopen(filename, "r");
01245 if (f==NULL) {
01246 CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
01247 CmiAbort("");
01248 }
01249
01250
01251
01252
01253 if (statsMsgsList) {
01254 for(i = 0; i < stats_msg_count; i++)
01255 delete statsMsgsList[i];
01256 delete[] statsMsgsList;
01257 statsMsgsList=0;
01258 }
01259
01260 PUP::fromDisk pd(f);
01261 PUP::machineInfo machInfo;
01262
01263 pd((char *)&machInfo, sizeof(machInfo));
01264 PUP::xlater p(machInfo, pd);
01265
01266 if (_lb_args.lbversion() > 1) {
01267 p|_lb_args.lbversion();
01268 CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
01269 CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
01270 }
01271 p|stats_msg_count;
01272
01273 CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
01274 if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
01275 if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
01276
01277
01278 statsData->pup(p);
01279
01280 CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
01281 CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
01282
01283
01284 CmiPrintf("ReadStatsMsg from %s completed\n", filename);
01285 #endif
01286 }
01287
01288 void CentralLB::writeStatsMsgs(const char* filename)
01289 {
01290 #if CMK_LBDB_ON
01291 FILE *f = fopen(filename, "w");
01292 if (f==NULL) {
01293 CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
01294 CmiAbort("");
01295 }
01296
01297 const PUP::machineInfo &machInfo = PUP::machineInfo::current();
01298 PUP::toDisk p(f);
01299 p((char *)&machInfo, sizeof(machInfo));
01300
01301 p|_lb_args.lbversion();
01302 p|stats_msg_count;
01303 statsData->pup(p);
01304
01305 fclose(f);
01306
01307 CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
01308 #endif
01309 }
01310
01311
01312
01313 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count,
01314 LBMigrateMsg *msg, LBInfo &info,
01315 int considerComm)
01316 {
01317 #if CMK_LBDB_ON
01318 stats->makeCommHash();
01319
01320
01321 for(int i = 0; i < msg->n_moves; i++) {
01322 MigrateInfo &mInfo = msg->moves[i];
01323 int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
01324 CmiAssert(idx != -1);
01325 stats->to_proc[idx] = mInfo.to_pe;
01326 }
01327
01328 info.getInfo(stats, count, considerComm);
01329 #endif
01330 }
01331
01332
01333 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
01334 {
01335 CkAssert(simResults != NULL && count == simResults->numPes);
01336
01337
01338 double startT = CkWallTimer();
01339 getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
01340 CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
01341 }
01342
01343 void CentralLB::pup(PUP::er &p) {
01344 BaseLB::pup(p);
01345 if (p.isUnpacking()) {
01346 initLB(CkLBOptions(seqno));
01347 }
01348 p|reduction_started;
01349 int has_statsMsg=0;
01350 if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
01351 p|has_statsMsg;
01352 if (has_statsMsg) {
01353 if (p.isUnpacking())
01354 statsMsg = new CLBStatsMsg;
01355 statsMsg->pup(p);
01356 }
01357 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01358 p | lbDecisionCount;
01359 p | resumeCount;
01360 #endif
01361
01362 }
01363
01364 int CentralLB::useMem() {
01365 return sizeof(CentralLB) + statsData->useMem() +
01366 CkNumPes() * sizeof(CLBStatsMsg *);
01367 }
01368
01369
01376 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
01377 n_objs = osz;
01378 n_comm = csz;
01379 objData = new LDObjData[osz];
01380 commData = new LDCommData[csz];
01381 avail_vector = NULL;
01382 }
01383
01384 CLBStatsMsg::~CLBStatsMsg() {
01385 delete [] objData;
01386 delete [] commData;
01387 delete [] avail_vector;
01388 }
01389
01390 void CLBStatsMsg::pup(PUP::er &p) {
01391 int i;
01392 p|from_pe;
01393 p|pe_speed;
01394 p|total_walltime;
01395 p|idletime;
01396 p|bg_walltime;
01397 #if CMK_LB_CPUTIMER
01398 p|total_cputime;
01399 p|bg_cputime;
01400 #endif
01401 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01402 p | step;
01403 #endif
01404 p|n_objs;
01405 if (p.isUnpacking()) objData = new LDObjData[n_objs];
01406 for (i=0; i<n_objs; i++) p|objData[i];
01407 p|n_comm;
01408 if (p.isUnpacking()) commData = new LDCommData[n_comm];
01409 for (i=0; i<n_comm; i++) p|commData[i];
01410
01411 int has_avail_vector;
01412 if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
01413 p|has_avail_vector;
01414 if (p.isUnpacking()) {
01415 if (has_avail_vector) avail_vector = new char[CkNumPes()];
01416 else avail_vector = NULL;
01417 }
01418 if (has_avail_vector) p(avail_vector, CkNumPes());
01419
01420 p(next_lb);
01421 }
01422
01423
01424
01425
01426
01427 void CkMarshalledCLBStatsMessage::free() {
01428 int count = msgs.size();
01429 for (int i=0; i<count; i++) {
01430 delete msgs[i];
01431 msgs[i] = NULL;
01432 }
01433 msgs.free();
01434 }
01435
01436 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
01437 {
01438 int count = m.getCount();
01439 for (int i=0; i<count; i++) add(m.getMessage(i));
01440 }
01441
01442 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
01443 {
01444 int count = msgs.size();
01445 p|count;
01446 for (int i=0; i<count; i++) {
01447 CLBStatsMsg *msg;
01448 if (p.isUnpacking()) msg = new CLBStatsMsg;
01449 else {
01450 msg = msgs[i]; CmiAssert(msg!=NULL);
01451 }
01452 msg->pup(p);
01453 if (p.isUnpacking()) add(msg);
01454 }
01455 }
01456
01457 SpanningTree::SpanningTree()
01458 {
01459 double sq = sqrt(CkNumPes()*4.0-3.0) - 1;
01460 arity = (int)ceil(sq/2);
01461 calcParent(CkMyPe());
01462 calcNumChildren(CkMyPe());
01463 }
01464
01465 void SpanningTree::calcParent(int n)
01466 {
01467 parent=-1;
01468 if(n != 0 && arity > 0)
01469 parent = (n-1)/arity;
01470 }
01471
01472 void SpanningTree::calcNumChildren(int n)
01473 {
01474 numChildren = 0;
01475 if (arity == 0) return;
01476 int fullNode=(CkNumPes()-1-arity)/arity;
01477 if(n <= fullNode)
01478 numChildren = arity;
01479 if(n == fullNode+1)
01480 numChildren = CkNumPes()-1-(fullNode+1)*arity;
01481 if(n > fullNode+1)
01482 numChildren = 0;
01483 }
01484
01485 #include "CentralLB.def.h"
01486