00001
#include "ck.h"
#include "trace.h"
#include "queueing.h"

#include "pathHistory.h"

#include <string.h>
00014
00015 #if CMK_LBDB_ON
00016 #include "LBDatabase.h"
00017 #endif // CMK_LBDB_ON
00018
#ifndef CMK_CHARE_USE_PTR
#include <map>
// Index-based chare bookkeeping, compiled only when CMK_CHARE_USE_PTR is not
// defined. In this mode a CkChareID's objPtr carries an integer index (cast
// through CmiIntPtr) into the tables below instead of a raw object pointer.

// Chare objects allocated by _allocNewChare(); an object's position is its chare index.
CkpvDeclare(std::vector<void *>, chare_objs);
// Chare-table type index of each entry of chare_objs (appended together with it).
CkpvDeclare(std::vector<int>, chare_types);
// VidBlocks created by CkCreateChare(); a virtual chare ID's objPtr indexes this vector.
CkpvDeclare(std::vector<VidBlock *>, vidblocks);

// Maps a chare index to the CkChareID (creator PE + vidblock index) of the
// VidBlock standing in for that chare; filled by _processNewVChareMsg() and
// consumed by Chare::~Chare().
typedef std::map<int, CkChareID> Vidblockmap;
CkpvDeclare(Vidblockmap, vmap);
// Index of the chare whose constructor is about to run (-1 when there is none);
// read by Chare::Chare().
CkpvDeclare(int, currentChareIdx);
#endif
00029
00030
// Declared here and initialized in _initChareTables(); its users are outside this section.
CkpvDeclare(ArrayObjMap, array_objs);

// Bit mask matching messages flagged as expedited or immediate.
#define CK_MSG_SKIP_OR_IMM (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
00034
00035 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
00036
// Storage for the static __idx members of the built-in message / chare index
// classes. The ones without an initializer are zero-initialized statics.
int CMessage_CkMessage::__idx=-1;
int CMessage_CkArgMsg::__idx=0;
int CkIndex_Chare::__idx;
int CkIndex_Group::__idx;
int CkIndex_ArrayBase::__idx=-1;

// Defined elsewhere. When nonzero, the Chare constructors call CkEnableObjQ()
// (only in builds with CMK_OBJECT_QUEUE_AVAILABLE).
extern int _defaultObjectQ;
00044
00045 void _initChareTables()
00046 {
00047 #ifndef CMK_CHARE_USE_PTR
00048
00049 CkpvInitialize(std::vector<void *>, chare_objs);
00050 CkpvInitialize(std::vector<int>, chare_types);
00051 CkpvInitialize(std::vector<VidBlock *>, vidblocks);
00052 CkpvInitialize(Vidblockmap, vmap);
00053 CkpvInitialize(int, currentChareIdx);
00054 CkpvAccess(currentChareIdx) = -1;
00055 #endif
00056
00057 CkpvInitialize(ArrayObjMap, array_objs);
00058 }
00059
00060
00061 Chare::Chare(void) {
00062 thishandle.onPE=CkMyPe();
00063 thishandle.objPtr=this;
00064 #if CMK_ERROR_CHECKING
00065 magic = CHARE_MAGIC;
00066 #endif
00067 #ifndef CMK_CHARE_USE_PTR
00068
00069 if (CkpvAccess(currentChareIdx) >= 0) {
00070 thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
00071 }
00072 chareIdx = CkpvAccess(currentChareIdx);
00073 #endif
00074 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00075 mlogData = new ChareMlogData();
00076 mlogData->objID.type = TypeChare;
00077 mlogData->objID.data.chare.id = thishandle;
00078 #endif
00079 #if CMK_OBJECT_QUEUE_AVAILABLE
00080 if (_defaultObjectQ) CkEnableObjQ();
00081 #endif
00082 }
00083
00084 Chare::Chare(CkMigrateMessage* m) {
00085 thishandle.onPE=CkMyPe();
00086 thishandle.objPtr=this;
00087 #if CMK_ERROR_CHECKING
00088 magic = 0;
00089 #endif
00090
00091 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00092 mlogData = NULL;
00093 #endif
00094
00095 #if CMK_OBJECT_QUEUE_AVAILABLE
00096 if (_defaultObjectQ) CkEnableObjQ();
00097 #endif
00098 }
00099
00100 void Chare::CkEnableObjQ()
00101 {
00102 #if CMK_OBJECT_QUEUE_AVAILABLE
00103 objQ.create();
00104 #endif
00105 }
00106
00107 Chare::~Chare() {
00108 #ifndef CMK_CHARE_USE_PTR
00109
00110
00111
00112 if (chareIdx != -1)
00113 {
00114 CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
00115 CkpvAccess(chare_objs)[chareIdx] = NULL;
00116 Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
00117 if (iter != CkpvAccess(vmap).end()) {
00118 CkChareID *pCid = (CkChareID *)
00119 _allocMsg(DeleteVidMsg, sizeof(CkChareID));
00120 int srcPe = iter->second.onPE;
00121 *pCid = iter->second;
00122 envelope *ret = UsrToEnv(pCid);
00123 ret->setVidPtr(iter->second.objPtr);
00124 ret->setSrcPe(CkMyPe());
00125 CmiSetHandler(ret, _charmHandlerIdx);
00126 CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
00127 CpvAccess(_qd)->create();
00128 CkpvAccess(vmap).erase(iter);
00129 }
00130 }
00131 #endif
00132 }
00133
00134 void Chare::pup(PUP::er &p)
00135 {
00136 p(thishandle.onPE);
00137 thishandle.objPtr=(void *)this;
00138 #ifndef CMK_CHARE_USE_PTR
00139 p(chareIdx);
00140 if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
00141 #endif
00142 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00143 if(p.isUnpacking()){
00144 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
00145 mlogData = new ChareMlogData();
00146 }
00147 mlogData->pup(p);
00148 #endif
00149 #if CMK_ERROR_CHECKING
00150 p(magic);
00151 #endif
00152 }
00153
00154 int Chare::ckGetChareType() const {
00155 return -3;
00156 }
00157 char *Chare::ckDebugChareName(void) {
00158 char buf[100];
00159 sprintf(buf,"Chare on pe %d at %p",CkMyPe(),(void*)this);
00160 return strdup(buf);
00161 }
00162 int Chare::ckDebugChareID(char *str, int limit) {
00163
00164 str[0] = 0;
00165 return 1;
00166 }
00167 void Chare::ckDebugPup(PUP::er &p) {
00168 pup(p);
00169 }
00170
00172 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
00173 CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
00174 traceAddThreadListeners(th, UsrToEnv(msg));
00175 }
00176
00177 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
00178 p.comment("Bytes");
00179 int ts=UsrToEnv(msg)->getTotalsize();
00180 int msgLen=ts-sizeof(envelope);
00181 if (msgLen>0)
00182 p((char*)msg,msgLen);
00183 }
00184
00185 IrrGroup::IrrGroup(void) {
00186 thisgroup = CkpvAccess(_currentGroup);
00187 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00188 mlogData->objID.type = TypeGroup;
00189 mlogData->objID.data.group.id = thisgroup;
00190 mlogData->objID.data.group.onPE = CkMyPe();
00191 #endif
00192 }
00193
00194 IrrGroup::~IrrGroup() {
00195
00196 if (CkpvAccess(_destroyingNodeGroup)) {
00197 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00198 CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
00199 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00200 CkpvAccess(_destroyingNodeGroup) = false;
00201 } else {
00202 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
00203 CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
00204 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
00205 }
00206 }
00207
00208 void IrrGroup::pup(PUP::er &p)
00209 {
00210 Chare::pup(p);
00211 p|thisgroup;
00212 }
00213
00214 int IrrGroup::ckGetChareType() const {
00215 return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
00216 }
00217
00218 int IrrGroup::ckDebugChareID(char *str, int limit) {
00219 if (limit<5) return -1;
00220 str[0] = 1;
00221 *((int*)&str[1]) = thisgroup.idx;
00222 return 5;
00223 }
00224
00225 char *IrrGroup::ckDebugChareName() {
00226 return strdup(_chareTable[ckGetChareType()]->name);
00227 }
00228
00229 void IrrGroup::ckJustMigrated(void)
00230 {
00231 }
00232
00233 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
00234
00235 }
00236
00237 void Group::CkAddThreadListeners(CthThread th, void *msg) {
00238 Chare::CkAddThreadListeners(th, msg);
00239 CthSetThreadID(th, thisgroup.idx, 0, 0);
00240 }
00241
00242 void Group::pup(PUP::er &p)
00243 {
00244 CkReductionMgr::pup(p);
00245 p|reductionInfo;
00246 }
00247
00248
00249 CkDelegateMgr::~CkDelegateMgr() { }
00250
00251
00252 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
00253 { CkSendMsg(ep,m,c); }
00254 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
00255 { CkSendMsgBranch(ep,m,onPE,g); }
00256 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
00257 { CkBroadcastMsgBranch(ep,m,g); }
00258 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
00259 { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
00260 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
00261 { CkSendMsgNodeBranch(ep,m,onNode,g); }
00262 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
00263 { CkBroadcastMsgNodeBranch(ep,m,g); }
00264 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
00265 { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
00266 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
00267 {
00268 CProxyElement_ArrayBase ap(a,idx);
00269 ap.ckInsert((CkArrayMessage *)m,ep,onPE);
00270 }
00271 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
00272 {
00273 CProxyElement_ArrayBase ap(a,idx);
00274 ap.ckSend((CkArrayMessage *)m,ep);
00275 }
00276 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
00277 {
00278 CProxy_ArrayBase ap(a);
00279 ap.ckBroadcast((CkArrayMessage *)m,ep);
00280 }
00281
00282 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
00283 {
00284 CmiAbort("ArraySectionSend is not implemented!\n");
00285
00286
00287
00288
00289 }
00290
00291
00292 CkDelegateData::~CkDelegateData() {}
00293
00294 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
00295 return pd;
00296 }
00297
00301 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
00302 if (dPtr) dPtr->ref();
00303 ckUndelegate();
00304 delegatedMgr = dTo;
00305 delegatedPtr = dPtr;
00306 delegatedGroupId = delegatedMgr->CkGetGroupID();
00307 isNodeGroup = delegatedMgr->isNodeGroup();
00308 }
00309 void CProxy::ckUndelegate(void) {
00310 delegatedMgr=NULL;
00311 delegatedGroupId.setZero();
00312 if (delegatedPtr) delegatedPtr->unref();
00313 delegatedPtr=NULL;
00314 }
00315
00317 CProxy::CProxy(const CProxy &src)
00318 :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId),
00319 isNodeGroup(src.isNodeGroup) {
00320 delegatedPtr = NULL;
00321 if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
00322 delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
00323 }
00324 }
00325
00327 CProxy& CProxy::operator=(const CProxy &src) {
00328 CkDelegateData *oldPtr=delegatedPtr;
00329 ckUndelegate();
00330 delegatedMgr=src.delegatedMgr;
00331 delegatedGroupId = src.delegatedGroupId;
00332 isNodeGroup = src.isNodeGroup;
00333
00334 if(delegatedMgr != NULL && src.delegatedPtr != NULL)
00335 delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
00336 else
00337 delegatedPtr = NULL;
00338
00339
00340 if (oldPtr) oldPtr->unref();
00341 return *this;
00342 }
00343
00344 void CProxy::pup(PUP::er &p) {
00345 if (!p.isUnpacking()) {
00346 if (ckDelegatedTo() != NULL) {
00347 delegatedGroupId = delegatedMgr->CkGetGroupID();
00348 isNodeGroup = delegatedMgr->isNodeGroup();
00349 }
00350 }
00351 p|delegatedGroupId;
00352 if (!delegatedGroupId.isZero()) {
00353 p|isNodeGroup;
00354 if (p.isUnpacking()) {
00355 delegatedMgr = ckDelegatedTo();
00356 }
00357
00358 int migCtor = 0, cIdx;
00359 if (!p.isUnpacking()) {
00360 if (isNodeGroup) {
00361 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00362 cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx();
00363 migCtor = _chareTable[cIdx]->migCtor;
00364 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00365 }
00366 else {
00367 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
00368 cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
00369 migCtor = _chareTable[cIdx]->migCtor;
00370 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
00371 }
00372 }
00373
00374 p|migCtor;
00375
00376
00377
00378 if (delegatedMgr == NULL) {
00379
00380
00381 int objId = _entryTable[migCtor]->chareIdx;
00382 size_t objSize = _chareTable[objId]->size;
00383 void *obj = malloc(objSize);
00384 _entryTable[migCtor]->call(NULL, obj);
00385 delegatedPtr = static_cast<CkDelegateMgr *> (obj)
00386 ->DelegatePointerPup(p, delegatedPtr);
00387 free(obj);
00388
00389 }
00390 else {
00391
00392
00393 delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
00394
00395 }
00396
00397 if (p.isUnpacking() && delegatedPtr) {
00398 delegatedPtr->ref();
00399 }
00400 }
00401 }
00402
00403
00404 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
00405 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems, int factor): bfactor(factor) { \
00406 _elems.assign(elems, elems+nElems); \
00407 _cookie.get_aid() = aid; \
00408 _cookie.get_pe() = CkMyPe(); \
00409 } \
00410 CkSectionID::CkSectionID(const CkArrayID &aid, const std::vector<CkArrayIndex##index> &elems, int factor): bfactor(factor) { \
00411 _elems.resize(elems.size()); \
00412 for (int i=0; i<_elems.size(); ++i) { \
00413 _elems[i] = static_cast<CkArrayIndex>(elems[i]); \
00414 } \
00415 _cookie.get_aid() = aid; \
00416 _cookie.get_pe() = CkMyPe(); \
00417 } \
00418
00419 CKSECTIONID_CONSTRUCTOR_DEF(1D)
00420 CKSECTIONID_CONSTRUCTOR_DEF(2D)
00421 CKSECTIONID_CONSTRUCTOR_DEF(3D)
00422 CKSECTIONID_CONSTRUCTOR_DEF(4D)
00423 CKSECTIONID_CONSTRUCTOR_DEF(5D)
00424 CKSECTIONID_CONSTRUCTOR_DEF(6D)
00425 CKSECTIONID_CONSTRUCTOR_DEF(Max)
00426
00427 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes, int factor): bfactor(factor) {
00428 _cookie.get_aid() = gid;
00429 pelist.assign(_pelist, _pelist+_npes);
00430 }
00431
00432 CkSectionID::CkSectionID(const CkGroupID &gid, const std::vector<int>& _pelist, int factor): pelist(_pelist), bfactor(factor) {
00433 _cookie.get_aid() = gid;
00434 }
00435
00436 CkSectionID::CkSectionID(const CkSectionID &sid) {
00437 _cookie = sid._cookie;
00438 pelist = sid.pelist;
00439 _elems = sid._elems;
00440 bfactor = sid.bfactor;
00441 }
00442
00443 void CkSectionID::operator=(const CkSectionID &sid) {
00444 _cookie = sid._cookie;
00445 pelist = sid.pelist;
00446 _elems = sid._elems;
00447 bfactor = sid.bfactor;
00448 }
00449
00450 void CkSectionID::pup(PUP::er &p) {
00451 p | _cookie;
00452 p | pelist;
00453 p | _elems;
00454 p | bfactor;
00455 }
00456
00457
00458
#if CMK_CUDA
// Treats `fn` as a CkCallback* and fires it; a NULL pointer is ignored.
void CUDACallbackManager(void *fn) {
  if (fn == NULL) {
    return;
  }
  CkCallback *callback = (CkCallback*) fn;
  callback->send();
}

