00001
00005
00010 #include "Refiner.h"
00011
00012 int* Refiner::AllocProcs(int count, BaseLB::LDStats* stats)
00013 {
00014 return new int[stats->n_objs];
00015 }
00016
00017 void Refiner::FreeProcs(int* bufs)
00018 {
00019 delete [] bufs;
00020 }
00021
00022 void Refiner::create(int count, BaseLB::LDStats* stats, int* procs)
00023 {
00024 int i;
00025
00026
00027
00028
00029 numAvail = 0;
00030 for(i=0; i < P; i++) {
00031 processors[i].Id = i;
00032 processors[i].backgroundLoad = stats->procs[i].bg_walltime;
00033 processors[i].load = processors[i].backgroundLoad;
00034 processors[i].computeLoad = 0;
00035 processors[i].computeSet = new Set();
00036 processors[i].pe_speed = stats->procs[i].pe_speed;
00037
00038 processors[i].available = stats->procs[i].available;
00039 if (processors[i].available == true) numAvail++;
00040 }
00041
00042 for (i=0; i<stats->n_objs; i++)
00043 {
00044 LDObjData &odata = stats->objData[i];
00045 computes[i].Id = i;
00046 computes[i].id = odata.objID();
00047
00048 computes[i].load = odata.wallTime;
00049 computes[i].processor = -1;
00050 computes[i].oldProcessor = procs[i];
00051 computes[i].migratable = odata.migratable;
00052 if (computes[i].oldProcessor >= P) {
00053 if (stats->complete_flag)
00054 CmiAbort("LB Panic: the old processor in RefineLB cannot be found, is this in a simulation mode?");
00055 else {
00056
00057 computes[i].oldProcessor = CrnRand()%P;
00058 }
00059 }
00060 }
00061
00062
00063 }
00064
00065 void Refiner::assign(computeInfo *c, int processor)
00066 {
00067 assign(c, &(processors[processor]));
00068 }
00069
00070 void Refiner::assign(computeInfo *c, processorInfo *p)
00071 {
00072 double speed_ratio = processors[c->oldProcessor].pe_speed / p->pe_speed;
00073 c->processor = p->Id;
00074 p->computeSet->insert((InfoRecord *) c);
00075 p->computeLoad += c->load * speed_ratio;
00076 p->load = p->computeLoad + p->backgroundLoad;
00077 }
00078
00079 void Refiner::deAssign(computeInfo *c, processorInfo *p)
00080 {
00081 double speed_ratio = processors[c->oldProcessor].pe_speed / p->pe_speed;
00082 c->processor = -1;
00083 p->computeSet->remove(c);
00084 p->computeLoad -= c->load * speed_ratio;
00085 p->load = p->computeLoad + p->backgroundLoad;
00086 }
00087
00088 double Refiner::computeAverageLoad() {
00089 computeAverage();
00090 return averageLoad;
00091 }
00092
00093 void Refiner::computeAverage()
00094 {
00095 int i;
00096 double total = 0.;
00097 for (i=0; i<numComputes; i++) total += computes[i].load;
00098
00099 for (i=0; i<P; i++)
00100 if (processors[i].available == true)
00101 total += processors[i].backgroundLoad;
00102
00103 averageLoad = total/numAvail;
00104 }
00105
00106 double Refiner::computeMax()
00107 {
00108 int i;
00109 double max = -1.0;
00110 for (i=0; i<P; i++) {
00111 if (processors[i].available == true && processors[i].load > max)
00112 max = processors[i].load;
00113 }
00114 return max;
00115 }
00116
00117 bool Refiner::isHeavy(processorInfo *p)
00118 {
00119 if (p->available == true)
00120 return p->load > overLoad*averageLoad;
00121 else {
00122 return p->computeSet->numElements() != 0;
00123 }
00124 }
00125
00126 bool Refiner::isLight(processorInfo *p)
00127 {
00128 if (p->available == true)
00129 return p->load < averageLoad;
00130 else
00131 return false;
00132 }
00133
00134
00135 void Refiner::removeComputes()
00136 {
00137 int first;
00138 Iterator nextCompute;
00139
00140 if (numAvail < P) {
00141 if (numAvail == 0) CmiAbort("No processor available!");
00142 for (first=0; first<P; first++)
00143 if (processors[first].available == true) break;
00144 for (int i=0; i<P; i++) {
00145 if (processors[i].available == false) {
00146 computeInfo *c = (computeInfo *)
00147 processors[i].computeSet->iterator((Iterator *)&nextCompute);
00148 while (c) {
00149 deAssign(c, &processors[i]);
00150 assign(c, &processors[first]);
00151 nextCompute.id++;
00152 c = (computeInfo *)
00153 processors[i].computeSet->next((Iterator *)&nextCompute);
00154 }
00155 }
00156 }
00157 }
00158 }
00159
00160 int Refiner::refine()
00161 {
00162 int i;
00163 int finish = 1;
00164 maxHeap *heavyProcessors = new maxHeap(P);
00165
00166 Set *lightProcessors = new Set();
00167 for (i=0; i<P; i++) {
00168 if (isHeavy(&processors[i])) {
00169
00170
00171 heavyProcessors->insert((InfoRecord *) &(processors[i]));
00172 } else if (isLight(&processors[i])) {
00173
00174
00175 lightProcessors->insert((InfoRecord *) &(processors[i]));
00176 }
00177 }
00178 int done = 0;
00179
00180 while (!done) {
00181 double bestSize;
00182 computeInfo *bestCompute;
00183 processorInfo *bestP;
00184
00185 processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
00186 if (!donor) break;
00187
00188
00189 Iterator nextProcessor;
00190 processorInfo *p = (processorInfo *)
00191 lightProcessors->iterator((Iterator *) &nextProcessor);
00192 bestSize = 0;
00193 bestP = 0;
00194 bestCompute = 0;
00195
00196 while (p) {
00197 Iterator nextCompute;
00198 nextCompute.id = 0;
00199 computeInfo *c = (computeInfo *)
00200 donor->computeSet->iterator((Iterator *)&nextCompute);
00201
00202
00203 while (c) {
00204 if (!c->migratable) {
00205 nextCompute.id++;
00206 c = (computeInfo *)
00207 donor->computeSet->next((Iterator *)&nextCompute);
00208 continue;
00209 }
00210 double speed_ratio = processors[c->oldProcessor].pe_speed / p->pe_speed;
00211
00212
00213 if ( c->load * speed_ratio + p->load < overLoad*averageLoad) {
00214
00215
00216
00217 if(c->load > bestSize) {
00218 bestSize = c->load;
00219 bestCompute = c;
00220 bestP = p;
00221 }
00222 }
00223 nextCompute.id++;
00224 c = (computeInfo *)
00225 donor->computeSet->next((Iterator *)&nextCompute);
00226 }
00227 p = (processorInfo *)
00228 lightProcessors->next((Iterator *) &nextProcessor);
00229 }
00230
00231 if (bestCompute) {
00232
00233
00234
00235 deAssign(bestCompute, donor);
00236 assign(bestCompute, bestP);
00237 } else {
00238 finish = 0;
00239 break;
00240 }
00241
00242 if (bestP->load > averageLoad)
00243 lightProcessors->remove(bestP);
00244
00245 if (isHeavy(donor))
00246 heavyProcessors->insert((InfoRecord *) donor);
00247 else if (isLight(donor))
00248 lightProcessors->insert((InfoRecord *) donor);
00249 }
00250
00251 delete heavyProcessors;
00252 delete lightProcessors;
00253
00254 return finish;
00255 }
00256
00257 int Refiner::multirefine(bool reset)
00258 {
00259 computeAverage();
00260 double avg = averageLoad;
00261 double max = computeMax();
00262
00263 const double overloadStep = 0.01;
00264 const double overloadStart = overLoad;
00265 double dCurOverload = max / avg;
00266
00267 int minOverload = 0;
00268 int maxOverload = (int)((dCurOverload - overloadStart)/overloadStep + 1);
00269 double dMinOverload = minOverload * overloadStep + overloadStart;
00270 double dMaxOverload = maxOverload * overloadStep + overloadStart;
00271 int curOverload;
00272 int refineDone = 0;
00273 if (_lb_args.debug()>=1)
00274 CmiPrintf("dMinOverload: %f dMaxOverload: %f\n", dMinOverload, dMaxOverload);
00275
00276 overLoad = dMinOverload;
00277 if (refine())
00278 refineDone = 1;
00279 else {
00280 overLoad = dMaxOverload;
00281 if (!refine()) {
00282 CmiPrintf("ERROR: Could not refine at max overload\n");
00283 refineDone = 1;
00284 }
00285 }
00286
00287
00288 while (!refineDone) {
00289 if (maxOverload - minOverload <= 1)
00290 refineDone = 1;
00291 else {
00292 curOverload = (maxOverload + minOverload ) / 2;
00293
00294 overLoad = curOverload * overloadStep + overloadStart;
00295 if (_lb_args.debug()>=1)
00296 CmiPrintf("Testing curOverload %d = %f [min,max]= %d, %d\n", curOverload, overLoad, minOverload, maxOverload);
00297
00298
00299 if (reset) {
00300 int i;
00301 for (i = 0; i < P; i++) {
00302 processors[i].computeLoad = 0;
00303 delete processors[i].computeSet;
00304 processors[i].computeSet = new Set();
00305 }
00306 for (i = 0; i < numComputes; i++)
00307 assign((computeInfo *) &(computes[i]),
00308 (processorInfo *) &(processors[computes[i].oldProcessor]));
00309 }
00310
00311 if (refine())
00312 maxOverload = curOverload;
00313 else
00314 minOverload = curOverload;
00315 }
00316 }
00317 return 1;
00318 }
00319
00320 void Refiner::Refine(int count, BaseLB::LDStats* stats,
00321 int* cur_p, int* new_p)
00322 {
00323
00324
00325 P = count;
00326 numComputes = stats->n_objs;
00327 computes = new computeInfo[numComputes];
00328 processors = new processorInfo[count];
00329
00330 create(count, stats, cur_p);
00331
00332 int i;
00333 for (i=0; i<numComputes; i++)
00334 assign((computeInfo *) &(computes[i]),
00335 (processorInfo *) &(processors[computes[i].oldProcessor]));
00336
00337 removeComputes();
00338
00339 computeAverage();
00340
00341 if (_lb_args.debug()>2) {
00342 CkPrintf("Old PE load (bg load): ");
00343 for (i=0; i<count; i++) CkPrintf("%d:%f(%f) ", i, processors[i].load, processors[i].backgroundLoad);
00344 CkPrintf("\n");
00345 }
00346
00347
00348
00349 multirefine(true);
00350
00351 int nmoves = 0;
00352 for (int pe=0; pe < P; pe++) {
00353 Iterator nextCompute;
00354 nextCompute.id = 0;
00355 computeInfo *c = (computeInfo *)
00356 processors[pe].computeSet->iterator((Iterator *)&nextCompute);
00357 while(c) {
00358 new_p[c->Id] = c->processor;
00359 if (new_p[c->Id] != cur_p[c->Id]) nmoves++;
00360
00361
00362 nextCompute.id++;
00363 c = (computeInfo *) processors[pe].computeSet->
00364 next((Iterator *)&nextCompute);
00365 }
00366 }
00367 if (_lb_args.debug()>2) {
00368 CkPrintf("New PE load: ");
00369 for (i=0; i<count; i++) CkPrintf("%f ", processors[i].load);
00370 CkPrintf("\n");
00371 }
00372 if (_lb_args.debug()>1)
00373 CkPrintf("Refiner: moving %d obejcts. \n", nmoves);
00374 delete [] computes;
00375 delete [] processors;
00376 }
00377
00378