00001
00005
00006 #include <charm++.h>
00007
00008 #if CMK_LBDB_ON
00009
00010 #include "LBDBManager.h"
00011
00012 struct MigrateCB;
00013
00014
00015
00016
00017
00018
00019
00020
00021 void LBDB::batsyncer::gotoSync(void *bs)
00022 {
00023 LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
00024 s->gotoSyncCalled = true;
00025 s->db->AtLocalBarrier(s->BH);
00026 }
00027
00028 void LBDB::batsyncer::resumeFromSync(void *bs)
00029 {
00030 LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
00031
00032
00033 #if 0
00034 double curT = CmiWallTimer();
00035 if (s->nextT<curT) s->period *= 2;
00036 s->nextT = curT + s->period;
00037 #endif
00038
00039 if (s->gotoSyncCalled) {
00040 CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 1000*s->period, CkMyPe());
00041 s->gotoSyncCalled = false;
00042 }
00043 }
00044
00045
00046 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
00047 {
00048 db=_db;
00049 period=initPeriod;
00050 nextT = CmiWallTimer() + period;
00051 BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
00052 gotoSyncCalled = true;
00053
00054 resumeFromSync((void *)this);
00055 }
00056
00057
00058
00059
00060
00061
00062 LBDB::LBDB(): useBarrier(true)
00063 {
00064 statsAreOn = false;
00065 omCount = oms_registering = 0;
00066 obj_running = false;
00067 commTable = new LBCommTable;
00068 obj_walltime = 0;
00069 #if CMK_LB_CPUTIMER
00070 obj_cputime = 0;
00071 #endif
00072 startLBFn_count = 0;
00073 predictCBFn = NULL;
00074 batsync.init(this, _lb_args.lbperiod());
00075 objsEmptyHead = -1;
00076 }
00077
00078 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData,
00079 LDCallbacks _callbacks)
00080 {
00081 LDOMHandle newhandle;
00082
00083 newhandle.ldb.handle = (void*)(this);
00084
00085 newhandle.id = _userID;
00086
00087 LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
00088 if (om != NULL) {
00089 newhandle.handle = oms.length();
00090 oms.insertAtEnd(om);
00091 } else newhandle.handle = -1;
00092 om->DepositHandle(newhandle);
00093 omCount++;
00094 return newhandle;
00095 }
00096
00097 void LBDB::RemoveOM(LDOMHandle om)
00098 {
00099 delete oms[om.handle];
00100 oms[om.handle] = NULL;
00101 omCount--;
00102 }
00103
00104
// Placeholder pointer value, defined only in CMK_BIGSIM_CHARM builds.
// UnregisterObj stores it (cast to LBObj*) in a slot when
// _BgOutOfCoreFlag==1, and AddObj searches for it to reuse that slot when
// _BgOutOfCoreFlag==2.
#if CMK_BIGSIM_CHARM
#define LBOBJ_OOC_IDX 0x1
#endif
00108
00109 LDObjHandle LBDB::AddObj(LDOMHandle _omh, CmiUInt8 _id,
00110 void *_userData, bool _migratable)
00111 {
00112 LDObjHandle newhandle;
00113
00114 newhandle.omhandle = _omh;
00115
00116 newhandle.id = _id;
00117
00118 #if CMK_BIGSIM_CHARM
00119 if(_BgOutOfCoreFlag==2){
00120
00121 int newpos = -1;
00122 for(int i=0; i<objs.size(); i++){
00123 if(objs[i].obj==(LBObj *)LBOBJ_OOC_IDX){
00124 newpos = i;
00125 break;
00126 }
00127 }
00128 if(newpos==-1) newpos = objs.size();
00129 newhandle.handle = newpos;
00130 LBObj *obj = new LBObj(newhandle, _userData, _migratable);
00131 if (newpos == -1) {
00132 objs.emplace_back(obj);
00133 } else {
00134 objs[newpos].obj = obj;
00135 }
00136
00137
00138
00139 }else
00140 #endif
00141 {
00142
00143
00144
00145
00146
00147
00148
00149 if (objsEmptyHead == -1) {
00150 newhandle.handle = objs.size();
00151 LBObj *obj = new LBObj(newhandle, _userData, _migratable);
00152 objs.emplace_back(obj);
00153 } else {
00154 newhandle.handle = objsEmptyHead;
00155 LBObj *obj = new LBObj(newhandle, _userData, _migratable);
00156 objs[objsEmptyHead].obj = obj;
00157
00158 objsEmptyHead = objs[objsEmptyHead].next;
00159 }
00160 }
00161
00162
00163
00164 return newhandle;
00165 }
00166
00167 void LBDB::UnregisterObj(LDObjHandle _h)
00168 {
00169
00170
00171
00172 delete objs[_h.handle].obj;
00173
00174 #if CMK_BIGSIM_CHARM
00175
00176
00177
00178 if(_BgOutOfCoreFlag==1){
00179 objs[_h.handle].obj = (LBObj *)(LBOBJ_OOC_IDX);
00180 }else
00181 #endif
00182 {
00183 objs[_h.handle].obj = NULL;
00184
00185
00186 objs[_h.handle].next = objsEmptyHead;
00187 objsEmptyHead = _h.handle;
00188 }
00189 }
00190
00191 void LBDB::RegisteringObjects(LDOMHandle _h)
00192 {
00193
00194 if (_h.id.id.idx == 0) {
00195 if (oms_registering == 0)
00196 localBarrier.TurnOff();
00197 oms_registering++;
00198 }
00199 else {
00200 LBOM* om = oms[_h.handle];
00201 if (!om->RegisteringObjs()) {
00202 if (oms_registering == 0)
00203 localBarrier.TurnOff();
00204 oms_registering++;
00205 om->SetRegisteringObjs(true);
00206 }
00207 }
00208 }
00209
00210 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
00211 {
00212
00213 if (_h.id.id.idx == 0) {
00214 oms_registering--;
00215 if (oms_registering == 0 && useBarrier)
00216 localBarrier.TurnOn();
00217 }
00218 else {
00219 LBOM* om = oms[_h.handle];
00220 if (om->RegisteringObjs()) {
00221 oms_registering--;
00222 if (oms_registering == 0 && useBarrier)
00223 localBarrier.TurnOn();
00224 om->SetRegisteringObjs(false);
00225 }
00226 }
00227 }
00228
00229
00230 void LBDB::Send(const LDOMHandle &destOM, const CmiUInt8 &destid, unsigned int bytes, int destObjProc)
00231 {
00232 LBCommData* item_ptr;
00233
00234 if (obj_running) {
00235 const LDObjHandle &runObj = RunningObj();
00236
00237
00238 if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
00239 && runObj.id == destid )
00240 return;
00241
00242
00243
00244
00245 LBCommData item(runObj,destOM.id,destid, destObjProc);
00246 item_ptr = commTable->HashInsertUnique(item);
00247 } else {
00248 LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
00249 item_ptr = commTable->HashInsertUnique(item);
00250 }
00251 item_ptr->addMessage(bytes);
00252 }
00253
00254 void LBDB::MulticastSend(const LDOMHandle &destOM, CmiUInt8 *destids, int ndests, unsigned int bytes, int nMsgs)
00255 {
00256 LBCommData* item_ptr;
00257
00258 if (obj_running) {
00259 const LDObjHandle &runObj = RunningObj();
00260
00261 LBCommData item(runObj,destOM.id,destids, ndests);
00262 item_ptr = commTable->HashInsertUnique(item);
00263 item_ptr->addMessage(bytes, nMsgs);
00264 }
00265 }
00266
00267 void LBDB::ClearLoads(void)
00268 {
00269 int i;
00270 for(i=0; i < objs.size(); i++) {
00271 LBObj *obj = objs[i].obj;
00272 if (obj)
00273 {
00274 if (obj->data.wallTime>.0) {
00275 obj->lastWallTime = obj->data.wallTime;
00276 #if CMK_LB_CPUTIMER
00277 obj->lastCpuTime = obj->data.cpuTime;
00278 #endif
00279 }
00280 obj->data.wallTime = 0.;
00281 #if CMK_LB_CPUTIMER
00282 obj->data.cpuTime = 0.;
00283 #endif
00284 }
00285 }
00286 delete commTable;
00287 commTable = new LBCommTable;
00288 machineUtil.Clear();
00289 obj_walltime = 0;
00290 #if CMK_LB_CPUTIMER
00291 obj_cputime = 0;
00292 #endif
00293 }
00294
00295 int LBDB::ObjDataCount()
00296 {
00297 int nitems=0;
00298 int i;
00299 if (_lb_args.migObjOnly()) {
00300 for(i=0; i < objs.size(); i++)
00301 if (objs[i].obj && (objs[i].obj)->data.migratable)
00302 nitems++;
00303 }
00304 else {
00305 for(i=0; i < objs.size(); i++)
00306 if (objs[i].obj)
00307 nitems++;
00308 }
00309 return nitems;
00310 }
00311
00312 void LBDB::GetObjData(LDObjData *dp)
00313 {
00314 if (_lb_args.migObjOnly()) {
00315 for(int i = 0; i < objs.size(); i++) {
00316 LBObj* obj = objs[i].obj;
00317 if (obj && obj->data.migratable)
00318 *dp++ = obj->ObjData();
00319 }
00320 }
00321 else {
00322 for(int i = 0; i < objs.size(); i++) {
00323 LBObj* obj = objs[i].obj;
00324 if (obj)
00325 *dp++ = obj->ObjData();
00326 }
00327 }
00328 }
00329
00330 int LBDB::Migrate(LDObjHandle h, int dest)
00331 {
00332
00333
00334
00335 if (h.handle >= objs.size()) {
00336 CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objs.size());
00337 CmiAbort("LB handle out of range!");
00338 }
00339 else if (!(objs[h.handle].obj)) {
00340 CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objs.size());
00341 CmiAbort("LB handle no longer registered!");
00342 }
00343
00344 LBOM *const om = oms[(objs[h.handle].obj)->parentOM().handle];
00345 om->Migrate(h, dest);
00346 return 1;
00347 }
00348
00349 void LBDB::MetaLBResumeWaitingChares(int lb_ideal_period) {
00350 for (int i = 0; i < objs.size(); i++) {
00351 LBObj* obj = objs[i].obj;
00352 if (obj) {
00353 LBOM *om = oms[obj->parentOM().handle];
00354 LDObjHandle h = obj->GetLDObjHandle();
00355 om->MetaLBResumeWaitingChares(h, lb_ideal_period);
00356 }
00357 }
00358 }
00359
00360 void LBDB::MetaLBCallLBOnChares() {
00361 for (int i = 0; i < objs.size(); i++) {
00362 LBObj* obj = objs[i].obj;
00363 if (obj) {
00364 LBOM *om = oms[obj->parentOM().handle];
00365 LDObjHandle h = obj->GetLDObjHandle();
00366 om->MetaLBCallLBOnChares(h);
00367 }
00368 }
00369 }
00370
00371 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
00372 {
00373
00374
00375
00376
00377
00378 for(int i=migrateCBList.length()-1; i>=0; i--) {
00379 MigrateCB* cb = (MigrateCB*)migrateCBList[i];
00380 if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
00381 }
00382
00383 }
00384
00385
00386 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
00387 {
00388
00389 MigrateCB* callbk = new MigrateCB;
00390
00391 callbk->fn = fn;
00392 callbk->data = data;
00393 callbk->on = 1;
00394 migrateCBList.insertAtEnd(callbk);
00395 return migrateCBList.size()-1;
00396 }
00397
00398 void LBDB::RemoveNotifyMigrated(int handle)
00399 {
00400 MigrateCB* callbk = migrateCBList[handle];
00401 migrateCBList[handle] = NULL;
00402 delete callbk;
00403 }
00404
00405 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
00406 {
00407
00408 StartLBCB* callbk = new StartLBCB;
00409
00410 callbk->fn = fn;
00411 callbk->data = data;
00412 callbk->on = 1;
00413 startLBFnList.push_back(callbk);
00414 startLBFn_count++;
00415 return startLBFnList.size()-1;
00416 }
00417
00418 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
00419 {
00420 for (int i=0; i<startLBFnList.length(); i++) {
00421 StartLBCB* callbk = startLBFnList[i];
00422 if (callbk && callbk->fn == fn) {
00423 delete callbk;
00424 startLBFnList[i] = 0;
00425 startLBFn_count --;
00426 break;
00427 }
00428 }
00429 }
00430
00431 void LBDB::StartLB()
00432 {
00433 if (startLBFn_count == 0) {
00434 CmiAbort("StartLB is not supported in this LB");
00435 }
00436 for (int i=0; i<startLBFnList.length(); i++) {
00437 StartLBCB *startLBFn = startLBFnList[i];
00438 if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
00439 }
00440 }
00441
00442 int LBDB::AddMigrationDoneFn(LDMigrationDoneFn fn, void* data) {
00443
00444 MigrationDoneCB* callbk = new MigrationDoneCB;
00445
00446 callbk->fn = fn;
00447 callbk->data = data;
00448 migrationDoneCBList.push_back(callbk);
00449 return migrationDoneCBList.size()-1;
00450 }
00451
00452 void LBDB::RemoveMigrationDoneFn(LDMigrationDoneFn fn) {
00453 for (int i=0; i<migrationDoneCBList.length(); i++) {
00454 MigrationDoneCB* callbk = migrationDoneCBList[i];
00455 if (callbk && callbk->fn == fn) {
00456 delete callbk;
00457 migrationDoneCBList[i] = 0;
00458 break;
00459 }
00460 }
00461 }
00462
00463 void LBDB::MigrationDone() {
00464 for (int i=0; i<migrationDoneCBList.length(); i++) {
00465 MigrationDoneCB *callbk = migrationDoneCBList[i];
00466 if (callbk) callbk->fn(callbk->data);
00467 }
00468 }
00469
00470 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
00471 {
00472 if (predictCBFn==NULL) predictCBFn = new PredictCB;
00473 predictCBFn->on = on;
00474 predictCBFn->onWin = onWin;
00475 predictCBFn->off = off;
00476 predictCBFn->change = change;
00477 predictCBFn->data = data;
00478 }
00479
00480 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
00481 {
00482 LBRealType total_walltime;
00483 LBRealType total_cputime;
00484 TotalTime(&total_walltime, &total_cputime);
00485
00486 LBRealType idletime;
00487 IdleTime(&idletime);
00488
00489 *bg_walltime = total_walltime - idletime - obj_walltime;
00490 if (*bg_walltime < 0) *bg_walltime = 0.;
00491 #if CMK_LB_CPUTIMER
00492 *bg_cputime = total_cputime - obj_cputime;
00493 #else
00494 *bg_cputime = *bg_walltime;
00495 #endif
00496 }
00497
00498 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
00499 LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
00500 {
00501 TotalTime(total_walltime,total_cputime);
00502
00503 IdleTime(idletime);
00504
00505 *bg_walltime = *total_walltime - *idletime - obj_walltime;
00506 if (*bg_walltime < 0) *bg_walltime = 0.;
00507 #if CMK_LB_CPUTIMER
00508 *bg_cputime = *total_cputime - obj_cputime;
00509 #else
00510 *bg_cputime = *bg_walltime;
00511 #endif
00512
00513 }
00514
00515 void LBDB::DumpDatabase()
00516 {
00517 #ifdef DEBUG
00518 CmiPrintf("Database contains %d object managers\n",omCount);
00519 CmiPrintf("Database contains %d objects\n",objs.size());
00520 #endif
00521 }
00522
00523 int LBDB::useMem() {
00524 int size = sizeof(LBDB);
00525 size += oms.length() * sizeof(LBOM);
00526 size += ObjDataCount() * sizeof(LBObj);
00527 size += migrateCBList.length() * sizeof(MigrateCBList);
00528 size += startLBFnList.length() * sizeof(StartLBCB);
00529 size += commTable->useMem();
00530 return size;
00531 }
00532
// Record for one barrier client, created by LocalBarrier::AddClient.
// All members are private; only LocalBarrier (a friend) touches them.
class client {
  friend class LocalBarrier;
  void* data;      // opaque argument handed back to fn
  LDResumeFn fn;   // called as fn(data) by LocalBarrier::ResumeClients
  int refcount;    // set from cur_refcount on creation, incremented in AtBarrier
};
// Record for one barrier receiver, created by LocalBarrier::AddReceiver.
// All members are private; only LocalBarrier (a friend) touches them.
class receiver {
  friend class LocalBarrier;
  void* data;      // opaque argument handed back to fn
  LDBarrierFn fn;  // called as fn(data) by LocalBarrier::CallReceivers
  int on;          // 1 = enabled, 0 = disabled (see TurnOnReceiver / TurnOffReceiver)
};
00545
00546 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
00547 {
00548 client* new_client = new client;
00549 new_client->fn = fn;
00550 new_client->data = data;
00551 new_client->refcount = cur_refcount;
00552
00553 #if CMK_BIGSIM_CHARM
00554 if(_BgOutOfCoreFlag!=2){
00555
00556
00557 client_count++;
00558 }
00559 #else
00560 client_count++;
00561 #endif
00562
00563 return LDBarrierClient(clients.insert(clients.end(), new_client));
00564 }
00565
00566 void LocalBarrier::RemoveClient(LDBarrierClient c)
00567 {
00568 delete *(c.i);
00569 clients.erase(c.i);
00570
00571 #if CMK_BIGSIM_CHARM
00572
00573
00574 if(_BgOutOfCoreFlag!=1)
00575 {
00576 client_count--;
00577 }
00578 #else
00579 client_count--;
00580 #endif
00581 }
00582
00583 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
00584 {
00585 receiver* new_receiver = new receiver;
00586 new_receiver->fn = fn;
00587 new_receiver->data = data;
00588 new_receiver->on = 1;
00589
00590 return LDBarrierReceiver(receivers.insert(receivers.end(), new_receiver));
00591 }
00592
00593 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
00594 {
00595 delete *(c.i);
00596 receivers.erase(c.i);
00597 }
00598
00599 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
00600 {
00601 (*c.i)->on = 1;
00602 }
00603
00604 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
00605 {
00606 (*c.i)->on = 0;
00607 }
00608
00609 void LocalBarrier::AtBarrier(LDBarrierClient h)
00610 {
00611 (*h.i)->refcount++;
00612 at_count++;
00613 CheckBarrier();
00614 }
00615
00616 void LocalBarrier::DecreaseBarrier(LDBarrierClient h, int c)
00617 {
00618 at_count-=c;
00619 }
00620
00621 void LocalBarrier::CheckBarrier()
00622 {
00623 if (!on) return;
00624
00625
00626
00627 if (client_count == 0) {
00628 cur_refcount++;
00629 CallReceivers();
00630 }
00631 if (at_count >= client_count) {
00632 bool at_barrier = false;
00633
00634 for(std::list<client*>::iterator i = clients.begin(); i != clients.end(); ++i)
00635 if ((*i)->refcount >= cur_refcount)
00636 at_barrier = true;
00637
00638 if (at_barrier) {
00639 at_count -= client_count;
00640 cur_refcount++;
00641 CallReceivers();
00642 }
00643 }
00644 }
00645
00646 void LocalBarrier::CallReceivers(void)
00647 {
00648 bool called_receiver=false;
00649
00650 for (std::list<receiver *>::iterator i = receivers.begin();
00651 i != receivers.end(); ++i) {
00652 receiver *recv = *i;
00653 if (recv->on) {
00654 recv->fn(recv->data);
00655 called_receiver = true;
00656 }
00657 }
00658
00659 if (!called_receiver)
00660 ResumeClients();
00661 }
00662
00663 void LocalBarrier::ResumeClients(void)
00664 {
00665 for (std::list<client *>::iterator i = clients.begin(); i != clients.end(); ++i)
00666 (*i)->fn((*i)->data);
00667 }
00668
00669 #endif
00670