00001
00005
00006 #include "BaseLB.h"
00007 #include "NborBaseLB.h"
00008 #include "LBDBManager.h"
00009 #include "NborBaseLB.def.h"
00010
// Debug trace hook used as DEBUGF(("fmt", args...)). As written it expands to
// nothing (the CmiPrintf is commented out); change the body to `CmiPrintf x;`
// to turn the traces on.
#define DEBUGF(x) // CmiPrintf x;
00012
00013
00014
00015 void NborBaseLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
00016 {
00017 NborBaseLB *me = (NborBaseLB*)(data);
00018
00019 me->Migrated(h, waitBarrier);
00020 }
00021
00022 void NborBaseLB::staticAtSync(void* data)
00023 {
00024 NborBaseLB *me = (NborBaseLB*)(data);
00025
00026 me->AtSync();
00027 }
00028
00029 NborBaseLB::NborBaseLB(const CkLBOptions &opt): BaseLB(opt)
00030 {
00031 #if CMK_LBDB_ON
00032 lbname = (char *)"NborBaseLB";
00033 thisProxy = CProxy_NborBaseLB(thisgroup);
00034 receiver = theLbdb->
00035 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
00036 (void*)(this));
00037 notifier = theLbdb->getLBDB()->
00038 NotifyMigrated((LDMigratedFn)(staticMigrated), (void*)(this));
00039
00040
00041
00042
00043
00044 LBtopoFn topofn = LBTopoLookup(_lbtopo);
00045 if (topofn == NULL) {
00046 if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
00047 CmiAbort("");
00048 }
00049 topo = topofn(CkNumPes());
00050
00051 mig_msgs_expected = 0;
00052 neighbor_pes = NULL;
00053 stats_msg_count = 0;
00054 statsMsgsList = NULL;
00055 statsDataList = NULL;
00056 migrates_completed = 0;
00057 migrates_expected = -1;
00058 mig_msgs_received = 0;
00059 mig_msgs = NULL;
00060
00061 myStats.pe_speed = theLbdb->ProcessorSpeed();
00062
00063
00064
00065 myStats.from_pe = CkMyPe();
00066 myStats.n_objs = 0;
00067 myStats.objData = NULL;
00068 myStats.n_comm = 0;
00069 myStats.commData = NULL;
00070
00071 receive_stats_ready = 0;
00072
00073 if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
00074 #endif
00075 }
00076
00077 NborBaseLB::~NborBaseLB()
00078 {
00079 #if CMK_LBDB_ON
00080 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00081 if (theLbdb) {
00082 theLbdb->getLBDB()->
00083 RemoveNotifyMigrated(notifier);
00084
00085
00086 }
00087 if (statsMsgsList) delete [] statsMsgsList;
00088 if (statsDataList) delete [] statsDataList;
00089 if (neighbor_pes) delete [] neighbor_pes;
00090 if (mig_msgs) delete [] mig_msgs;
00091 #endif
00092 }
00093
00094 void NborBaseLB::FindNeighbors()
00095 {
00096 if (neighbor_pes == 0) {
00097
00098
00099 int maxneighbors = topo->max_neighbors();
00100 statsMsgsList = new NLBStatsMsg*[maxneighbors];
00101 for(int i=0; i < maxneighbors; i++)
00102 statsMsgsList[i] = 0;
00103 statsDataList = new LDStats[maxneighbors];
00104
00105 neighbor_pes = new int[maxneighbors];
00106 topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
00107 mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
00108 }
00109
00110 }
00111
00112 void NborBaseLB::AtSync()
00113 {
00114 #if CMK_LBDB_ON
00115
00116
00117 if (neighbor_pes == 0) FindNeighbors();
00118 start_lb_time = 0;
00119
00120 if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
00121 MigrationDone(0);
00122 return;
00123 }
00124
00125 if (CkMyPe() == 0) {
00126 start_lb_time = CkWallTimer();
00127 if (_lb_args.debug())
00128 CkPrintf("[%s] Load balancing step %d starting at %f\n",
00129 lbName(), step(),start_lb_time);
00130 }
00131
00132 NLBStatsMsg* msg = AssembleStats();
00133
00134 if (mig_msgs_expected > 0) {
00135 CkMarshalledNLBStatsMessage marshmsg(msg);
00136 thisProxy.ReceiveStats(marshmsg, mig_msgs_expected, neighbor_pes);
00137 }
00138
00139
00140 CkMarshalledNLBStatsMessage mmsg(NULL);
00141 thisProxy[CkMyPe()].ReceiveStats(mmsg);
00142 #endif
00143 }
00144
00145 NLBStatsMsg* NborBaseLB::AssembleStats()
00146 {
00147 #if CMK_LBDB_ON
00148
00149 #if CMK_LB_CPUTIMER
00150 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
00151 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
00152 #else
00153 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_walltime);
00154 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_walltime);
00155 #endif
00156 theLbdb->IdleTime(&myStats.idletime);
00157
00158 myStats.move = QueryMigrateStep(step());
00159
00160 myStats.n_objs = theLbdb->GetObjDataSz();
00161 if (myStats.objData) delete [] myStats.objData;
00162 myStats.objData = new LDObjData[myStats.n_objs];
00163 theLbdb->GetObjData(myStats.objData);
00164
00165 myStats.n_comm = theLbdb->GetCommDataSz();
00166 if (myStats.commData) delete [] myStats.commData;
00167 myStats.commData = new LDCommData[myStats.n_comm];
00168 theLbdb->GetCommData(myStats.commData);
00169
00170 myStats.obj_walltime = 0;
00171 #if CMK_LB_CPUTIMER
00172 myStats.obj_cputime = 0;
00173 #endif
00174 for(int i=0; i < myStats.n_objs; i++) {
00175 myStats.obj_walltime += myStats.objData[i].wallTime;
00176 #if CMK_LB_CPUTIMER
00177 myStats.obj_cputime += myStats.objData[i].cpuTime;
00178 #endif
00179 }
00180
00181 const int osz = theLbdb->GetObjDataSz();
00182 const int csz = theLbdb->GetCommDataSz();
00183 NLBStatsMsg* msg = new NLBStatsMsg(osz, csz);
00184
00185 msg->from_pe = CkMyPe();
00186
00187 msg->serial = CrnRand();
00188 msg->pe_speed = myStats.pe_speed;
00189 msg->total_walltime = myStats.total_walltime;
00190 msg->idletime = myStats.idletime;
00191 msg->bg_walltime = myStats.bg_walltime;
00192 msg->obj_walltime = myStats.obj_walltime;
00193 #if CMK_LB_CPUTIMER
00194 msg->total_cputime = myStats.total_cputime;
00195 msg->bg_cputime = myStats.bg_cputime;
00196 msg->obj_cputime = myStats.obj_cputime;
00197 #endif
00198 msg->n_objs = osz;
00199 theLbdb->GetObjData(msg->objData);
00200 msg->n_comm = csz;
00201 theLbdb->GetCommData(msg->commData);
00202
00203
00204 delete [] myStats.objData;
00205 myStats.objData = NULL;
00206 myStats.n_objs = 0;
00207 delete [] myStats.commData;
00208 myStats.commData = NULL;
00209 myStats.n_comm = 0;
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219 return msg;
00220 #else
00221 return NULL;
00222 #endif
00223 }
00224
00225 void NborBaseLB::Migrated(LDObjHandle h, int waitBarrier)
00226 {
00227 migrates_completed++;
00228
00229
00230 if (migrates_completed == migrates_expected) {
00231 MigrationDone(1);
00232 }
00233 }
00234
00235 void NborBaseLB::ReceiveStats(CkMarshalledNLBStatsMessage &data)
00236 {
00237 #if CMK_LBDB_ON
00238 NLBStatsMsg *m = data.getMessage();
00239 if (neighbor_pes == 0) FindNeighbors();
00240
00241 if (m == 0) {
00242 receive_stats_ready = 1;
00243 } else {
00244 const int pe = m->from_pe;
00245
00246
00247 int peslot = NeighborIndex(pe);
00248
00249 if (peslot == -1 || statsMsgsList[peslot] != 0) {
00250 CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
00251 pe);
00252 } else {
00253 statsMsgsList[peslot] = m;
00254 statsDataList[peslot].from_pe = m->from_pe;
00255 statsDataList[peslot].total_walltime = m->total_walltime;
00256 statsDataList[peslot].idletime = m->idletime;
00257 statsDataList[peslot].bg_walltime = m->bg_walltime;
00258 statsDataList[peslot].pe_speed = m->pe_speed;
00259 statsDataList[peslot].obj_walltime = m->obj_walltime;
00260 #if CMK_LB_CPUTIMER
00261 statsDataList[peslot].total_cputime = m->total_cputime;
00262 statsDataList[peslot].bg_cputime = m->bg_cputime;
00263 statsDataList[peslot].obj_cputime = m->obj_cputime;
00264 #endif
00265
00266 statsDataList[peslot].n_objs = m->n_objs;
00267 statsDataList[peslot].objData = m->objData;
00268 statsDataList[peslot].n_comm = m->n_comm;
00269 statsDataList[peslot].commData = m->commData;
00270
00271 if (_lb_args.ignoreBgLoad()) statsDataList[peslot].clearBgLoad();
00272
00273 stats_msg_count++;
00274 }
00275 }
00276
00277 const int clients = mig_msgs_expected;
00278 if (stats_msg_count == clients && receive_stats_ready) {
00279 double strat_start_time = CkWallTimer();
00280 receive_stats_ready = 0;
00281 LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
00282
00283 int i;
00284
00285
00286 for(i=0; i < migrateMsg->n_moves; i++) {
00287 MigrateInfo& move = migrateMsg->moves[i];
00288 const int me = CkMyPe();
00289 if (move.from_pe == me && move.to_pe != me) {
00290 theLbdb->Migrate(move.obj,move.to_pe);
00291 } else if (move.from_pe != me) {
00292 CkPrintf("[%d] error, strategy wants to move from %d to %d\n",
00293 me,move.from_pe,move.to_pe);
00294 }
00295 }
00296
00297
00298 if (clients > 0)
00299 thisProxy.ReceiveMigration(migrateMsg, clients, neighbor_pes);
00300
00301
00302 for(i=0; i < clients; i++) {
00303 delete statsMsgsList[i];
00304 statsMsgsList[i]=NULL;
00305 }
00306 stats_msg_count=0;
00307
00308
00309 if (CkMyPe() == 0) {
00310 double strat_end_time = CkWallTimer();
00311 if (_lb_args.debug())
00312 CkPrintf("[%d] %s Strat elapsed time %f\n",CkMyPe(),lbName(),strat_end_time-strat_start_time);
00313 }
00314 }
00315 #endif
00316 }
00317
00318 void NborBaseLB::ReceiveMigration(LBMigrateMsg *msg)
00319 {
00320 #if CMK_LBDB_ON
00321 if (neighbor_pes == 0) FindNeighbors();
00322
00323 if (mig_msgs_received == 0) migrates_expected = 0;
00324
00325 mig_msgs[mig_msgs_received] = msg;
00326 mig_msgs_received++;
00327
00328
00329
00330 if (mig_msgs_received > mig_msgs_expected) {
00331 CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
00332 CkMyPe());
00333 }
00334
00335 if (mig_msgs_received != mig_msgs_expected) {
00336 return;
00337 }
00338
00339
00340 for(int neigh=0; neigh < mig_msgs_received;neigh++) {
00341 LBMigrateMsg* m = mig_msgs[neigh];
00342 for(int i=0; i < m->n_moves; i++) {
00343 MigrateInfo& move = m->moves[i];
00344 const int me = CkMyPe();
00345 if (move.from_pe != me && move.to_pe == me) {
00346 migrates_expected++;
00347 }
00348 }
00349 delete m;
00350 mig_msgs[neigh]=0;
00351 }
00352
00353
00354 mig_msgs_received = 0;
00355 if (migrates_expected == 0 || migrates_expected == migrates_completed)
00356 MigrationDone(1);
00357 #endif
00358 }
00359
00360
00361 void NborBaseLB::MigrationDone(int balancing)
00362 {
00363 #if CMK_LBDB_ON
00364 migrates_completed = 0;
00365 migrates_expected = -1;
00366
00367 theLbdb->incStep();
00368
00369 theLbdb->ClearLoads();
00370
00371
00372 if (balancing && _lb_args.syncResume()) {
00373 CkCallback cb(CkIndex_NborBaseLB::ResumeClients((CkReductionMsg*)NULL),
00374 thisProxy);
00375 contribute(0, NULL, CkReduction::sum_int, cb);
00376 }
00377 else
00378 thisProxy [CkMyPe()].ResumeClients(balancing);
00379
00380 #endif
00381 }
00382
00383 void NborBaseLB::ResumeClients(CkReductionMsg *msg)
00384 {
00385 ResumeClients(1);
00386 delete msg;
00387 }
00388
00389 void NborBaseLB::ResumeClients(int balancing)
00390 {
00391 #if CMK_LBDB_ON
00392 DEBUGF(("[%d] ResumeClients. \n", CkMyPe()));
00393
00394 if (CkMyPe() == 0 && balancing) {
00395 double end_lb_time = CkWallTimer();
00396 if (_lb_args.debug())
00397 CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
00398 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
00399 }
00400
00401 theLbdb->ResumeClients();
00402 #endif
00403 }
00404
00405 LBMigrateMsg* NborBaseLB::Strategy(LDStats* stats, int n_nbrs)
00406 {
00407 for(int j=0; j < n_nbrs; j++) {
00408 CkPrintf("[%d] Proc %d Speed %d WALL: Total %f Idle %f Bg %f obj %f",
00409 CkMyPe(), stats[j].from_pe, stats[j].pe_speed, stats[j].total_walltime,
00410 stats[j].idletime, stats[j].bg_walltime, stats[j].obj_walltime);
00411 #if CMK_LB_CPUTIMER
00412 CkPrintf(" CPU: Total %f Bg %f obj %f", stats[j].total_cputime,
00413 stats[j].bg_cputime, stats[j].obj_cputime);
00414 #endif
00415 CkPrintf("\n");
00416 }
00417
00418 int sizes=0;
00419 LBMigrateMsg* msg = new(sizes,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
00420 msg->n_moves = 0;
00421
00422 return msg;
00423 }
00424
00425 int NborBaseLB::NeighborIndex(int pe)
00426 {
00427 int peslot = -1;
00428 for(int i=0; i < mig_msgs_expected; i++) {
00429 if (pe == neighbor_pes[i]) {
00430 peslot = i;
00431 break;
00432 }
00433 }
00434 return peslot;
00435 }
00436
00437 NLBStatsMsg::NLBStatsMsg(int osz, int csz) {
00438 objData = new LDObjData[osz];
00439 commData = new LDCommData[csz];
00440 }
00441
00442 NLBStatsMsg::NLBStatsMsg(NLBStatsMsg *src)
00443 {
00444 int size;
00445 {
00446 PUP::sizer p;
00447 src->pup(p);
00448 size = p.size();
00449 }
00450 char *buf = new char[size];
00451 {
00452 PUP::toMem p(buf);
00453 src->pup(p);
00454 }
00455 {
00456 PUP::fromMem p(buf);
00457 pup(p);
00458 }
00459 delete [] buf;
00460 }
00461
00462 NLBStatsMsg::~NLBStatsMsg() {
00463 delete [] objData;
00464 delete [] commData;
00465 }
00466
00467 void NLBStatsMsg::pup(PUP::er &p) {
00468 int i;
00469 p|from_pe;
00470 p|serial;
00471 p|pe_speed;
00472 p|total_walltime;
00473 p|idletime;
00474 p|bg_walltime;
00475 #if CMK_LB_CPUTIMER
00476 p|total_cputime;
00477 p|bg_cputime;
00478 #endif
00479 p|n_objs;
00480 if (p.isUnpacking()) objData = new LDObjData[n_objs];
00481 for (i=0; i<n_objs; i++) p|objData[i];
00482 p|n_comm;
00483 if (p.isUnpacking()) commData = new LDCommData[n_comm];
00484 for (i=0; i<n_comm; i++) p|commData[i];
00485 }
00486
00487
00488
00489
00490
00491 CkMarshalledNLBStatsMessage::~CkMarshalledNLBStatsMessage() {
00492 if (msg) delete msg;
00493 }
00494
00495 void CkMarshalledNLBStatsMessage::pup(PUP::er &p)
00496 {
00497 int isnull;
00498 if (p.isPacking()) isnull = (msg==NULL?1:0);
00499 p|isnull;
00500 if (p.isUnpacking()) {
00501 if (!isnull) msg = new NLBStatsMsg;
00502 else msg = NULL;
00503 }
00504 if (msg) msg->pup(p);
00505 #if 0
00506 if (p.isUnpacking()) msg = new NLBStatsMsg;
00507 else CmiAssert(msg);
00508 msg->pup(p);
00509 #endif
00510 }
00511
00512
00513