00001
00005
00010 #include "elements.h"
00011 #include "ckheap.h"
00012 #include "RefinerComm.h"
00013
00014
00015 void RefinerComm::create(int count, BaseLB::LDStats* _stats, int* procs)
00016 {
00017 int i;
00018 stats = _stats;
00019 Refiner::create(count, _stats, procs);
00020
00021 for (i=0; i<stats->n_comm; i++)
00022 {
00023 LDCommData &comm = stats->commData[i];
00024 if (!comm.from_proc()) {
00025
00026 int computeIdx = stats->getSendHash(comm);
00027 CmiAssert(computeIdx >= 0 && computeIdx < numComputes);
00028 computes[computeIdx].sendmessages.push_back(i);
00029 }
00030
00031
00032
00033 if (comm.receiver.get_type() == LD_OBJ_MSG) {
00034 int computeIdx = stats->getRecvHash(comm);
00035 CmiAssert(computeIdx >= 0 && computeIdx < numComputes);
00036 computes[computeIdx].recvmessages.push_back(i);
00037 }
00038 }
00039 }
00040
00041 void RefinerComm::computeAverage()
00042 {
00043 int i;
00044 double total = 0.;
00045 for (i=0; i<numComputes; i++) total += computes[i].load;
00046
00047 for (i=0; i<P; i++) {
00048 if (processors[i].available == CmiTrue) {
00049 total += processors[i].backgroundLoad;
00050 total += commTable->overheadOnPe(i);
00051 }
00052 }
00053
00054 averageLoad = total/numAvail;
00055 }
00056
00057
00058 void RefinerComm::processorCommCost()
00059 {
00060 int i;
00061
00062 for (int cidx=0; cidx < stats->n_comm; cidx++) {
00063 LDCommData& cdata = stats->commData[cidx];
00064 int senderPE = -1, receiverPE = -1;
00065 if (cdata.from_proc())
00066 senderPE = cdata.src_proc;
00067 else {
00068 int idx = stats->getSendHash(cdata);
00069 CmiAssert(idx != -1);
00070 senderPE = computes[idx].oldProcessor;
00071 }
00072 CmiAssert(senderPE != -1);
00073 int ctype = cdata.receiver.get_type();
00074 if (ctype==LD_PROC_MSG || ctype==LD_OBJ_MSG) {
00075 if (ctype==LD_PROC_MSG)
00076 receiverPE = cdata.receiver.proc();
00077 else {
00078 int idx = stats->getRecvHash(cdata);
00079 CmiAssert(idx != -1);
00080 receiverPE = computes[idx].oldProcessor;
00081 }
00082 CmiAssert(receiverPE != -1);
00083 if(senderPE != receiverPE)
00084 {
00085 commTable->increase(true, senderPE, cdata.messages, cdata.bytes);
00086 commTable->increase(false, receiverPE, cdata.messages, cdata.bytes);
00087 }
00088 }
00089 else if (ctype == LD_OBJLIST_MSG) {
00090 int nobjs;
00091 LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
00092 for (i=0; i<nobjs; i++) {
00093 int idx = stats->getHash(objs[i]);
00094 if(idx == -1)
00095 if (_lb_args.migObjOnly()) continue;
00096 else CkAbort("Error in search\n");
00097 receiverPE = computes[idx].oldProcessor;
00098 CmiAssert(receiverPE != -1);
00099 if(senderPE != receiverPE)
00100 {
00101 commTable->increase(true, senderPE, cdata.messages, cdata.bytes);
00102 commTable->increase(false, receiverPE, cdata.messages, cdata.bytes);
00103 }
00104 }
00105 }
00106 }
00107
00108 for (i=0; i<P; i++)
00109 {
00110 processorInfo *p = &processors[i];
00111 p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(i);
00112 }
00113 }
00114
00115 void RefinerComm::assign(computeInfo *c, int processor)
00116 {
00117 assign(c, &(processors[processor]));
00118 }
00119
00120 void RefinerComm::assign(computeInfo *c, processorInfo *p)
00121 {
00122 c->processor = p->Id;
00123 p->computeSet->insert((InfoRecord *) c);
00124 p->computeLoad += c->load;
00125
00126
00127 Messages m;
00128 objCommCost(c->Id, p->Id, m);
00129 commTable->increase(true, p->Id, m.msgSent, m.byteSent);
00130 commTable->increase(false, p->Id, m.msgRecv, m.byteRecv);
00131
00132
00133
00134 commAffinity(c->Id, p->Id, m);
00135 commTable->increase(false, p->Id, -m.msgSent, -m.byteSent);
00136 commTable->increase(true, p->Id, -m.msgRecv, -m.byteRecv);
00137
00138
00139
00140 p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(p->Id);
00141 }
00142
00143 void RefinerComm::deAssign(computeInfo *c, processorInfo *p)
00144 {
00145
00146 p->computeSet->remove(c);
00147 p->computeLoad -= c->load;
00148
00149 Messages m;
00150 objCommCost(c->Id, p->Id, m);
00151 commTable->increase(true, p->Id, -m.msgSent, -m.byteSent);
00152 commTable->increase(false, p->Id, -m.msgRecv, -m.byteRecv);
00153
00154 commAffinity(c->Id, p->Id, m);
00155 commTable->increase(true, p->Id, m.msgSent, m.byteSent);
00156 commTable->increase(false, p->Id, m.msgRecv, m.byteRecv);
00157
00158 p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(p->Id);
00159 }
00160
00161
00162
00163
00164 void RefinerComm::commAffinity(int c, int pe, Messages &m)
00165 {
00166 int i;
00167 m.clear();
00168 computeInfo &obj = computes[c];
00169
00170 int nSendMsgs = obj.sendmessages.length();
00171 for (i=0; i<nSendMsgs; i++) {
00172 LDCommData &cdata = stats->commData[obj.sendmessages[i]];
00173 bool sendtope = false;
00174 if (cdata.receiver.get_type() == LD_OBJ_MSG) {
00175 int recvCompute = stats->getRecvHash(cdata);
00176 int recvProc = computes[recvCompute].processor;
00177 if (recvProc != -1 && recvProc == pe) sendtope = true;
00178 }
00179 else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {
00180 int nobjs;
00181 LDObjKey *recvs = cdata.receiver.get_destObjs(nobjs);
00182 for (int j=0; j<nobjs; j++) {
00183 int recvCompute = stats->getHash(recvs[j]);
00184 int recvProc = computes[recvCompute].processor;
00185 if (recvProc != -1 && recvProc == pe) { sendtope = true; continue; }
00186 }
00187 }
00188 if (sendtope) {
00189 m.byteSent += cdata.bytes;
00190 m.msgSent += cdata.messages;
00191 }
00192 }
00193
00194 int nRecvMsgs = obj.recvmessages.length();
00195 for (i=0; i<nRecvMsgs; i++) {
00196 LDCommData &cdata = stats->commData[obj.recvmessages[i]];
00197 int sendProc;
00198 if (cdata.from_proc()) {
00199 sendProc = cdata.src_proc;
00200 }
00201 else {
00202 int sendCompute = stats->getSendHash(cdata);
00203 sendProc = computes[sendCompute].processor;
00204 }
00205 if (sendProc != -1 && sendProc == pe) {
00206 m.byteRecv += cdata.bytes;
00207 m.msgRecv += cdata.messages;
00208 }
00209 }
00210 }
00211
00212
00213 void RefinerComm::objCommCost(int c, int pe, Messages &m)
00214 {
00215 int i;
00216 m.clear();
00217 computeInfo &obj = computes[c];
00218
00219
00220
00221 int nSendMsgs = obj.sendmessages.length();
00222 for (i=0; i<nSendMsgs; i++) {
00223 LDCommData &cdata = stats->commData[obj.sendmessages[i]];
00224 bool diffPe = false;
00225 if (cdata.receiver.get_type() == LD_PROC_MSG) {
00226 CmiAssert(0);
00227 }
00228 if (cdata.receiver.get_type() == LD_OBJ_MSG) {
00229 int recvCompute = stats->getRecvHash(cdata);
00230 int recvProc = computes[recvCompute].processor;
00231 if (recvProc!= -1 && recvProc != pe) diffPe = true;
00232 }
00233 else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {
00234 int nobjs;
00235 LDObjKey *recvs = cdata.receiver.get_destObjs(nobjs);
00236 for (int j=0; j<nobjs; j++) {
00237 int recvCompute = stats->getHash(recvs[j]);
00238 int recvProc = computes[recvCompute].processor;
00239 if (recvProc!= -1 && recvProc != pe) { diffPe = true; }
00240 }
00241 }
00242 if (diffPe) {
00243 m.byteSent += cdata.bytes;
00244 m.msgSent += cdata.messages;
00245 }
00246 }
00247
00248
00249
00250 int nRecvMsgs = obj.recvmessages.length();
00251 for (i=0; i<nRecvMsgs; i++) {
00252 LDCommData &cdata = stats->commData[obj.recvmessages[i]];
00253 bool diffPe = false;
00254 if (cdata.from_proc()) {
00255 if (cdata.src_proc != pe) diffPe = true;
00256 }
00257 else {
00258 int sendCompute = stats->getSendHash(cdata);
00259 int sendProc = computes[sendCompute].processor;
00260 if (sendProc != -1 && sendProc != pe) diffPe = true;
00261 }
00262 if (diffPe) {
00263 m.byteRecv += cdata.bytes;
00264 m.msgRecv += cdata.messages;
00265 }
00266 }
00267 }
00268
00269 int RefinerComm::refine()
00270 {
00271 int i;
00272 int finish = 1;
00273
00274 maxHeap *heavyProcessors = new maxHeap(P);
00275 Set *lightProcessors = new Set();
00276 for (i=0; i<P; i++) {
00277 if (isHeavy(&processors[i])) {
00278
00279
00280 heavyProcessors->insert((InfoRecord *) &(processors[i]));
00281 } else if (isLight(&processors[i])) {
00282
00283
00284 lightProcessors->insert((InfoRecord *) &(processors[i]));
00285 }
00286 }
00287 int done = 0;
00288
00289 while (!done) {
00290 double bestSize;
00291 computeInfo *bestCompute;
00292 processorInfo *bestP;
00293
00294 processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
00295 if (!donor) break;
00296
00297
00298 Iterator nextProcessor;
00299 processorInfo *p = (processorInfo *)
00300 lightProcessors->iterator((Iterator *) &nextProcessor);
00301 bestSize = 0;
00302 bestP = NULL;
00303 bestCompute = NULL;
00304
00305 while (p) {
00306 Iterator nextCompute;
00307 nextCompute.id = 0;
00308 computeInfo *c = (computeInfo *)
00309 donor->computeSet->iterator((Iterator *)&nextCompute);
00310
00311 while (c) {
00312 if (!c->migratable) {
00313 nextCompute.id++;
00314 c = (computeInfo *)
00315 donor->computeSet->next((Iterator *)&nextCompute);
00316 continue;
00317 }
00318
00319
00320 Messages m;
00321 objCommCost(c->Id, donor->Id, m);
00322 double commcost = m.cost();
00323 commAffinity(c->Id, p->Id, m);
00324 double commgain = m.cost();;
00325
00326
00327 if ( c->load + p->load + commcost - commgain < overLoad*averageLoad) {
00328
00329 if(c->load + commcost - commgain > bestSize) {
00330 bestSize = c->load + commcost - commgain;
00331 bestCompute = c;
00332 bestP = p;
00333 }
00334 }
00335 nextCompute.id++;
00336 c = (computeInfo *)
00337 donor->computeSet->next((Iterator *)&nextCompute);
00338 }
00339 p = (processorInfo *)
00340 lightProcessors->next((Iterator *) &nextProcessor);
00341 }
00342
00343 if (bestCompute) {
00344 if (_lb_args.debug())
00345 CkPrintf("Assign: [%d] with load: %f from %d to %d \n",
00346 bestCompute->Id, bestCompute->load,
00347 donor->Id, bestP->Id);
00348 deAssign(bestCompute, donor);
00349 assign(bestCompute, bestP);
00350
00351
00352 if (_lb_args.debug()) printLoad();
00353
00354
00355 computeAverage();
00356 delete heavyProcessors;
00357 delete lightProcessors;
00358 heavyProcessors = new maxHeap(P);
00359 lightProcessors = new Set();
00360 for (i=0; i<P; i++) {
00361 if (isHeavy(&processors[i])) {
00362
00363
00364 heavyProcessors->insert((InfoRecord *) &(processors[i]));
00365 } else if (isLight(&processors[i])) {
00366 lightProcessors->insert((InfoRecord *) &(processors[i]));
00367 }
00368 }
00369 if (_lb_args.debug()) CmiPrintf("averageLoad after assignment: %f\n", averageLoad);
00370 } else {
00371 finish = 0;
00372 break;
00373 }
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385 }
00386
00387 delete heavyProcessors;
00388 delete lightProcessors;
00389
00390 return finish;
00391 }
00392
00393 void RefinerComm::Refine(int count, BaseLB::LDStats* stats,
00394 int* cur_p, int* new_p)
00395 {
00396
00397
00398 P = count;
00399 numComputes = stats->n_objs;
00400 computes = new computeInfo[numComputes];
00401 processors = new processorInfo[count];
00402 commTable = new CommTable(P);
00403
00404
00405 stats->makeCommHash();
00406
00407 create(count, stats, cur_p);
00408
00409 int i;
00410 for (i=0; i<numComputes; i++)
00411 assign((computeInfo *) &(computes[i]),
00412 (processorInfo *) &(processors[computes[i].oldProcessor]));
00413
00414 commTable->clear();
00415
00416
00417 processorCommCost();
00418
00419 removeComputes();
00420 if (_lb_args.debug()) printLoad();
00421
00422 computeAverage();
00423 if (_lb_args.debug()) CmiPrintf("averageLoad: %f\n", averageLoad);
00424
00425 multirefine();
00426
00427 for (int pe=0; pe < P; pe++) {
00428 Iterator nextCompute;
00429 nextCompute.id = 0;
00430 computeInfo *c = (computeInfo *)
00431 processors[pe].computeSet->iterator((Iterator *)&nextCompute);
00432 while(c) {
00433 new_p[c->Id] = c->processor;
00434
00435
00436 nextCompute.id++;
00437 c = (computeInfo *) processors[pe].computeSet->
00438 next((Iterator *)&nextCompute);
00439 }
00440 }
00441
00442 delete [] computes;
00443 delete [] processors;
00444 delete commTable;
00445 }
00446
00447 RefinerComm::CommTable::CommTable(int P)
00448 {
00449 count = P;
00450 msgSentCount = new int[P];
00451 msgRecvCount = new int[P];
00452 byteSentCount = new int[P];
00453 byteRecvCount = new int[P];
00454 clear();
00455 }
00456
00457 RefinerComm::CommTable::~CommTable()
00458 {
00459 delete [] msgSentCount;
00460 delete [] msgRecvCount;
00461 delete [] byteSentCount;
00462 delete [] byteRecvCount;
00463 }
00464
00465 void RefinerComm::CommTable::clear()
00466 {
00467 for(int i = 0; i < count; i++)
00468 msgSentCount[i] = msgRecvCount[i] = byteSentCount[i] = byteRecvCount[i] = 0;
00469 }
00470
00471 void RefinerComm::CommTable::increase(bool issend, int pe, int msgs, int bytes)
00472 {
00473 if (issend) {
00474 msgSentCount[pe] += msgs;
00475 byteSentCount[pe] += bytes;
00476 }
00477 else {
00478 msgRecvCount[pe] += msgs;
00479 byteRecvCount[pe] += bytes;
00480 }
00481 }
00482
00483 double RefinerComm::CommTable::overheadOnPe(int pe)
00484 {
00485 return msgRecvCount[pe] * PER_MESSAGE_RECV_OVERHEAD +
00486 msgSentCount[pe] * _lb_args.alpha() +
00487 byteRecvCount[pe] * PER_BYTE_RECV_OVERHEAD +
00488 byteSentCount[pe] * _lb_args.beeta();
00489 }
00490