00001
00005
00006 #include "BaseLB.h"
00007 #include "NborBaseLB.h"
00008 #include "LBDBManager.h"
00009 #include "NborBaseLB.def.h"
00010
00011 #define DEBUGF(x) // CmiPrintf x;
00012
00013
00014
00015 void NborBaseLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
00016 {
00017 NborBaseLB *me = (NborBaseLB*)(data);
00018
00019 me->Migrated(h, waitBarrier);
00020 }
00021
00022 void NborBaseLB::staticAtSync(void* data)
00023 {
00024 NborBaseLB *me = (NborBaseLB*)(data);
00025
00026 me->AtSync();
00027 }
00028
00029 NborBaseLB::NborBaseLB(const CkLBOptions &opt): CBase_NborBaseLB(opt)
00030 {
00031 #if CMK_LBDB_ON
00032 lbname = (char *)"NborBaseLB";
00033 thisProxy = CProxy_NborBaseLB(thisgroup);
00034 receiver = theLbdb->
00035 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
00036 (void*)(this));
00037 notifier = theLbdb->getLBDB()->
00038 NotifyMigrated((LDMigratedFn)(staticMigrated), (void*)(this));
00039
00040
00041
00042
00043
00044 LBtopoFn topofn = LBTopoLookup(_lbtopo);
00045 if (topofn == NULL) {
00046 if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
00047 CmiAbort("");
00048 }
00049 topo = topofn(CkNumPes());
00050
00051 mig_msgs_expected = 0;
00052 neighbor_pes = NULL;
00053 stats_msg_count = 0;
00054 statsMsgsList = NULL;
00055 statsDataList = NULL;
00056 migrates_completed = 0;
00057 migrates_expected = -1;
00058 mig_msgs_received = 0;
00059 mig_msgs = NULL;
00060
00061 myStats.pe_speed = theLbdb->ProcessorSpeed();
00062
00063
00064
00065 myStats.from_pe = CkMyPe();
00066 myStats.n_objs = 0;
00067 myStats.objData = NULL;
00068 myStats.n_comm = 0;
00069 myStats.commData = NULL;
00070
00071 receive_stats_ready = false;
00072
00073 if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
00074 #endif
00075 }
00076
00077 NborBaseLB::~NborBaseLB()
00078 {
00079 #if CMK_LBDB_ON
00080 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00081 if (theLbdb) {
00082 theLbdb->getLBDB()->
00083 RemoveNotifyMigrated(notifier);
00084
00085
00086 }
00087 if (statsMsgsList) delete [] statsMsgsList;
00088 if (statsDataList) delete [] statsDataList;
00089 if (neighbor_pes) delete [] neighbor_pes;
00090 if (mig_msgs) delete [] mig_msgs;
00091 #endif
00092 }
00093
00094 void NborBaseLB::FindNeighbors()
00095 {
00096 if (neighbor_pes == 0) {
00097
00098
00099 int maxneighbors = topo->max_neighbors();
00100 statsMsgsList = new NLBStatsMsg*[maxneighbors];
00101 for(int i=0; i < maxneighbors; i++)
00102 statsMsgsList[i] = 0;
00103 statsDataList = new LDStats[maxneighbors];
00104
00105 neighbor_pes = new int[maxneighbors];
00106 topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
00107 mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
00108 }
00109
00110 }
00111
00112 void NborBaseLB::AtSync()
00113 {
00114 #if CMK_LBDB_ON
00115
00116
00117 if (neighbor_pes == 0) FindNeighbors();
00118 start_lb_time = 0;
00119
00120 if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
00121 MigrationDone(0);
00122 return;
00123 }
00124
00125 if (CkMyPe() == 0) {
00126 start_lb_time = CkWallTimer();
00127 if (_lb_args.debug())
00128 CkPrintf("[%s] Load balancing step %d starting at %f\n",
00129 lbName(), step(),start_lb_time);
00130 }
00131
00132 NLBStatsMsg* msg = AssembleStats();
00133
00134 if (mig_msgs_expected > 0) {
00135 CkMarshalledNLBStatsMessage marshmsg(msg);
00136 thisProxy.ReceiveStats(marshmsg, mig_msgs_expected, neighbor_pes);
00137 }
00138
00139
00140 CkMarshalledNLBStatsMessage mmsg(NULL);
00141 thisProxy[CkMyPe()].ReceiveStats(mmsg);
00142 #endif
00143 }
00144
00145 NLBStatsMsg* NborBaseLB::AssembleStats()
00146 {
00147 #if CMK_LBDB_ON
00148
00149 #if CMK_LB_CPUTIMER
00150 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
00151 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
00152 #else
00153 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_walltime);
00154 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_walltime);
00155 #endif
00156 theLbdb->IdleTime(&myStats.idletime);
00157
00158 myStats.move = QueryMigrateStep(step());
00159
00160 myStats.n_objs = theLbdb->GetObjDataSz();
00161 if (myStats.objData) delete [] myStats.objData;
00162 myStats.objData = new LDObjData[myStats.n_objs];
00163 theLbdb->GetObjData(myStats.objData);
00164
00165 myStats.n_comm = theLbdb->GetCommDataSz();
00166 if (myStats.commData) delete [] myStats.commData;
00167 myStats.commData = new LDCommData[myStats.n_comm];
00168 theLbdb->GetCommData(myStats.commData);
00169
00170 myStats.obj_walltime = 0;
00171 #if CMK_LB_CPUTIMER
00172 myStats.obj_cputime = 0;
00173 #endif
00174 for(int i=0; i < myStats.n_objs; i++) {
00175 myStats.obj_walltime += myStats.objData[i].wallTime;
00176 #if CMK_LB_CPUTIMER
00177 myStats.obj_cputime += myStats.objData[i].cpuTime;
00178 #endif
00179 }
00180
00181 const int osz = theLbdb->GetObjDataSz();
00182 const int csz = theLbdb->GetCommDataSz();
00183 NLBStatsMsg* msg = new NLBStatsMsg(osz, csz);
00184
00185 msg->from_pe = CkMyPe();
00186
00187 msg->serial = CrnRand();
00188 msg->pe_speed = myStats.pe_speed;
00189 msg->total_walltime = myStats.total_walltime;
00190 msg->idletime = myStats.idletime;
00191 msg->bg_walltime = myStats.bg_walltime;
00192 msg->obj_walltime = myStats.obj_walltime;
00193 #if CMK_LB_CPUTIMER
00194 msg->total_cputime = myStats.total_cputime;
00195 msg->bg_cputime = myStats.bg_cputime;
00196 msg->obj_cputime = myStats.obj_cputime;
00197 #endif
00198 msg->n_objs = osz;
00199 theLbdb->GetObjData(msg->objData);
00200 msg->n_comm = csz;
00201 theLbdb->GetCommData(msg->commData);
00202
00203
00204 delete [] myStats.objData;
00205 myStats.objData = NULL;
00206 myStats.n_objs = 0;
00207 delete [] myStats.commData;
00208 myStats.commData = NULL;
00209 myStats.n_comm = 0;
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219 return msg;
00220 #else
00221 return NULL;
00222 #endif
00223 }
00224
00225 void NborBaseLB::Migrated(LDObjHandle h, int waitBarrier)
00226 {
00227 migrates_completed++;
00228
00229
00230 if (migrates_completed == migrates_expected) {
00231 MigrationDone(1);
00232 }
00233 }
00234
00235 void NborBaseLB::ReceiveStats(CkMarshalledNLBStatsMessage &&data)
00236 {
00237 #if CMK_LBDB_ON
00238 NLBStatsMsg *m = data.getMessage();
00239 if (neighbor_pes == 0) FindNeighbors();
00240
00241 if (m == 0) {
00242 receive_stats_ready = true;
00243 } else {
00244 const int pe = m->from_pe;
00245
00246
00247 int peslot = NeighborIndex(pe);
00248
00249 if (peslot == -1 || statsMsgsList[peslot] != 0) {
00250 CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
00251 pe);
00252 } else {
00253 statsMsgsList[peslot] = m;
00254 statsDataList[peslot].from_pe = m->from_pe;
00255 statsDataList[peslot].total_walltime = m->total_walltime;
00256 statsDataList[peslot].idletime = m->idletime;
00257 statsDataList[peslot].bg_walltime = m->bg_walltime;
00258 statsDataList[peslot].pe_speed = m->pe_speed;
00259 statsDataList[peslot].obj_walltime = m->obj_walltime;
00260 #if CMK_LB_CPUTIMER
00261 statsDataList[peslot].total_cputime = m->total_cputime;
00262 statsDataList[peslot].bg_cputime = m->bg_cputime;
00263 statsDataList[peslot].obj_cputime = m->obj_cputime;
00264 #endif
00265
00266 statsDataList[peslot].n_objs = m->n_objs;
00267 statsDataList[peslot].objData = m->objData;
00268 statsDataList[peslot].n_comm = m->n_comm;
00269 statsDataList[peslot].commData = m->commData;
00270
00271 if (_lb_args.ignoreBgLoad()) statsDataList[peslot].clearBgLoad();
00272
00273 stats_msg_count++;
00274 }
00275 }
00276
00277 const int clients = mig_msgs_expected;
00278 if (stats_msg_count == clients && receive_stats_ready) {
00279 double strat_start_time = CkWallTimer();
00280 receive_stats_ready = false;
00281 LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
00282
00283 int i;
00284
00285
00286 for(i=0; i < migrateMsg->n_moves; i++) {
00287 MigrateInfo& move = migrateMsg->moves[i];
00288 const int me = CkMyPe();
00289 if (move.from_pe == me && move.to_pe != me) {
00290 theLbdb->Migrate(move.obj,move.to_pe);
00291 } else if (move.from_pe != me) {
00292 CkPrintf("[%d] error, strategy wants to move from %d to %d\n",
00293 me,move.from_pe,move.to_pe);
00294 }
00295 }
00296
00297
00298 if (clients > 0)
00299 thisProxy.ReceiveMigration(migrateMsg, clients, neighbor_pes);
00300
00301
00302 for(i=0; i < clients; i++) {
00303 delete statsMsgsList[i];
00304 statsMsgsList[i]=NULL;
00305 }
00306 stats_msg_count=0;
00307
00308
00309 if (CkMyPe() == 0) {
00310 double strat_end_time = CkWallTimer();
00311 if (_lb_args.debug())
00312 CkPrintf("[%d] %s Strat elapsed time %f\n",CkMyPe(),lbName(),strat_end_time-strat_start_time);
00313 }
00314 }
00315 #endif
00316 }
00317
00318 void NborBaseLB::ReceiveMigration(LBMigrateMsg *msg)
00319 {
00320 #if CMK_LBDB_ON
00321 if (neighbor_pes == 0) FindNeighbors();
00322
00323 if (mig_msgs_received == 0) migrates_expected = 0;
00324
00325 mig_msgs[mig_msgs_received] = msg;
00326 mig_msgs_received++;
00327
00328
00329
00330 if (mig_msgs_received > mig_msgs_expected) {
00331 CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
00332 CkMyPe());
00333 }
00334
00335 if (mig_msgs_received != mig_msgs_expected) {
00336 return;
00337 }
00338
00339
00340 for(int neigh=0; neigh < mig_msgs_received;neigh++) {
00341 LBMigrateMsg* m = mig_msgs[neigh];
00342 for(int i=0; i < m->n_moves; i++) {
00343 MigrateInfo& move = m->moves[i];
00344 const int me = CkMyPe();
00345 if (move.from_pe != me && move.to_pe == me) {
00346 migrates_expected++;
00347 }
00348 }
00349 delete m;
00350 mig_msgs[neigh]=0;
00351 }
00352
00353
00354 mig_msgs_received = 0;
00355 if (migrates_expected == 0 || migrates_expected == migrates_completed)
00356 MigrationDone(1);
00357 #endif
00358 }
00359
00360
00361 void NborBaseLB::MigrationDone(int balancing)
00362 {
00363 #if CMK_LBDB_ON
00364 migrates_completed = 0;
00365 migrates_expected = -1;
00366
00367 theLbdb->incStep();
00368
00369 theLbdb->ClearLoads();
00370
00371
00372 if (balancing && _lb_args.syncResume()) {
00373 contribute(CkCallback(CkReductionTarget(NborBaseLB, ResumeClients),
00374 thisProxy));
00375 }
00376 else
00377 thisProxy [CkMyPe()].ResumeClients(balancing);
00378
00379 #endif
00380 }
00381
00382 void NborBaseLB::ResumeClients()
00383 {
00384 ResumeClients(1);
00385 }
00386
00387 void NborBaseLB::ResumeClients(int balancing)
00388 {
00389 #if CMK_LBDB_ON
00390 DEBUGF(("[%d] ResumeClients. \n", CkMyPe()));
00391
00392 if (CkMyPe() == 0 && balancing) {
00393 double end_lb_time = CkWallTimer();
00394 if (_lb_args.debug())
00395 CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
00396 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
00397 }
00398
00399 theLbdb->ResumeClients();
00400 #endif
00401 }
00402
00403 LBMigrateMsg* NborBaseLB::Strategy(LDStats* stats, int n_nbrs)
00404 {
00405 for(int j=0; j < n_nbrs; j++) {
00406 CkPrintf("[%d] Proc %d Speed %d WALL: Total %f Idle %f Bg %f obj %f",
00407 CkMyPe(), stats[j].from_pe, stats[j].pe_speed, stats[j].total_walltime,
00408 stats[j].idletime, stats[j].bg_walltime, stats[j].obj_walltime);
00409 #if CMK_LB_CPUTIMER
00410 CkPrintf(" CPU: Total %f Bg %f obj %f", stats[j].total_cputime,
00411 stats[j].bg_cputime, stats[j].obj_cputime);
00412 #endif
00413 CkPrintf("\n");
00414 }
00415
00416 int sizes=0;
00417 LBMigrateMsg* msg = new(sizes,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
00418 msg->n_moves = 0;
00419
00420 return msg;
00421 }
00422
00423 int NborBaseLB::NeighborIndex(int pe)
00424 {
00425 int peslot = -1;
00426 for(int i=0; i < mig_msgs_expected; i++) {
00427 if (pe == neighbor_pes[i]) {
00428 peslot = i;
00429 break;
00430 }
00431 }
00432 return peslot;
00433 }
00434
00435 NLBStatsMsg::NLBStatsMsg(int osz, int csz) {
00436 objData = new LDObjData[osz];
00437 commData = new LDCommData[csz];
00438 }
00439
00440 NLBStatsMsg::NLBStatsMsg(NLBStatsMsg *src)
00441 {
00442 int size;
00443 {
00444 PUP::sizer p;
00445 src->pup(p);
00446 size = p.size();
00447 }
00448 char *buf = new char[size];
00449 {
00450 PUP::toMem p(buf);
00451 src->pup(p);
00452 }
00453 {
00454 PUP::fromMem p(buf);
00455 pup(p);
00456 }
00457 delete [] buf;
00458 }
00459
00460 NLBStatsMsg::~NLBStatsMsg() {
00461 delete [] objData;
00462 delete [] commData;
00463 }
00464
00465 void NLBStatsMsg::pup(PUP::er &p) {
00466 int i;
00467 p|from_pe;
00468 p|serial;
00469 p|pe_speed;
00470 p|total_walltime;
00471 p|idletime;
00472 p|bg_walltime;
00473 #if CMK_LB_CPUTIMER
00474 p|total_cputime;
00475 p|bg_cputime;
00476 #endif
00477 p|n_objs;
00478 if (p.isUnpacking()) objData = new LDObjData[n_objs];
00479 for (i=0; i<n_objs; i++) p|objData[i];
00480 p|n_comm;
00481 if (p.isUnpacking()) commData = new LDCommData[n_comm];
00482 for (i=0; i<n_comm; i++) p|commData[i];
00483 }
00484
00485
00486
00487
00488
00489 CkMarshalledNLBStatsMessage::~CkMarshalledNLBStatsMessage() {
00490 if (msg) delete msg;
00491 }
00492
00493 void CkMarshalledNLBStatsMessage::pup(PUP::er &p)
00494 {
00495 bool isnull;
00496 if (p.isPacking()) isnull = (msg==NULL);
00497 p|isnull;
00498 if (p.isUnpacking()) {
00499 if (!isnull) msg = new NLBStatsMsg;
00500 else msg = NULL;
00501 }
00502 if (msg) msg->pup(p);
00503 #if 0
00504 if (p.isUnpacking()) msg = new NLBStatsMsg;
00505 else CmiAssert(msg);
00506 msg->pup(p);
00507 #endif
00508 }
00509
00510
00511