00001 #include <stdlib.h>
00002
00003 #include "converse.h"
00004 #include "cldb.neighbor.h"
00005 #include "queueing.h"
00006 #include "cldb.h"
00007 #include "topology.h"
00008
/* Build-time switches and tunables for the neighbor seed balancer. */
/* USE_MULTICAST: when 0, load reports use the pooled point-to-point
 * messages (the "#if ! USE_MULTICAST" paths in this file). */
00009 #define USE_MULTICAST 0
/* IDLE_IMMEDIATE: combined with CMK_IMMEDIATE_MSG to select the
 * immediate-message work-request path in CldStillIdle/CldAskLoadHandler. */
00010 #define IDLE_IMMEDIATE 1
/* TRACE_USEREVENTS: combined with CMK_TRACE_ENABLED to emit bracketed
 * user events around balancing, load updates and idle requests. */
00011 #define TRACE_USEREVENTS 1
00012
/* PERIOD: default for LBPeriod; CldStillIdle also uses PERIOD*0.001 as the
 * minimum spacing between work requests (presumably seconds - confirm). */
00013 #define PERIOD 20
/* MAXOVERLOAD: default for overload_threshold. */
00014 #define MAXOVERLOAD 1
00015
/* Delay handed to CcdCallFnAfterOnPE by CldBalancePeriod; can be
 * overridden with +cldb_neighbor_period. */
00016 static int LBPeriod = PERIOD;
/* CldBalance only moves work when the local surplus over the neighborhood
 * average exceeds this; can be overridden with +cldb_neighbor_overload. */
00017 static int overload_threshold = MAXOVERLOAD;
00018
/* Per-PE bookkeeping record for the neighbor seed balancer. */
struct CldProcInfo_s {
  double lastCheck;   /* time of the last idle check; -1 when not tracking idleness */
  int sent;           /* set to 1 after a work request goes out, cleared by LoadNotifyFn */
  int balanceEvt;     /* trace user-event id registered as "CldBalance" */
  int updateLoadEvt;  /* trace user-event id registered as "UpdateLoad" */
  int idleEvt;        /* trace user-event id registered as "CldBalanceIdle" */
  int idleprocEvt;    /* trace user-event id registered as "CldBalanceProcIdle" */
};

/* Handle type used throughout this file: a pointer to the record above. */
typedef struct CldProcInfo_s *CldProcInfo;
00027
/* Topology name; defined elsewhere, filled from +LBTopo in
 * CldGraphModuleInit and looked up in CldComputeNeighborData. */
00028 extern char *_lbtopo;
/* Nonzero when +workstealing was given (set in CldGraphModuleInit). */
00029 int _lbsteal = 0;
00030
/* External graph generator; only referenced from a disabled (#if 0) block
 * in CldGraphModuleInit. */
00031 extern "C" void gengraph(int, int, int, int *, int *);
00032
/* Per-PE state private to this file. */
/* CldData: this PE's CldProcInfo record, allocated in CldGraphModuleInit. */
00033 CpvStaticDeclare(CldProcInfo, CldData);
/* Handler indices assigned by CmiRegisterHandler in CldGraphModuleInit. */
00034 CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
00035 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
/* MinLoad / MinProc / Mindex: results of the last CldMinAvg scan - the
 * smallest load seen, the PE holding it, and the neighbor-table index of
 * the least-loaded neighbor. */
00036 CpvStaticDeclare(int, MinLoad);
00037 CpvStaticDeclare(int, MinProc);
00038 CpvStaticDeclare(int, Mindex);
/* start: rotating position where CldMinAvg begins its scan; -1 until the
 * first call. */
00039 CpvStaticDeclare(int, start);
00040
00041 #if ! USE_MULTICAST
/* msgpool: head of this PE's free list of loadmsg buffers, linked through
 * loadmsg::next (see getPool/putPool). */