#endif
00468
00469 void QdCreate(int n) {
00470 CpvAccess(_qd)->create(n);
00471 }
00472
00473 void QdProcess(int n) {
00474 CpvAccess(_qd)->process(n);
00475 }
00476
00477 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
00478 {
00479 UsrToEnv(msg)->setRef(ref);
00480 }
00481
00482 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
00483 {
00484 return UsrToEnv(msg)->getRef();
00485 }
00486
00487 int CkGetSrcPe(void *msg)
00488 {
00489 return UsrToEnv(msg)->getSrcPe();
00490 }
00491
00492 int CkGetSrcNode(void *msg)
00493 {
00494 return CmiNodeOf(CkGetSrcPe(msg));
00495 }
00496
00497 void *CkLocalBranch(CkGroupID gID) {
00498 return _localBranch(gID);
00499 }
00500
00501 static
00502 void *_ckLocalNodeBranch(CkGroupID groupID) {
00503 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00504 void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
00505 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00506 return retval;
00507 }
00508
00509 void *CkLocalNodeBranch(CkGroupID groupID)
00510 {
00511 void *retval;
00512
00513 if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
00514 return CkpvAccess(_currentNodeGroupObj);
00515 while (NULL== (retval=_ckLocalNodeBranch(groupID)))
00516 {
00517 CsdScheduler(0);
00518 }
00519 return retval;
00520 }
00521
00522 void *CkLocalChare(const CkChareID *pCid)
00523 {
00524 int pe=pCid->onPE;
00525 if (pe<0) {
00526 if (pe!=(-(CkMyPe()+1)))
00527 return NULL;
00528 #ifdef CMK_CHARE_USE_PTR
00529 VidBlock *v=(VidBlock *)pCid->objPtr;
00530 #else
00531 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
00532 #endif
00533 return v->getLocalChareObj();
00534 }
00535 else
00536 {
00537 if (pe!=CkMyPe())
00538 return NULL;
00539 #ifdef CMK_CHARE_USE_PTR
00540 return pCid->objPtr;
00541 #else
00542 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
00543 #endif
00544 }
00545 }
00546
00547 CkpvDeclare(char**,Ck_argv);
00548
00549 char **CkGetArgv(void) {
00550 return CkpvAccess(Ck_argv);
00551 }
00552 int CkGetArgc(void) {
00553 return CmiGetArgc(CkpvAccess(Ck_argv));
00554 }
00555
00556
00557 void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
00558 {
00559 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00560 CpvAccess(_currentObj) = (Chare *)obj;
00561
00562 #endif
00563
00564
00565
00566 #if CMK_CHARMDEBUG
00567 CpdBeforeEp(epIdx, obj, msg);
00568 #endif
00569 _entryTable[epIdx]->call(msg, obj);
00570 #if CMK_CHARMDEBUG
00571 CpdAfterEp(epIdx);
00572 #endif
00573 if (_entryTable[epIdx]->noKeep)
00574 {
00575 _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
00576 }
00577 }
00578 void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
00579 {
00580
00581
00582
00583
00584 void *deliverMsg;
00585 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00586 CpvAccess(_currentObj) = (Chare *)obj;
00587 #endif
00588 if (_entryTable[epIdx]->noKeep)
00589 {
00590 deliverMsg=(void *)msg;
00591 } else
00592 {
00593 void *oldMsg=(void *)msg;
00594 deliverMsg=CkCopyMsg(&oldMsg);
00595 #if CMK_ERROR_CHECKING
00596 if (oldMsg!=msg)
00597 CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
00598 #endif
00599 }
00600 #if CMK_CHARMDEBUG
00601 CpdBeforeEp(epIdx, obj, (void*)msg);
00602 #endif
00603 _entryTable[epIdx]->call(deliverMsg, obj);
00604 #if CMK_CHARMDEBUG
00605 CpdAfterEp(epIdx);
00606 #endif
00607 }
00608
00609 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
00610 {
00611 void *msg = EnvToUsr(env);
00612 _SET_USED(env, 0);
00613 #if CMK_ONESIDED_IMPL
00614 if(CMI_ZC_MSGTYPE(UsrToEnv(msg)) == CMK_ZC_P2P_RECV_MSG ||
00615 CMI_ZC_MSGTYPE(UsrToEnv(msg)) == CMK_ZC_BCAST_RECV_MSG ||
00616 CMI_ZC_MSGTYPE(UsrToEnv(msg)) == CMK_ZC_BCAST_RECV_DONE_MSG)
00617 CkDeliverMessageReadonly(epIdx,msg,obj);
00618 else
00619 #endif
00620 CkDeliverMessageFree(epIdx,msg,obj);
00621 }
00622
00623 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
00624 {
00625
00626 #if CMK_TRACE_ENABLED
00627 if (_entryTable[epIdx]->traceEnabled) {
00628 _TRACE_BEGIN_EXECUTE(env, obj);
00629 if(_entryTable[epIdx]->appWork)
00630 _TRACE_BEGIN_APPWORK();
00631 _invokeEntryNoTrace(epIdx,env,obj);
00632 if(_entryTable[epIdx]->appWork)
00633 _TRACE_END_APPWORK();
00634 _TRACE_END_EXECUTE();
00635 }
00636 else
00637 #endif
00638 _invokeEntryNoTrace(epIdx,env,obj);
00639
00640 }
00641
00642
00643
00644 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
00645 {
00646 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
00647 envelope *env = UsrToEnv(msg);
00648 _CHECK_USED(env);
00649 if(pCid == 0) {
00650 env->setMsgtype(NewChareMsg);
00651 } else {
00652 pCid->onPE = (-(CkMyPe()+1));
00653
00654 pCid->objPtr = (void *) new VidBlock();
00655 _MEMCHECK(pCid->objPtr);
00656 env->setMsgtype(NewVChareMsg);
00657 env->setVidPtr(pCid->objPtr);
00658 #ifndef CMK_CHARE_USE_PTR
00659 CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
00660 int idx = CkpvAccess(vidblocks).size()-1;
00661 pCid->objPtr = (void *)(CmiIntPtr)idx;
00662 env->setVidPtr((void *)(CmiIntPtr)idx);
00663 #endif
00664 }
00665 env->setEpIdx(eIdx);
00666 env->setByPe(CkMyPe());
00667 env->setSrcPe(CkMyPe());
00668 CmiSetHandler(env, _charmHandlerIdx);
00669 _TRACE_CREATION_1(env);
00670 CpvAccess(_qd)->create();
00671 _STATS_RECORD_CREATE_CHARE_1();
00672 _SET_USED(env, 1);
00673 if(destPE == CK_PE_ANY)
00674 env->setForAnyPE(1);
00675 else
00676 env->setForAnyPE(0);
00677 _CldEnqueue(destPE, env, _infoIdx);
00678 _TRACE_CREATION_DONE(1);
00679 }
00680
00681 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
00682 {
00683 int gIdx = _entryTable[epIdx]->chareIdx;
00684 void *obj = malloc(_chareTable[gIdx]->size);
00685 _MEMCHECK(obj);
00686 setMemoryTypeChare(obj);
00687 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
00688 CkpvAccess(_groupTable)->find(groupID).setObj(obj);
00689 CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
00690 CkpvAccess(_groupIDTable)->push_back(groupID);
00691 PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
00692 if(ptrq) {
00693 void *pending;
00694 while((pending=ptrq->deq())!=0) {
00695 #if CMK_BIGSIM_CHARM
00696
00697
00698 _CldEnqueue(CkMyPe(), pending, _infoIdx);
00699 #else
00700 CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
00701 #endif
00702 }
00703 CkpvAccess(_groupTable)->find(groupID).clearPending();
00704 }
00705 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
00706
00707 CkpvAccess(_currentGroup) = groupID;
00708 CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
00709
00710 #ifndef CMK_CHARE_USE_PTR
00711 int callingChareIdx = CkpvAccess(currentChareIdx);
00712 CkpvAccess(currentChareIdx) = -1;
00713 #endif
00714
00715 _invokeEntryNoTrace(epIdx,env,obj);
00716
00717 #ifndef CMK_CHARE_USE_PTR
00718 CkpvAccess(currentChareIdx) = callingChareIdx;
00719 #endif
00720
00721 _STATS_RECORD_PROCESS_GROUP_1();
00722 }
00723
00724 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
00725 {
00726 int gIdx = _entryTable[epIdx]->chareIdx;
00727 size_t objSize=_chareTable[gIdx]->size;
00728 void *obj = malloc(objSize);
00729 _MEMCHECK(obj);
00730 setMemoryTypeChare(obj);
00731 CkpvAccess(_currentGroup) = groupID;
00732
00733
00734
00735
00736
00737
00738
00739 CkpvAccess(_currentNodeGroupObj) = obj;
00740
00741 #ifndef CMK_CHARE_USE_PTR
00742 int callingChareIdx = CkpvAccess(currentChareIdx);
00743 CkpvAccess(currentChareIdx) = -1;
00744 #endif
00745
00746 _invokeEntryNoTrace(epIdx,env,obj);
00747
00748 #ifndef CMK_CHARE_USE_PTR
00749 CkpvAccess(currentChareIdx) = callingChareIdx;
00750 #endif
00751
00752 CkpvAccess(_currentNodeGroupObj) = NULL;
00753 _STATS_RECORD_PROCESS_NODE_GROUP_1();
00754
00755 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00756 CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
00757 CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
00758 CksvAccess(_nodeGroupIDTable).push_back(groupID);
00759
00760 PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
00761 if(ptrq) {
00762 void *pending;
00763 while((pending=ptrq->deq())!=0) {
00764 _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
00765 }
00766 CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
00767 }
00768 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00769 }
00770
00771 void _createGroup(CkGroupID groupID, envelope *env)
00772 {
00773 _CHECK_USED(env);
00774 _SET_USED(env, 1);
00775 int epIdx = env->getEpIdx();
00776 int gIdx = _entryTable[epIdx]->chareIdx;
00777 env->setGroupNum(groupID);
00778 env->setSrcPe(CkMyPe());
00779 env->setGroupEpoch(CkpvAccess(_charmEpoch));
00780
00781 if(CkNumPes()>1) {
00782 CkPackMessage(&env);
00783 CmiSetHandler(env, _bocHandlerIdx);
00784 _numInitMsgs++;
00785 CmiSyncBroadcast(env->getTotalsize(), (char *)env);
00786 CpvAccess(_qd)->create(CkNumPes()-1);
00787 CkUnpackMessage(&env);
00788 }
00789 _STATS_RECORD_CREATE_GROUP_1();
00790 CkCreateLocalGroup(groupID, epIdx, env);
00791 }
00792
00793 void _createNodeGroup(CkGroupID groupID, envelope *env)
00794 {
00795 _CHECK_USED(env);
00796 _SET_USED(env, 1);
00797 int epIdx = env->getEpIdx();
00798 env->setGroupNum(groupID);
00799 env->setSrcPe(CkMyPe());
00800 env->setGroupEpoch(CkpvAccess(_charmEpoch));
00801 if(CkNumNodes()>1) {
00802 CkPackMessage(&env);
00803 CmiSetHandler(env, _bocHandlerIdx);
00804 _numInitMsgs++;
00805 if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
00806 CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
00807 CpvAccess(_qd)->create(CkNumNodes()-1);
00808 CkUnpackMessage(&env);
00809 }
00810 _STATS_RECORD_CREATE_NODE_GROUP_1();
00811 CkCreateLocalNodeGroup(groupID, epIdx, env);
00812 }
00813
00814
00815
00816 static CkGroupID _groupCreate(envelope *env)
00817 {
00818 CkGroupID groupNum;
00819
00820
00821
00822 if(CkMyPe() == 0)
00823 groupNum.idx = CkpvAccess(_numGroups)++;
00824 else
00825 groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
00826 _createGroup(groupNum, env);
00827 return groupNum;
00828 }
00829
00830
00831 static CkGroupID _nodeGroupCreate(envelope *env)
00832 {
00833 CkGroupID groupNum;
00834 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00835 if(CkMyNode() == 0)
00836 groupNum.idx = CksvAccess(_numNodeGroups)++;
00837 else
00838 groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
00839 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00840 _createNodeGroup(groupNum, env);
00841 return groupNum;
00842 }
00843
00844
00845
00846
00847
00848
// Builds a group index for a group created on node/PE `myNode` (out of
// `numNodes`) whose local sequence number is `numGroups`.
// The creator's number is placed in the high bits, the sequence number in the
// low bits, and the result is negated.
//
// x = ceil(log2(numNodes)) is the number of bits needed to tell the creators
// apart. It is computed with integer arithmetic so that the bit layout cannot
// be changed by floating-point rounding in log()/ceil().
int _getGroupIdx(int numNodes,int myNode,int numGroups)
{
  int x = 0;
  if (numNodes > 1) {
    const unsigned int target = (unsigned int)numNodes;
    // Terminates: target <= INT_MAX < (1u << 31).
    while ((1u << x) < target) ++x;
  }
  // One bit is kept for the sign; the remaining n low bits hold numGroups.
  int n = 32 - (x+1);
  int idx = (myNode<<n) + numGroups;

  idx = - idx;

  return idx;
}
00862
00863 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
00864 {
00865 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
00866 envelope *env = UsrToEnv(msg);
00867 env->setMsgtype(BocInitMsg);
00868 env->setEpIdx(eIdx);
00869 env->setSrcPe(CkMyPe());
00870 _TRACE_CREATION_N(env, CkNumPes());
00871 CkGroupID gid = _groupCreate(env);
00872 _TRACE_CREATION_DONE(1);
00873 return gid;
00874 }
00875
00876 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
00877 {
00878 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
00879 envelope *env = UsrToEnv(msg);
00880 env->setMsgtype(NodeBocInitMsg);
00881 env->setEpIdx(eIdx);
00882 env->setSrcPe(CkMyPe());
00883 _TRACE_CREATION_N(env, CkNumNodes());
00884 CkGroupID gid = _nodeGroupCreate(env);
00885 _TRACE_CREATION_DONE(1);
00886 return gid;
00887 }
00888
00889 static inline void *_allocNewChare(envelope *env, int &idx)
00890 {
00891 int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
00892 void *tmp=malloc(_chareTable[chareIdx]->size);
00893 _MEMCHECK(tmp);
00894 #ifndef CMK_CHARE_USE_PTR
00895 CkpvAccess(chare_objs).push_back(tmp);
00896 CkpvAccess(chare_types).push_back(chareIdx);
00897 idx = CkpvAccess(chare_objs).size()-1;
00898 #endif
00899 setMemoryTypeChare(tmp);
00900 return tmp;
00901 }
00902
00903
00904 inline bool isGroupDepUnsatisfied(const CkCoreState *ck, const envelope *env) {
00905 int groupDepNum = env->getGroupDepNum();
00906 if(groupDepNum != 0) {
00907 CkGroupID *groupDepPtr = (CkGroupID *)(env->getGroupDepPtr());
00908 for(int i=0;i<groupDepNum;i++) {
00909 CkGroupID depID = groupDepPtr[i];
00910 if (!depID.isZero() && !_lookupGroupAndBufferIfNotThere(ck, env, depID)) {
00911 return true;
00912 }
00913 }
00914 }
00915 return false;
00916 }
00917
00918 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
00919 {
00920 if(isGroupDepUnsatisfied(ck, env))
00921 return;
00922 if(ck)
00923 ck->process();
00924 int idx;
00925 void *obj = _allocNewChare(env, idx);
00926 #ifndef CMK_CHARE_USE_PTR
00927 CkpvAccess(currentChareIdx) = idx;
00928 #endif
00929 _invokeEntry(env->getEpIdx(),env,obj);
00930 if(ck)
00931 _STATS_RECORD_PROCESS_CHARE_1();
00932 }
00933
00934 void CkCreateLocalChare(int epIdx, envelope *env)
00935 {
00936 env->setEpIdx(epIdx);
00937 _processNewChareMsg(NULL, env);
00938 }
00939
00940 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
00941 {
00942 if(isGroupDepUnsatisfied(ck, env))
00943 return;
00944 ck->process();
00945 int idx;
00946 void *obj = _allocNewChare(env, idx);
00947 CkChareID *pCid = (CkChareID *)
00948 _allocMsg(FillVidMsg, sizeof(CkChareID));
00949 pCid->onPE = CkMyPe();
00950 #ifndef CMK_CHARE_USE_PTR
00951 pCid->objPtr = (void*)(CmiIntPtr)idx;
00952 #else
00953 pCid->objPtr = obj;
00954 #endif
00955
00956 envelope *ret = UsrToEnv(pCid);
00957 ret->setVidPtr(env->getVidPtr());
00958 int srcPe = env->getByPe();
00959 ret->setSrcPe(CkMyPe());
00960 CmiSetHandler(ret, _charmHandlerIdx);
00961 CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
00962 #ifndef CMK_CHARE_USE_PTR
00963
00964 CkChareID vid;
00965 vid.onPE = srcPe;
00966 vid.objPtr = env->getVidPtr();
00967 CkpvAccess(vmap)[idx] = vid;
00968 #endif
00969 CpvAccess(_qd)->create();
00970 #ifndef CMK_CHARE_USE_PTR
00971 CkpvAccess(currentChareIdx) = idx;
00972 #endif
00973 _invokeEntry(env->getEpIdx(),env,obj);
00974 _STATS_RECORD_PROCESS_CHARE_1();
00975 }
00976
00977
00978
00979 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
00980 {
00981 if(isGroupDepUnsatisfied(ck, env))
00982 return;
00983 ck->process();
00984 int epIdx = env->getEpIdx();
00985 int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
00986 void *obj;
00987 if (mainIdx != -1) {
00988 CmiAssert(CkMyPe()==0);
00989 obj = _mainTable[mainIdx]->getObj();
00990 }
00991 else {
00992 #ifndef CMK_CHARE_USE_PTR
00993 if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
00994 obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
00995 else
00996 obj = env->getObjPtr();
00997 #else
00998 obj = env->getObjPtr();
00999 #endif
01000 }
01001 _invokeEntry(epIdx,env,obj);
01002 _STATS_RECORD_PROCESS_MSG_1();
01003 }
01004
01005 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
01006 {
01007 ck->process();
01008 #ifndef CMK_CHARE_USE_PTR
01009 VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
01010 #else
01011 VidBlock *vptr = (VidBlock *) env->getVidPtr();
01012 _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
01013 #endif
01014 CkChareID *pcid = (CkChareID *) EnvToUsr(env);
01015 _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
01016 if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
01017 CmiFree(env);
01018 }
01019
01020 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
01021 {
01022 ck->process();
01023 #ifndef CMK_CHARE_USE_PTR
01024 VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
01025 #else
01026 VidBlock *vptr = (VidBlock *) env->getVidPtr();
01027 _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
01028 #endif
01029 _SET_USED(env, 1);
01030 vptr->send(env);
01031 }
01032
01033 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
01034 {
01035 ck->process();
01036 #ifndef CMK_CHARE_USE_PTR
01037 VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
01038 delete vptr;
01039 CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
01040 #endif
01041 CmiFree(env);
01042 }
01043
01044
01045
01055 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(const CkCoreState *ck, const envelope *env, const CkGroupID &groupID)
01056 {
01057
01058 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
01059 IrrGroup *obj = ck->localBranch(groupID);
01060 if (obj==NULL) {
01061 ck->getGroupTable()->find(groupID).enqMsg((envelope *)env);
01062 }
01063 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
01064 return obj;
01065 }
01066
01067 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
01068 {
01069 return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
01070 }
01071
01072 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
01073 {
01074 #if CMK_LBDB_ON
01075
01076 LDObjHandle objHandle;
01077 int objstopped = 0;
01078 LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
01079 if (the_lbdb->RunningObject(&objHandle)) {
01080 objstopped = 1;
01081 the_lbdb->ObjectStop(objHandle);
01082 }
01083 #endif
01084
01085 #if CMK_ONESIDED_IMPL && CMK_SMP
01086 unsigned short int msgType = CMI_ZC_MSGTYPE(env);
01087 #endif
01088
01089 _invokeEntry(epIdx,env,obj);
01090
01091 #if CMK_ONESIDED_IMPL && CMK_SMP
01092 if(msgType == CMK_ZC_BCAST_RECV_DONE_MSG && CmiMyRank()!=0) {
01093 updatePeerCounterAndPush(env);
01094 }
01095 #endif
01096
01097 #if CMK_LBDB_ON
01098 if (objstopped) the_lbdb->ObjectStart(objHandle);
01099 #endif
01100 _STATS_RECORD_PROCESS_BRANCH_1();
01101 }
01102
01103 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
01104 {
01105 if(isGroupDepUnsatisfied(ck, env))
01106 return;
01107 CkGroupID groupID = env->getGroupNum();
01108 IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
01109 if(obj) {
01110 ck->process();
01111 _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
01112 }
01113 }
01114
01115 IrrGroup* _getCkLocalBranchFromGroupID(CkGroupID &gID) {
01116 CkCoreState *ck = CkpvAccess(_coreState);
01117 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
01118 IrrGroup *obj = ck->localBranch(gID);
01119 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
01120 return obj;
01121 }
01122
01123 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
01124 {
01125 env->setEpIdx(epIdx);
01126 _invokeEntry(epIdx,env,obj);
01127 _STATS_RECORD_PROCESS_NODE_BRANCH_1();
01128 }
01129
01130 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
01131 {
01132 if(isGroupDepUnsatisfied(ck, env))
01133 return;
01134 CkGroupID groupID = env->getGroupNum();
01135 void *obj;
01136
01137 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
01138 obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
01139 if(!obj) {
01140 #if CMK_IMMEDIATE_MSG
01141 if (CmiIsImmediate(env)) {
01142
01143 CmiResetImmediate(env);
01144 }
01145 #endif
01146 CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
01147 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
01148 return;
01149 }
01150 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
01151 ck->process();
01152 _invokeEntry(env->getEpIdx(),env,obj);
01153 _STATS_RECORD_PROCESS_NODE_BRANCH_1();
01154 }
01155
01156 void _processBocInitMsg(CkCoreState *ck,envelope *env)
01157 {
01158 if(isGroupDepUnsatisfied(ck, env))
01159 return;
01160 CkGroupID groupID = env->getGroupNum();
01161 int epIdx = env->getEpIdx();
01162 ck->process();
01163 CkCreateLocalGroup(groupID, epIdx, env);
01164 }
01165
01166 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
01167 {
01168 if(isGroupDepUnsatisfied(ck, env))
01169 return;
01170 ck->process();
01171 CkGroupID groupID = env->getGroupNum();
01172 int epIdx = env->getEpIdx();
01173 CkCreateLocalNodeGroup(groupID, epIdx, env);
01174 }
01175
01176
01177 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
01178 ArrayObjMap& object_map = CkpvAccess(array_objs);
01179 auto iter = object_map.find(env->getRecipientID());
01180 if (iter != object_map.end()) {
01181
01182 _SET_USED(env, 0);
01183 ck->process();
01184 int opts = 0;
01185 CkArrayMessage* msg = (CkArrayMessage*)EnvToUsr(env);
01186 if (msg->array_hops()>1) {
01187 CProxy_ArrayBase(env->getArrayMgr()).ckLocMgr()->multiHop(msg);
01188 }
01189 bool doFree = !(opts & CK_MSG_KEEP);
01190 #if CMK_ONESIDED_IMPL
01191 if(CMI_ZC_MSGTYPE(env) == CMK_ZC_P2P_RECV_MSG)
01192 doFree = false;
01193 #endif
01194 iter->second->ckInvokeEntry(env->getEpIdx(), msg, doFree);
01195 } else {
01196
01197 CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
01198 if (mgr) {
01199 _SET_USED(env, 0);
01200 ck->process();
01201 mgr->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_inline);
01202 }
01203 }
01204 }
01205
01206
// Debug hook used by _processHandler(): as defined here the argument is
// commented out, so every TELLMSGTYPE(...) use expands to nothing.
#define TELLMSGTYPE(x) //x
01208
/**
 * Dispatch one incoming Converse message, viewed as a Charm++ envelope, to
 * the _process* routine that matches its message type.
 *
 * Steps, in order:
 *  - CMK_ONESIDED_IMPL: zero-copy P2P/BCAST send messages are handed to
 *    CkRdmaIssueRgets() first; see the inline comments.
 *  - If a watcher is installed on `ck`, it may veto processing.
 *  - Message-logging builds run pre/post processing hooks around delivery.
 *  - The switch unpacks packed messages (except the Vid message types) and
 *    calls the type-specific handler. An unknown type aborts.
 *
 * @param converseMsg  raw message pointer; reinterpreted as envelope*.
 * @param ck           core state passed through to the handlers.
 */
