00001 #include "CkLoop.h"
00002 #if !defined(_WIN32)
00003 #include <unistd.h>
00004 #include <pthread.h>
00005 #endif
00006
00007 #if !USE_CONVERSE_NOTIFICATION
00008 #include "qd.h"
00009 #endif
00010
00011 #if CMK_NODE_QUEUE_AVAILABLE
00012 void CmiPushNode(void *msg);
00013 #endif
00014
00015 #define CKLOOP_USECHARM 1
00016 #define CKLOOP_PTHREAD 2
00017 #define CKLOOP_NOOP 3
00018
00019
00020 #if !CMK_SMP && !defined(_WIN32)
00021 static CMK_THREADLOCAL pthread_cond_t thdCondition;
00022 static CMK_THREADLOCAL pthread_mutex_t thdLock;
00023 #endif
00024
00025 static FuncCkLoop *mainHelper = NULL;
00026 static int mainHelperPhyRank = 0;
00027 static int numPhysicalPEs = 0;
00028 static CurLoopInfo *pthdLoop = NULL;
00029 #if !defined(_WIN32)
00030 static pthread_mutex_t **allLocks = NULL;
00031 static pthread_cond_t **allConds = NULL;
00032 static pthread_t *ndhThreads = NULL;
00033 #endif
00034 static volatile int gCrtCnt = 0;
00035 static volatile int exitFlag = 0;
00036
00037 #if CMK_OS_IS_LINUX || CMK_USING_BGCLANG
00038 #include <sys/syscall.h>
00039 #endif
00040
/**
 * Reports the core the calling thread was last scheduled on.
 *
 * On Linux the value is taken from the 39th whitespace-separated token of
 * /proc/<pid>/task/<tid>/stat. On every other platform, or when the file
 * cannot be opened or parsed, -1 is returned.
 *
 * NOTE(review): tokens are split on whitespace, so a thread name containing
 * spaces (2nd field of the stat line) would shift the token count -- confirm
 * whether that matters for the callers.
 */
