00001 #ifndef _CKLOOP_H
00002 #define _CKLOOP_H
00003 #include <assert.h>
00004
00005 #include "converse.h"
00006 #include "taskqueue.h"
00007 #include "charm++.h"
00008 #include "CkLoopAPI.h"
00009 #include <atomic>
00010 #define USE_TREE_BROADCAST_THRESHOLD 8
00011 #define TREE_BCAST_BRANCH (4)
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #define USE_CONVERSE_NOTIFICATION 1
00023
00024 CmiNodeLock loop_info_inited_lock;
00025
00026 #if CMK_TRACE_ENABLED
00027 CpvDeclare(envelope*, dummyEnv);
00028 #endif
00029 CpvCExtern(int, isHelperOn);
00030 class FuncSingleHelper;
00031
00032 class CurLoopInfo {
00033 friend class FuncSingleHelper;
00034
00035 private:
00036 float staticFraction;
00037 std::atomic<int> curChunkIdx;
00038 int numChunks;
00039 int chunkSize;
00040 REDUCTION_TYPE type;
00041 HelperFn fnPtr;
00042 int lowerIndex;
00043 int upperIndex;
00044 int paramNum;
00045 void *param;
00046
00047 void **redBufs;
00048 char *bufSpace;
00049
00050 std::atomic<int> finishFlag;
00051
00052
00053
00054 std::atomic<int> inited;
00055
00056
00057 std::atomic<int> numStaticRegionsCompleted{0};
00058 std::atomic<int> numDynamicChunksCompleted{0};
00059 std::atomic<int> numDynamicChunksFired{0};
00060
00061 public:
00062 CurLoopInfo(int maxChunks):curChunkIdx(-1),numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0),
00063 paramNum(0), param(NULL), redBufs(NULL), bufSpace(NULL), finishFlag(0), inited(0) {
00064 redBufs = new void *[maxChunks];
00065 bufSpace = new char[maxChunks * CMI_CACHE_LINE_SIZE];
00066 for (int i=0; i<maxChunks; i++) redBufs[i] = (void *)(bufSpace+i*CMI_CACHE_LINE_SIZE);
00067 }
00068
00069 ~CurLoopInfo() {
00070 delete [] redBufs;
00071 delete [] bufSpace;
00072 }
00073
00074 void set(int nc, HelperFn f, int lIdx, int uIdx, int numParams, void *p) {
00075
00076
00077
00078
00079
00080
00081
00082
00083 CmiLock(loop_info_inited_lock);
00084 numChunks = nc;
00085 fnPtr = f;
00086 lowerIndex = lIdx;
00087 upperIndex = uIdx;
00088 paramNum = numParams;
00089 param = p;
00090 curChunkIdx = -1;
00091 finishFlag = 0;
00092
00093 inited = 1;
00094 CmiUnlock(loop_info_inited_lock);
00095 }
00096
00097 void setReductionType(REDUCTION_TYPE p) {
00098 type = p;
00099 }
00100
00101 void setStaticFraction(float _staticFraction) {
00102 staticFraction = _staticFraction;
00103 }
00104
00105 #define LOCALSUM(T) *((T*) redBufs[CmiMyRank()]) += (T) x;
00106
00107 void localReduce(double x, REDUCTION_TYPE type) {
00108
00109 switch(type)
00110 {
00111 case CKLOOP_INT_SUM: {
00112 LOCALSUM(int)
00113 break;
00114 }
00115 case CKLOOP_FLOAT_SUM: {
00116 LOCALSUM(float)
00117 break;
00118 }
00119 case CKLOOP_DOUBLE_SUM: {
00120 LOCALSUM(double)
00121 break;
00122 }
00123 case CKLOOP_DOUBLE_MAX: {
00124 if( *((double *)(redBufs[CmiMyRank()])) < x ) *((double *)(redBufs[CmiMyRank()])) = x;
00125 break;
00126 }
00127 default:
00128 break;
00129 }
00130 }
00131
00132
00133 void runChunk(int sInd, int eInd) {
00134 int myRank = CmiMyRank();
00135 int numHelpers = CmiMyNodeSize();
00136 int nextPesStaticBegin = lowerIndex + (myRank+1)*(upperIndex - lowerIndex)/numHelpers;
00137 double x;
00138 fnPtr(sInd, eInd, (void*) &x, paramNum, param);
00139
00140
00141 localReduce(x, type);
00142 numDynamicChunksCompleted++;
00143 }
00144
00145 void waitLoopDone(int sync) {
00146
00147 if (sync) while (finishFlag.load(std::memory_order_relaxed)!=numChunks);
00148 std::atomic_thread_fence(std::memory_order_acquire);
00149
00150 CmiLock(loop_info_inited_lock);
00151 inited = 0;
00152 CmiUnlock(loop_info_inited_lock);
00153 }
00154
00155 void waitLoopDoneHybrid(int sync) {
00156 int count = 0;
00157 int numHelpers = CmiMyNodeSize();
00158 if (sync)
00159 while ((numStaticRegionsCompleted != numHelpers) || (numDynamicChunksCompleted != numDynamicChunksFired))
00160 {
00161
00162
00163
00164 };
00165 CmiLock(loop_info_inited_lock);
00166 inited = 0;
00167 CmiUnlock(loop_info_inited_lock);
00168 }
00169
00170 int getNextChunkIdx() {
00171 return curChunkIdx.fetch_add(1, std::memory_order_relaxed) + 1;
00172 }
00173
00174 void reportFinished(int counter) {
00175 if (counter==0) return;
00176 finishFlag.fetch_add(counter, std::memory_order_release);
00177 }
00178
00179 int isFree() {
00180 int fin = finishFlag.load(std::memory_order_acquire) == numChunks;
00181 return fin;
00182 }
00183
00184 void **getRedBufs() {
00185 return redBufs;
00186 }
00187
00188 void stealWork();
00189 void doWorkForMyPe();
00190 };
00191
00192
00193 typedef struct loopChunkMsg
00194 {
00195 char hdr[CmiMsgHeaderSizeBytes];
00196 CurLoopInfo* loopRec;
00197 int startIndex;
00198 int endIndex;
00199 } LoopChunkMsg;
00200
00201
00202
// Queue selector carried in ConverseNotifyMsg::queueID
// (names suggest node-level vs. per-PE queue).
enum CkLoop_queueID {
  NODE_Q = 0,
  PE_Q = 1
};
00204
00205 typedef struct converseNotifyMsg {
00206 char core[CmiMsgHeaderSizeBytes];
00207 int srcRank;
00208 unsigned int eventID;
00209 CkLoop_queueID queueID;
00210
00211 void *ptr;
00212 } ConverseNotifyMsg;
00213
00214 class CharmNotifyMsg: public CMessage_CharmNotifyMsg {
00215 public:
00216 int srcRank;
00217 void *ptr;
00218 };
00219
00220 class HelperNotifyMsg: public CMessage_HelperNotifyMsg {
00221 public:
00222 int srcRank;
00223 FuncSingleHelper *localHelper;
00224 };
00225
00226 class DestroyNotifyMsg: public CMessage_DestroyNotifyMsg {};
00227
00228 class FuncCkLoop : public CBase_FuncCkLoop {
00229 friend class FuncSingleHelper;
00230
00231 public:
00232 static int MAX_CHUNKS;
00233 private:
00234 int mode;
00235
00236 int numHelpers;
00237 FuncSingleHelper **helperPtr;
00238 CkLoop_sched schedPolicy;
00239
00240 public:
00241 FuncCkLoop(int mode_, int numThreads_);
00242
00243 FuncCkLoop(CkMigrateMessage *m);
00244
00245 ~FuncCkLoop() {
00246 #if CMK_TRACE_ENABLED
00247 int i;
00248 for (i = 0; i < CkMyNodeSize(); i++)
00249 CmiFree(CpvAccessOther(dummyEnv,i));
00250 #endif
00251 CmiDestroyLock(loop_info_inited_lock);
00252 delete [] helperPtr;
00253 }
00254
00255
00256
00257
00258
00259 void registerHelper(HelperNotifyMsg* msg);
00260
00261 void createPThreads();
00262 void exit();
00263 void init(int mode_, int numThreads_);
00264
00265 int getNumHelpers() {
00266 return numHelpers;
00267 }
00268 CkLoop_sched getSchedPolicy() {
00269 return schedPolicy;
00270 }
00271 void setSchedPolicy(CkLoop_sched schedPolicy) {
00272 #if !CMK_NODE_QUEUE_AVAILABLE && CMK_ERROR_CHECKING
00273 if (schedPolicy == CKLOOP_NODE_QUEUE)
00274 CkAbort("SchedPolicy, CKLOOP_NODE_QUEUE is not available on this environment\n");
00275 #endif
00276 this->schedPolicy = schedPolicy;
00277 }
00278 void parallelizeFunc(HelperFn func,
00279 int paramNum, void * param,
00280 int numChunks,
00281 int lowerRange, int upperRange,
00282 int sync=1,
00283 void *redResult=NULL, REDUCTION_TYPE type=CKLOOP_NONE,
00284 CallerFn cfunc=NULL,
00285 int cparamNum=0, void* cparam=NULL
00286 );
00287 void parallelizeFuncHybrid(float sf,
00288 HelperFn func,
00289 int paramNum, void * param,
00290 int numChunks,
00291 int lowerRange, int upperRange,
00292 int sync=1,
00293 void *redResult=NULL, REDUCTION_TYPE type=CKLOOP_NONE,
00294 CallerFn cfunc=NULL,
00295 int cparamNum=0, void* cparam=NULL
00296 );
00297 void destroyHelpers();
00298 void reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks);
00299 void pup(PUP::er &p);
00300 };
00301
00302 void executeChunk(LoopChunkMsg* msg);
00303 void SingleHelperStealWork(ConverseNotifyMsg *msg);
00304 void hybridHandlerFunc(LoopChunkMsg *msg);
00305
00306
00307
00308 #define TASK_BUFFER_SIZE (3)
00309 class FuncSingleHelper: public CBase_FuncSingleHelper {
00310 friend class FuncCkLoop;
00311 private:
00312 int totalHelpers;
00313 int notifyMsgBufSize;
00314
00315 FuncCkLoop *thisCkLoop;
00316 CProxy_FuncCkLoop funcckproxy;
00317 CkLoop_sched schedPolicy;
00318
00319 #if USE_CONVERSE_NOTIFICATION
00320
00321 ConverseNotifyMsg *notifyMsg;
00322 #else
00323
00324
00325
00326
00327 CharmNotifyMsg **notifyMsg;
00328 CurLoopInfo **taskBuffer;
00329 int nextFreeTaskBuffer;
00330 #endif
00331 int nextFreeNotifyMsg;
00332
00333 public:
00334 FuncSingleHelper();
00335
00336 ~FuncSingleHelper() {
00337 #if USE_CONVERSE_NOTIFICATION
00338 for (int i=0; i<notifyMsgBufSize; i++) {
00339 ConverseNotifyMsg *tmp = notifyMsg+i;
00340 CurLoopInfo *loop = (CurLoopInfo *)(tmp->ptr);
00341 delete loop;
00342 }
00343 free(notifyMsg);
00344 #else
00345 for (int i=0; i<notifyMsgBufSize; i++) delete notifyMsg[i];
00346 free(notifyMsg);
00347 for (int i=0; i<TASK_BUFFER_SIZE; i++) delete taskBuffer[i];
00348 free(taskBuffer);
00349 #endif
00350 }
00351 #if USE_CONVERSE_NOTIFICATION
00352 ConverseNotifyMsg *getNotifyMsg() {
00353 while (1) {
00354 ConverseNotifyMsg *cur = notifyMsg+nextFreeNotifyMsg;
00355 CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
00356 nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
00357 if (loop->isFree()) return cur;
00358 }
00359 return NULL;
00360 }
00361 #else
00362 CharmNotifyMsg *getNotifyMsg() {
00363 while (1) {
00364 CharmNotifyMsg *cur = notifyMsg[nextFreeNotifyMsg];
00365 CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
00366 nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
00367 if (loop==NULL || loop->isFree()) return cur;
00368 }
00369 return NULL;
00370 }
00371 CurLoopInfo *getNewTask() {
00372 while (1) {
00373 CurLoopInfo *cur = taskBuffer[nextFreeTaskBuffer];
00374 nextFreeTaskBuffer = (nextFreeTaskBuffer+1)%TASK_BUFFER_SIZE;
00375 if (cur->isFree()) return cur;
00376 }
00377 return NULL;
00378 }
00379 #endif
00380
00381 void stealWork(CharmNotifyMsg *msg);
00382 void destroyMyself() {
00383 delete this;
00384 }
00385
00386 FuncSingleHelper(CkMigrateMessage *m) : CBase_FuncSingleHelper(m) {}
00387
00388 private:
00389 void createNotifyMsg();
00390
00391 };
00392
00393 #endif