00001
00005
00006 #include "elements.h"
00007 #include "ckheap.h"
00008 #include "NeighborCommLB.h"
00009 #include "topology.h"
00010
// Communication cost model used by NeighborCommLB::Strategy to turn the
// recorded message and byte counts of an object into an estimated cost:
//   cost = PER_MESSAGE_SEND_OVERHEAD * messages + PER_BYTE_SEND_OVERHEAD * bytes
// The values are typed, file-local constants (internal linkage) rather than
// preprocessor macros so they obey scoping and type checking.
// NOTE(review): the units are presumably seconds per message / per byte,
// matching the wall-time loads they are combined with -- confirm.
static const double PER_MESSAGE_SEND_OVERHEAD = 35e-6;
static const double PER_BYTE_SEND_OVERHEAD    = 8.5e-9;
// Receive-side overheads are defined as zero and are not referenced by the
// strategy code in this file.
static const double PER_MESSAGE_RECV_OVERHEAD = 0.0;
static const double PER_BYTE_RECV_OVERHEAD    = 0.0;
00015
00016 CreateLBFunc_Def(NeighborCommLB, "The neighborhood load balancer with communication")
00017
00018 NeighborCommLB::NeighborCommLB(const CkLBOptions &opt):CBase_NeighborCommLB(opt)
00019 {
00020 lbname = "NeighborCommLB";
00021 if (CkMyPe() == 0)
00022 CkPrintf("[%d] NeighborCommLB created\n",CkMyPe());
00023 }
00024
00025 LBMigrateMsg* NeighborCommLB::Strategy(NborBaseLB::LDStats* stats, int n_nbrs)
00026 {
00027 bool _lb_debug=0;
00028 bool _lb_debug1=0;
00029 bool _lb_debug2=0;
00030 #if CMK_LBDB_ON
00031
00032
00033
00034 double myload = myStats.total_walltime - myStats.idletime;
00035 double avgload = myload;
00036 int i;
00037 if (_lb_debug)
00038 CkPrintf("[%d] Neighbor Count = %d\n", CkMyPe(), n_nbrs);
00039
00040 for(i=0; i < n_nbrs; i++) {
00041
00042 const double scale = ((double)myStats.pe_speed)
00043 / stats[i].pe_speed;
00044
00045 stats[i].total_walltime *= scale;
00046 stats[i].idletime *= scale;
00047
00048 avgload += (stats[i].total_walltime - stats[i].idletime);
00049 }
00050 avgload /= (n_nbrs + 1);
00051
00052 CkVec<MigrateInfo*> migrateInfo;
00053
00054 if (_lb_debug)
00055 CkPrintf("[%d] My load is %lf\n", CkMyPe(),myload);
00056 if (myload > avgload) {
00057 if (_lb_debug1)
00058 CkPrintf("[%d] OVERLOAD My load is %lf average load is %lf\n", CkMyPe(), myload, avgload);
00059
00060
00061 LBTopology* topo;
00062 {
00063 LBtopoFn topofn;
00064 topofn = LBTopoLookup(_lbtopo);
00065 if (topofn == NULL) {
00066 char str[1024];
00067 CmiPrintf("NeighborCommLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
00068 printoutTopo();
00069 sprintf(str, "NeighborCommLB> Fatal error: Unknown topology: %s", _lbtopo);
00070 CmiAbort(str);
00071 }
00072 topo = topofn(CkNumPes());
00073 }
00074 int dimension = topo->get_dimension();
00075 if (_lb_debug2)
00076 CkPrintf("[%d] Topology dimension = %d\n", CkMyPe(), dimension);
00077 if (dimension == -1) {
00078 char str[1024];
00079 CmiPrintf("NeighborCommLB> Fatal error: Unsupported topology: %s. Only some of the following are supported:\n", _lbtopo);
00080 printoutTopo();
00081 sprintf(str, "NeighborCommLB> Fatal error: Unsupported topology: %s", _lbtopo);
00082 CmiAbort(str);
00083 }
00084
00085
00086 int *myProc = new int[dimension];
00087 topo->get_processor_coordinates(myStats.from_pe, myProc);
00088 if (_lb_debug2) {
00089 char temp[1000];
00090 char* now=temp;
00091 sprintf(now, "[%d] Coordinates = [", CkMyPe());
00092 now += strlen(now);
00093 for(i=0;i<dimension;i++) {
00094 sprintf(now, "%d ", myProc[i]);
00095 now +=strlen(now);
00096 }
00097 sprintf(now, "]\n");
00098 now += strlen(now);
00099 CkPrintf(temp);
00100 }
00101
00102
00103
00104 double **commcenter = new double*[myStats.n_objs];
00105 double *commamount = new double[myStats.n_objs];
00106 if(_lb_debug1) {
00107 CkPrintf("[%d] Number of Objs = %d \n", CkMyPe(), myStats.n_objs);
00108 }
00109 {
00110 memset(commamount, 0, sizeof(double)*myStats.n_objs);
00111 for(i=0; i<myStats.n_objs;i++) {
00112 commcenter[i] = new double[dimension];
00113 memset(commcenter[i], 0, sizeof(double)*dimension);
00114 }
00115
00116
00117 int *destProc = new int[dimension];
00118 int *diff = new int[dimension];
00119
00120
00121 for(i=0; i<myStats.n_comm;i++) {
00122 int j;
00123
00124 for(j=0; j<myStats.n_objs;j++)
00125 if((myStats.objData[j].handle.omhandle.id == myStats.commData[i].sender.omId)
00126 && (myStats.objData[j].handle.id == myStats.commData[i].sender.objId)) {
00127 double comm=
00128 PER_MESSAGE_SEND_OVERHEAD * myStats.commData[i].messages
00129 + PER_BYTE_SEND_OVERHEAD * myStats.commData[i].bytes;
00130 commamount[j] += comm;
00131 int dest_pe = myStats.commData[i].receiver.lastKnown();
00132
00133 if(dest_pe==-1) continue;
00134
00135 topo->get_processor_coordinates(dest_pe, destProc);
00136 topo->coordinate_difference(myProc, destProc, diff);
00137 int k;
00138 for(k=0;k<dimension;k++) {
00139 commcenter[j][k] += diff[k] * comm;
00140 }
00141 }
00142 }
00143 for(i=0; i<myStats.n_objs;i++) if (commamount[i]>0) {
00144 int k;
00145 double ratio = 1.0 /commamount[i];
00146 for(k=0;k<dimension;k++)
00147 commcenter[i][k] *= ratio;
00148 } else {
00149 int k;
00150 for(k=0;k<dimension;k++)
00151 commcenter[i][k] = 0;
00152 }
00153
00154 delete [] destProc;
00155 delete [] diff;
00156 }
00157
00158 if(_lb_debug2) {
00159 for(i=0;i<myStats.n_objs;i++) {
00160 char temp[1000];
00161 char* now=temp;
00162 sprintf(now, "[%d] Objs [%d] Load = %lf Comm Amount = %lf ",
00163 CkMyPe(), i, myStats.objData[i].wallTime, commamount[i] );
00164 now += strlen(now);
00165 sprintf(now, "Comm Center = [");
00166 now += strlen(now);
00167 int j;
00168 for(j=0;j<dimension;j++) {
00169 sprintf(now, "%lf ", commcenter[i][j]);
00170 now += strlen(now);
00171 }
00172 sprintf(now, "]\n");
00173 now += strlen(now);
00174 CkPrintf(temp);
00175 }
00176 }
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187 typedef struct _procInfo{
00188 int id;
00189 double load;
00190 int* difference;
00191 } procInfo;
00192
00193 if(_lb_debug2) {
00194 CkPrintf("[%d] Querying neighborhood topology...\n", CkMyPe() );
00195 }
00196
00197 procInfo* neighbors = new procInfo[n_nbrs];
00198 {
00199 int *destProc = new int[dimension];
00200 for(i=0; i < n_nbrs; i++) {
00201 neighbors[i].id = stats[i].from_pe;
00202 neighbors[i].load = stats[i].total_walltime - stats[i].idletime;
00203 neighbors[i].difference = new int[dimension];
00204 topo->get_processor_coordinates(neighbors[i].id, destProc);
00205 topo->coordinate_difference(myProc, destProc, neighbors[i].difference);
00206 }
00207 delete[] destProc;
00208 }
00209
00210 if(_lb_debug2) {
00211 CkPrintf("[%d] Building obj heap...\n", CkMyPe() );
00212 }
00213
00214 maxHeap objs(myStats.n_objs);
00215 double totalObjLoad=0.0;
00216 for(i=0; i < myStats.n_objs; i++) {
00217 InfoRecord* item = new InfoRecord;
00218 item->load = myStats.objData[i].wallTime;
00219 totalObjLoad += item->load;
00220 item->Id = i;
00221 objs.insert(item);
00222 }
00223
00224 if(_lb_debug2) {
00225 CkPrintf("[%d] Beginning distributing objects...\n", CkMyPe() );
00226 }
00227
00228
00229 while(objs.numElements()>0) {
00230 InfoRecord* obj;
00231 obj = objs.deleteMax();
00232 int bestDest = -1;
00233 for(i = 0; i < n_nbrs; i++)
00234 if(neighbors[i].load +obj->load < myload - obj->load && (bestDest==-1 || neighbors[i].load < neighbors[bestDest].load)) {
00235 double dotsum=0;
00236 int j;
00237 for(j=0; j<dimension; j++) dotsum += (commcenter[obj->Id][j] * neighbors[i].difference[j]);
00238 if(myload - avgload < totalObjLoad || dotsum>0.5 || (dotsum>0 && objs.numElements()==0) || commamount[obj->Id]==0) {
00239 bestDest = i;
00240 }
00241 }
00242
00243 if(bestDest != -1) {
00244 if(_lb_debug1) {
00245 CkPrintf("[%d] Obj[%d] will move to Proc[%d]\n", CkMyPe(), obj->Id, neighbors[bestDest].id);
00246 }
00247
00248 MigrateInfo* migrateMe = new MigrateInfo;
00249 migrateMe->obj = myStats.objData[obj->Id].handle;
00250 migrateMe->from_pe = myStats.from_pe;
00251 migrateMe->to_pe = neighbors[bestDest].id;
00252 migrateInfo.insertAtEnd(migrateMe);
00253
00254 myload -= obj->load;
00255 neighbors[bestDest].load += obj->load;
00256 }
00257 totalObjLoad -= obj->load;
00258 delete obj;
00259 }
00260
00261 if(_lb_debug2) {
00262 CkPrintf("[%d] Clearing Up...\n", CkMyPe());
00263 }
00264
00265 for(i=0; i<n_nbrs; i++) {
00266 delete[] neighbors[i].difference;
00267 }
00268 delete[] neighbors;
00269
00270 delete[] myProc;
00271
00272 for(i=0;i<myStats.n_objs;i++) {
00273 delete[] commcenter[i];
00274 }
00275 delete[] commcenter;
00276 delete[] commamount;
00277 }
00278
00279 if(_lb_debug2) {
00280 CkPrintf("[%d] Generating result...\n", CkMyPe());
00281 }
00282
00283
00284 int migrate_count=migrateInfo.length();
00285
00286
00287
00288 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
00289 msg->n_moves = migrate_count;
00290 for(i=0; i < migrate_count; i++) {
00291 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
00292 msg->moves[i] = *item;
00293 delete item;
00294 migrateInfo[i] = 0;
00295 }
00296
00297 return msg;
00298 #else
00299 return NULL;
00300 #endif
00301 }
00302
00303 #include "NeighborCommLB.def.h"
00304