00001
00002 #include <stdlib.h>
00003 #include "queueing.h"
00004 #include "cldb.h"
00005 #include <math.h>
00006
00007 typedef char *BitVector;
00008
00009 CpvDeclare(int, CldHandlerIndex);
00010 CpvDeclare(int, CldNodeHandlerIndex);
00011 CpvDeclare(BitVector, CldPEBitVector);
00012 CpvDeclare(int, CldBalanceHandlerIndex);
00013
00014 CpvDeclare(int, CldRelocatedMessages);
00015 CpvDeclare(int, CldLoadBalanceMessages);
00016 CpvDeclare(int, CldMessageChunks);
00017 CpvDeclare(int, CldLoadNotify);
00018
00019 CpvDeclare(CmiNodeLock, cldLock);
00020
00021 extern void LoadNotifyFn(int);
00022
00023 char* _lbtopo = "torus_nd_5";
00024
00025
00026
00027
00028
00029 void CldRegisterEstimator(CldEstimator fn)
00030 {
00031
00032 }
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 CpvDeclare(int, CldLoadOffset);
00051
00052
00053 int CldRegisterInfoFn(CldInfoFn fn)
00054 {
00055 return CmiRegisterHandler((CmiHandler)fn);
00056 }
00057
00058 int CldRegisterPackFn(CldPackFn fn)
00059 {
00060 return CmiRegisterHandler((CmiHandler)fn);
00061 }
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076 void CldSwitchHandler(char *cmsg, int handler)
00077 {
00078 #if CMK_MEM_CHECKPOINT
00079 int old_phase = CmiGetRestartPhase(cmsg);
00080 #endif
00081 CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
00082 CmiSetHandler(cmsg, handler);
00083 #if CMK_MEM_CHECKPOINT
00084 CmiGetRestartPhase(cmsg) = old_phase;
00085 #endif
00086 }
00087
00088 void CldRestoreHandler(char *cmsg)
00089 {
00090 #if CMK_MEM_CHECKPOINT
00091 int old_phase = CmiGetRestartPhase(cmsg);
00092 #endif
00093 CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
00094 #if CMK_MEM_CHECKPOINT
00095 CmiGetRestartPhase(cmsg) = old_phase;
00096 #endif
00097 }
00098
00099 void Cldhandler(char *);
00100
00101 typedef struct CldToken_s {
00102 char msg_header[CmiMsgHeaderSizeBytes];
00103 char *msg;
00104 struct CldToken_s *pred;
00105 struct CldToken_s *succ;
00106 } *CldToken;
00107
00108 typedef struct CldProcInfo_s {
00109 int tokenhandleridx;
00110 int load;
00111 CldToken sentinel;
00112 } *CldProcInfo;
00113
00114 CpvDeclare(CldProcInfo, CldProc);
00115
00116 static void CldTokenHandler(CldToken tok)
00117 {
00118 CldProcInfo proc = CpvAccess(CldProc);
00119 if (tok->msg) {
00120 tok->pred->succ = tok->succ;
00121 tok->succ->pred = tok->pred;
00122 proc->load --;
00123 CmiHandleMessage(tok->msg);
00124 }
00125 else
00126 CpvAccess(CldLoadOffset)--;
00127 if (CpvAccess(CldLoadNotify))
00128 LoadNotifyFn(CpvAccess(CldProc)->load);
00129 CmiFree(tok);
00130 }
00131
00132 int CldCountTokensRank(int rank)
00133 {
00134 return CpvAccessOther(CldProc, rank)->load;
00135 }
00136
00137 int CldCountTokens(void)
00138 {
00139 return (CpvAccess(CldProc)->load);
00140 }
00141
00142 int CldLoad(void)
00143 {
00144 return (CsdLength() - CpvAccess(CldLoadOffset));
00145 }
00146
00147 int CldLoadRank(int rank)
00148 {
00149 int len, offset;
00150
00151 len = CqsLength(CpvAccessOther(CsdSchedQueue, rank));
00152
00153 offset = CpvAccessOther(CldLoadOffset, rank);
00154
00155 return len - offset;
00156 }
00157
00158 void CldPutToken(char *msg)
00159 {
00160 CldProcInfo proc = CpvAccess(CldProc);
00161 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00162 CldToken tok;
00163 int len, queueing, priobits; unsigned int *prioptr;
00164 CldPackFn pfn;
00165
00166 CmiLock(CpvAccess(cldLock));
00167 tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
00168 tok->msg = msg;
00169
00170
00171 tok->pred = proc->sentinel->pred;
00172 tok->succ = proc->sentinel;
00173 tok->pred->succ = tok;
00174 tok->succ->pred = tok;
00175 proc->load ++;
00176
00177 CmiSetHandler(tok, proc->tokenhandleridx);
00178 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00179
00180 CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
00181 CmiUnlock(CpvAccess(cldLock));
00182 }
00183
00184 void CldPutTokenPrio(char *msg)
00185 {
00186 CldProcInfo proc = CpvAccess(CldProc);
00187 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00188 CldToken tok, ptr;
00189 int len, queueing, priobits; unsigned int *prioptr, ints;
00190 CldPackFn pfn;
00191
00192 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00193 ints = (priobits+CINTBITS-1)/CINTBITS;
00194
00195 CmiLock(CpvAccess(cldLock));
00196 tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
00197 tok->msg = msg;
00198
00199
00200 ptr = proc->sentinel->succ;
00201 while (ptr!=proc->sentinel) {
00202 int len1, queueing1, priobits1; unsigned int *prioptr1, ints1;
00203 CldPackFn pfn1;
00204 ifn(ptr->msg, &pfn1, &len1, &queueing1, &priobits1, &prioptr1);
00205 ints1 = (priobits1+CINTBITS-1)/CINTBITS;
00206
00207 if (!CqsPrioGT_(ints, prioptr, ints1, prioptr1)) { break;}
00208 ptr = ptr->succ;
00209 }
00210
00211
00212 tok->succ = ptr;
00213 tok->pred = ptr->pred;
00214 tok->pred->succ = tok;
00215 tok->succ->pred = tok;
00216 proc->load ++;
00217
00218 CmiSetHandler(tok, proc->tokenhandleridx);
00219
00220 CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
00221 CmiUnlock(CpvAccess(cldLock));
00222 }
00223
00224
00225 static
00226 #if CMK_C_INLINE
00227 inline
00228 #endif
00229 void * _CldGetTokenMsg(CldProcInfo proc)
00230 {
00231 CldToken tok;
00232 void *msg;
00233
00234 tok = proc->sentinel->succ;
00235 if (tok == proc->sentinel) {
00236 return NULL;
00237 }
00238 tok->pred->succ = tok->succ;
00239 tok->succ->pred = tok->pred;
00240 proc->load --;
00241 msg = tok->msg;
00242 tok->msg = 0;
00243 return msg;
00244 }
00245
00246 void CldGetToken(char **msg)
00247 {
00248 CldProcInfo proc = CpvAccess(CldProc);
00249 CmiNodeLock cldlock = CpvAccess(cldLock);
00250 CmiLock(cldlock);
00251 *msg = _CldGetTokenMsg(proc);
00252 if (*msg) CpvAccess(CldLoadOffset)++;
00253 CmiUnlock(cldlock);
00254 }
00255
00256
00257
00258 static
00259 #if CMK_C_INLINE
00260 inline
00261 #endif
00262 void CldGetTokenFromRank(char **msg, int rank)
00263 {
00264 CldProcInfo proc = CpvAccessOther(CldProc, rank);
00265 CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
00266 CmiLock(cldlock);
00267 *msg = _CldGetTokenMsg(proc);
00268 if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
00269 CmiUnlock(cldlock);
00270 }
00271
00272 static
00273 #if CMK_C_INLINE
00274 inline
00275 #endif
00276 void * _CldGetTokenMsgAt(CldProcInfo proc, CldToken tok)
00277 {
00278 void *msg;
00279
00280 if (tok == proc->sentinel) return NULL;
00281 tok->pred->succ = tok->succ;
00282 tok->succ->pred = tok->pred;
00283 proc->load --;
00284 msg = tok->msg;
00285 tok->msg = 0;
00286 return msg;
00287 }
00288
00289
00290
00291 static
00292 #if CMK_C_INLINE
00293 inline
00294 #endif
00295 void CldGetTokenFromRankAt(char **msg, int rank, CldToken tok)
00296 {
00297 CldProcInfo proc = CpvAccessOther(CldProc, rank);
00298 CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
00299 CmiLock(cldlock);
00300 *msg = _CldGetTokenMsgAt(proc, tok);
00301 if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
00302 CmiUnlock(cldlock);
00303 }
00304
00305
00306
00307 int CldPresentPE(int pe)
00308 {
00309 return CpvAccess(CldPEBitVector)[pe];
00310 }
00311
00312 void CldMoveAllSeedsAway()
00313 {
00314 char *msg;
00315 int len, queueing, priobits, pe;
00316 unsigned int *prioptr;
00317 CldInfoFn ifn; CldPackFn pfn;
00318
00319 CldGetToken(&msg);
00320 while (msg != 0) {
00321 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00322 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00323 CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
00324 pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
00325 while (!CldPresentPE(pe))
00326 pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
00327 CmiSyncSendAndFree(pe, len, msg);
00328 CldGetToken(&msg);
00329 }
00330 }
00331
00332 void CldSetPEBitVector(const char *newBV)
00333 {
00334 int i;
00335
00336 for (i=0; i<CmiNumPes(); i++)
00337 CpvAccess(CldPEBitVector)[i] = newBV[i];
00338 if (!CldPresentPE(CmiMyPe()))
00339 CldMoveAllSeedsAway();
00340 }
00341
00342
00343
00344 static int _cldb_cs = 0;
00345
00346 void CldModuleGeneralInit(char **argv)
00347 {
00348 CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
00349 CldProcInfo proc;
00350 int i;
00351
00352 CpvInitialize(CldProcInfo, CldProc);
00353 CpvInitialize(int, CldLoadOffset);
00354 CpvAccess(CldLoadOffset) = 0;
00355 CpvInitialize(int, CldLoadNotify);
00356 CpvInitialize(BitVector, CldPEBitVector);
00357 CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
00358 for (i=0; i<CmiNumPes(); i++)
00359 CpvAccess(CldPEBitVector)[i] = 1;
00360 CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
00361 proc = CpvAccess(CldProc);
00362 proc->load = 0;
00363 proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
00364 proc->sentinel = sentinel;
00365 sentinel->succ = sentinel;
00366 sentinel->pred = sentinel;
00367
00368
00369 CpvInitialize(CmiNodeLock, cldLock);
00370 CpvAccess(cldLock) = CmiCreateLock();
00371
00372 _cldb_cs = CmiGetArgFlagDesc(argv, "+cldb_cs", "Converse> Print seed load balancing statistics.");
00373
00374 if (CmiMyPe() == 0) {
00375 char *stra = CldGetStrategy();
00376 if (strcmp(stra, "rand") != 0) {
00377 CmiPrintf("Charm++> %s seed load balancer.\n", stra);
00378 }
00379 }
00380 }
00381
00382
00383
00384
00385
00386 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
00387 {
00388 char **msgs;
00389 int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
00390 unsigned int *prioptr;
00391 CldInfoFn ifn;
00392 CldPackFn pfn;
00393
00394 msgs = (char **)calloc(numToSend, sizeof(char *));
00395 msgSizes = (int *)calloc(numToSend, sizeof(int));
00396
00397 while (!done) {
00398 numSent = 0;
00399 parcelSize = 0;
00400 for (i=0; i<numToSend; i++) {
00401 CldGetTokenFromRank(&msgs[i], rank);
00402 if (msgs[i] != 0) {
00403 done = 1;
00404 numSent++;
00405 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
00406 ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
00407 msgSizes[i] = len;
00408 parcelSize += len;
00409 CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
00410 if (immed) CmiBecomeImmediate(msgs[i]);
00411 }
00412 else {
00413 done = 1;
00414 break;
00415 }
00416 if (parcelSize > MAXMSGBFRSIZE) {
00417 if(i<numToSend-1)
00418 done = 0;
00419 numToSend -= numSent;
00420 break;
00421 }
00422 }
00423 if (numSent > 1) {
00424 if (immed)
00425 CmiMultipleIsend(pe, numSent, msgSizes, msgs);
00426 else
00427 CmiMultipleSend(pe, numSent, msgSizes, msgs);
00428 for (i=0; i<numSent; i++)
00429 CmiFree(msgs[i]);
00430 CpvAccessOther(CldRelocatedMessages, rank) += numSent;
00431 CpvAccessOther(CldMessageChunks, rank)++;
00432 }
00433 else if (numSent == 1) {
00434 if (immed) CmiBecomeImmediate(msgs[0]);
00435 CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
00436 CpvAccessOther(CldRelocatedMessages, rank)++;
00437 CpvAccessOther(CldMessageChunks, rank)++;
00438 }
00439 }
00440 free(msgs);
00441 free(msgSizes);
00442 }
00443
00444
00445
00446
00447
00448 void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed)
00449 {
00450 char **msgs;
00451 int len, queueing, priobits, *msgSizes, i;
00452 unsigned int *prioptr;
00453 CldInfoFn ifn;
00454 CldPackFn pfn;
00455 CldToken tok;
00456 CldProcInfo proc = CpvAccess(CldProc);
00457 int count = 0;
00458
00459 if (numToSend ==0) return;
00460 msgs = (char **)calloc(numToSend, sizeof(char *));
00461 msgSizes = (int *)calloc(numToSend, sizeof(int));
00462
00463 tok = proc->sentinel->succ;
00464 if (tok == proc->sentinel) return;
00465 tok = tok->succ;
00466 while (tok!=proc->sentinel) {
00467 tok = tok->succ;
00468 if (tok == proc->sentinel) break;
00469 CldGetTokenFromRankAt(&msgs[count], rank, tok->pred);
00470 if (msgs[i] != 0) {
00471 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
00472 ifn(msgs[count], &pfn, &len, &queueing, &priobits, &prioptr);
00473 msgSizes[count] = len;
00474 CldSwitchHandler(msgs[count], CpvAccessOther(CldBalanceHandlerIndex, rank));
00475 if (immed) CmiBecomeImmediate(msgs[count]);
00476 count ++;
00477 }
00478 tok = tok->succ;
00479 }
00480 if (count > 1) {
00481 if (immed)
00482 CmiMultipleIsend(pe, count, msgSizes, msgs);
00483 else
00484 CmiMultipleSend(pe, count, msgSizes, msgs);
00485 for (i=0; i<count; i++)
00486 CmiFree(msgs[i]);
00487 CpvAccessOther(CldRelocatedMessages, rank) += count;
00488 CpvAccessOther(CldMessageChunks, rank)++;
00489 }
00490 else if (count == 1) {
00491 if (immed) CmiBecomeImmediate(msgs[0]);
00492 CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
00493 CpvAccessOther(CldRelocatedMessages, rank)++;
00494 CpvAccessOther(CldMessageChunks, rank)++;
00495 }
00496 free(msgs);
00497 free(msgSizes);
00498 }
00499
00500
00501 void CldSimpleMultipleSend(int pe, int numToSend, int rank)
00502 {
00503 char *msg;
00504 int len, queueing, priobits, *msgSizes, i, numSent, done=0;
00505 unsigned int *prioptr;
00506 CldInfoFn ifn;
00507 CldPackFn pfn;
00508
00509 if (numToSend == 0)
00510 return;
00511
00512 numSent = 0;
00513 while (!done) {
00514 for (i=0; i<numToSend; i++) {
00515 CldGetTokenFromRank(&msg, rank);
00516 if (msg != 0) {
00517 done = 1;
00518 numToSend--;
00519 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00520 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00521 CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, rank));
00522 CmiSyncSendAndFree(pe, len, msg);
00523 if (numToSend == 0) done = 1;
00524 }
00525 else {
00526 done = 1;
00527 break;
00528 }
00529 }
00530 }
00531 }
00532
00533 void seedBalancerExit()
00534 {
00535 if (_cldb_cs)
00536 CmiPrintf("[%d] Relocate message number is %d\n", CmiMyPe(), CpvAccess(CldRelocatedMessages));
00537 }