00001 #include "elements.h"
00002 #include "ckheap.h"
00003 #include "RefinerApprox.h"
00004
00005 int _lb_debug=0;
00006
00007 void RefinerApprox::create(int count, CentralLB::LDStats* stats, int* procs)
00008 {
00009 int i;
00010
00011
00012
00013 numAvail = 0;
00014 for(i=0; i < P; i++)
00015 {
00016 processors[i].Id = i;
00017 processors[i].backgroundLoad = stats->procs[i].bg_walltime;
00018
00019 processors[i].computeLoad = 0;
00020 processors[i].load = processors[i].backgroundLoad;
00021 processors[i].computeSet = new Set();
00022 processors[i].pe_speed = stats->procs[i].pe_speed;
00023
00024 processors[i].available = stats->procs[i].available;
00025 if (processors[i].available == true) numAvail++;
00026 }
00027
00028 int index=0;
00029 for (i=0; i<stats->n_objs; i++)
00030 {
00031
00032 LDObjData &odata = stats->objData[i];
00033 if (odata.migratable == true)
00034 {
00035 computes[index].id = odata.objID();
00036 computes[index].Id = i;
00037
00038 computes[index].load = odata.wallTime;
00039 computes[index].processor = -1;
00040 computes[index].oldProcessor = procs[i];
00041 computes[index].migratable = odata.migratable;
00042 if (computes[index].oldProcessor >= P) {
00043 if (stats->complete_flag) {
00044 CmiPrintf("LB Panic: the old processor %d of obj %d in RefineKLB cannot be found, is this in a simulation mode?\n", computes[index].oldProcessor, i);
00045 CmiAbort("Abort!");
00046 }
00047 else {
00048
00049 computes[i].oldProcessor = CrnRand()%P;
00050 }
00051 }
00052 index ++;
00053 }
00054 else
00055 {
00056
00057 processors[procs[i]].backgroundLoad += odata.wallTime;
00058 processors[procs[i]].load+= odata.wallTime;
00059 numComputes--;
00060 }
00061 }
00062
00063
00064 }
00065
00066 void RefinerApprox::reinitAssignment()
00067 {
00068 int i;
00069
00070
00071
00072 for(i=0;i<P;i++)
00073 {
00074 Iterator nextCompute;
00075 nextCompute.id=0;
00076 computeInfo *c = (computeInfo *)
00077 processors[i].computeSet->iterator((Iterator *)&nextCompute);
00078 while(c)
00079 {
00080 if(c->oldProcessor!=i)
00081 {
00082 deAssign(c,&processors[i]);
00083 if(c->oldProcessor!=-1)
00084 {
00085 assign(c,c->oldProcessor);
00086 }
00087 else
00088 {
00089 assign(c,0);
00090 }
00091 }
00092 nextCompute.id++;
00093 c = (computeInfo *)
00094 processors[i].computeSet->next((Iterator *)&nextCompute);
00095 }
00096 }
00097 }
00098
00099 void RefinerApprox::multirefine(int num_moves)
00100 {
00101 computeAverage();
00102 double avg = averageLoad;
00103 double max = computeMax();
00104
00105 int numMoves=0;
00106 double lbound=avg;
00107 double ubound=max;
00108
00109 double EPSILON=0.01;
00110
00111 numMoves=refine(avg);
00112
00113 if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
00114 if(numMoves<=num_moves)
00115 return ;
00116
00117 if(_lb_debug)CkPrintf("[ %lf , %lf ] = %lf > %lf\n",lbound,ubound,ubound-lbound,EPSILON*avg);
00118 while(ubound-lbound> EPSILON*avg)
00119 {
00120 reinitAssignment();
00121 double testVal=(ubound+lbound)/2;
00122 numMoves=refine(testVal);
00123 if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
00124 if(numMoves>num_moves)
00125 {
00126 lbound=testVal;
00127 }
00128 else
00129 {
00130 ubound=testVal;
00131 }
00132 if(_lb_debug)CkPrintf("[ %lf , %lf ] = %lf > %lf\n",lbound,ubound,ubound-lbound,EPSILON*avg);
00133 }
00134 if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
00135 return;
00136 }
00137
00138 Set * RefinerApprox::removeBiggestSmallComputes(int num,processorInfo * p,double opt)
00139 {
00140 int numPComputes=p->computeSet->numElements();
00141 double totalSmallLoad=0;
00142 maxHeap *h=new maxHeap(numPComputes);
00143 Set * removedComputes=new Set();
00144 int numSmallPComputes=0;
00145
00146 Iterator nextCompute;
00147 nextCompute.id=0;
00148 computeInfo *c = (computeInfo *)
00149 p->computeSet->iterator((Iterator *)&nextCompute);
00150
00151 for(int i=0;i<numPComputes;i++)
00152 {
00153 if(c->load < opt/2)
00154 {
00155 h->insert((InfoRecord *)c);
00156 numSmallPComputes++;
00157 }
00158 nextCompute.id++;
00159 c = (computeInfo *)
00160 p->computeSet->next((Iterator *)&nextCompute);
00161 }
00162
00163 if(p->backgroundLoad <opt/2)
00164 {
00165 totalSmallLoad+=p->backgroundLoad;
00166 }
00167 if(numSmallPComputes<num)
00168 {
00169 if(_lb_debug)CkPrintf("Error[%d]: Cant remove %d small computes from a total of %d small computes\n",p->Id,num,numSmallPComputes);
00170 }
00171
00172
00173 int j;
00174 for(j=0;j<num;j++)
00175 {
00176 computeInfo *rec=(computeInfo *)(h->deleteMax());
00177 removedComputes->insert((InfoRecord *)rec);
00178 totalSmallLoad-=rec->load;
00179 }
00180
00181 delete h;
00182 return removedComputes;
00183 }
00184
00185 Set * RefinerApprox::removeBigComputes(int num,processorInfo * p,double opt)
00186 {
00187 int numPComputes=p->computeSet->numElements();
00188 if(num>numPComputes)
00189 {
00190 if(_lb_debug)CkPrintf("Error [%d]: Cant remove %d computes out of a total of %d\n",p->Id,num,numPComputes);
00191 return new Set();
00192 }
00193 double totalLoad=p->load;
00194 maxHeap *h=new maxHeap(numPComputes);
00195 Set * removedComputes=new Set();
00196
00197 Iterator nextCompute;
00198 nextCompute.id=0;
00199 computeInfo *c = (computeInfo *)
00200 p->computeSet->iterator((Iterator *)&nextCompute);
00201
00202 for(int i=0;i<numPComputes;i++)
00203 {
00204 h->insert((InfoRecord *)c);
00205 nextCompute.id++;
00206 c = (computeInfo *)
00207 p->computeSet->next((Iterator *)&nextCompute);
00208 }
00209
00210 int j;
00211 for(j=0;j<num;j++)
00212 {
00213 computeInfo *rec=(computeInfo *)(h->deleteMax());
00214 removedComputes->insert((InfoRecord *)rec);
00215 totalLoad-=rec->load;
00216 }
00217
00218 delete h;
00219 return removedComputes;
00220 }
00221
00222 double RefinerApprox::getLargestCompute()
00223 {
00224 double largestC=0;
00225 for(int i=0;i<P;i++)
00226 {
00227 if(processors[i].backgroundLoad > largestC)
00228 largestC=processors[i].backgroundLoad;
00229
00230 Iterator nextCompute;
00231 nextCompute.id=0;
00232 computeInfo *c = (computeInfo *)
00233 processors[i].computeSet->iterator((Iterator *)&nextCompute);
00234 while(c)
00235 {
00236 if(c->load > largestC)
00237 {
00238 largestC=c->load;
00239 }
00240 nextCompute.id++;
00241 c = (computeInfo *)
00242 processors[i].computeSet->next((Iterator *)&nextCompute);
00243 }
00244 }
00245 return largestC;
00246 }
00247
00248 int RefinerApprox::getNumLargeComputes(double opt)
00249 {
00250 int numLarge=0;
00251 for(int i=0;i<P;i++)
00252 {
00253 if(processors[i].backgroundLoad>=(opt/2))
00254 numLarge++;
00255 Iterator nextCompute;
00256 nextCompute.id=0;
00257 computeInfo *c = (computeInfo *)
00258 processors[i].computeSet->iterator((Iterator *)&nextCompute);
00259 int numC=0;
00260
00261 while(c)
00262 {
00263 numC++;
00264
00265 if(c->load>(opt/2))
00266 numLarge++;
00267
00268 nextCompute.id++;
00269 c = (computeInfo *)
00270 processors[i].computeSet->next((Iterator *)&nextCompute);
00271 }
00272 }
00273 return numLarge;
00274
00275 }
00276
00277 int RefinerApprox::computeA(processorInfo *p,double opt)
00278 {
00279 int numPComputes=p->computeSet->numElements();
00280 double totalSmallLoad=0;
00281 maxHeap *h=new maxHeap(numPComputes);
00282
00283 Iterator nextCompute;
00284 nextCompute.id=0;
00285 computeInfo *c = (computeInfo *)
00286 p->computeSet->iterator((Iterator *)&nextCompute);
00287
00288 for(int i=0;i<numPComputes;i++)
00289 {
00290 if(c->load < opt/2)
00291 {
00292 totalSmallLoad+=c->load;
00293 h->insert((InfoRecord *)c);
00294 }
00295 nextCompute.id++;
00296 c = (computeInfo *)
00297 p->computeSet->next((Iterator *)&nextCompute);
00298 }
00299
00300 if(p->backgroundLoad <opt/2)
00301 {
00302 totalSmallLoad+=p->backgroundLoad;
00303 }
00304
00305 int avalue=0;
00306 while(totalSmallLoad > opt/2)
00307 {
00308 avalue++;
00309 InfoRecord *rec=h->deleteMax();
00310 totalSmallLoad-=rec->load;
00311 }
00312 delete h;
00313 return avalue;
00314 }
00315
00316 int RefinerApprox::computeB(processorInfo *p,double opt)
00317 {
00318 int numPComputes=p->computeSet->numElements();
00319 double totalLoad=p->load;
00320 if(p->backgroundLoad > opt)
00321 {
00322 if(_lb_debug)
00323 CkPrintf("Error in computeB: Background load greater than OPT!\n");
00324 return 0;
00325 }
00326 maxHeap *h=new maxHeap(numPComputes);
00327
00328 Iterator nextCompute;
00329 nextCompute.id=0;
00330 computeInfo *c = (computeInfo *)
00331 p->computeSet->iterator((Iterator *)&nextCompute);
00332
00333 for(int i=0;i<numPComputes;i++)
00334 {
00335 h->insert((InfoRecord*)c);
00336 nextCompute.id++;
00337 c = (computeInfo *)
00338 p->computeSet->next((Iterator *)&nextCompute);
00339 }
00340
00341 int bvalue=0;
00342 while(totalLoad > opt)
00343 {
00344 bvalue++;
00345 InfoRecord *rec=h->deleteMax();
00346 totalLoad-=rec->load;
00347 }
00348
00349 delete h;
00350 return bvalue;
00351 }
00352
00353 int RefinerApprox::refine(double opt)
00354 {
00355 int i;
00356 if(_lb_debug)CkPrintf("RefinerApprox::refine called with %lf\n",opt);
00357 if(opt<averageLoad)
00358 return INFTY;
00359
00360 int numLargeComputes=getNumLargeComputes(opt);
00361 if(_lb_debug) CkPrintf("Num of large Computes %d for opt = %10f\n",numLargeComputes,opt);
00362 if(numLargeComputes>P)
00363 return INFTY;
00364 if(getLargestCompute()>opt)
00365 return INFTY;
00366
00367
00368
00369
00370 int *a=new int[P];
00371
00372
00373 int *b=new int[P];
00374 bool *largeFree=new bool[P];
00375 Set *largeComputes=new Set();
00376 Set *smallComputes=new Set();
00377
00378
00379 for(i=0;i<P;i++)
00380 {
00381 computeInfo *smallestLargeCompute=NULL;
00382 largeFree[i]=true;
00383 Iterator nextCompute;
00384 nextCompute.id=0;
00385 computeInfo *c = (computeInfo *)
00386 processors[i].computeSet->iterator((Iterator *)&nextCompute);
00387 while(c)
00388 {
00389 if(c->load>opt/2)
00390 {
00391 largeFree[i]=false;
00392 largeComputes->insert((InfoRecord*)c);
00393 deAssign(c,&(processors[i]));
00394 if (smallestLargeCompute==NULL)
00395 {
00396 smallestLargeCompute=c;
00397 }
00398 else if(smallestLargeCompute->load > c->load)
00399 {
00400 smallestLargeCompute=c;
00401 }
00402 }
00403 nextCompute.id++;
00404 c = (computeInfo *)
00405 processors[i].computeSet->next((Iterator *)&nextCompute);
00406 }
00407
00408 if(processors[i].backgroundLoad>opt/2)
00409 {
00410 largeFree[i]=false;
00411 }
00412 else
00413 {
00414 if(smallestLargeCompute)
00415 {
00416 assign(smallestLargeCompute,i);
00417 largeComputes->remove((InfoRecord*)smallestLargeCompute);
00418 }
00419 }
00420 if(!largeFree[i])
00421 {
00422 if(_lb_debug)
00423 CkPrintf("Processor %d not LargeFree !\n",i);
00424 }
00425
00426
00427 a[i]=computeA(&processors[i],opt);
00428 b[i]=computeB(&processors[i],opt);
00429 }
00430
00431
00432
00433 minHeap *cHeap=new minHeap(P);
00434 for(i=0;i<P;i++)
00435 {
00436 InfoRecord *ci=new InfoRecord();
00437 ci->load=a[i]-b[i];
00438 ci->Id=i;
00439 cHeap->insert(ci);
00440 }
00441
00442
00443 minHeap *largeFreeLightProcs=new minHeap(P);
00444
00445 for(i=0;i<numLargeComputes;i++)
00446 {
00447 if(_lb_debug) CkPrintf("Removing a large compute %d\n",i);
00448
00449 InfoRecord *cdata= cHeap->deleteMin();
00450 Set *smallComputesRemoved= removeBiggestSmallComputes(a[cdata->Id],&(processors[cdata->Id]),opt);
00451 if(largeFree[cdata->Id])
00452 largeFreeLightProcs->insert((InfoRecord *)&(processors[cdata->Id]));
00453
00454
00455 Iterator nextCompute;
00456 nextCompute.id=0;
00457 computeInfo *c=(computeInfo *)
00458 smallComputesRemoved->iterator((Iterator*)&nextCompute);
00459 while(c)
00460 {
00461 deAssign(c,&(processors[cdata->Id]));
00462 if(c->load > opt/2)
00463 {
00464 if (_lb_debug) CkPrintf(" Error : Large compute not expected here\n");
00465 }
00466 else
00467 {
00468 smallComputes->insert((InfoRecord *)c);
00469 }
00470 nextCompute.id++;
00471 c = (computeInfo *)
00472 smallComputesRemoved->next((Iterator *)&nextCompute);
00473 }
00474 delete smallComputesRemoved;
00475 delete cdata;
00476 }
00477
00478
00479
00480
00481
00482 for(i=numLargeComputes;i<P;i++)
00483 {
00484
00485 InfoRecord *cdata= cHeap->deleteMin();
00486 Set *computesRemoved=removeBigComputes(b[cdata->Id],&(processors[cdata->Id]),opt);
00487
00488
00489
00490 Iterator nextCompute;
00491 nextCompute.id=0;
00492 computeInfo *c=(computeInfo *)
00493 computesRemoved->iterator((Iterator*)&nextCompute);
00494 while(c)
00495 {
00496 deAssign(c,&(processors[cdata->Id]));
00497 if(c->load > opt/2)
00498 {
00499 processorInfo *targetproc=(processorInfo *)largeFreeLightProcs->deleteMin();
00500 assign(c,targetproc);
00501 largeFree[cdata->Id]=true;
00502 largeFree[targetproc->Id]=false;
00503 }
00504 else
00505 {
00506 smallComputes->insert((InfoRecord *)c);
00507 }
00508 nextCompute.id++;
00509 c = (computeInfo *)
00510 computesRemoved->next((Iterator *)&nextCompute);
00511 }
00512 delete computesRemoved;
00513 delete cdata;
00514 }
00515 delete cHeap;
00516
00517
00518
00519 Iterator nextCompute;
00520 nextCompute.id=0;
00521 computeInfo *c=(computeInfo *)
00522 largeComputes->iterator((Iterator*)&nextCompute);
00523 if(_lb_debug)
00524 {
00525 if(c)
00526 {
00527 CkPrintf("Reassigning Large computes removes in Step 1\n");
00528 }
00529 else
00530 {
00531 CkPrintf("No Large computes removed in Step 1\n");
00532 }
00533 }
00534 while(c)
00535 {
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550 processorInfo *targetproc=(processorInfo *)largeFreeLightProcs->deleteMin();
00551 if(_lb_debug)
00552 {
00553 if(!targetproc)
00554 CkPrintf("Error finding a large free light proc\n");
00555 }
00556 assign(c,targetproc);
00557 largeFree[targetproc->Id]=false;
00558 nextCompute.id++;
00559 c = (computeInfo *)
00560 largeComputes->next((Iterator *)&nextCompute);
00561 }
00562
00563
00564 minHeap *procsLoad=new minHeap(P);
00565 for(i=0;i<P;i++)
00566 {
00567 procsLoad->insert((InfoRecord *) &(processors[i]) );
00568 }
00569 nextCompute.id=0;
00570 c=(computeInfo *)
00571 smallComputes->iterator((Iterator*)&nextCompute);
00572 while(c)
00573 {
00574 processorInfo *leastLoadedP=(processorInfo *)procsLoad->deleteMin();
00575 assign(c,leastLoadedP);
00576 procsLoad->insert((InfoRecord *) leastLoadedP);
00577 nextCompute.id++;
00578 c = (computeInfo *)
00579 smallComputes->next((Iterator *)&nextCompute);
00580 }
00581
00582 delete largeFreeLightProcs;
00583 delete procsLoad;
00584 delete [] a;
00585 delete [] b;
00586 delete [] largeFree;
00587 delete largeComputes;
00588 delete smallComputes;
00589 return numMoves();
00590 }
00591
00592 int RefinerApprox::numMoves()
00593 {
00594 int nmoves=0;
00595 for(int i=0;i<numComputes;i++)
00596 {
00597 if(computes[i].processor!=computes[i].oldProcessor)
00598 nmoves++;
00599 }
00600 return nmoves;
00601 }
00602
00603 void RefinerApprox::Refine(int count, CentralLB::LDStats* stats,
00604 int* cur_p, int* new_p, int percentMoves)
00605 {
00606
00607 if(_lb_debug) CkPrintf("\n\n");
00608 if(_lb_debug) CkPrintf("[%d] RefinerApprox strategy\n",CkMyPe());
00609 P = count;
00610 numComputes = stats->n_objs;
00611 computes = new computeInfo[numComputes];
00612 processors = new processorInfo[count];
00613
00614 if(_lb_debug) CkPrintf("Total Number of computes : %d\n",numComputes);
00615
00616 create(count, stats, cur_p);
00617 if(_lb_debug) printStats(0);
00618
00619 int i;
00620 for (i=0; i<numComputes; i++)
00621 {
00622 if(computes[i].oldProcessor!=-1)
00623
00624 {
00625 assign((computeInfo *) &(computes[i]),
00626 (processorInfo *) &(processors[computes[i].oldProcessor]));
00627 }
00628 else
00629 {
00630 assign((computeInfo *) &(computes[i]),
00631 (processorInfo *) &(processors[0]));
00632 }
00633 }
00634
00635 if(_lb_debug) CkPrintf("Total Migratable computes : %d\n\n",numComputes);
00636 if(_lb_debug) CkPrintf("Total processors : %d\n",P);
00637 if(_lb_debug) CkPrintf("Total available processors : %d\n",numAvail);
00638
00639 removeComputes();
00640
00641 computeAverage();
00642
00643 if(_lb_debug) CkPrintf("Avearge load : %lf\n",averageLoad);
00644 if(_lb_debug) printStats(0);
00645
00646
00647 int numAllowedMoves=(int)(percentMoves*numComputes/100.0);
00648 if(numAllowedMoves<0)
00649 numAllowedMoves=0;
00650 if(numAllowedMoves>numComputes)
00651 numAllowedMoves=numComputes;
00652
00653 if(_lb_args.debug())
00654 {
00655 CkPrintf("Percent of allowed moves = %d\n",percentMoves);
00656 CkPrintf("Number of allowed moves = %d\n",numAllowedMoves);
00657 }
00658
00659 multirefine(numAllowedMoves);
00660
00661 int nmoves = 0;
00662
00663
00664
00665
00666
00667 for(i=0;i<stats->n_objs;i++)
00668 {
00669 new_p[i]=cur_p[i];
00670 }
00671
00672
00673 for (int pe=0; pe < P; pe++)
00674 {
00675 Iterator nextCompute;
00676 nextCompute.id = 0;
00677 computeInfo *c = (computeInfo *)
00678 processors[pe].computeSet->iterator((Iterator *)&nextCompute);
00679
00680 while(c)
00681 {
00682 new_p[c->Id] = c->processor;
00683 if (new_p[c->Id] != cur_p[c->Id]) nmoves++;
00684
00685 nextCompute.id++;
00686 c = (computeInfo *) processors[pe].computeSet->
00687 next((Iterator *)&nextCompute);
00688 }
00689 }
00690 if (_lb_debug) CkPrintf("RefinerApprox: moving %d objects. \n", nmoves);
00691 delete [] computes;
00692 delete [] processors;
00693 }
00694
00695
00696 void RefinerApprox::printStats(int newStats)
00697 {
00698
00699 CkPrintf("%Proc#\tLoad\tObjLoad\tBgdLoad\n");
00700 for(int i=0;i<P;i++)
00701 {
00702 CkPrintf("%d\t\t%lf\t%lf\t%lf\n",i,processors[i].load,processors[i].computeLoad,processors[i].backgroundLoad);
00703 }
00704
00705 }
00706