00001
00005
00010 #include "elements.h"
00011 #include "ckheap.h"
00012 #include "RefinerComm.h"
00013
00014
00015 void RefinerComm::create(int count, BaseLB::LDStats* _stats, int* procs)
00016 {
00017 int i;
00018 stats = _stats;
00019 Refiner::create(count, _stats, procs);
00020
00021 for (i=0; i<stats->n_comm; i++)
00022 {
00023 LDCommData &comm = stats->commData[i];
00024 if (!comm.from_proc()) {
00025
00026 int computeIdx = stats->getSendHash(comm);
00027 CmiAssert(computeIdx >= 0 && computeIdx < numComputes);
00028 computes[computeIdx].sendmessages.push_back(i);
00029 }
00030
00031
00032
00033 if (comm.receiver.get_type() == LD_OBJ_MSG) {
00034 int computeIdx = stats->getRecvHash(comm);
00035 CmiAssert(computeIdx >= 0 && computeIdx < numComputes);
00036 computes[computeIdx].recvmessages.push_back(i);
00037 }
00038 }
00039 }
00040
00041 void RefinerComm::computeAverage()
00042 {
00043 int i;
00044 double total = 0.;
00045 for (i=0; i<numComputes; i++) total += computes[i].load;
00046
00047 for (i=0; i<P; i++) {
00048 if (processors[i].available == true) {
00049 total += processors[i].backgroundLoad;
00050 total += commTable->overheadOnPe(i);
00051 }
00052 }
00053
00054 averageLoad = total/numAvail;
00055 }
00056
00057
00058 void RefinerComm::processorCommCost()
00059 {
00060 int i;
00061
00062 for (int cidx=0; cidx < stats->n_comm; cidx++) {
00063 LDCommData& cdata = stats->commData[cidx];
00064 int senderPE = -1, receiverPE = -1;
00065 if (cdata.from_proc())
00066 senderPE = cdata.src_proc;
00067 else {
00068 int idx = stats->getSendHash(cdata);
00069 CmiAssert(idx != -1);
00070 senderPE = computes[idx].oldProcessor;
00071 }
00072 CmiAssert(senderPE != -1);
00073 int ctype = cdata.receiver.get_type();
00074 if (ctype==LD_PROC_MSG || ctype==LD_OBJ_MSG) {
00075 if (ctype==LD_PROC_MSG)
00076 receiverPE = cdata.receiver.proc();
00077 else {
00078 int idx = stats->getRecvHash(cdata);
00079 CmiAssert(idx != -1);
00080 receiverPE = computes[idx].oldProcessor;
00081 }
00082 CmiAssert(receiverPE != -1);
00083 if(senderPE != receiverPE)
00084 {
00085 commTable->increase(true, senderPE, cdata.messages, cdata.bytes);
00086 commTable->increase(false, receiverPE, cdata.messages, cdata.bytes);
00087 }
00088 }
00089 else if (ctype == LD_OBJLIST_MSG) {
00090 int nobjs;
00091 const LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
00092 for (i=0; i<nobjs; i++) {
00093 int idx = stats->getHash(objs[i]);
00094 if(idx == -1)
00095 {
00096 if (_lb_args.migObjOnly()) continue;
00097 else CkAbort("Error in search\n");
00098 }
00099 receiverPE = computes[idx].oldProcessor;
00100 CmiAssert(receiverPE != -1);
00101 if(senderPE != receiverPE)
00102 {
00103 commTable->increase(true, senderPE, cdata.messages, cdata.bytes);
00104 commTable->increase(false, receiverPE, cdata.messages, cdata.bytes);
00105 }
00106 }
00107 }
00108 }
00109
00110 for (i=0; i<P; i++)
00111 {
00112 processorInfo *p = &processors[i];
00113 p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(i);
00114 }
00115 }
00116
00117 void RefinerComm::assign(computeInfo *c, int processor)
00118 {
00119 assign(c, &(processors[processor]));
00120 }
00121
00122 void RefinerComm::assign(computeInfo *c, processorInfo *p)
00123 {
00124 c->processor = p->Id;
00125 p->computeSet->insert((InfoRecord *) c);
00126 p->computeLoad += c->load;
00127
00128
00129 Messages m;
00130 objCommCost(c->Id, p->Id, m);
00131 commTable->increase(true, p->Id, m.msgSent, m.byteSent);
00132 commTable->increase(false, p->Id, m.msgRecv, m.byteRecv);
00133
00134
00135
00136 commAffinity(c->Id, p->Id, m);
00137 commTable->increase(false, p->Id, -m.msgSent, -m.byteSent);
00138 commTable->increase(true, p->Id, -m.msgRecv, -m.byteRecv);
00139
00140
00141
00142 p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(p->Id);
00143 }
00144
00145 void RefinerComm::deAssign(computeInfo *c, processorInfo *p)
00146 {
00147
00148 p->computeSet->remove(c);
00149 p->computeLoad -= c->load;
00150
00151 Messages m;
00152 objCommCost(c->Id, p->Id, m);
00153 commTable->increase(true, p->Id, -m.msgSent, -m.byteSent);
00154 commTable->increase(false, p->Id, -m.msgRecv, -m.byteRecv);
00155
00156 commAffinity(c->Id, p->Id, m);
00157 commTable->increase(true, p->Id, m.msgSent, m.byteSent);
00158 commTable->increase(false, p->Id, m.msgRecv, m.byteRecv);
00159
00160 p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(p->Id);
00161 }
00162
00163
00164
00165
00166 void RefinerComm::commAffinity(int c, int pe, Messages &m)
00167 {
00168 int i;
00169 m.clear();
00170 computeInfo &obj = computes[c];
00171
00172 int nSendMsgs = obj.sendmessages.length();
00173 for (i=0; i<nSendMsgs; i++) {
00174 LDCommData &cdata = stats->commData[obj.sendmessages[i]];
00175 bool sendtope = false;
00176 if (cdata.receiver.get_type() == LD_OBJ_MSG) {
00177 int recvCompute = stats->getRecvHash(cdata);
00178 int recvProc = computes[recvCompute].processor;
00179 if (recvProc != -1 && recvProc == pe) sendtope = true;
00180 }
00181 else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {
00182 int nobjs;
00183 const LDObjKey *recvs = cdata.receiver.get_destObjs(nobjs);
00184 for (int j=0; j<nobjs; j++) {
00185 int recvCompute = stats->getHash(recvs[j]);
00186 int recvProc = computes[recvCompute].processor;
00187 if (recvProc != -1 && recvProc == pe) { sendtope = true; continue; }
00188 }
00189 }
00190 if (sendtope) {
00191 m.byteSent += cdata.bytes;
00192 m.msgSent += cdata.messages;
00193 }
00194 }
00195
00196 int nRecvMsgs = obj.recvmessages.length();
00197 for (i=0; i<nRecvMsgs; i++) {
00198 LDCommData &cdata = stats->commData[obj.recvmessages[i]];
00199 int sendProc;
00200 if (cdata.from_proc()) {
00201 sendProc = cdata.src_proc;
00202 }
00203 else {
00204 int sendCompute = stats->getSendHash(cdata);
00205 sendProc = computes[sendCompute].processor;
00206 }
00207 if (sendProc != -1 && sendProc == pe) {
00208 m.byteRecv += cdata.bytes;
00209 m.msgRecv += cdata.messages;
00210 }
00211 }
00212 }
00213
00214
00215 void RefinerComm::objCommCost(int c, int pe, Messages &m)
00216 {
00217 int i;
00218 m.clear();
00219 computeInfo &obj = computes[c];
00220
00221
00222
00223 int nSendMsgs = obj.sendmessages.length();
00224 for (i=0; i<nSendMsgs; i++) {
00225 LDCommData &cdata = stats->commData[obj.sendmessages[i]];
00226 bool diffPe = false;
00227 if (cdata.receiver.get_type() == LD_PROC_MSG) {
00228 CmiAssert(0);
00229 }
00230 if (cdata.receiver.get_type() == LD_OBJ_MSG) {
00231 int recvCompute = stats->getRecvHash(cdata);
00232 int recvProc = computes[recvCompute].processor;
00233 if (recvProc!= -1 && recvProc != pe) diffPe = true;
00234 }
00235 else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {
00236 int nobjs;
00237 const LDObjKey *recvs = cdata.receiver.get_destObjs(nobjs);
00238 for (int j=0; j<nobjs; j++) {
00239 int recvCompute = stats->getHash(recvs[j]);
00240 int recvProc = computes[recvCompute].processor;
00241 if (recvProc!= -1 && recvProc != pe) { diffPe = true; }
00242 }
00243 }
00244 if (diffPe) {
00245 m.byteSent += cdata.bytes;
00246 m.msgSent += cdata.messages;
00247 }
00248 }
00249
00250
00251
00252 int nRecvMsgs = obj.recvmessages.length();
00253 for (i=0; i<nRecvMsgs; i++) {
00254 LDCommData &cdata = stats->commData[obj.recvmessages[i]];
00255 bool diffPe = false;
00256 if (cdata.from_proc()) {
00257 if (cdata.src_proc != pe) diffPe = true;
00258 }
00259 else {
00260 int sendCompute = stats->getSendHash(cdata);
00261 int sendProc = computes[sendCompute].processor;
00262 if (sendProc != -1 && sendProc != pe) diffPe = true;
00263 }
00264 if (diffPe) {
00265 m.byteRecv += cdata.bytes;
00266 m.msgRecv += cdata.messages;
00267 }
00268 }
00269 }
00270
00271 int RefinerComm::refine()
00272 {
00273 int i;
00274 int finish = 1;
00275
00276 maxHeap *heavyProcessors = new maxHeap(P);
00277 Set *lightProcessors = new Set();
00278 for (i=0; i<P; i++) {
00279 if (isHeavy(&processors[i])) {
00280
00281
00282 heavyProcessors->insert((InfoRecord *) &(processors[i]));
00283 } else if (isLight(&processors[i])) {
00284
00285
00286 lightProcessors->insert((InfoRecord *) &(processors[i]));
00287 }
00288 }
00289 int done = 0;
00290
00291 while (!done) {
00292 double bestSize;
00293 computeInfo *bestCompute;
00294 processorInfo *bestP;
00295
00296 processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
00297 if (!donor) break;
00298
00299
00300 Iterator nextProcessor;
00301 processorInfo *p = (processorInfo *)
00302 lightProcessors->iterator((Iterator *) &nextProcessor);
00303 bestSize = 0;
00304 bestP = NULL;
00305 bestCompute = NULL;
00306
00307 while (p) {
00308 Iterator nextCompute;
00309 nextCompute.id = 0;
00310 computeInfo *c = (computeInfo *)
00311 donor->computeSet->iterator((Iterator *)&nextCompute);
00312
00313 while (c) {
00314 if (!c->migratable) {
00315 nextCompute.id++;
00316 c = (computeInfo *)
00317 donor->computeSet->next((Iterator *)&nextCompute);
00318 continue;
00319 }
00320
00321
00322 Messages m;
00323 objCommCost(c->Id, donor->Id, m);
00324 double commcost = m.cost();
00325 commAffinity(c->Id, p->Id, m);
00326 double commgain = m.cost();;
00327
00328
00329 if ( c->load + p->load + commcost - commgain < overLoad*averageLoad) {
00330
00331 if(c->load + commcost - commgain > bestSize) {
00332 bestSize = c->load + commcost - commgain;
00333 bestCompute = c;
00334 bestP = p;
00335 }
00336 }
00337 nextCompute.id++;
00338 c = (computeInfo *)
00339 donor->computeSet->next((Iterator *)&nextCompute);
00340 }
00341 p = (processorInfo *)
00342 lightProcessors->next((Iterator *) &nextProcessor);
00343 }
00344
00345 if (bestCompute) {
00346 if (_lb_args.debug())
00347 CkPrintf("Assign: [%d] with load: %f from %d to %d \n",
00348 bestCompute->Id, bestCompute->load,
00349 donor->Id, bestP->Id);
00350 deAssign(bestCompute, donor);
00351 assign(bestCompute, bestP);
00352
00353
00354 if (_lb_args.debug()) printLoad();
00355
00356
00357 computeAverage();
00358 delete heavyProcessors;
00359 delete lightProcessors;
00360 heavyProcessors = new maxHeap(P);
00361 lightProcessors = new Set();
00362 for (i=0; i<P; i++) {
00363 if (isHeavy(&processors[i])) {
00364
00365
00366 heavyProcessors->insert((InfoRecord *) &(processors[i]));
00367 } else if (isLight(&processors[i])) {
00368 lightProcessors->insert((InfoRecord *) &(processors[i]));
00369 }
00370 }
00371 if (_lb_args.debug()) CmiPrintf("averageLoad after assignment: %f\n", averageLoad);
00372 } else {
00373 finish = 0;
00374 break;
00375 }
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 }
00388
00389 delete heavyProcessors;
00390 delete lightProcessors;
00391
00392 return finish;
00393 }
00394
00395 void RefinerComm::Refine(int count, BaseLB::LDStats* stats,
00396 int* cur_p, int* new_p)
00397 {
00398
00399
00400 P = count;
00401 numComputes = stats->n_objs;
00402 computes = new computeInfo[numComputes];
00403 processors = new processorInfo[count];
00404 commTable = new CommTable(P);
00405
00406
00407 stats->makeCommHash();
00408
00409 create(count, stats, cur_p);
00410
00411 int i;
00412 for (i=0; i<numComputes; i++)
00413 assign((computeInfo *) &(computes[i]),
00414 (processorInfo *) &(processors[computes[i].oldProcessor]));
00415
00416 commTable->clear();
00417
00418
00419 processorCommCost();
00420
00421 removeComputes();
00422 if (_lb_args.debug()) printLoad();
00423
00424 computeAverage();
00425 if (_lb_args.debug()) CmiPrintf("averageLoad: %f\n", averageLoad);
00426
00427 multirefine();
00428
00429 for (int pe=0; pe < P; pe++) {
00430 Iterator nextCompute;
00431 nextCompute.id = 0;
00432 computeInfo *c = (computeInfo *)
00433 processors[pe].computeSet->iterator((Iterator *)&nextCompute);
00434 while(c) {
00435 new_p[c->Id] = c->processor;
00436
00437
00438 nextCompute.id++;
00439 c = (computeInfo *) processors[pe].computeSet->
00440 next((Iterator *)&nextCompute);
00441 }
00442 }
00443
00444 delete [] computes;
00445 delete [] processors;
00446 delete commTable;
00447 }
00448
00449 RefinerComm::CommTable::CommTable(int P)
00450 {
00451 count = P;
00452 msgSentCount = new int[P];
00453 msgRecvCount = new int[P];
00454 byteSentCount = new int[P];
00455 byteRecvCount = new int[P];
00456 clear();
00457 }
00458
00459 RefinerComm::CommTable::~CommTable()
00460 {
00461 delete [] msgSentCount;
00462 delete [] msgRecvCount;
00463 delete [] byteSentCount;
00464 delete [] byteRecvCount;
00465 }
00466
00467 void RefinerComm::CommTable::clear()
00468 {
00469 for(int i = 0; i < count; i++)
00470 msgSentCount[i] = msgRecvCount[i] = byteSentCount[i] = byteRecvCount[i] = 0;
00471 }
00472
00473 void RefinerComm::CommTable::increase(bool issend, int pe, int msgs, int bytes)
00474 {
00475 if (issend) {
00476 msgSentCount[pe] += msgs;
00477 byteSentCount[pe] += bytes;
00478 }
00479 else {
00480 msgRecvCount[pe] += msgs;
00481 byteRecvCount[pe] += bytes;
00482 }
00483 }
00484
00485 double RefinerComm::CommTable::overheadOnPe(int pe)
00486 {
00487 return msgRecvCount[pe] * PER_MESSAGE_RECV_OVERHEAD +
00488 msgSentCount[pe] * _lb_args.alpha() +
00489 byteRecvCount[pe] * PER_BYTE_RECV_OVERHEAD +
00490 byteSentCount[pe] * _lb_args.beta();
00491 }
00492