00001
00006
00007 #include <charm++.h>
00008 #include "ck.h"
00009 #include "envelope.h"
00010 #include "CentralLB.h"
00011 #include "LBDBManager.h"
00012 #include "LBSimulation.h"
00013
00014 #define DEBUGF(x) // CmiPrintf x;
00015 #define DEBUG(x) // x;
00016
00017 #if CMK_MEM_CHECKPOINT
00018
00019 #define USE_REDUCTION 0
00020 #define USE_LDB_SPANNING_TREE 0
00021 #else
00022 #define USE_REDUCTION 1
00023 #define USE_LDB_SPANNING_TREE 1
00024 #endif
00025
00026 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00027 extern bool _restartFlag;
00028 extern void getGlobalStep(CkGroupID );
00029 extern void initMlogLBStep(CkGroupID );
00030 extern int globalResumeCount;
00031 extern void sendDummyMigrationCounts(int *);
00032 #endif
00033
00034 #if CMK_GRID_QUEUE_AVAILABLE
00035 CpvExtern(void *, CkGridObject);
00036 #endif
00037
00038 #if CMK_GLOBAL_LOCATION_UPDATE
00039 extern void UpdateLocation(MigrateInfo& migData);
00040 #endif
00041
00042 #if CMK_SHRINK_EXPAND
00043 extern "C" void charmrun_realloc(char *s);
00044 extern char willContinue;
00045 extern realloc_state pending_realloc_state;
00046 extern char * se_avail_vector;
00047 extern "C" int mynewpe;
00048 extern char *_shrinkexpand_basedir;
00049 int numProcessAfterRestart;
00050 int mynewpe=0;
00051 #endif
// Group ID of the CentralLB group; assigned in CentralLB::initLB.
CkGroupID loadbalancer;
// NOTE(review): not referenced anywhere in this chunk — confirm whether it is still used.
int * lb_ptr;
// Set to true by CentralLB::initLB (only when CMK_LBDB_ON).
bool load_balancer_created;

// Presumably registers CentralLB with the load-balancer creation factory
// under this description (macro defined elsewhere — confirm).
CreateLBFunc_Def(CentralLB, "CentralLB base class")

// PE-count cutoff for distributing migration decisions: at or below this
// many PEs the decision is sent to each PE directly instead of being split
// recursively (see InitiateScatter and ScatterMigrationResults).
static int broadcastThreshold = 32;

