00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include "GridCommLB.decl.h"
00042
00043 #include "GridCommLB.h"
00044 #include "manager.h"
00045
00046 CreateLBFunc_Def (GridCommLB, "Grid communication load balancer (evenly distribute objects across each cluster)")
00047
00048
00049
00050
00051
00052
00053 GridCommLB::GridCommLB (const CkLBOptions &opt) : CentralLB (opt)
00054 {
00055 char *value;
00056
00057
00058 lbname = (char *) "GridCommLB";
00059
00060 if (CkMyPe() == 0) {
00061 CkPrintf ("[%d] GridCommLB created.\n", CkMyPe());
00062 }
00063
00064 if (value = getenv ("CK_LDB_GRIDCOMMLB_MODE")) {
00065 CK_LDB_GridCommLB_Mode = atoi (value);
00066 } else {
00067 CK_LDB_GridCommLB_Mode = CK_LDB_GRIDCOMMLB_MODE;
00068 }
00069
00070 if (value = getenv ("CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD")) {
00071 CK_LDB_GridCommLB_Background_Load = atoi (value);
00072 } else {
00073 CK_LDB_GridCommLB_Background_Load = CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD;
00074 }
00075
00076 if (value = getenv ("CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE")) {
00077 CK_LDB_GridCommLB_Load_Tolerance = atof (value);
00078 } else {
00079 CK_LDB_GridCommLB_Load_Tolerance = CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE;
00080 }
00081
00082 manager_init ();
00083 }
00084
00085
00086
00087
00088
00089
00090 GridCommLB::GridCommLB (CkMigrateMessage *msg) : CentralLB (msg)
00091 {
00092 char *value;
00093
00094
00095 lbname = (char *) "GridCommLB";
00096
00097 if (value = getenv ("CK_LDB_GRIDCOMMLB_MODE")) {
00098 CK_LDB_GridCommLB_Mode = atoi (value);
00099 } else {
00100 CK_LDB_GridCommLB_Mode = CK_LDB_GRIDCOMMLB_MODE;
00101 }
00102
00103 if (value = getenv ("CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD")) {
00104 CK_LDB_GridCommLB_Background_Load = atoi (value);
00105 } else {
00106 CK_LDB_GridCommLB_Background_Load = CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD;
00107 }
00108
00109 if (value = getenv ("CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE")) {
00110 CK_LDB_GridCommLB_Load_Tolerance = atof (value);
00111 } else {
00112 CK_LDB_GridCommLB_Load_Tolerance = CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE;
00113 }
00114
00115 manager_init ();
00116 }
00117
00118
00119
00120
00121
00122
00123
00124 CmiBool GridCommLB::QueryBalanceNow (int step)
00125 {
00126 if (_lb_args.debug() > 2) {
00127 CkPrintf ("[%d] GridCommLB is balancing on step %d.\n", CkMyPe(), step);
00128 }
00129
00130 return (CmiTrue);
00131 }
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146 int GridCommLB::Get_Cluster (int pe)
00147 {
00148 #if CONVERSE_VERSION_VMI
00149 return (CmiGetCluster (pe));
00150 #else
00151 return (0);
00152 #endif
00153 }
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166 void GridCommLB::Initialize_PE_Data (CentralLB::LDStats *stats)
00167 {
00168 int min_speed;
00169 int i;
00170
00171
00172 PE_Data = new PE_Data_T[Num_PEs];
00173
00174 min_speed = MAXINT;
00175 for (i = 0; i < Num_PEs; i++) {
00176 (&PE_Data[i])->available = stats->procs[i].available;
00177 (&PE_Data[i])->cluster = Get_Cluster (i);
00178 (&PE_Data[i])->num_objs = 0;
00179 (&PE_Data[i])->num_lan_objs = 0;
00180 (&PE_Data[i])->num_lan_msgs = 0;
00181 (&PE_Data[i])->num_wan_objs = 0;
00182 (&PE_Data[i])->num_wan_msgs = 0;
00183 (&PE_Data[i])->relative_speed = 0.0;
00184 (&PE_Data[i])->scaled_load = 0.0;
00185
00186 if (stats->procs[i].pe_speed < min_speed) {
00187 min_speed = stats->procs[i].pe_speed;
00188 }
00189 }
00190
00191
00192
00193 for (i = 0; i < Num_PEs; i++) {
00194 (&PE_Data[i])->relative_speed = (double) (stats->procs[i].pe_speed / min_speed);
00195 if (CK_LDB_GridCommLB_Background_Load) {
00196 (&PE_Data[i])->scaled_load += stats->procs[i].bg_walltime;
00197 }
00198 }
00199 }
00200
00201
00202
00203
00204
00205
00206 int GridCommLB::Available_PE_Count ()
00207 {
00208 int available_pe_count;
00209 int i;
00210
00211
00212 available_pe_count = 0;
00213 for (i = 0; i < Num_PEs; i++) {
00214 if ((&PE_Data[i])->available) {
00215 available_pe_count += 1;
00216 }
00217 }
00218 return (available_pe_count);
00219 }
00220
00221
00222
00223
00224
00225
00226 int GridCommLB::Compute_Number_Of_Clusters ()
00227 {
00228 int max_cluster;
00229 int i;
00230
00231
00232 max_cluster = 0;
00233 for (i = 0; i < Num_PEs; i++) {
00234 if ((&PE_Data[i])->cluster < 0) {
00235 return (-1);
00236 }
00237
00238 if ((&PE_Data[i])->cluster > max_cluster) {
00239 max_cluster = (&PE_Data[i])->cluster;
00240 }
00241 }
00242 return (max_cluster + 1);
00243 }
00244
00245
00246
00247
00248
00249
00250 void GridCommLB::Initialize_Object_Data (CentralLB::LDStats *stats)
00251 {
00252 int i;
00253
00254
00255 Object_Data = new Object_Data_T[Num_Objects];
00256
00257 for (i = 0; i < Num_Objects; i++) {
00258 (&Object_Data[i])->migratable = (&stats->objData[i])->migratable;
00259 (&Object_Data[i])->cluster = Get_Cluster (stats->from_proc[i]);
00260 (&Object_Data[i])->from_pe = stats->from_proc[i];
00261 (&Object_Data[i])->to_pe = -1;
00262 (&Object_Data[i])->num_lan_msgs = 0;
00263 (&Object_Data[i])->num_wan_msgs = 0;
00264 (&Object_Data[i])->load = (&stats->objData[i])->wallTime;
00265 }
00266 }
00267
00268
00269
00270
00271
00272
00273 void GridCommLB::Examine_InterObject_Messages (CentralLB::LDStats *stats)
00274 {
00275 int i;
00276 int j;
00277 LDCommData *com_data;
00278 int send_object;
00279 int send_pe;
00280 int send_cluster;
00281 int recv_object;
00282 int recv_pe;
00283 int recv_cluster;
00284 LDObjKey *recv_objects;
00285 int num_objects;
00286
00287
00288 for (i = 0; i < stats->n_comm; i++) {
00289 com_data = &(stats->commData[i]);
00290 if ((!com_data->from_proc()) && (com_data->recv_type() == LD_OBJ_MSG)) {
00291 send_object = stats->getHash (com_data->sender);
00292 recv_object = stats->getHash (com_data->receiver.get_destObj());
00293
00294 if ((send_object < 0) || (send_object > Num_Objects) || (recv_object < 0) || (recv_object > Num_Objects)) {
00295 continue;
00296 }
00297
00298 send_pe = (&Object_Data[send_object])->from_pe;
00299 recv_pe = (&Object_Data[recv_object])->from_pe;
00300
00301 send_cluster = Get_Cluster (send_pe);
00302 recv_cluster = Get_Cluster (recv_pe);
00303
00304 if (send_cluster == recv_cluster) {
00305 (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
00306 } else {
00307 (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
00308 }
00309 } else if (com_data->receiver.get_type() == LD_OBJLIST_MSG) {
00310 send_object = stats->getHash (com_data->sender);
00311
00312 if ((send_object < 0) || (send_object > Num_Objects)) {
00313 continue;
00314 }
00315
00316 send_pe = (&Object_Data[send_object])->from_pe;
00317 send_cluster = Get_Cluster (send_pe);
00318
00319 recv_objects = com_data->receiver.get_destObjs (num_objects);
00320
00321 for (j = 0; j < num_objects; j++) {
00322 recv_object = stats->getHash (recv_objects[j]);
00323
00324 if ((recv_object < 0) || (recv_object > Num_Objects)) {
00325 continue;
00326 }
00327
00328 recv_pe = (&Object_Data[recv_object])->from_pe;
00329 recv_cluster = Get_Cluster (recv_pe);
00330
00331 if (send_cluster == recv_cluster) {
00332 (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
00333 } else {
00334 (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
00335 }
00336 }
00337 }
00338 }
00339 }
00340
00341
00342
00343
00344
00345
00346 void GridCommLB::Map_NonMigratable_Objects_To_PEs ()
00347 {
00348 int i;
00349
00350
00351 for (i = 0; i < Num_Objects; i++) {
00352 if (!((&Object_Data[i])->migratable)) {
00353 if (_lb_args.debug() > 1) {
00354 CkPrintf ("[%d] GridCommLB identifies object %d as non-migratable.\n", CkMyPe(), i);
00355 }
00356
00357 Assign_Object_To_PE (i, (&Object_Data[i])->from_pe);
00358 }
00359 }
00360 }
00361
00362
00363
00364
00365
00366
00367 void GridCommLB::Map_Migratable_Objects_To_PEs (int cluster)
00368 {
00369 int target_object;
00370 int target_pe;
00371
00372
00373 while (1) {
00374 target_object = Find_Maximum_Object (cluster);
00375 target_pe = Find_Minimum_PE (cluster);
00376
00377 if ((target_object == -1) || (target_pe == -1)) {
00378 break;
00379 }
00380
00381 Assign_Object_To_PE (target_object, target_pe);
00382 }
00383 }
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395 int GridCommLB::Find_Maximum_Object (int cluster)
00396 {
00397 int max_index;
00398 int max_load_index;
00399 double max_load;
00400 int max_wan_msgs_index;
00401 int max_wan_msgs;
00402 double load_tolerance;
00403 int i;
00404
00405
00406 max_index = -1;
00407
00408 max_load_index = -1;
00409 max_load = -1.0;
00410
00411 max_wan_msgs_index = -1;
00412 max_wan_msgs = -1;
00413
00414 for (i = 0; i < Num_Objects; i++) {
00415 if (((&Object_Data[i])->cluster == cluster) && ((&Object_Data[i])->to_pe == -1)) {
00416 if ((&Object_Data[i])->load > max_load) {
00417 max_load_index = i;
00418 max_load = (&Object_Data[i])->load;
00419 }
00420 if ((&Object_Data[i])->num_wan_msgs > max_wan_msgs) {
00421 max_wan_msgs_index = i;
00422 max_wan_msgs = (&Object_Data[i])->num_wan_msgs;
00423 }
00424 }
00425 }
00426
00427 if (max_load_index < 0) {
00428 return (max_load_index);
00429 }
00430
00431 if ((&Object_Data[max_load_index])->num_wan_msgs >= (&Object_Data[max_wan_msgs_index])->num_wan_msgs) {
00432 return (max_load_index);
00433 }
00434
00435 load_tolerance = (&Object_Data[max_load_index])->load * CK_LDB_GridCommLB_Load_Tolerance;
00436
00437 max_index = max_load_index;
00438
00439 for (i = 0; i < Num_Objects; i++) {
00440 if (((&Object_Data[i])->cluster == cluster) && ((&Object_Data[i])->to_pe == -1)) {
00441 if (i != max_load_index) {
00442 if (fabs ((&Object_Data[max_load_index])->load - (&Object_Data[i])->load) <= load_tolerance) {
00443 if ((&Object_Data[i])->num_wan_msgs > (&Object_Data[max_index])->num_wan_msgs) {
00444 max_index = i;
00445 }
00446 }
00447 }
00448 }
00449 }
00450
00451 return (max_index);
00452 }
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470 int GridCommLB::Find_Minimum_PE (int cluster)
00471 {
00472 if (CK_LDB_GridCommLB_Mode == 0) {
00473 int min_index;
00474 int min_objs;
00475 int i;
00476
00477
00478 min_index = -1;
00479 min_objs = MAXINT;
00480
00481 for (i = 0; i < Num_PEs; i++) {
00482 if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
00483 if ((&PE_Data[i])->num_objs < min_objs) {
00484 min_index = i;
00485 min_objs = (&PE_Data[i])->num_objs;
00486 } else if (((&PE_Data[i])->num_objs == min_objs) &&
00487 ((&PE_Data[i])->num_wan_objs < (&PE_Data[min_index])->num_wan_objs)) {
00488 min_index = i;
00489 } else if (((&PE_Data[i])->num_objs == min_objs) &&
00490 ((&PE_Data[i])->num_wan_objs == (&PE_Data[min_index])->num_wan_objs) &&
00491 ((&PE_Data[i])->num_wan_msgs < (&PE_Data[min_index])->num_wan_msgs)) {
00492 min_index = i;
00493 } else if (((&PE_Data[i])->num_objs == min_objs) &&
00494 ((&PE_Data[i])->num_wan_objs == (&PE_Data[min_index])->num_wan_objs) &&
00495 ((&PE_Data[i])->num_wan_msgs == (&PE_Data[min_index])->num_wan_msgs) &&
00496 ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
00497 min_index = i;
00498 }
00499 }
00500 }
00501
00502 return (min_index);
00503 } else if (CK_LDB_GridCommLB_Mode == 1) {
00504 int min_index;
00505 int min_load_index;
00506 double min_scaled_load;
00507 int min_wan_msgs_index;
00508 int min_wan_msgs;
00509 double load_tolerance;
00510 int i;
00511
00512
00513 min_index = -1;
00514
00515 min_load_index = -1;
00516 min_scaled_load = MAXDOUBLE;
00517
00518 min_wan_msgs_index = -1;
00519 min_wan_msgs = MAXINT;
00520
00521 for (i = 0; i < Num_PEs; i++) {
00522 if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
00523 if ((&PE_Data[i])->scaled_load < min_scaled_load) {
00524 min_load_index = i;
00525 min_scaled_load = (&PE_Data[i])->scaled_load;
00526 }
00527 if ((&PE_Data[i])->num_wan_msgs < min_wan_msgs) {
00528 min_wan_msgs_index = i;
00529 min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
00530 }
00531 }
00532 }
00533
00534
00535 if (min_load_index < 0) {
00536 return (min_load_index);
00537 }
00538
00539
00540
00541
00542 if ((&PE_Data[min_load_index])->num_wan_msgs <= (&PE_Data[min_wan_msgs_index])->num_wan_msgs) {
00543 return (min_load_index);
00544 }
00545
00546
00547
00548
00549
00550 load_tolerance = (&PE_Data[min_load_index])->scaled_load * CK_LDB_GridCommLB_Load_Tolerance;
00551
00552 min_index = min_load_index;
00553
00554 for (i = 0; i < Num_PEs; i++) {
00555 if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
00556 if (i != min_load_index) {
00557 if (fabs ((&PE_Data[i])->scaled_load - (&PE_Data[min_load_index])->scaled_load) <= load_tolerance) {
00558 if ((&PE_Data[i])->num_wan_msgs < (&PE_Data[min_index])->num_wan_msgs) {
00559 min_index = i;
00560 }
00561 }
00562 }
00563 }
00564 }
00565
00566 return (min_index);
00567 } else {
00568 if (_lb_args.debug() > 0) {
00569 CkPrintf ("[%d] GridCommLB was told to use bad mode (%d).\n", CkMyPe(), CK_LDB_GridCommLB_Mode);
00570 }
00571 return (-1);
00572 }
00573 }
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583 void GridCommLB::Assign_Object_To_PE (int target_object, int target_pe)
00584 {
00585 (&Object_Data[target_object])->to_pe = target_pe;
00586
00587 (&PE_Data[target_pe])->num_objs += 1;
00588
00589 if ((&Object_Data[target_object])->num_lan_msgs > 0) {
00590 (&PE_Data[target_pe])->num_lan_objs += 1;
00591 (&PE_Data[target_pe])->num_lan_msgs += (&Object_Data[target_object])->num_lan_msgs;
00592 }
00593
00594 if ((&Object_Data[target_object])->num_wan_msgs > 0) {
00595 (&PE_Data[target_pe])->num_wan_objs += 1;
00596 (&PE_Data[target_pe])->num_wan_msgs += (&Object_Data[target_object])->num_wan_msgs;
00597 }
00598
00599 (&PE_Data[target_pe])->scaled_load += (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
00600 }
00601
00602
00603
00604
00605
00606
00607
00608 void GridCommLB::work (LDStats *stats)
00609 {
00610 int i;
00611
00612
00613 if (_lb_args.debug() > 0) {
00614 CkPrintf ("[%d] GridCommLB is working (mode=%d, background load=%d, load tolerance=%f).\n", CkMyPe(), CK_LDB_GridCommLB_Mode, CK_LDB_GridCommLB_Background_Load, CK_LDB_GridCommLB_Load_Tolerance);
00615 }
00616
00617
00618 stats->makeCommHash ();
00619
00620
00621 Num_PEs = stats->nprocs();
00622 Num_Objects = stats->n_objs;
00623
00624 if (_lb_args.debug() > 0) {
00625 CkPrintf ("[%d] GridCommLB is examining %d PEs and %d objects.\n", CkMyPe(), Num_PEs, Num_Objects);
00626 }
00627
00628
00629 Initialize_PE_Data (stats);
00630
00631
00632 if (Available_PE_Count() < 1) {
00633 if (_lb_args.debug() > 0) {
00634 CkPrintf ("[%d] GridCommLB finds no available PEs -- no balancing done.\n", CkMyPe());
00635 }
00636
00637 delete [] PE_Data;
00638
00639 return;
00640 }
00641
00642
00643
00644 Num_Clusters = Compute_Number_Of_Clusters ();
00645 if (Num_Clusters < 1) {
00646 if (_lb_args.debug() > 0) {
00647 CkPrintf ("[%d] GridCommLB finds incomplete PE cluster map -- no balancing done.\n", CkMyPe());
00648 }
00649
00650 delete [] PE_Data;
00651
00652 return;
00653 }
00654
00655 if (_lb_args.debug() > 0) {
00656 CkPrintf ("[%d] GridCommLB finds %d clusters.\n", CkMyPe(), Num_Clusters);
00657 }
00658
00659
00660 Initialize_Object_Data (stats);
00661
00662
00663 Examine_InterObject_Messages (stats);
00664
00665
00666 Map_NonMigratable_Objects_To_PEs ();
00667
00668
00669 for (i = 0; i < Num_Clusters; i++) {
00670 Map_Migratable_Objects_To_PEs (i);
00671 }
00672
00673
00674 for (i = 0; i < Num_Objects; i++) {
00675 stats->to_proc[i] = (&Object_Data[i])->to_pe;
00676
00677 if (_lb_args.debug() > 2) {
00678 CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
00679 } else if (_lb_args.debug() > 1) {
00680 if (stats->to_proc[i] != stats->from_proc[i]) {
00681 CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
00682 }
00683 }
00684 }
00685
00686
00687 delete [] Object_Data;
00688 delete [] PE_Data;
00689 }
00690
00691 #include "GridCommLB.def.h"