00001
#include <stdlib.h>
#include <string.h>
#include "queueing.h"
#include "cldb.h"
#include <math.h>
00006
/* Array of per-PE flags, one char per PE (see CldPEBitVector / CldPresentPE). */
typedef char *BitVector;

/* Handler indices owned by the active seed-balancing strategy.
   CldHandlerIndex and CldNodeHandlerIndex are not referenced in the code
   visible here. */
CpvDeclare(int, CldHandlerIndex);
CpvDeclare(int, CldNodeHandlerIndex);
/* CldPEBitVector[pe] != 0 means pe may receive seeds.  All entries are 1
   after CldModuleGeneralInit; CldSetPEBitVector overwrites them. */
CpvDeclare(BitVector, CldPEBitVector);
/* Handler installed on a seed (via CldSwitchHandler) before it is sent away. */
CpvDeclare(int, CldBalanceHandlerIndex);

/* Statistics.  CldRelocatedMessages counts seeds sent by CldMultipleSend /
   CldMultipleSendPrio and is printed by seedBalancerExit; CldMessageChunks
   counts the send operations those functions perform.  CldLoadBalanceMessages
   is not referenced in the code visible here. */
CpvDeclare(int, CldRelocatedMessages);
CpvDeclare(int, CldLoadBalanceMessages);
CpvDeclare(int, CldMessageChunks);
/* When nonzero, CldTokenHandler reports the current token count through
   LoadNotifyFn. */
CpvDeclare(int, CldLoadNotify);

/* Per-rank lock taken around every change to that rank's token list
   (CldProc), its load count and CldLoadOffset in this file. */
CpvDeclare(CmiNodeLock, cldLock);

extern void LoadNotifyFn(int);

/* Load-balancing topology name, default "torus_nd_5".  Not read in the code
   visible here; presumably consumed by topology-aware strategies elsewhere
   (NOTE(review): confirm). */
static char s_lbtopo_default[] = "torus_nd_5";
extern char *_lbtopo;
char *_lbtopo = s_lbtopo_default;
00026
00027
00028
00029
00030
00031 void CldRegisterEstimator(CldEstimator fn)
00032 {
00033
00034 }
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
/*
 * Per-rank count of tokens whose seed was taken out of the token list by
 * CldGetToken / CldGetTokenFromRank* while the token itself is still sitting
 * in the scheduler queue.  CldLoad() and CldLoadRank() subtract it from the
 * scheduler queue length; CldTokenHandler decrements it when such an emptied
 * token is finally dequeued.
 */
CpvDeclare(int, CldLoadOffset);
00053
00054
00055 int CldRegisterInfoFn(CldInfoFn fn)
00056 {
00057 return CmiRegisterHandler((CmiHandler)fn);
00058 }
00059
00060 int CldRegisterPackFn(CldPackFn fn)
00061 {
00062 return CmiRegisterHandler((CmiHandler)fn);
00063 }
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
/*
 * Redirect a message to `handler`, remembering its current handler in the
 * extended (X) handler slot so CldRestoreHandler can put it back.
 * With CMK_MEM_CHECKPOINT the message's restart phase is saved and restored
 * around the change so it is left untouched.
 */
void CldSwitchHandler(char *cmsg, int handler)
{
#if CMK_MEM_CHECKPOINT
  int savedPhase = CmiGetRestartPhase(cmsg);
#endif
  int currentHandler = CmiGetHandler(cmsg);

  CmiSetXHandler(cmsg, currentHandler);
  CmiSetHandler(cmsg, handler);
#if CMK_MEM_CHECKPOINT
  CmiGetRestartPhase(cmsg) = savedPhase;
#endif
}
00089
/*
 * Undo CldSwitchHandler: reinstall the handler saved in the message's
 * extended (X) handler slot.  With CMK_MEM_CHECKPOINT the restart phase is
 * preserved across the change.
 */