void _processHandler(void *converseMsg,CkCoreState *ck)
{
  envelope *env = (envelope *) converseMsg;

  MESSAGE_PHASE_CHECK(env);

#if CMK_ONESIDED_IMPL
  if(CMI_ZC_MSGTYPE(env) == CMK_ZC_P2P_SEND_MSG || CMI_ZC_MSGTYPE(env) == CMK_ZC_BCAST_SEND_MSG){
    envelope *prevEnv = env;

    // Select the broadcast or point-to-point mode from the message type.
    ncpyEmApiMode mode = (CMI_ZC_MSGTYPE(env) == CMK_ZC_BCAST_SEND_MSG) ? ncpyEmApiMode::BCAST_SEND : ncpyEmApiMode::P2P_SEND;

    env = CkRdmaIssueRgets(env, mode, prevEnv);

    if(env) {
      // A non-NULL result is the envelope to keep processing; the message
      // of the previous envelope is released here.
      CkFreeMsg(EnvToUsr(prevEnv));
    } else {
      // NULL result: nothing to deliver from this call.
      return;
    }
  }
#endif

  // Give an installed watcher the chance to stop processing of this message.
  if (ck->watcher!=NULL) {
    if (!ck->watcher->processMessage(&env,ck)) return;
  }

#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
  Chare *obj=NULL;
  CkObjID sender;
  MCount SN;
  MlogEntry *entry=NULL;
  // Only object-directed message types go through the logging pre-processing.
  if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
     || env->getMsgtype() == ForArrayEltMsg
     || env->getMsgtype() == ForChareMsg) {
    sender = env->sender;
    SN = env->SN;
    int result = preProcessReceivedMessage(env,&obj,&entry);
    // A zero result means the message must not be delivered now.
    if(result == 0){
      return;
    }
  }