// Forward declaration; ApplyDecision uses it to compute the post-LB load summary.
static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count,
LBMigrateMsg *, LBInfo &info, int considerComm);
00062
00063
00064
00065
00066
00067
00068
00069
00070 void CentralLB::staticStartLB(void* data)
00071 {
00072 CentralLB *me = (CentralLB*)(data);
00073 me->StartLB();
00074 }
00075
00076 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
00077 {
00078 CentralLB *me = (CentralLB*)(data);
00079 me->Migrated(waitBarrier);
00080 }
00081
00082 void CentralLB::staticAtSync(void* data)
00083 {
00084 CentralLB *me = (CentralLB*)(data);
00085 me->AtSync();
00086 }
00087
00088 void CentralLB::initLB(const CkLBOptions &opt)
00089 {
00090 #if CMK_LBDB_ON
00091 lbname = "CentralLB";
00092 thisProxy = CProxy_CentralLB(thisgroup);
00093
00094 loadbalancer = thisgroup;
00095
00096 receiver = theLbdb->
00097 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
00098 notifier = theLbdb->getLBDB()->
00099 NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
00100 startLbFnHdl = theLbdb->getLBDB()->
00101 AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
00102
00103
00104 if (opt.getSeqNo() > 0 || (_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
00105 turnOff();
00106
00107 stats_msg_count = 0;
00108 statsMsgsList = NULL;
00109 statsData = NULL;
00110
00111 storedMigrateMsg = NULL;
00112 reduction_started = false;
00113
00114
00115 if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
00116 else predicted_model=0;
00117
00118 theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
00119
00120 myspeed = theLbdb->ProcessorSpeed();
00121
00122 migrates_completed = 0;
00123 future_migrates_completed = 0;
00124 migrates_expected = -1;
00125 future_migrates_expected = -1;
00126 cur_ld_balancer = _lb_args.central_pe();
00127 lbdone = 0;
00128 count_msgs=0;
00129 statsMsg = NULL;
00130 use_thread = false;
00131
00132 if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
00133
00134 load_balancer_created = true;
00135 #endif
00136 #ifdef TEMP_LDB
00137 logicalCoresPerNode=physicalCoresPerNode=4;
00138 logicalCoresPerChip=4;
00139 numSockets=1;
00140 #endif
00141
00142 }
00143
00144 CentralLB::~CentralLB()
00145 {
00146 #if CMK_LBDB_ON
00147 delete [] statsMsgsList;
00148 delete statsData;
00149 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00150 if (theLbdb) {
00151 theLbdb->getLBDB()->
00152 RemoveNotifyMigrated(notifier);
00153 theLbdb->
00154 RemoveStartLBFn((LDStartLBFn)(staticStartLB));
00155 }
00156 #endif
00157 }
00158
00159 void CentralLB::turnOn()
00160 {
00161 #if CMK_LBDB_ON
00162 theLbdb->getLBDB()->
00163 TurnOnBarrierReceiver(receiver);
00164 theLbdb->getLBDB()->
00165 TurnOnNotifyMigrated(notifier);
00166 theLbdb->getLBDB()->
00167 TurnOnStartLBFn(startLbFnHdl);
00168 #endif
00169 }
00170
00171 void CentralLB::turnOff()
00172 {
00173 #if CMK_LBDB_ON
00174 theLbdb->getLBDB()->
00175 TurnOffBarrierReceiver(receiver);
00176 theLbdb->getLBDB()->
00177 TurnOffNotifyMigrated(notifier);
00178 theLbdb->getLBDB()->
00179 TurnOffStartLBFn(startLbFnHdl);
00180 #endif
00181 }
00182
00183 void CentralLB::SetPESpeed(int speed)
00184 {
00185 myspeed = speed;
00186 }
00187
00188 int CentralLB::GetPESpeed()
00189 {
00190 return myspeed;
00191 }
00192
00193 void CentralLB::AtSync()
00194 {
00195 #if CMK_LBDB_ON
00196 DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
00197 #if CMK_MEM_CHECKPOINT
00198 CkSetInLdb();
00199 #endif
00200 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00201 CpvAccess(_currentObj)=this;
00202 #endif
00203
00204
00205 if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
00206 MigrationDone(0);
00207 return;
00208 }
00209 #if CMK_FAULT_EVAC
00210 if(CmiNodeAlive(CkMyPe()))
00211 #endif
00212 {
00213 thisProxy [CkMyPe()].ProcessAtSync();
00214 }
00215 #endif
00216 }
00217
00218 void CentralLB::ProcessAtSync()
00219 {
00220 #if CMK_LBDB_ON
00221 if (reduction_started) return;
00222
00223 #if CMK_FAULT_EVAC
00224 CmiAssert(CmiNodeAlive(CkMyPe()));
00225 #endif
00226 if (CkMyPe() == cur_ld_balancer) {
00227 start_lb_time = CkWallTimer();
00228 }
00229
00230 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00231 initMlogLBStep(thisgroup);
00232 #endif
00233
00234
00235 BuildStatsMsg();
00236
00237 #if USE_REDUCTION
00238
00239
00240 int counts[2];
00241 counts[0] = theLbdb->GetObjDataSz();
00242 counts[1] = theLbdb->GetCommDataSz();
00243
00244 CkCallback cb;
00245 if (concurrent)
00246 cb = CkCallback(CkReductionTarget(CentralLB, ReceiveCounts), thisProxy);
00247 else
00248 cb = CkCallback(CkReductionTarget(CentralLB, ReceiveCounts), thisProxy[0]);
00249 contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
00250 reduction_started = true;
00251 #else
00252 SendStats();
00253 #endif
00254 #endif
00255 }
00256
00257 #if defined(TEMP_LDB)
00258 static int cpufreq_sysfs_write (
00259 const char *setting,int proc
00260 )
00261 {
00262 char path[100];
00263 sprintf(path,"/sys/devices/system/cpu/cpu%d/cpufreq/scaling_setspeed",proc);
00264 FILE *fd = fopen (path, "w");
00265
00266 if (!fd) {
00267 printf("PROC#%d ooooooo666 FILE OPEN ERROR file=%s\n",CkMyPe(),path);
00268 return -1;
00269 }
00270
00271
00272 fseek ( fd , 0 , SEEK_SET );
00273 int numw=fprintf (fd, setting);
00274 if (numw <= 0) {
00275
00276 fclose (fd);
00277 printf("FILE WRITING ERROR\n");
00278 return 0;
00279 }
00280
00281 fclose(fd);
00282 return 1;
00283 }
00284
00285
// Reads the integer in the cpufreq scaling_setspeed file of CPU `proc`.
// Returns 0 if the file cannot be opened or read (or holds no number).
static int cpufreq_sysfs_read (int proc)
{
  char path[100];
  snprintf(path, sizeof(path),
           "/sys/devices/system/cpu/cpu%d/cpufreq/scaling_setspeed", proc);

  FILE *fd = fopen (path, "r");
  if (!fd) {
    printf("33 FILE OPEN ERROR file=%s\n",path);
    return 0;
  }

  // Only parse the buffer if fgets actually filled it; otherwise it would be
  // uninitialized.
  char val[10];
  int ff = 0;
  if (fgets(val, sizeof(val), fd) != NULL) ff = atoi(val);
  fclose (fd);

  return ff;
}
00306
00307 float CentralLB::getTemp(int cpu)
00308 {
00309 char val[10];
00310 FILE *f;
00311 char path[100];
00312 sprintf(path,"/sys/devices/platform/coretemp.%d/temp1_input",cpu);
00313 f=fopen(path,"r");
00314 if (!f) {
00315 printf("777 FILE OPEN ERROR file=%s\n",path);
00316 exit(0);
00317 }
00318
00319 if(f==NULL) {printf("ddddddddddddddddddddddddddd\n");exit(0);}
00320 fgets(val,10,f);
00321 fclose(f);
00322 return atof(val)/1000;
00323 }
00324 #endif
00325
00326
00327
00328 void CentralLB::ReceiveCounts(int *counts, int n)
00329 {
00330 if (!concurrent) CmiAssert(CkMyPe() == 0);
00331 if (statsData == NULL) statsData = new LDStats;
00332
00333
00334 CmiAssert(n == 2);
00335 int n_objs = counts[0];
00336 int n_comm = counts[1];
00337
00338
00339 statsData->objData.resize(n_objs);
00340 statsData->from_proc.resize(n_objs);
00341 statsData->to_proc.resize(n_objs);
00342 statsData->commData.resize(n_comm);
00343
00344 DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
00345
00346 if (concurrent) {
00347 CkCallback cb = CkCallback(CkReductionTarget(CentralLB, SendStats), thisProxy);
00348 contribute(cb);
00349 }
00350 else thisProxy.SendStats();
00351 }
00352
00353 void CentralLB::BuildStatsMsg()
00354 {
00355 #if CMK_LBDB_ON
00356
00357 const int osz = theLbdb->GetObjDataSz();
00358 const int csz = theLbdb->GetCommDataSz();
00359
00360 int npes = CkNumPes();
00361 CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
00362 _MEMCHECK(msg);
00363 msg->from_pe = CkMyPe();
00364 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00365 msg->step = step();
00366 #endif
00367
00368
00369
00370
00371
00372
00373
00374 #if CMK_LB_CPUTIMER
00375 theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
00376 &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
00377 #else
00378 theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
00379 &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
00380 #endif
00381 #if defined(TEMP_LDB)
00382 float mytemp=getTemp(CkMyPe()%physicalCoresPerNode);
00383 int freq=cpufreq_sysfs_read (CkMyPe()%logicalCoresPerNode);
00384 msg->pe_temp=mytemp;
00385 msg->pe_speed=freq;
00386 #else
00387 msg->pe_speed = myspeed;
00388 #endif
00389
00390 DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
00391
00392 msg->n_objs = osz;
00393 theLbdb->GetObjData(msg->objData);
00394 msg->n_comm = csz;
00395 theLbdb->GetCommData(msg->commData);
00396
00397 DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
00398
00399 if(CkMyPe() == cur_ld_balancer) {
00400 msg->avail_vector = new char[CkNumPes()];
00401 LBDatabaseObj()->get_avail_vector(msg->avail_vector);
00402 msg->next_lb = LBDatabaseObj()->new_lbbalancer();
00403 }
00404
00405 CmiAssert(statsMsg == NULL);
00406 statsMsg = msg;
00407 #endif
00408 }
00409
00410
00411
00412 void CentralLB::SendStats()
00413 {
00414 #if CMK_LBDB_ON
00415 CmiAssert(statsMsg != NULL);
00416 reduction_started = false;
00417
00418 #if USE_LDB_SPANNING_TREE
00419 if(CkNumPes()>1024)
00420 {
00421 if (CkMyPe() == cur_ld_balancer)
00422 thisProxy[CkMyPe()].ReceiveStats(statsMsg);
00423 else
00424 thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
00425 }
00426 else
00427 #endif
00428 {
00429 DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
00430 thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
00431 }
00432
00433 statsMsg = NULL;
00434
00435 #ifdef __BIGSIM__
00436 BgEndStreaming();
00437 #endif
00438
00439 {
00440
00441 LDOMHandle h;
00442 h.id.id.idx = 0;
00443 theLbdb->getLBDB()->RegisteringObjects(h);
00444 }
00445 #endif
00446 }
00447
00448 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00449 extern int donotCountMigration;
00450 #endif
00451
00452 void CentralLB::Migrated(int waitBarrier)
00453 {
00454 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00455 if(donotCountMigration){
00456 return ;
00457 }
00458 #endif
00459
00460 #if CMK_LBDB_ON
00461 if (waitBarrier) {
00462 migrates_completed++;
00463 DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
00464 if (migrates_completed == migrates_expected) {
00465 MigrationDone(1);
00466 }
00467 }
00468 else {
00469 future_migrates_completed ++;
00470 DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
00471 if (future_migrates_completed == future_migrates_expected) {
00472 CheckMigrationComplete();
00473 }
00474 }
00475 #endif
00476 }
00477
00478 void CentralLB::MissMigrate(int waitForBarrier)
00479 {
00480 Migrated(waitForBarrier);
00481 }
00482
00483
00484
00485 void CentralLB::buildStats()
00486 {
00487 statsData->nprocs() = stats_msg_count;
00488
00489 statsData->objData.resize(statsData->n_objs);
00490 statsData->from_proc.resize(statsData->n_objs);
00491 statsData->to_proc.resize(statsData->n_objs);
00492 statsData->commData.resize(statsData->n_comm);
00493
00494 int nobj = 0;
00495 int ncom = 0;
00496 int nmigobj = 0;
00497
00498 for (int pe=0; pe<CkNumPes(); pe++) {
00499 int i;
00500 CLBStatsMsg *msg = statsMsgsList[pe];
00501 if(msg == NULL) continue;
00502 for (i=0; i<msg->n_objs; i++) {
00503 statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
00504 statsData->objData[nobj] = msg->objData[i];
00505 if (msg->objData[i].migratable) nmigobj++;
00506 nobj++;
00507 }
00508 for (i=0; i<msg->n_comm; i++) {
00509 statsData->commData[ncom] = msg->commData[i];
00510 ncom++;
00511 }
00512
00513 delete msg;
00514 statsMsgsList[pe]=0;
00515 }
00516 statsData->n_migrateobjs = nmigobj;
00517 }
00518
00519
00520
00521
00522 void CentralLB::depositData(CLBStatsMsg *m)
00523 {
00524 int i;
00525 if (m == NULL) return;
00526
00527 const int pe = m->from_pe;
00528 struct ProcStats &procStat = statsData->procs[pe];
00529 #if defined(TEMP_LDB)
00530 procStat.pe_temp=m->pe_temp;
00531 procStat.pe_speed=m->pe_speed;
00532 #endif
00533
00534 procStat.pe = pe;
00535 procStat.total_walltime = m->total_walltime;
00536 procStat.idletime = m->idletime;
00537 procStat.bg_walltime = m->bg_walltime;
00538 #if CMK_LB_CPUTIMER
00539 procStat.total_cputime = m->total_cputime;
00540 procStat.bg_cputime = m->bg_cputime;
00541 #endif
00542 procStat.pe_speed = m->pe_speed;
00543
00544
00545 procStat.available = true;
00546 procStat.n_objs = m->n_objs;
00547
00548 int &nobj = statsData->n_objs;
00549 int &nmigobj = statsData->n_migrateobjs;
00550 for (i=0; i<m->n_objs; i++) {
00551 statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
00552 statsData->objData[nobj] = m->objData[i];
00553 if (m->objData[i].migratable) nmigobj++;
00554 nobj++;
00555 CmiAssert(nobj <= statsData->objData.capacity());
00556 }
00557 int &n_comm = statsData->n_comm;
00558 for (i=0; i<m->n_comm; i++) {
00559 statsData->commData[n_comm] = m->commData[i];
00560 n_comm++;
00561 CmiAssert(n_comm <= statsData->commData.capacity());
00562 }
00563 delete m;
00564 }
00565
00566 void CentralLB::ReceiveStatsFromRoot(CkMarshalledCLBStatsMessage &&msg) {
00567 #if CMK_LBDB_ON
00568 if (CkMyPe() == cur_ld_balancer) return;
00569 else ReceiveStats(std::move(msg));
00570 #endif
00571 }
00572
00573 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &&msg)
00574 {
00575 #if CMK_LBDB_ON
00576 if (concurrent && (CkMyPe() == cur_ld_balancer)) {
00577 thisProxy.ReceiveStatsFromRoot(msg);
00578 }
00579
00580 if (statsMsgsList == NULL) {
00581 statsMsgsList = new CLBStatsMsg*[CkNumPes()];
00582 CmiAssert(statsMsgsList != NULL);
00583 for(int i=0; i < CkNumPes(); i++)
00584 statsMsgsList[i] = 0;
00585 }
00586 if (statsData == NULL) statsData = new LDStats;
00587
00588
00589 int count = msg.getCount();
00590 for (int num = 0; num < count; num++)
00591 {
00592 CLBStatsMsg *m = msg.getMessage(num);
00593 CmiAssert(m!=NULL);
00594 const int pe = m->from_pe;
00595 DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
00596 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00597
00598
00599
00600
00601
00602
00603 #endif
00604
00605 #if CMK_FAULT_EVAC
00606 if(!CmiNodeAlive(pe)){
00607 DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
00608 continue;
00609 }
00610 #endif
00611
00612 if (m->avail_vector!=NULL) {
00613 LBDatabaseObj()->set_avail_vector(m->avail_vector, m->next_lb);
00614 }
00615
00616 if (statsMsgsList[pe] != 0) {
00617 CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
00618 pe);
00619 } else {
00620 statsMsgsList[pe] = m;
00621 #if USE_REDUCTION
00622 depositData(m);
00623 #else
00624
00625 struct ProcStats &procStat = statsData->procs[pe];
00626 procStat.pe = pe;
00627 procStat.total_walltime = m->total_walltime;
00628 procStat.idletime = m->idletime;
00629 procStat.bg_walltime = m->bg_walltime;
00630 #if CMK_LB_CPUTIMER
00631 procStat.total_cputime = m->total_cputime;
00632 procStat.bg_cputime = m->bg_cputime;
00633 #endif
00634 procStat.pe_speed = m->pe_speed;
00635
00636 procStat.available = true;
00637 procStat.n_objs = m->n_objs;
00638
00639 statsData->n_objs += m->n_objs;
00640 statsData->n_comm += m->n_comm;
00641 #if defined(TEMP_LDB)
00642 procStat.pe_temp=m->pe_temp;
00643 procStat.pe_speed=m->pe_speed;
00644 #endif
00645 #endif
00646
00647 stats_msg_count++;
00648 }
00649 }
00650
00651 #if CMK_FAULT_EVAC
00652 const int clients = CkNumValidPes();
00653 #else
00654 const int clients = CkNumPes();
00655 #endif
00656
00657 DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
00658
00659 if (stats_msg_count == clients) {
00660 DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
00661 statsData->nprocs() = stats_msg_count;
00662 if (use_thread)
00663 thisProxy[CkMyPe()].t_LoadBalance();
00664 else
00665 thisProxy[CkMyPe()].LoadBalance();
00666 }
00667 #endif
00668 }
00669
00671 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &&msg)
00672 {
00673 #if CMK_LBDB_ON
00674 CmiAssert(CkMyPe() != 0);
00675 bufMsg.add(std::move(msg));
00676 count_msgs++;
00677
00678 if (count_msgs == st.numChildren+1) {
00679 if(st.parent == 0)
00680 {
00681 thisProxy[0].ReceiveStats(bufMsg);
00682
00683 }
00684 else
00685 thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
00686 count_msgs = 0;
00687 bufMsg.free();
00688 }
00689 #endif
00690 }
00691
00692 #if CMK_REPLAYSYSTEM
00693 static LDHandle *loadBalancer_pointers;
00694 #endif
00695
00696 void CentralLB::LoadBalance()
00697 {
00698 #if CMK_LBDB_ON
00699 int proc;
00700 const int clients = CkNumPes();
00701
00702 #if ! USE_REDUCTION
00703
00704 buildStats();
00705 #else
00706 for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
00707 #endif
00708
00709 theLbdb->ResetAdaptive();
00710 if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
00711
00712 if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer))
00713 CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
00714 lbname, cur_ld_balancer, step(), start_lb_time,
00715 CmiMemoryUsage()/(1024.0*1024.0));
00716
00717
00718 if (LBSimulation::doSimulation) simulationRead();
00719
00720 char *availVector = LBDatabaseObj()->availVector();
00721 for(proc = 0; proc < clients; proc++)
00722 statsData->procs[proc].available = (bool)availVector[proc];
00723
00724
00725 removeCommDataOfDeletedObjs(statsData);
00726 preprocess(statsData);
00727
00728
00729
00730 if (_lb_args.printSummary()) {
00731 LBInfo info(clients);
00732
00733 info.getInfo(statsData, clients, 0);
00734 LBRealType mLoad, mCpuLoad, totalLoad;
00735 info.getSummary(mLoad, mCpuLoad, totalLoad);
00736 int nmsgs, nbytes;
00737 statsData->computeNonlocalComm(nmsgs, nbytes);
00738 CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
00739
00740
00741
00742
00743 }
00744
00745 #if CMK_REPLAYSYSTEM
00746 if (_replaySystem && !concurrent) {
00747 loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
00748 for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
00749 }
00750 #endif
00751
00752 storedMigrateMsg = Strategy(statsData);
00753
00754 if (!concurrent) ApplyDecision();
00755 #endif
00756 }
00757
00758 void CentralLB::ApplyDecision() {
00759 #if CMK_LBDB_ON
00760 const int clients = CkNumPes();
00761
00762 LBMigrateMsg *migrateMsg;
00763 if (concurrent) {
00764 migrateMsg = createMigrateMsg(statsData);
00765 if (_lb_args.debug()) printStrategyStats(migrateMsg);
00766 } else {
00767 migrateMsg = storedMigrateMsg;
00768 storedMigrateMsg = NULL;
00769 }
00770
00771 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00772 migrateMsg->step = step();
00773 #endif
00774
00775 #if CMK_REPLAYSYSTEM
00776 CpdHandleLBMessage(&migrateMsg);
00777 if (_replaySystem && !concurrent) {
00778 for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
00779 free(loadBalancer_pointers);
00780 }
00781 #endif
00782
00783 LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
00784 migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
00785
00786
00787 simulationWrite();
00788
00789
00790
00791 if (_lb_args.printSummary()) {
00792 LBInfo info(clients);
00793
00794 getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
00795 LBRealType mLoad, mCpuLoad, totalLoad;
00796 info.getSummary(mLoad, mCpuLoad, totalLoad);
00797 int nmsgs, nbytes;
00798 statsData->computeNonlocalComm(nmsgs, nbytes);
00799 CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
00800 for (int i=0; i<clients; i++)
00801 migrateMsg->expectedLoad[i] = info.peLoads[i];
00802 }
00803
00804 DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
00805 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00806 lbDecisionCount++;
00807 migrateMsg->lbDecisionCount = lbDecisionCount;
00808 #endif
00809
00810 envelope *env = UsrToEnv(migrateMsg);
00811 #if CMK_SCATTER_LB_RESULTS
00812 InitiateScatter(migrateMsg);
00813 #else
00814 if (1) {
00815
00816 thisProxy.ReceiveMigration(migrateMsg);
00817 }
00818 else {
00819
00820 for (int p=0; p<CkNumPes(); p++) {
00821 LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
00822 thisProxy[p].ReceiveMigration(m);
00823 }
00824 delete migrateMsg;
00825 }
00826 #endif
00827
00828
00829 statsData->clear();
00830 stats_msg_count=0;
00831 #endif
00832 }
00833
00834 void CentralLB::t_LoadBalance()
00835 {
00836 LoadBalance();
00837 }
00838
00839 void CentralLB::InitiateScatter(LBMigrateMsg *msg) {
00840
00841 if (CkNumPes() <= broadcastThreshold) {
00842 thisProxy.ReceiveMigration(msg);
00843 return;
00844 }
00845
00846 int middlePe = CkNumPes() / 2;
00847
00848
00849
00850 LBScatterMsg *leftMsg = new (middlePe, msg->n_moves)
00851 LBScatterMsg(0, middlePe - 1);
00852 LBScatterMsg *rightMsg = new (CkNumPes() - middlePe, msg->n_moves)
00853 LBScatterMsg(middlePe, CkNumPes() - 1);
00854
00855 int *migrateTally = new int[CkNumPes()];
00856 memset(migrateTally, 0, CkNumPes() * sizeof(int));
00857
00858 for (int i = 0; i < msg->n_moves; i++) {
00859 MigrateInfo* item = (MigrateInfo*) &msg->moves[i];
00860 migrateTally[item->to_pe]++;
00861 if (item->from_pe < middlePe) {
00862 leftMsg->moves[leftMsg->numMigrates++] = *item;
00863 }
00864 else {
00865 rightMsg->moves[rightMsg->numMigrates++] = *item;
00866 }
00867 }
00868
00869 memcpy(leftMsg->numMigratesPerPe, migrateTally, middlePe * sizeof(int));
00870 memcpy(rightMsg->numMigratesPerPe, &migrateTally[middlePe], (CkNumPes() - middlePe) * sizeof(int));
00871
00872 delete [] migrateTally;
00873
00874
00875 envelope *env = UsrToEnv(rightMsg);
00876 env->shrinkUsersize((msg->n_moves - rightMsg->numMigrates) * sizeof(MigrateDecision));
00877
00878
00879
00880 env = UsrToEnv(leftMsg);
00881 env->shrinkUsersize((msg->n_moves - leftMsg->numMigrates) * sizeof(MigrateDecision));
00882
00883
00884
00885 thisProxy[middlePe].ScatterMigrationResults(rightMsg);
00886
00887 delete msg;
00888 ScatterMigrationResults(leftMsg);
00889 }
00890
00891 void CentralLB::ScatterMigrationResults(LBScatterMsg *msg) {
00892
00893 int finished = false;
00894 do {
00895 CkAssert(msg->firstPeInSpan == CkMyPe());
00896 int numPesInSpan = msg->lastPeInSpan - msg->firstPeInSpan + 1 ;
00897
00898 if (numPesInSpan <= broadcastThreshold) {
00899 for (int i = msg->firstPeInSpan; i < msg->lastPeInSpan; i++) {
00900
00901 LBScatterMsg *msgCopy = new (numPesInSpan, msg->numMigrates)
00902 LBScatterMsg(msg->firstPeInSpan, msg->lastPeInSpan);
00903 msgCopy->numMigrates = msg->numMigrates;
00904 memcpy(msgCopy->numMigratesPerPe, msg->numMigratesPerPe,
00905 numPesInSpan * sizeof(int));
00906 memcpy(msgCopy->moves, msg->moves,
00907 msg->numMigrates * sizeof(MigrateDecision));
00908 thisProxy[i].ReceiveMigration(msgCopy);
00909 }
00910
00911 thisProxy[msg->lastPeInSpan].ReceiveMigration(msg);
00912 finished = true;
00913 }
00914 else {
00915 int middlePe = (msg->firstPeInSpan + msg->lastPeInSpan + 1) / 2;
00916
00917 LBScatterMsg *leftMsg = msg;
00918 int numMigrates = leftMsg->numMigrates;
00919 int numPesInRightSpan = leftMsg->lastPeInSpan - middlePe + 1;
00920 LBScatterMsg *rightMsg =
00921 new (numPesInRightSpan, leftMsg->numMigrates)
00922 LBScatterMsg(middlePe, leftMsg->lastPeInSpan);
00923 leftMsg->numMigrates = 0;
00924 leftMsg->lastPeInSpan = middlePe - 1;
00925 for (int i = 0; i < numMigrates; i++) {
00926 if (leftMsg->moves[i].fromPe < middlePe) {
00927 leftMsg->moves[leftMsg->numMigrates++] = leftMsg->moves[i];
00928 }
00929 else {
00930 rightMsg->moves[rightMsg->numMigrates++] = leftMsg->moves[i];
00931 }
00932 }
00933
00934 memcpy(rightMsg->numMigratesPerPe,
00935 &leftMsg->numMigratesPerPe[middlePe - leftMsg->firstPeInSpan],
00936 (numPesInRightSpan) * sizeof(int));
00937
00938
00939 envelope *env = UsrToEnv(rightMsg);
00940 env->shrinkUsersize((numMigrates - rightMsg->numMigrates)
00941 * sizeof(MigrateDecision));
00942
00943
00944
00945 env = UsrToEnv(leftMsg);
00946 env->shrinkUsersize((numMigrates - leftMsg->numMigrates)
00947 * sizeof(MigrateDecision));
00948
00949 thisProxy[middlePe].ScatterMigrationResults(rightMsg);
00950 }
00951
00952 } while (!finished);
00953
00954 }
00955
00956
00957 static bool isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
00958 {
00959 #if CMK_LBDB_ON
00960 for (int pe=0 ; pe<count; pe++)
00961 {
00962 for (int i=0; i<len[pe]; i++)
00963 if (objData[pe][i].objID() == commData.sender.objID() ||
00964 objData[pe][i].objID() == commData.receiver.get_destObj().objID())
00965 return false;
00966 }
00967 #endif
00968 return true;
00969 }
00970
00971
00972 void CentralLB::removeNonMigratable(LDStats* stats, int count)
00973 {
00974 int i;
00975
00976
00977 int have = 0;
00978 for (i=0; i<stats->n_objs; i++)
00979 {
00980 LDObjData &odata = stats->objData[i];
00981 if (!odata.migratable) {
00982 have = 1; break;
00983 }
00984 }
00985 if (have == 0) return;
00986
00987 CkVec<LDObjData> nonmig;
00988 CkVec<int> new_from_proc, new_to_proc;
00989 nonmig.resize(stats->n_migrateobjs);
00990 new_from_proc.resize(stats->n_migrateobjs);
00991 new_to_proc.resize(stats->n_migrateobjs);
00992 int n_objs = 0;
00993 for (i=0; i<stats->n_objs; i++)
00994 {
00995 LDObjData &odata = stats->objData[i];
00996 if (odata.migratable) {
00997 nonmig[n_objs] = odata;
00998 new_from_proc[n_objs] = stats->from_proc[i];
00999 new_to_proc[n_objs] = stats->to_proc[i];
01000 n_objs ++;
01001 }
01002 else {
01003 stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
01004 #if CMK_LB_CPUTIMER
01005 stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
01006 #endif
01007 }
01008 }
01009 CmiAssert(stats->n_migrateobjs == n_objs);
01010
01011 stats->makeCommHash();
01012
01013 CkVec<LDCommData> newCommData;
01014 newCommData.resize(stats->n_comm);
01015 int n_comm = 0;
01016 for (i=0; i<stats->n_comm; i++)
01017 {
01018 LDCommData& cdata = stats->commData[i];
01019 if (!cdata.from_proc())
01020 {
01021 int idx = stats->getSendHash(cdata);
01022 CmiAssert(idx != -1);
01023 if (!stats->objData[idx].migratable) continue;
01024 }
01025 switch (cdata.receiver.get_type()) {
01026 case LD_PROC_MSG:
01027 break;
01028 case LD_OBJ_MSG: {
01029 int idx = stats->getRecvHash(cdata);
01030 if (stats->complete_flag)
01031 CmiAssert(idx != -1);
01032 else if (idx == -1) continue;
01033 if (!stats->objData[idx].migratable) continue;
01034 break;
01035 }
01036 case LD_OBJLIST_MSG:
01037 break;
01038 }
01039 newCommData[n_comm] = cdata;
01040 n_comm ++;
01041 }
01042
01043 if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
01044
01045
01046 stats->objData = nonmig;
01047 stats->from_proc = new_from_proc;
01048 stats->to_proc = new_to_proc;
01049 stats->n_objs = n_objs;
01050
01051 stats->commData = newCommData;
01052 stats->n_comm = n_comm;
01053
01054 stats->deleteCommHash();
01055 stats->makeCommHash();
01056
01057 }
01058
01059
01060 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01061 extern int restarted;
01062 #endif
01063
01064 void CentralLB::ReceiveMigration(LBScatterMsg *m) {
01065 if (concurrent) {
01066 if (CkMyPe() == 0) theLbdb->SetStrategyCost(CkWallTimer() - strat_start_time);
01067
01068 statsData->clear();
01069 stats_msg_count=0;
01070 }
01071 storedMigrateMsg = NULL;
01072 storedScatterMsg = m;
01073 #if CMK_MEM_CHECKPOINT
01074 CkResetInLdb();
01075 #endif
01076 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01077 restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
01078 #else
01079 contribute(CkCallback(CkReductionTarget(CentralLB, ProcessMigrationDecision),
01080 thisProxy));
01081 #endif
01082
01083 }
01084
01085 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
01086 {
01087 if (concurrent) {
01088 if (CkMyPe() == 0) theLbdb->SetStrategyCost(CkWallTimer() - strat_start_time);
01089
01090 statsData->clear();
01091 stats_msg_count=0;
01092 }
01093 storedMigrateMsg = m;
01094 #if CMK_MEM_CHECKPOINT
01095 CkResetInLdb();
01096 #endif
01097 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01098 restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
01099 #else
01100 contribute(CkCallback(CkReductionTarget(CentralLB, ProcessReceiveMigration),
01101 thisProxy));
01102 #endif
01103 }
01104
01105 void CentralLB::ProcessMigrationDecision() {
01106 #if CMK_LBDB_ON
01107 LBScatterMsg *m = storedScatterMsg;
01108 CkAssert(m != NULL);
01109
01110 migrates_expected = m->numMigratesPerPe[CkMyPe() - m->firstPeInSpan];
01111 future_migrates_expected = 0;
01112
01113 for(int i = 0; i < m->numMigrates; i++) {
01114 MigrateDecision& move = m->moves[i];
01115 const int me = CkMyPe();
01116 if (move.fromPe == me) {
01117 DEBUGF(("[%d] migrating object to %d\n", move.fromPe, move.toPe));
01118
01119 LDObjHandle objInfo = theLbdb->GetObjHandle(move.dbIndex);
01120
01121 if (theLbdb->Migrate(objInfo,move.toPe) == 0) {
01122 CkAbort("Error: Async arrival not supported in scattering mode\n");
01123 }
01124 }
01125 }
01126
01127 if (migrates_expected == 0 || migrates_completed == migrates_expected) {
01128 MigrationDone(1);
01129 }
01130
01131 delete m;
01132 #endif
01133 }
01134
01135 void CentralLB::ProcessReceiveMigration()
01136 {
01137 #if CMK_LBDB_ON
01138 int i;
01139 LBMigrateMsg *m = storedMigrateMsg;
01140 CmiAssert(m!=NULL);
01141
01142 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01143 int *dummyCounts;
01144
01145 DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
01146
01147 if(step() > m->step){
01148 char str[100];
01149 envelope *env = UsrToEnv(m);
01150 return;
01151 }
01152 lbDecisionCount = m->lbDecisionCount;
01153 #endif
01154
01155 if (_lb_args.debug() > 1)
01156 if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
01157
01158 for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
01159 CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
01160 #if CMK_FAULT_EVAC
01161 if(!CmiNodeAlive(CkMyPe())){
01162 delete m;
01163 return;
01164 }
01165 #endif
01166 migrates_expected = 0;
01167 future_migrates_expected = 0;
01168 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01169 int sending=0;
01170 int dummy=0;
01171 LBDB *_myLBDB = theLbdb->getLBDB();
01172 if(_restartFlag){
01173 dummyCounts = new int[CmiNumPes()];
01174 memset(dummyCounts,0,sizeof(int)*CmiNumPes());
01175 }
01176 #endif
01177 for(i=0; i < m->n_moves; i++) {
01178 MigrateInfo& move = m->moves[i];
01179 const int me = CkMyPe();
01180 if (move.from_pe == me && move.to_pe != me) {
01181 #if CMK_DRONE_MODE
01182 int to_pe_rank0 = CMK_RANK_0(move.to_pe);
01183 if(move.from_pe == to_pe_rank0) continue;
01184 move.to_pe = to_pe_rank0;
01185 #endif
01186
01187 DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
01188
01189 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
01190 if (theLbdb->Migrate(move.obj,move.to_pe) == 0)
01191 thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
01192 #else
01193 if(_restartFlag){
01194 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
01195 theLbdb->Migrate(move.obj,move.to_pe);
01196 sending++;
01197 }else{
01198 if(_myLBDB->validObjHandle(move.obj)){
01199 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
01200 theLbdb->Migrate(move.obj,move.to_pe);
01201 sending++;
01202 }else{
01203 DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
01204 dummyCounts[move.to_pe]++;
01205 dummy++;
01206 }
01207 }
01208 #endif
01209 } else if (move.from_pe != me && move.to_pe == me) {
01210 #if CMK_DRONE_MODE
01211 int to_pe_rank0 = CMK_RANK_0(move.to_pe);
01212 if(me != to_pe_rank0) continue;
01213 #endif
01214 DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
01215 if (!move.async_arrival) migrates_expected++;
01216 else future_migrates_expected++;
01217 }
01218 else {
01219 #if CMK_GLOBAL_LOCATION_UPDATE
01220 UpdateLocation(move);
01221 #endif
01222 }
01223
01224 }
01225 DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
01226
01227
01228 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01229 if(_restartFlag){
01230 sendDummyMigrationCounts(dummyCounts);
01231 _restartFlag =false;
01232 delete []dummyCounts;
01233 }
01234 #endif
01235
01236
01237 #if 0
01238 if (m->n_moves ==0) {
01239 theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
01240 }
01241 #endif
01242 cur_ld_balancer = m->next_lb;
01243 if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
01244 LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
01245 }
01246
01247 if (migrates_expected == 0 || migrates_completed == migrates_expected)
01248 MigrationDone(1);
01249 delete m;
01250
01251
01252 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01253
01254
01255 #endif
01256 #endif
01257 }
01258
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Message-logging builds only. Resumes this PE's clients as after a
// balancing step (balancing = 1). globalDecisionCount is only referenced
// by the debug trace.
void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
  DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
  const int self = CkMyPe();
  thisProxy[self].ResumeClients(1);
}
#endif
01269
01270
01271 void CentralLB::CheckForRealloc(){
01272 #if CMK_SHRINK_EXPAND
01273 if(pending_realloc_state == REALLOC_MSG_RECEIVED) {
01274 pending_realloc_state = REALLOC_IN_PROGRESS;
01275 CkPrintf("Load balancer invoking charmrun to handle reallocation on pe %d\n", CkMyPe());
01276 double end_lb_time = CkWallTimer();
01277 CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
01278 lbname, cur_ld_balancer, step()-1, end_lb_time, end_lb_time-start_lb_time);
01279
01280 CkCallback cb(CkIndex_CentralLB::ResumeFromReallocCheckpoint(), thisProxy[0]);
01281 CkStartCheckpoint(_shrinkexpand_basedir, cb);
01282 }
01283 else{
01284 thisProxy.MigrationDoneImpl(1);
01285 }
01286 #endif
01287 }
01288
01289 void CentralLB::ResumeFromReallocCheckpoint(){
01290 #if CMK_SHRINK_EXPAND
01291 std::vector<char> avail(se_avail_vector, se_avail_vector + CkNumPes());
01292 free(se_avail_vector);
01293 thisProxy.WillIbekilled(avail, numProcessAfterRestart);
01294 #endif
01295 }
01296
01297
01298
#if CMK_SHRINK_EXPAND
// Returns this PE's number after shrink/expand: its current number minus the
// number of lower-numbered PEs marked unavailable (avail[pe] == 0).
int GetNewPeNumber(std::vector<char> avail){
  const int current = CkMyPe();
  int removedBelow = 0;
  for (int pe = current - 1; pe >= 0; --pe)
    removedBelow += (avail[pe] == 0) ? 1 : 0;
  return current - removedBelow;
}
#endif
01309
01310 void CentralLB::WillIbekilled(std::vector<char> avail, int newnumProcessAfterRestart){
01311 #if CMK_SHRINK_EXPAND
01312 numProcessAfterRestart = newnumProcessAfterRestart;
01313 mynewpe = GetNewPeNumber(avail);
01314 willContinue = avail[CkMyPe()];
01315 CkCallback cb(CkIndex_CentralLB::StartCleanup(), thisProxy[0]);
01316 contribute(cb);
01317 #endif
01318 }
01319
01320 void CentralLB::StartCleanup(){
01321 #if CMK_SHRINK_EXPAND
01322 CkCleanup();
01323 #endif
01324 }
01325 void CentralLB::MigrationDone(int balancing)
01326 {
01327 #if CMK_SHRINK_EXPAND
01328
01329 CkCallback cb(CkIndex_CentralLB::CheckForRealloc(), thisProxy[0]);
01330 contribute(cb);
01331 return;
01332 #else
01333 MigrationDoneImpl(balancing);
01334 #endif
01335 }
01336 void CentralLB::MigrationDoneImpl (int balancing)
01337 {
01338
01339 #if CMK_LBDB_ON
01340 migrates_completed = 0;
01341 migrates_expected = -1;
01342
01343 if (balancing) theLbdb->ClearLoads();
01344
01345 theLbdb->incStep();
01346 DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
01347
01348
01349 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01350 savedBalancing = balancing;
01351 startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
01352 #endif
01353
01354 LBDatabase::Object()->MigrationDone();
01355
01356 LoadbalanceDone(balancing);
01357 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
01358
01359 if (balancing && _lb_args.syncResume()) {
01360 contribute(CkCallback(CkReductionTarget(CentralLB, ResumeClients),
01361 thisProxy));
01362 }
01363 else{
01364 #if CMK_FAULT_EVAC
01365 if(CmiNodeAlive(CkMyPe()))
01366 #endif
01367 {
01368 thisProxy [CkMyPe()].ResumeClients(balancing);
01369 }
01370 }
01371 #if CMK_GRID_QUEUE_AVAILABLE
01372 CmiGridQueueDeregisterAll ();
01373 CpvAccess(CkGridObject) = NULL;
01374 #endif // if CMK_GRID_QUEUE_AVAILABLE
01375 #endif // if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
01376 #endif // if CMK_LBDB_ON
01377 }
01378
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Message-logging counterpart of the resume tail of MigrationDoneImpl().
// With balancing and syncResume it joins a reduction on ResumeClients.
// Otherwise it sends ResumeClients(balancing) to this PE if its node is
// alive.
void CentralLB::endMigrationDone(int balancing){
  DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));

  const bool resumeTogether = balancing && _lb_args.syncResume();
  if (resumeTogether) {
    contribute(CkCallback(CkReductionTarget(CentralLB, ResumeClients), thisProxy));
    return;
  }

  if (!CmiNodeAlive(CkMyPe())) return;

  DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
  thisProxy[CkMyPe()].ResumeClients(balancing);
}
#endif
01397
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Callback passed to startLoadBalancingMlog() by MigrationDoneImpl(). It
// makes the balancer the current object and finishes the step with the
// saved 'balancing' value.
void resumeCentralLbAfterChkpt(void *_lb){
  CentralLB *balancer = static_cast<CentralLB *>(_lb);
  CpvAccess(_currentObj) = balancer;
  balancer->endMigrationDone(balancer->savedBalancing);
}

