00001
00005
00006 #ifndef CENTRALLB_H
00007 #define CENTRALLB_H
00008
00009 #include "BaseLB.h"
00010 #include "CentralLB.decl.h"
00011
00012 #include <vector>
00013 #include "pup_stl.h"
00014 #include "manager.h"
00015 extern CkGroupID loadbalancer;
00016
00017 void CreateCentralLB();
00018
00019 class CLBStatsMsg;
00020 class LBSimulation;
00021
00023 typedef LBMigrateMsg CLBMigrateMsg;
00024
00025 class LBInfo
00026 {
00027 public:
00028 LBRealType *peLoads;
00029 LBRealType *objLoads;
00030 LBRealType *comLoads;
00031 LBRealType *bgLoads;
00032 int numPes;
00033 int msgCount;
00034 CmiUInt8 msgBytes;
00035 LBRealType minObjLoad, maxObjLoad;
00036 LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL),
00037 bgLoads(NULL), numPes(0), msgCount(0),
00038 msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
00039 LBInfo(LBRealType *pl, int count): peLoads(pl), objLoads(NULL),
00040 comLoads(NULL), bgLoads(NULL), numPes(count), msgCount(0),
00041 msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
00042 LBInfo(int count);
00043 ~LBInfo();
00044 void getInfo(BaseLB::LDStats* stats, int count, int considerComm);
00045 void clear();
00046 void print();
00047 void getSummary(LBRealType &maxLoad, LBRealType &maxCpuLoad, LBRealType &totalLoad);
00048 };
00049
/**
 * Plain record describing one node's position in a spanning tree.
 *
 * The constructor and both calc* members are implemented outside this header;
 * only their signatures are visible here.
 */
class SpanningTree
{
public:
  int arity;        // tree arity
  int parent;       // parent value (see calcParent)
  int numChildren;  // child count (see calcNumChildren)

  SpanningTree();