#endif
#if USE_CRITICAL_PATH_HEADER_ARRAY
  CK_CRITICALPATH_START(env)
#endif

  switch(env->getMsgtype()) {

    case BocInitMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processBocInitMsg(ck,env);
      break;
    case NodeBocInitMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processNodeBocInitMsg(ck,env);
      break;
    case ForBocMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processForBocMsg(ck,env);
      break;
    case ForNodeBocMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processForNodeBocMsg(ck,env);
      break;

    case ForArrayEltMsg:
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processArrayEltMsg(ck,env);
      break;

    case NewChareMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processNewChareMsg(ck,env);
      break;
    case NewVChareMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processNewVChareMsg(ck,env);
      break;
    case ForChareMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processForPlainChareMsg(ck,env);
      break;
    // The three Vid message types below are passed on without unpacking here.
    case ForVidMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
      _processForVidMsg(ck,env);
      break;
    case FillVidMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
      _processFillVidMsg(ck,env);
      break;
    case DeleteVidMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
      _processDeleteVidMsg(ck,env);
      break;

    default:
      CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
  }
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
  // obj is only set by preProcessReceivedMessage(); skip the hook otherwise.
  if(obj != NULL){
    postProcessReceivedMessage(obj,sender,SN,entry);
  }
#endif

#if USE_CRITICAL_PATH_HEADER_ARRAY
  CK_CRITICALPATH_END()
#endif

}
01348
01349
01350
01351
01352 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
01353 int *queueing, int *priobits, unsigned int **prioptr)
01354 {
01355 envelope *env = (envelope *)converseMsg;
01356 *pfn = (CldPackFn)CkPackMessage;
01357 *len = env->getTotalsize();
01358 *queueing = env->getQueueing();
01359 *priobits = env->getPriobits();
01360 *prioptr = (unsigned int *) env->getPrioPtr();
01361 }
01362
01363 void CkPackMessage(envelope **pEnv)
01364 {
01365 envelope *env = *pEnv;
01366 if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
01367 void *msg = EnvToUsr(env);
01368 _TRACE_BEGIN_PACK();
01369 msg = _msgTable[env->getMsgIdx()]->pack(msg);
01370 _TRACE_END_PACK();
01371 env=UsrToEnv(msg);
01372 env->setPacked(1);
01373 *pEnv = env;
01374 }
01375 }
01376
01377 void CkUnpackMessage(envelope **pEnv)
01378 {
01379 envelope *env = *pEnv;
01380 int msgIdx = env->getMsgIdx();
01381 if(env->isPacked()) {
01382 void *msg = EnvToUsr(env);
01383 _TRACE_BEGIN_UNPACK();
01384 msg = _msgTable[msgIdx]->unpack(msg);
01385 _TRACE_END_UNPACK();
01386 env=UsrToEnv(msg);
01387 env->setPacked(0);
01388 *pEnv = env;
01389 }
01390 }
01391
01392
01393
01394
// Handler indices assigned once in _ckModuleInit() from CkRegisterHandler().
#if CMK_OBJECT_QUEUE_AVAILABLE
// Index registered for _ObjectQHandler (object-queue builds only).
static int index_objectQHandler;
#endif
// Index registered for _TokenHandler.
int index_tokenHandler;
// Index registered for _skipCldHandler.
int index_skipCldHandler;
01400
01401 void _skipCldHandler(void *converseMsg)
01402 {
01403 envelope *env = (envelope *)(converseMsg);
01404 CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
01405 #if CMK_GRID_QUEUE_AVAILABLE
01406 if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
01407 CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
01408 env, env->getQueueing (), env->getPriobits (),
01409 (unsigned int *) env->getPrioPtr ());
01410 } else {
01411 CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
01412 env, env->getQueueing (), env->getPriobits (),
01413 (unsigned int *) env->getPrioPtr ());
01414 }
01415 #else
01416 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
01417 env, env->getQueueing(),env->getPriobits(),
01418 (unsigned int *)env->getPrioPtr());
01419 #endif
01420 }
01421
01422
01423
01424
/**
 * Send `env` to `pe` (a PE number, CLD_BROADCAST or CLD_BROADCAST_ALL).
 *
 * When the destination is this PE and CmiImmIsRunning() is false, the message
 * is enqueued locally (object queue if available and active, else scheduler
 * queue). Otherwise the message's current handler is stored in its X-handler
 * field, the handler is replaced with the skip-Cld (or object-queue) handler,
 * and the message is sent with the matching CmiSync* call.
 *
 * @param pe      destination PE or broadcast constant.
 * @param env     message envelope; ownership passes to the send/enqueue call,
 *                except in CMK_MESSAGE_LOGGING builds when CK_FREE_MSG_MLOG
 *                is not set in env->flags (non-freeing send variants are used).
 * @param infoFn  info-function index stored on the message via CmiSetInfo().
 */
