00001
00005
00006 #ifndef CENTRALLB_H
00007 #define CENTRALLB_H
00008
00009 #include "BaseLB.h"
00010 #include "CentralLB.decl.h"
00011
00012 extern CkGroupID loadbalancer;
00013
00014 void CreateCentralLB();
00015
00016 class CLBStatsMsg;
00017 class LBSimulation;
00018
00020 typedef LBMigrateMsg CLBMigrateMsg;
00021
00022 class LBInfo
00023 {
00024 public:
00025 LBRealType *peLoads;
00026 LBRealType *objLoads;
00027 LBRealType *comLoads;
00028 LBRealType *bgLoads;
00029 int numPes;
00030 int msgCount;
00031 CmiUInt8 msgBytes;
00032 LBRealType minObjLoad, maxObjLoad;
00033 LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL),
00034 bgLoads(NULL), numPes(0), msgCount(0),
00035 msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
00036 LBInfo(LBRealType *pl, int count): peLoads(pl), objLoads(NULL),
00037 comLoads(NULL), bgLoads(NULL), numPes(count), msgCount(0),
00038 msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
00039 LBInfo(int count);
00040 ~LBInfo();
00041 void getInfo(BaseLB::LDStats* stats, int count, int considerComm);
00042 void clear();
00043 void print();
00044 void getSummary(LBRealType &maxLoad, LBRealType &maxCpuLoad, LBRealType &totalLoad);
00045 };
00046
/**
 * Three integers describing a position in a tree (arity, parent, child
 * count). The constructor and both calc* routines are implemented outside
 * this header.
 */
class SpanningTree
{
public:
  int arity;        // presumably the branching factor -- TODO confirm
  int parent;
  int numChildren;

  SpanningTree();

