00001
00005
00006 #include <math.h>
00007 #include "HbmLB.h"
00008 #include "LBDBManager.h"
00009 #include "GreedyLB.h"
00010 #include "GreedyCommLB.h"
00011 #include "RefineCommLB.h"
00012 #include "RefineLB.h"
00013
00014 #define DEBUGF(x) // CmiPrintf x;
00015
00016 CreateLBFunc_Def(HbmLB, "HybridBase load balancer")
00017
00018 void HbmLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
00019 {
00020 HbmLB *me = (HbmLB*)(data);
00021
00022 me->Migrated(h, waitBarrier);
00023 }
00024
00025 void HbmLB::staticAtSync(void* data)
00026 {
00027 HbmLB *me = (HbmLB*)(data);
00028
00029 me->AtSync();
00030 }
00031
00032 HbmLB::HbmLB(const CkLBOptions &opt): CBase_HbmLB(opt)
00033 {
00034 #if CMK_LBDB_ON
00035 lbname = (char *)"HbmLB";
00036 thisProxy = CProxy_HbmLB(thisgroup);
00037 receiver = theLbdb->
00038 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
00039 (void*)(this));
00040 notifier = theLbdb->getLBDB()->
00041 NotifyMigrated((LDMigratedFn)(staticMigrated), (void*)(this));
00042
00043
00044 tree = new HypercubeTree;
00045
00046 currentLevel = 0;
00047 foundNeighbors = 0;
00048
00049 maxLoad = 0.0;
00050 vector_n_moves = 0;
00051 maxLoad = 0.0;
00052 maxCpuLoad = 0.0;
00053 totalLoad = 0.0;
00054 maxCommCount = 0;
00055 maxCommBytes = 0.0;
00056
00057 if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
00058 #endif
00059 }
00060
00061 HbmLB::~HbmLB()
00062 {
00063 #if CMK_LBDB_ON
00064 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00065 if (theLbdb) {
00066 theLbdb->getLBDB()->
00067 RemoveNotifyMigrated(notifier);
00068
00069
00070 }
00071 delete tree;
00072 #endif
00073 }
00074
00075
00076 void HbmLB::FindNeighbors()
00077 {
00078 if (foundNeighbors == 0) {
00079
00080
00081
00082 int nlevels = tree->numLevels();
00083 int mype = CkMyPe();
00084 for (int level=0; level<nlevels; level++)
00085 {
00086 LevelData *data = new LevelData;
00087 data->parent = tree->parent(mype, level);
00088 if (tree->isroot(mype, level)) {
00089 data->nChildren = tree->numChildren(mype, level);
00090 data->children = new int[data->nChildren];
00091 tree->getChildren(mype, level, data->children, data->nChildren);
00092 data->statsData = new LDStats(data->nChildren+1);
00093
00094 ProcStats &procStat = data->statsData->procs[data->nChildren];
00095 procStat.available = false;
00096 }
00097 levelData.push_back(data);
00098 DEBUGF(("[%d] level: %d nchildren:%d - %d %d\n", CkMyPe(), level, data->nChildren, data->nChildren>0?data->children[0]:-1, data->nChildren>1?data->children[1]:-1));
00099 }
00100
00101 foundNeighbors = 1;
00102 }
00103 }
00104
00105 void HbmLB::AtSync()
00106 {
00107 #if CMK_LBDB_ON
00108
00109
00110 FindNeighbors();
00111
00112
00113 if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
00114 MigrationDone(0);
00115 return;
00116 }
00117
00118 thisProxy[CkMyPe()].ProcessAtSync();
00119 #endif
00120 }
00121
00122 void HbmLB::ProcessAtSync()
00123 {
00124 #if CMK_LBDB_ON
00125 int i;
00126 start_lb_time = 0;
00127
00128 if (CkMyPe() == 0) {
00129 start_lb_time = CkWallTimer();
00130 if (_lb_args.debug())
00131 CkPrintf("[%s] Load balancing step %d starting at %f\n",
00132 lbName(), step(), CkWallTimer());
00133 }
00134
00135
00136 LBRealType total_walltime, total_cputime, idletime, bg_walltime, bg_cputime;
00137 theLbdb->TotalTime(&total_walltime,&total_cputime);
00138 theLbdb->IdleTime(&idletime);
00139 theLbdb->BackgroundLoad(&bg_walltime,&bg_cputime);
00140
00141 myStats.n_objs = theLbdb->GetObjDataSz();
00142 myStats.objData.resize(myStats.n_objs);
00143 myStats.from_proc.resize(myStats.n_objs);
00144 myStats.to_proc.resize(myStats.n_objs);
00145 theLbdb->GetObjData(myStats.objData.getVec());
00146 for (i=0; i<myStats.n_objs; i++)
00147 myStats.from_proc[i] = myStats.to_proc[i] = 0;
00148
00149 myStats.n_comm = theLbdb->GetCommDataSz();
00150 myStats.commData.resize(myStats.n_comm);
00151 theLbdb->GetCommData(myStats.commData.getVec());
00152
00153 myStats.complete_flag = 0;
00154
00155
00156 DEBUGF(("[%d] Send stats to parent %d\n", CkMyPe(), levelData[0]->parent));
00157 double tload = 0.0;
00158 for (i=0; i<myStats.n_objs; i++) tload += myStats.objData[i].wallTime;
00159 thisProxy[levelData[0]->parent].ReceiveStats(tload, CkMyPe(), 0);
00160 #endif
00161 }
00162
00163 void HbmLB::ReceiveStats(double t, int frompe, int fromlevel)
00164 {
00165 #if CMK_LBDB_ON
00166 FindNeighbors();
00167
00168 int atlevel = fromlevel + 1;
00169 CmiAssert(tree->isroot(CkMyPe(), atlevel));
00170
00171 DEBUGF(("[%d] ReceiveStats from PE %d from level: %d\n", CkMyPe(), frompe, fromlevel));
00172 int neighborIdx = NeighborIndex(frompe, atlevel);
00173 CmiAssert(neighborIdx==0 || neighborIdx==1);
00174 LevelData *lData = levelData[atlevel];
00175 lData->statsList[neighborIdx] = t;
00176
00177 int &stats_msg_count = levelData[atlevel]->stats_msg_count;
00178 stats_msg_count ++;
00179
00180 DEBUGF(("[%d] ReceiveStats at level: %d %d/%d\n", CkMyPe(), atlevel, stats_msg_count, levelData[atlevel]->nChildren));
00181 if (stats_msg_count == levelData[atlevel]->nChildren)
00182 {
00183 stats_msg_count = 0;
00184 int parent = levelData[atlevel]->parent;
00185
00186
00187 thisProxy[CkMyPe()].Loadbalancing(atlevel);
00188 }
00189
00190 #endif
00191 }
00192
00193
// Absolute value using the original guard: any x that is not > 0.0
// (including -0.0 and NaN) is negated.
inline double myabs(double x)
{
  if (x > 0.0) return x;
  return -x;
}