void _skipCldEnqueue(int pe,envelope *env, int infoFn)
{
#if CMK_CHARMDEBUG
  // ConverseDeliver() false: drop the message instead of sending it.
  if (!ConverseDeliver(pe)) {
    CmiFree(env);
    return;
  }
#endif

#if CMK_ONESIDED_IMPL
  if(CMI_IS_ZC_BCAST(env))
    CkRdmaPrepareBcastMsg(env);
#endif

#if CMK_FAULT_EVAC
  // Diagnostic only: the send still proceeds after the printf.
  if(pe == CkMyPe() ){
    if(!CmiNodeAlive(CkMyPe())){
      printf("[%d] Invalid processor sending itself a message \n",CkMyPe());

    }
  }
#endif
  if (pe == CkMyPe() && !CmiImmIsRunning()) {
    // Local destination: enqueue directly.
#if CMK_OBJECT_QUEUE_AVAILABLE
    Chare *obj = CkFindObjectPtr(env);
    if (obj && obj->CkGetObjQueue().queue()) {
      _enqObjQueue(obj, env);
    }
    else
#endif
    // NOTE: in object-queue builds this statement is the body of the
    // dangling `else` above.
    CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
                      env, env->getQueueing(),env->getPriobits(),
                      (unsigned int *)env->getPrioPtr());
#if CMK_PERSISTENT_COMM
    CmiPersistentOneSend();
#endif
  } else {
    // Pack only for negative pe (presumably the CLD_BROADCAST* values) or a
    // destination on a different node.
    if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
      CkPackMessage(&env);
    int len=env->getTotalsize();
    // Save the real handler so _skipCldHandler can restore it on arrival.
    CmiSetXHandler(env,CmiGetHandler(env));
#if CMK_OBJECT_QUEUE_AVAILABLE
    CmiSetHandler(env,index_objectQHandler);
#else
    CmiSetHandler(env,index_skipCldHandler);
#endif
    CmiSetInfo(env,infoFn);
    if (pe==CLD_BROADCAST) {
#if CMK_MESSAGE_LOGGING
      if(env->flags & CK_FREE_MSG_MLOG)
        CmiSyncBroadcastAndFree(len, (char *)env);
      else
        CmiSyncBroadcast(len, (char *)env);
#else
      CmiSyncBroadcastAndFree(len, (char *)env);
#endif

    }
    else if (pe==CLD_BROADCAST_ALL) {
#if CMK_MESSAGE_LOGGING
      if(env->flags & CK_FREE_MSG_MLOG)
        CmiSyncBroadcastAllAndFree(len, (char *)env);
      else
        CmiSyncBroadcastAll(len, (char *)env);
#else
      CmiSyncBroadcastAllAndFree(len, (char *)env);
#endif

    }
    else{
#if CMK_MESSAGE_LOGGING
      if(env->flags & CK_FREE_MSG_MLOG)
        CmiSyncSendAndFree(pe, len, (char *)env);
      else
        CmiSyncSend(pe, len, (char *)env);
#else
      CmiSyncSendAndFree(pe, len, (char *)env);
#endif

    }
  }
}
01508
// In BigSim builds every later use of _skipCldEnqueue in this file is
// redirected to _CldEnqueue.
#if CMK_BIGSIM_CHARM
#   define  _skipCldEnqueue   _CldEnqueue
#endif
01512
01513
01514 static void _noCldEnqueueMulti(int npes, const int *pes, envelope *env)
01515 {
01516 #if CMK_CHARMDEBUG
01517 if (!ConverseDeliver(-1)) {
01518 CmiFree(env);
01519 return;
01520 }
01521 #endif
01522 CkPackMessage(&env);
01523 int len=env->getTotalsize();
01524 CmiSyncListSendAndFree(npes, pes, len, (char *)env);
01525 }
01526
01527 static void _noCldEnqueue(int pe, envelope *env)
01528 {
01529
01530
01531
01532
01533
01534 #if CMK_CHARMDEBUG
01535 if (!ConverseDeliver(pe)) {
01536 CmiFree(env);
01537 return;
01538 }
01539 #endif
01540
01541 #if CMK_ONESIDED_IMPL
01542
01543 if(CMI_IS_ZC_BCAST(env))
01544 CkRdmaPrepareBcastMsg(env);
01545 #endif
01546
01547 CkPackMessage(&env);
01548 int len=env->getTotalsize();
01549 if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
01550 else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
01551 else CmiSyncSendAndFree(pe, len, (char *)env);
01552 }
01553
01554
01555
01556 void _noCldNodeEnqueue(int node, envelope *env)
01557 {
01558
01559
01560
01561
01562
01563 #if CMK_CHARMDEBUG
01564 if (!ConverseDeliver(node)) {
01565 CmiFree(env);
01566 return;
01567 }
01568 #endif
01569
01570 #if CMK_ONESIDED_IMPL
01571
01572 if(CMI_IS_ZC_BCAST(env))
01573 CkRdmaPrepareBcastMsg(env);
01574 #endif
01575
01576 CkPackMessage(&env);
01577 int len=env->getTotalsize();
01578 if (node==CLD_BROADCAST) {
01579 #if CMK_MESSAGE_LOGGING
01580 if(env->flags & CK_FREE_MSG_MLOG)
01581 CmiSyncNodeBroadcastAndFree(len, (char *)env);
01582 else
01583 CmiSyncNodeBroadcast(len, (char *)env);
01584 #else
01585 CmiSyncNodeBroadcastAndFree(len, (char *)env);
01586 #endif
01587 }
01588 else if (node==CLD_BROADCAST_ALL) {
01589 #if CMK_MESSAGE_LOGGING
01590 if(env->flags & CK_FREE_MSG_MLOG)
01591 CmiSyncNodeBroadcastAllAndFree(len, (char *)env);
01592 else
01593 CmiSyncNodeBroadcastAll(len, (char *)env);
01594 #else
01595 CmiSyncNodeBroadcastAllAndFree(len, (char *)env);
01596 #endif
01597
01598 }
01599 else {
01600 #if CMK_MESSAGE_LOGGING
01601 if(env->flags & CK_FREE_MSG_MLOG)
01602 CmiSyncNodeSendAndFree(node, len, (char *)env);
01603 else
01604 CmiSyncNodeSend(node, len, (char *)env);
01605 #else
01606 CmiSyncNodeSendAndFree(node, len, (char *)env);
01607 #endif
01608 }
01609 }
01610
01611 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
01612 {
01613 envelope *env = UsrToEnv(msg);
01614 _CHECK_USED(env);
01615 _SET_USED(env, 1);
01616 #if CMK_REPLAYSYSTEM
01617 setEventID(env);
01618 #endif
01619 env->setMsgtype(ForChareMsg);
01620 env->setEpIdx(eIdx);
01621 env->setSrcPe(CkMyPe());
01622
01623 #if USE_CRITICAL_PATH_HEADER_ARRAY
01624 CK_CRITICALPATH_SEND(env)
01625
01626 #endif
01627 #if CMK_CHARMDEBUG
01628 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
01629 #endif
01630 #if CMK_OBJECT_QUEUE_AVAILABLE
01631 CmiSetHandler(env, index_objectQHandler);
01632 #else
01633 CmiSetHandler(env, _charmHandlerIdx);
01634 #endif
01635 if (pCid->onPE < 0) {
01636 int pe = -(pCid->onPE+1);
01637 if(pe==CkMyPe()) {
01638 #ifndef CMK_CHARE_USE_PTR
01639 VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
01640 #else
01641 VidBlock *vblk = (VidBlock *) pCid->objPtr;
01642 #endif
01643 void *objPtr;
01644 if (NULL!=(objPtr=vblk->getLocalChare()))
01645 {
01646 env->setObjPtr(objPtr);
01647 return pe;
01648 }
01649 else {
01650 vblk->send(env);
01651 return -1;
01652 }
01653 } else {
01654 env->setMsgtype(ForVidMsg);
01655 env->setVidPtr(pCid->objPtr);
01656 return pe;
01657 }
01658 }
01659 else {
01660 env->setObjPtr(pCid->objPtr);
01661 return pCid->onPE;
01662 }
01663 }
01664
01665 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
01666 {
01667 int destPE = _prepareMsg(eIdx, msg, pCid);
01668 if (destPE != -1) {
01669 envelope *env = UsrToEnv(msg);
01670
01671 #if USE_CRITICAL_PATH_HEADER_ARRAY
01672 CK_CRITICALPATH_SEND(env)
01673
01674 #endif
01675 CmiBecomeImmediate(env);
01676 }
01677 return destPE;
01678 }
01679
01680 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
01681 {
01682 if (opts & CK_MSG_INLINE) {
01683 CkSendMsgInline(entryIdx, msg, pCid, opts);
01684 return;
01685 }
01686 envelope *env = UsrToEnv(msg);
01687 #if CMK_ERROR_CHECKING
01688
01689 if (opts & CK_MSG_IMMEDIATE)
01690 #if CMK_ONESIDED_IMPL
01691 if (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)
01692 #endif
01693 CmiAbort("Immediate message is not allowed in Chare!");
01694 #endif
01695 int destPE=_prepareMsg(entryIdx,msg,pCid);
01696
01697
01698
01699
01700 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01701 if (destPE!=-1) {
01702 CpvAccess(_qd)->create();
01703 }
01704 sendChareMsg(env,destPE,_infoIdx,pCid);
01705 #else
01706 _TRACE_CREATION_1(env);
01707 if (destPE!=-1) {
01708 CpvAccess(_qd)->create();
01709 if (opts & CK_MSG_SKIP_OR_IMM)
01710 _noCldEnqueue(destPE, env);
01711 else
01712 _CldEnqueue(destPE, env, _infoIdx);
01713 }
01714 _TRACE_CREATION_DONE(1);
01715 #endif
01716 }
01717
01718 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
01719 {
01720 if (pCid->onPE==CkMyPe())
01721 {
01722 #if CMK_FAULT_EVAC
01723 if(!CmiNodeAlive(CkMyPe())){
01724 return;
01725 }
01726 #endif
01727 #if CMK_CHARMDEBUG
01728
01729 _prepareMsg(entryIndex,msg,pCid);
01730 #endif
01731
01732 envelope *env = UsrToEnv(msg);
01733 if (env->isPacked()) CkUnpackMessage(&env);
01734 _STATS_RECORD_PROCESS_MSG_1();
01735 _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
01736 }
01737 else {
01738
01739 CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
01740 }
01741 }
01742
01743 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
01744 {
01745 envelope *env = UsrToEnv(msg);
01746
01747
01748
01749
01750 _CHECK_USED(env);
01751 _SET_USED(env, 1);
01752 #if CMK_REPLAYSYSTEM
01753 setEventID(env);
01754 #endif
01755 env->setMsgtype(type);
01756 env->setEpIdx(eIdx);
01757 env->setGroupNum(gID);
01758 env->setSrcPe(CkMyPe());
01759
01760
01761
01762
01763
01764
01765
01766 #if USE_CRITICAL_PATH_HEADER_ARRAY
01767 CK_CRITICALPATH_SEND(env)
01768
01769 #endif
01770 #if CMK_CHARMDEBUG
01771 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
01772 #endif
01773 CmiSetHandler(env, _charmHandlerIdx);
01774 return env;
01775 }
01776
01777 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
01778 {
01779 envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
01780 #if USE_CRITICAL_PATH_HEADER_ARRAY
01781 CK_CRITICALPATH_SEND(env)
01782
01783 #endif
01784 CmiBecomeImmediate(env);
01785 return env;
01786 }
01787
01788 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
01789 int pe=CLD_BROADCAST_ALL, int opts = 0)
01790 {
01791 int numPes;
01792 envelope *env;
01793 if (opts & CK_MSG_IMMEDIATE) {
01794 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
01795 }else
01796 {
01797 env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01798 }
01799
01800 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01801 sendGroupMsg(env,pe,_infoIdx);
01802 #else
01803 _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
01804 _TRACE_CREATION_N(env, numPes);
01805 if (opts & CK_MSG_SKIP_OR_IMM)
01806 _noCldEnqueue(pe, env);
01807 else
01808 _skipCldEnqueue(pe, env, _infoIdx);
01809 _TRACE_CREATION_DONE(1);
01810 #endif
01811 }
01812
01813 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
01814 int npes, const int *pes)
01815 {
01816 envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01817 _TRACE_CREATION_MULTICAST(env, npes, pes);
01818 _CldEnqueueMulti(npes, pes, env, _infoIdx);
01819 _TRACE_CREATION_DONE(1);
01820 }
01821
01822 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
01823 {
01824 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
01825 if (destPE==CkMyPe())
01826 {
01827 CkSendMsgBranchInline(eIdx, msg, destPE, gID);
01828 return;
01829 }
01830
01831 envelope *env = UsrToEnv(msg);
01832 int numPes;
01833 _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
01834 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
01835 _TRACE_CREATION_N(env, numPes);
01836 _noCldEnqueue(destPE, env);
01837 _STATS_RECORD_SEND_BRANCH_1();
01838 CkpvAccess(_coreState)->create();
01839 _TRACE_CREATION_DONE(1);
01840 #else
01841
01842 CkSendMsgBranchInline(eIdx, msg, destPE, gID);
01843 #endif
01844 }
01845
01846 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
01847 {
01848 if (destPE==CkMyPe())
01849 {
01850 #if CMK_FAULT_EVAC
01851 if(!CmiNodeAlive(CkMyPe())){
01852 return;
01853 }
01854 #endif
01855 IrrGroup *obj=(IrrGroup *)_localBranch(gID);
01856 if (obj!=NULL)
01857 {
01858 #if CMK_ERROR_CHECKING
01859 envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01860 #else
01861 envelope *env=UsrToEnv(msg);
01862 #endif
01863 _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
01864 return;
01865 }
01866 }
01867
01868 CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
01869 }
01870
01871 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
01872 {
01873 if (opts & CK_MSG_INLINE) {
01874 CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
01875 return;
01876 }
01877 envelope *env=UsrToEnv(msg);
01878
01879 if (opts & CK_MSG_IMMEDIATE) {
01880 #if CMK_ONESIDED_IMPL
01881 if (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)
01882 #endif
01883 {
01884 CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
01885 return;
01886 }
01887 }
01888 _sendMsgBranch(eIdx, msg, gID, pe, opts);
01889 _STATS_RECORD_SEND_BRANCH_1();
01890 CkpvAccess(_coreState)->create();
01891 }
01892
01893 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,const int *pes)
01894 {
01895 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
01896 envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
01897 _TRACE_CREATION_MULTICAST(env, npes, pes);
01898 _noCldEnqueueMulti(npes, pes, env);
01899 _TRACE_CREATION_DONE(1);
01900 #else
01901 _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
01902 CpvAccess(_qd)->create(-npes);
01903 #endif
01904 _STATS_RECORD_SEND_BRANCH_N(npes);
01905 CpvAccess(_qd)->create(npes);
01906 }
01907
01908 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,const int *pes, int opts)
01909 {
01910 if (opts & CK_MSG_IMMEDIATE) {
01911 CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
01912 return;
01913 }
01914
01915 _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
01916 _STATS_RECORD_SEND_BRANCH_N(npes);
01917 CpvAccess(_qd)->create(npes);
01918 }
01919
01920 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
01921 {
01922 int npes;
01923 int *pes;
01924 if (opts & CK_MSG_IMMEDIATE) {
01925 CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
01926 return;
01927 }
01928
01929 envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01930 CmiLookupGroup(grp, &npes, &pes);
01931 _TRACE_CREATION_MULTICAST(env, npes, pes);
01932 _CldEnqueueGroup(grp, env, _infoIdx);
01933 _TRACE_CREATION_DONE(1);
01934 _STATS_RECORD_SEND_BRANCH_N(npes);
01935 CpvAccess(_qd)->create(npes);
01936 }
01937
01938 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
01939 {
01940 _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
01941 _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
01942 CpvAccess(_qd)->create(CkNumPes());
01943 }
01944
01945 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
01946 int node=CLD_BROADCAST_ALL, int opts=0)
01947 {
01948 int numPes;
01949 envelope *env;
01950 if (opts & CK_MSG_IMMEDIATE) {
01951 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01952 }else
01953 {
01954 env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01955 }
01956 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01957 sendNodeGroupMsg(env,node,_infoIdx);
01958 #else
01959 numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
01960 _TRACE_CREATION_N(env, numPes);
01961 if (opts & CK_MSG_SKIP_OR_IMM) {
01962 _noCldNodeEnqueue(node, env);
01963 }
01964 else
01965 _CldNodeEnqueue(node, env, _infoIdx);
01966 _TRACE_CREATION_DONE(1);
01967 #endif
01968 }
01969
01970 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
01971 int npes, const int *nodes)
01972 {
01973 envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01974 _TRACE_CREATION_N(env, npes);
01975 for (int i=0; i<npes; i++) {
01976 _CldNodeEnqueue(nodes[i], env, _infoIdx);
01977 }
01978 _TRACE_CREATION_DONE(1);
01979 }
01980
01981 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
01982 {
01983 #if CMK_IMMEDIATE_MSG
01984 if (node==CkMyNode())
01985 {
01986 CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
01987 return;
01988 }
01989
01990 envelope *env = UsrToEnv(msg);
01991 int numPes;
01992 _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
01993 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01994 _TRACE_CREATION_N(env, numPes);
01995 _noCldNodeEnqueue(node, env);
01996 _STATS_RECORD_SEND_BRANCH_1();
01997 CkpvAccess(_coreState)->create();
01998 _TRACE_CREATION_DONE(1);
01999 #else
02000
02001 CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
02002 #endif
02003 }
02004
02005 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
02006 {
02007 if (node==CkMyNode()) {
02008 #if CMK_ONESIDED_IMPL
02009 if (CMI_ZC_MSGTYPE(msg) == CMK_REG_NO_ZC_MSG)
02010 #endif
02011 {
02012 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
02013 void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
02014 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
02015 if (obj!=NULL)
02016 {
02017 #if CMK_ERROR_CHECKING
02018 envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
02019 #else
02020 envelope *env=UsrToEnv(msg);
02021 #endif
02022 _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
02023 return;
02024 }
02025 }
02026 }
02027
02028 CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
02029 }
02030
02031 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
02032 {
02033 if (opts & CK_MSG_INLINE) {
02034 CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
02035 return;
02036 }
02037 if (opts & CK_MSG_IMMEDIATE) {
02038 CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
02039 return;
02040 }
02041 _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
02042 _STATS_RECORD_SEND_NODE_BRANCH_1();
02043 CkpvAccess(_coreState)->create();
02044 }
02045
02046 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,const int *nodes)
02047 {
02048 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
02049 envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
02050 _noCldEnqueueMulti(npes, nodes, env);
02051 #else
02052 _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
02053 CpvAccess(_qd)->create(-npes);
02054 #endif
02055 _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
02056 CpvAccess(_qd)->create(npes);
02057 }
02058
02059 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,const int *nodes, int opts)
02060 {
02061 if (opts & CK_MSG_IMMEDIATE) {
02062 CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
02063 return;
02064 }
02065
02066 _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
02067 _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
02068 CpvAccess(_qd)->create(npes);
02069 }
02070
02071 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
02072 {
02073 _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
02074 _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
02075 CpvAccess(_qd)->create(CkNumNodes());
02076 }
02077
02078
02079 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
02080 { return _prepareMsg(eIdx,msg,pCid); }
02081 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
02082 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
02083 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
02084 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
02085
/**
 * Module initialisation: registers this file's handlers, storing each index
 * with CmiAssignOnce(), and creates the per-PE token pool.
 */
