00001
00006 #include "BaseLB.h"
00007 #include "DistBaseLB.h"
00008 #include "LBDBManager.h"
00009 #include "DistBaseLB.def.h"
00010
// Debug tracing hook. It currently expands to nothing, so the argument is
// never evaluated. Call sites pass a parenthesized printf-style argument list,
// e.g. DEBUGF(("[%d] msg\n", pe)); to enable tracing, make the expansion
// `CmiPrintf x;`.
#define DEBUGF(x) /* CmiPrintf x; */
00012
00013 void DistBaseLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier) {
00014 DistBaseLB *me = (DistBaseLB*)(data);
00015 me->Migrated(h, waitBarrier);
00016 }
00017
00018 void DistBaseLB::staticAtSync(void* data) {
00019 DistBaseLB *me = (DistBaseLB*)(data);
00020 me->ProcessAtSync();
00021 }
00022
00023 void DistBaseLB::staticStartLB(void* data) {
00024 DistBaseLB *me = (DistBaseLB*)(data);
00025 me->barrierDone();
00026 }
00027
00028 void DistBaseLB::barrierDone() {
00029 thisProxy.AtSync();
00030 }
00031
00032 void DistBaseLB::ProcessAtSync() {
00033
00034 CkCallback cb (CkReductionTarget(DistBaseLB, barrierDone), 0, thisProxy);
00035 contribute(cb);
00036 }
00037
00038 DistBaseLB::DistBaseLB(const CkLBOptions &opt): CBase_DistBaseLB(opt) {
00039 #if CMK_LBDB_ON
00040 lbname = (char *)"DistBaseLB";
00041 thisProxy = CProxy_DistBaseLB(thisgroup);
00042 receiver = theLbdb->AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
00043 (void*)(this));
00044 notifier = theLbdb->getLBDB()->NotifyMigrated((LDMigratedFn)(staticMigrated),
00045 (void*)(this));
00046 startLbFnHdl = theLbdb->getLBDB()->AddStartLBFn((LDStartLBFn)(staticStartLB),
00047 (void*)(this));
00048 theLbdb->AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)this);
00049
00050 migrates_completed = 0;
00051 migrates_expected = 0;
00052 lb_started = false;
00053 mig_msgs = NULL;
00054
00055 myStats.pe_speed = theLbdb->ProcessorSpeed();
00056 myStats.from_pe = CkMyPe();
00057 myStats.n_objs = 0;
00058 myStats.objData = NULL;
00059 myStats.n_comm = 0;
00060 myStats.commData = NULL;
00061
00062 if (_lb_args.statsOn()) {
00063 theLbdb->CollectStatsOn();
00064 }
00065 #endif
00066 }
00067
00068 DistBaseLB::~DistBaseLB() {
00069 #if CMK_LBDB_ON
00070 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
00071 if (theLbdb) {
00072 theLbdb->getLBDB()->RemoveNotifyMigrated(notifier);
00073 theLbdb-> RemoveStartLBFn((LDStartLBFn)(staticStartLB));
00074 }
00075 if (mig_msgs) {
00076 delete [] mig_msgs;
00077 }
00078 #endif
00079 }
00080
00081
00082 void DistBaseLB::AtSync() {
00083 #if CMK_LBDB_ON
00084 if (lb_started) {
00085 return;
00086 }
00087 lb_started = true;
00088
00089 start_lb_time = 0;
00090
00091 if (CkNumPes() == 1) {
00092 MigrationDone(0);
00093 return;
00094 }
00095
00096 start_lb_time = CkWallTimer();
00097 if (CkMyPe() == 0) {
00098 if (_lb_args.debug()) {
00099 CkPrintf("[%s] Load balancing step %d starting at %f\n",
00100 lbName(), step(),start_lb_time);
00101 }
00102 }
00103
00104 AssembleStats();
00105 thisProxy[CkMyPe()].LoadBalance();
00106 #endif
00107 }
00108
00109
00110
00111 void DistBaseLB::AssembleStats() {
00112 #if CMK_LBDB_ON
00113 #if CMK_LB_CPUTIMER
00114 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
00115 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
00116 #else
00117 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_walltime);
00118 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_walltime);
00119 #endif
00120 theLbdb->IdleTime(&myStats.idletime);
00121
00122 myStats.move = true;
00123
00124 myStats.n_objs = theLbdb->GetObjDataSz();
00125 if (myStats.objData) delete [] myStats.objData;
00126 myStats.objData = new LDObjData[myStats.n_objs];
00127 theLbdb->GetObjData(myStats.objData);
00128
00129 myStats.n_comm = theLbdb->GetCommDataSz();
00130 if (myStats.commData) delete [] myStats.commData;
00131 myStats.commData = new LDCommData[myStats.n_comm];
00132 theLbdb->GetCommData(myStats.commData);
00133
00134 myStats.obj_walltime = 0;
00135 #if CMK_LB_CPUTIMER
00136 myStats.obj_cputime = 0;
00137 #endif
00138 for(int i=0; i < myStats.n_objs; i++) {
00139 myStats.obj_walltime += myStats.objData[i].wallTime;
00140 #if CMK_LB_CPUTIMER
00141 myStats.obj_cputime += myStats.objData[i].cpuTime;
00142 #endif
00143 }
00144 #endif
00145 }
00146
00147 void DistBaseLB::LoadBalance() {
00148 #if CMK_LBDB_ON
00149 strat_start_time = CkWallTimer();
00150
00151 if (CkMyPe() == 0 && _lb_args.debug()) {
00152 CkPrintf("DistLB> %s: step %d starting at %f Memory: %f MB\n",
00153 lbname, step(), strat_start_time, CmiMemoryUsage()/(1024.0*1024.0));
00154 }
00155
00156 migrates_expected = 0;
00157 migrates_completed = 0;
00158 Strategy(&myStats);
00159 #endif
00160 }
00161
00162 void DistBaseLB::Migrated(LDObjHandle h, int waitBarrier) {
00163 migrates_completed++;
00164 if (migrates_completed == migrates_expected && lb_started) {
00165 MigrationDone(1);
00166 }
00167 }
00168
00169
00170
00171
00172
00173 void DistBaseLB::ProcessMigrationDecision(LBMigrateMsg *migrateMsg) {
00174 #if CMK_LBDB_ON
00175 strat_end_time = CkWallTimer() - strat_start_time;
00176 const int me = CkMyPe();
00177
00178
00179 for(int i=0; i < migrateMsg->n_moves; i++) {
00180 MigrateInfo& move = migrateMsg->moves[i];
00181 if (move.from_pe == me && move.to_pe != me) {
00182 theLbdb->Migrate(move.obj,move.to_pe);
00183 } else if (move.from_pe != me) {
00184 CkPrintf("[%d] Error, strategy wants to move from %d to %d\n",
00185 me,move.from_pe,move.to_pe);
00186 CkAbort("Trying to move objs not on my PE\n");
00187 }
00188 }
00189
00190 if (CkMyPe() == 0) {
00191 double strat_end_time = CkWallTimer();
00192 if (_lb_args.debug())
00193 CkPrintf("%s> Strategy took %fs memory usage: %f MB.\n", lbName(),
00194 strat_end_time - strat_start_time, CmiMemoryUsage()/(1024.0*1024.0));
00195 }
00196
00197
00198 if (migrates_expected == migrates_completed && lb_started) {
00199 MigrationDone(1);
00200 }
00201 #endif
00202 }
00203
00204 void DistBaseLB::MigrationDone(int balancing) {
00205 #if CMK_LBDB_ON
00206
00207 lb_started = false;
00208
00209 theLbdb->incStep();
00210 theLbdb->ClearLoads();
00211
00212
00213 if (balancing && _lb_args.syncResume()) {
00214 contribute(CkCallback(CkReductionTarget(DistBaseLB, ResumeClients),
00215 thisProxy));
00216 }
00217 else
00218 thisProxy [CkMyPe()].ResumeClients(balancing);
00219 #endif
00220 }
00221
00222 void DistBaseLB::ResumeClients() {
00223 ResumeClients(1);
00224 }
00225
00226 void DistBaseLB::ResumeClients(int balancing) {
00227 #if CMK_LBDB_ON
00228 DEBUGF(("[%d] ResumeClients. \n", CkMyPe()));
00229
00230 if (CkMyPe() == 0 && balancing) {
00231 double end_lb_time = CkWallTimer();
00232 if (_lb_args.debug())
00233 CkPrintf("%s> step %d finished at %f duration %f memory usage: %f\n",
00234 lbName(), step() - 1, end_lb_time, end_lb_time - strat_start_time,
00235 CmiMemoryUsage() / (1024.0 * 1024.0));
00236 }
00237
00238 theLbdb->ResumeClients();
00239 #endif
00240 }
00241
00242 void DistBaseLB::Strategy(const LDStats* const stats) {
00243 int sizes=0;
00244 LBMigrateMsg* msg = new(sizes, CkNumPes(), CkNumPes(), 0) LBMigrateMsg;
00245 msg->n_moves = 0;
00246 }