00001
00002 #include "elements.h"
00003 #include "ckheap.h"
00004
00005 #include "BaseLB.h"
00006
00007 LBVectorMigrateMsg * VectorStrategy(BaseLB::LDStats *stats)
00008 {
00009 int i;
00010 int n_pes = stats->nprocs();
00011
00012 processorInfo *processors = new processorInfo[n_pes];
00013
00014 for(i=0; i < n_pes; i++) {
00015 processors[i].Id = i;
00016 processors[i].backgroundLoad = stats->procs[i].bg_walltime;
00017 processors[i].computeLoad = stats->procs[i].total_walltime;
00018 processors[i].load = processors[i].computeLoad + processors[i].backgroundLoad;
00019 processors[i].pe_speed = stats->procs[i].pe_speed;
00020 processors[i].available = stats->procs[i].available;
00021 }
00022
00023
00024 double total = 0.0;
00025 for (i=0; i<n_pes; i++)
00026 total += processors[i].load;
00027
00028 double averageLoad = total/n_pes;
00029
00030 if (_lb_args.debug()>1) CkPrintf("Average load: %f (total: %f, n_pes: %d)\n", averageLoad, total, n_pes);
00031
00032 maxHeap *heavyProcessors = new maxHeap(n_pes);
00033 Set *lightProcessors = new Set();
00034
00035 double overload_factor = 1.01;
00036 for (i=0; i<n_pes; i++) {
00037 if (processors[i].load > averageLoad*overload_factor) {
00038
00039
00040 heavyProcessors->insert((InfoRecord *) &(processors[i]));
00041 } else if (processors[i].load < averageLoad) {
00042
00043
00044 lightProcessors->insert((InfoRecord *) &(processors[i]));
00045 }
00046 }
00047
00048 if (_lb_args.debug()>1) {
00049 CkPrintf("Before migration: (%d) ", n_pes);
00050 for (i=0; i<n_pes; i++) CkPrintf("%f (%f %f) ", processors[i].load, processors[i].computeLoad, processors[i].backgroundLoad);
00051 CkPrintf("\n");
00052 }
00053
00054 int done = 0;
00055 CkVec<VectorMigrateInfo *> miginfo;
00056 while (!done) {
00057 processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
00058 if (!donor) break;
00059 if (donor->computeLoad == 0.0) continue;
00060 Iterator nextProcessor;
00061 processorInfo *p = (processorInfo *)
00062 lightProcessors->iterator((Iterator *) &nextProcessor);
00063 double load = donor->load - averageLoad;
00064 while (load > 0.0 && p) {
00065 double needed = averageLoad - p->load;
00066 double give;
00067 if (load > needed) give = needed;
00068 else give = load;
00069 if (give > donor->computeLoad) give = donor->computeLoad;
00070 donor->load -= give;
00071 donor->computeLoad -= give;
00072 p->load += give;
00073 p->computeLoad += give;
00074 VectorMigrateInfo *move = new VectorMigrateInfo;
00075 move->from_pe = donor->Id;
00076 move->to_pe = p->Id;
00077 move->load = give;
00078 miginfo.push_back(move);
00079 if (give < needed)
00080 break;
00081 else
00082 lightProcessors->remove(p);
00083 p = (processorInfo *)lightProcessors->next((Iterator *) &nextProcessor);
00084 load -= give;
00085 }
00086 }
00087
00088 int migrate_count = miginfo.length();
00089 LBVectorMigrateMsg* msg = new(migrate_count,0) LBVectorMigrateMsg;
00090 msg->n_moves = migrate_count;
00091 for(i=0; i < migrate_count; i++) {
00092 VectorMigrateInfo* item = (VectorMigrateInfo*) miginfo[i];
00093 msg->moves[i] = *item;
00094 if (_lb_args.debug()>1)
00095 CkPrintf("Processor %d => %d load: %f.\n", item->from_pe, item->to_pe, item->load);
00096 delete item;
00097 miginfo[i] = 0;
00098 }
00099
00100 if (_lb_args.debug()>1) {
00101 CkPrintf("After migration: (%d) ", n_pes);
00102 for (i=0; i<n_pes; i++) CkPrintf("%f (%f %f) ", processors[i].load, processors[i].computeLoad, processors[i].backgroundLoad);
00103 CkPrintf("\n");
00104 }
00105
00106 if (_lb_args.debug())
00107 CkPrintf("VectorStrategy: %d processor vector migrating.\n", migrate_count);
00108
00109 delete heavyProcessors;
00110 delete lightProcessors;
00111
00112 delete [] processors;
00113
00114 return msg;
00115 }
00116
00117