void CldRestoreHandler(char *cmsg)
{
#if CMK_MEM_CHECKPOINT
  int savedPhase = CmiGetRestartPhase(cmsg);
#endif
  int originalHandler = CmiGetXHandler(cmsg);

  CmiSetHandler(cmsg, originalHandler);
#if CMK_MEM_CHECKPOINT
  CmiGetRestartPhase(cmsg) = savedPhase;
#endif
}
00100
/* Declared here; not defined in the visible part of this file. */
void Cldhandler(char *);

/*
 * A token stands in for one seed message.  It is linked into its rank's token
 * list AND is itself enqueued with the scheduler as a Converse message
 * (CldPutToken sets its handler and passes it to CsdEnqueueGeneral).  That is
 * why it starts with a message header.
 */
typedef struct CldToken_s {
  char msg_header[CmiMsgHeaderSizeBytes]; /* Converse header; the token pointer is used as the message */
  char *msg;                              /* the seed; set to 0 once taken out of the list */
  struct CldToken_s *pred;                /* links of the circular doubly-linked token list */
  struct CldToken_s *succ;
} *CldToken;

/* Per-rank token-list state, reached via CpvAccess(CldProc). */
typedef struct CldProcInfo_s {
  int tokenhandleridx;  /* handler index registered for CldTokenHandler */
  int load;             /* number of tokens currently linked in the list */
  CldToken sentinel;    /* list head; sentinel->succ is the front, sentinel->pred the back */
} *CldProcInfo;

