00001
00005
00006 #ifndef _WIN32
00007 #include <unistd.h>
00008 #endif
00009
00010 #include "elements.h"
00011 #include "ckheap.h"
00012 #include "WSLB.h"
00013 #include "LBDBManager.h"
00014
00015
00016
00017
00018 #define VACATE_PROC -1
00019
00020 #define VACATE_AFTER 30
00021 #define UNVACATE_AFTER 15
00022
00023 extern int quietModeRequested;
00024
00025 CreateLBFunc_Def(WSLB, "Workstation load balancer")
00026
00027 void WSLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
00028 {
00029 WSLB *me = (WSLB*)(data);
00030
00031 me->Migrated(h, waitBarrier);
00032 }
00033
00034 void WSLB::staticAtSync(void* data)
00035 {
00036 WSLB *me = (WSLB*)(data);
00037
00038 me->AtSync();
00039 }
00040
00041 WSLB::WSLB(const CkLBOptions &opt) : CBase_WSLB(opt)
00042 {
00043 #if CMK_LBDB_ON
00044 thisProxy = CProxy_WSLB(thisgroup);
00045 lbname = "WSLB";
00046 if (CkMyPe() == 0 && !quietModeRequested)
00047 CkPrintf("CharmLB> WSLB created.\n");
00048
00049 mystep = 0;
00050 theLbdb->
00051 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
00052 notifier = theLbdb->getLBDB()->
00053 NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
00054
00055
00056 LBtopoFn topofn = LBTopoLookup(_lbtopo);
00057 if (topofn == NULL) {
00058 if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
00059 CmiAbort("");
00060 }
00061 topo = topofn(CkNumPes());
00062
00063
00064
00065
00066 neighbor_pes = NULL;
00067 stats_msg_count = 0;
00068 statsMsgsList = NULL;
00069 statsDataList = NULL;
00070 migrates_completed = 0;
00071 migrates_expected = -1;
00072 mig_msgs_received = 0;
00073 mig_msgs = NULL;
00074
00075 myStats.proc_speed = theLbdb->ProcessorSpeed();
00076
00077
00078
00079 myStats.obj_data_sz = 0;
00080 myStats.comm_data_sz = 0;
00081 receive_stats_ready = 0;
00082
00083 vacate = false;
00084 usage = 1.0;
00085 usage_int_err = 0.;
00086
00087 theLbdb->CollectStatsOn();
00088 #endif
00089 }
00090
00091 WSLB::~WSLB()
00092 {
00093 #if CMK_LBDB_ON
00094 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00095 if (theLbdb) {
00096 theLbdb->getLBDB()->
00097 RemoveNotifyMigrated(notifier);
00098
00099
00100 }
00101 if (statsMsgsList) delete [] statsMsgsList;
00102 if (statsDataList) delete [] statsDataList;
00103 if (neighbor_pes) delete [] neighbor_pes;
00104 if (mig_msgs) delete [] mig_msgs;
00105 #endif
00106 }
00107
00108 void WSLB::FindNeighbors()
00109 {
00110 if (neighbor_pes == 0) {
00111
00112
00113 int maxneighbors = topo->max_neighbors();
00114 statsMsgsList = new WSLBStatsMsg*[maxneighbors];
00115 for(int i=0; i < maxneighbors; i++)
00116 statsMsgsList[i] = 0;
00117 statsDataList = new LDStats[maxneighbors];
00118
00119 neighbor_pes = new int[maxneighbors];
00120 topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
00121 mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
00122 }
00123
00124 }
00125
00126 void WSLB::AtSync()
00127 {
00128 #if CMK_LBDB_ON
00129
00130
00131 if (CkMyPe() == 0) {
00132 start_lb_time = CkWallTimer();
00133 CkPrintf("Load balancing step %d starting at %f\n",
00134 step(),start_lb_time);
00135 }
00136
00137 if (neighbor_pes == 0) FindNeighbors();
00138
00139 if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
00140 MigrationDone();
00141 return;
00142 }
00143
00144 WSLBStatsMsg* msg = AssembleStats();
00145
00146 thisProxy.ReceiveStats(msg,mig_msgs_expected,neighbor_pes);
00147
00148
00149 ReceiveStats((WSLBStatsMsg*)0);
00150 #endif
00151 }
00152
00153 WSLBStatsMsg* WSLB::AssembleStats()
00154 {
00155 #if CMK_LBDB_ON
00156
00157 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
00158 theLbdb->IdleTime(&myStats.idletime);
00159 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
00160 myStats.obj_data_sz = theLbdb->GetObjDataSz();
00161 myStats.objData = new LDObjData[myStats.obj_data_sz];
00162 theLbdb->GetObjData(myStats.objData);
00163
00164 myStats.comm_data_sz = theLbdb->GetCommDataSz();
00165 myStats.commData = new LDCommData[myStats.comm_data_sz];
00166 theLbdb->GetCommData(myStats.commData);
00167
00168 myStats.obj_walltime = myStats.obj_cputime = 0;
00169 for(int i=0; i < myStats.obj_data_sz; i++) {
00170 myStats.obj_walltime += myStats.objData[i].wallTime;
00171 myStats.obj_cputime += myStats.objData[i].cpuTime;
00172 }
00173
00174 WSLBStatsMsg* msg = new WSLBStatsMsg;
00175
00176
00177 double myload = myStats.total_walltime - myStats.idletime;
00178 double myusage;
00179
00180
00181
00182
00183
00184
00185
00186
00187 if (myload > 0)
00188 myusage = myStats.total_cputime / myload;
00189 else myusage = 1.0;
00190
00191 const double usage_err = myusage - usage;
00192 usage_int_err += usage_err;
00193 usage += usage_err * 0.1 + usage_int_err * 0.01;
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205 msg->from_pe = CkMyPe();
00206
00207 msg->serial = CrnRand();
00208 msg->proc_speed = myStats.proc_speed;
00209 msg->total_walltime = myStats.total_walltime;
00210 msg->total_cputime = myStats.total_cputime;
00211 msg->idletime = myStats.idletime;
00212 msg->bg_walltime = myStats.bg_walltime;
00213 msg->bg_cputime = myStats.bg_cputime;
00214 msg->obj_walltime = myStats.obj_walltime;
00215 msg->obj_cputime = myStats.obj_cputime;
00216 msg->vacate_me = vacate;
00217 msg->usage = usage;
00218
00219 if (_lb_args.debug()) {
00220 CkPrintf(
00221 "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
00222 CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
00223 msg->idletime,msg->bg_walltime,msg->bg_cputime,
00224 msg->obj_walltime,msg->obj_cputime);
00225 }
00226
00227
00228
00229 return msg;
00230 #else
00231 return NULL;
00232 #endif
00233 }
00234
00235 void WSLB::Migrated(LDObjHandle h, int waitBarrier)
00236 {
00237 #if CMK_LBDB_ON
00238 migrates_completed++;
00239
00240
00241 if (migrates_completed == migrates_expected) {
00242 MigrationDone();
00243 }
00244 #endif
00245 }
00246
00247 void WSLB::ReceiveStats(WSLBStatsMsg *m)
00248 {
00249 #if CMK_LBDB_ON
00250 if (neighbor_pes == 0) FindNeighbors();
00251
00252 if (m == 0) {
00253 receive_stats_ready = 1;
00254 } else {
00255 const int pe = m->from_pe;
00256
00257
00258 int peslot = -1;
00259 for(int i=0; i < mig_msgs_expected; i++) {
00260 if (pe == neighbor_pes[i]) {
00261 peslot = i;
00262 break;
00263 }
00264 }
00265 if (peslot == -1 || statsMsgsList[peslot] != 0) {
00266 CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
00267 pe);
00268 } else {
00269 statsMsgsList[peslot] = m;
00270 statsDataList[peslot].from_pe = m->from_pe;
00271 statsDataList[peslot].total_walltime = m->total_walltime;
00272 statsDataList[peslot].total_cputime = m->total_cputime;
00273 statsDataList[peslot].idletime = m->idletime;
00274 statsDataList[peslot].bg_walltime = m->bg_walltime;
00275 statsDataList[peslot].bg_cputime = m->bg_cputime;
00276 statsDataList[peslot].proc_speed = m->proc_speed;
00277 statsDataList[peslot].obj_walltime = m->obj_walltime;
00278 statsDataList[peslot].obj_cputime = m->obj_cputime;
00279 statsDataList[peslot].vacate_me = m->vacate_me;
00280 statsDataList[peslot].usage = m->usage;
00281 stats_msg_count++;
00282 }
00283 }
00284
00285 const int clients = mig_msgs_expected;
00286 if (stats_msg_count == clients && receive_stats_ready) {
00287 double strat_start_time = CkWallTimer();
00288 receive_stats_ready = 0;
00289 LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
00290
00291 int i;
00292
00293
00294 for(i=0; i < migrateMsg->n_moves; i++) {
00295 MigrateInfo& move = migrateMsg->moves[i];
00296 const int me = CkMyPe();
00297 if (move.from_pe == me && move.to_pe != me) {
00298 theLbdb->Migrate(move.obj,move.to_pe);
00299 } else if (move.from_pe != me) {
00300 CkPrintf("[%d] error, strategy wants to move from %d to %d\n",
00301 me,move.from_pe,move.to_pe);
00302 }
00303 }
00304
00305
00306 thisProxy.ReceiveMigration(migrateMsg,mig_msgs_expected,neighbor_pes);
00307
00308
00309 for(i=0; i < clients; i++) {
00310 delete statsMsgsList[i];
00311 statsMsgsList[i]=0;
00312 }
00313 stats_msg_count=0;
00314
00315 theLbdb->ClearLoads();
00316 if (CkMyPe() == 0) {
00317 double strat_end_time = CkWallTimer();
00318 CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
00319 }
00320 }
00321 #endif
00322 }
00323
00324 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
00325 {
00326 #if CMK_LBDB_ON
00327 if (neighbor_pes == 0) FindNeighbors();
00328
00329 if (mig_msgs_received == 0) migrates_expected = 0;
00330
00331 mig_msgs[mig_msgs_received] = msg;
00332 mig_msgs_received++;
00333
00334
00335
00336 if (mig_msgs_received > mig_msgs_expected) {
00337 CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
00338 CkMyPe());
00339 }
00340
00341 if (mig_msgs_received != mig_msgs_expected) {
00342 return;
00343 }
00344
00345
00346 for(int neigh=0; neigh < mig_msgs_received;neigh++) {
00347 LBMigrateMsg* m = mig_msgs[neigh];
00348 for(int i=0; i < m->n_moves; i++) {
00349 MigrateInfo& move = m->moves[i];
00350 const int me = CkMyPe();
00351 if (move.from_pe != me && move.to_pe == me) {
00352 migrates_expected++;
00353 }
00354 }
00355 delete m;
00356 mig_msgs[neigh]=0;
00357 }
00358
00359
00360 mig_msgs_received = 0;
00361 if (migrates_expected == 0 || migrates_expected == migrates_completed)
00362 MigrationDone();
00363 #endif
00364 }
00365
00366
00367 void WSLB::MigrationDone()
00368 {
00369 #if CMK_LBDB_ON
00370 if (CkMyPe() == 0) {
00371 double end_lb_time = CkWallTimer();
00372 CkPrintf("Load balancing step %d finished at %f duration %f\n",
00373 step(),end_lb_time,end_lb_time - start_lb_time);
00374 }
00375 migrates_completed = 0;
00376 migrates_expected = -1;
00377
00378 mystep++;
00379 thisProxy [CkMyPe()].ResumeClients();
00380 #endif
00381 }
00382
00383 void WSLB::ResumeClients()
00384 {
00385 #if CMK_LBDB_ON
00386 theLbdb->ResumeClients();
00387 #endif
00388 }
00389
00390 bool WSLB::QueryBalanceNow(int step)
00391 {
00392 #if CMK_LBDB_ON
00393 double now = CkWallTimer();
00394
00395 if (step==0)
00396 first_step_time = now;
00397 else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
00398 && now < (VACATE_AFTER+UNVACATE_AFTER)) {
00399 if (vacate == false)
00400 CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
00401 vacate = true;
00402 } else {
00403 if (vacate == true)
00404 CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
00405 vacate = false;
00406 }
00407 #endif
00408 return true;
00409 }
00410
00411 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
00412 {
00413 #if CMK_LBDB_ON
00414
00415
00416
00417 const double load_factor = 1.05;
00418 double objload;
00419
00420 double myload = myStats.total_walltime - myStats.idletime;
00421 double avgload = myload;
00422 int unvacated_neighbors = 0;
00423 int i;
00424 for(i=0; i < count; i++) {
00425
00426 if (stats[i].vacate_me)
00427 continue;
00428
00429
00430 double hisload = stats[i].total_walltime - stats[i].idletime;
00431 const double hisusage = stats[i].usage;
00432
00433 const double scale = (myStats.proc_speed * usage)
00434 / (stats[i].proc_speed * hisusage);
00435
00436 hisload *= scale;
00437 stats[i].total_walltime *= scale;
00438 stats[i].idletime *= scale;
00439
00440
00441
00442 avgload += hisload;
00443 unvacated_neighbors++;
00444 }
00445 if (vacate && unvacated_neighbors == 0)
00446 CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
00447
00448 avgload /= (unvacated_neighbors+1);
00449
00450 CkVec<MigrateInfo*> migrateInfo;
00451
00452
00453
00454
00455 if (vacate || myload > avgload) {
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467 minHeap procs(count);
00468 for(i=0; i < count; i++) {
00469
00470
00471 if (!stats[i].vacate_me) {
00472 InfoRecord* item = new InfoRecord;
00473 item->load = stats[i].total_walltime - stats[i].idletime;
00474 item->Id = stats[i].from_pe;
00475 procs.insert(item);
00476 }
00477 }
00478
00479 maxHeap objs(myStats.obj_data_sz);
00480 for(i=0; i < myStats.obj_data_sz; i++) {
00481 InfoRecord* item = new InfoRecord;
00482 item->load = myStats.objData[i].wallTime;
00483 item->Id = i;
00484 objs.insert(item);
00485 }
00486
00487 int objs_here = myStats.obj_data_sz;
00488 do {
00489
00490
00491 InfoRecord* p;
00492 InfoRecord* obj;
00493
00494
00495 p = procs.deleteMin();
00496 if (p == 0) {
00497
00498 break;
00499 }
00500
00501
00502 bool objfound = false;
00503 do {
00504 obj = objs.deleteMax();
00505 if (obj == 0) break;
00506
00507 objload = load_factor * obj->load;
00508
00509 double new_p_load = p->load + objload;
00510 double my_new_load = myload - objload;
00511
00512
00513
00514 if (vacate || new_p_load < my_new_load) {
00515 objfound = true;
00516 } else {
00517
00518
00519
00520 delete obj;
00521 }
00522 } while (!objfound);
00523
00524 if (!objfound) {
00525
00526 break;
00527 }
00528
00529 const int me = CkMyPe();
00530
00531 if (_lb_args.debug())
00532 CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
00533 CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
00534
00535 MigrateInfo* migrateMe = new MigrateInfo;
00536 migrateMe->obj = myStats.objData[obj->Id].handle;
00537 migrateMe->from_pe = me;
00538 migrateMe->to_pe = p->Id;
00539 migrateInfo.insertAtEnd(migrateMe);
00540
00541 objs_here--;
00542
00543
00544
00545 p->load += objload;
00546 myload -= objload;
00547 procs.insert(p);
00548
00549
00550 delete obj;
00551
00552 } while(vacate || myload > avgload);
00553
00554
00555 InfoRecord* p;
00556 while (NULL!=(p=procs.deleteMin()))
00557 delete p;
00558 InfoRecord* obj;
00559 while (NULL!=(obj=objs.deleteMax()))
00560 delete obj;
00561 }
00562
00563
00564 int migrate_count=migrateInfo.length();
00565
00566
00567
00568
00569 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
00570 msg->n_moves = migrate_count;
00571 for(i=0; i < migrate_count; i++) {
00572 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
00573 msg->moves[i] = *item;
00574 delete item;
00575 migrateInfo[i] = 0;
00576 }
00577
00578 return msg;
00579 #else
00580 return NULL;
00581 #endif
00582 }
00583
00584 #include "WSLB.def.h"
00585
00586