  void calcParent(int n);
  void calcNumChildren(int n);
};
00063
00064 class CentralLB : public CBase_CentralLB
00065 {
00066 private:
00067 CLBStatsMsg *statsMsg;
00068 int count_msgs;
00069 void initLB(const CkLBOptions &);
00070 public:
00071 CkMarshalledCLBStatsMessage bufMsg;
00072 SpanningTree st;
00073 CentralLB(const CkLBOptions & opt) : CBase_CentralLB(opt), concurrent(false) { initLB(opt);
00074 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00075 lbDecisionCount= resumeCount=0;
00076 #endif
00077 #if CMK_SHRINK_EXPAND
00078 manager_init();
00079 #endif
00080 }
00081 CentralLB(CkMigrateMessage *m) : CBase_CentralLB(m), concurrent(false) {
00082 #if CMK_SHRINK_EXPAND
00083 manager_init();
00084 #endif
00085 }
00086 #if defined(TEMP_LDB)
00087 float getTemp(int);
00088 FILE* logFD;
00089 int physicalCoresPerNode;
00090 int logicalCoresPerNode,numSockets;
00091 int logicalCoresPerChip;
00092 #endif
00093
00094 virtual ~CentralLB();
00095
00096 void pup(PUP::er &p);
00097
00098 void turnOn();
00099 void turnOff();
00100
00101 void SetPESpeed(int);
00102 int GetPESpeed();
00103 inline void setConcurrent(bool c) { concurrent = c; }
00104
00105 static void staticAtSync(void*);
00106 void AtSync(void);
00107 void ProcessAtSync(void);
00108
00109 void SendStats();
00110 void ReceiveCounts(int *counts, int n);
00111 void ReceiveStats(CkMarshalledCLBStatsMessage &&msg);
00112 void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &&msg);
00113 void ReceiveStatsFromRoot(CkMarshalledCLBStatsMessage &&msg);
00114
00115 void depositData(CLBStatsMsg *m);
00116 void LoadBalance(void);
00117 void t_LoadBalance(void);
00118 void ApplyDecision(void);
00119 void ResumeClients(int);
00120
00121 void ResumeClients();
00122 void InitiateScatter(LBMigrateMsg *msg);
00123 void ScatterMigrationResults(LBScatterMsg *);
00124 void ReceiveMigration(LBScatterMsg *);
00125 void ReceiveMigration(LBMigrateMsg *);
00126 void ProcessMigrationDecision();
00127 void ProcessReceiveMigration();
00128 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00129 void ReceiveDummyMigration(int _step);
00130 #endif
00131 void MissMigrate(int waitForBarrier);
00132
00133
00134 void CheckForRealloc ();
00135 void ResumeFromReallocCheckpoint();
00136 void MigrationDoneImpl (int );
00137 void WillIbekilled(std::vector<char> avail, int);
00138 void StartCleanup();
00139
00140
00141 static void staticPredictorOn(void* data, void* model);
00142 static void staticPredictorOnWin(void* data, void* model, int wind);
00143 static void staticPredictorOff(void* data);
00144 static void staticChangePredictor(void* data, void* model);
00145
00146
00147 inline void StartLB() { thisProxy.ProcessAtSync(); }
00148 static void staticStartLB(void* data);
00149
00150
00151 static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
00152 void Migrated(int waitBarrier=1);
00153
00154 void MigrationDone(int balancing);
00155 void CheckMigrationComplete();
00156
00157
00158 void FuturePredictor(LDStats* stats);
00159
00160 struct FutureModel {
00161 int n_stats;
00162 int cur_stats;
00163 int start_stats;
00164 LDStats *collection;
00165 int n_objs;
00166 LBPredictorFunction *predictor;
00167 double **parameters;
00168 bool *model_valid;
00169
00170 FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
00171 n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
00172
00173 FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
00174 parameters(NULL) {
00175 collection = new LDStats[n];
00176
00177 predictor = new DefaultFunction();
00178 }
00179
00180 FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
00181 collection = new LDStats[n];
00182
00183 predictor = myfunc;
00184 }
00185
00186 ~FutureModel() {
00187 delete[] collection;
00188 for (int i=0;i<n_objs;++i) delete[] parameters[i];
00189 delete[] parameters;
00190 delete predictor;
00191 }
00192
00193 void changePredictor(LBPredictorFunction *new_predictor) {
00194 delete predictor;
00195 int i;
00196
00197 predictor = new_predictor;
00198 for (i=0;i<n_objs;++i) delete[] parameters[i];
00199 for (i=0;i<n_objs;++i) {
00200 parameters[i] = new double[new_predictor->num_params];
00201 model_valid[i] = false;
00202 }
00203 }
00204 };
00205
00206
00207
00208 void predictorOn(LBPredictorFunction *pred) {
00209 predictorOn(pred, _lb_predict_window);
00210 }
00211 void predictorOn(LBPredictorFunction *pred, int window_size) {
00212 if (predicted_model) {
00213 PredictorPrintf("Predictor already allocated");
00214 } else {
00215 _lb_predict_window = window_size;
00216 if (pred) predicted_model = new FutureModel(window_size, pred);
00217 else predicted_model = new FutureModel(window_size);
00218 _lb_predict = true;
00219 }
00220 PredictorPrintf("Predictor turned on, window size %d\n",window_size);
00221 }
00222
00223
00224 void predictorOff() {
00225 if (predicted_model) delete predicted_model;
00226 predicted_model = 0;
00227 _lb_predict = false;
00228 PredictorPrintf("Predictor turned off\n");
00229 }
00230
00231
00232
00233 void changePredictor(LBPredictorFunction *new_predictor) {
00234 if (predicted_model) {
00235 predicted_model->changePredictor(new_predictor);
00236 PredictorPrintf("Predictor model changed\n");
00237 }
00238 }
00239
00240
00241 LBMigrateMsg* callStrategy(LDStats* stats,int count){
00242 return Strategy(stats);
00243 };
00244
00245 int cur_ld_balancer;
00246
00247 void readStatsMsgs(const char* filename);
00248 void writeStatsMsgs(const char* filename);
00249
00250 void removeCommDataOfDeletedObjs(LDStats* stats);
00251 void preprocess(LDStats* stats);
00252 virtual LBMigrateMsg* Strategy(LDStats* stats);
00253 virtual void work(LDStats* stats);
00254 virtual void changeFreq(int n);
00255 virtual LBMigrateMsg * createMigrateMsg(LDStats* stats);
00256 virtual LBMigrateMsg * extractMigrateMsg(LBMigrateMsg *m, int p);
00257
00258
00259 virtual LBMigrateMsg* Strategy(LDStats* stats, int nprocs) {
00260 return Strategy(stats);
00261 }
00262
00263 protected:
00264 virtual bool QueryBalanceNow(int) { return true; };
00265 virtual bool QueryDumpData() { return false; };
00266 virtual void LoadbalanceDone(int balancing) {}
00267
00268 void simulationRead();
00269 void simulationWrite();
00270 void findSimResults(LDStats* stats, int count,
00271 LBMigrateMsg* msg, LBSimulation* simResults);
00272 void removeNonMigratable(LDStats* statsDataList, int count);
00273 void loadbalance_with_thread() { use_thread = true; }
00274
00275 bool concurrent;
00276 private:
00277 int myspeed;
00278 int stats_msg_count;
00279 CLBStatsMsg **statsMsgsList;
00280 LDStats *statsData;
00281 int migrates_completed;
00282 int migrates_expected;
00283 int future_migrates_completed;
00284 int future_migrates_expected;
00285 int lbdone;
00286 double start_lb_time;
00287 double strat_start_time;
00288 LBMigrateMsg *storedMigrateMsg;
00289 LBScatterMsg *storedScatterMsg;
00290 bool reduction_started;
00291 bool use_thread;
00292
00293 FutureModel *predicted_model;
00294
00295 void BuildStatsMsg();
00296 void buildStats();
00297 void printStrategyStats(LBMigrateMsg *msg);
00298
00299 public:
00300 int useMem();
00301 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00302 int savedBalancing;
00303 void endMigrationDone(int balancing);
00304 int lbDecisionCount ,resumeCount;
00305 #endif
00306 };
00307
// Resume hooks that exist only in builds with one of the fault-tolerance
// flags defined.  Both take the load balancer as an opaque pointer and are
// implemented outside this header.
#if defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
void resumeCentralLbAfterChkpt(void *lb);
void resumeAfterRestoreParallelRecovery(void *_lb);
#endif
00312
00313
00314
00315
00316 class CLBStatsMsg {
00317 public:
00318 #if defined(TEMP_LDB)
00319 float pe_temp;
00320 #endif
00321
00322 int from_pe;
00323 int pe_speed;
00324 LBRealType total_walltime;
00325 LBRealType idletime;
00326 LBRealType bg_walltime;
00327 #if CMK_LB_CPUTIMER
00328 LBRealType total_cputime;
00329 LBRealType bg_cputime;
00330 #endif
00331 int n_objs;
00332 LDObjData *objData;
00333 int n_comm;
00334 LDCommData *commData;
00335
00336 char * avail_vector;
00337 int next_lb;
00338 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00339 int step;
00340 #endif
00341
00342 public:
00343 CLBStatsMsg(int osz, int csz);
00344 CLBStatsMsg(): from_pe(0), pe_speed(0), total_walltime(0.0), idletime(0.0),
00345 bg_walltime(0.0), n_objs(0), objData(NULL), n_comm(0),
00346 #if defined(TEMP_LDB)
00347 pe_temp(1.0),
00348 #endif
00349
00350 #if CMK_LB_CPUTIMER
00351 total_cputime(0.0), bg_cputime(0.0),
00352 #endif
00353 commData(NULL), avail_vector(NULL), next_lb(0) {}
00354 ~CLBStatsMsg();
00355 void pup(PUP::er &p);
00356 };
00357
00358
00359
00360 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
00361
00362 #endif
00363