CpvDeclare(CldProcInfo, CldProc);
00117
00118 static void CldTokenHandler(CldToken tok)
00119 {
00120 CldProcInfo proc = CpvAccess(CldProc);
00121 if (tok->msg) {
00122 tok->pred->succ = tok->succ;
00123 tok->succ->pred = tok->pred;
00124 proc->load --;
00125 CmiHandleMessage(tok->msg);
00126 }
00127 else
00128 CpvAccess(CldLoadOffset)--;
00129 if (CpvAccess(CldLoadNotify))
00130 LoadNotifyFn(CpvAccess(CldProc)->load);
00131 CmiFree(tok);
00132 }
00133
00134 int CldCountTokensRank(int rank)
00135 {
00136 return CpvAccessOther(CldProc, rank)->load;
00137 }
00138
00139 int CldCountTokens(void)
00140 {
00141 return (CpvAccess(CldProc)->load);
00142 }
00143
00144 int CldLoad(void)
00145 {
00146 return (CsdLength() - CpvAccess(CldLoadOffset));
00147 }
00148
00149 int CldLoadRank(int rank)
00150 {
00151 int len, offset;
00152
00153 len = CqsLength((Queue)CpvAccessOther(CsdSchedQueue, rank));
00154
00155 offset = CpvAccessOther(CldLoadOffset, rank);
00156
00157 return len - offset;
00158 }
00159
00160 void CldPutToken(char *msg)
00161 {
00162 CldProcInfo proc = CpvAccess(CldProc);
00163 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00164 CldToken tok;
00165 int len, queueing, priobits; unsigned int *prioptr;
00166 CldPackFn pfn;
00167
00168 CmiLock(CpvAccess(cldLock));
00169 tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
00170 tok->msg = msg;
00171
00172
00173 tok->pred = proc->sentinel->pred;
00174 tok->succ = proc->sentinel;
00175 tok->pred->succ = tok;
00176 tok->succ->pred = tok;
00177 proc->load ++;
00178
00179 CmiSetHandler(tok, proc->tokenhandleridx);
00180 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00181
00182 CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
00183 CmiUnlock(CpvAccess(cldLock));
00184 }
00185
00186 void CldPutTokenPrio(char *msg)
00187 {
00188 CldProcInfo proc = CpvAccess(CldProc);
00189 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00190 CldToken tok, ptr;
00191 int len, queueing, priobits; unsigned int *prioptr, ints;
00192 CldPackFn pfn;
00193
00194 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00195 ints = (priobits+CINTBITS-1)/CINTBITS;
00196
00197 CmiLock(CpvAccess(cldLock));
00198 tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
00199 tok->msg = msg;
00200
00201
00202 ptr = proc->sentinel->succ;
00203 while (ptr!=proc->sentinel) {
00204 int len1, queueing1, priobits1; unsigned int *prioptr1, ints1;
00205 CldPackFn pfn1;
00206 ifn(ptr->msg, &pfn1, &len1, &queueing1, &priobits1, &prioptr1);
00207 ints1 = (priobits1+CINTBITS-1)/CINTBITS;
00208
00209 if (!CqsPrioGT_(ints, prioptr, ints1, prioptr1)) { break;}
00210 ptr = ptr->succ;
00211 }
00212
00213
00214 tok->succ = ptr;
00215 tok->pred = ptr->pred;
00216 tok->pred->succ = tok;
00217 tok->succ->pred = tok;
00218 proc->load ++;
00219
00220 CmiSetHandler(tok, proc->tokenhandleridx);
00221
00222 CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
00223 CmiUnlock(CpvAccess(cldLock));
00224 }
00225
00226
00227 static
00228 #if CMK_C_INLINE
00229 inline
00230 #endif
00231 void * _CldGetTokenMsg(CldProcInfo proc)
00232 {
00233 CldToken tok;
00234 void *msg;
00235
00236 tok = proc->sentinel->succ;
00237 if (tok == proc->sentinel) {
00238 return NULL;
00239 }
00240 tok->pred->succ = tok->succ;
00241 tok->succ->pred = tok->pred;
00242 proc->load --;
00243 msg = tok->msg;
00244 tok->msg = 0;
00245 return msg;
00246 }
00247
00248 void CldGetToken(char **msg)
00249 {
00250 CldProcInfo proc = CpvAccess(CldProc);
00251 CmiNodeLock cldlock = CpvAccess(cldLock);
00252 CmiLock(cldlock);
00253 *msg = (char *)_CldGetTokenMsg(proc);
00254 if (*msg) CpvAccess(CldLoadOffset)++;
00255 CmiUnlock(cldlock);
00256 }
00257
00258
00259
00260 static
00261 #if CMK_C_INLINE
00262 inline
00263 #endif
00264 void CldGetTokenFromRank(char **msg, int rank)
00265 {
00266 CldProcInfo proc = CpvAccessOther(CldProc, rank);
00267 CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
00268 CmiLock(cldlock);
00269 *msg = (char *)_CldGetTokenMsg(proc);
00270 if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
00271 CmiUnlock(cldlock);
00272 }
00273
00274 static
00275 #if CMK_C_INLINE
00276 inline
00277 #endif
00278 void * _CldGetTokenMsgAt(CldProcInfo proc, CldToken tok)
00279 {
00280 void *msg;
00281
00282 if (tok == proc->sentinel) return NULL;
00283 tok->pred->succ = tok->succ;
00284 tok->succ->pred = tok->pred;
00285 proc->load --;
00286 msg = tok->msg;
00287 tok->msg = 0;
00288 return msg;
00289 }
00290
00291
00292
00293 static
00294 #if CMK_C_INLINE
00295 inline
00296 #endif
00297 void CldGetTokenFromRankAt(char **msg, int rank, CldToken tok)
00298 {
00299 CldProcInfo proc = CpvAccessOther(CldProc, rank);
00300 CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
00301 CmiLock(cldlock);
00302 *msg = (char *)_CldGetTokenMsgAt(proc, tok);
00303 if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
00304 CmiUnlock(cldlock);
00305 }
00306
00307
00308
00309 int CldPresentPE(int pe)
00310 {
00311 return CpvAccess(CldPEBitVector)[pe];
00312 }
00313
00314 void CldMoveAllSeedsAway(void)
00315 {
00316 char *msg;
00317 int len, queueing, priobits, pe;
00318 unsigned int *prioptr;
00319 CldInfoFn ifn; CldPackFn pfn;
00320
00321 CldGetToken(&msg);
00322 while (msg != 0) {
00323 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00324 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00325 CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
00326 pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
00327 while (!CldPresentPE(pe))
00328 pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
00329 CmiSyncSendAndFree(pe, len, msg);
00330 CldGetToken(&msg);
00331 }
00332 }
00333
00334 void CldSetPEBitVector(const char *newBV)
00335 {
00336 int i;
00337
00338 for (i=0; i<CmiNumPes(); i++)
00339 CpvAccess(CldPEBitVector)[i] = newBV[i];
00340 if (!CldPresentPE(CmiMyPe()))
00341 CldMoveAllSeedsAway();
00342 }
00343
00344
00345
/* Nonzero when the +cldb_cs flag was given (set in CldModuleGeneralInit);
   seedBalancerExit then prints the relocated-message count. */
