00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #include "charm++.h"
00013 #include "ckcallback-ccs.h"
00014 #include "CkCallback.decl.h"
00015 #include "envelope.h"
00016
00017 extern "C" void LibCkExit();
00018
00019 CProxy_ckcallback_group _ckcallbackgroup;
00020
00021 typedef CkHashtableT<CkHashtableAdaptorT<unsigned int>, CkCallback*> threadCB_t;
00022 CpvStaticDeclare(threadCB_t*, threadCBs);
00023 CpvStaticDeclare(unsigned int, nextThreadCB);
00024
00025
00026 class ckcallback_main : public CBase_ckcallback_main {
00027 public:
00028 ckcallback_main(CkArgMsg *m) {
00029 _ckcallbackgroup=CProxy_ckcallback_group::ckNew();
00030 delete m;
00031 }
00032 };
00033
00034
00035
00036 class ckcallback_group : public CBase_ckcallback_group {
00037 public:
00038 ckcallback_group() { }
00039 ckcallback_group(CkMigrateMessage *m) { }
00040 void registerCcsCallback(const char *name,const CkCallback &cb);
00041 void call(CkCallback &&c, CkMarshalledMessage &&msg) {
00042 c.send(msg.getMessage());
00043 }
00044 void call(CkCallback &&c, int length, const char *data) {
00045 if(c.requiresMsgConstruction())
00046 c.send(CkDataMsg::buildNew(length,data));
00047 else
00048 c.send(NULL);
00049 }
00050 };
00051
00052
00053
00054 void CkCallback::impl_thread_init(void)
00055 {
00056 int exist;
00057 CkCallback **cb;
00058 d.thread.onPE=CkMyPe();
00059 do {
00060 if (CpvAccess(nextThreadCB)==0) CpvAccess(nextThreadCB)=1;
00061 d.thread.cb=CpvAccess(nextThreadCB)++;
00062 cb = &CpvAccess(threadCBs)->put(d.thread.cb, &exist);
00063 } while (exist==1);
00064 *cb = this;
00065 d.thread.th=NULL;
00066 d.thread.ret=(void*)-1;
00067 }
00068
00069
00070 void *CkCallback::impl_thread_delay(void) const
00071 {
00072 if (type!=resumeThread)
00073 CkAbort("Called impl_thread_delay on non-threaded callback");
00074 if (CkMyPe()!=d.thread.onPE)
00075 CkAbort("Called thread_delay on different processor than where callback was created");
00076
00077
00078 CkCallback *dest=(CkCallback *)this;
00079 if (d.thread.cb!=0) dest=CpvAccess(threadCBs)->get(d.thread.cb);
00080 if (dest==0)
00081 CkAbort("Called thread_delay on an already deleted callback");
00082 if (dest->d.thread.ret==(void*)-1)
00083 {
00084 dest->d.thread.th=CthSelf();
00085 CthSuspend();
00086 if (dest->d.thread.ret==(void*)-1)
00087 CkAbort("thread resumed, but callback data is still empty");
00088 }
00089 return dest->d.thread.ret;
00090 }
00091
00092
00093
00094
00095
00096 CkCallback::CkCallback(Chare *p, int ep, bool forceInline) {
00097 #if CMK_ERROR_CHECKING
00098 memset(this, 0, sizeof(CkCallback));
00099 #endif
00100 type = (forceInline || _entryTable[ep]->isInline) ? isendChare : sendChare;
00101 d.chare.ep=ep;
00102 d.chare.id=p->ckGetChareID();
00103 d.chare.hasRefnum= false;
00104 d.chare.refnum = 0;
00105 }
00106 CkCallback::CkCallback(Group *p, int ep, bool forceInline) {
00107 #if CMK_ERROR_CHECKING
00108 memset(this, 0, sizeof(CkCallback));
00109 #endif
00110 type = (forceInline || _entryTable[ep]->isInline) ? isendGroup : sendGroup;
00111 d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyPe();
00112 d.group.hasRefnum= false;
00113 d.group.refnum = 0;
00114 }
00115 CkCallback::CkCallback(NodeGroup *p, int ep, bool forceInline) {
00116 #if CMK_ERROR_CHECKING
00117 memset(this, 0, sizeof(CkCallback));
00118 #endif
00119 type = (forceInline || _entryTable[ep]->isInline) ? isendNodeGroup : sendNodeGroup;
00120 d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyNode();
00121 d.group.hasRefnum= false;
00122 d.group.refnum = 0;
00123 }
00124
00125 CkCallback::CkCallback(int ep,const CProxy_NodeGroup &ngp) {
00126 #if CMK_ERROR_CHECKING
00127 memset(this, 0, sizeof(CkCallback));
00128 #endif
00129 type=bcastNodeGroup;
00130 d.group.ep=ep; d.group.id=ngp.ckGetGroupID();
00131 d.group.hasRefnum= false;
00132 d.group.refnum = 0;
00133 }
00134
00135 CkCallback::CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,bool forceInline) {
00136 #if CMK_ERROR_CHECKING
00137 memset(this, 0, sizeof(CkCallback));
00138 #endif
00139 type = (forceInline || _entryTable[ep]->isInline) ? isendNodeGroup : sendNodeGroup;
00140 d.group.ep=ep; d.group.id=ngp.ckGetGroupID(); d.group.onPE=onPE;
00141 d.group.hasRefnum= false;
00142 d.group.refnum = 0;
00143 }
00144
00145 CkCallback::CkCallback(int ep,const CProxyElement_Group &grpElt,bool forceInline) {
00146 #if CMK_ERROR_CHECKING
00147 memset(this, 0, sizeof(CkCallback));
00148 #endif
00149 type = (forceInline || _entryTable[ep]->isInline) ? isendGroup : sendGroup;
00150 d.group.ep=ep;
00151 d.group.id=grpElt.ckGetGroupID();
00152 d.group.onPE=grpElt.ckGetGroupPe();
00153 d.group.hasRefnum= false;
00154 d.group.refnum = 0;
00155 }
00156
00157 CkCallback::CkCallback(int ep, const CProxyElement_NodeGroup &grpElt, bool forceInline) {
00158 #if CMK_ERROR_CHECKING
00159 memset(this, 0, sizeof(CkCallback));
00160 #endif
00161 type = (forceInline || _entryTable[ep]->isInline) ? isendNodeGroup : sendNodeGroup;
00162 d.group.ep = ep;
00163 d.group.id = grpElt.ckGetGroupID();
00164 d.group.onPE = grpElt.ckGetGroupPe();
00165 d.group.hasRefnum = false;
00166 d.group.refnum = 0;
00167 }
00168
00169 CkCallback::CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,bool forceInline) {
00170 #if CMK_ERROR_CHECKING
00171 memset(this, 0, sizeof(CkCallback));
00172 #endif
00173 type = (forceInline || _entryTable[ep]->isInline) ? isendArray : sendArray;
00174 d.array.ep=ep;
00175 d.array.id=arrElt.ckGetArrayID();
00176 d.array.idx = arrElt.ckGetIndex();
00177 d.array.hasRefnum= false;
00178 d.array.refnum = 0;
00179 }
00180
00181 CkCallback::CkCallback(int ep,CProxySection_ArrayBase §Elt,bool forceInline) {
00182 #if CMK_ERROR_CHECKING
00183 memset(this, 0, sizeof(CkCallback));
00184 #endif
00185 type=bcastSection;
00186 d.section.ep=ep;
00187 CkSectionID secID=sectElt.ckGetSectionID(0);
00188 d.section.sinfo = secID._cookie.info;
00189 d.section._elems = secID._elems.data();
00190 d.section._nElems = secID._elems.size();
00191 d.section.pelist = secID.pelist.data();
00192 d.section.npes = secID.pelist.size();
00193 d.section.hasRefnum = false;
00194 d.section.refnum = 0;
00195 }
00196
00197 CkCallback::CkCallback(int ep, CkSectionID &id) {
00198 #if CMK_ERROR_CHECKING
00199 memset(this, 0, sizeof(CkCallback));
00200 #endif
00201 type=bcastSection;
00202 d.section.ep=ep;
00203 d.section.sinfo = id._cookie.info;
00204 d.section._elems = id._elems.data();
00205 d.section._nElems = id._elems.size();
00206 d.section.pelist = id.pelist.data();
00207 d.section.npes = id.pelist.size();
00208 d.section.hasRefnum = false;
00209 d.section.refnum = 0;
00210 }
00211
00212 CkCallback::CkCallback(ArrayElement *p, int ep,bool forceInline) {
00213 #if CMK_ERROR_CHECKING
00214 memset(this, 0, sizeof(CkCallback));
00215 #endif
00216 type = (forceInline || _entryTable[ep]->isInline) ? isendArray : sendArray;
00217 d.array.ep=ep;
00218 d.array.id=p->ckGetArrayID();
00219 d.array.idx = p->ckGetArrayIndex();
00220 d.array.hasRefnum= false;
00221 d.array.refnum = 0;
00222 }
00223
00224 #if CMK_CHARMPY
00225
00226
00227
00228
00229
00230 extern void (*CreateReductionTargetMsgExt)(void*, int, int, int, char**, int*);
00231
00232 static void CkCallbackSendExt(const CkCallback &cb, void *msg)
00233 {
00234 CkAssert(msg != NULL);
00235 char *extResultMsgData[2] = {NULL, NULL};
00236 int extResultMsgDataSizes[2] = {0, 0};
00237 CkReductionMsg* redMsg = (CkReductionMsg*) msg;
00238
00239 switch (cb.type) {
00240 case CkCallback::sendChare:
00241 CreateReductionTargetMsgExt(redMsg->getData(), redMsg->getLength(), redMsg->getReducer(),
00242 cb.d.chare.refnum, extResultMsgData, extResultMsgDataSizes);
00243 CkChareExtSend_multi(cb.d.chare.id.onPE, cb.d.chare.id.objPtr, cb.d.chare.ep,
00244 2, extResultMsgData, extResultMsgDataSizes);
00245 break;
00246 case CkCallback::sendGroup:
00247 CreateReductionTargetMsgExt(redMsg->getData(), redMsg->getLength(), redMsg->getReducer(),
00248 cb.d.group.refnum, extResultMsgData, extResultMsgDataSizes);
00249 CkGroupExtSend_multi(cb.d.group.id.idx, cb.d.group.onPE, cb.d.group.ep,
00250 2, extResultMsgData, extResultMsgDataSizes);
00251 break;
00252 case CkCallback::sendArray:
00253 CreateReductionTargetMsgExt(redMsg->getData(), redMsg->getLength(), redMsg->getReducer(),
00254 cb.d.array.refnum, extResultMsgData, extResultMsgDataSizes);
00255 CkArrayExtSend_multi(cb.d.array.id.idx, cb.d.array.idx.asChild().data(), cb.d.array.idx.dimension,
00256 cb.d.array.ep, 2, extResultMsgData, extResultMsgDataSizes);
00257 break;
00258 case CkCallback::bcastGroup:
00259 CreateReductionTargetMsgExt(redMsg->getData(), redMsg->getLength(), redMsg->getReducer(),
00260 cb.d.group.refnum, extResultMsgData, extResultMsgDataSizes);
00261
00262 CkGroupExtSend_multi(cb.d.group.id.idx, -1, cb.d.group.ep, 2, extResultMsgData, extResultMsgDataSizes);
00263 break;
00264 case CkCallback::bcastArray:
00265 CreateReductionTargetMsgExt(redMsg->getData(), redMsg->getLength(), redMsg->getReducer(),
00266 cb.d.array.refnum, extResultMsgData, extResultMsgDataSizes);
00267
00268 CkArrayExtSend_multi(cb.d.array.id.idx, cb.d.array.idx.asChild().data(), 0,
00269 cb.d.array.ep, 2, extResultMsgData, extResultMsgDataSizes);
00270 break;
00271 default:
00272 CkAbort("Unsupported callback for ext reduction, or corrupted callback");
00273 break;
00274 }
00275
00276 CkFreeMsg(msg);
00277 }
00278 #endif
00279
00280 void CkCallback::send(int length,const void *data) const
00281 {
00282 if(requiresMsgConstruction())
00283 send(CkDataMsg::buildNew(length,data));
00284 else
00285 send(NULL);
00286 }
00287
00288
00289
00290
00291
00292 void CkCallback::send(void *msg) const
00293 {
00294
00295
00296 int opts = 0;
00297
00298 #if CMK_CHARMPY
00299 if (isCkExtReductionCb) {
00300 CkCallbackSendExt(*this, msg);
00301 return;
00302 }
00303 #endif
00304
00305 switch(type) {
00306
00307 case ignore:
00308 if (msg) CkFreeMsg(msg);
00309 break;
00310 case ckExit:
00311 if (msg) CkFreeMsg(msg);
00312 if (CharmLibInterOperate) LibCkExit();
00313 else CkExit();
00314 break;
00315 case resumeThread:
00316 if (d.thread.onPE==CkMyPe()) {
00317 CkCallback *dest=CpvAccess(threadCBs)->get(d.thread.cb);
00318 if (dest==0 || dest->d.thread.ret!=(void*)-1)
00319 CkAbort("Already sent a value to this callback!\n");
00320 dest->d.thread.ret=msg;
00321 if (dest->d.thread.th!=NULL)
00322 CthAwaken(dest->d.thread.th);
00323 }
00324 else
00325 _ckcallbackgroup[d.thread.onPE].call(*this,(CkMessage *)msg);
00326 break;
00327 case call1Fn:
00328 (d.c1fn.fn)(msg);
00329 break;
00330 case callCFn:
00331 if (d.cfn.onPE==CkMyPe())
00332 (d.cfn.fn)(d.cfn.param,msg);
00333 else
00334 _ckcallbackgroup[d.cfn.onPE].call(*this,(CkMessage *)msg);
00335 break;
00336 case sendChare:
00337 if (!msg) msg=CkAllocSysMsg();
00338 if (d.chare.hasRefnum) CkSetRefNum(msg, d.chare.refnum);
00339 CkSendMsg(d.chare.ep, msg, &d.chare.id);
00340 break;
00341 case isendChare:
00342 if (!msg) msg=CkAllocSysMsg();
00343 if (d.chare.hasRefnum) CkSetRefNum(msg, d.chare.refnum);
00344 CkSendMsgInline(d.chare.ep, msg, &d.chare.id);
00345 break;
00346 case sendGroup:
00347 if (!msg) msg=CkAllocSysMsg();
00348 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
00349 CkSendMsgBranch(d.group.ep, msg, d.group.onPE, d.group.id);
00350 break;
00351 case sendNodeGroup:
00352 if (!msg) msg=CkAllocSysMsg();
00353 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
00354 if (_entryTable[d.group.ep]->isImmediate) opts = CK_MSG_IMMEDIATE;
00355 CkSendMsgNodeBranch(d.group.ep, msg, d.group.onPE, d.group.id, opts);
00356 break;
00357 case isendGroup:
00358 if (!msg) msg=CkAllocSysMsg();
00359 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
00360 CkSendMsgBranchInline(d.group.ep, msg, d.group.onPE, d.group.id);
00361 break;
00362 case isendNodeGroup:
00363 if (!msg) msg=CkAllocSysMsg();
00364 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
00365 if (_entryTable[d.group.ep]->isImmediate) opts = CK_MSG_IMMEDIATE;
00366 CkSendMsgNodeBranchInline(d.group.ep, msg, d.group.onPE, d.group.id, opts);
00367 break;
00368 case sendArray:
00369 if (!msg) msg=CkAllocSysMsg();
00370 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
00371 CkSetMsgArrayIfNotThere(msg);
00372 CkSendMsgArray(d.array.ep, msg, d.array.id, d.array.idx.asChild());
00373 break;
00374 case isendArray:
00375 if (!msg) msg=CkAllocSysMsg();
00376 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
00377 CkSendMsgArrayInline(d.array.ep, msg, d.array.id, d.array.idx.asChild());
00378 break;
00379 case bcastGroup:
00380 if (!msg) msg=CkAllocSysMsg();
00381 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
00382 CkBroadcastMsgBranch(d.group.ep, msg, d.group.id);
00383 break;
00384 case bcastNodeGroup:
00385 if (!msg) msg=CkAllocSysMsg();
00386 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
00387 if (_entryTable[d.group.ep]->isImmediate) opts = CK_MSG_IMMEDIATE;
00388 CkBroadcastMsgNodeBranch(d.group.ep, msg, d.group.id, opts);
00389 break;
00390 case bcastArray:
00391 if (!msg) msg=CkAllocSysMsg();
00392 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
00393 CkBroadcastMsgArray(d.array.ep, msg, d.array.id);
00394 break;
00395 case bcastSection: {
00396 if(!msg)msg=CkAllocSysMsg();
00397 if (d.section.hasRefnum) CkSetRefNum(msg, d.section.refnum);
00398 CkSectionInfo sinfo(d.section.sinfo);
00399 CkSectionID secID(sinfo, d.section._elems, d.section._nElems, d.section.pelist, d.section.npes);
00400 CkBroadcastMsgSection(d.section.ep, msg, secID);
00401 break;
00402 }
00403 case replyCCS: {
00404 void *data=NULL;
00405 int length=0;
00406 if (msg) {
00407 CkDataMsg *m=(CkDataMsg *)msg;
00408 m->check();
00409 data=m->getData();
00410 length=m->getLength();
00411 }
00412 CcsSendDelayedReply(d.ccsReply.reply,length,data);
00413 if (msg) CkFreeMsg(msg);
00414 } break;
00415 case invalid:
00416 CmiAbort("Called send on uninitialized callback");
00417 break;
00418 default:
00419 CmiAbort("Called send on corrupted callback");
00420 break;
00421 };
00422 }
00423
00424 void CkCallback::pup(PUP::er &p) {
00425
00426 int t = (int)type;
00427 p|t;
00428 type = (callbackType)t;
00429 switch (type) {
00430 case resumeThread:
00431 p|d.thread.onPE;
00432 p|d.thread.cb;
00433 break;
00434 case isendChare:
00435 case sendChare:
00436 p|d.chare.ep;
00437 p|d.chare.id;
00438 p|d.chare.hasRefnum;
00439 p|d.chare.refnum;
00440 break;
00441 case isendGroup:
00442 case sendGroup:
00443 case isendNodeGroup:
00444 case sendNodeGroup:
00445 p|d.group.onPE;
00446 p|d.group.hasRefnum;
00447 p|d.group.refnum;
00448 case bcastNodeGroup:
00449 case bcastGroup:
00450 p|d.group.ep;
00451 p|d.group.id;
00452 p|d.group.hasRefnum;
00453 p|d.group.refnum;
00454 break;
00455 case isendArray:
00456 case sendArray:
00457 p|d.array.idx;
00458 p|d.array.hasRefnum;
00459 p|d.array.refnum;
00460 case bcastArray:
00461 p|d.array.ep;
00462 p|d.array.id;
00463 p|d.array.hasRefnum;
00464 p|d.array.refnum;
00465 break;
00466 case replyCCS:
00467 p((char*)&d.ccsReply.reply, sizeof(d.ccsReply.reply));
00468 break;
00469 case call1Fn:
00470 p((char*)&d.c1fn, sizeof(d.c1fn));
00471 break;
00472 case callCFn:
00473 p((char*)&d.cfn, sizeof(d.cfn));
00474 break;
00475 case ignore:
00476 case ckExit:
00477 case invalid:
00478 break;
00479 default:
00480 CkAbort("Inconsistent CkCallback type");
00481 }
00482 }
00483
00484 bool CkCallback::containsPointer() const {
00485 switch(type) {
00486 case invalid:
00487 case ignore:
00488 case ckExit:
00489 case sendGroup:
00490 case sendNodeGroup:
00491 case sendArray:
00492 case isendGroup:
00493 case isendNodeGroup:
00494 case isendArray:
00495 case bcastGroup:
00496 case bcastNodeGroup:
00497 case bcastArray:
00498 return false;
00499
00500 case resumeThread:
00501 case callCFn:
00502 case call1Fn:
00503 case replyCCS:
00504 case bcastSection:
00505 return true;
00506
00507 case sendChare:
00508 case isendChare:
00509 #if CMK_CHARE_USE_PTR
00510 return true;
00511 #else
00512 return false;
00513 #endif
00514
00515 default:
00516 CkAbort("Asked about an unknown CkCallback type");
00517 return true;
00518 }
00519 }
00520
00521 void CkCallback::thread_destroy() const {
00522 if (type==resumeThread && CpvAccess(threadCBs)->get(d.thread.cb)==this) {
00523 CpvAccess(threadCBs)->remove(d.thread.cb);
00524 }
00525 }
00526
00527 CkCallbackResumeThread::~CkCallbackResumeThread() {
00528 void * res = thread_delay();
00529 if (result != NULL) *result = res;
00530 else CkFreeMsg(res);
00531 thread_destroy();
00532 }
00533
00534
00535
00536
00537
00538 extern "C" void ccsHandlerToCallback(void *cbPtr,int reqLen,const void *reqData)
00539 {
00540 CkCallback *cb=(CkCallback *)cbPtr;
00541 CkCcsRequestMsg *msg=new (reqLen,0) CkCcsRequestMsg;
00542 msg->reply=CcsDelayReply();
00543 msg->length=reqLen;
00544 memcpy(msg->data,reqData,reqLen);
00545 cb->send(msg);
00546 }
00547
00548
00549 void ckcallback_group::registerCcsCallback(const char *name,const CkCallback &cb)
00550 {
00551 CcsRegisterHandlerFn(name,ccsHandlerToCallback,new CkCallback(cb));
00552 }
00553
00554
00555 void CcsRegisterHandler(const char *ccs_handlername,const CkCallback &cb) {
00556 _ckcallbackgroup.registerCcsCallback(ccs_handlername,cb);
00557 }
00558
00559 #if CMK_ERROR_CHECKING
// Tag written into CkDataMsg::checkTag by buildNew() and verified by check().
enum {
  dataMsgTag = 0x7ed2beef
};
00561 #endif
00562
00563 CkDataMsg *CkDataMsg::buildNew(int length,const void *data)
00564 {
00565 CkDataMsg *msg=new (&length,0) CkDataMsg;
00566 msg->length=length;
00567 memcpy(msg->data,data,length);
00568 #if CMK_ERROR_CHECKING
00569 msg->checkTag=dataMsgTag;
00570 #endif
00571 return msg;
00572 }
00573
00574 void CkDataMsg::check(void)
00575 {
00576 #if CMK_ERROR_CHECKING
00577 if (checkTag!=dataMsgTag)
00578 CkAbort("CkDataMsg corrupted-- bad tag.");
00579 #endif
00580 }
00581
00582 void CkCallbackInit() {
00583 CpvInitialize(threadCB_t*, threadCBs);
00584 CpvAccess(threadCBs) = new threadCB_t;
00585 CpvInitialize(unsigned int, nextThreadCB);
00586 CpvAccess(nextThreadCB)=1;
00587 }
00588
00589 #include "CkCallback.def.h"
00590