00042 CpvStaticDeclare(loadmsg *, msgpool);
00043
00044 static
00045 #if CMK_C_INLINE
00046 inline
00047 #endif
00048 loadmsg *getPool(void){
00049 loadmsg *msg;
00050 if (CpvAccess(msgpool)!=NULL) {
00051 msg = CpvAccess(msgpool);
00052 CpvAccess(msgpool) = msg->next;
00053 }
00054 else {
00055 msg = (loadmsg *)CmiAlloc(sizeof(loadmsg));
00056 CmiSetHandler(msg, CpvAccess(CldLoadResponseHandlerIndex));
00057 }
00058 return msg;
00059 }
00060
00061 static
00062 #if CMK_C_INLINE
00063 inline
00064 #endif
00065 void putPool(loadmsg *msg)
00066 {
00067 msg->next = CpvAccess(msgpool);
00068 CpvAccess(msgpool) = msg;
00069 }
00070
00071 #endif
00072
00073 void LoadNotifyFn(int l)
00074 {
00075 CldProcInfo cldData = CpvAccess(CldData);
00076 cldData->sent = 0;
00077 }
00078
/* Report the name of this seed load balancing strategy. */
const char *CldGetStrategy(void)
{
  static const char strategyName[] = "neighbor";

  return strategyName;
}
00083
00084
00085 static void CldBeginIdle(void *dummy)
00086 {
00087 CpvAccess(CldData)->lastCheck = CmiWallTimer();
00088 }
00089
00090 static void CldEndIdle(void *dummy)
00091 {
00092 CpvAccess(CldData)->lastCheck = -1;
00093 }
00094
00095 static void CldStillIdle(void *dummy, double curT)
00096 {
00097 int i;
00098 double startT;
00099 requestmsg msg;
00100 int myload;
00101 CldProcInfo cldData = CpvAccess(CldData);
00102
00103 double now = curT;
00104 double lt = cldData->lastCheck;
00105
00106 if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
00107 cldData->lastCheck = now;
00108
00109 myload = CldCountTokens();
00110 if (myload > 0) return;
00111
00112 msg.from_pe = CmiMyPe();
00113 CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
00114 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
00115
00116 CmiBecomeImmediate(&msg);
00117 for (i=0; i<CpvAccess(numNeighbors); i++) {
00118 msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
00119 CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
00120 }
00121 #else
00122 msg.to_rank = -1;
00123 CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
00124 #endif
00125 cldData->sent = 1;
00126
00127 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00128 traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
00129 #endif
00130 }
00131
00132
00133
00134 static void CldAskLoadHandler(requestmsg *msg)
00135 {
00136 int receiver, rank, recvIdx, i;
00137 int myload = CldCountTokens();
00138 double now = CmiWallTimer();
00139
00140
00141 if (myload>0) {
00142 int sendLoad;
00143 receiver = msg->from_pe;
00144 rank = CmiMyRank();
00145 if (msg->to_rank != -1) rank = msg->to_rank;
00146 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
00147
00148 if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
00149 CmiDelayImmediate();
00150 return;
00151 }
00152 CmiUnlock(CpvAccessOther(cldLock, rank));
00153 #endif
00154 sendLoad = myload / CpvAccess(numNeighbors) / 2;
00155 if (sendLoad < 1) sendLoad = 1;
00156 sendLoad = 1;
00157 for (i=0; i<CpvAccess(numNeighbors); i++)
00158 if (CpvAccess(neighbors)[i].pe == receiver) break;
00159
00160 if(i<CpvAccess(numNeighbors)) {CmiFree(msg); return;}
00161 CpvAccess(neighbors)[i].load += sendLoad;
00162 CldMultipleSend(receiver, sendLoad, rank, 0);
00163 #if 0
00164 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00165
00166 {
00167 CldProcInfo cldData = CpvAccessOther(CldData, rank);
00168 traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
00169 }
00170 #endif
00171 #endif
00172 }
00173 CmiFree(msg);
00174 }
00175
00176
00177
00178 void CldSendLoad(void)
00179 {
00180 #if CMK_MULTICORE
00181
00182 double myload = CldCountTokens();
00183 int nNeighbors = CpvAccess(numNeighbors);
00184 int i;
00185 for (i=0; i<nNeighbors; i++) {
00186 int neighbor_pe = CpvAccess(neighbors)[i].pe;
00187 int j, found=0;
00188 for (j=0; j<CpvAccessOther(numNeighbors, neighbor_pe); j++)
00189 if (CpvAccessOther(neighbors, neighbor_pe)[j].pe == CmiMyPe())
00190 {
00191 CpvAccessOther(neighbors, neighbor_pe)[j].load = myload;
00192 found = 1;
00193 break;
00194 }
00195 }
00196 #else
00197 #if USE_MULTICAST
00198 loadmsg msg;
00199
00200 msg.pe = CmiMyPe();
00201 msg.load = CldCountTokens();
00202 CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
00203 CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
00204 CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
00205 #else
00206 int i;
00207 int mype = CmiMyPe();
00208 int myload = CldCountTokens();
00209 for(i=0; i<CpvAccess(numNeighbors); i++) {
00210 loadmsg *msg = getPool();
00211 msg->fromindex = i;
00212 msg->toindex = CpvAccess(neighbors)[i].index;
00213 msg->pe = mype;
00214 msg->load = myload;
00215 CmiSyncSendAndFree(CpvAccess(neighbors)[i].pe, sizeof(loadmsg), msg);
00216 }
00217 #endif
00218 #endif
00219 }
00220
00221 int CldMinAvg(void)
00222 {
00223 int sum=0, i;
00224 int myload;
00225
00226 int nNeighbors = CpvAccess(numNeighbors);
00227 if (CpvAccess(start) == -1)
00228 CpvAccess(start) = CmiMyPe() % nNeighbors;
00229
00230 #if 0
00231
00232 for (i=0; i<nNeighbors; i++) {
00233 CpvAccess(neighbors)[i].load = CldLoadRank(CpvAccess(neighbors)[i].pe);
00234 }
00235 #endif
00236 CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
00237 CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
00238 sum = CpvAccess(neighbors)[CpvAccess(start)].load;
00239 CpvAccess(Mindex) = CpvAccess(start);
00240 for (i=1; i<nNeighbors; i++) {
00241 CpvAccess(start) = (CpvAccess(start)+1) % nNeighbors;
00242 sum += CpvAccess(neighbors)[CpvAccess(start)].load;
00243 if (CpvAccess(MinLoad) > CpvAccess(neighbors)[CpvAccess(start)].load) {
00244 CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
00245 CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
00246 CpvAccess(Mindex) = CpvAccess(start);
00247 }
00248 }
00249 CpvAccess(start) = (CpvAccess(start)+2) % nNeighbors;
00250 myload = CldCountTokens();
00251 sum += myload;
00252 if (myload < CpvAccess(MinLoad)) {
00253 CpvAccess(MinLoad) = myload;
00254 CpvAccess(MinProc) = CmiMyPe();
00255 }
00256 i = (int)(1.0 + (((float)sum) /((float)(nNeighbors+1))));
00257 return i;
00258 }
00259
00260 void CldBalance(void *dummy, double curT)
00261 {
00262 int i, j, overload, numToMove=0, avgLoad;
00263 int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
00264
00265 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00266 double startT = curT;
00267 #endif
00268
00269
00270 avgLoad = CldMinAvg();
00271
00272
00273
00274
00275
00276 overload = CldCountTokens() - avgLoad;
00277
00278 if (overload > overload_threshold) {
00279 int nNeighbors = CpvAccess(numNeighbors);
00280 for (i=0; i<nNeighbors; i++)
00281 if (CpvAccess(neighbors)[i].load < avgLoad) {
00282 totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
00283 if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
00284 maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
00285 numUnderAvg++;
00286 }
00287 if (numUnderAvg > 0) {
00288 int myrank = CmiMyRank();
00289 for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
00290 j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
00291 if (CpvAccess(neighbors)[j].load < avgLoad) {
00292 numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
00293 if (numToMove > overload)
00294 numToMove = overload;
00295 overload -= numToMove;
00296 CpvAccess(neighbors)[j].load += numToMove;
00297 #if CMK_MULTICORE || CMK_USE_IBVERBS
00298 CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove, myrank);
00299 #else
00300 CldMultipleSend(CpvAccess(neighbors)[j].pe,
00301 numToMove, myrank,
00302 #if CMK_SMP
00303 0
00304 #else
00305 1
00306 #endif
00307 );
00308 #endif
00309 }
00310 }
00311 }
00312 }
00313 CldSendLoad();
00314 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00315 traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
00316 #endif
00317 }
00318
00319 void CldBalancePeriod(void *dummy, double curT)
00320 {
00321 CldBalance(NULL, curT);
00322 CcdCallFnAfterOnPE((CcdVoidFn)CldBalancePeriod, NULL, LBPeriod, CmiMyPe());
00323 }
00324
00325
00326 void CldLoadResponseHandler(loadmsg *msg)
00327 {
00328 int i;
00329
00330 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00331 double startT = CmiWallTimer();
00332 #endif
00333 #if USE_MULTICAST
00334 for(i=0; i<CpvAccess(numNeighbors); i++)
00335 if (CpvAccess(neighbors)[i].pe == msg->pe) {
00336 CpvAccess(neighbors)[i].load = msg->load;
00337 break;
00338 }
00339 CmiFree(msg);
00340 #else
00341 int index = msg->toindex;
00342 if (index == -1) {
00343 for(i=0; i<CpvAccess(numNeighbors); i++)
00344 if (CpvAccess(neighbors)[i].pe == msg->pe) {
00345 index = i;
00346 break;
00347 }
00348 }
00349 if (index != -1) {
00350 CpvAccess(neighbors)[index].load = msg->load;
00351 if (CpvAccess(neighbors)[index].index == -1) CpvAccess(neighbors)[index].index = msg->fromindex;
00352 }
00353 putPool(msg);
00354 #endif
00355 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00356 traceUserBracketEvent(CpvAccess(CldData)->updateLoadEvt, startT, CmiWallTimer());
00357 #endif
00358 }
00359
/*
 * Handler for a seed relocated to this PE by the balancer: restore the
 * message's own handler and add it to the local token queue.
 */
