00001
00002
00003
00004
00005
00006
00007
00012
00013 #ifndef CENTRALLB_H
00014 #define CENTRALLB_H
00015
00016 #include <math.h>
00017 #include "BaseLB.h"
00018 #include "CentralLB.decl.h"
00019
00020 extern CkGroupID loadbalancer;
00021
00022 void CreateCentralLB();
00023
00024 class CLBStatsMsg;
00025 class LBSimulation;
00026
00028 typedef LBMigrateMsg CLBMigrateMsg;
00029
00030 class LBInfo
00031 {
00032 public:
00033 double *peLoads;
00034 double *objLoads;
00035 double *comLoads;
00036 double *bgLoads;
00037 int numPes;
00038 int msgCount;
00039 CmiUInt8 msgBytes;
00040 double minObjLoad, maxObjLoad;
00041 LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL),
00042 bgLoads(NULL), msgCount(0), msgBytes(0),
00043 numPes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
00044 LBInfo(double *pl, int count): peLoads(pl), objLoads(NULL),
00045 comLoads(NULL), bgLoads(NULL), msgCount(0), msgBytes(0),
00046 numPes(count), minObjLoad(0.0), maxObjLoad(0.0) {}
00047 LBInfo(int count);
00048 ~LBInfo();
00049 void getInfo(BaseLB::LDStats* stats, int count, int considerComm);
00050 void clear();
00051 void print();
00052 void getSummary(double &maxLoad, double &maxCpuLoad, double &totalLoad);
00053 };
00054
/**
 * Bookkeeping for one node's position in a spanning tree: the tree arity,
 * the node's parent and its number of children.
 *
 * All member functions are implemented outside this header.
 */
class SpanningTree
{
public:
  int arity;        // branching factor of the tree
  int parent;       // written by calcParent(), presumably -- body not visible here
  int numChildren;  // written by calcNumChildren(), presumably -- body not visible here

  SpanningTree();