// Larger of two values; returns y whenever x > y is false (ties and NaN).
inline double mymax(double x, double y)
{
  if (x > y) return x;
  return y;
}
00196
00197
00198
00199 void HbmLB::Loadbalancing(int atlevel)
00200 {
00201
00202 CmiAssert(atlevel >= 1);
00203
00204 LevelData *lData = levelData[atlevel];
00205 LDStats *statsData = lData->statsData;
00206 CmiAssert(statsData);
00207
00208
00209
00210
00211
00212 if (_lb_args.ignoreBgLoad()) statsData->clearBgLoad();
00213
00214 currentLevel = atlevel;
00215
00216 double start_lb_time(CkWallTimer());
00217
00218 double lload = lData->statsList[0];
00219 double rload = lData->statsList[1];
00220
00221 double diff = myabs(lload-rload);
00222 double maxl = mymax(lload, rload);
00223 double avg = (lload+rload)/2.0;
00224 CkPrintf("[%d] lload: %f rload: %f atlevel: %d\n", CkMyPe(), lload, rload, atlevel);
00225 if (diff/avg > 0.02) {
00226
00227 int numpes = (int)pow(2.0, atlevel);
00228 double delta = myabs(lload-rload) / numpes;
00229
00230 int overloaded = lData->children[0];
00231 if (lload < rload) {
00232 overloaded = lData->children[1];
00233 }
00234 DEBUGF(("[%d] branch %d is overloaded by %f... \n", CkMyPe(), overloaded, delta));
00235 thisProxy[overloaded].ReceiveMigrationDelta(delta, atlevel, atlevel);
00236 }
00237 else {
00238 LoadbalancingDone(atlevel);
00239 }
00240 }
00241
00242
00243 void HbmLB::LoadbalancingDone(int atlevel)
00244 {
00245 LevelData *lData = levelData[atlevel];
00246 DEBUGF(("[%d] LoadbalancingDone at level: %d\n", CkMyPe(), atlevel));
00247 if (lData->parent != -1) {
00248
00249 double lload = lData->statsList[0];
00250 double rload = lData->statsList[1];
00251 double totalLoad = lload + rload;
00252 thisProxy[lData->parent].ReceiveStats(totalLoad, CkMyPe(), atlevel);
00253 }
00254 else {
00255
00256
00257 thisProxy.ReceiveResumeClients(1, tree->numLevels()-1);
00258 }
00259 }
00260
00261 void HbmLB::ReceiveResumeClients(int balancing, int fromlevel){
00262 #if 0
00263 int atlevel = fromlevel-1;
00264 LevelData *lData = levelData[atlevel];
00265 if (atlevel != 0)
00266 thisProxy.ReceiveResumeClients(balancing, atlevel, lData->nChildren, lData->children);
00267 else
00268 ResumeClients(balancing);
00269 #else
00270 ResumeClients(balancing);
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282 #endif
00283 }
00284
00285
00286 void HbmLB::ReceiveMigrationDelta(double t, int lblevel, int fromlevel)
00287 {
00288 #if CMK_LBDB_ON
00289 int i;
00290 int atlevel = fromlevel-1;
00291 LevelData *lData = levelData[atlevel];
00292 if (atlevel != 0) {
00293 thisProxy.ReceiveMigrationDelta(t, lblevel, atlevel, lData->nChildren, lData->children);
00294 return;
00295 }
00296
00297
00298
00299 CkVec<int> migs;
00300 CkVec<LDObjData> &objData = myStats.objData;
00301 for (i=0; i<myStats.n_objs; i++) {
00302 LDObjData &oData = objData[i];
00303 if (oData.wallTime < t) {
00304 migs.push_back(i);
00305 t -= oData.wallTime;
00306 if (t == 0.0) break;
00307 }
00308 }
00309
00310 int nmigs = migs.size();
00311
00312 int matchPE = CkMyPe() ^ (1<<(lblevel-1));
00313
00314 DEBUGF(("[%d] migrating %d objs to %d at lblevel %d! \n", CkMyPe(),nmigs,matchPE,lblevel));
00315 thisProxy[matchPE].ReceiveMigrationCount(nmigs, lblevel);
00316
00317
00318 for (i=0; i<nmigs; i++) {
00319 int idx = migs[i]-i;
00320 LDObjData &oData = objData[idx];
00321 CkVec<LDCommData> comms;
00322 collectCommData(idx, comms);
00323 thisProxy[matchPE].ObjMigrated(oData, comms.getVec(), comms.size());
00324 theLbdb->Migrate(oData.handle, matchPE);
00325
00326 DEBUGF(("myStats.removeObject: %d, %d, %d\n", migs[i], i, objData.size()));
00327 myStats.removeObject(idx);
00328 }
00329 #endif
00330 }
00331
00332
00333 void HbmLB::collectCommData(int objIdx, CkVec<LDCommData> &comms)
00334 {
00335 #if CMK_LBDB_ON
00336 LevelData *lData = levelData[0];
00337
00338 LDObjData &objData = myStats.objData[objIdx];
00339
00340 for (int com=0; com<myStats.n_comm; com++) {
00341 LDCommData &cdata = myStats.commData[com];
00342 if (cdata.from_proc()) continue;
00343 if (cdata.sender.objID() == objData.objID() && cdata.sender.omID() == objData.omID())
00344 comms.push_back(cdata);
00345 }
00346 #endif
00347 }
00348
00349
00350
00351 void HbmLB::ObjMigrated(LDObjData data, LDCommData *cdata, int n)
00352 {
00353 LevelData *lData = levelData[0];
00354
00355 CkVec<LDObjData> &oData = myStats.objData;
00356
00357
00358 lData->obj_completed++;
00359 data.handle.handle = -100;
00360 oData.push_back(data);
00361 myStats.n_objs++;
00362 if (data.migratable) myStats.n_migrateobjs++;
00363 myStats.from_proc.push_back(-1);
00364 myStats.to_proc.push_back(0);
00365
00366
00367 if (n) {
00368 CkVec<LDCommData> &cData = myStats.commData;
00369 for (int i=0; i<n; i++)
00370 cData.push_back(cdata[i]);
00371 myStats.n_comm += n;
00372 myStats.deleteCommHash();
00373 }
00374
00375 if (lData->migrationDone()) {
00376
00377 MigrationDone(1);
00378 }
00379 }
00380
00381 void HbmLB::ReceiveMigrationCount(int count, int lblevel)
00382 {
00383 lbLevel = lblevel;
00384
00385 LevelData *lData = levelData[0];
00386 lData->migrates_expected = count;
00387 if (lData->migrationDone()) {
00388
00389 MigrationDone(1);
00390 }
00391 }
00392
00393 void HbmLB::Migrated(LDObjHandle h, int waitBarrier)
00394 {
00395 LevelData *lData = levelData[0];
00396
00397 lData->migrates_completed++;
00398 newObjs.push_back(h);
00399 DEBUGF(("[%d] An object migrated! %d %d\n", CkMyPe(),lData->migrates_completed,lData->migrates_expected));
00400 if (lData->migrationDone()) {
00401
00402 MigrationDone(1);
00403 }
00404 }
00405
00406 void HbmLB::NotifyObjectMigrationDone(int fromlevel, int lblevel)
00407 {
00408
00409 int atlevel = fromlevel + 1;
00410 LevelData *lData = levelData[atlevel];
00411
00412 lData->mig_reported ++;
00413 DEBUGF(("[%d] HbmLB::NotifyObjectMigrationDone at level: %d lblevel: %d reported: %d!\n", CkMyPe(), atlevel, lblevel, lData->mig_reported));
00414 if (atlevel < lblevel) {
00415 if (lData->mig_reported == lData->nChildren) {
00416 lData->mig_reported = 0;
00417 thisProxy[lData->parent].NotifyObjectMigrationDone(atlevel, lbLevel);
00418 }
00419 }
00420 else {
00421 if (lData->mig_reported == lData->nChildren/2) {
00422 lData->mig_reported = 0;
00423
00424 LoadbalancingDone(atlevel);
00425 }
00426 }
00427 }
00428
00429
00430 void HbmLB::MigrationDone(int balancing)
00431 {
00432 #if CMK_LBDB_ON
00433 int i, j;
00434 LevelData *lData = levelData[0];
00435
00436 DEBUGF(("[%d] HbmLB::MigrationDone lbLevel:%d numLevels:%d!\n", CkMyPe(), lbLevel, tree->numLevels()));
00437
00438 CmiAssert(newObjs.size() == lData->migrates_expected);
00439
00440 #if 0
00441 if (lbLevel == tree->numLevels()-1) {
00442 theLbdb->incStep();
00443
00444 lData->clear();
00445 }
00446 else {
00447 lData->migrates_expected = -1;
00448 lData->migrates_completed = 0;
00449 lData->obj_completed = 0;
00450 }
00451 #else
00452 lData->migrates_expected = -1;
00453 lData->migrates_completed = 0;
00454 lData->obj_completed = 0;
00455 #endif
00456
00457 CkVec<LDObjData> &oData = myStats.objData;
00458
00459
00460 int count=0;
00461 for (i=0; i<oData.size(); i++)
00462 if (oData[i].handle.handle == -100) count++;
00463 CmiAssert(count == newObjs.size());
00464
00465 for (i=0; i<oData.size(); i++) {
00466 if (oData[i].handle.handle == -100) {
00467 LDObjHandle &handle = oData[i].handle;
00468 for (j=0; j<newObjs.size(); j++) {
00469 if (handle.omID() == newObjs[j].omID() &&
00470 handle.objID() == newObjs[j].objID()) {
00471 handle = newObjs[j];
00472 break;
00473 }
00474 }
00475 CmiAssert(j<newObjs.size());
00476 }
00477 }
00478 newObjs.free();
00479
00480 thisProxy[lData->parent].NotifyObjectMigrationDone(0, lbLevel);
00481 #endif
00482 }
00483
00484 void HbmLB::ResumeClients(double result)
00485 {
00486 if (CkMyPe() == 0 && _lb_args.printSummary()) {
00487 double mload = result;
00488 CkPrintf("[%d] MAX Load: %f at step %d.\n", CkMyPe(), mload, step()-1);
00489 }
00490 ResumeClients(1);
00491 }
00492
00493 void HbmLB::ResumeClients(int balancing)
00494 {
00495 #if CMK_LBDB_ON
00496 DEBUGF(("[%d] ResumeClients. \n", CkMyPe()));
00497
00498 theLbdb->incStep();
00499
00500 LevelData *lData = levelData[0];
00501 lData->clear();
00502
00503 if (CkMyPe() == 0 && balancing) {
00504 double end_lb_time = CkWallTimer();
00505 if (_lb_args.debug())
00506 CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
00507 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
00508 }
00509 if (balancing && _lb_args.printSummary()) {
00510 int count = 1;
00511 LBInfo info(count);
00512 LDStats *stats = &myStats;
00513 info.getInfo(stats, count, 0);
00514 LBRealType mLoad, mCpuLoad, totalLoad;
00515 info.getSummary(mLoad, mCpuLoad, totalLoad);
00516 int nmsgs, nbytes;
00517 stats->computeNonlocalComm(nmsgs, nbytes);
00518 CkPrintf("[%d] Load with %d objs: max (with comm): %f max (obj only): %f total: %f on %d processors at step %d useMem: %fKB nonlocal: %d %.2fKB.\n", CkMyPe(), stats->n_objs, mLoad, mCpuLoad, totalLoad, count, step()-1, (1.0*useMem())/1024, nmsgs, nbytes/1024.0);
00519 thisProxy[0].reportLBQulity(mLoad, mCpuLoad, totalLoad, nmsgs, 1.0*nbytes/1024.0);
00520 }
00521
00522
00523 theLbdb->ClearLoads();
00524
00525 theLbdb->ResumeClients();
00526 #endif
00527 }
00528
00529
00530 void HbmLB::reportLBQulity(double mload, double mCpuLoad, double totalload, int nmsgs, double bytes)
00531 {
00532 static int pecount=0;
00533 CmiAssert(CkMyPe() == 0);
00534 if (mload > maxLoad) maxLoad = mload;
00535 if (mCpuLoad > maxCpuLoad) maxCpuLoad = mCpuLoad;
00536 totalLoad += totalload;
00537 maxCommCount += nmsgs;
00538 maxCommBytes += bytes;
00539 pecount++;
00540 if (pecount == CkNumPes()) {
00541 CkPrintf("[%d] Load Summary: max (with comm): %f max (obj only): %f total: %f at step %d nonlocal: %d msgs, %.2fKB reported from %d PEs.\n", CkMyPe(), maxLoad, maxCpuLoad, totalLoad, step(), maxCommCount, maxCommBytes, pecount);
00542 maxLoad = 0.0;
00543 maxCpuLoad = 0.0;
00544 totalLoad = 0.0;
00545 maxCommCount = 0;
00546 maxCommBytes = 0.0;
00547 pecount = 0;
00548 }
00549 }
00550
00551 void HbmLB::work(LDStats* stats)
00552 {
00553 #if CMK_LBDB_ON
00554 CkPrintf("[%d] HbmLB::work called!\n", CkMyPe());
00555 #endif
00556 }
00557
00558 int HbmLB::NeighborIndex(int pe, int atlevel)
00559 {
00560 int peslot = -1;
00561 for(int i=0; i < levelData[atlevel]->nChildren; i++) {
00562 if (pe == levelData[atlevel]->children[i]) {
00563 peslot = i;
00564 break;
00565 }
00566 }
00567 return peslot;
00568 }
00569
00570 int HbmLB::useMem()
00571 {
00572 int i;
00573 int memused = 0;
00574 for (i=0; i<levelData.size(); i++)
00575 if (levelData[i]) memused+=levelData[i]->useMem();
00576 return memused;
00577 }
00578
00579 #include "HbmLB.def.h"
00580