00001
00017 #include "charm++.h"
00018 #include "ck.h"
00019 #include "ckarray.h"
00020 #include "ckfutures.h"
00021 #include <stdlib.h>
00022
00023 typedef struct Future_s {
00024 int ready;
00025 void *value;
00026 CthThread waiters;
00027 int next;
00028 } Future;
00029
00030 typedef struct {
00031 Future *array;
00032 int max;
00033 int freelist;
00034 }
00035 FutureState;
00036
00037 class CkSema {
00038 private:
00039 CkQ<void*> msgs;
00040 CkQ<CthThread> waiters;
00041 public:
00042 void *wait(void) {
00043 void *retmsg = msgs.deq();
00044 if(retmsg==0) {
00045 waiters.enq(CthSelf());
00046 CthSuspend();
00047 retmsg = msgs.deq();
00048 }
00049 return retmsg;
00050 }
00051 void waitN(int n, void *marray[]) {
00052 while (1) {
00053 if(msgs.length()<n) {
00054 waiters.enq(CthSelf());
00055 CthSuspend();
00056 continue;
00057 }
00058 for(int i=0;i<n;i++)
00059 marray[i] = msgs.deq();
00060 return;
00061 }
00062 }
00063 void signal(void *msg)
00064 {
00065 msgs.enq(msg);
00066 if(!waiters.isEmpty())
00067 CthAwaken(waiters.deq());
00068 return;
00069 }
00070 };
00071
00072 class CkSemaPool {
00073 private:
00074 CkVec<CkSema*> pool;
00075 CkQ<int> freelist;
00076 public:
00077 int getNew(void) {
00078 CkSema *sem = new CkSema();
00079 int idx;
00080 if(freelist.isEmpty()) {
00081 idx = pool.length();
00082 pool.insertAtEnd(sem);
00083 } else {
00084 idx = freelist.deq();
00085 pool[idx] = new CkSema();
00086 }
00087 return idx;
00088 }
00089 void release(int idx) {
00090 CkSema * sem = pool[idx];
00091 delete sem;
00092 freelist.enq(idx);
00093 }
00094 void _check(int idx) {
00095 #ifndef CMK_OPTIMIZE
00096 if(pool[idx]==0) {
00097 CkAbort("ERROR! operation attempted on invalid semaphore\n");
00098 }
00099 #endif
00100 }
00101 void *wait(int idx) {
00102 _check(idx);
00103 return pool[idx]->wait();
00104 }
00105 void waitN(int idx, int n, void *marray[]) {
00106 _check(idx);
00107 pool[idx]->waitN(n, marray);
00108 }
00109 void signal(int idx, void *msg) {
00110 _check(idx);
00111 pool[idx]->signal(msg);
00112 }
00113 };
00114
00115 CpvStaticDeclare(FutureState, futurestate);
00116 CpvStaticDeclare(CkSemaPool*, semapool);
00117
00118 static void addedFutures(int lo, int hi)
00119 {
00120 int i;
00121 FutureState *fs = &(CpvAccess(futurestate));
00122 Future *array = fs->array;
00123
00124 for (i=lo; i<hi; i++)
00125 array[i].next = i+1;
00126 array[hi-1].next = fs->freelist;
00127 fs->freelist = lo;
00128 }
00129
00130 static
00131 inline
00132 int createFuture(void)
00133 {
00134 FutureState *fs = &(CpvAccess(futurestate));
00135 Future *fut; int handle, origsize;
00136
00137
00138 if (fs->freelist == -1) {
00139 origsize = fs->max;
00140 fs->max = fs->max * 2;
00141 fs->array = (Future*)realloc(fs->array, sizeof(Future)*(fs->max));
00142 _MEMCHECK(fs->array);
00143 addedFutures(origsize, fs->max);
00144 }
00145 handle = fs->freelist;
00146 fut = fs->array + handle;
00147 fs->freelist = fut->next;
00148 fut->ready = 0;
00149 fut->value = 0;
00150 fut->waiters = 0;
00151 fut->next = 0;
00152 return handle;
00153 }
00154
00155 extern "C"
00156 CkFuture CkCreateFuture(void)
00157 {
00158 CkFuture fut;
00159 fut.id = createFuture();
00160 fut.pe = CkMyPe();
00161 return fut;
00162 }
00163
00164 extern "C"
00165 void CkReleaseFutureID(CkFutureID handle)
00166 {
00167 FutureState *fs = &(CpvAccess(futurestate));
00168 Future *fut = (fs->array)+handle;
00169 fut->next = fs->freelist;
00170 fs->freelist = handle;
00171 }
00172
00173 extern "C"
00174 int CkProbeFutureID(CkFutureID handle)
00175 {
00176 FutureState *fs = &(CpvAccess(futurestate));
00177 Future *fut = (fs->array)+handle;
00178 return (fut->ready);
00179 }
00180
00181 extern "C"
00182 void *CkWaitFutureID(CkFutureID handle)
00183 {
00184 CthThread self = CthSelf();
00185 FutureState *fs = &(CpvAccess(futurestate));
00186 Future *fut = (fs->array)+handle;
00187 void *value;
00188
00189 if (!(fut->ready)) {
00190 CthSetNext(self, fut->waiters);
00191 fut->waiters = self;
00192 while (!(fut->ready)) { CthSuspend(); fut = (fs->array)+handle; }
00193 }
00194 fut = (fs->array)+handle;
00195 value = fut->value;
00196 #ifndef CMK_OPTIMIZE
00197 if (value==NULL)
00198 CkAbort("ERROR! CkWaitFuture would have to return NULL!\n"
00199 "This can happen when a thread that calls a sync method "
00200 "gets a CthAwaken call *before* the sync method returns.");
00201 #endif
00202 return value;
00203 }
00204
00205 extern "C"
00206 void CkReleaseFuture(CkFuture fut)
00207 {
00208 CkReleaseFutureID(fut.id);
00209 }
00210
00211 extern "C"
00212 int CkProbeFuture(CkFuture fut)
00213 {
00214 return CkProbeFutureID(fut.id);
00215 }
00216
00217 extern "C"
00218 void *CkWaitFuture(CkFuture fut)
00219 {
00220 return CkWaitFutureID(fut.id);
00221 }
00222
00223 extern "C"
00224 void CkWaitVoidFuture(CkFutureID handle)
00225 {
00226 CkFreeMsg(CkWaitFutureID(handle));
00227 }
00228
00229 static void setFuture(CkFutureID handle, void *pointer)
00230 {
00231 CthThread t;
00232 FutureState *fs = &(CpvAccess(futurestate));
00233 Future *fut = (fs->array)+handle;
00234 fut->ready = 1;
00235 #ifndef CMK_OPTIMIZE
00236 if (pointer==NULL) CkAbort("setFuture called with NULL!");
00237 #endif
00238 fut->value = pointer;
00239 for (t=fut->waiters; t; t=CthGetNext(t))
00240 CthAwaken(t);
00241 fut->waiters = 0;
00242 }
00243
00244 void _futuresModuleInit(void)
00245 {
00246 CpvInitialize(FutureState, futurestate);
00247 CpvInitialize(CkSemaPool *, semapool);
00248 CpvAccess(futurestate).array = (Future *)malloc(10*sizeof(Future));
00249 _MEMCHECK(CpvAccess(futurestate).array);
00250 CpvAccess(futurestate).max = 10;
00251 CpvAccess(futurestate).freelist = -1;
00252 addedFutures(0,10);
00253 CpvAccess(semapool) = new CkSemaPool();
00254 }
00255
00256 CkGroupID _fbocID;
00257
00258 class FutureInitMsg : public CMessage_FutureInitMsg {
00259 public: int x ;
00260 };
00261
00262 class FutureMain : public Chare {
00263 public:
00264 FutureMain(CkArgMsg *m) {
00265 _fbocID = CProxy_FutureBOC::ckNew(new FutureInitMsg);
00266 delete m;
00267 }
00268 FutureMain(CkMigrateMessage *m) {}
00269 };
00270
00271 extern "C"
00272 CkFutureID CkRemoteBranchCallAsync(int ep, void *m, CkGroupID group, int PE)
00273 {
00274 CkFutureID ret=CkCreateAttachedFuture(m);
00275 CkSendMsgBranch(ep, m, PE, group);
00276 return ret;
00277 }
00278
00279 extern "C"
00280 void *CkRemoteBranchCall(int ep, void *m, CkGroupID group, int PE)
00281 {
00282 CkFutureID i = CkRemoteBranchCallAsync(ep, m, group, PE);
00283 return CkWaitReleaseFuture(i);
00284 }
00285
00286 extern "C"
00287 CkFutureID CkRemoteNodeBranchCallAsync(int ep, void *m, CkGroupID group, int node)
00288 {
00289 CkFutureID ret=CkCreateAttachedFuture(m);
00290 CkSendMsgNodeBranch(ep, m, node, group);
00291 return ret;
00292 }
00293
00294 extern "C"
00295 void *CkRemoteNodeBranchCall(int ep, void *m, CkGroupID group, int node)
00296 {
00297 CkFutureID i = CkRemoteNodeBranchCallAsync(ep, m, group, node);
00298 return CkWaitReleaseFuture(i);
00299 }
00300
00301 extern "C"
00302 CkFutureID CkRemoteCallAsync(int ep, void *m, const CkChareID *ID)
00303 {
00304 CkFutureID ret=CkCreateAttachedFuture(m);
00305 CkSendMsg(ep, m, ID);
00306 return ret;
00307 }
00308
00309 extern "C"
00310 void *CkRemoteCall(int ep, void *m, const CkChareID *ID)
00311 {
00312 CkFutureID i = CkRemoteCallAsync(ep, m, ID);
00313 return CkWaitReleaseFuture(i);
00314 }
00315
00316
00317 extern "C" CkFutureID CkCreateAttachedFuture(void *msg)
00318 {
00319 CkFutureID ret=createFuture();
00320 UsrToEnv(msg)->setRef(ret);
00321 return ret;
00322 }
00323
00324 extern "C" CkFutureID CkCreateAttachedFutureSend(void *msg, int ep,
00325 CkArrayID id, CkArrayIndex idx,
00326 void(*fptr)(CkArrayID,CkArrayIndex,void*,int,int),int size)
00327 {
00328 CkFutureID ret=createFuture();
00329 UsrToEnv(msg)->setRef(ret);
00330 #if IGET_FLOWCONTROL
00331 if (TheIGetControlClass.iget_request(ret,msg,ep,id,idx,fptr,size))
00332 #endif
00333 (fptr)(id,idx,msg,ep,0);
00334 return ret;
00335 }
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350 extern "C" void *CkWaitReleaseFuture(CkFutureID futNum)
00351 {
00352 #if IGET_FLOWCONTROL
00353 TheIGetControlClass.iget_resend(futNum);
00354 #endif
00355 void *result=CkWaitFutureID(futNum);
00356 CkReleaseFutureID(futNum);
00357 #if IGET_FLOWCONTROL
00358 TheIGetControlClass.iget_free(1);
00359
00360 #endif
00361 return result;
00362 }
00363
00364 class FutureBOC: public IrrGroup {
00365 public:
00366 FutureBOC(void){ }
00367 FutureBOC(FutureInitMsg *m) { delete m; }
00368 FutureBOC(CkMigrateMessage *m) { }
00369 void SetFuture(FutureInitMsg * m) {
00370 #ifndef CMK_OPTIMIZE
00371 if (m==NULL) CkAbort("FutureBOC::SetFuture called with NULL!");
00372 #endif
00373 int key;
00374 key = UsrToEnv((void *)m)->getRef();
00375 setFuture( key, m);
00376 }
00377 void SetSema(FutureInitMsg *m) {
00378 #ifndef CMK_OPTIMIZE
00379 if (m==NULL) CkAbort("FutureBOC::SetSema called with NULL!");
00380 #endif
00381 int idx;
00382 idx = UsrToEnv((void *)m)->getRef();
00383 CpvAccess(semapool)->signal(idx,(void*)m);
00384 }
00385 };
00386
00387 extern "C"
00388 void CkSendToFutureID(CkFutureID futNum, void *m, int PE)
00389 {
00390 UsrToEnv(m)->setRef(futNum);
00391 CProxy_FutureBOC fBOC(_fbocID);
00392 fBOC[PE].SetFuture((FutureInitMsg *)m);
00393 }
00394
00395 extern "C"
00396 void CkSendToFuture(CkFuture fut, void *msg)
00397 {
00398 CkSendToFutureID(fut.id, msg, fut.pe);
00399 }
00400
00401 extern "C"
00402 CkSemaID CkSemaCreate(void)
00403 {
00404 CkSemaID id;
00405 id.pe = CkMyPe();
00406 id.idx = CpvAccess(semapool)->getNew();
00407 return id;
00408 }
00409
00410 extern "C"
00411 void *CkSemaWait(CkSemaID id)
00412 {
00413 #ifndef CMK_OPTIMIZE
00414 if(id.pe != CkMyPe()) {
00415 CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
00416 }
00417 #endif
00418 return CpvAccess(semapool)->wait(id.idx);
00419 }
00420
00421 extern "C"
00422 void CkSemaWaitN(CkSemaID id, int n, void *marray[])
00423 {
00424 #ifndef CMK_OPTIMIZE
00425 if(id.pe != CkMyPe()) {
00426 CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
00427 }
00428 #endif
00429 CpvAccess(semapool)->waitN(id.idx, n, marray);
00430 }
00431
00432 extern "C"
00433 void CkSemaSignal(CkSemaID id, void *m)
00434 {
00435 UsrToEnv(m)->setRef(id.idx);
00436 CProxy_FutureBOC fBOC(_fbocID);
00437 fBOC[id.pe].SetSema((FutureInitMsg *)m);
00438 }
00439
00440 extern "C"
00441 void CkSemaDestroy(CkSemaID id)
00442 {
00443 #ifndef CMK_OPTIMIZE
00444 if(id.pe != CkMyPe()) {
00445 CkAbort("ERROR: destroying a nonlocal semaphore! Aborting..\n");
00446 }
00447 #endif
00448 CpvAccess(semapool)->release(id.idx);
00449 }
00450
00451
00453 #include "CkFutures.def.h"
00454