void CldBalanceHandler(void *msg)
{
  char *seed = (char *)msg;

  CldRestoreHandler(seed);
  CldPutToken(seed);
}
00365
00366 void CldHandler(void *msg)
00367 {
00368 CldInfoFn ifn; CldPackFn pfn;
00369 int len, queueing, priobits; unsigned int *prioptr;
00370
00371 CldRestoreHandler((char *)msg);
00372 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00373 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00374 CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
00375
00376 }
00377
00378 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
00379 {
00380 int len, queueing, priobits,i; unsigned int *prioptr;
00381 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00382 CldPackFn pfn;
00383 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00384 if (pfn) {
00385 pfn(&msg);
00386 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00387 }
00388 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00389 CmiSetInfo(msg,infofn);
00390
00391 CmiSyncMulticastAndFree(grp, len, msg);
00392 }
00393
00394 void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn)
00395 {
00396 int len, queueing, priobits,i; unsigned int *prioptr;
00397 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00398 CldPackFn pfn;
00399 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00400 if (pfn) {
00401 pfn(&msg);
00402 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00403 }
00404 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00405 CmiSetInfo(msg,infofn);
00406
00407
00408
00409
00410
00411
00412 CmiSyncListSendAndFree(npes, pes, len, msg);
00413 }
00414
00415 void CldEnqueue(int pe, void *msg, int infofn)
00416 {
00417 int len, queueing, priobits, avg; unsigned int *prioptr;
00418 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00419 CldPackFn pfn;
00420
00421 if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00422 avg = CldMinAvg();
00423 if (CldCountTokens() < avg)
00424 pe = CmiMyPe();
00425 else
00426 pe = CpvAccess(MinProc);
00427 #if CMK_NODE_QUEUE_AVAILABLE
00428 if (CmiNodeOf(pe) == CmiMyNode()) {
00429 CldNodeEnqueue(CmiMyNode(), msg, infofn);
00430 return;
00431 }
00432 #endif
00433
00434
00435 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00436 if (pfn && CmiNumNodes()>1) {
00437 pfn(&msg);
00438 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00439 }
00440 if (pe != CmiMyPe()) {
00441 CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
00442 CpvAccess(CldRelocatedMessages)++;
00443 CmiSetInfo(msg,infofn);
00444 CldSwitchHandler((char *)msg, CpvAccess(CldBalanceHandlerIndex));
00445 CmiSyncSendAndFree(pe, len, msg);
00446 }
00447 else {
00448 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00449 CmiSetInfo(msg,infofn);
00450 CldPutToken((char *)msg);
00451 }
00452 }
00453 else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
00454 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00455
00456 CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
00457
00458 }
00459 else {
00460 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00461 if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
00462 pfn(&msg);
00463 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00464 }
00465 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00466 CmiSetInfo(msg,infofn);
00467 if (pe==CLD_BROADCAST)
00468 CmiSyncBroadcastAndFree(len, msg);
00469 else if (pe==CLD_BROADCAST_ALL)
00470 CmiSyncBroadcastAllAndFree(len, msg);
00471 else CmiSyncSendAndFree(pe, len, msg);
00472 }
00473 }
00474
00475 void CldNodeEnqueue(int node, void *msg, int infofn)
00476 {
00477 int len, queueing, priobits, pe, avg; unsigned int *prioptr;
00478 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00479 CldPackFn pfn;
00480 if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00481 avg = CldMinAvg();
00482 if (CldCountTokens() < avg)
00483 pe = CmiMyPe();
00484 else
00485 pe = CpvAccess(MinProc);
00486 node = CmiNodeOf(pe);
00487 if (node != CmiMyNode()){
00488 CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
00489 CpvAccess(CldRelocatedMessages)++;
00490 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00491 if (pfn) {
00492 pfn(&msg);
00493 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00494 }
00495 CmiSetInfo(msg,infofn);
00496 CldSwitchHandler((char *)msg, CpvAccess(CldBalanceHandlerIndex));
00497 CmiSyncNodeSendAndFree(node, len, msg);
00498 }
00499 else {
00500 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00501
00502
00503 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00504 }
00505 }
00506 else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
00507 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00508
00509 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00510 }
00511 else {
00512 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00513 if (pfn) {
00514 pfn(&msg);
00515 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00516 }
00517 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00518 CmiSetInfo(msg,infofn);
00519 if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
00520 else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
00521 else CmiSyncNodeSendAndFree(node, len, msg);
00522 }
00523 }
00524
00525 void CldReadNeighborData(void)
00526 {
00527 FILE *fp;
00528 char filename[25];
00529 int i, *pes;
00530
00531 if (CmiNumPes() <= 1)
00532 return;
00533 sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
00534 if ((fp = fopen(filename, "r")) == 0)
00535 {
00536 CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
00537 return;
00538 }
00539 if (fscanf(fp, "%d", &CpvAccess(numNeighbors)) != 1) {
00540 CmiAbort("CLD> reading neighbor data failed!");
00541 }
00542 CpvAccess(neighbors) =
00543 (struct CldNeighborData_s *)calloc(CpvAccess(numNeighbors),
00544 sizeof(struct CldNeighborData_s));
00545 pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
00546 for (i=0; i<CpvAccess(numNeighbors); i++) {
00547 if (fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe)) != 1) {
00548 CmiAbort("CLD> reading neighbor data failed!");
00549 }
00550 pes[i] = CpvAccess(neighbors)[i].pe;
00551 CpvAccess(neighbors)[i].load = 0;
00552 }
00553 fclose(fp);
00554 CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
00555 }
00556
00557 static void CldComputeNeighborData(void)
00558 {
00559 int i, npes;
00560 int *pes;
00561 LBtopoFn topofn;
00562 void *topo;
00563
00564 topofn = LBTopoLookup(_lbtopo);
00565 if (topofn == NULL) {
00566 char str[1024];
00567 CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
00568 printoutTopo();
00569 sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
00570 CmiAbort(str);
00571 }
00572 topo = topofn(CmiNumPes());
00573 npes = getTopoMaxNeighbors(topo);
00574 pes = (int *)malloc(npes*sizeof(int));
00575 getTopoNeighbors(topo, CmiMyPe(), pes, &npes);
00576 #if 0
00577 {
00578 char buf[512], *ptr;
00579 sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npes);
00580 ptr = buf + strlen(buf);
00581 for (i=0; i<npes; i++) {
00582 CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
00583 sprintf(ptr, " %d ", pes[i]);
00584 ptr += strlen(ptr);
00585 }
00586 strcat(ptr, "\n");
00587 CmiPrintf(buf);
00588 }
00589 #endif
00590
00591 CpvAccess(numNeighbors) = npes;
00592 CpvAccess(neighbors) =
00593 (struct CldNeighborData_s *)calloc(npes, sizeof(struct CldNeighborData_s));
00594 for (i=0; i<npes; i++) {
00595 CpvAccess(neighbors)[i].pe = pes[i];
00596 CpvAccess(neighbors)[i].load = 0;
00597 #if ! USE_MULTICAST
00598 CpvAccess(neighbors)[i].index = -1;
00599 #endif
00600 }
00601 CpvAccess(neighborGroup) = CmiEstablishGroup(npes, pes);
00602 free(pes);
00603 }
00604
00605 static void topo_callback(void)
00606 {
00607 CldComputeNeighborData();
00608 #if CMK_MULTICORE
00609 CmiNodeBarrier();
00610 #endif
00611 CldBalancePeriod(NULL, CmiWallTimer());
00612 }
00613
00614 void CldGraphModuleInit(char **argv)
00615 {
00616 CpvInitialize(CldProcInfo, CldData);
00617 CpvInitialize(int, numNeighbors);
00618 CpvInitialize(int, MinLoad);
00619 CpvInitialize(int, Mindex);
00620 CpvInitialize(int, MinProc);
00621 CpvInitialize(int, start);
00622 CpvInitialize(CmiGroup, neighborGroup);
00623 CpvInitialize(CldNeighborData, neighbors);
00624 CpvInitialize(int, CldBalanceHandlerIndex);
00625 CpvInitialize(int, CldLoadResponseHandlerIndex);
00626 CpvInitialize(int, CldAskLoadHandlerIndex);
00627
00628 CpvAccess(start) = -1;
00629 CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
00630 CpvAccess(CldData)->lastCheck = -1;
00631 CpvAccess(CldData)->sent = 0;
00632 #if CMK_TRACE_ENABLED
00633 CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
00634 CpvAccess(CldData)->updateLoadEvt = traceRegisterUserEvent("UpdateLoad", -1);
00635 CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
00636 CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
00637 #endif
00638
00639 CpvAccess(MinLoad) = 0;
00640 CpvAccess(Mindex) = 0;
00641 CpvAccess(MinProc) = CmiMyPe();
00642 CpvAccess(CldBalanceHandlerIndex) =
00643 CmiRegisterHandler(CldBalanceHandler);
00644 CpvAccess(CldLoadResponseHandlerIndex) =
00645 CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
00646 CpvAccess(CldAskLoadHandlerIndex) =
00647 CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
00648
00649
00650 if (CmiMyRank() == CmiMyNodeSize()) return;
00651
00652 CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
00653 if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);
00654
00655 if (CmiNumPes() > 1) {
00656 #if 0
00657 FILE *fp;
00658 char filename[20];
00659
00660 sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
00661 if ((fp = fopen(filename, "r")) == 0)
00662 {
00663 if (CmiMyPe() == 0) {
00664 CmiPrintf("No proper graph%d directory exists in current directory.\n Generating... ", CmiNumPes());
00665 gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
00666 CmiPrintf("done.\n");
00667 }
00668 else {
00669 while (!(fp = fopen(filename, "r"))) ;
00670 fclose(fp);
00671 }
00672 }
00673 else fclose(fp);
00674 CldReadNeighborData();
00675 #endif
00676
00677
00678
00679
00680
00681
00682
00683 CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)topo_callback, NULL);
00684
00685 }
00686
00687 if (CmiGetArgIntDesc(argv, "+cldb_neighbor_period", &LBPeriod, "time interval to do neighbor seed lb")) {
00688 CmiAssert(LBPeriod>0);
00689 if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor load balancing period is %d\n", LBPeriod);
00690 }
00691 if (CmiGetArgIntDesc(argv, "+cldb_neighbor_overload", &overload_threshold, "neighbor seed lb's overload threshold")) {
00692 CmiAssert(overload_threshold>0);
00693 if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor overload threshold is %d\n", overload_threshold);
00694 }
00695
00696 #if 1
00697 _lbsteal = CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
00698 if (_lbsteal) {
00699
00700 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
00701 (CcdVoidFn) CldBeginIdle, NULL);
00702 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
00703 (CcdVoidFn) CldStillIdle, NULL);
00704 if (CmiMyPe() == 0)
00705 CmiPrintf("Charm++> Work stealing is enabled. \n");
00706 }
00707 #endif
00708 }
00709
00710
00711 void CldModuleInit(char **argv)
00712 {
00713 CpvInitialize(int, CldHandlerIndex);
00714 CpvInitialize(int, CldRelocatedMessages);
00715 CpvInitialize(int, CldLoadBalanceMessages);
00716 CpvInitialize(int, CldMessageChunks);
00717 CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
00718 CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) =
00719 CpvAccess(CldMessageChunks) = 0;
00720
00721 CpvInitialize(loadmsg *, msgpool);
00722 CpvAccess(msgpool) = NULL;
00723
00724 CldModuleGeneralInit(argv);
00725 CldGraphModuleInit(argv);
00726
00727 CpvAccess(CldLoadNotify) = 1;
00728 }