00001
00024
00025 #include "CommAwareRefineLB.h"
00026 #include "ckgraph.h"
00027 #include <algorithm>
00028 #include <map>
00029
00030 #include <vector>
00031 using std::vector;
00032
00033 #include <time.h>
00034
00035 #define THRESHOLD 0.02
00036 #define SWAP_MULTIPLIER 5
00037
00038 inline void eraseObjFromParrObjs(vector<int> & parr_objs, int remove_objid);
00039 inline void printMapping(vector<Vertex> &vertices);
00040 inline void removeFromArray(int pe_id, vector<int> &array);
00041 inline int popFromProcHeap(vector<int> & parr_above_avg, ProcArray *parr);
00042 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, vector<int> *parr_objs, ObjGraph *ogr, ProcArray* parr);
00043 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
00044 vector<int> &parr_above_avg, vector<int> &parr_below_avg,
00045 vector<bool> &proc_load_info, ProcArray *parr);
00046 inline void getPossiblePes(vector<int>& possible_pes, int randomly_obj_id,
00047 ObjGraph *ogr, ProcArray* parr);
00048
00049 double upper_threshold;
00050 double lower_threshold;
00051
00052 extern int quietModeRequested;
00053
00054 CreateLBFunc_Def(CommAwareRefineLB, "always assign the heaviest obj onto lightest loaded processor.")
00055
00056 CommAwareRefineLB::CommAwareRefineLB(const CkLBOptions &opt): CBase_CommAwareRefineLB(opt)
00057 {
00058 lbname = "CommAwareRefineLB";
00059 if (CkMyPe()==0 && !quietModeRequested)
00060 CkPrintf("CharmLB> CommAwareRefineLB created.\n");
00061 }
00062
00063 bool CommAwareRefineLB::QueryBalanceNow(int _step)
00064 {
00065
00066 return true;
00067 }
00068
00069 class CommAwareRefineLB::ProcLoadGreater {
00070 public:
00071 ProcLoadGreater(ProcArray *parr) : parr(parr) {
00072 }
00073 bool operator()(int lhs, int rhs) {
00074 return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
00075 }
00076
00077 private:
00078 ProcArray *parr;
00079 };
00080
00081 class CommAwareRefineLB::ObjLoadGreater {
00082 public:
00083 bool operator()(Vertex v1, Vertex v2) {
00084 return (v1.getVertexLoad() > v2.getVertexLoad());
00085 }
00086 };
00087
00088 class CommAwareRefineLB::PeCommInfo {
00089 public:
00090 PeCommInfo() : num_msg(0), num_bytes(0) {
00091 }
00092
00093 PeCommInfo(int pe_id) : pe_id(pe_id), num_msg(0) , num_bytes(0) {
00094 }
00095 int pe_id;
00096 int num_msg;
00097 int num_bytes;
00098
00099 };
00100
00101
00102
00103 class CommAwareRefineLB::ObjPeCommInfo {
00104 public:
00105 ObjPeCommInfo() {
00106 }
00107
00108 int obj_id;
00109 vector<CommAwareRefineLB::PeCommInfo> pcomm;
00110 };
00111
00112 class CommAwareRefineLB::ProcCommGreater {
00113 public:
00114 bool operator()(CommAwareRefineLB::PeCommInfo p1, CommAwareRefineLB::PeCommInfo p2) {
00115
00116 return (p1.num_bytes > p2.num_bytes);
00117 }
00118 };
00119
00120 void PrintProcLoad(ProcArray *parr) {
00121 int vert;
00122 double pe_load;
00123 for (vert = 0; vert < parr->procs.size(); vert++) {
00124 pe_load = parr->procs[vert].getTotalLoad();
00125 if (pe_load > upper_threshold) {
00126 CkPrintf("Above load : %d load : %E overhead : %E\n",
00127 parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
00128 parr->procs[vert].overhead());
00129 } else if (pe_load < lower_threshold) {
00130 CkPrintf("Below load : %d load : %E overhead : %E\n",
00131 parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
00132 parr->procs[vert].overhead());
00133 } else {
00134 CkPrintf("Within avg load : %d load : %E overhead : %E\n",
00135 parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
00136 parr->procs[vert].overhead());
00137 }
00138 }
00139 }
00140
00141 void PrintProcObj(ProcArray *parr, vector<int>* parr_objs) {
00142 int i, j;
00143 CkPrintf("---------------------\n");
00144 for (i = 0; i < parr->procs.size(); i++) {
00145 CkPrintf("[%d] contains ", i);
00146 for (j = 0; j < parr_objs[i].size(); j++) {
00147 CkPrintf(" %d, ", parr_objs[i][j]);
00148 }
00149 CkPrintf("\n");
00150 }
00151 CkPrintf("---------------------\n");
00152 }
00153
00154
00155 void CommAwareRefineLB::work(LDStats* stats) {
00157 ProcArray *parr = new ProcArray(stats);
00158 ObjGraph *ogr = new ObjGraph(stats);
00159 double avgload = parr->getAverageLoad();
00160
00161
00162 vector<bool> proc_load_info(parr->procs.size(), false);
00163
00164
00165
00166 vector<int>* parr_objs = new vector<int>[parr->procs.size()];
00167
00168 upper_threshold = avgload + (avgload * THRESHOLD);
00169
00170 lower_threshold = avgload;
00171
00172 int less_loaded_counter = 0;
00173
00174 srand(time(NULL));
00177 if (_lb_args.debug()>1)
00178 CkPrintf("[%d] In CommAwareRefineLB strategy\n",CkMyPe());
00179
00180 CkPrintf("-- Average load %E\n", avgload);
00181
00182 int vert, i, j;
00183 int curr_pe;
00184
00185
00186 for(vert = 0; vert < ogr->vertices.size(); vert++) {
00187 curr_pe = ogr->vertices[vert].getCurrentPe();
00188 parr_objs[curr_pe].push_back(vert);
00189 ogr->vertices[vert].setNewPe(curr_pe);
00190 }
00191
00192 vector<int> parr_above_avg;
00193 vector<int> parr_below_avg;
00194
00195 double pe_load;
00196
00197
00198
00199
00200 for (vert = 0; vert < parr->procs.size(); vert++) {
00201 pe_load = parr->procs[vert].getTotalLoad();
00202 if (pe_load > upper_threshold) {
00203
00204 parr_above_avg.push_back(vert);
00205 } else if (pe_load < lower_threshold) {
00206 parr_below_avg.push_back(parr->procs[vert].getProcId());
00207 proc_load_info[parr->procs[vert].getProcId()] = true;
00208 less_loaded_counter++;
00209 }
00210 }
00211
00212 std::make_heap(parr_above_avg.begin(), parr_above_avg.end(),
00213 CommAwareRefineLB::ProcLoadGreater(parr));
00214
00215 int random;
00216 int randomly_obj_id;
00217 bool obj_allocated;
00218 int num_tries;
00219
00220 int total_swaps = ogr->vertices.size() * SWAP_MULTIPLIER;
00221 int possible_pe;
00222 double obj_load;
00223
00224
00225 while (parr_above_avg.size() != 0 && total_swaps > 0 && parr_below_avg.size() != 0) {
00226
00227
00228 obj_allocated = false;
00229 num_tries = 0;
00230
00231
00232 int p_index = popFromProcHeap(parr_above_avg, parr);
00233 ProcInfo& p = parr->procs[p_index];
00234
00235 while (!obj_allocated && num_tries < parr_objs[p.getProcId()].size()) {
00236
00237
00238
00239 if (parr_objs[p.getProcId()].size() == 0) {
00240
00241 obj_allocated = true;
00242 break;
00243 }
00244
00245 int randd = rand();
00246 random = randd % parr_objs[p.getProcId()].size();
00247 randomly_obj_id = parr_objs[p.getProcId()][random];
00248 obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
00249
00250
00251
00252 vector<int> possible_pes;
00253 getPossiblePes(possible_pes, randomly_obj_id, ogr, parr);
00254 for (i = 0; i < possible_pes.size(); i++) {
00255
00256
00257
00258 possible_pe = possible_pes[i];
00259
00260 if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
00261
00262
00263
00264
00265
00266 handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
00267 obj_allocated = true;
00268 total_swaps--;
00269 updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
00270 parr_above_avg, parr_below_avg, proc_load_info, parr);
00271
00272 break;
00273 }
00274 }
00275
00276
00277
00278 if (!obj_allocated) {
00279
00280 for (int x = 0; x < parr_below_avg.size(); x++) {
00281 int random_pe = parr_below_avg[x];
00282 if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
00283 obj_allocated = true;
00284 total_swaps--;
00285 handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
00286 updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
00287 parr_above_avg, parr_below_avg, proc_load_info, parr);
00288 break;
00289 }
00290 num_tries++;
00291 }
00292 }
00293 }
00294
00295 if (!obj_allocated) {
00296
00297
00298
00299
00300 }
00301 }
00302
00303
00304
00306 ogr->convertDecisions(stats);
00307 delete parr;
00308 delete ogr;
00309 delete[] parr_objs;
00310 }
00311
/**
 * Erases the first entry equal to remove_objid from parr_objs.
 * Later duplicates are kept; the vector is left untouched when the id is absent.
 */
inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid) {
  const std::vector<int>::iterator hit =
      std::find(parr_objs.begin(), parr_objs.end(), remove_objid);
  if (hit != parr_objs.end()) {
    parr_objs.erase(hit);
  }
}
00320
00321 inline void printMapping(vector<Vertex> &vertices) {
00322 for (int i = 0; i < vertices.size(); i++) {
00323 CkPrintf("%d: old map : %d new map : %d\n", i, vertices[i].getCurrentPe(),
00324 vertices[i].getNewPe());
00325 }
00326 }
00327
/**
 * Removes every occurrence of pe_id from array, keeping the relative order of
 * the remaining entries. A no-op when pe_id is not present.
 *
 * Uses erase-remove rather than erasing inside an index loop: erasing at
 * position i shifts the next element into i, and advancing i afterwards would
 * skip it, leaving adjacent duplicates behind.
 */
inline void removeFromArray(int pe_id, std::vector<int> &array) {
  array.erase(std::remove(array.begin(), array.end(), pe_id), array.end());
}
00335
00336 inline int popFromProcHeap(vector<int> & parr_above_avg, ProcArray *parr) {
00337 int p_index = parr_above_avg.front();
00338 std::pop_heap(parr_above_avg.begin(), parr_above_avg.end(),
00339 CommAwareRefineLB::ProcLoadGreater(parr));
00340 parr_above_avg.pop_back();
00341 return p_index;
00342 }
00343
00344
00345 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, vector<int>* parr_objs, ObjGraph *ogr, ProcArray* parr) {
00346 ogr->vertices[randomly_obj_id].setNewPe(possible_pe);
00347 parr_objs[possible_pe].push_back(randomly_obj_id);
00348 ProcInfo &possible_pe_procinfo = parr->procs[possible_pe];
00349
00350 p.totalLoad() -= ogr->vertices[randomly_obj_id].getVertexLoad();
00351 possible_pe_procinfo.totalLoad() += ogr->vertices[randomly_obj_id].getVertexLoad();
00352 eraseObjFromParrObjs(parr_objs[p.getProcId()], randomly_obj_id);
00353
00354
00355 }
00356
00357 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
00358 vector<int>& parr_above_avg, vector<int>& parr_below_avg,
00359 vector<bool> &proc_load_info, ProcArray *parr) {
00360
00361 ProcInfo& p = parr->procs[p_index];
00362 ProcInfo& possible_pe_procinfo = parr->procs[possible_pe];
00363
00364
00365
00366 if (p.getTotalLoad() > upper_threshold) {
00367 parr_above_avg.push_back(p_index);
00368 std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
00369 CommAwareRefineLB::ProcLoadGreater(parr));
00370
00371 } else if (p.getTotalLoad() < lower_threshold) {
00372 parr_below_avg.push_back(p_index);
00373 proc_load_info[p_index] = true;
00374
00375 }
00376
00377
00378
00379 if (possible_pe_procinfo.getTotalLoad() > upper_threshold) {
00380
00381 parr_above_avg.push_back(possible_pe);
00382 std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
00383 CommAwareRefineLB::ProcLoadGreater(parr));
00384 removeFromArray(possible_pe, parr_below_avg);
00385 proc_load_info[possible_pe] = false;
00386
00387 } else if (possible_pe_procinfo.getTotalLoad() < lower_threshold) {
00388 } else {
00389 removeFromArray(possible_pe, parr_below_avg);
00390 proc_load_info[possible_pe] = false;
00391
00392 }
00393
00394 }
00395
00396 inline void getPossiblePes(vector<int>& possible_pes, int vert,
00397 ObjGraph *ogr, ProcArray* parr) {
00398 std::map<int, int> tmp_map_pid_index;
00399 int counter = 0;
00400 int index;
00401 int i, j, nbrid;
00402 CommAwareRefineLB::ObjPeCommInfo objpcomm;
00403
00404
00405
00406
00407 for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
00408 nbrid = ogr->vertices[vert].sendToList[i].getNeighborId();
00409 j = ogr->vertices[nbrid].getNewPe();
00410
00411 if (tmp_map_pid_index.count(j) == 0) {
00412 tmp_map_pid_index[j] = counter;
00413 CommAwareRefineLB::PeCommInfo pecomminf(j);
00414
00415 objpcomm.pcomm.push_back(pecomminf);
00416 counter++;
00417 }
00418 index = tmp_map_pid_index[j];
00419
00420 objpcomm.pcomm[index].num_msg +=
00421 ogr->vertices[vert].sendToList[i].getNumMsgs();
00422 objpcomm.pcomm[index].num_bytes +=
00423 ogr->vertices[vert].sendToList[i].getNumBytes();
00424 }
00425
00426 for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
00427 nbrid = ogr->vertices[vert].recvFromList[i].getNeighborId();
00428 j = ogr->vertices[nbrid].getNewPe();
00429
00430 if (tmp_map_pid_index.count(j) == 0) {
00431 tmp_map_pid_index[j] = counter;
00432 CommAwareRefineLB::PeCommInfo pecomminf(j);
00433
00434 objpcomm.pcomm.push_back(pecomminf);
00435 counter++;
00436 }
00437 index = tmp_map_pid_index[j];
00438
00439 objpcomm.pcomm[index].num_msg +=
00440 ogr->vertices[vert].sendToList[i].getNumMsgs();
00441 objpcomm.pcomm[index].num_bytes +=
00442 ogr->vertices[vert].sendToList[i].getNumBytes();
00443 }
00444
00445
00446 std::sort(objpcomm.pcomm.begin(), objpcomm.pcomm.end(),
00447 CommAwareRefineLB::ProcCommGreater());
00448
00449 int pe_id;
00450 int node_id;
00451 int node_size;
00452 int node_first;
00453
00454
00455 for (i = 0; i < objpcomm.pcomm.size(); i++) {
00456 pe_id = objpcomm.pcomm[i].pe_id;
00457 node_id = CkNodeOf(pe_id);
00458 node_size = CkNodeSize(node_id);
00459 node_first = CkNodeFirst(node_id);
00460
00461
00462 for (j = 0; j < node_size; j++) {
00463 possible_pes.push_back(node_first + j);
00464
00465 }
00466 }
00467 }
00468
00469
00470 #include "CommAwareRefineLB.def.h"
00471