// Callback passed to restoreParallelRecovery(). It continues with the
// stored migration decision.
void resumeAfterRestoreParallelRecovery(void *_lb){
  static_cast<CentralLB *>(_lb)->ProcessReceiveMigration();
}
#endif
01409
01410
01411 void CentralLB::ResumeClients()
01412 {
01413 ResumeClients(1);
01414 }
01415
01416 void CentralLB::ResumeClients(int balancing)
01417 {
01418 #if CMK_LBDB_ON
01419 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01420 resumeCount++;
01421 globalResumeCount = resumeCount;
01422 #endif
01423 DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
01424
01425 theLbdb->ResumeClients();
01426 if (balancing) {
01427
01428 CheckMigrationComplete();
01429 if (future_migrates_expected == 0 ||
01430 future_migrates_expected == future_migrates_completed) {
01431 CheckMigrationComplete();
01432 }
01433 }
01434 #endif
01435 }
01436
01437
01438
01439
01440
01441
01442
01443
01444
01445 void CentralLB::CheckMigrationComplete()
01446 {
01447 #if CMK_LBDB_ON
01448 lbdone ++;
01449 if (lbdone == 2) {
01450 double end_lb_time = CkWallTimer();
01451 if (_lb_args.debug() && CkMyPe()==0) {
01452 CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
01453 lbname, CkMyPe(), step()-1, end_lb_time,
01454 end_lb_time-start_lb_time);
01455 }
01456
01457 theLbdb->SetMigrationCost(end_lb_time - start_lb_time);
01458
01459 lbdone = 0;
01460 future_migrates_expected = -1;
01461 future_migrates_completed = 0;
01462
01463
01464 DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
01465
01466 LDOMHandle h;
01467 h.id.id.idx = 0;
01468 theLbdb->getLBDB()->DoneRegisteringObjects(h);
01469
01470
01471 if (!(_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
01472 theLbdb->nextLoadbalancer(seqno);
01473 }
01474 #endif
01475 }
01476
01477
01478 void CentralLB::removeCommDataOfDeletedObjs(LDStats* stats) {
01479 stats->makeCommHash();
01480
01481 CkVec<LDCommData> newCommData;
01482 newCommData.resize(stats->n_comm);
01483 int n_comm = 0;
01484 for (int i=0; i<stats->n_comm; i++) {
01485 LDCommData& cdata = stats->commData[i];
01486 switch (cdata.receiver.get_type()) {
01487 case LD_PROC_MSG:
01488 break;
01489 case LD_OBJ_MSG: {
01490 if (!cdata.from_proc()) {
01491 int sidx = stats->getSendHash(cdata);
01492 int ridx = stats->getRecvHash(cdata);
01493 if (sidx == -1 || ridx == -1) continue;
01494 }
01495 break;
01496 }
01497 case LD_OBJLIST_MSG: {
01498 int sidx = stats->getSendHash(cdata);
01499 if (sidx == -1) continue;
01500 int nobjs;
01501 LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
01502 for (int id=0; id<nobjs; id++) {
01503 int idx = stats->getHash(objs[id]);
01504 if (idx == -1)
01505 {
01506 objs[id] = objs[nobjs-1];
01507 id--;
01508 nobjs--;
01509 }
01510 }
01511 if(nobjs == 0) continue;
01512 cdata.receiver.dest.destObjs.len = nobjs;
01513 break;
01514 }
01515 }
01516
01517 stats->commData[n_comm] = cdata;
01518 n_comm++;
01519 }
01520
01521 stats->commData.resize(n_comm);
01522 stats->n_comm = n_comm;
01523 }
01524
01525 void CentralLB::preprocess(LDStats* stats)
01526 {
01527 if (_lb_args.ignoreBgLoad())
01528 stats->clearBgLoad();
01529
01530
01531 if (_lb_predict) FuturePredictor(statsData);
01532 }
01533
01534 void CentralLB::printStrategyStats(LBMigrateMsg *msg) {
01535 #if CMK_LBDB_ON
01536 envelope *env = UsrToEnv(msg);
01537
01538 double strat_end_time = CkWallTimer();
01539 double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
01540 CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
01541 lbname, CkMyPe(), (int)lbdbMemsize, (int)(useMem()/1000));
01542 CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, CkMyPe(), msg->n_moves, env->getTotalsize()/1024.0/1024.0);
01543 CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
01544 lbname, CkMyPe(), strat_end_time, strat_end_time-strat_start_time);
01545 #endif
01546 }
01547
01548
01549 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
01550 {
01551 #if CMK_LBDB_ON
01552 strat_start_time = CkWallTimer();
01553 if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer))
01554 CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
01555
01556 work(stats);
01557
01558
01559 if ((_lb_args.debug()>2) && (CkMyPe() == cur_ld_balancer)) {
01560 CkPrintf("CharmLB> Obj Map:\n");
01561 for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
01562 CkPrintf("\n");
01563 }
01564
01565 if (concurrent) return NULL;
01566
01567 LBMigrateMsg *msg = createMigrateMsg(stats);
01568
01569
01570
01571
01572
01573
01574
01575
01576
01577
01578
01579
01580 double strat_end_time = CkWallTimer();
01581 theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
01582
01583 if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer)) {
01584 printStrategyStats(msg);
01585 }
01586 return msg;
01587 #else
01588 return NULL;
01589 #endif
01590 }
01591
01592
01593
01594
01595
01596
01597 void CentralLB::changeFreq(int nFreq)
01598 {
01599 #ifdef TEMP_LDB
01600
01601
01602 {
01603
01604 {
01605 char newfreq[10];
01606 sprintf(newfreq,"%d",nFreq);
01607 cpufreq_sysfs_write(newfreq,CkMyPe()%physicalCoresPerNode);
01608
01609 }
01610 }
01611 #else
01612 CmiAbort("You should never call CentralLB::changeFreq without using the flag TEMP_LDB\n");
01613 #endif
01614
01615 }
01616
01617 void CentralLB::work(LDStats* stats)
01618 {
01619
01620 stats->print();
01621 }
01622
01623
01624 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
01625 {
01626 int i;
01627 CkVec<MigrateInfo*> migrateInfo;
01628 for (i=0; i<stats->n_objs; i++) {
01629 LDObjData &objData = stats->objData[i];
01630 int frompe = stats->from_proc[i];
01631 int tope = stats->to_proc[i];
01632 if (frompe != tope) {
01633
01634
01635 MigrateInfo *migrateMe = new MigrateInfo;
01636 migrateMe->obj = objData.handle;
01637 migrateMe->from_pe = frompe;
01638 migrateMe->to_pe = tope;
01639 migrateMe->async_arrival = objData.asyncArrival;
01640 migrateInfo.insertAtEnd(migrateMe);
01641 }
01642 }
01643
01644 int migrate_count=migrateInfo.length();
01645 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
01646 msg->n_moves = migrate_count;
01647 for(i=0; i < migrate_count; i++) {
01648 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
01649 msg->moves[i] = *item;
01650 delete item;
01651 migrateInfo[i] = 0;
01652 }
01653 return msg;
01654 }
01655
01656 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
01657 {
01658 int nmoves = 0;
01659 int nunavail = 0;
01660 int i;
01661 for (i=0; i<m->n_moves; i++) {
01662 MigrateInfo* item = (MigrateInfo*) &m->moves[i];
01663 if (item->from_pe == p || item->to_pe == p) nmoves++;
01664 }
01665 for (i=0; i<CkNumPes();i++) {
01666 if (!m->avail_vector[i]) nunavail++;
01667 }
01668 LBMigrateMsg* msg;
01669 if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
01670 else msg = new(nmoves,0,0,0) LBMigrateMsg;
01671 msg->n_moves = nmoves;
01672 msg->level = m->level;
01673 msg->next_lb = m->next_lb;
01674 for (i=0,nmoves=0; i<m->n_moves; i++) {
01675 MigrateInfo* item = (MigrateInfo*) &m->moves[i];
01676 if (item->from_pe == p || item->to_pe == p) {
01677 msg->moves[nmoves] = *item;
01678 nmoves++;
01679 }
01680 }
01681
01682 if (nunavail)
01683 for (i=0; i<CkNumPes();i++) {
01684 msg->avail_vector[i] = m->avail_vector[i];
01685 msg->expectedLoad[i] = m->expectedLoad[i];
01686 }
01687 return msg;
01688 }
01689
01690 void CentralLB::simulationWrite() {
01691 if(step() == LBSimulation::dumpStep)
01692 {
01693
01694 int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
01695 char *dumpFileName = (char *)malloc(dumpFileSize);
01696 while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
01697 free(dumpFileName);
01698 dumpFileSize+=3;
01699 dumpFileName = (char *)malloc(dumpFileSize);
01700 }
01701 writeStatsMsgs(dumpFileName);
01702 free(dumpFileName);
01703 CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
01704 ++LBSimulation::dumpStep;
01705 --LBSimulation::dumpStepSize;
01706 if (LBSimulation::dumpStepSize <= 0) {
01707 CmiPrintf("Charm++> Exiting...\n");
01708 CkExit();
01709 }
01710 return;
01711 }
01712 }
01713
01714 void CentralLB::simulationRead() {
01715 if (concurrent) CkAbort("Error: LB simulation not supported in concurrent mode");
01716 LBSimulation *simResults = NULL, *realResults;
01717 LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
01718 voidMessage->n_moves=0;
01719 for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
01720
01721 int simFileSize = strlen(LBSimulation::dumpFile) + 4;
01722 char *simFileName = (char *)malloc(simFileSize);
01723 while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
01724 free(simFileName);
01725 simFileSize+=3;
01726 simFileName = (char *)malloc(simFileSize);
01727 }
01728 readStatsMsgs(simFileName);
01729
01730
01731 if (simResults == NULL) {
01732 simResults = new LBSimulation(LBSimulation::simProcs);
01733 realResults = new LBSimulation(LBSimulation::simProcs);
01734 }
01735 else {
01736
01737 if (!LBSimulation::procsChanged) {
01738
01739
01740
01741 realResults->reset();
01742
01743 for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
01744 findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
01745 simResults->PrintDifferences(realResults,statsData);
01746 }
01747 simResults->reset();
01748 }
01749
01750
01751 double startT = CkWallTimer();
01752 preprocess(statsData);
01753 CmiPrintf("%s> Strategy starts ... \n", lbname);
01754 LBMigrateMsg* migrateMsg = Strategy(statsData);
01755 CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
01756 lbname, CkWallTimer()-startT, (int)(useMem()/1000));
01757
01758
01759 findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
01760
01761
01762 CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
01763
01764 if (LBSimulation::showDecisionsOnly) {
01765 simResults->PrintDecisions(migrateMsg, simFileName,
01766 LBSimulation::simProcs);
01767 } else {
01768 simResults->PrintSimulationResults();
01769 }
01770
01771 free(simFileName);
01772 delete migrateMsg;
01773 CmiPrintf("Charm++> LBSim: Passing to the next step\n");
01774 }
01775
01776 delete simResults;
01777 CmiPrintf("Charm++> Exiting...\n");
01778 CkExit();
01779 }
01780
01781 void CentralLB::readStatsMsgs(const char* filename)
01782 {
01783 #if CMK_LBDB_ON
01784 int i;
01785 FILE *f = fopen(filename, "r");
01786 if (f==NULL) {
01787 CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
01788 CmiAbort("");
01789 }
01790
01791
01792
01793
01794 if (statsMsgsList) {
01795 for(i = 0; i < stats_msg_count; i++)
01796 delete statsMsgsList[i];
01797 delete[] statsMsgsList;
01798 statsMsgsList=0;
01799 }
01800
01801 PUP::fromDisk pd(f);
01802 PUP::machineInfo machInfo;
01803
01804 pd((char *)&machInfo, sizeof(machInfo));
01805 PUP::xlater p(machInfo, pd);
01806
01807 if (_lb_args.lbversion() > 1) {
01808 p|_lb_args.lbversion();
01809 CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
01810 CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
01811 }
01812 p|stats_msg_count;
01813
01814 CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
01815 if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
01816 if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
01817
01818
01819 statsData->pup(p);
01820
01821 CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
01822 CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
01823
01824
01825 CmiPrintf("ReadStatsMsg from %s completed\n", filename);
01826 #endif
01827 }
01828
01829 void CentralLB::writeStatsMsgs(const char* filename)
01830 {
01831 #if CMK_LBDB_ON
01832 FILE *f = fopen(filename, "w");
01833 if (f==NULL) {
01834 CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
01835 CmiAbort("");
01836 }
01837
01838 const PUP::machineInfo &machInfo = PUP::machineInfo::current();
01839 PUP::toDisk p(f);
01840 p((char *)&machInfo, sizeof(machInfo));
01841
01842 p|_lb_args.lbversion();
01843 p|stats_msg_count;
01844 statsData->pup(p);
01845
01846 fclose(f);
01847
01848 CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
01849 #endif
01850 }
01851
01852
01853
01854 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count,
01855 LBMigrateMsg *msg, LBInfo &info,
01856 int considerComm)
01857 {
01858 #if CMK_LBDB_ON
01859 stats->makeCommHash();
01860
01861
01862 for(int i = 0; i < msg->n_moves; i++) {
01863 MigrateInfo &mInfo = msg->moves[i];
01864 int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
01865 CmiAssert(idx != -1);
01866 stats->to_proc[idx] = mInfo.to_pe;
01867 }
01868
01869 info.getInfo(stats, count, considerComm);
01870 #endif
01871 }
01872
01873
01874 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
01875 {
01876 CkAssert(simResults != NULL && count == simResults->numPes);
01877
01878
01879 double startT = CkWallTimer();
01880 getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
01881 CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
01882 }
01883
01884 void CentralLB::pup(PUP::er &p) {
01885 if (p.isUnpacking()) {
01886 initLB(CkLBOptions(seqno));
01887 }
01888 p|reduction_started;
01889 int has_statsMsg=0;
01890 if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
01891 p|has_statsMsg;
01892 if (has_statsMsg) {
01893 if (p.isUnpacking())
01894 statsMsg = new CLBStatsMsg;
01895 statsMsg->pup(p);
01896 }
01897 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01898 p | lbDecisionCount;
01899 p | resumeCount;
01900 #endif
01901 p | use_thread;
01902 }
01903
01904 int CentralLB::useMem() {
01905 return sizeof(CentralLB) + statsData->useMem() +
01906 CkNumPes() * sizeof(CLBStatsMsg *);
01907 }
01908
01909
01916 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
01917 n_objs = osz;
01918 n_comm = csz;
01919 objData = new LDObjData[osz];
01920 commData = new LDCommData[csz];
01921 avail_vector = NULL;
01922 }
01923
01924 CLBStatsMsg::~CLBStatsMsg() {
01925 delete [] objData;
01926 delete [] commData;
01927 delete [] avail_vector;
01928 }
01929
01930 void CLBStatsMsg::pup(PUP::er &p) {
01931 int i;
01932 p|from_pe;
01933 p|pe_speed;
01934 p|total_walltime;
01935 p|idletime;
01936 #if defined(TEMP_LDB)
01937 p|pe_temp;
01938 #endif
01939
01940 p|bg_walltime;
01941 #if CMK_LB_CPUTIMER
01942 p|total_cputime;
01943 p|bg_cputime;
01944 #endif
01945 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01946 p | step;
01947 #endif
01948 p|n_objs;
01949 if (p.isUnpacking()) objData = new LDObjData[n_objs];
01950 for (i=0; i<n_objs; i++) p|objData[i];
01951 p|n_comm;
01952 if (p.isUnpacking()) commData = new LDCommData[n_comm];
01953 for (i=0; i<n_comm; i++) p|commData[i];
01954
01955 int has_avail_vector;
01956 if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
01957 p|has_avail_vector;
01958 if (p.isUnpacking()) {
01959 if (has_avail_vector) avail_vector = new char[CkNumPes()];
01960 else avail_vector = NULL;
01961 }
01962 if (has_avail_vector) p(avail_vector, CkNumPes());
01963
01964 p(next_lb);
01965 }
01966
01967
01968
01969
01970
01971 void CkMarshalledCLBStatsMessage::free() {
01972 int count = msgs.size();
01973 for (int i=0; i<count; i++) {
01974 delete msgs[i];
01975 msgs[i] = NULL;
01976 }
01977 msgs.clear();
01978 }
01979
01980 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &&m)
01981 {
01982 int count = m.getCount();
01983 for (int i=0; i<count; i++) add(m.getMessage(i));
01984 }
01985
01986 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
01987 {
01988 int count = msgs.size();
01989 p|count;
01990 for (int i=0; i<count; i++) {
01991 CLBStatsMsg *msg;
01992 if (p.isUnpacking()) msg = new CLBStatsMsg;
01993 else {
01994 msg = msgs[i]; CmiAssert(msg!=NULL);
01995 }
01996 msg->pup(p);
01997 if (p.isUnpacking()) add(msg);
01998 }
01999 }
02000
02001 SpanningTree::SpanningTree()
02002 {
02003 double sq = sqrt(CkNumPes()*4.0-3.0) - 1;
02004 arity = (int)ceil(sq/2);
02005 calcParent(CkMyPe());
02006 calcNumChildren(CkMyPe());
02007 }
02008
02009 void SpanningTree::calcParent(int n)
02010 {
02011 parent=-1;
02012 if(n != 0 && arity > 0)
02013 parent = (n-1)/arity;
02014 }
02015
02016 void SpanningTree::calcNumChildren(int n)
02017 {
02018 numChildren = 0;
02019 if (arity == 0) return;
02020 int fullNode=(CkNumPes()-1-arity)/arity;
02021 if(n <= fullNode)
02022 numChildren = arity;
02023 if(n == fullNode+1)
02024 numChildren = CkNumPes()-1-(fullNode+1)*arity;
02025 if(n > fullNode+1)
02026 numChildren = 0;
02027 }
02028
02029 #include "CentralLB.def.h"
02030