00001
00006 #include "DistributedLB.h"
00007
00008 #include "elements.h"
00009
00010 extern int quietModeRequested;
00011
00012 CreateLBFunc_Def(DistributedLB, "The distributed load balancer")
00013
00014 using std::vector;
00015
00016 DistributedLB::DistributedLB(CkMigrateMessage *m) : CBase_DistributedLB(m) {
00017 }
00018
00019 DistributedLB::DistributedLB(const CkLBOptions &opt) : CBase_DistributedLB(opt) {
00020 lbname = "DistributedLB";
00021 if (CkMyPe() == 0 && !quietModeRequested) {
00022 CkPrintf("CharmLB> DistributedLB created: threshold %lf, max phases %i\n",
00023 kTargetRatio, kMaxPhases);
00024 }
00025 InitLB(opt);
00026 }
00027
00028 void DistributedLB::initnodeFn()
00029 {
00030 _registerCommandLineOpt("+DistLBTargetRatio");
00031 _registerCommandLineOpt("+DistLBMaxPhases");
00032 }
00033
00034 void DistributedLB::turnOn()
00035 {
00036 #if CMK_LBDB_ON
00037 theLbdb->getLBDB()->
00038 TurnOnBarrierReceiver(receiver);
00039 theLbdb->getLBDB()->
00040 TurnOnNotifyMigrated(notifier);
00041 theLbdb->getLBDB()->
00042 TurnOnStartLBFn(startLbFnHdl);
00043 #endif
00044 }
00045
00046 void DistributedLB::turnOff()
00047 {
00048 #if CMK_LBDB_ON
00049 theLbdb->getLBDB()->
00050 TurnOffBarrierReceiver(receiver);
00051 theLbdb->getLBDB()->
00052 TurnOffNotifyMigrated(notifier);
00053 theLbdb->getLBDB()->
00054 TurnOffStartLBFn(startLbFnHdl);
00055 #endif
00056 }
00057
00058 void DistributedLB::InitLB(const CkLBOptions &opt) {
00059 thisProxy = CProxy_DistributedLB(thisgroup);
00060 if (opt.getSeqNo() > 0 || (_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
00061 turnOff();
00062
00063
00064 kUseAck = true;
00065 kPartialInfoCount = -1;
00066 kMaxPhases = _lb_args.maxDistPhases();
00067 kTargetRatio = _lb_args.targetRatio();
00068 }
00069
00070 void DistributedLB::Strategy(const DistBaseLB::LDStats* const stats) {
00071 if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
00072 start_time = CmiWallTimer();
00073 CkPrintf("In DistributedLB strategy at %lf\n", start_time);
00074 }
00075
00076
00077
00078 kMaxObjPickTrials = stats->n_objs;
00079
00080
00081 kMaxTrials = CkNumPes();
00082
00083 kMaxGossipMsgCount = 2 * CmiLog2(CkNumPes());
00084
00085
00086 phase_number = 0;
00087 my_stats = stats;
00088
00089 my_load = 0.0;
00090 for (int i = 0; i < my_stats->n_objs; i++) {
00091 my_load += my_stats->objData[i].wallTime;
00092 }
00093 init_load = my_load;
00094 b_load = my_stats->total_walltime - (my_stats->idletime + my_load);
00095
00096 pe_no.clear();
00097 loads.clear();
00098 distribution.clear();
00099 lb_started = false;
00100 gossip_msg_count = 0;
00101 negack_count = 0;
00102
00103 total_migrates = 0;
00104 total_migrates_ack = 0;
00105
00106 srand((unsigned)(CmiWallTimer()*1.0e06) + CkMyPe());
00107
00108
00109 CkReduction::tupleElement tupleRedn[] = {
00110 CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::sum_double),
00111 CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::max_double)
00112 };
00113 CkReductionMsg* msg = CkReductionMsg::buildFromTuple(tupleRedn, 2);
00114 CkCallback cb(CkIndex_DistributedLB::LoadReduction(NULL), thisProxy);
00115 msg->setCallback(cb);
00116 contribute(msg);
00117 }
00118
00119
00120
00121
00122
00123
00124 void DistributedLB::LoadReduction(CkReductionMsg* redn_msg) {
00125 int count;
00126 CkReduction::tupleElement* results;
00127 redn_msg->toTuple(&results, &count);
00128 delete redn_msg;
00129
00130
00131 avg_load = *(double*)results[0].data / CkNumPes();
00132 max_load = *(double*)results[1].data;
00133 load_ratio = max_load / avg_load;
00134
00135 if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
00136 CkPrintf("DistributedLB>>>Before LB: max = %lf, avg = %lf, ratio = %lf\n",
00137 max_load, avg_load, load_ratio);
00138 }
00139
00140
00141 if (load_ratio <= kTargetRatio) {
00142 if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
00143 CkPrintf("DistributedLB>>>Load ratio already within the target of %lf, ending early.\n",
00144 kTargetRatio);
00145 }
00146 PackAndSendMigrateMsgs();
00147 delete [] results;
00148 return;
00149 }
00150
00151
00152
00153 transfer_threshold = kTargetRatio * avg_load;
00154
00155
00156
00157
00158 if (my_load < transfer_threshold) {
00159 double r_loads[1];
00160 int r_pe_no[1];
00161 r_loads[0] = my_load;
00162 r_pe_no[0] = CkMyPe();
00163 GossipLoadInfo(CkMyPe(), 1, r_pe_no, r_loads);
00164 }
00165
00166
00167 if (CkMyPe() == 0) {
00168 CkCallback cb(CkIndex_DistributedLB::DoneGossip(), thisProxy);
00169 CkStartQD(cb);
00170 }
00171 delete [] results;
00172 }
00173
00174
00175
00176
00177 void DistributedLB::GossipLoadInfo(int from_pe, int n,
00178 int remote_pe_no[], double remote_loads[]) {
00179
00180 vector<int> p_no;
00181 vector<double> l;
00182
00183 int i = 0;
00184 int j = 0;
00185 int m = pe_no.size();
00186
00187
00188
00189
00190 while (i < m && j < n) {
00191 if (pe_no[i] < remote_pe_no[j]) {
00192 p_no.push_back(pe_no[i]);
00193 l.push_back(loads[i]);
00194 i++;
00195 } else {
00196 p_no.push_back(remote_pe_no[j]);
00197 l.push_back(remote_loads[j]);
00198 if (pe_no[i] == remote_pe_no[j]) {
00199 i++;
00200 }
00201 j++;
00202 }
00203 }
00204
00205 if (i == m && j != n) {
00206 while (j < n) {
00207 p_no.push_back(remote_pe_no[j]);
00208 l.push_back(remote_loads[j]);
00209 j++;
00210 }
00211 } else if (j == n && i != m) {
00212 while (i < m) {
00213 p_no.push_back(pe_no[i]);
00214 l.push_back(loads[i]);
00215 i++;
00216 }
00217 }
00218
00219
00220 pe_no.swap(p_no);
00221 loads.swap(l);
00222
00223 SendLoadInfo();
00224 }
00225
00226
00227
00228
00229 void DistributedLB::SendLoadInfo() {
00230
00231
00232
00233 if (gossip_msg_count > kMaxGossipMsgCount) {
00234 return;
00235 }
00236
00237
00238 int rand_nbor1;
00239 int rand_nbor2 = -1;
00240 do {
00241 rand_nbor1 = rand() % CkNumPes();
00242 } while (rand_nbor1 == CkMyPe());
00243
00244 if(CkNumPes() > 2)
00245 do {
00246 rand_nbor2 = rand() % CkNumPes();
00247 } while ((rand_nbor2 == CkMyPe()) || (rand_nbor2 == rand_nbor1));
00248
00249
00250
00251 int info_count = (kPartialInfoCount >= 0) ? kPartialInfoCount : pe_no.size();
00252 int* p = new int[info_count];
00253 double* l = new double[info_count];
00254 for (int i = 0; i < info_count; i++) {
00255 p[i] = pe_no[i];
00256 l[i] = loads[i];
00257 }
00258
00259 thisProxy[rand_nbor1].GossipLoadInfo(CkMyPe(), info_count, p, l);
00260
00261 if(CkNumPes() > 2)
00262 thisProxy[rand_nbor2].GossipLoadInfo(CkMyPe(), info_count, p, l);
00263
00264
00265 gossip_msg_count++;
00266
00267 delete[] p;
00268 delete[] l;
00269 }
00270
00271
00272
00273
00274 void DistributedLB::DoneGossip() {
00275 if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
00276 double end_time = CmiWallTimer();
00277 CkPrintf("DistributedLB>>>Gossip finished at %lf (%lf elapsed)\n",
00278 end_time, end_time - start_time);
00279 }
00280
00281
00282
00283
00284 transfer_threshold = (max_load + avg_load) / 2;
00285 lb_started = true;
00286 underloaded_pe_count = pe_no.size();
00287 Setup();
00288 StartNextLBPhase();
00289 }
00290
00291 void DistributedLB::StartNextLBPhase() {
00292 if (underloaded_pe_count == 0 || my_load <= transfer_threshold) {
00293
00294
00295 DoneWithLBPhase();
00296 } else {
00297
00298 LoadBalance();
00299 }
00300 }
00301
00302 void DistributedLB::DoneWithLBPhase() {
00303 phase_number++;
00304
00305 int count = 1;
00306 if (_lb_args.debug() >= 1) count = 3;
00307 CkReduction::tupleElement tupleRedn[] = {
00308 CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::max_double),
00309 CkReduction::tupleElement(sizeof(double), &total_migrates, CkReduction::sum_int),
00310 CkReduction::tupleElement(sizeof(double), &negack_count, CkReduction::max_int)
00311 };
00312 CkReductionMsg* msg = CkReductionMsg::buildFromTuple(tupleRedn, count);
00313 CkCallback cb(CkIndex_DistributedLB::AfterLBReduction(NULL), thisProxy);
00314 msg->setCallback(cb);
00315 contribute(msg);
00316 }
00317
00318 void DistributedLB::AfterLBReduction(CkReductionMsg* redn_msg) {
00319 int count, migrations, max_nacks;
00320 CkReduction::tupleElement* results;
00321 redn_msg->toTuple(&results, &count);
00322 delete redn_msg;
00323
00324
00325 max_load = *(double*)results[0].data;
00326 double old_ratio = load_ratio;
00327 load_ratio = max_load / avg_load;
00328 if (count > 1) migrations = *(int*)results[1].data;
00329 if (count > 2) max_nacks = *(int*)results[2].data;
00330
00331 if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
00332 CkPrintf("DistributedLB>>>After phase %i: max = %lf, avg = %lf, ratio = %lf\n",
00333 phase_number, max_load, avg_load, load_ratio);
00334 }
00335
00336
00337
00338
00339 if (load_ratio > kTargetRatio &&
00340 transfer_threshold > kTargetRatio * avg_load &&
00341 phase_number < kMaxPhases) {
00342
00343
00344 if (std::abs(load_ratio - old_ratio) < 0.01) {
00345
00346
00347 transfer_threshold = (transfer_threshold + avg_load) / 2;
00348 } else {
00349
00350
00351 transfer_threshold = (max_load + avg_load) / 2;
00352 }
00353 StartNextLBPhase();
00354 } else {
00355 if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
00356 double end_time = CmiWallTimer();
00357 CkPrintf("DistributedLB>>>Balancing completed at %lf (%lf elapsed)\n",
00358 end_time, end_time - start_time);
00359 CkPrintf("DistributedLB>>>%i total migrations with %i negative ack max\n",
00360 migrations, max_nacks);
00361 }
00362 Cleanup();
00363 PackAndSendMigrateMsgs();
00364 if (!(_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
00365 theLbdb->nextLoadbalancer(seqno);
00366 }
00367 delete [] results;
00368 }
00369
00370
00371
00372
00373
00374 void DistributedLB::LoadBalance() {
00375 CkVec<int> obj_no;
00376 CkVec<int> obj_pe_no;
00377
00378
00379
00380 MapObjsToPe(objs, obj_no, obj_pe_no);
00381 total_migrates += obj_no.length();
00382 total_migrates_ack = obj_no.length();
00383
00384
00385 if (obj_no.length() == 0) {
00386 DoneWithLBPhase();
00387 }
00388 }
00389
00390 void DistributedLB::Setup() {
00391 objs_count = 0;
00392 double avg_objload = 0.0;
00393 double max_objload = 0.0;
00394
00395 for(int i=0; i < my_stats->n_objs; i++) {
00396 if (my_stats->objData[i].migratable &&
00397 my_stats->objData[i].wallTime > 0.000001) {
00398 objs_count++;
00399 }
00400 }
00401
00402
00403
00404
00405 objs = new minHeap(objs_count);
00406 for(int i=0; i < my_stats->n_objs; i++) {
00407 if (my_stats->objData[i].migratable &&
00408 my_stats->objData[i].wallTime > 0.0001) {
00409 InfoRecord* item = new InfoRecord;
00410 item->load = my_stats->objData[i].wallTime;
00411 item->Id = i;
00412 objs->insert(item);
00413 }
00414 }
00415
00416
00417
00418 CalculateCumulateDistribution();
00419 }
00420
00421 void DistributedLB::Cleanup() {
00422
00423
00424 InfoRecord* obj;
00425 while (NULL!=(obj=objs->deleteMin())) {
00426 delete obj;
00427 }
00428 delete objs;
00429 }
00430
00431
00432
00433
00434
00435
00436 void DistributedLB::MapObjsToPe(minHeap *objs, CkVec<int> &obj_no,
00437 CkVec<int> &obj_pe_no) {
00438 int p_id;
00439 double p_load;
00440 int rand_pe;
00441
00442
00443 while (my_load > transfer_threshold) {
00444
00445 if (objs_count < 2) break;
00446
00447
00448 bool success = false;
00449
00450
00451 InfoRecord* obj = objs->deleteMin();
00452
00453 if (obj == 0) break;
00454
00455
00456
00457 do {
00458 rand_pe = PickRandReceiverPeIdx();
00459 if (rand_pe == -1) break;
00460 p_id = pe_no[rand_pe];
00461 p_load = loads[rand_pe];
00462 if (p_load + obj->load < transfer_threshold) {
00463 success = true;
00464 }
00465 kMaxTrials--;
00466 } while (!success && (kMaxTrials > 0));
00467
00468 kMaxObjPickTrials--;
00469
00470
00471 if (!success) {
00472 objs->insert(obj);
00473 break;
00474 }
00475
00476
00477
00478 obj_no.insertAtEnd(obj->Id);
00479 obj_pe_no.insertAtEnd(p_id);
00480 objs_count--;
00481 loads[rand_pe] += obj->load;
00482 my_load -= obj->load;
00483
00484
00485
00486 thisProxy[p_id].InformMigration(obj->Id, CkMyPe(),
00487 my_stats->objData[obj->Id].wallTime, false);
00488
00489
00490 delete obj;
00491 }
00492 }
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504 void DistributedLB::InformMigration(int obj_id, int from_pe, double obj_load,
00505 bool force) {
00506
00507
00508 if (!kUseAck || my_load + obj_load <= transfer_threshold) {
00509 migrates_expected++;
00510
00511 my_load += obj_load;
00512 thisProxy[from_pe].RecvAck(obj_id, CkMyPe(), true);
00513 return;
00514 }
00515
00516
00517
00518
00519 if (force) {
00520 migrates_expected++;
00521
00522 my_load += obj_load;
00523 } else {
00524
00525
00526 thisProxy[from_pe].RecvAck(obj_id, CkMyPe(), false);
00527 }
00528 }
00529
00530
00531
00532
00533
00534
00535 void DistributedLB::RecvAck(int obj_id, int assigned_pe, bool can_accept) {
00536 total_migrates_ack--;
00537
00538
00539 if (can_accept) {
00540 MigrateInfo* migrateMe = new MigrateInfo;
00541 migrateMe->obj = my_stats->objData[obj_id].handle;
00542 migrateMe->from_pe = CkMyPe();
00543 migrateMe->to_pe = assigned_pe;
00544 migrateInfo.push_back(migrateMe);
00545 } else if (negack_count > 0.01*underloaded_pe_count) {
00546
00547 negack_count++;
00548 total_migrates--;
00549 objs_count++;
00550 my_load += my_stats->objData[obj_id].wallTime;
00551 } else {
00552
00553
00554 total_migrates--;
00555 negack_count++;
00556 objs_count++;
00557 my_load += my_stats->objData[obj_id].wallTime;
00558
00559 minHeap* objs = new minHeap(1);
00560 InfoRecord* item = new InfoRecord;
00561 item->load = my_stats->objData[obj_id].wallTime;
00562 item->Id = obj_id;
00563 objs->insert(item);
00564
00565 CkVec<int> obj_no;
00566 CkVec<int> obj_pe_no;
00567 MapObjsToPe(objs, obj_no, obj_pe_no);
00568
00569
00570
00571
00572 if (obj_pe_no.size() > 0) {
00573 total_migrates_ack++;
00574 total_migrates++;
00575 }
00576 InfoRecord* obj;
00577 while (NULL!=(obj=objs->deleteMin())) {
00578 delete obj;
00579 }
00580 }
00581
00582
00583
00584 if (total_migrates_ack == 0) {
00585 DoneWithLBPhase();
00586 }
00587 }
00588
00589 void DistributedLB::PackAndSendMigrateMsgs() {
00590 LBMigrateMsg* msg = new(total_migrates,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
00591 msg->n_moves = total_migrates;
00592 for(int i=0; i < total_migrates; i++) {
00593 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
00594 msg->moves[i] = *item;
00595 delete item;
00596 migrateInfo[i] = 0;
00597 }
00598 migrateInfo.clear();
00599 ProcessMigrationDecision(msg);
00600 }
00601
00602
00603
00604
00605 int DistributedLB::PickRandReceiverPeIdx() const {
00606
00607
00608
00609
00610
00611 double no = (double) rand()/(double) RAND_MAX;
00612 for (int i = 0; i < underloaded_pe_count; i++) {
00613 if (distribution[i] >= no) {
00614 return i;
00615 }
00616 }
00617 return -1;
00618 }
00619
00620
00621
00622
00623
00624 void DistributedLB::CalculateCumulateDistribution() {
00625
00626 double cumulative = 0.0;
00627 for (int i = 0; i < underloaded_pe_count; i++) {
00628 cumulative += (transfer_threshold - loads[i])/transfer_threshold;
00629 distribution.push_back(cumulative);
00630 }
00631
00632 for (int i = 0; i < underloaded_pe_count; i++) {
00633 distribution[i] = distribution[i]/cumulative;
00634 }
00635 }
00636
00637 #include "DistributedLB.def.h"