00001
00013 #include "HierarchicalLB.h"
00014 #include "HierarchicalLBTypes.h"
00015
00016 #include "elements.h"
00017
// Expands the CreateLBFunc_Def macro for this balancer, passing the class
// name and a one-line human-readable description of the strategy.
// NOTE(review): presumably this generates the creation/registration hook for
// the strategy -- confirm against the macro's definition.
CreateLBFunc_Def(
  HierarchicalLB, "The scalable hierarchical greedy load balancer"
)
00021
// Tunable parameters of the hierarchical balancer. The guarded ones may be
// overridden from the build line (e.g. -DHIER_LB_NARY=4); the values below
// are the defaults used when nothing is supplied.

// Multiplier applied to the average load to obtain the overload threshold.
#ifndef HIER_LB_THRESHOLD
#define HIER_LB_THRESHOLD 1.005
#endif

// Fan-out of the tree: maximum number of children per rank.
#ifndef HIER_LB_NARY
#define HIER_LB_NARY 8
#endif

// Rank at the root of the tree. Not overridable: the parent/child index
// arithmetic in this file ((rank - 1) / NARY, rank * NARY + i + 1) is only
// consistent with a tree rooted at rank 0.
#define HIER_LB_ROOT 0

// Width of a histogram bin, in the same unit as the sampled times
// (milliseconds at the call sites in this file).
#ifndef HIER_LB_BIN_SIZE
#define HIER_LB_BIN_SIZE 10
#endif