static int _cldb_cs = 0;
00347
00348 void CldModuleGeneralInit(char **argv)
00349 {
00350 CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
00351 CldProcInfo proc;
00352 int i;
00353
00354 CpvInitialize(CldProcInfo, CldProc);
00355 CpvInitialize(int, CldLoadOffset);
00356 CpvAccess(CldLoadOffset) = 0;
00357 CpvInitialize(int, CldLoadNotify);
00358 CpvInitialize(BitVector, CldPEBitVector);
00359 CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
00360 for (i=0; i<CmiNumPes(); i++)
00361 CpvAccess(CldPEBitVector)[i] = 1;
00362 CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
00363 proc = CpvAccess(CldProc);
00364 proc->load = 0;
00365 proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
00366 proc->sentinel = sentinel;
00367 sentinel->succ = sentinel;
00368 sentinel->pred = sentinel;
00369
00370
00371 CpvInitialize(CmiNodeLock, cldLock);
00372 CpvAccess(cldLock) = CmiCreateLock();
00373
00374 _cldb_cs = CmiGetArgFlagDesc(argv, "+cldb_cs", "Converse> Print seed load balancing statistics.");
00375
00376 if (CmiMyPe() == 0) {
00377 const char *stra = CldGetStrategy();
00378 if (strcmp(stra, "rand") != 0) {
00379 CmiPrintf("Charm++> %s seed load balancer.\n", stra);
00380 }
00381 }
00382 }
00383
00384
00385
00386
00387
00388 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
00389 {
00390 char **msgs;
00391 int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
00392 unsigned int *prioptr;
00393 CldInfoFn ifn;
00394 CldPackFn pfn;
00395
00396 msgs = (char **)calloc(numToSend, sizeof(char *));
00397 msgSizes = (int *)calloc(numToSend, sizeof(int));
00398
00399 while (!done) {
00400 numSent = 0;
00401 parcelSize = 0;
00402 for (i=0; i<numToSend; i++) {
00403 CldGetTokenFromRank(&msgs[i], rank);
00404 if (msgs[i] != 0) {
00405 done = 1;
00406 numSent++;
00407 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
00408 ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
00409 msgSizes[i] = len;
00410 parcelSize += len;
00411 CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
00412 if (immed) CmiBecomeImmediate(msgs[i]);
00413 }
00414 else {
00415 done = 1;
00416 break;
00417 }
00418 if (parcelSize > MAXMSGBFRSIZE) {
00419 if(i<numToSend-1)
00420 done = 0;
00421 numToSend -= numSent;
00422 break;
00423 }
00424 }
00425 if (numSent > 1) {
00426 if (immed)
00427 CmiMultipleIsend(pe, numSent, msgSizes, msgs);
00428 else
00429 CmiMultipleSend(pe, numSent, msgSizes, msgs);
00430 for (i=0; i<numSent; i++)
00431 CmiFree(msgs[i]);
00432 CpvAccessOther(CldRelocatedMessages, rank) += numSent;
00433 CpvAccessOther(CldMessageChunks, rank)++;
00434 }
00435 else if (numSent == 1) {
00436 if (immed) CmiBecomeImmediate(msgs[0]);
00437 CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
00438 CpvAccessOther(CldRelocatedMessages, rank)++;
00439 CpvAccessOther(CldMessageChunks, rank)++;
00440 }
00441 }
00442 free(msgs);
00443 free(msgSizes);
00444 }
00445
00446
00447
00448
00449
00450 void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed)
00451 {
00452 char **msgs;
00453 int len, queueing, priobits, *msgSizes, i;
00454 unsigned int *prioptr;
00455 CldInfoFn ifn;
00456 CldPackFn pfn;
00457 CldToken tok;
00458 CldProcInfo proc = CpvAccess(CldProc);
00459 int count = 0;
00460
00461 if (numToSend ==0) return;
00462 msgs = (char **)calloc(numToSend, sizeof(char *));
00463 msgSizes = (int *)calloc(numToSend, sizeof(int));
00464
00465 tok = proc->sentinel->succ;
00466 if (tok == proc->sentinel) {
00467 free(msgs);
00468 free(msgSizes);
00469 return;
00470 }
00471 tok = tok->succ;
00472 while (tok!=proc->sentinel) {
00473 tok = tok->succ;
00474 if (tok == proc->sentinel) break;
00475 CldGetTokenFromRankAt(&msgs[count], rank, tok->pred);
00476 if (msgs[count] != 0) {
00477 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[count]));
00478 ifn(msgs[count], &pfn, &len, &queueing, &priobits, &prioptr);
00479 msgSizes[count] = len;
00480 CldSwitchHandler(msgs[count], CpvAccessOther(CldBalanceHandlerIndex, rank));
00481 if (immed) CmiBecomeImmediate(msgs[count]);
00482 count ++;
00483 }
00484 tok = tok->succ;
00485 }
00486 if (count > 1) {
00487 if (immed)
00488 CmiMultipleIsend(pe, count, msgSizes, msgs);
00489 else
00490 CmiMultipleSend(pe, count, msgSizes, msgs);
00491 for (i=0; i<count; i++)
00492 CmiFree(msgs[i]);
00493 CpvAccessOther(CldRelocatedMessages, rank) += count;
00494 CpvAccessOther(CldMessageChunks, rank)++;
00495 }
00496 else if (count == 1) {
00497 if (immed) CmiBecomeImmediate(msgs[0]);
00498 CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
00499 CpvAccessOther(CldRelocatedMessages, rank)++;
00500 CpvAccessOther(CldMessageChunks, rank)++;
00501 }
00502 free(msgs);
00503 free(msgSizes);
00504 }
00505
00506
00507 void CldSimpleMultipleSend(int pe, int numToSend, int rank)
00508 {
00509 char *msg;
00510 int len, queueing, priobits, *msgSizes, i, numSent, done=0;
00511 unsigned int *prioptr;
00512 CldInfoFn ifn;
00513 CldPackFn pfn;
00514
00515 if (numToSend == 0)
00516 return;
00517
00518 numSent = 0;
00519 while (!done) {
00520 for (i=0; i<numToSend; i++) {
00521 CldGetTokenFromRank(&msg, rank);
00522 if (msg != 0) {
00523 done = 1;
00524 numToSend--;
00525 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00526 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00527 CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, rank));
00528 CmiSyncSendAndFree(pe, len, msg);
00529 if (numToSend == 0) done = 1;
00530 }
00531 else {
00532 done = 1;
00533 break;
00534 }
00535 }
00536 }
00537 }
00538
00539 void seedBalancerExit(void)
00540 {
00541 if (_cldb_cs)
00542 CmiPrintf("[%d] Relocate message number is %d\n", CmiMyPe(), CpvAccess(CldRelocatedMessages));
00543 }