void _ckModuleInit(void) {
  // Handler used by _skipCldEnqueue() for non-local sends.
  CmiAssignOnce(&index_skipCldHandler, CkRegisterHandler(_skipCldHandler));
#if CMK_OBJECT_QUEUE_AVAILABLE
  CmiAssignOnce(&index_objectQHandler, CkRegisterHandler(_ObjectQHandler));
#endif
  CmiAssignOnce(&index_tokenHandler, CkRegisterHandler(_TokenHandler));
  // Per-PE token pool, allocated here.
  CkpvInitialize(TokenPool*, _tokenPool);
  CkpvAccess(_tokenPool) = new TokenPool;
}
02095
02096
02097
02098
02099 static void _prepareOutgoingArrayMsg(envelope *env,int type)
02100 {
02101 _CHECK_USED(env);
02102 _SET_USED(env, 1);
02103 env->setMsgtype(type);
02104 #if CMK_CHARMDEBUG
02105 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
02106 #endif
02107 CmiSetHandler(env, _charmHandlerIdx);
02108 CpvAccess(_qd)->create();
02109 }
02110
02111 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
02112 envelope *env = UsrToEnv(msg);
02113 _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
02114 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02115 sendArrayMsg(env,pe,_infoIdx);
02116 #else
02117 if (opts & CK_MSG_IMMEDIATE)
02118 CmiBecomeImmediate(env);
02119 if (opts & CK_MSG_SKIP_OR_IMM)
02120 _noCldEnqueue(pe, env);
02121 else
02122 _skipCldEnqueue(pe, env, _infoIdx);
02123 #endif
02124 }
02125
02126 class ElementDestroyer : public CkLocIterator {
02127 private:
02128 CkLocMgr *locMgr;
02129 public:
02130 ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
02131 void addLocation(CkLocation &loc) {
02132 loc.destroyAll();
02133 }
02134 };
02135
02136 void CkDeleteChares() {
02137 int i;
02138 int numGroups = CkpvAccess(_groupIDTable)->size();
02139
02140
02141 #ifndef CMK_CHARE_USE_PTR
02142 for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
02143 Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
02144 delete obj;
02145 CkpvAccess(chare_objs)[i] = NULL;
02146 }
02147 for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
02148 VidBlock *obj = CkpvAccess(vidblocks)[i];
02149 delete obj;
02150 CkpvAccess(vidblocks)[i] = NULL;
02151 }
02152 #endif
02153
02154
02155 for(i=0;i<numGroups;i++) {
02156 IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
02157 if(obj && obj->isLocMgr()) {
02158 CkLocMgr *mgr = (CkLocMgr*)obj;
02159 ElementDestroyer destroyer(mgr);
02160 mgr->iterate(destroyer);
02161 }
02162 }
02163
02164
02165 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
02166 for(i=0;i<numGroups;i++) {
02167 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
02168 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
02169 if (obj) delete obj;
02170 }
02171 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
02172
02173
02174 if (CkMyRank() == 0) {
02175 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
02176 for(i=0;i<numNodeGroups;i++) {
02177 CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
02178 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
02179 if (obj) delete obj;
02180 }
02181 }
02182 }
02183
#if CMK_BIGSIM_CHARM
// Forward declaration only; the definition is not in this part of the file.
void CthEnqueueBigSimThread(CthThreadToken* token, int s,
                            int pb,unsigned int *prio);
#endif
02188
02189
02190
// Storage filled by StartCharmExt(): ext_args owns a NUL-terminated copy of
// each argument, and ext_argv holds pointers into ext_args plus a trailing
// NULL entry.
static std::vector< std::vector<char> > ext_args;
static std::vector<char*> ext_argv;
02193
02194
02195
02196
02197
02198 void StartCharmExt(int argc, char **argv) {
02199 #if !defined(_WIN32) && !NODE_0_IS_CONVHOST
02200
02201
02202 char *ns = getenv("NETSTART");
02203 if (ns != 0) {
02204 int fd;
02205 if (-1 != (fd = open("/dev/null", O_RDWR))) {
02206 dup2(fd, 0);
02207 dup2(fd, 1);
02208 dup2(fd, 2);
02209 }
02210 }
02211 #endif
02212 ext_args.resize(argc);
02213 ext_argv.resize(argc + 1, NULL);
02214 for (int i=0; i < argc; i++) {
02215 ext_args[i].resize(strlen(argv[i]) + 1);
02216 strcpy(ext_args[i].data(), argv[i]);
02217 ext_argv[i] = ext_args[i].data();
02218 }
02219 ConverseInit(argc, ext_argv.data(), _initCharm, 0, 0);
02220 }
02221
// Externally supplied hook; stays null until a callback is registered.
void (*CkRegisterMainModuleCallback)() = nullptr;

// Install `cb` as the CkRegisterMainModuleCallback hook (replaces any previous one).
void registerCkRegisterMainModuleCallback(void (*cb)())
{
  CkRegisterMainModuleCallback = cb;
}
02226
// Externally supplied hook; stays null until a callback is registered.
void (*MainchareCtorExtCallback)(int, void*, int, int, char **) = nullptr;

// Install `cb` as the MainchareCtorExtCallback hook (replaces any previous one).
void registerMainchareCtorExtCallback(void (*cb)(int, void*, int, int, char **))
{
  MainchareCtorExtCallback = cb;
}
02231
// Externally supplied hook; stays null until a callback is registered.
void (*ReadOnlyRecvExtCallback)(int, char*) = nullptr;

// Install `cb` as the ReadOnlyRecvExtCallback hook (replaces any previous one).
void registerReadOnlyRecvExtCallback(void (*cb)(int, char*))
{
  ReadOnlyRecvExtCallback = cb;
}
02236
02237 void* ReadOnlyExt::ro_data = NULL;
02238 size_t ReadOnlyExt::data_size = 0;
02239
/// Externally supplied hook of type void(int, void*, int, int, char*, int);
/// null until one is registered.
void (*ChareMsgRecvExtCallback)(int, void*, int, int, char *, int) = nullptr;

/// Store cb in ChareMsgRecvExtCallback, replacing any previous value.
void registerChareMsgRecvExtCallback(void (*cb)(int, void*, int, int, char *, int))
{
  ChareMsgRecvExtCallback = cb;
}
02244
/// Hook called from the GroupExt and ArrayMapExt constructors with
/// (group id, entry index, message size, message bytes, dcopy_start);
/// null until registered.
void (*GroupMsgRecvExtCallback)(int, int, int, char *, int) = nullptr;

/// Store cb in GroupMsgRecvExtCallback, replacing any previous value.
void registerGroupMsgRecvExtCallback(void (*cb)(int, int, int, char *, int))
{
  GroupMsgRecvExtCallback = cb;
}
02249
/// Externally supplied hook of type void(int, int, int*, int, int, char*, int);
/// null until one is registered.
void (*ArrayMsgRecvExtCallback)(int, int, int *, int, int, char *, int) = nullptr;

/// Store cb in ArrayMsgRecvExtCallback, replacing any previous value.
void registerArrayMsgRecvExtCallback(void (*cb)(int, int, int *, int, int, char *, int))
{
  ArrayMsgRecvExtCallback = cb;
}
02254
/// Externally supplied hook of type int(int, int, int*, char**, int);
/// null until one is registered.
int (*ArrayElemLeaveExt)(int, int, int *, char**, int) = nullptr;

/// Store cb in ArrayElemLeaveExt, replacing any previous value.
void registerArrayElemLeaveExtCallback(int (*cb)(int, int, int *, char**, int))
{
  ArrayElemLeaveExt = cb;
}
02259
/// Externally supplied hook of type void(int, int, int*, int, char*, int);
/// null until one is registered.
void (*ArrayElemJoinExt)(int, int, int *, int, char*, int) = nullptr;

/// Store cb in ArrayElemJoinExt, replacing any previous value.
void registerArrayElemJoinExtCallback(void (*cb)(int, int, int *, int, char*, int))
{
  ArrayElemJoinExt = cb;
}
02264
/// Externally supplied hook of type void(int, int, int*);
/// null until one is registered.
void (*ArrayResumeFromSyncExtCallback)(int, int, int *) = nullptr;

/// Store cb in ArrayResumeFromSyncExtCallback, replacing any previous value.
void registerArrayResumeFromSyncExtCallback(void (*cb)(int, int, int *))
{
  ArrayResumeFromSyncExtCallback = cb;
}
02269
/// Externally supplied hook of type void(void*, int, int, int, char**, int*);
/// null until one is registered.
void (*CreateReductionTargetMsgExt)(void*, int, int, int, char**, int*) = nullptr;

/// Store cb in CreateReductionTargetMsgExt, replacing any previous value.
void registerCreateReductionTargetMsgExtCallback(void (*cb)(void*, int, int, int, char**, int*))
{
  CreateReductionTargetMsgExt = cb;
}
02274
/// Externally supplied hook of type int(char**, int*, int, char**);
/// null until one is registered.
int (*PyReductionExt)(char**, int*, int, char**) = nullptr;

/// Store cb in PyReductionExt, replacing any previous value.
void registerPyReductionExtCallback(int (*cb)(char**, int*, int, char**))
{
  PyReductionExt = cb;
}
02279
/// Externally supplied hook of type int(int, int, const int*);
/// null until one is registered.
int (*ArrayMapProcNumExtCallback)(int, int, const int *) = nullptr;

