00001
00005
00006 #include <charm++.h>
00007
00008 #if CMK_LBDB_ON
00009
00010 #include "LBDBManager.h"
00011
00012 struct MigrateCB;
00013
00014
00015
00016
00017
00018
00019
00020
00021 void LBDB::batsyncer::gotoSync(void *bs)
00022 {
00023 LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
00024 s->db->AtLocalBarrier(s->BH);
00025 }
00026
00027 void LBDB::batsyncer::resumeFromSync(void *bs)
00028 {
00029 LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
00030
00031
00032 #if 0
00033 double curT = CmiWallTimer();
00034 if (s->nextT<curT) s->period *= 2;
00035 s->nextT = curT + s->period;
00036 #endif
00037
00038 CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 1000*s->period, CkMyPe());
00039 }
00040
00041
00042 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
00043 {
00044 db=_db;
00045 period=initPeriod;
00046 nextT = CmiWallTimer() + period;
00047 BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
00048
00049 resumeFromSync((void *)this);
00050 }
00051
00052
00053
00054
00055
00056
00057 LBDB::LBDB(): useBarrier(CmiTrue)
00058 {
00059 statsAreOn = CmiFalse;
00060 omCount = objCount = oms_registering = 0;
00061 obj_running = CmiFalse;
00062 commTable = new LBCommTable;
00063 obj_walltime = 0;
00064 #if CMK_LB_CPUTIMER
00065 obj_cputime = 0;
00066 #endif
00067 startLBFn_count = 0;
00068 predictCBFn = NULL;
00069 batsync.init(this, _lb_args.lbperiod());
00070 }
00071
00072 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData,
00073 LDCallbacks _callbacks)
00074 {
00075 LDOMHandle newhandle;
00076
00077 newhandle.ldb.handle = (void*)(this);
00078
00079 newhandle.id = _userID;
00080
00081 LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
00082 if (om != NULL) {
00083 newhandle.handle = oms.length();
00084 oms.insertAtEnd(om);
00085 } else newhandle.handle = -1;
00086 om->DepositHandle(newhandle);
00087 omCount++;
00088 return newhandle;
00089 }
00090
00091 #if CMK_BIGSIM_CHARM
00092 #define LBOBJ_OOC_IDX 0x1
00093 #endif
00094
00095 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
00096 void *_userData, CmiBool _migratable)
00097 {
00098 LDObjHandle newhandle;
00099
00100 newhandle.omhandle = _omh;
00101
00102 newhandle.id = _id;
00103
00104 #if 1
00105 #if CMK_BIGSIM_CHARM
00106 if(_BgOutOfCoreFlag==2){
00107
00108 int newpos = -1;
00109 for(int i=0; i<objs.length(); i++){
00110 if(objs[i]==(LBObj *)LBOBJ_OOC_IDX){
00111 newpos = i;
00112 break;
00113 }
00114 }
00115 if(newpos==-1) newpos = objs.length();
00116 newhandle.handle = newpos;
00117 LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
00118 objs.insert(newpos, obj);
00119
00120
00121
00122 }else
00123 #endif
00124 {
00125
00126
00127 newhandle.handle = objs.length();
00128 LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
00129 objs.insertAtEnd(obj);
00130 objCount++;
00131 }
00132
00133
00134
00135 #else
00136 LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
00137 if (obj != NULL) {
00138 newhandle.handle = objs.length();
00139 objs.insertAtEnd(obj);
00140 } else {
00141 newhandle.handle = -1;
00142 }
00143 obj->DepositHandle(newhandle);
00144 #endif
00145 return newhandle;
00146 }
00147
00148 void LBDB::UnregisterObj(LDObjHandle _h)
00149 {
00150
00151
00152
00153 delete objs[_h.handle];
00154
00155 #if CMK_BIGSIM_CHARM
00156
00157
00158
00159 if(_BgOutOfCoreFlag==1){
00160 objs[_h.handle] = (LBObj *)(LBOBJ_OOC_IDX);
00161 }else
00162 #endif
00163 {
00164 objs[_h.handle] = NULL;
00165 }
00166 }
00167
00168 void LBDB::RegisteringObjects(LDOMHandle _h)
00169 {
00170
00171 if (_h.id.id.idx == 0) {
00172 if (oms_registering == 0)
00173 localBarrier.TurnOff();
00174 oms_registering++;
00175 }
00176 else {
00177 LBOM* om = oms[_h.handle];
00178 if (!om->RegisteringObjs()) {
00179 if (oms_registering == 0)
00180 localBarrier.TurnOff();
00181 oms_registering++;
00182 om->SetRegisteringObjs(CmiTrue);
00183 }
00184 }
00185 }
00186
00187 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
00188 {
00189
00190 if (_h.id.id.idx == 0) {
00191 oms_registering--;
00192 if (oms_registering == 0)
00193 localBarrier.TurnOn();
00194 }
00195 else {
00196 LBOM* om = oms[_h.handle];
00197 if (om->RegisteringObjs()) {
00198 oms_registering--;
00199 if (oms_registering == 0)
00200 localBarrier.TurnOn();
00201 om->SetRegisteringObjs(CmiFalse);
00202 }
00203 }
00204 }
00205
00206
00207 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes, int destObjProc)
00208 {
00209 LBCommData* item_ptr;
00210
00211 if (obj_running) {
00212 const LDObjHandle &runObj = RunningObj();
00213
00214
00215 if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
00216 && LDObjIDEqual(runObj.id,destid) )
00217 return;
00218
00219
00220
00221
00222 LBCommData item(runObj,destOM.id,destid, destObjProc);
00223 item_ptr = commTable->HashInsertUnique(item);
00224 } else {
00225 LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
00226 item_ptr = commTable->HashInsertUnique(item);
00227 }
00228 item_ptr->addMessage(bytes);
00229 }
00230
00231 void LBDB::MulticastSend(const LDOMHandle &destOM, LDObjid *destids, int ndests, unsigned int bytes, int nMsgs)
00232 {
00233 LBCommData* item_ptr;
00234
00235
00236 if (obj_running) {
00237 const LDObjHandle &runObj = RunningObj();
00238
00239 LBCommData item(runObj,destOM.id,destids, ndests);
00240 item_ptr = commTable->HashInsertUnique(item);
00241 item_ptr->addMessage(bytes, nMsgs);
00242 }
00243 }
00244
00245 void LBDB::ClearLoads(void)
00246 {
00247 int i;
00248 for(i=0; i < objCount; i++) {
00249 LBObj *obj = objs[i];
00250 if (obj)
00251 {
00252 if (obj->data.wallTime>.0) {
00253 obj->lastWallTime = obj->data.wallTime;
00254 #if CMK_LB_CPUTIMER
00255 obj->lastCpuTime = obj->data.cpuTime;
00256 #endif
00257 }
00258 obj->data.wallTime = 0.;
00259 #if CMK_LB_CPUTIMER
00260 obj->data.cpuTime = 0.;
00261 #endif
00262 }
00263 }
00264 delete commTable;
00265 commTable = new LBCommTable;
00266 machineUtil.Clear();
00267 obj_walltime = 0;
00268 #if CMK_LB_CPUTIMER
00269 obj_cputime = 0;
00270 #endif
00271 }
00272
00273 int LBDB::ObjDataCount()
00274 {
00275 int nitems=0;
00276 int i;
00277 if (_lb_args.migObjOnly()) {
00278 for(i=0; i < objCount; i++)
00279 if (objs[i] && (objs[i])->data.migratable)
00280 nitems++;
00281 }
00282 else {
00283 for(i=0; i < objCount; i++)
00284 if (objs[i])
00285 nitems++;
00286 }
00287 return nitems;
00288 }
00289
00290 void LBDB::GetObjData(LDObjData *dp)
00291 {
00292 if (_lb_args.migObjOnly()) {
00293 for(int i = 0; i < objs.length(); i++) {
00294 LBObj* obj = objs[i];
00295 if ( obj && obj->data.migratable)
00296 *dp++ = obj->ObjData();
00297 }
00298 }
00299 else {
00300 for(int i = 0; i < objs.length(); i++) {
00301 LBObj* obj = objs[i];
00302 if (obj)
00303 *dp++ = obj->ObjData();
00304 }
00305 }
00306 }
00307
00308 int LBDB::Migrate(LDObjHandle h, int dest)
00309 {
00310
00311
00312
00313 if (h.handle > objCount)
00314 CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
00315 else if (!objs[h.handle]) {
00316 CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objCount);
00317 return 0;
00318 }
00319
00320 if ((h.handle < objCount) && objs[h.handle]) {
00321 LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
00322 om->Migrate(h, dest);
00323 }
00324 return 1;
00325 }
00326
00327 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
00328 {
00329
00330
00331
00332
00333
00334 for(int i=migrateCBList.length()-1; i>=0; i--) {
00335 MigrateCB* cb = (MigrateCB*)migrateCBList[i];
00336 if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
00337 }
00338
00339 }
00340
00341 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
00342 {
00343
00344 MigrateCB* callbk = new MigrateCB;
00345
00346 callbk->fn = fn;
00347 callbk->data = data;
00348 callbk->on = 1;
00349 migrateCBList.insertAtEnd(callbk);
00350 return migrateCBList.size()-1;
00351 }
00352
00353 void LBDB::RemoveNotifyMigrated(int handle)
00354 {
00355 MigrateCB* callbk = migrateCBList[handle];
00356 migrateCBList[handle] = NULL;
00357 delete callbk;
00358 }
00359
00360 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
00361 {
00362
00363 StartLBCB* callbk = new StartLBCB;
00364
00365 callbk->fn = fn;
00366 callbk->data = data;
00367 callbk->on = 1;
00368 startLBFnList.push_back(callbk);
00369 startLBFn_count++;
00370 return startLBFnList.size()-1;
00371 }
00372
00373 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
00374 {
00375 for (int i=0; i<startLBFnList.length(); i++) {
00376 StartLBCB* callbk = startLBFnList[i];
00377 if (callbk && callbk->fn == fn) {
00378 delete callbk;
00379 startLBFnList[i] = 0;
00380 startLBFn_count --;
00381 break;
00382 }
00383 }
00384 }
00385
00386 void LBDB::StartLB()
00387 {
00388 if (startLBFn_count == 0) {
00389 CmiAbort("StartLB is not supported in this LB");
00390 }
00391 for (int i=0; i<startLBFnList.length(); i++) {
00392 StartLBCB *startLBFn = startLBFnList[i];
00393 if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
00394 }
00395 }
00396
00397 int LBDB::AddMigrationDoneFn(LDMigrationDoneFn fn, void* data) {
00398
00399 MigrationDoneCB* callbk = new MigrationDoneCB;
00400
00401 callbk->fn = fn;
00402 callbk->data = data;
00403 migrationDoneCBList.push_back(callbk);
00404 return migrationDoneCBList.size()-1;
00405 }
00406
00407 void LBDB::RemoveMigrationDoneFn(LDMigrationDoneFn fn) {
00408 for (int i=0; i<migrationDoneCBList.length(); i++) {
00409 MigrationDoneCB* callbk = migrationDoneCBList[i];
00410 if (callbk && callbk->fn == fn) {
00411 delete callbk;
00412 migrationDoneCBList[i] = 0;
00413 break;
00414 }
00415 }
00416 }
00417
00418 void LBDB::MigrationDone() {
00419 for (int i=0; i<migrationDoneCBList.length(); i++) {
00420 MigrationDoneCB *callbk = migrationDoneCBList[i];
00421 if (callbk) callbk->fn(callbk->data);
00422 }
00423 }
00424
00425 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
00426 {
00427 if (predictCBFn==NULL) predictCBFn = new PredictCB;
00428 predictCBFn->on = on;
00429 predictCBFn->onWin = onWin;
00430 predictCBFn->off = off;
00431 predictCBFn->change = change;
00432 predictCBFn->data = data;
00433 }
00434
00435 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
00436 {
00437 LBRealType total_walltime;
00438 LBRealType total_cputime;
00439 TotalTime(&total_walltime, &total_cputime);
00440
00441 LBRealType idletime;
00442 IdleTime(&idletime);
00443
00444 *bg_walltime = total_walltime - idletime - obj_walltime;
00445 if (*bg_walltime < 0) *bg_walltime = 0.;
00446 #if CMK_LB_CPUTIMER
00447 *bg_cputime = total_cputime - obj_cputime;
00448 #else
00449 *bg_cputime = *bg_walltime;
00450 #endif
00451 }
00452
00453 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
00454 LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
00455 {
00456 TotalTime(total_walltime,total_cputime);
00457
00458 IdleTime(idletime);
00459
00460 *bg_walltime = *total_walltime - *idletime - obj_walltime;
00461 if (*bg_walltime < 0) *bg_walltime = 0.;
00462 #if CMK_LB_CPUTIMER
00463 *bg_cputime = *total_cputime - obj_cputime;
00464 #else
00465 *bg_cputime = *bg_walltime;
00466 #endif
00467
00468 }
00469
00470 void LBDB::DumpDatabase()
00471 {
00472 #ifdef DEBUG
00473 CmiPrintf("Database contains %d object managers\n",omCount);
00474 CmiPrintf("Database contains %d objects\n",objCount);
00475 #endif
00476 }
00477
00478 int LBDB::useMem() {
00479 int size = sizeof(LBDB);
00480 size += oms.length() * sizeof(LBOM);
00481 size += ObjDataCount() * sizeof(LBObj);
00482 size += migrateCBList.length() * sizeof(MigrateCBList);
00483 size += startLBFnList.length() * sizeof(StartLBCB);
00484 size += commTable->useMem();
00485 return size;
00486 }
00487
00488 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
00489 {
00490 client* new_client = new client;
00491 new_client->fn = fn;
00492 new_client->data = data;
00493 new_client->refcount = cur_refcount;
00494
00495 LDBarrierClient ret_val;
00496 #if CMK_BIGSIM_CHARM
00497 ret_val.serial = first_free_client_slot;
00498 clients.insert(ret_val.serial, new_client);
00499
00500
00501 int nextfree=-1;
00502 for(int i=first_free_client_slot+1; i<clients.size(); i++)
00503 if(clients[i]==NULL) { nextfree = i; break; }
00504 if(nextfree==-1) nextfree = clients.size();
00505 first_free_client_slot = nextfree;
00506
00507 if(_BgOutOfCoreFlag!=2){
00508
00509
00510 client_count++;
00511 }
00512
00513 #else
00514
00515 ret_val.serial = clients.size();
00516 clients.insertAtEnd(new_client);
00517
00518 client_count++;
00519 #endif
00520
00521 return ret_val;
00522 }
00523
00524 void LocalBarrier::RemoveClient(LDBarrierClient c)
00525 {
00526 const int cnum = c.serial;
00527 #if CMK_BIGSIM_CHARM
00528 if (cnum < clients.size() && clients[cnum] != 0) {
00529 delete (clients[cnum]);
00530 clients[cnum] = 0;
00531
00532 if(cnum<=first_free_client_slot) first_free_client_slot = cnum;
00533
00534 if(_BgOutOfCoreFlag!=1){
00535
00536
00537 client_count--;
00538 }
00539 }
00540 #else
00541
00542 if (cnum < clients.size() && clients[cnum] != 0) {
00543 delete (clients[cnum]);
00544 clients[cnum] = 0;
00545 client_count--;
00546 }
00547 #endif
00548 }
00549
00550 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
00551 {
00552 receiver* new_receiver = new receiver;
00553 new_receiver->fn = fn;
00554 new_receiver->data = data;
00555 new_receiver->on = 1;
00556
00557 LDBarrierReceiver ret_val;
00558
00559 ret_val.serial = receivers.size();
00560 receivers.insertAtEnd(new_receiver);
00561
00562
00563 return ret_val;
00564 }
00565
00566 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
00567 {
00568 const int cnum = c.serial;
00569
00570 if (cnum < receivers.size() && receivers[cnum] != 0) {
00571 delete (receivers[cnum]);
00572 receivers[cnum] = 0;
00573 }
00574 }
00575
00576 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
00577 {
00578 const int cnum = c.serial;
00579
00580 if (cnum < receivers.size() && receivers[cnum] != 0) {
00581 receivers[cnum]->on = 1;
00582 }
00583 }
00584
00585 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
00586 {
00587 const int cnum = c.serial;
00588
00589 if (cnum < receivers.size() && receivers[cnum] != 0) {
00590 receivers[cnum]->on = 0;
00591 }
00592 }
00593
00594 void LocalBarrier::AtBarrier(LDBarrierClient h)
00595 {
00596 (clients[h.serial])->refcount++;
00597 at_count++;
00598 CheckBarrier();
00599 }
00600
00601 void LocalBarrier::CheckBarrier()
00602 {
00603 if (!on) return;
00604
00605
00606
00607 if (client_count == 0) {
00608 cur_refcount++;
00609 CallReceivers();
00610 }
00611 if (at_count >= client_count) {
00612 CmiBool at_barrier = CmiFalse;
00613
00614
00615 for(int i=0; i < clients.size(); i++)
00616 if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
00617 at_barrier = CmiTrue;
00618
00619 if (at_barrier) {
00620 at_count -= client_count;
00621 cur_refcount++;
00622 CallReceivers();
00623 }
00624 }
00625 }
00626
00627 void LocalBarrier::CallReceivers(void)
00628 {
00629 CmiBool called_receiver=CmiFalse;
00630
00631
00632
00633 for (int i=receivers.size()-1; i>=0; i--) {
00634 receiver *recv = receivers[i];
00635 if (recv != 0 && recv->on) {
00636 recv->fn(recv->data);
00637 called_receiver = CmiTrue;
00638 }
00639 }
00640
00641 if (!called_receiver)
00642 ResumeClients();
00643
00644 }
00645
00646 void LocalBarrier::ResumeClients(void)
00647 {
00648
00649 for(int i=0; i < clients.size(); i++)
00650 if (clients[i] != 0) {
00651 ((client*)clients[i])->fn(((client*)clients[i])->data);
00652 }
00653 }
00654
00655 #endif
00656