00001
00017 #include "charm++.h"
00018 #include "ck.h"
00019 #include "ckarray.h"
00020 #include "ckfutures.h"
00021 #include <stdlib.h>
00022
00023 typedef struct Future_s {
00024 bool ready;
00025 void *value;
00026 CthThread waiters;
00027 int next;
00028 } Future;
00029
00030 typedef struct {
00031 Future *array;
00032 int max;
00033 int freelist;
00034 }
00035 FutureState;
00036
00037 class CkSema {
00038 private:
00039 CkQ<void*> msgs;
00040 CkQ<CthThread> waiters;
00041 public:
00042 void *wait(void) {
00043 void *retmsg = msgs.deq();
00044 if(retmsg==0) {
00045 waiters.enq(CthSelf());
00046 CthSuspend();
00047 retmsg = msgs.deq();
00048 }
00049 return retmsg;
00050 }
00051 void waitN(int n, void *marray[]) {
00052 while (1) {
00053 if(msgs.length()<n) {
00054 waiters.enq(CthSelf());
00055 CthSuspend();
00056 continue;
00057 }
00058 for(int i=0;i<n;i++)
00059 marray[i] = msgs.deq();
00060 return;
00061 }
00062 }
00063 void signal(void *msg)
00064 {
00065 msgs.enq(msg);
00066 if(!waiters.isEmpty())
00067 CthAwaken(waiters.deq());
00068 return;
00069 }
00070 };
00071
00072 class CkSemaPool {
00073 private:
00074 std::vector<CkSema*> pool;
00075 CkQ<int> freelist;
00076 public:
00077 int getNew(void) {
00078 int idx;
00079 if(freelist.isEmpty()) {
00080 idx = pool.size();
00081 pool.push_back(new CkSema());
00082 } else {
00083 idx = freelist.deq();
00084 pool[idx] = new CkSema();
00085 }
00086 return idx;
00087 }
00088 void release(int idx) {
00089 CkSema * sem = pool[idx];
00090 delete sem;
00091 freelist.enq(idx);
00092 }
00093 void _check(int idx) {
00094 #if CMK_ERROR_CHECKING
00095 if(pool[idx]==0) {
00096 CkAbort("ERROR! operation attempted on invalid semaphore\n");
00097 }
00098 #endif
00099 }
00100 void *wait(int idx) {
00101 _check(idx);
00102 return pool[idx]->wait();
00103 }
00104 void waitN(int idx, int n, void *marray[]) {
00105 _check(idx);
00106 pool[idx]->waitN(n, marray);
00107 }
00108 void signal(int idx, void *msg) {
00109 _check(idx);
00110 pool[idx]->signal(msg);
00111 }
00112 };
00113
00114 CpvStaticDeclare(FutureState, futurestate);
00115 CpvStaticDeclare(CkSemaPool*, semapool);
00116
00117 static void addedFutures(int lo, int hi)
00118 {
00119 int i;
00120 FutureState *fs = &(CpvAccess(futurestate));
00121 Future *array = fs->array;
00122
00123 for (i=lo; i<hi; i++)
00124 array[i].next = i+1;
00125 array[hi-1].next = fs->freelist;
00126 fs->freelist = lo;
00127 }
00128
00129 static
00130 inline
00131 int createFuture(void)
00132 {
00133 FutureState *fs = &(CpvAccess(futurestate));
00134 Future *fut; int handle, origsize;
00135
00136
00137 if (fs->freelist == -1) {
00138 origsize = fs->max;
00139 fs->max = fs->max * 2;
00140 fs->array = (Future*)realloc(fs->array, sizeof(Future)*(fs->max));
00141 _MEMCHECK(fs->array);
00142 addedFutures(origsize, fs->max);
00143 }
00144 handle = fs->freelist;
00145 fut = fs->array + handle;
00146 fs->freelist = fut->next;
00147 fut->ready = false;
00148 fut->value = 0;
00149 fut->waiters = 0;
00150 fut->next = 0;
00151 return handle;
00152 }
00153
00154 CkFuture CkCreateFuture(void)
00155 {
00156 CkFuture fut;
00157 fut.id = createFuture();
00158 fut.pe = CkMyPe();
00159 return fut;
00160 }
00161
00162 void CkReleaseFutureID(CkFutureID handle)
00163 {
00164 FutureState *fs = &(CpvAccess(futurestate));
00165 Future *fut = (fs->array)+handle;
00166 fut->next = fs->freelist;
00167 fs->freelist = handle;
00168 }
00169
00170 int CkProbeFutureID(CkFutureID handle)
00171 {
00172 FutureState *fs = &(CpvAccess(futurestate));
00173 Future *fut = (fs->array)+handle;
00174 return (int)(fut->ready);
00175 }
00176
00177 void *CkWaitFutureID(CkFutureID handle)
00178 {
00179 CthThread self = CthSelf();
00180 FutureState *fs = &(CpvAccess(futurestate));
00181 Future *fut = (fs->array)+handle;
00182 void *value;
00183
00184 if (!(fut->ready)) {
00185 CthSetNext(self, fut->waiters);
00186 fut->waiters = self;
00187 while (!(fut->ready)) { CthSuspend(); fut = (fs->array)+handle; }
00188 }
00189 fut = (fs->array)+handle;
00190 value = fut->value;
00191 #if CMK_ERROR_CHECKING
00192 if (value==NULL)
00193 CkAbort("ERROR! CkWaitFuture would have to return NULL!\n"
00194 "This can happen when a thread that calls a sync method "
00195 "gets a CthAwaken call *before* the sync method returns.");
00196 #endif
00197 return value;
00198 }
00199
00200 void CkReleaseFuture(CkFuture fut)
00201 {
00202 CkReleaseFutureID(fut.id);
00203 }
00204
00205 int CkProbeFuture(CkFuture fut)
00206 {
00207 return CkProbeFutureID(fut.id);
00208 }
00209
00210 void *CkWaitFuture(CkFuture fut)
00211 {
00212 return CkWaitFutureID(fut.id);
00213 }
00214
00215 void CkWaitVoidFuture(CkFutureID handle)
00216 {
00217 CkFreeMsg(CkWaitFutureID(handle));
00218 }
00219
00220 static void setFuture(CkFutureID handle, void *pointer)
00221 {
00222 CthThread t;
00223 FutureState *fs = &(CpvAccess(futurestate));
00224 Future *fut = (fs->array)+handle;
00225 fut->ready = true;
00226 #if CMK_ERROR_CHECKING
00227 if (pointer==NULL) CkAbort("setFuture called with NULL!");
00228 #endif
00229 fut->value = pointer;
00230 for (t=fut->waiters; t; t=CthGetNext(t))
00231 CthAwaken(t);
00232 fut->waiters = 0;
00233 }
00234
00235 void _futuresModuleInit(void)
00236 {
00237 CpvInitialize(FutureState, futurestate);
00238 CpvInitialize(CkSemaPool *, semapool);
00239 CpvAccess(futurestate).array = (Future *)malloc(10*sizeof(Future));
00240 _MEMCHECK(CpvAccess(futurestate).array);
00241 CpvAccess(futurestate).max = 10;
00242 CpvAccess(futurestate).freelist = -1;
00243 addedFutures(0,10);
00244 CpvAccess(semapool) = new CkSemaPool();
00245 }
00246
00247 CkGroupID _fbocID;
00248
00249 class FutureInitMsg : public CMessage_FutureInitMsg {
00250 public: int x ;
00251 };
00252
00253 class FutureMain : public Chare {
00254 public:
00255 FutureMain(CkArgMsg *m) {
00256 _fbocID = CProxy_FutureBOC::ckNew(new FutureInitMsg);
00257 delete m;
00258 }
00259 FutureMain(CkMigrateMessage *m) {}
00260 };
00261
00262 extern "C"
00263 CkFutureID CkRemoteBranchCallAsync(int ep, void *m, CkGroupID group, int PE)
00264 {
00265 CkFutureID ret=CkCreateAttachedFuture(m);
00266 CkSendMsgBranch(ep, m, PE, group);
00267 return ret;
00268 }
00269
00270 extern "C"
00271 void *CkRemoteBranchCall(int ep, void *m, CkGroupID group, int PE)
00272 {
00273 CkFutureID i = CkRemoteBranchCallAsync(ep, m, group, PE);
00274 return CkWaitReleaseFuture(i);
00275 }
00276
00277 extern "C"
00278 CkFutureID CkRemoteNodeBranchCallAsync(int ep, void *m, CkGroupID group, int node)
00279 {
00280 CkFutureID ret=CkCreateAttachedFuture(m);
00281 CkSendMsgNodeBranch(ep, m, node, group);
00282 return ret;
00283 }
00284
00285 extern "C"
00286 void *CkRemoteNodeBranchCall(int ep, void *m, CkGroupID group, int node)
00287 {
00288 CkFutureID i = CkRemoteNodeBranchCallAsync(ep, m, group, node);
00289 return CkWaitReleaseFuture(i);
00290 }
00291
00292 extern "C"
00293 CkFutureID CkRemoteCallAsync(int ep, void *m, const CkChareID *ID)
00294 {
00295 CkFutureID ret=CkCreateAttachedFuture(m);
00296 CkSendMsg(ep, m, ID);
00297 return ret;
00298 }
00299
00300 extern "C"
00301 void *CkRemoteCall(int ep, void *m, const CkChareID *ID)
00302 {
00303 CkFutureID i = CkRemoteCallAsync(ep, m, ID);
00304 return CkWaitReleaseFuture(i);
00305 }
00306
00307
00308 CkFutureID CkCreateAttachedFuture(void *msg)
00309 {
00310 CkFutureID ret=createFuture();
00311 UsrToEnv(msg)->setRef(ret);
00312 return ret;
00313 }
00314
00315 CkFutureID CkCreateAttachedFutureSend(void *msg, int ep,
00316 CkArrayID id, CkArrayIndex idx,
00317 void(*fptr)(CkArrayID,CkArrayIndex,void*,int,int),int size)
00318 {
00319 CkFutureID ret=createFuture();
00320 UsrToEnv(msg)->setRef(ret);
00321 #if IGET_FLOWCONTROL
00322 if (TheIGetControlClass.iget_request(ret,msg,ep,id,idx,fptr,size))
00323 #endif
00324 (fptr)(id,idx,msg,ep,0);
00325 return ret;
00326 }
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341 void *CkWaitReleaseFuture(CkFutureID futNum)
00342 {
00343 #if IGET_FLOWCONTROL
00344 TheIGetControlClass.iget_resend(futNum);
00345 #endif
00346 void *result=CkWaitFutureID(futNum);
00347 CkReleaseFutureID(futNum);
00348 #if IGET_FLOWCONTROL
00349 TheIGetControlClass.iget_free(1);
00350
00351 #endif
00352 return result;
00353 }
00354
00355 class FutureBOC: public IrrGroup {
00356 public:
00357 FutureBOC(void){ }
00358 FutureBOC(FutureInitMsg *m) { delete m; }
00359 FutureBOC(CkMigrateMessage *m) { }
00360 void SetFuture(FutureInitMsg * m) {
00361 #if CMK_ERROR_CHECKING
00362 if (m==NULL) CkAbort("FutureBOC::SetFuture called with NULL!");
00363 #endif
00364 int key;
00365 key = UsrToEnv((void *)m)->getRef();
00366 setFuture( key, m);
00367 }
00368 void SetSema(FutureInitMsg *m) {
00369 #if CMK_ERROR_CHECKING
00370 if (m==NULL) CkAbort("FutureBOC::SetSema called with NULL!");
00371 #endif
00372 int idx;
00373 idx = UsrToEnv((void *)m)->getRef();
00374 CpvAccess(semapool)->signal(idx,(void*)m);
00375 }
00376 };
00377
00378 extern "C"
00379 void CkSendToFutureID(CkFutureID futNum, void *m, int PE)
00380 {
00381 UsrToEnv(m)->setRef(futNum);
00382 CProxy_FutureBOC fBOC(_fbocID);
00383 fBOC[PE].SetFuture((FutureInitMsg *)m);
00384 }
00385
00386 void CkSendToFuture(CkFuture fut, void *msg)
00387 {
00388 CkSendToFutureID(fut.id, msg, fut.pe);
00389 }
00390
00391 CkSemaID CkSemaCreate(void)
00392 {
00393 CkSemaID id;
00394 id.pe = CkMyPe();
00395 id.idx = CpvAccess(semapool)->getNew();
00396 return id;
00397 }
00398
00399 void *CkSemaWait(CkSemaID id)
00400 {
00401 #if CMK_ERROR_CHECKING
00402 if(id.pe != CkMyPe()) {
00403 CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
00404 }
00405 #endif
00406 return CpvAccess(semapool)->wait(id.idx);
00407 }
00408
00409 void CkSemaWaitN(CkSemaID id, int n, void *marray[])
00410 {
00411 #if CMK_ERROR_CHECKING
00412 if(id.pe != CkMyPe()) {
00413 CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
00414 }
00415 #endif
00416 CpvAccess(semapool)->waitN(id.idx, n, marray);
00417 }
00418
00419 void CkSemaSignal(CkSemaID id, void *m)
00420 {
00421 UsrToEnv(m)->setRef(id.idx);
00422 CProxy_FutureBOC fBOC(_fbocID);
00423 fBOC[id.pe].SetSema((FutureInitMsg *)m);
00424 }
00425
00426 void CkSemaDestroy(CkSemaID id)
00427 {
00428 #if CMK_ERROR_CHECKING
00429 if(id.pe != CkMyPe()) {
00430 CkAbort("ERROR: destroying a nonlocal semaphore! Aborting..\n");
00431 }
00432 #endif
00433 CpvAccess(semapool)->release(id.idx);
00434 }
00435
00436
00438 #include "CkFutures.def.h"
00439