00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #include "charm++.h"
00013 #include "ckcallback-ccs.h"
00014 #include "CkCallback.decl.h"
00015 #include "envelope.h"
00016 CProxy_ckcallback_group _ckcallbackgroup;
00017
00018 typedef CkHashtableT<CkHashtableAdaptorT<unsigned int>, CkCallback*> threadCB_t;
00019 CpvStaticDeclare(threadCB_t, threadCBs);
00020 CpvStaticDeclare(unsigned int, nextThreadCB);
00021
00022
00023 class ckcallback_main : public CBase_ckcallback_main {
00024 public:
00025 ckcallback_main(CkArgMsg *m) {
00026 _ckcallbackgroup=CProxy_ckcallback_group::ckNew();
00027 delete m;
00028 }
00029 };
00030
00031
00032
00033 class ckcallback_group : public CBase_ckcallback_group {
00034 public:
00035 ckcallback_group() { }
00036 ckcallback_group(CkMigrateMessage *m) { }
00037 void registerCcsCallback(const char *name,const CkCallback &cb);
00038 void call(CkCallback &c,CkMarshalledMessage &msg) {
00039 c.send(msg.getMessage());
00040 }
00041 };
00042
00043
00044
00045 void CkCallback::impl_thread_init(void)
00046 {
00047 int exist;
00048 CkCallback **cb;
00049 d.thread.onPE=CkMyPe();
00050 do {
00051 if (CpvAccess(nextThreadCB)==0) CpvAccess(nextThreadCB)=1;
00052 d.thread.cb=CpvAccess(nextThreadCB)++;
00053 cb = &CpvAccess(threadCBs).put(d.thread.cb, &exist);
00054 } while (exist==1);
00055 *cb = this;
00056 d.thread.th=NULL;
00057 d.thread.ret=(void*)-1;
00058 }
00059
00060
00061 void *CkCallback::impl_thread_delay(void) const
00062 {
00063 if (type!=resumeThread)
00064 CkAbort("Called impl_thread_delay on non-threaded callback");
00065 if (CkMyPe()!=d.thread.onPE)
00066 CkAbort("Called thread_delay on different processor than where callback was created");
00067
00068
00069 CkCallback *dest=(CkCallback *)this;
00070 if (d.thread.cb!=0) dest=CpvAccess(threadCBs).get(d.thread.cb);
00071 if (dest==0)
00072 CkAbort("Called thread_delay on an already deleted callback");
00073 if (dest->d.thread.ret==(void*)-1)
00074 {
00075 dest->d.thread.th=CthSelf();
00076 CthSuspend();
00077 if (dest->d.thread.ret==(void*)-1)
00078 CkAbort("thread resumed, but callback data is still empty");
00079 }
00080 return dest->d.thread.ret;
00081 }
00082
00083
00084
00085
00086
00087 CkCallback::CkCallback(Chare *p, int ep, CmiBool doInline) {
00088 #ifndef CMK_OPTIMIZE
00089 bzero(this, sizeof(CkCallback));
00090 #endif
00091 type=doInline?isendChare:sendChare;
00092 d.chare.ep=ep;
00093 d.chare.id=p->ckGetChareID();
00094 }
00095 CkCallback::CkCallback(Group *p, int ep, CmiBool doInline) {
00096 #ifndef CMK_OPTIMIZE
00097 bzero(this, sizeof(CkCallback));
00098 #endif
00099 type=doInline?isendGroup:sendGroup;
00100 d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyPe();
00101 }
00102 CkCallback::CkCallback(NodeGroup *p, int ep, CmiBool doInline) {
00103 #ifndef CMK_OPTIMIZE
00104 bzero(this, sizeof(CkCallback));
00105 #endif
00106 type=doInline?isendNodeGroup:sendNodeGroup;
00107 d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyNode();
00108 }
00109
00110 CkCallback::CkCallback(int ep,const CProxy_NodeGroup &ngp) {
00111 #ifndef CMK_OPTIMIZE
00112 bzero(this, sizeof(CkCallback));
00113 #endif
00114 type=bcastNodeGroup;
00115 d.group.ep=ep; d.group.id=ngp.ckGetGroupID();
00116 }
00117
00118 CkCallback::CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,CmiBool doInline) {
00119 #ifndef CMK_OPTIMIZE
00120 bzero(this, sizeof(CkCallback));
00121 #endif
00122 type=doInline?isendNodeGroup:sendNodeGroup;
00123 d.group.ep=ep; d.group.id=ngp.ckGetGroupID(); d.group.onPE=onPE;
00124 }
00125
00126 CkCallback::CkCallback(int ep,const CProxyElement_Group &grpElt,CmiBool doInline) {
00127 #ifndef CMK_OPTIMIZE
00128 bzero(this, sizeof(CkCallback));
00129 #endif
00130 type=doInline?isendGroup:sendGroup;
00131 d.group.ep=ep;
00132 d.group.id=grpElt.ckGetGroupID();
00133 d.group.onPE=grpElt.ckGetGroupPe();
00134 }
00135 CkCallback::CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,CmiBool doInline) {
00136 #ifndef CMK_OPTIMIZE
00137 bzero(this, sizeof(CkCallback));
00138 #endif
00139 type=doInline?isendArray:sendArray;
00140 d.array.ep=ep;
00141 d.array.id=arrElt.ckGetArrayID();
00142 d.array.idx = arrElt.ckGetIndex();
00143 }
00144
00145 CkCallback::CkCallback(int ep,CProxySection_ArrayBase §Elt,CmiBool doInline) {
00146 #ifndef CMK_OPTIMIZE
00147 bzero(this, sizeof(CkCallback));
00148 #endif
00149 type=bcastSection;
00150 d.section.ep=ep;
00151 CkSectionID secID=sectElt.ckGetSectionID(0);
00152 d.section.sinfo = secID._cookie.info;
00153 d.section._elems = secID._elems;
00154 d.section._nElems = secID._nElems;
00155 d.section.pelist = secID.pelist;
00156 d.section.npes = secID.npes;
00157 secID._elems = NULL;
00158 secID.pelist = NULL;
00159 }
00160
00161 CkCallback::CkCallback(int ep, CkSectionID &id) {
00162 #ifndef CMK_OPTIMIZE
00163 bzero(this, sizeof(CkCallback));
00164 #endif
00165 type=bcastSection;
00166 d.section.ep=ep;
00167 d.section.sinfo = id._cookie.info;
00168 d.section._elems = id._elems;
00169 d.section._nElems = id._nElems;
00170 d.section.pelist = id.pelist;
00171 d.section.npes = id.npes;
00172 }
00173
00174 CkCallback::CkCallback(ArrayElement *p, int ep,CmiBool doInline) {
00175 #ifndef CMK_OPTIMIZE
00176 bzero(this, sizeof(CkCallback));
00177 #endif
00178 type=doInline?isendArray:sendArray;
00179 d.array.ep=ep;
00180 d.array.id=p->ckGetArrayID();
00181 d.array.idx = p->ckGetArrayIndex();
00182 }
00183
00184
00185 void CkCallback::send(int length,const void *data) const
00186 {
00187 send(CkDataMsg::buildNew(length,data));
00188 }
00189
00190
00191
00192
00193
00194 void CkCallback::send(void *msg) const
00195 {
00196 switch(type) {
00197
00198 case ignore:
00199 if (msg) CkFreeMsg(msg);
00200 break;
00201 case ckExit:
00202 if (msg) CkFreeMsg(msg);
00203 CkExit();
00204 break;
00205 case resumeThread:
00206 if (d.thread.onPE==CkMyPe()) {
00207 CkCallback *dest=CpvAccess(threadCBs).get(d.thread.cb);
00208 if (dest==0 || dest->d.thread.ret!=(void*)-1)
00209 CkAbort("Already sent a value to this callback!\n");
00210 dest->d.thread.ret=msg;
00211 if (dest->d.thread.th!=NULL)
00212 CthAwaken(dest->d.thread.th);
00213 }
00214 else
00215 _ckcallbackgroup[d.thread.onPE].call(*this,(CkMessage *)msg);
00216 break;
00217 case call1Fn:
00218 (d.c1fn.fn)(msg);
00219 break;
00220 case callCFn:
00221 if (d.cfn.onPE==CkMyPe())
00222 (d.cfn.fn)(d.cfn.param,msg);
00223 else
00224 _ckcallbackgroup[d.cfn.onPE].call(*this,(CkMessage *)msg);
00225 break;
00226 case sendChare:
00227 if (!msg) msg=CkAllocSysMsg();
00228 CkSendMsg(d.chare.ep,msg,&d.chare.id);
00229 break;
00230 case isendChare:
00231 if (!msg) msg=CkAllocSysMsg();
00232 CkSendMsgInline(d.chare.ep,msg,&d.chare.id);
00233 break;
00234 case sendGroup:
00235 if (!msg) msg=CkAllocSysMsg();
00236 CkSendMsgBranch(d.group.ep,msg,d.group.onPE,d.group.id);
00237 break;
00238 case sendNodeGroup:
00239 if (!msg) msg=CkAllocSysMsg();
00240 CkSendMsgNodeBranch(d.group.ep,msg,d.group.onPE,d.group.id);
00241 break;
00242 case isendGroup:
00243 if (!msg) msg=CkAllocSysMsg();
00244 CkSendMsgBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
00245 break;
00246 case isendNodeGroup:
00247 if (!msg) msg=CkAllocSysMsg();
00248 CkSendMsgNodeBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
00249 break;
00250 case sendArray:
00251 if (!msg) msg=CkAllocSysMsg();
00252 CkSendMsgArray(d.array.ep,msg,d.array.id,d.array.idx.asChild());
00253 break;
00254 case isendArray:
00255 if (!msg) msg=CkAllocSysMsg();
00256 CkSendMsgArrayInline(d.array.ep,msg,d.array.id,d.array.idx.asChild());
00257 break;
00258 case bcastGroup:
00259 if (!msg) msg=CkAllocSysMsg();
00260 CkBroadcastMsgBranch(d.group.ep,msg,d.group.id);
00261 break;
00262 case bcastNodeGroup:
00263 if (!msg) msg=CkAllocSysMsg();
00264 CkBroadcastMsgNodeBranch(d.group.ep,msg,d.group.id);
00265 break;
00266 case bcastArray:
00267 if (!msg) msg=CkAllocSysMsg();
00268 CkBroadcastMsgArray(d.array.ep,msg,d.array.id);
00269 break;
00270 case bcastSection: {
00271 if(!msg)msg=CkAllocSysMsg();
00272 CkSectionInfo sinfo(d.section.sinfo);
00273 CkSectionID secID(sinfo, d.section._elems, d.section._nElems, d.section.pelist, d.section.npes);
00274 CkBroadcastMsgSection(d.section.ep,msg,secID);
00275 secID._elems = NULL;
00276 secID.pelist = NULL;
00277 break;
00278 }
00279 case replyCCS: {
00280 void *data=NULL;
00281 int length=0;
00282 if (msg) {
00283 CkDataMsg *m=(CkDataMsg *)msg;
00284 m->check();
00285 data=m->getData();
00286 length=m->getLength();
00287 }
00288 CcsSendDelayedReply(d.ccsReply.reply,length,data);
00289 if (msg) CkFreeMsg(msg);
00290 } break;
00291 case invalid:
00292 CmiAbort("Called send on uninitialized callback");
00293 break;
00294 default:
00295 CmiAbort("Called send on corrupted callback");
00296 break;
00297 };
00298 }
00299
00300 void CkCallback::pup(PUP::er &p) {
00301
00302 int t = (int)type;
00303 p|t;
00304 type = (callbackType)t;
00305 switch (type) {
00306 case resumeThread:
00307 p|d.thread.onPE;
00308 p|d.thread.cb;
00309 break;
00310 case isendChare:
00311 case sendChare:
00312 p|d.chare.ep;
00313 p|d.chare.id;
00314 break;
00315 case isendGroup:
00316 case sendGroup:
00317 case isendNodeGroup:
00318 case sendNodeGroup:
00319 p|d.group.onPE;
00320 case bcastNodeGroup:
00321 case bcastGroup:
00322 p|d.group.ep;
00323 p|d.group.id;
00324 break;
00325 case isendArray:
00326 case sendArray:
00327 p|d.array.idx;
00328 case bcastArray:
00329 p|d.array.ep;
00330 p|d.array.id;
00331 break;
00332 case replyCCS:
00333 p((char*)&d.ccsReply.reply, sizeof(d.ccsReply.reply));
00334 break;
00335 case call1Fn:
00336 p((char*)&d.c1fn, sizeof(d.c1fn));
00337 break;
00338 case callCFn:
00339 p((char*)&d.cfn, sizeof(d.cfn));
00340 break;
00341 case ignore:
00342 case ckExit:
00343 case invalid:
00344 break;
00345 default:
00346 CkAbort("Inconsistent CkCallback type");
00347 }
00348 }
00349
00350 void CkCallback::thread_destroy() const {
00351 if (type==resumeThread && CpvAccess(threadCBs).get(d.thread.cb)==this) {
00352 CpvAccess(threadCBs).remove(d.thread.cb);
00353 }
00354 }
00355
00356 CkCallbackResumeThread::~CkCallbackResumeThread() {
00357 void * res = thread_delay();
00358 if (result != NULL) *result = res;
00359 else CkFreeMsg(res);
00360 thread_destroy();
00361 }
00362
00363
00364
00365
00366
00367 extern "C" void ccsHandlerToCallback(void *cbPtr,int reqLen,const void *reqData)
00368 {
00369 CkCallback *cb=(CkCallback *)cbPtr;
00370 CkCcsRequestMsg *msg=new (reqLen,0) CkCcsRequestMsg;
00371 msg->reply=CcsDelayReply();
00372 msg->length=reqLen;
00373 memcpy(msg->data,reqData,reqLen);
00374 cb->send(msg);
00375 }
00376
00377
00378 void ckcallback_group::registerCcsCallback(const char *name,const CkCallback &cb)
00379 {
00380 CcsRegisterHandlerFn(name,ccsHandlerToCallback,new CkCallback(cb));
00381 }
00382
00383
00384 void CcsRegisterHandler(const char *ccs_handlername,const CkCallback &cb) {
00385 _ckcallbackgroup.registerCcsCallback(ccs_handlername,cb);
00386 }
00387
00388 enum {dataMsgTag=0x7ed2beef};
00389 CkDataMsg *CkDataMsg::buildNew(int length,const void *data)
00390 {
00391 CkDataMsg *msg=new (&length,0) CkDataMsg;
00392 msg->length=length;
00393 memcpy(msg->data,data,length);
00394 msg->checkTag=dataMsgTag;
00395 return msg;
00396 }
00397
00398 void CkDataMsg::check(void)
00399 {
00400 if (checkTag!=dataMsgTag)
00401 CkAbort("CkDataMsg corrupted-- bad tag.");
00402 }
00403
00404 void CkCallbackInit() {
00405 CpvInitialize(threadCB_t, threadCBs);
00406 CpvInitialize(unsigned int, nextThreadCB);
00407 CpvAccess(nextThreadCB)=1;
00408 }
00409
00410 #include "CkCallback.def.h"
00411