00001
00005
00006 #ifndef WIN32
00007 #include <unistd.h>
00008 #endif
00009
00010 #include "elements.h"
00011 #include "ckheap.h"
00012 #include "WSLB.h"
00013 #include "LBDBManager.h"
00014
00015
00016
00017
// PE number that QueryBalanceNow() asks to vacate during the test window.
// The value is parenthesized so the macro expands safely inside any
// expression.  NOTE(review): -1 presumably matches no real PE, which would
// leave the vacate feature switched off -- confirm.
#define VACATE_PROC (-1)

// Bounds of the vacate window, compared against CkWallTimer() in
// QueryBalanceNow(): the window opens after VACATE_AFTER and lasts for
// UNVACATE_AFTER (presumably seconds -- confirm against CkWallTimer()).
#define VACATE_AFTER 30
#define UNVACATE_AFTER 15
00022
// Project macro invocation naming this balancer ("WSLB") and giving its
// human-readable description.  NOTE(review): presumably expands to the
// creation/registration function for this load balancer -- confirm against
// the CreateLBFunc_Def definition.
00023 CreateLBFunc_Def(WSLB, "Workstation load balancer")
00024
00025 void WSLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
00026 {
00027 WSLB *me = (WSLB*)(data);
00028
00029 me->Migrated(h, waitBarrier);
00030 }
00031
00032 void WSLB::staticAtSync(void* data)
00033 {
00034 WSLB *me = (WSLB*)(data);
00035
00036 me->AtSync();
00037 }
00038
00039 WSLB::WSLB(const CkLBOptions &opt) : BaseLB(opt)
00040 {
00041 #if CMK_LBDB_ON
00042 thisProxy = CProxy_WSLB(thisgroup);
00043 lbname = "WSLB";
00044 if (CkMyPe() == 0)
00045 CkPrintf("[%d] WSLB created\n",CkMyPe());
00046
00047 mystep = 0;
00048 theLbdb->
00049 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
00050 notifier = theLbdb->getLBDB()->
00051 NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
00052
00053
00054 LBtopoFn topofn = LBTopoLookup(_lbtopo);
00055 if (topofn == NULL) {
00056 if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
00057 CmiAbort("");
00058 }
00059 topo = topofn(CkNumPes());
00060
00061
00062
00063
00064 neighbor_pes = NULL;
00065 stats_msg_count = 0;
00066 statsMsgsList = NULL;
00067 statsDataList = NULL;
00068 migrates_completed = 0;
00069 migrates_expected = -1;
00070 mig_msgs_received = 0;
00071 mig_msgs = NULL;
00072
00073 myStats.proc_speed = theLbdb->ProcessorSpeed();
00074
00075
00076
00077 myStats.obj_data_sz = 0;
00078 myStats.comm_data_sz = 0;
00079 receive_stats_ready = 0;
00080
00081 vacate = CmiFalse;
00082 usage = 1.0;
00083 usage_int_err = 0.;
00084
00085 theLbdb->CollectStatsOn();
00086 #endif
00087 }
00088
00089 WSLB::~WSLB()
00090 {
00091 #if CMK_LBDB_ON
00092 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00093 if (theLbdb) {
00094 theLbdb->getLBDB()->
00095 RemoveNotifyMigrated(notifier);
00096
00097
00098 }
00099 if (statsMsgsList) delete [] statsMsgsList;
00100 if (statsDataList) delete [] statsDataList;
00101 if (neighbor_pes) delete [] neighbor_pes;
00102 if (mig_msgs) delete [] mig_msgs;
00103 #endif
00104 }
00105
00106 void WSLB::FindNeighbors()
00107 {
00108 if (neighbor_pes == 0) {
00109
00110
00111 int maxneighbors = topo->max_neighbors();
00112 statsMsgsList = new WSLBStatsMsg*[maxneighbors];
00113 for(int i=0; i < maxneighbors; i++)
00114 statsMsgsList[i] = 0;
00115 statsDataList = new LDStats[maxneighbors];
00116
00117 neighbor_pes = new int[maxneighbors];
00118 topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
00119 mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
00120 }
00121
00122 }
00123
00124 void WSLB::AtSync()
00125 {
00126 #if CMK_LBDB_ON
00127
00128
00129 if (CkMyPe() == 0) {
00130 start_lb_time = CkWallTimer();
00131 CkPrintf("Load balancing step %d starting at %f\n",
00132 step(),start_lb_time);
00133 }
00134
00135 if (neighbor_pes == 0) FindNeighbors();
00136
00137 if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
00138 MigrationDone();
00139 return;
00140 }
00141
00142 WSLBStatsMsg* msg = AssembleStats();
00143
00144 thisProxy.ReceiveStats(msg,mig_msgs_expected,neighbor_pes);
00145
00146
00147 ReceiveStats((WSLBStatsMsg*)0);
00148 #endif
00149 }
00150
00151 WSLBStatsMsg* WSLB::AssembleStats()
00152 {
00153 #if CMK_LBDB_ON
00154
00155 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
00156 theLbdb->IdleTime(&myStats.idletime);
00157 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
00158 myStats.obj_data_sz = theLbdb->GetObjDataSz();
00159 myStats.objData = new LDObjData[myStats.obj_data_sz];
00160 theLbdb->GetObjData(myStats.objData);
00161
00162 myStats.comm_data_sz = theLbdb->GetCommDataSz();
00163 myStats.commData = new LDCommData[myStats.comm_data_sz];
00164 theLbdb->GetCommData(myStats.commData);
00165
00166 myStats.obj_walltime = myStats.obj_cputime = 0;
00167 for(int i=0; i < myStats.obj_data_sz; i++) {
00168 myStats.obj_walltime += myStats.objData[i].wallTime;
00169 myStats.obj_cputime += myStats.objData[i].cpuTime;
00170 }
00171
00172 WSLBStatsMsg* msg = new WSLBStatsMsg;
00173
00174
00175 double myload = myStats.total_walltime - myStats.idletime;
00176 double myusage;
00177
00178
00179
00180
00181
00182
00183
00184
00185 if (myload > 0)
00186 myusage = myStats.total_cputime / myload;
00187 else myusage = 1.0;
00188
00189 const double usage_err = myusage - usage;
00190 usage_int_err += usage_err;
00191 usage += usage_err * 0.1 + usage_int_err * 0.01;
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203 msg->from_pe = CkMyPe();
00204
00205 msg->serial = CrnRand();
00206 msg->proc_speed = myStats.proc_speed;
00207 msg->total_walltime = myStats.total_walltime;
00208 msg->total_cputime = myStats.total_cputime;
00209 msg->idletime = myStats.idletime;
00210 msg->bg_walltime = myStats.bg_walltime;
00211 msg->bg_cputime = myStats.bg_cputime;
00212 msg->obj_walltime = myStats.obj_walltime;
00213 msg->obj_cputime = myStats.obj_cputime;
00214 msg->vacate_me = vacate;
00215 msg->usage = usage;
00216
00217 if (_lb_args.debug()) {
00218 CkPrintf(
00219 "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
00220 CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
00221 msg->idletime,msg->bg_walltime,msg->bg_cputime,
00222 msg->obj_walltime,msg->obj_cputime);
00223 }
00224
00225
00226
00227 return msg;
00228 #else
00229 return NULL;
00230 #endif
00231 }
00232
00233 void WSLB::Migrated(LDObjHandle h, int waitBarrier)
00234 {
00235 #if CMK_LBDB_ON
00236 migrates_completed++;
00237
00238
00239 if (migrates_completed == migrates_expected) {
00240 MigrationDone();
00241 }
00242 #endif
00243 }
00244
00245 void WSLB::ReceiveStats(WSLBStatsMsg *m)
00246 {
00247 #if CMK_LBDB_ON
00248 if (neighbor_pes == 0) FindNeighbors();
00249
00250 if (m == 0) {
00251 receive_stats_ready = 1;
00252 } else {
00253 const int pe = m->from_pe;
00254
00255
00256 int peslot = -1;
00257 for(int i=0; i < mig_msgs_expected; i++) {
00258 if (pe == neighbor_pes[i]) {
00259 peslot = i;
00260 break;
00261 }
00262 }
00263 if (peslot == -1 || statsMsgsList[peslot] != 0) {
00264 CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
00265 pe);
00266 } else {
00267 statsMsgsList[peslot] = m;
00268 statsDataList[peslot].from_pe = m->from_pe;
00269 statsDataList[peslot].total_walltime = m->total_walltime;
00270 statsDataList[peslot].total_cputime = m->total_cputime;
00271 statsDataList[peslot].idletime = m->idletime;
00272 statsDataList[peslot].bg_walltime = m->bg_walltime;
00273 statsDataList[peslot].bg_cputime = m->bg_cputime;
00274 statsDataList[peslot].proc_speed = m->proc_speed;
00275 statsDataList[peslot].obj_walltime = m->obj_walltime;
00276 statsDataList[peslot].obj_cputime = m->obj_cputime;
00277 statsDataList[peslot].vacate_me = m->vacate_me;
00278 statsDataList[peslot].usage = m->usage;
00279 stats_msg_count++;
00280 }
00281 }
00282
00283 const int clients = mig_msgs_expected;
00284 if (stats_msg_count == clients && receive_stats_ready) {
00285 double strat_start_time = CkWallTimer();
00286 receive_stats_ready = 0;
00287 LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
00288
00289 int i;
00290
00291
00292 for(i=0; i < migrateMsg->n_moves; i++) {
00293 MigrateInfo& move = migrateMsg->moves[i];
00294 const int me = CkMyPe();
00295 if (move.from_pe == me && move.to_pe != me) {
00296 theLbdb->Migrate(move.obj,move.to_pe);
00297 } else if (move.from_pe != me) {
00298 CkPrintf("[%d] error, strategy wants to move from %d to %d\n",
00299 me,move.from_pe,move.to_pe);
00300 }
00301 }
00302
00303
00304 thisProxy.ReceiveMigration(migrateMsg,mig_msgs_expected,neighbor_pes);
00305
00306
00307 for(i=0; i < clients; i++) {
00308 delete statsMsgsList[i];
00309 statsMsgsList[i]=0;
00310 }
00311 stats_msg_count=0;
00312
00313 theLbdb->ClearLoads();
00314 if (CkMyPe() == 0) {
00315 double strat_end_time = CkWallTimer();
00316 CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
00317 }
00318 }
00319 #endif
00320 }
00321
00322 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
00323 {
00324 #if CMK_LBDB_ON
00325 if (neighbor_pes == 0) FindNeighbors();
00326
00327 if (mig_msgs_received == 0) migrates_expected = 0;
00328
00329 mig_msgs[mig_msgs_received] = msg;
00330 mig_msgs_received++;
00331
00332
00333
00334 if (mig_msgs_received > mig_msgs_expected) {
00335 CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
00336 CkMyPe());
00337 }
00338
00339 if (mig_msgs_received != mig_msgs_expected) {
00340 return;
00341 }
00342
00343
00344 for(int neigh=0; neigh < mig_msgs_received;neigh++) {
00345 LBMigrateMsg* m = mig_msgs[neigh];
00346 for(int i=0; i < m->n_moves; i++) {
00347 MigrateInfo& move = m->moves[i];
00348 const int me = CkMyPe();
00349 if (move.from_pe != me && move.to_pe == me) {
00350 migrates_expected++;
00351 }
00352 }
00353 delete m;
00354 mig_msgs[neigh]=0;
00355 }
00356
00357
00358 mig_msgs_received = 0;
00359 if (migrates_expected == 0 || migrates_expected == migrates_completed)
00360 MigrationDone();
00361 #endif
00362 }
00363
00364
00365 void WSLB::MigrationDone()
00366 {
00367 #if CMK_LBDB_ON
00368 if (CkMyPe() == 0) {
00369 double end_lb_time = CkWallTimer();
00370 CkPrintf("Load balancing step %d finished at %f duration %f\n",
00371 step(),end_lb_time,end_lb_time - start_lb_time);
00372 }
00373 migrates_completed = 0;
00374 migrates_expected = -1;
00375
00376 mystep++;
00377 thisProxy [CkMyPe()].ResumeClients();
00378 #endif
00379 }
00380
00381 void WSLB::ResumeClients()
00382 {
00383 #if CMK_LBDB_ON
00384 theLbdb->ResumeClients();
00385 #endif
00386 }
00387
00388 CmiBool WSLB::QueryBalanceNow(int step)
00389 {
00390 #if CMK_LBDB_ON
00391 double now = CkWallTimer();
00392
00393 if (step==0)
00394 first_step_time = now;
00395 else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
00396 && now < (VACATE_AFTER+UNVACATE_AFTER)) {
00397 if (vacate == CmiFalse)
00398 CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
00399 vacate = CmiTrue;
00400 } else {
00401 if (vacate == CmiTrue)
00402 CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
00403 vacate = CmiFalse;
00404 }
00405 #endif
00406 return CmiTrue;
00407 }
00408
00409 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
00410 {
00411 #if CMK_LBDB_ON
00412
00413
00414
00415 const double load_factor = 1.05;
00416 double objload;
00417
00418 double myload = myStats.total_walltime - myStats.idletime;
00419 double avgload = myload;
00420 int unvacated_neighbors = 0;
00421 int i;
00422 for(i=0; i < count; i++) {
00423
00424 if (stats[i].vacate_me)
00425 continue;
00426
00427
00428 double hisload = stats[i].total_walltime - stats[i].idletime;
00429 const double hisusage = stats[i].usage;
00430
00431 const double scale = (myStats.proc_speed * usage)
00432 / (stats[i].proc_speed * hisusage);
00433
00434 hisload *= scale;
00435 stats[i].total_walltime *= scale;
00436 stats[i].idletime *= scale;
00437
00438
00439
00440 avgload += hisload;
00441 unvacated_neighbors++;
00442 }
00443 if (vacate && unvacated_neighbors == 0)
00444 CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
00445
00446 avgload /= (unvacated_neighbors+1);
00447
00448 CkVec<MigrateInfo*> migrateInfo;
00449
00450
00451
00452
00453 if (vacate || myload > avgload) {
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465 minHeap procs(count);
00466 for(i=0; i < count; i++) {
00467
00468
00469 if (!stats[i].vacate_me) {
00470 InfoRecord* item = new InfoRecord;
00471 item->load = stats[i].total_walltime - stats[i].idletime;
00472 item->Id = stats[i].from_pe;
00473 procs.insert(item);
00474 }
00475 }
00476
00477 maxHeap objs(myStats.obj_data_sz);
00478 for(i=0; i < myStats.obj_data_sz; i++) {
00479 InfoRecord* item = new InfoRecord;
00480 item->load = myStats.objData[i].wallTime;
00481 item->Id = i;
00482 objs.insert(item);
00483 }
00484
00485 int objs_here = myStats.obj_data_sz;
00486 do {
00487
00488
00489 InfoRecord* p;
00490 InfoRecord* obj;
00491
00492
00493 p = procs.deleteMin();
00494 if (p == 0) {
00495
00496 break;
00497 }
00498
00499
00500 CmiBool objfound = CmiFalse;
00501 do {
00502 obj = objs.deleteMax();
00503 if (obj == 0) break;
00504
00505 objload = load_factor * obj->load;
00506
00507 double new_p_load = p->load + objload;
00508 double my_new_load = myload - objload;
00509
00510
00511
00512 if (vacate || new_p_load < my_new_load) {
00513 objfound = CmiTrue;
00514 } else {
00515
00516
00517
00518 delete obj;
00519 }
00520 } while (!objfound);
00521
00522 if (!objfound) {
00523
00524 break;
00525 }
00526
00527 const int me = CkMyPe();
00528
00529 if (_lb_args.debug())
00530 CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
00531 CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
00532
00533 MigrateInfo* migrateMe = new MigrateInfo;
00534 migrateMe->obj = myStats.objData[obj->Id].handle;
00535 migrateMe->from_pe = me;
00536 migrateMe->to_pe = p->Id;
00537 migrateInfo.insertAtEnd(migrateMe);
00538
00539 objs_here--;
00540
00541
00542
00543 p->load += objload;
00544 myload -= objload;
00545 procs.insert(p);
00546
00547
00548 delete obj;
00549
00550 } while(vacate || myload > avgload);
00551
00552
00553 InfoRecord* p;
00554 while (NULL!=(p=procs.deleteMin()))
00555 delete p;
00556 InfoRecord* obj;
00557 while (NULL!=(obj=objs.deleteMax()))
00558 delete obj;
00559 }
00560
00561
00562 int migrate_count=migrateInfo.length();
00563
00564
00565
00566
00567 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
00568 msg->n_moves = migrate_count;
00569 for(i=0; i < migrate_count; i++) {
00570 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
00571 msg->moves[i] = *item;
00572 delete item;
00573 migrateInfo[i] = 0;
00574 }
00575
00576 return msg;
00577 #else
00578 return NULL;
00579 #endif
00580 }
00581
00582 #include "WSLB.def.h"
00583
00584