static int HelperOnCore() {
#if CMK_OS_IS_LINUX
  char fname[64];
  // Bounded formatting: never write past fname even for unexpectedly long ids.
  const int len = snprintf(fname, sizeof(fname), "/proc/%d/task/%ld/stat",
                           (int)getpid(), (long)syscall(SYS_gettid));
  if (len < 0 || (size_t)len >= sizeof(fname)) return -1;

  FILE *ifp = fopen(fname, "r");
  if (ifp == NULL) return -1;

  char str[128] = "";
  bool parsed = true;
  for (int i=0; i<39; i++) {
    // Stop on a short/unreadable file instead of converting a stale token.
    if (fscanf(ifp, "%127s", str) != 1) {
      parsed = false;
      break;
    }
  }
  fclose(ifp);
  return parsed ? atoi(str) : -1;
#else
  return -1;
#endif
}
00056
00057 static void *ndhThreadWork(void *id) {
00058 #if !CMK_SMP && !defined(_WIN32)
00059 size_t myId = (size_t) id;
00060
00061
00062 int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
00063
00064 myPhyRank = myId;
00065 CmiSetCPUAffinity(myPhyRank);
00066
00067 pthread_mutex_init(&thdLock, NULL);
00068 pthread_cond_init(&thdCondition, NULL);
00069
00070 allLocks[myId-1] = &thdLock;
00071 allConds[myId-1] = &thdCondition;
00072
00073 __sync_add_and_fetch(&gCrtCnt, 1);
00074
00075 while (1) {
00076
00077 if (exitFlag) break;
00078 pthread_mutex_lock(&thdLock);
00079 pthread_cond_wait(&thdCondition, &thdLock);
00080 pthread_mutex_unlock(&thdLock);
00081
00082 if (mainHelper->getSchedPolicy() == CKLOOP_TREE) {
00083
00084 int myKid = myId*TREE_BCAST_BRANCH+1;
00085 for (int i=0; i<TREE_BCAST_BRANCH; i++, myKid++) {
00086 if (myKid >= mainHelper->getNumHelpers()) break;
00087
00088 pthread_mutex_lock(allLocks[myKid-1]);
00089 pthread_cond_signal(allConds[myKid-1]);
00090 pthread_mutex_unlock(allLocks[myKid-1]);
00091 }
00092 }
00093 pthdLoop->stealWork();
00094 }
00095 return NULL;
00096 #else
00097 return NULL;
00098 #endif
00099 }
00100
00101 void FuncCkLoop::createPThreads() {
00102 #if !defined(_WIN32)
00103 int numThreads = numHelpers - 1;
00104 allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
00105 allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
00106 memset(allLocks, 0, sizeof(void *)*numThreads);
00107 memset(allConds, 0, sizeof(void *)*numThreads);
00108
00109 pthread_attr_t attr;
00110 pthread_attr_init(&attr);
00111 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
00112 ndhThreads = new pthread_t[numThreads];
00113 mainHelperPhyRank = CmiOnCore();
00114 numPhysicalPEs = CmiNumCores();
00115 if (mainHelperPhyRank == -1) mainHelperPhyRank = 0;
00116 for (int i=1; i<=numThreads; i++) {
00117 pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)(intptr_t)i);
00118 }
00119 while (gCrtCnt != numThreads);
00120 #endif
00121 }
00122
00123 void FuncCkLoop::exit() {
00124 #if !defined(_WIN32)
00125 if (mode == CKLOOP_PTHREAD) {
00126 exitFlag = 1;
00127 for (int i=0; i<numHelpers-1; i++)
00128 pthread_join(ndhThreads[i], NULL);
00129 delete [] ndhThreads;
00130 free(allLocks);
00131 free(allConds);
00132 delete pthdLoop;
00133 }
00134 #endif
00135 }
00136
00137
00138
00139
00140
00141 #define CKLOOP_TOTAL_WORK_EVENTID 139
00142 #define CKLOOP_FINISH_SIGNAL_EVENTID 143
00143 #define CKLOOP_STATIC_CHUNK_WORK 998
00144 #define CKLOOP_DYNAMIC_CHUNK_WORK 999
00145
00146 static FuncCkLoop *globalCkLoop = NULL;
00147
00148 FuncCkLoop::FuncCkLoop(int mode_, int numThreads_) {
00149 init(mode_, numThreads_);
00150 }
00151
00152 void FuncCkLoop::init(int mode_, int numThreads_) {
00153 traceRegisterUserEvent("ckloop total work",CKLOOP_TOTAL_WORK_EVENTID);
00154 traceRegisterUserEvent("ckloop finish signal",CKLOOP_FINISH_SIGNAL_EVENTID);
00155
00156 mode = mode_;
00157 loop_info_inited_lock = CmiCreateLock();
00158
00159 CmiAssert(globalCkLoop==NULL);
00160 globalCkLoop = this;
00161
00162 if (mode == CKLOOP_USECHARM) {
00163
00164 numHelpers = CkMyNodeSize();
00165 helperPtr = new FuncSingleHelper *[numHelpers];
00166 #if CMK_NODE_QUEUE_AVAILABLE
00167 schedPolicy = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD ? CKLOOP_NODE_QUEUE : CKLOOP_LIST);
00168 #else
00169 schedPolicy = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD ? CKLOOP_TREE : CKLOOP_LIST);
00170 #endif
00171 int pestart = CkNodeFirst(CkMyNode());
00172
00173 for (int i=0; i<numHelpers; i++) {
00174 CkChareID helper;
00175 CProxy_FuncSingleHelper::ckNew(&helper, pestart+i);
00176 }
00177 } else if (mode == CKLOOP_PTHREAD) {
00178 helperPtr = NULL;
00179
00180 numHelpers = numThreads_;
00181 schedPolicy = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD ? CKLOOP_TREE : CKLOOP_LIST);
00182 pthdLoop = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
00183 mainHelper = this;
00184 createPThreads();
00185 }
00186 }
00187
00188 FuncCkLoop::FuncCkLoop(CkMigrateMessage *m) : CBase_FuncCkLoop(m) {
00189 }
00190
00191 int FuncCkLoop::MAX_CHUNKS = 64;
00192
00193 #if CMK_TRACE_ENABLED
00194 #define TRACE_START(id) _start = CmiWallTimer()
00195 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
00196 #else
00197 #define TRACE_START(id)
00198 #define TRACE_BRACKET(id)
00199 #endif
00200
00201 #define ALLOW_MULTIPLE_UNSYNC 1
00202 void FuncCkLoop::parallelizeFunc(HelperFn func, int paramNum, void * param,
00203 int numChunks, int lowerRange,
00204 int upperRange, int sync,
00205 void *redResult, REDUCTION_TYPE type,
00206 CallerFn cfunc,
00207 int cparamNum, void * cparam) {
00208
00209 double _start;
00210
00211 if (numChunks > MAX_CHUNKS) {
00212 numChunks = MAX_CHUNKS;
00213 }
00214
00215
00216
00217
00218
00219
00220 CurLoopInfo *curLoop = NULL;
00221
00222
00223 TRACE_START(CKLOOP_TOTAL_WORK_EVENTID);
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233 if (mode == CKLOOP_NOOP || numChunks + !!cfunc < 2) {
00234 func(lowerRange, upperRange, redResult, paramNum, param);
00235 if (cfunc != NULL) {
00236 cfunc(cparamNum, cparam);
00237 }
00238 return;
00239 } else if (mode == CKLOOP_USECHARM) {
00240 FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
00241 #if USE_CONVERSE_NOTIFICATION
00242 #if ALLOW_MULTIPLE_UNSYNC
00243 ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
00244 #else
00245 ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
00246 #endif
00247 curLoop = (CurLoopInfo *)(notifyMsg->ptr);
00248 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
00249 #if CMK_TRACE_ENABLED
00250 envelope *env = CpvAccess(dummyEnv);
00251 #endif
00252 CmiMemoryReadFence();
00253 if (schedPolicy == CKLOOP_NODE_QUEUE) {
00254 #if CMK_NODE_QUEUE_AVAILABLE
00255 notifyMsg->queueID = NODE_Q;
00256 #if CMK_TRACE_ENABLED
00257 int loopTimes = CkMyNodeSize();
00258 _TRACE_CREATION_N(env, loopTimes);
00259 notifyMsg->eventID = env->getEvent();
00260 #endif
00261 notifyMsg->srcRank = CmiMyRank();
00262 CmiPushNode((void *)(notifyMsg));
00263 #else
00264 CkAbort("SchedPolicy, CKLOOP_NODE_QUEUE is not available on this environment\n");
00265 #endif
00266 }
00267 else if (schedPolicy == CKLOOP_TREE) {
00268 int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
00269
00270 int pe = CmiMyRank()+1;
00271 notifyMsg->queueID = NODE_Q;
00272 #if CMK_TRACE_ENABLED
00273 _TRACE_CREATION_N(env, loopTimes);
00274 notifyMsg->eventID =env->getEvent();
00275 #endif
00276 for (int i=0; i<loopTimes; i++, pe++) {
00277 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
00278 CmiPushPE(pe, (void *)(notifyMsg));
00279 }
00280 } else {
00281 notifyMsg->queueID = PE_Q;
00282 #if CMK_TRACE_ENABLED
00283 _TRACE_CREATION_N(env, numHelpers-1);
00284 notifyMsg->eventID = env->getEvent();
00285 #endif
00286 for (int i=CmiMyRank()+1; i<numHelpers; i++) {
00287 if (CpvAccessOther(isHelperOn, i))
00288 CmiPushPE(i, (void *)(notifyMsg));
00289 }
00290 for (int i=0; i<CmiMyRank(); i++) {
00291 if (CpvAccessOther(isHelperOn, i))
00292 CmiPushPE(i, (void *)(notifyMsg));
00293 }
00294 }
00295 #else
00296 #if ALLOW_MULTIPLE_UNSYNC
00297 curLoop = thisHelper->getNewTask();
00298 #else
00299 curLoop = thisHelper->taskBuffer[0];
00300 #endif
00301 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
00302 CpvAccess(_qd)->create(numHelpers-1);
00303 CmiMemoryReadFence();
00304 if (schedPolicy == CKLOOP_TREE) {
00305 int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
00306
00307 int pe = CmiMyRank()+1;
00308 for (int i=0; i<loopTimes; i++, pe++) {
00309 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
00310 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
00311 one->ptr = (void *)curLoop;
00312 envelope *env = UsrToEnv(one);
00313 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
00314 CmiPushPE(pe, (void *)(env));
00315 }
00316 } else {
00317 for (int i=CmiMyRank()+1; i<numHelpers; i++) {
00318 if (!CpvAccessOther(isHelperOn, i)) continue;
00319 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
00320 one->ptr = (void *)curLoop;
00321 envelope *env = UsrToEnv(one);
00322 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
00323
00324 CmiPushPE(i, (void *)(env));
00325 }
00326 for (int i=0; i<CmiMyRank(); i++) {
00327 if (!CpvAccessOther(isHelperOn, i)) continue;
00328 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
00329 one->ptr = (void *)curLoop;
00330 envelope *env = UsrToEnv(one);
00331 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
00332
00333 CmiPushPE(i, (void *)(env));
00334 }
00335 }
00336 #endif
00337 } else if (mode == CKLOOP_PTHREAD) {
00338
00339 #if !defined(_WIN32)
00340 int numThreads = numHelpers-1;
00341 curLoop = pthdLoop;
00342 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
00343 int numNotices = numThreads;
00344 if (schedPolicy == CKLOOP_TREE) {
00345 numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
00346 }
00347 for (int i=0; i<numNotices; i++) {
00348 pthread_mutex_lock(allLocks[i]);
00349 pthread_cond_signal(allConds[i]);
00350 pthread_mutex_unlock(allLocks[i]);
00351 }
00352
00353 sync = 1;
00354 #endif
00355 }
00356
00357
00358 if (cfunc != NULL) {
00359 cfunc(cparamNum, cparam);
00360 }
00361
00362 if(curLoop) curLoop->stealWork();
00363 TRACE_BRACKET(CKLOOP_TOTAL_WORK_EVENTID);
00364
00365
00366
00367 TRACE_START(CKLOOP_FINISH_SIGNAL_EVENTID);
00368 curLoop->waitLoopDone(sync);
00369 TRACE_BRACKET(CKLOOP_FINISH_SIGNAL_EVENTID);
00370
00371
00372
00373 if (type!=CKLOOP_NONE)
00374 reduce(curLoop->getRedBufs(), redResult, type, numChunks);
00375 return;
00376 }
00377
00378 CpvStaticDeclare(int, chunkHandler);
00379 CpvStaticDeclare(int, hybridHandler);
00380
00381 void FuncCkLoop::parallelizeFuncHybrid(float staticFraction, HelperFn func, int paramNum, void * param,
00382 int numChunks, int lowerRange,
00383 int upperRange, int sync,
00384 void *redResult, REDUCTION_TYPE type,
00385 CallerFn cfunc,
00386 int cparamNum, void * cparam) {
00387 double _start;
00388 if (numChunks > MAX_CHUNKS) {
00389 numChunks = MAX_CHUNKS;
00390 }
00391
00392 #ifdef CMK_PAPI_PROFILING
00393 int num_hwcntrs = 2;
00394 int Events[2] = {PAPI_L2_DCM, PAPI_L3_DCM};
00395 long_long values[2];
00396 #endif
00397
00398 #ifdef CMK_PAPI_PROFILING
00399 if (PAPI_start_counters(Events, num_hwcntrs) != PAPI_OK) CkPrintf("Error with creating event set \n");
00400 #endif
00401
00402
00403 TRACE_START(CKLOOP_TOTAL_WORK_EVENTID);
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421 CurLoopInfo* curLoop = new CurLoopInfo(numHelpers);
00422 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
00423 curLoop->setStaticFraction(staticFraction);
00424 curLoop->setReductionType(type);
00425 void** redBufs = curLoop->getRedBufs();
00426 if(type == CKLOOP_INT_SUM)
00427 for(int i=0; i<numHelpers; i++)
00428 *((int*)redBufs[i]) = 0;
00429 else if((type == CKLOOP_DOUBLE_SUM) || (type == CKLOOP_DOUBLE_MAX))
00430 for(int i=0; i<numHelpers; i++)
00431 *((double*)redBufs[i]) = 0.0;
00432 else if(type == CKLOOP_FLOAT_SUM)
00433 for(int i=0; i<numHelpers; i++)
00434 *((float*)redBufs[i]) = 0.0;
00435 LoopChunkMsg* msg = new LoopChunkMsg;
00436 msg->loopRec = curLoop;
00437 CmiSetHandler(msg, CpvAccess(hybridHandler));
00438 for (int i=1; i<numHelpers; i++) {
00439 CmiPushPE(i, (void*)msg);
00440 }
00441
00442 if (cfunc != NULL) {
00443 cfunc(cparamNum, cparam);
00444 }
00445 curLoop->doWorkForMyPe();
00446
00447 #if CMK_SMP && CMK_TASKQUEUE
00448 while(1) {
00449 void* msg = TaskQueuePop((TaskQueue)CpvAccess(CsdTaskQueue));
00450 if (msg == NULL) break;
00451 CmiHandleMessage(msg);
00452 }
00453 #endif
00454
00455
00456 curLoop->waitLoopDoneHybrid(1);
00457
00458
00459
00460
00461 TRACE_BRACKET(CKLOOP_TOTAL_WORK_EVENTID);
00462
00463 #ifdef CMK_PAPI_PROFILING
00464 if (PAPI_stop_counters(values, num_hwcntrs) != PAPI_OK) CkPrintf("Error with stopping counters!\n");
00465 #endif
00466
00467 #ifdef CMK_PAPI_PROFILING
00468 if (PAPI_read_counters(values, num_hwcntrs) != PAPI_OK) CkPrintf("Error with reading counters!\n");
00469 #endif
00470
00471
00472 TRACE_START(CKLOOP_FINISH_SIGNAL_EVENTID);
00473 TRACE_BRACKET(CKLOOP_FINISH_SIGNAL_EVENTID);
00474
00475 if (type!=CKLOOP_NONE)
00476 reduce(curLoop->getRedBufs(), redResult, type, numHelpers);
00477
00478 delete curLoop;
00479 delete msg;
00480 }
00481
00482 #define COMPUTE_REDUCTION(T) {\
00483 for(int i=0; i<numChunks; i++) {\
00484 result += *((T *)(redBufs[i])); \
00485 \
00486 }\
00487 }
00488 #define COMPUTE_REDUCTION_MAX(T) {\
00489 for(int i=0; i<numChunks; i++) {\
00490 if( *((T *)(redBufs[i])) > result ) result = *((T *)(redBufs[i])); \
00491 \
00492 }\
00493 }
00494
00495 void FuncCkLoop::destroyHelpers() {
00496 int pe = CmiMyRank()+1;
00497 for (int i = 0; i < numHelpers; i++) {
00498 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
00499 DestroyNotifyMsg *tmp = new DestroyNotifyMsg;
00500 envelope *env = UsrToEnv(tmp);
00501 env->setMsgtype(ForChareMsg);
00502 env->setEpIdx(CkIndex_FuncSingleHelper::destroyMyself());
00503 env->setSrcPe(CkMyPe());
00504 CmiSetHandler(env, _charmHandlerIdx);
00505 CmiPushPE(pe, (void *)(env));
00506 }
00507 }
00508
00509 void FuncCkLoop::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
00510 switch (type) {
00511 case CKLOOP_INT_SUM: {
00512 int result=0;
00513 COMPUTE_REDUCTION(int)
00514 *((int *)redBuf) = result;
00515 break;
00516 }
00517 case CKLOOP_FLOAT_SUM: {
00518 float result=0;
00519 COMPUTE_REDUCTION(float)
00520 *((float *)redBuf) = result;
00521 break;
00522 }
00523 case CKLOOP_DOUBLE_SUM: {
00524 double result=0;
00525 COMPUTE_REDUCTION(double)
00526 *((double *)redBuf) = result;
00527 break;
00528 }
00529 case CKLOOP_DOUBLE_MAX: {
00530 double result=0;
00531 COMPUTE_REDUCTION_MAX(double)
00532 *((double *)redBuf) = result;
00533 break;
00534 }
00535 default:
00536 break;
00537 }
00538 }
00539
00540 void FuncCkLoop::registerHelper(HelperNotifyMsg* msg) {
00541 helperPtr[msg->srcRank] = msg->localHelper;
00542 msg->localHelper->thisCkLoop = this;
00543 delete msg;
00544 }
00545
00546 void FuncCkLoop::pup(PUP::er &p) {
00547 p|mode;
00548 p|numHelpers;
00549 if (p.isUnpacking()) {
00550 init(mode, numHelpers);
00551 }
00552 }
00553
00554 static int _ckloopEP;
00555 CpvStaticDeclare(int, NdhStealWorkHandler);
00556 static void RegisterCkLoopHdlrs() {
00557 CpvInitialize(int, NdhStealWorkHandler);
00558
00559
00560 CpvInitialize(int, hybridHandler);
00561 CpvAccess(hybridHandler) = CmiRegisterHandler((CmiHandler)hybridHandlerFunc);
00562 CpvInitialize(int, chunkHandler);
00563 CpvAccess(chunkHandler) = CmiRegisterHandler((CmiHandler)executeChunk);
00564
00565 #if CMK_TRACE_ENABLED
00566 CpvInitialize(envelope*, dummyEnv);
00567 CpvAccess(dummyEnv) = envelope::alloc(ForChareMsg,0,0);
00568 #endif
00569 CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
00570 #ifdef __BIGSIM__
00571 if(BgNodeRank()==0) {
00572 #else
00573 if(CkMyRank()==0) {
00574 #endif
00575 int _ckloopMsg = CkRegisterMsg("ckloop_converse_msg", 0, 0, 0, 0);
00576 int _ckloopChare = CkRegisterChare("ckloop_converse_chare", 0, TypeInvalid);
00577 CkRegisterChareInCharm(_ckloopChare);
00578 _ckloopEP = CkRegisterEp("CkLoop", (CkCallFnPtr)SingleHelperStealWork, _ckloopMsg, _ckloopChare, 0+CK_EP_INTRINSIC);
00579 }
00580 }
00581
00582 extern int _charmHandlerIdx;
00583
00584 FuncSingleHelper::FuncSingleHelper() {
00585 CmiAssert(globalCkLoop!=NULL);
00586 thisCkLoop = globalCkLoop;
00587 totalHelpers = globalCkLoop->numHelpers;
00588 funcckproxy = globalCkLoop->thisProxy;
00589 schedPolicy = globalCkLoop->schedPolicy;
00590
00591 createNotifyMsg();
00592
00593 globalCkLoop->helperPtr[CkMyRank()] = this;
00594 }
00595
00596 void FuncSingleHelper::createNotifyMsg() {
00597 #if USE_CONVERSE_NOTIFICATION
00598 notifyMsgBufSize = TASK_BUFFER_SIZE;
00599 #else
00600 notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
00601 #endif
00602
00603 nextFreeNotifyMsg = 0;
00604 #if USE_CONVERSE_NOTIFICATION
00605 notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
00606 for (int i=0; i<notifyMsgBufSize; i++) {
00607 ConverseNotifyMsg *tmp = notifyMsg+i;
00608 if (schedPolicy == CKLOOP_NODE_QUEUE || schedPolicy == CKLOOP_TREE) {
00609 tmp->srcRank = CmiMyRank();
00610 } else {
00611 tmp->srcRank = -1;
00612 }
00613 tmp->ptr = (void *)(new CurLoopInfo(FuncCkLoop::MAX_CHUNKS));
00614 CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
00615 }
00616 #else
00617 nextFreeTaskBuffer = 0;
00618 notifyMsg = (CharmNotifyMsg **)malloc(sizeof(CharmNotifyMsg *)*notifyMsgBufSize);
00619 for (int i=0; i<notifyMsgBufSize; i++) {
00620 CharmNotifyMsg *tmp = new(sizeof(int)*8)CharmNotifyMsg;
00621 notifyMsg[i] = tmp;
00622 if (schedPolicy == CKLOOP_NODE_QUEUE || schedPolicy == CKLOOP_TREE) {
00623 tmp->srcRank = CmiMyRank();
00624 } else {
00625 tmp->srcRank = -1;
00626 }
00627 tmp->ptr = NULL;
00628 envelope *env = UsrToEnv(tmp);
00629 env->setMsgtype(ForChareMsg);
00630 env->setEpIdx(CkIndex_FuncSingleHelper::stealWork(NULL));
00631 env->setSrcPe(CkMyPe());
00632 CmiSetHandler(env, _charmHandlerIdx);
00633
00634 }
00635 taskBuffer = (CurLoopInfo **)malloc(sizeof(CurLoopInfo *)*TASK_BUFFER_SIZE);
00636 for (int i=0; i<TASK_BUFFER_SIZE; i++) {
00637 taskBuffer[i] = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
00638 }
00639 #endif
00640 }
00641
00642 void FuncSingleHelper::stealWork(CharmNotifyMsg *msg) {
00643 #if !USE_CONVERSE_NOTIFICATION
00644 int srcRank = msg->srcRank;
00645 CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
00646 if (srcRank >= 0) {
00647
00648 int relPE = CmiMyRank()-msg->srcRank;
00649 if (relPE<0) relPE += CmiMyNodeSize();
00650
00651
00652 relPE=relPE*TREE_BCAST_BRANCH+1;
00653 for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
00654 if (relPE >= CmiMyNodeSize()) break;
00655 int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
00656 if (!CpvAccessOther(isHelperOn, pe)) continue;
00657
00658 CharmNotifyMsg *newone = getNotifyMsg();
00659 newone->ptr = (void *)loop;
00660 envelope *env = UsrToEnv(newone);
00661 env->setObjPtr(thisCkLoop->helperPtr[pe]->ckGetChareID().objPtr);
00662 CmiPushPE(pe, (void *)env);
00663 }
00664 }
00665 loop->stealWork();
00666 #endif
00667 }
00668
00669 void SingleHelperStealWork(ConverseNotifyMsg *msg) {
00670 int srcRank = msg->srcRank;
00671 CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
00672
00673 if (srcRank >= 0 && !loop->isFree()) {
00674 if (msg->queueID == NODE_Q && globalCkLoop->getSchedPolicy() == CKLOOP_NODE_QUEUE ) {
00675 msg->queueID = PE_Q;
00676 int myRank = CmiMyRank();
00677 if ( srcRank == myRank ) return;
00678
00679 for (int i=srcRank+1; i<CmiMyNodeSize(); i++) {
00680 if ( i == myRank ) continue;
00681 if (!CpvAccessOther(isHelperOn, i)) continue;
00682 CmiPushPE(i, (void *)(msg));
00683 }
00684 for (int i=0; i<srcRank; i++) {
00685 if ( i == myRank ) continue;
00686 if (!CpvAccessOther(isHelperOn, i)) continue;
00687 CmiPushPE(i, (void *)(msg));
00688 }
00689 }
00690 else if (globalCkLoop->getSchedPolicy() == CKLOOP_TREE) {
00691 int relPE = CmiMyRank()-msg->srcRank;
00692 if (relPE<0) relPE += CmiMyNodeSize();
00693
00694
00695
00696 relPE=relPE*TREE_BCAST_BRANCH+1;
00697 for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
00698 if (relPE >= CmiMyNodeSize()) break;
00699 int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
00700
00701
00702 CmiPushPE(pe, (void *)msg);
00703 }
00704 }
00705 }
00706
00707
00708
00709
00710 if (!CpvAccess(isHelperOn)) return;
00711 #if CMK_TRACE_ENABLED
00712 unsigned int event = msg->eventID;
00713 _TRACE_BEGIN_EXECUTE_DETAILED(event, ForChareMsg, _ckloopEP,
00714 CkNodeFirst(CkMyNode())+srcRank, sizeof(ConverseNotifyMsg), NULL, NULL);
00715 #endif
00716 loop->stealWork();
00717 #if CMK_TRACE_ENABLED
00718 _TRACE_END_EXECUTE();
00719 #endif
00720 }
00721
00722 void CurLoopInfo::stealWork() {
00723
00724
00725 CmiLock(loop_info_inited_lock);
00726 if (inited == 0) {
00727 CmiUnlock(loop_info_inited_lock);
00728 return;
00729 }
00730
00731 int nextChunkId = getNextChunkIdx();
00732 if (nextChunkId >= numChunks) {
00733 CmiUnlock(loop_info_inited_lock);
00734 return;
00735 }
00736
00737 CmiUnlock(loop_info_inited_lock);
00738 int execTimes = 0;
00739
00740 int first, last;
00741 int unit = (upperIndex-lowerIndex+1)/numChunks;
00742 int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
00743 int markIdx = remainder*(unit+1);
00744
00745 while (nextChunkId < numChunks) {
00746 if (nextChunkId < remainder) {
00747 first = lowerIndex+(unit+1)*nextChunkId;
00748 last = first+unit;
00749 } else {
00750 first = lowerIndex+(nextChunkId - remainder)*unit + markIdx;
00751 last = first+unit-1;
00752 }
00753
00754 if (first < lowerIndex || first > upperIndex || last < lowerIndex || last > upperIndex) {
00755 CkPrintf("Error in CurLoopInfo::stealWork() node %d pe %d lowerIndex %d upperIndex %d numChunks %d first %d last %d\n",
00756 CkMyNode(), CkMyPe(), lowerIndex, upperIndex, numChunks, first, last);
00757 CkAbort("Indices of CkLoop incorrect. There maybe a race condition!\n");
00758 }
00759
00760 fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
00761 execTimes++;
00762 nextChunkId = getNextChunkIdx();
00763 }
00764 reportFinished(execTimes);
00765 }
00766
00767
00768
00769
00770
00771 CProxy_FuncCkLoop CkLoop_Init(int numThreads) {
00772 int mode;
00773 #if CMK_SMP
00774 mode = CKLOOP_USECHARM;
00775 #if USE_CONVERSE_NOTIFICATION
00776 CkPrintf("CkLoopLib is used in SMP with simple dynamic scheduling (converse-level notification)\n");
00777 #else
00778 CkPrintf("CkLoopLib is used in SMP with simple dynamic scheduling (charm-level notification)\n");
00779 #endif
00780 #elif defined(_WIN32)
00781 mode = CKLOOP_NOOP;
00782 #else
00783 mode = CKLOOP_PTHREAD;
00784 CkPrintf("CkLoopLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
00785 CmiAssert(numThreads>0);
00786 #endif
00787 return CProxy_FuncCkLoop::ckNew(mode, numThreads);
00788 }
00789
00790 void CkLoop_Exit(CProxy_FuncCkLoop ckLoop) {
00791 ckLoop.exit();
00792 }
00793
00794 void hybridHandlerFunc(LoopChunkMsg *msg)
00795 {
00796 CurLoopInfo* loop = msg->loopRec;
00797 loop->doWorkForMyPe();
00798 }
00799
00800 void CurLoopInfo::doWorkForMyPe() {
00801 int numHelpers = CmiMyNodeSize();
00802 int myRank = CmiMyRank();
00803 if (upperIndex-lowerIndex < numHelpers)
00804 numHelpers = upperIndex-lowerIndex;
00805 int myStaticBegin = lowerIndex + myRank*(upperIndex - lowerIndex)/numHelpers;
00806
00807 int myDynamicBegin = myStaticBegin + ((upperIndex - lowerIndex)/numHelpers)*staticFraction;
00808 int lastDynamic = lowerIndex + (myRank+1)*(upperIndex - lowerIndex)/numHelpers;
00809 if(lastDynamic > upperIndex) lastDynamic = upperIndex;
00810
00811 int i, j = 0;
00812
00813
00814
00815 chunkSize = (upperIndex - lowerIndex)/numChunks;
00816 if(chunkSize == 0) chunkSize = 1;
00817 LoopChunkMsg* msgBlock = new LoopChunkMsg[1 + (lastDynamic - myDynamicBegin)/chunkSize];
00818
00819
00820
00821
00822
00823
00824
00825
00826
00827
00828
00829
00830
00831
00832
00833
00834
00835
00836 #if CMK_SMP && CMK_TASKQUEUE
00837 for (i=lastDynamic, j=0; i>myDynamicBegin; i-=chunkSize, j++)
00838 {
00839 LoopChunkMsg* msg = (LoopChunkMsg*)(&(msgBlock[j]));
00840
00841
00842 msg->endIndex = i;
00843 msg->startIndex = i - chunkSize < myDynamicBegin ? myDynamicBegin : i - chunkSize;
00844 msg->loopRec = this;
00845 CmiSetHandler(msg, CpvAccess(chunkHandler));
00846 CsdTaskEnqueue(msg);
00847 }
00848 #endif
00849
00850 double _start;
00851 TRACE_START(CKLOOP_STATIC_CHUNK_WORK);
00852
00853 double x = 0.0;
00854 fnPtr(myStaticBegin, myDynamicBegin, (void*) &x, paramNum, param);
00855 TRACE_BRACKET(CKLOOP_STATIC_CHUNK_WORK);
00856
00857
00858
00859
00860
00861
00862 localReduce(x, type);
00863 numDynamicChunksFired += j;
00864 numStaticRegionsCompleted++;
00865 }
00866
00867 void executeChunk(LoopChunkMsg *msg) {
00868 double _start;
00869 TRACE_START(CKLOOP_DYNAMIC_CHUNK_WORK);
00870
00871 CurLoopInfo* linfo = msg->loopRec;
00872 linfo->runChunk(msg->startIndex, msg->endIndex);
00873 TRACE_BRACKET(CKLOOP_DYNAMIC_CHUNK_WORK);
00874
00875 }
00876
00877 void CkLoop_Parallelize(HelperFn func,
00878 int paramNum, void * param,
00879 int numChunks, int lowerRange, int upperRange,
00880 int sync,
00881 void *redResult, REDUCTION_TYPE type,
00882 CallerFn cfunc,
00883 int cparamNum, void* cparam) {
00884 if ( numChunks > upperRange - lowerRange + 1 ) numChunks = upperRange - lowerRange + 1;
00885 globalCkLoop->parallelizeFunc(func, paramNum, param, numChunks, lowerRange,
00886 upperRange, sync, redResult, type, cfunc, cparamNum, cparam);
00887 }
00888
00914 void CkLoop_ParallelizeHybrid(float staticFraction,
00915 HelperFn func,
00916 int paramNum, void * param,
00917 int numChunks, int lowerRange, int upperRange,
00918 int sync,
00919 void *redResult, REDUCTION_TYPE type,
00920 CallerFn cfunc,
00921 int cparamNum, void* cparam) {
00922 #if CMK_SMP && CMK_TASKQUEUE
00923 if (0 != CkMyRank()) CkAbort("CkLoop_ParallelizeHybrid() must be called from rank 0 PE on a node.\n");
00924 if (numChunks > upperRange - lowerRange + 1) numChunks = upperRange - lowerRange + 1;
00925
00926 globalCkLoop->parallelizeFuncHybrid(staticFraction, func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type, cfunc, cparamNum, cparam);
00927 #else
00928 globalCkLoop->parallelizeFunc(func, paramNum, param, numChunks, lowerRange,
00929 upperRange, sync, redResult, type, cfunc, cparamNum, cparam);
00930 #endif
00931 }
00932
00933 void CkLoop_SetSchedPolicy(CkLoop_sched schedPolicy) {
00934 globalCkLoop->setSchedPolicy(schedPolicy);
00935 std::atomic_thread_fence(std::memory_order_release);
00936 }
00937
00938 void CkLoop_DestroyHelpers() {
00939 globalCkLoop->destroyHelpers();
00940 }
00941 #include "CkLoop.def.h"