  void calcParent(int n);
  void calcNumChildren(int n);
};
00068
00069 class CentralLB : public BaseLB
00070 {
00071 private:
00072 CLBStatsMsg *statsMsg;
00073 int count_msgs;
00074 void initLB(const CkLBOptions &);
00075 public:
00076 CkMarshalledCLBStatsMessage bufMsg;
00077 SpanningTree st;
00078 CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt); }
00079 CentralLB(CkMigrateMessage *m):BaseLB(m) {}
00080 virtual ~CentralLB();
00081
00082 void pup(PUP::er &p);
00083
00084 void turnOn();
00085 void turnOff();
00086
00087 static void staticAtSync(void*);
00088 void AtSync(void);
00089 void ProcessAtSync(void);
00090
00091
00092 void SendStats();
00093 void ReceiveCounts(CkReductionMsg *);
00094 void ReceiveStats(CkMarshalledCLBStatsMessage &msg);
00095 void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg);
00096
00097 void depositData(CLBStatsMsg *m);
00098 void LoadBalance(void);
00099 void ResumeClients(int);
00100
00101 void ResumeClients(CkReductionMsg *);
00102 void ReceiveMigration(LBMigrateMsg *);
00103 void MissMigrate(int waitForBarrier);
00104
00105
00106 static void staticPredictorOn(void* data, void* model);
00107 static void staticPredictorOnWin(void* data, void* model, int wind);
00108 static void staticPredictorOff(void* data);
00109 static void staticChangePredictor(void* data, void* model);
00110
00111
00112 inline void StartLB() { thisProxy.ProcessAtSync(); }
00113 static void staticStartLB(void* data);
00114
00115
00116 static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
00117 void Migrated(LDObjHandle h, int waitBarrier=1);
00118
00119 void MigrationDone(int balancing);
00120 void CheckMigrationComplete();
00121
00122
00123 void FuturePredictor(LDStats* stats);
00124
00125 struct FutureModel {
00126 int n_stats;
00127 int cur_stats;
00128 int start_stats;
00129 LDStats *collection;
00130 int n_objs;
00131 LBPredictorFunction *predictor;
00132 double **parameters;
00133 bool *model_valid;
00134
00135 FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
00136 n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
00137
00138 FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
00139 parameters(NULL) {
00140 collection = new LDStats[n];
00141
00142 predictor = new DefaultFunction();
00143 }
00144
00145 FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
00146 collection = new LDStats[n];
00147
00148 predictor = myfunc;
00149 }
00150
00151 ~FutureModel() {
00152 delete[] collection;
00153 for (int i=0;i<n_objs;++i) delete[] parameters[i];
00154 delete[] parameters;
00155 delete predictor;
00156 }
00157
00158 void changePredictor(LBPredictorFunction *new_predictor) {
00159 delete predictor;
00160 int i;
00161
00162 predictor = new_predictor;
00163 for (i=0;i<n_objs;++i) delete[] parameters[i];
00164 for (i=0;i<n_objs;++i) {
00165 parameters[i] = new double[new_predictor->num_params];
00166 model_valid = false;
00167 }
00168 }
00169 };
00170
00171
00172
00173 void predictorOn(LBPredictorFunction *pred) {
00174 predictorOn(pred, _lb_predict_window);
00175 }
00176 void predictorOn(LBPredictorFunction *pred, int window_size) {
00177 if (predicted_model) PredictorPrintf("Predictor already allocated");
00178 else {
00179 _lb_predict_window = window_size;
00180 if (pred) predicted_model = new FutureModel(window_size, pred);
00181 else predicted_model = new FutureModel(window_size);
00182 _lb_predict = CmiTrue;
00183 }
00184 PredictorPrintf("Predictor turned on, window size %d\n",window_size);
00185 }
00186
00187
00188 void predictorOff() {
00189 if (predicted_model) delete predicted_model;
00190 predicted_model = 0;
00191 _lb_predict = CmiFalse;
00192 PredictorPrintf("Predictor turned off\n");
00193 }
00194
00195
00196
00197 void changePredictor(LBPredictorFunction *new_predictor) {
00198 if (predicted_model) {
00199 predicted_model->changePredictor(new_predictor);
00200 PredictorPrintf("Predictor model changed\n");
00201 }
00202 }
00203
00204
00205 LBMigrateMsg* callStrategy(LDStats* stats,int count){
00206 return Strategy(stats,count);
00207 };
00208
00209 int cur_ld_balancer;
00210
00211 void readStatsMsgs(const char* filename);
00212 void writeStatsMsgs(const char* filename);
00213
00214 void preprocess(LDStats* stats,int count);
00215 virtual LBMigrateMsg* Strategy(LDStats* stats,int count);
00216 virtual void work(LDStats* stats,int count);
00217 virtual LBMigrateMsg * createMigrateMsg(LDStats* stats,int count);
00218 protected:
00219 virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };
00220 virtual CmiBool QueryDumpData() { return CmiFalse; };
00221 virtual void LoadbalanceDone(int balancing) {}
00222
00223 void simulationRead();
00224 void simulationWrite();
00225 void findSimResults(LDStats* stats, int count,
00226 LBMigrateMsg* msg, LBSimulation* simResults);
00227 void removeNonMigratable(LDStats* statsDataList, int count);
00228
00229 private:
00230 CProxy_CentralLB thisProxy;
00231 int myspeed;
00232 int stats_msg_count;
00233 CLBStatsMsg **statsMsgsList;
00234 LDStats *statsData;
00235 int migrates_completed;
00236 int migrates_expected;
00237 int future_migrates_completed;
00238 int future_migrates_expected;
00239 int lbdone;
00240 double start_lb_time;
00241
00242 FutureModel *predicted_model;
00243
00244 void BuildStatsMsg();
00245 void buildStats();
00246
00247 public:
00248 int useMem();
00249 };
00250
00251
00252
00253
00254 class CLBStatsMsg {
00255 public:
00256 int from_pe;
00257 int pe_speed;
00258 double total_walltime;
00259 double total_cputime;
00260 double idletime;
00261 double bg_walltime;
00262 double bg_cputime;
00263 int n_objs;
00264 LDObjData *objData;
00265 int n_comm;
00266 LDCommData *commData;
00267
00268 char * avail_vector;
00269 int next_lb;
00270 public:
00271 CLBStatsMsg(int osz, int csz);
00272 CLBStatsMsg(): from_pe(0), pe_speed(0),
00273 total_walltime(0.0), total_cputime(0.0),
00274 idletime(0.0), bg_walltime(0.0), bg_cputime(0.0),
00275 n_objs(0), objData(NULL), n_comm(0), commData(NULL),
00276 avail_vector(NULL), next_lb(0) {}
00277 ~CLBStatsMsg();
00278 void pup(PUP::er &p);
00279 };
00280
00281
00282
00283 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
00284
00285 #endif
00286