/// Store cb in ArrayMapProcNumExtCallback, replacing any previous value.
void registerArrayMapProcNumExtCallback(int (*cb)(int, int, const int *))
{
  ArrayMapProcNumExtCallback = cb;
}
02284
02285 int CkMyPeHook() { return CkMyPe(); }
02286 int CkNumPesHook() { return CkNumPes(); }
02287
02288 void ReadOnlyExt::setData(void *msg, size_t msgSize) {
02289 ro_data = malloc(msgSize);
02290 memcpy(ro_data, msg, msgSize);
02291 data_size = msgSize;
02292 }
02293
02294 void ReadOnlyExt::_roPup(void *pup_er) {
02295 PUP::er &p=*(PUP::er *)pup_er;
02296 if (!p.isUnpacking()) {
02297
02298 p | data_size;
02299 p((char*)ro_data, data_size);
02300 } else {
02301 CkAssert(CkMyPe() != 0);
02302 CkAssert(ro_data == NULL);
02303 PUP::fromMem &p_mem = *(PUP::fromMem *)pup_er;
02304 p_mem | data_size;
02305
02306 ReadOnlyRecvExtCallback(int(data_size), p_mem.get_current_pointer());
02307 p_mem.advance(data_size);
02308 }
02309 }
02310
02311 CkpvExtern(int, _currentChareType);
02312
02313 MainchareExt::MainchareExt(CkArgMsg *m) {
02314 int cIdx = CkpvAccess(_currentChareType);
02315
02316 int ctorEpIdx = _mainTable[_chareTable[cIdx]->mainChareType()]->entryIdx;
02317 MainchareCtorExtCallback(thishandle.onPE, thishandle.objPtr, ctorEpIdx, m->argc, m->argv);
02318 delete m;
02319 }
02320
02321 GroupExt::GroupExt(void *impl_msg) {
02322
02323
02324 int chareIdx = ckGetChareType();
02325 int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
02326 CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
02327 char *impl_buf = impl_msg_typed->msgBuf;
02328 PUP::fromMem implP(impl_buf);
02329 int msgSize; implP|msgSize;
02330 int dcopy_start; implP|dcopy_start;
02331 GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
02332 dcopy_start);
02333 }
02334
02335 ArrayMapExt::ArrayMapExt(void *impl_msg) {
02336
02337 int chareIdx = ckGetChareType();
02338 int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
02339 CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
02340 char *impl_buf = impl_msg_typed->msgBuf;
02341 PUP::fromMem implP(impl_buf);
02342 int msgSize; implP|msgSize;
02343 int dcopy_start; implP|dcopy_start;
02344 GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
02345 dcopy_start);
02346 }
02347
02348
02349 int CkCreateGroupExt(int cIdx, int eIdx, int num_bufs, char **bufs, int *buf_sizes) {
02350
02351 CkAssert(num_bufs >= 1);
02352 int totalSize = 0;
02353 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
02354 int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2);
02355 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02356 PUP::toMem implP((void *)impl_msg->msgBuf);
02357 implP|totalSize;
02358 implP|buf_sizes[0];
02359 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
02360 UsrToEnv(impl_msg)->setMsgtype(BocInitMsg);
02361
02362
02363 CkGroupID gId = CkCreateGroup(cIdx, eIdx, impl_msg);
02364 return gId.idx;
02365 }
02366
02367
02368 int CkCreateArrayExt(int cIdx, int ndims, int *dims, int eIdx, int num_bufs,
02369 char **bufs, int *buf_sizes, int map_gid, char useAtSync) {
02370
02371 CkAssert(num_bufs >= 1);
02372 int totalSize = 0;
02373 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
02374 int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
02375 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02376 PUP::toMem implP((void *)impl_msg->msgBuf);
02377 implP|useAtSync;
02378 implP|totalSize;
02379 implP|buf_sizes[0];
02380 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
02381 CkArrayOptions opts;
02382 if (ndims != -1)
02383 opts = CkArrayOptions(ndims, dims);
02384 if (map_gid >= 0) {
02385 CkGroupID map_gId;
02386 map_gId.idx = map_gid;
02387 opts.setMap(CProxy_Group(map_gId));
02388 }
02389 UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
02390
02391 CkGroupID gId = CProxyElement_ArrayElement::ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
02392 return gId.idx;
02393 }
02394
02395
02396 void CkInsertArrayExt(int aid, int ndims, int *index, int epIdx, int onPE, int num_bufs,
02397 char **bufs, int *buf_sizes, char useAtSync) {
02398 CkAssert(num_bufs >= 1);
02399 int totalSize = 0;
02400 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
02401 int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
02402 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02403 PUP::toMem implP((void *)impl_msg->msgBuf);
02404 implP|useAtSync;
02405 implP|totalSize;
02406 implP|buf_sizes[0];
02407 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
02408
02409 UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
02410 CkArrayIndex newIdx(ndims, index);
02411 CkGroupID gId;
02412 gId.idx = aid;
02413 CProxy_ArrayBase(gId).ckInsertIdx((CkArrayMessage *)impl_msg, epIdx, onPE, newIdx);
02414 }
02415
02416 void CkMigrateExt(int aid, int ndims, int *index, int toPe) {
02417
02418
02419 CkGroupID gId;
02420 gId.idx = aid;
02421 CkArrayIndex arrayIndex(ndims, index);
02422 CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
02423 ArrayElement* arrayElement = arrayProxy.ckLocal();
02424 CkAssert(arrayElement != NULL);
02425 arrayElement->migrateMe(toPe);
02426 }
02427
02428 void CkArrayDoneInsertingExt(int aid) {
02429 CkGroupID gId;
02430 gId.idx = aid;
02431 CProxy_ArrayBase(gId).doneInserting();
02432 }
02433
02434 int CkGroupGetReductionNumber(int g_id) {
02435 CkGroupID gId;
02436 gId.idx = g_id;
02437 return ((Group*)CkLocalBranch(gId))->getRedNo();
02438 }
02439
02440 int CkArrayGetReductionNumber(int aid, int ndims, int *index) {
02441 CkGroupID gId;
02442 gId.idx = aid;
02443 CkArrayIndex arrayIndex(ndims, index);
02444 CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
02445 ArrayElement* arrayElement = arrayProxy.ckLocal();
02446 CkAssert(arrayElement != NULL);
02447 return arrayElement->getRedNo();
02448 }
02449
02450 void CkChareExtSend(int onPE, void *objPtr, int epIdx, char *msg, int msgSize) {
02451
02452 int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
02453 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02454 PUP::toMem implP((void *)impl_msg->msgBuf);
02455 implP|msgSize;
02456 implP|epIdx;
02457 int d=0; implP|d;
02458 implP(msg, msgSize);
02459 CkChareID chareID;
02460 chareID.onPE = onPE;
02461 chareID.objPtr = objPtr;
02462
02463 CkSendMsg(epIdx, impl_msg, &chareID);
02464 }
02465
02466 void CkChareExtSend_multi(int onPE, void *objPtr, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
02467 CkAssert(num_bufs >= 1);
02468
02469 int totalSize = 0;
02470 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
02471 int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
02472 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02473 PUP::toMem implP((void *)impl_msg->msgBuf);
02474 implP | totalSize;
02475 implP | epIdx;
02476 implP | buf_sizes[0];
02477 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
02478 CkChareID chareID;
02479 chareID.onPE = onPE;
02480 chareID.objPtr = objPtr;
02481
02482 CkSendMsg(epIdx, impl_msg, &chareID);
02483 }
02484
02485 void CkGroupExtSend(int gid, int pe, int epIdx, char *msg, int msgSize) {
02486
02487 int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
02488 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02489 PUP::toMem implP((void *)impl_msg->msgBuf);
02490 implP|msgSize;
02491 implP|epIdx;
02492 int d=0; implP|d;
02493 implP(msg, msgSize);
02494 CkGroupID gId;
02495 gId.idx = gid;
02496
02497 if (pe == -1)
02498 CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
02499 else
02500 CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
02501 }
02502
02503 void CkGroupExtSend_multi(int gid, int pe, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
02504 CkAssert(num_bufs >= 1);
02505
02506 int totalSize = 0;
02507 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
02508 int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
02509 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02510 PUP::toMem implP((void *)impl_msg->msgBuf);
02511 implP | totalSize;
02512 implP | epIdx;
02513 implP | buf_sizes[0];
02514 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
02515 CkGroupID gId;
02516 gId.idx = gid;
02517
02518 if (pe == -1)
02519 CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
02520 else
02521 CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
02522 }
02523
02524 void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize) {
02525 int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
02526 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02527 PUP::toMem implP((void *)impl_msg->msgBuf);
02528 implP|msgSize;
02529 implP|epIdx;
02530 int d=0; implP|d;
02531 implP(msg, msgSize);
02532 UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
02533 CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
02534 impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
02535 CkGroupID gId;
02536 gId.idx = aid;
02537 if (ndims > 0) {
02538 CkArrayIndex arrIndex(ndims, idx);
02539
02540 CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
02541 } else {
02542 CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
02543 }
02544 }
02545
02546 void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
02547 CkAssert(num_bufs >= 1);
02548 int totalSize = 0;
02549 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
02550 int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
02551 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
02552 PUP::toMem implP((void *)impl_msg->msgBuf);
02553 implP | totalSize;
02554 implP | epIdx;
02555 implP | buf_sizes[0];
02556 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
02557 UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
02558 CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
02559 impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
02560 CkGroupID gId;
02561 gId.idx = aid;
02562 if (ndims > 0) {
02563 CkArrayIndex arrIndex(ndims, idx);
02564
02565 CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
02566 } else {
02567 CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
02568 }
02569 }
02570
02571
02572
02573 #include "crc32.h"
02574
02575 CkpvDeclare(int, envelopeEventID);
02576 int _recplay_crc = 0;
02577 int _recplay_checksum = 0;
02578 int _recplay_logsize = 1024*1024;
02579
02580
02581 #define REPLAYDEBUG(args)
02582
02583 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
02584
02585 #include "trace-common.h"
02586 #include "BaseLB.h"
02587
02588 #if CMK_REPLAYSYSTEM
02589 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
02590 std::string fName = CkpvAccess(traceRoot);
02591 fName += prefix;
02592 fName += std::to_string(CkMyPe());
02593 fName += suffix;
02594 FILE *f = fopen(fName.c_str(), permissions);
02595 REPLAYDEBUG("openReplayfile " << fName.c_str());
02596 if (f==NULL) {
02597 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
02598 CkMyPe(), fName.c_str(), permissions);
02599 CkAbort("openReplayFile> Could not open replay file");
02600 }
02601 return f;
02602 }
02603
02604 class CkMessageRecorder : public CkMessageWatcher {
02605 unsigned int curpos;
02606 bool firstOpen;
02607 std::vector<char> buffer;
02608 public:
02609 CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true), buffer(_recplay_logsize) { f=f_; }
02610 ~CkMessageRecorder() {
02611 flushLog(0);
02612 fprintf(f,"-1 -1 -1 ");
02613 fclose(f);
02614 #if 0
02615 FILE *stsfp = fopen("sts", "w");
02616 void traceWriteSTS(FILE *stsfp,int nUserEvents);
02617 traceWriteSTS(stsfp, 0);
02618 fclose(stsfp);
02619 #endif
02620 CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
02621 }
02622
02623 private:
02624 void flushLog(int verbose=1) {
02625 if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
02626 fprintf(f, "%s", buffer.data());
02627 curpos=0;
02628 }
02629 virtual bool process(envelope **envptr,CkCoreState *ck) {
02630 if ((*envptr)->getEvent()) {
02631 bool wasPacked = (*envptr)->isPacked();
02632 if (!wasPacked) CkPackMessage(envptr);
02633 envelope *env = *envptr;
02634 unsigned int crc1=0, crc2=0;
02635 if (_recplay_crc) {
02636
02637 crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02638 crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02639 } else if (_recplay_checksum) {
02640 crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02641 crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02642 }
02643 curpos+=sprintf(&buffer[curpos],"%d %d %d %d %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
02644 if (curpos > _recplay_logsize-128) flushLog();
02645 if (!wasPacked) CkUnpackMessage(envptr);
02646 }
02647 return true;
02648 }
02649 virtual bool process(CthThreadToken *token,CkCoreState *ck) {
02650 curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
02651 if (curpos > _recplay_logsize-128) flushLog();
02652 return true;
02653 }
02654
02655 virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
02656 FILE *f;
02657 if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
02658 else f = openReplayFile("ckreplay_",".lb","a");
02659 firstOpen = false;
02660 if (f != NULL) {
02661 PUP::toDisk p(f);
02662 p | (*msg)->n_moves;
02663 (*msg)->pup(p);
02664 fclose(f);
02665 }
02666 return true;
02667 }
02668 };
02669
02670 class CkMessageDetailRecorder : public CkMessageWatcher {
02671 public:
02672 CkMessageDetailRecorder(FILE *f_) {
02673 f=f_;
02674
02675
02676
02677 CmiUInt2 little = sizeof(void*);
02678 fwrite(&little, 2, 1, f);
02679 }
02680 ~CkMessageDetailRecorder() {fclose(f);}
02681 private:
02682 virtual bool process(envelope **envptr, CkCoreState *ck) {
02683 bool wasPacked = (*envptr)->isPacked();
02684 if (!wasPacked) CkPackMessage(envptr);
02685 envelope *env = *envptr;
02686 CmiUInt4 size = env->getTotalsize();
02687 fwrite(&size, 4, 1, f);
02688 fwrite(env, env->getTotalsize(), 1, f);
02689 if (!wasPacked) CkUnpackMessage(envptr);
02690 return true;
02691 }
02692 };
02693
02694 void CkMessageReplayQuiescence(void *rep, double time);
02695 void CkMessageDetailReplayDone(void *rep, double time);
02696
02697 class CkMessageReplay : public CkMessageWatcher {
02698 int counter;
02699 int nextPE, nextSize, nextEvent, nexttype;
02700 int nextEP;
02701 unsigned int crc1, crc2;
02702 FILE *lbFile;
02704 void getNext(void) {
02705 if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
02706 if (nextSize > 0) {
02707
02708 if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
02709 CkAbort("CkMessageReplay> Syntax error reading replay file");
02710 }
02711 REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
02712 } else if (nextSize == -2) {
02713
02714
02715 REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
02716 } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
02717 CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
02718 CkAbort("CkMessageReplay> Unrecognized input");
02719 }
02720
02721
02722
02723
02724
02725
02726 counter++;
02727 }
02729 bool isNext(envelope *env) {
02730 if (nextPE!=env->getSrcPe()) return false;
02731 if (nextEvent!=env->getEvent()) return false;
02732 if (nextSize<0) return false;
02733 #if 1
02734 if (nextEP != env->getEpIdx()) {
02735 CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
02736 return false;
02737 }
02738 #endif
02739 #if ! CMK_BIGSIM_CHARM
02740 if (nextSize!=env->getTotalsize())
02741 {
02742 CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
02743 return false;
02744 }
02745 if (_recplay_crc || _recplay_checksum) {
02746 bool wasPacked = env->isPacked();
02747 if (!wasPacked) CkPackMessage(&env);
02748 if (_recplay_crc) {
02749
02750 unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02751 unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02752 if (crcnew1 != crc1) {
02753 CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
02754 }
02755 if (crcnew2 != crc2) {
02756 CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
02757 }
02758 } else if (_recplay_checksum) {
02759 unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02760 unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02761 if (crcnew1 != crc1) {
02762 CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
02763 }
02764 if (crcnew2 != crc2) {
02765 CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
02766 }
02767 }
02768 if (!wasPacked) CkUnpackMessage(&env);
02769 }
02770 #endif
02771 return true;
02772 }
02773 bool isNext(CthThreadToken *token) {
02774 if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
02775 return false;
02776 }
02777
02779 CkQ<envelope *> delayedMessages;
02781 CkQ<CthThreadToken *> delayedTokens;
02782
02784 void flush(void) {
02785 if (nextSize>0) {
02786 int len=delayedMessages.length();
02787 for (int i=0;i<len;i++) {
02788 envelope *env=delayedMessages.deq();
02789 if (isNext(env)) {
02790 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
02791 CsdEnqueueLifo((void*)env);
02792 return;
02793 }
02794 else
02795
02796 {
02797 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
02798 delayedMessages.enq(env);
02799 }
02800 }
02801 } else if (nextSize==-2) {
02802 int len=delayedTokens.length();
02803 for (int i=0;i<len;++i) {
02804 CthThreadToken *token=delayedTokens.deq();
02805 if (isNext(token)) {
02806 REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
02807 #if ! CMK_BIGSIM_CHARM
02808 CsdEnqueueLifo((void*)token);
02809 #else
02810 CthEnqueueBigSimThread(token,0,0,NULL);
02811 #endif
02812 return;
02813 } else {
02814 REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
02815 delayedTokens.enq(token);
02816 }
02817 }
02818 }
02819 }
02820
02821 public:
02822 CkMessageReplay(FILE *f_) : lbFile(NULL) {
02823 counter=0;
02824 f=f_;
02825 getNext();
02826 REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
02827 #if CMI_QD
02828 if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
02829 #endif
02830 }
02831 ~CkMessageReplay() {fclose(f);}
02832
02833 private:
02834 virtual bool process(envelope **envptr,CkCoreState *ck) {
02835 bool wasPacked = (*envptr)->isPacked();
02836 if (!wasPacked) CkPackMessage(envptr);
02837 envelope *env = *envptr;
02838
02839 REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
02840 if (env->getEvent() == 0) return true;
02841 if (isNext(env)) {
02842 REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
02843 getNext();
02844 flush();
02845 if (!wasPacked) CkUnpackMessage(envptr);
02846 return true;
02847 }
02848 #if CMK_SMP
02849 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
02850
02851
02852 int nextpe = CkMyPe()+1;
02853 if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
02854 nextpe = CkNodeFirst(CkMyNode());
02855 CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
02856 return false;
02857 }
02858 #endif
02859 else {
02860 REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
02861 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
02862 delayedMessages.enq(env);
02863 flush();
02864 return false;
02865 }
02866 }
02867 virtual bool process(CthThreadToken *token, CkCoreState *ck) {
02868 REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
02869 if (isNext(token)) {
02870 REPLAYDEBUG("Executing token: "<<token->serialNo)
02871 getNext();
02872 flush();
02873 return true;
02874 } else {
02875 REPLAYDEBUG("Queueing token: "<<token->serialNo
02876 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
02877 delayedTokens.enq(token);
02878 return false;
02879 }
02880 }
02881
02882 virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
02883 if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
02884 if (lbFile != NULL) {
02885 int num_moves = 0;
02886 PUP::fromDisk p(lbFile);
02887 p | num_moves;
02888 if (num_moves != (*msg)->n_moves) {
02889 delete *msg;
02890 *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
02891 }
02892 (*msg)->pup(p);
02893 }
02894 return true;
02895 }
02896 };
02897
02898 class CkMessageDetailReplay : public CkMessageWatcher {
02899 void *getNext() {
02900 CmiUInt4 size; size_t nread;
02901 if ((nread=fread(&size, 4, 1, f)) < 1) {
02902 if (feof(f)) return NULL;
02903 CkPrintf("Broken record file (metadata) got %d\n",nread);
02904 CkAbort("");
02905 }
02906 void *env = CmiAlloc(size);
02907 long tell = ftell(f);
02908 if ((nread=fread(env, size, 1, f)) < 1) {
02909 CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
02910 CkAbort("");
02911 }
02912
02913 return env;
02914 }
02915 public:
02916 double starttime;
02917 CkMessageDetailReplay(FILE *f_) {
02918 f=f_;
02919 starttime=CkWallTimer();
02920
02921 CmiUInt2 little;
02922 fread(&little, 2, 1, f);
02923 if (little != sizeof(void*)) {
02924 CkAbort("Replaying on a different architecture from which recording was done!");
02925 }
02926
02927 CsdEnqueue(getNext());
02928
02929 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
02930 }
02931 virtual bool process(envelope **env,CkCoreState *ck) {
02932 void *msg = getNext();
02933 if (msg != NULL) CsdEnqueue(msg);
02934 return true;
02935 }
02936 };
02937
02938 void CkMessageReplayQuiescence(void *rep, double time) {
02939 #if ! CMK_BIGSIM_CHARM
02940 CkPrintf("[%d] Quiescence detected\n",CkMyPe());
02941 #endif
02942 CkMessageReplay *replay = (CkMessageReplay*)rep;
02943
02944 }
02945
02946 void CkMessageDetailReplayDone(void *rep, double time) {
02947 CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
02948 CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
02949 ConverseExit();
02950 }
02951 #endif
02952
02953 static bool CpdExecuteThreadResume(CthThreadToken *token) {
02954 CkCoreState *ck = CkpvAccess(_coreState);
02955 if (ck->watcher!=NULL) {
02956 return ck->watcher->processThread(token,ck);
02957 }
02958 return true;
02959 }
02960
02961 CpvExtern(int, CthResumeNormalThreadIdx);
02962 void CthResumeNormalThreadDebug(CthThreadToken* token)
02963 {
02964 CthThread t = token->thread;
02965
02966 if(t == NULL){
02967 free(token);
02968 return;
02969 }
02970 #if CMK_TRACE_ENABLED
02971 #if ! CMK_TRACE_IN_CHARM
02972 if(CpvAccess(traceOn))
02973 CthTraceResume(t);
02974
02975
02976 #endif
02977 #endif
02978 #if CMK_OMP
02979 CthSetPrev(t, CthSelf());
02980 #endif
02981
02982 if (CpdExecuteThreadResume(token)) {
02983 CthResume(t);
02984 }
02985 #if CMK_OMP
02986 CthScheduledDecrement();
02987 CthSetPrev(CthSelf(), 0);
02988 #endif
02989 }
02990
02991 void CpdHandleLBMessage(LBMigrateMsg **msg) {
02992 CkCoreState *ck = CkpvAccess(_coreState);
02993 if (ck->watcher!=NULL) {
02994 ck->watcher->processLBMessage(msg, ck);
02995 }
02996 }
02997
02998 #if CMK_BIGSIM_CHARM
02999 CpvExtern(int , CthResumeBigSimThreadIdx);
03000 #endif
03001
03002 #include "ckliststring.h"
03003 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
03004 CmiArgGroup("Charm++","Record/Replay");
03005 bool forceReplay = false;
03006 char *procs = NULL;
03007 _replaySystem = 0;
03008 if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
03009 if(CmiMyRank() == 0) _recplay_crc = 1;
03010 }
03011 if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
03012 if(CmiMyRank() == 0) _recplay_checksum = 1;
03013 }
03014 int tmplogsize;
03015 if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
03016 {
03017 if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
03018 }
03019 REPLAYDEBUG("CkMessageWatcherInit ");
03020 if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
03021 #if CMK_REPLAYSYSTEM
03022 CkListString list(procs);
03023 if (list.includes(CkMyPe())) {
03024 CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
03025 CpdSetInitializeMemory(1);
03026 ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
03027 }
03028 #else
03029 CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
03030 #endif
03031 }
03032 if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
03033 #if CMK_REPLAYSYSTEM
03034 if (CkMyPe() == 0) {
03035 CmiPrintf("Charm++> record mode.\n");
03036 if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
03037 CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
03038 _recplay_crc = _recplay_checksum = 0;
03039 }
03040 }
03041 CpdSetInitializeMemory(1);
03042 CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
03043 ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
03044 #else
03045 CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
03046 #endif
03047 }
03048 if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
03049 #if CMK_REPLAYSYSTEM
03050 forceReplay = true;
03051 CpdSetInitializeMemory(1);
03052
03053 #if CMK_SHARED_VARS_UNAVAILABLE
03054 _Cmi_mype = atoi(procs);
03055 while (procs[0]!='/') procs++;
03056 procs++;
03057 _Cmi_numpes = atoi(procs);
03058 #else
03059 CkAbort("+replay-detail available only for non-SMP build");
03060 #endif
03061 _replaySystem = 1;
03062 ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
03063 #else
03064 CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
03065 #endif
03066 }
03067 if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
03068 #if CMK_REPLAYSYSTEM
03069 if (CkMyPe() == 0) {
03070 CmiPrintf("Charm++> replay mode.\n");
03071 if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
03072 CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
03073 _recplay_crc = _recplay_checksum = 0;
03074 }
03075 }
03076 CpdSetInitializeMemory(1);
03077 #if ! CMK_BIGSIM_CHARM
03078 CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
03079 #else
03080 CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
03081 #endif
03082 ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
03083 #else
03084 CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
03085 #endif
03086 }
03087 if (_recplay_crc && _recplay_checksum) {
03088 CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
03089 }
03090 }
03091
03092 int CkMessageToEpIdx(void *msg) {
03093 envelope *env=UsrToEnv(msg);
03094 int ep=env->getEpIdx();
03095 if (ep==CkIndex_CkArray::recvBroadcast(0))
03096 return env->getsetArrayBcastEp();
03097 else
03098 return ep;
03099 }
03100
03101 int getCharmEnvelopeSize() {
03102 return sizeof(envelope);
03103 }
03104
03106 int isCharmEnvelope(void *msg) {
03107 envelope *e = (envelope *)msg;
03108 if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
03109 if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
03110 if (e->getTotalsize() < sizeof(envelope)) return 0;
03111 if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
03112 #if CMK_SMP
03113 if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
03114 #else
03115 if (e->getSrcPe()>=CkNumPes()) return 0;
03116 #endif
03117 if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
03118 return 1;
03119 }
03120
03121 #include "CkMarshall.def.h"