  void calcParent(int n);
  void calcNumChildren(int n);
};
00060
00061 class CentralLB : public BaseLB
00062 {
00063 private:
00064 CLBStatsMsg *statsMsg;
00065 int count_msgs;
00066 void initLB(const CkLBOptions &);
00067 public:
00068 CkMarshalledCLBStatsMessage bufMsg;
00069 SpanningTree st;
00070 CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt);
00071 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00072 lbDecisionCount= resumeCount=0;
00073 #endif
00074 }
00075 CentralLB(CkMigrateMessage *m):BaseLB(m) {}
00076 virtual ~CentralLB();
00077
00078 void pup(PUP::er &p);
00079
00080 void turnOn();
00081 void turnOff();
00082
00083 static void staticAtSync(void*);
00084 void AtSync(void);
00085 void ProcessAtSync(void);
00086
00087
00088 void SendStats();
00089 void ReceiveCounts(CkReductionMsg *);
00090 void ReceiveStats(CkMarshalledCLBStatsMessage &msg);
00091 void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg);
00092
00093 void depositData(CLBStatsMsg *m);
00094 void LoadBalance(void);
00095 void ResumeClients(int);
00096
00097 void ResumeClients(CkReductionMsg *);
00098 void ReceiveMigration(LBMigrateMsg *);
00099 void ProcessReceiveMigration(CkReductionMsg *);
00100 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00101 void ReceiveDummyMigration(int _step);
00102 #endif
00103 void MissMigrate(int waitForBarrier);
00104
00105
00106 static void staticPredictorOn(void* data, void* model);
00107 static void staticPredictorOnWin(void* data, void* model, int wind);
00108 static void staticPredictorOff(void* data);
00109 static void staticChangePredictor(void* data, void* model);
00110
00111
00112 inline void StartLB() { thisProxy.ProcessAtSync(); }
00113 static void staticStartLB(void* data);
00114
00115
00116 static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
00117 void Migrated(LDObjHandle h, int waitBarrier=1);
00118
00119 void MigrationDone(int balancing);
00120 void CheckMigrationComplete();
00121
00122
00123 void FuturePredictor(LDStats* stats);
00124
00125 struct FutureModel {
00126 int n_stats;
00127 int cur_stats;
00128 int start_stats;
00129 LDStats *collection;
00130 int n_objs;
00131 LBPredictorFunction *predictor;
00132 double **parameters;
00133 bool *model_valid;
00134
00135 FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
00136 n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
00137
00138 FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
00139 parameters(NULL) {
00140 collection = new LDStats[n];
00141
00142 predictor = new DefaultFunction();
00143 }
00144
00145 FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
00146 collection = new LDStats[n];
00147
00148 predictor = myfunc;
00149 }
00150
00151 ~FutureModel() {
00152 delete[] collection;
00153 for (int i=0;i<n_objs;++i) delete[] parameters[i];
00154 delete[] parameters;
00155 delete predictor;
00156 }
00157
00158 void changePredictor(LBPredictorFunction *new_predictor) {
00159 delete predictor;
00160 int i;
00161
00162 predictor = new_predictor;
00163 for (i=0;i<n_objs;++i) delete[] parameters[i];
00164 for (i=0;i<n_objs;++i) {
00165 parameters[i] = new double[new_predictor->num_params];
00166 model_valid[i] = false;
00167 }
00168 }
00169 };
00170
00171
00172
00173 void predictorOn(LBPredictorFunction *pred) {
00174 predictorOn(pred, _lb_predict_window);
00175 }
00176 void predictorOn(LBPredictorFunction *pred, int window_size) {
00177 if (predicted_model) PredictorPrintf("Predictor already allocated");
00178 else {
00179 _lb_predict_window = window_size;
00180 if (pred) predicted_model = new FutureModel(window_size, pred);
00181 else predicted_model = new FutureModel(window_size);
00182 _lb_predict = CmiTrue;
00183 }
00184 PredictorPrintf("Predictor turned on, window size %d\n",window_size);
00185 }
00186
00187
00188 void predictorOff() {
00189 if (predicted_model) delete predicted_model;
00190 predicted_model = 0;
00191 _lb_predict = CmiFalse;
00192 PredictorPrintf("Predictor turned off\n");
00193 }
00194
00195
00196
00197 void changePredictor(LBPredictorFunction *new_predictor) {
00198 if (predicted_model) {
00199 predicted_model->changePredictor(new_predictor);
00200 PredictorPrintf("Predictor model changed\n");
00201 }
00202 }
00203
00204
00205 LBMigrateMsg* callStrategy(LDStats* stats,int count){
00206 return Strategy(stats);
00207 };
00208
00209 int cur_ld_balancer;
00210
00211 void readStatsMsgs(const char* filename);
00212 void writeStatsMsgs(const char* filename);
00213
00214 void preprocess(LDStats* stats);
00215 virtual LBMigrateMsg* Strategy(LDStats* stats);
00216 virtual void work(LDStats* stats);
00217 virtual LBMigrateMsg * createMigrateMsg(LDStats* stats);
00218 virtual LBMigrateMsg * extractMigrateMsg(LBMigrateMsg *m, int p);
00219
00220
00221 virtual LBMigrateMsg* Strategy(LDStats* stats, int nprocs) {
00222 return Strategy(stats);
00223 }
00224
00225 protected:
00226 virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };
00227 virtual CmiBool QueryDumpData() { return CmiFalse; };
00228 virtual void LoadbalanceDone(int balancing) {}
00229
00230 void simulationRead();
00231 void simulationWrite();
00232 void findSimResults(LDStats* stats, int count,
00233 LBMigrateMsg* msg, LBSimulation* simResults);
00234 void removeNonMigratable(LDStats* statsDataList, int count);
00235
00236 private:
00237 CProxy_CentralLB thisProxy;
00238 int myspeed;
00239 int stats_msg_count;
00240 CLBStatsMsg **statsMsgsList;
00241 LDStats *statsData;
00242 int migrates_completed;
00243 int migrates_expected;
00244 int future_migrates_completed;
00245 int future_migrates_expected;
00246 int lbdone;
00247 double start_lb_time;
00248 LBMigrateMsg *storedMigrateMsg;
00249 int reduction_started;
00250
00251 FutureModel *predicted_model;
00252
00253 void BuildStatsMsg();
00254 void buildStats();
00255
00256 public:
00257 int useMem();
00258 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00259 int savedBalancing;
00260 void endMigrationDone(int balancing);
00261 int lbDecisionCount ,resumeCount;
00262 #endif
00263 };
00264
#if defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
// Only declared when one of the fault-tolerance macros is defined.
// Takes an untyped pointer; presumably a CentralLB instance -- TODO confirm.
void resumeCentralLbAfterChkpt(void *lb);
#endif
00268
00269
00270
00271
00272 class CLBStatsMsg {
00273 public:
00274 int from_pe;
00275 int pe_speed;
00276 LBRealType total_walltime;
00277 LBRealType idletime;
00278 LBRealType bg_walltime;
00279 #if CMK_LB_CPUTIMER
00280 LBRealType total_cputime;
00281 LBRealType bg_cputime;
00282 #endif
00283 int n_objs;
00284 LDObjData *objData;
00285 int n_comm;
00286 LDCommData *commData;
00287
00288 char * avail_vector;
00289 int next_lb;
00290 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00291 int step;
00292 #endif
00293
00294 public:
00295 CLBStatsMsg(int osz, int csz);
00296 CLBStatsMsg(): from_pe(0), pe_speed(0), total_walltime(0.0), idletime(0.0),
00297 bg_walltime(0.0), n_objs(0), objData(NULL), n_comm(0),
00298 #if CMK_LB_CPUTIMER
00299 total_cputime(0.0), bg_cputime(0.0),
00300 #endif
00301 commData(NULL), avail_vector(NULL), next_lb(0) {}
00302 ~CLBStatsMsg();
00303 void pup(PUP::er &p);
00304 };
00305
00306
00307
00308 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
00309
00310 #endif
00311