// Debug tracing: forwards to CkPrintf when DEBUG_HIER_LB_ON is non-zero and
// expands to nothing otherwise, so the arguments are not evaluated.
#if DEBUG_HIER_LB_ON
#define DEBUG_HIER_LB(...) CkPrintf(__VA_ARGS__)
#else
#define DEBUG_HIER_LB(...)
#endif
00032
00033 HierarchicalLB::HierarchicalLB(CkMigrateMessage *m)
00034 : CBase_HierarchicalLB(m)
00035 { }
00036
00037 HierarchicalLB::HierarchicalLB(const CkLBOptions &opt)
00038 : CBase_HierarchicalLB(opt)
00039 {
00040 lbname = "HierarchicalLB";
00041 if (CkMyPe() == 0)
00042 CkPrintf("[%d] HierarchicalLB created\n",CkMyPe());
00043 InitLB(opt);
00044 }
00045
00046 void HierarchicalLB::turnOn()
00047 {
00048 #if CMK_LBDB_ON
00049 theLbdb->getLBDB()->
00050 TurnOnBarrierReceiver(receiver);
00051 theLbdb->getLBDB()->
00052 TurnOnNotifyMigrated(notifier);
00053 theLbdb->getLBDB()->
00054 TurnOnStartLBFn(startLbFnHdl);
00055 #endif
00056 }
00057
00058 void HierarchicalLB::turnOff()
00059 {
00060 #if CMK_LBDB_ON
00061 theLbdb->getLBDB()->
00062 TurnOffBarrierReceiver(receiver);
00063 theLbdb->getLBDB()->
00064 TurnOffNotifyMigrated(notifier);
00065 theLbdb->getLBDB()->
00066 TurnOffStartLBFn(startLbFnHdl);
00067 #endif
00068 }
00069
00070 void HierarchicalLB::InitLB(const CkLBOptions &opt) {
00071 thisProxy = CProxy_HierarchicalLB(thisgroup);
00072 if (opt.getSeqNo() > 0) turnOff();
00073 }
00074
00075 int
00076 HierarchicalLB::histogram_time_sample(double const& time_milli) {
00077 int const bin_size = HIER_LB_BIN_SIZE;
00078 int const bin = (
00079 (static_cast<int>(time_milli)) / bin_size * bin_size
00080 ) + bin_size;
00081 return bin;
00082 }
00083
00084
00085 HierarchicalLB::hier_objid_t
00086 HierarchicalLB::convert_to_hier_objid(int const& obj_id, int const& pe) {
00087 hier_objid_t pe_conv = (hier_objid_t)pe;
00088 return ((pe_conv << 32) | (hier_objid_t)obj_id);
00089 }
00090
00091
00092 int
00093 HierarchicalLB::hier_objid_get_pe(hier_objid_t const& id) {
00094 return (int)(id >> 32);
00095 }
00096
00097
00098 int
00099 HierarchicalLB::hier_objid_get_id(hier_objid_t const& id) {
00100 return (int)(id);
00101 }
00102
00103 void
00104 HierarchicalLB::Strategy(const DistBaseLB::LDStats* const stats) {
00105 if (CkMyPe() == 0) {
00106 CkPrintf("[%d] In HierarchicalLB strategy\n", CkMyPe());
00107 }
00108
00109 my_stats = stats;
00110
00111 my_load = 0.0;
00112 for (int i = 0; i < my_stats->n_objs; i++) {
00113 auto const& time = my_stats->objData[i].wallTime;
00114
00115 auto time_milli = time * 1000;
00116 auto bin = histogram_time_sample(time_milli);
00117
00118
00119 my_load += time_milli;
00120
00121 if (my_stats->objData[i].migratable) {
00122 hier_objid_t obj_id = HierarchicalLB::convert_to_hier_objid(i, CkMyPe());
00123 obj_sample[bin].push_back(obj_id);
00124 }
00125
00126
00127
00128
00129
00130
00131
00132 }
00133
00134 DEBUG_HIER_LB(
00135 "%d: my_load=%f, n_objs=%d\n",
00136 CkMyPe(), my_load, my_stats->n_objs
00137 );
00138
00139 if (!tree_is_setup) {
00140 setup_tree();
00141 tree_is_setup = true;
00142 }
00143
00144
00145 CkCallback cb(CkReductionTarget(HierarchicalLB, avg_load_reduction), thisProxy);
00146 contribute(sizeof(double), &my_load, CkReduction::sum_double, cb);
00147 }
00148
00149 void
00150 HierarchicalLB::setup_tree() {
00151 CkAssert(
00152 tree_is_setup == false &&
00153 "Tree must not already be set up when is this called"
00154 );
00155
00156 rank = CkMyPe();
00157 nproc = CkNumPes();
00158
00159 for (int i = 0; i < HIER_LB_NARY; i++) {
00160 int const child = rank * HIER_LB_NARY + i + 1;
00161 if (child < nproc) {
00162 children[child] = new ChildLoadInfo();
00163 DEBUG_HIER_LB("\t%d: child = %d\n", rank, child);
00164 }
00165 }
00166
00167 if (children.size() == 0) {
00168 for (int i = 0; i < HIER_LB_NARY; i++) {
00169 int factorProcs = nproc / HIER_LB_NARY * HIER_LB_NARY;
00170 if (factorProcs < nproc) {
00171 factorProcs += HIER_LB_NARY;
00172 }
00173 int const child = (rank * HIER_LB_NARY + i + 1) - factorProcs - 1;
00174
00175 if (child < nproc && child >= 0) {
00176 children[child] = new ChildLoadInfo();
00177 children[child]->final_child = true;
00178 DEBUG_HIER_LB("\t%d: child-x = %d\n", rank, child);
00179 }
00180 }
00181 }
00182
00183 parent = (rank - 1) / HIER_LB_NARY;
00184
00185 int factorProcs = nproc / HIER_LB_NARY * HIER_LB_NARY;
00186 if (factorProcs < nproc) {
00187 factorProcs += HIER_LB_NARY;
00188 }
00189
00190 bottomParent = ((rank + 1 + factorProcs) - 1) / HIER_LB_NARY;
00191
00192 DEBUG_HIER_LB(
00193 "\t%d: parent=%d, bottomParent=%d, children.size()=%ld\n",
00194 rank, parent, bottomParent, children.size()
00195 );
00196 }
00197
00198 void
00199 HierarchicalLB::calc_load_over() {
00200 auto cur_item = obj_sample.begin();
00201 auto threshold = HIER_LB_THRESHOLD * avg_load;
00202
00203 DEBUG_HIER_LB(
00204 "%d: calc_load_over: my_load=%f, avg_load=%f, threshold=%f\n",
00205 rank, my_load, avg_load, threshold
00206 );
00207
00208 while (my_load > threshold && cur_item != obj_sample.end()) {
00209 if (cur_item->second.size() != 0) {
00210 auto const obj = cur_item->second.back();
00211
00212 load_over[cur_item->first].push_back(obj);
00213 cur_item->second.pop_back();
00214
00215 auto to_id = hier_objid_get_id(obj);
00216 auto const& obj_time_milli = my_stats->objData[to_id].wallTime * 1000;
00217
00218 my_load -= obj_time_milli;
00219
00220 DEBUG_HIER_LB(
00221 "%d: calc_load_over: my_load=%f, threshold=%f, adding unit, bin=%d\n",
00222 rank, my_load, threshold, cur_item->first
00223 );
00224 } else {
00225 cur_item++;
00226 }
00227 }
00228
00229 for (auto i = 0; i < obj_sample.size(); i++) {
00230 if (obj_sample[i].size() == 0) {
00231 obj_sample.erase(obj_sample.find(i));
00232 }
00233 }
00234 }
00235
00236 void
00237 HierarchicalLB::lb_tree_msg(
00238 double const child_load, int const child, cont_hier_objid_t load,
00239 int child_size
00240 ) {
00241 DEBUG_HIER_LB(
00242 "%d: lb_tree_msg: child=%d, child_load=%f, child_size=%d, "
00243 "child_msgs=%d, children.size()=%ld, agg_node_size=%d, "
00244 "avg_load=%f, child_avg=%f, incoming load.size=%ld\n",
00245 rank, child, child_load, child_size, child_msgs+1, children.size(),
00246 agg_node_size + child_size, avg_load, child_load/child_size,
00247 load.size()
00248 );
00249
00250 if (load.size() > 0) {
00251 for (auto& bin : load) {
00252
00253
00254 DEBUG_HIER_LB(
00255 "%d: \t lb_tree_msg: combining bins for bin=%d, size=%ld\n",
00256 rank, bin.first, bin.second.size()
00257 );
00258
00259 if (bin.second.size() > 0) {
00260
00261
00262 auto given_iter = given_objs.find(bin.first);
00263
00264 if (given_iter == given_objs.end()) {
00265
00266 given_objs.emplace(
00267 std::piecewise_construct,
00268 std::forward_as_tuple(bin.first),
00269 std::forward_as_tuple(cont_hier_bin_t{})
00270 );
00271
00272 given_iter = given_objs.find(bin.first);
00273
00274 CkAssert(
00275 given_iter != given_objs.end() &&
00276 "An insertion just took place so this must not fail"
00277 );
00278 }
00279
00280
00281 total_child_load += bin.first * bin.second.size();
00282
00283 given_iter->second.splice(
00284 given_iter->second.begin(), bin.second
00285 );
00286 }
00287 }
00288 }
00289
00290 agg_node_size += child_size;
00291
00292 auto child_iter = children.find(child);
00293
00294 CkAssert(
00295 child_iter != children.end() && "Entry must exist in children map"
00296 );
00297
00298 child_iter->second->node_size = child_size;
00299 child_iter->second->cur_load = child_load;
00300 child_iter->second->pe = child;
00301
00302 total_child_load += child_load;
00303
00304 child_msgs++;
00305
00306 if (child_size > 0 && child_load != 0.0) {
00307 auto live_iter = live_children.find(child);
00308 if (live_iter == live_children.end()) {
00309 live_children.emplace(
00310 std::piecewise_construct,
00311 std::forward_as_tuple(child),
00312 std::forward_as_tuple(child_iter->second)
00313 );
00314 }
00315 }
00316
00317 CkAssert(
00318 child_msgs <= children.size() &&
00319 "Number of children must be greater or less than"
00320 );
00321
00322 if (child_msgs == children.size()) {
00323 if (rank == HIER_LB_ROOT) {
00324 DEBUG_HIER_LB(
00325 "lb_tree_msg: %d: reached root!: total_load=%f, avg=%f\n",
00326 rank, total_child_load, total_child_load/agg_node_size
00327 );
00328 send_down_tree();
00329 } else {
00330 distribute_amoung_children();
00331 }
00332 }
00333 }
00334
00335 ChildLoadInfo*
00336 HierarchicalLB::find_min_child() {
00337 if (live_children.size() == 0) {
00338 return nullptr;
00339 }
00340
00341 ChildLoadInfo* cur = live_children.begin()->second;
00342
00343
00344 DEBUG_HIER_LB(
00345 "%d: find_min_child, cur.pe=%d, load=%f\n",
00346 rank, cur->pe, cur->cur_load
00347 );
00348
00349 for (auto&& c : live_children) {
00350 auto const& load = c.second->cur_load / c.second->node_size;
00351 auto const& cur_load = cur->cur_load / cur->node_size;
00352 if (load < cur_load || cur->node_size == 0) {
00353 cur = c.second;
00354 }
00355 }
00356
00357 return cur;
00358 }
00359
00360 void
00361 HierarchicalLB::down_tree_msg(
00362 int const& from, cont_hier_objid_t excess_load, bool final_child
00363 ) {
00364 DEBUG_HIER_LB(
00365 "%d: down_tree_msg: from=%d, bottomParent=%d: load=%ld\n",
00366 rank, from, bottomParent, excess_load.size()
00367 );
00368
00369 if (final_child) {
00370
00371 taken_objs = std::move(excess_load);
00372
00373 int total_taken_load = 0;
00374 for (auto&& item : taken_objs) {
00375 total_taken_load = item.first * item.second.size();
00376
00377 DEBUG_HIER_LB(
00378 "%d: down_tree_msg: from=%d, taken_bin=%d, taken_bin_count=%ld, "
00379 "total_taken_load=%d\n",
00380 rank, from, item.first, item.second.size(), total_taken_load
00381 );
00382 }
00383
00384 my_load += total_taken_load;
00385
00386 DEBUG_HIER_LB(
00387 "%d: down_tree_msg: new load profile=%f, total_taken_load=%d, "
00388 "avg_load=%f\n",
00389 rank, my_load, total_taken_load, avg_load
00390 );
00391 } else {
00392 given_objs = std::move(excess_load);
00393 send_down_tree();
00394 }
00395 }
00396
00397 void
00398 HierarchicalLB::send_down_tree() {
00399 DEBUG_HIER_LB("%d: send_down_tree\n", rank);
00400
00401 auto cIter = given_objs.rbegin();
00402
00403 while (cIter != given_objs.rend()) {
00404 ChildLoadInfo* c = find_min_child();
00405 int const weight = c->node_size;
00406 double const threshold = avg_load * weight * HIER_LB_THRESHOLD;
00407
00408 DEBUG_HIER_LB(
00409 "\t %d: distribute min child: c=%p, child=%d, cur_load=%f, "
00410 "weight=%d, avg_load=%f, threshold=%f\n",
00411 rank, c, c ? c->pe : -1, c ? c->cur_load : -1.0,
00412 weight, avg_load, threshold
00413 );
00414
00415 if (c == nullptr || weight == 0) {
00416 break;
00417 } else {
00418 if (cIter->second.size() != 0) {
00419 DEBUG_HIER_LB(
00420 "\t distribute: %d, child=%d, cur_load=%f, time=%d\n",
00421 rank, c->pe, c->cur_load, cIter->first
00422 );
00423
00424
00425 auto task = cIter->second.back();
00426 c->recs[cIter->first].push_back(task);
00427 c->cur_load += cIter->first;
00428
00429 cIter->second.pop_back();
00430 } else {
00431 cIter++;
00432 }
00433 }
00434 }
00435
00436 clear_obj_map(given_objs);
00437
00438 for (auto& c : children) {
00439
00440
00441
00442
00443 thisProxy[c.second->pe].down_tree_msg(
00444 CkMyPe(), c.second->recs, c.second->final_child
00445 );
00446 c.second->recs.clear();
00447
00448 }
00449 }
00450
00451 void
00452 HierarchicalLB::clear_obj_map(cont_hier_objid_t& objs) {
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463 std::vector<int> to_remove{};
00464 for (auto&& bin : objs) {
00465 if (bin.second.size() == 0) {
00466 to_remove.push_back(bin.first);
00467 }
00468 }
00469
00470 for (auto&& r : to_remove) {
00471 auto giter = objs.find(r);
00472 CkAssert(
00473 giter != objs.end() && "Must exist"
00474 );
00475 objs.erase(giter);
00476 }
00477 }
00478
00479 void
00480 HierarchicalLB::distribute_amoung_children() {
00481 DEBUG_HIER_LB("distribute_amoung_children: %d, parent=%d\n", rank, parent);
00482
00483 auto cIter = given_objs.rbegin();
00484
00485 while (cIter != given_objs.rend()) {
00486 ChildLoadInfo* c = find_min_child();
00487 int const weight = c->node_size;
00488 double const threshold = avg_load * weight * HIER_LB_THRESHOLD;
00489
00490 DEBUG_HIER_LB(
00491 "\t %d: distribute min child: c=%p, child=%d, cur_load=%f, "
00492 "weight=%d, avg_load=%f, threshold=%f\n",
00493 rank, c, c ? c->pe : -1, c ? c->cur_load : -1.0,
00494 weight, avg_load, threshold
00495 );
00496
00497 if (c == nullptr || c->cur_load > threshold || weight == 0) {
00498 break;
00499 } else {
00500 if (cIter->second.size() != 0) {
00501 DEBUG_HIER_LB(
00502 "\t distribute: %d, child=%d, cur_load=%f, time=%d\n",
00503 rank, c->pe, c->cur_load, cIter->first
00504 );
00505
00506
00507 auto task = cIter->second.back();
00508 c->recs[cIter->first].push_back(task);
00509 c->cur_load += cIter->first;
00510
00511 cIter->second.pop_back();
00512 } else {
00513 cIter++;
00514 }
00515 }
00516 }
00517
00518 clear_obj_map(given_objs);
00519
00520 thisProxy[parent].lb_tree_msg(
00521 total_child_load, rank, given_objs, agg_node_size
00522 );
00523
00524 given_objs.clear();
00525 }
00526
00527 void
00528 HierarchicalLB::avg_load_reduction(double x) {
00529 avg_load = x/CkNumPes();
00530
00531 thr_avg = HIER_LB_THRESHOLD * avg_load;
00532
00533 DEBUG_HIER_LB(
00534 "avg_load_reduction: %d: total=%f, avg_load=%f, thr_avg=%f\n",
00535 rank, x, avg_load, thr_avg
00536 );
00537
00538 calc_load_over();
00539
00540 thisProxy[bottomParent].lb_tree_msg(my_load, rank, load_over, 1);
00541
00542 if (children.size() == 0) {
00543 cont_hier_objid_t empty_obj{};
00544
00545 thisProxy[parent].lb_tree_msg(0.0, rank, empty_obj, agg_node_size);
00546 }
00547
00548
00549 if (CkMyPe() == 0) {
00550 CkCallback cb(CkIndex_HierarchicalLB::done_hier(), thisProxy);
00551 CkStartQD(cb);
00552 }
00553 }
00554
00555 void HierarchicalLB::done_hier() {
00556 DEBUG_HIER_LB(
00557 "%d: done_hier: given_objs.size()=%ld, taken_objs.size()=%ld\n",
00558 rank, given_objs.size(), taken_objs.size()
00559 );
00560
00561 LoadBalance();
00562 theLbdb->nextLoadbalancer(seqno);
00563 }
00564
00565 void
00566 HierarchicalLB::transfer_objects(
00567 int const& to_pe, std::vector<int> const& lst
00568 ) {
00569 auto trans_iter = transfers.find(to_pe);
00570
00571 CkAssert(
00572 trans_iter == transfers.end() &&
00573 "There must not be an entry"
00574 );
00575
00576 transfers[to_pe] = lst;
00577 transfer_count += lst.size();
00578 }
00579
00580 void
00581 HierarchicalLB::finished_transfer_requests() {
00582 LBMigrateMsg* msg = new (transfer_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
00583 msg->n_moves = transfer_count;
00584
00585 int i = 0;
00586 for (auto&& t : transfers) {
00587 for (auto&& obj_id : t.second) {
00588 MigrateInfo mig;
00589 mig.obj = my_stats->objData[obj_id].handle;
00590 mig.from_pe = rank;
00591 mig.to_pe = t.first;
00592
00593 msg->moves[i] = std::move(mig);
00594 i++;
00595 }
00596 }
00597
00598 DEBUG_HIER_LB(
00599 "%d: finished_transfer_requests, i=%d\n", rank, i
00600 );
00601
00602 ProcessMigrationDecision(msg);
00603
00604
00605 transfers.clear();
00606 transfer_count = 0;
00607
00608 given_objs.clear();
00609 taken_objs.clear();
00610 obj_sample.clear();
00611 load_over.clear();
00612 my_load = avg_load = total_child_load = thr_avg = 0.0;
00613 agg_node_size = child_msgs = 0;
00614 }
00615
00616 void HierarchicalLB::LoadBalance() {
00617
00618
00619
00620
00621
00622 std::map<int, std::vector<int>> transfer_list;
00623
00624 for (auto&& bin : taken_objs) {
00625 for (auto&& obj_id : bin.second) {
00626 auto pe = hier_objid_get_pe(obj_id);
00627 auto id = hier_objid_get_id(obj_id);
00628
00629 if (pe != rank) {
00630 migrates_expected++;
00631
00632 DEBUG_HIER_LB(
00633 "%d: LoadBalance, obj_id=%ld, pe=%d, id=%d\n",
00634 rank, obj_id, pe, id
00635 );
00636
00637 transfer_list[pe].push_back(id);
00638 }
00639 }
00640 }
00641
00642 DEBUG_HIER_LB(
00643 "%d: LoadBalance, transfer_list=%ld\n",
00644 rank, transfer_list.size()
00645 );
00646
00647 for (auto&& trans : transfer_list) {
00648 thisProxy[trans.first].transfer_objects(rank, trans.second);
00649 }
00650
00651
00652 if (CkMyPe() == 0) {
00653 CkCallback cb(
00654 CkIndex_HierarchicalLB::finished_transfer_requests(), thisProxy
00655 );
00656 CkStartQD(cb);
00657 }
00658 }
00659
00660 #include "HierarchicalLB.def.h"