00001
#include "ck.h"
#include "trace.h"
#include "queueing.h"

#include <string.h>
00012
00013 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00014 #include "pathHistory.h"
00015 void automaticallySetMessagePriority(envelope *env);
00016 #endif
00017
00018 #if CMK_LBDB_ON
00019 #include "LBDatabase.h"
00020 #endif // CMK_LBDB_ON
00021
00022 #ifndef CMK_CHARE_USE_PTR
00023 #include <map>
00024 CkpvDeclare(CkVec<void *>, chare_objs);
00025 CkpvDeclare(CkVec<int>, chare_types);
00026 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
00027
00028 typedef std::map<int, CkChareID> Vidblockmap;
00029 CkpvDeclare(Vidblockmap, vmap);
00030 CkpvDeclare(int, currentChareIdx);
00031 #endif
00032
00033
// Combined mask of the CK_MSG_EXPEDITED and CK_MSG_IMMEDIATE flags.
#define CK_MSG_SKIP_OR_IMM  (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
00035
00036 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
00037
00038 int CMessage_CkMessage::__idx=-1;
00039 int CMessage_CkArgMsg::__idx=0;
00040 int CkIndex_Chare::__idx;
00041 int CkIndex_Group::__idx;
00042 int CkIndex_ArrayBase::__idx=-1;
00043
00044 extern int _defaultObjectQ;
00045
00046 void _initChareTables()
00047 {
00048 #ifndef CMK_CHARE_USE_PTR
00049
00050 CkpvInitialize(CkVec<void *>, chare_objs);
00051 CkpvInitialize(CkVec<int>, chare_types);
00052 CkpvInitialize(CkVec<VidBlock *>, vidblocks);
00053 CkpvInitialize(Vidblockmap, vmap);
00054 CkpvInitialize(int, currentChareIdx);
00055 CkpvAccess(currentChareIdx) = -1;
00056 #endif
00057 }
00058
00059
00060 Chare::Chare(void) {
00061 thishandle.onPE=CkMyPe();
00062 thishandle.objPtr=this;
00063 #if CMK_ERROR_CHECKING
00064 magic = CHARE_MAGIC;
00065 #endif
00066 #ifndef CMK_CHARE_USE_PTR
00067
00068 if (CkpvAccess(currentChareIdx) >= 0) {
00069 thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
00070 }
00071 chareIdx = CkpvAccess(currentChareIdx);
00072 #endif
00073 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00074 mlogData = new ChareMlogData();
00075 mlogData->objID.type = TypeChare;
00076 mlogData->objID.data.chare.id = thishandle;
00077 #endif
00078 #if CMK_OBJECT_QUEUE_AVAILABLE
00079 if (_defaultObjectQ) CkEnableObjQ();
00080 #endif
00081 }
00082
00083 Chare::Chare(CkMigrateMessage* m) {
00084 thishandle.onPE=CkMyPe();
00085 thishandle.objPtr=this;
00086 #if CMK_ERROR_CHECKING
00087 magic = 0;
00088 #endif
00089
00090 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00091 mlogData = NULL;
00092 #endif
00093
00094 #if CMK_OBJECT_QUEUE_AVAILABLE
00095 if (_defaultObjectQ) CkEnableObjQ();
00096 #endif
00097 }
00098
00099 void Chare::CkEnableObjQ()
00100 {
00101 #if CMK_OBJECT_QUEUE_AVAILABLE
00102 objQ.create();
00103 #endif
00104 }
00105
00106 Chare::~Chare() {
00107 #ifndef CMK_CHARE_USE_PTR
00108
00109
00110
00111 if (chareIdx != -1)
00112 {
00113 CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
00114 CkpvAccess(chare_objs)[chareIdx] = NULL;
00115 Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
00116 if (iter != CkpvAccess(vmap).end()) {
00117 register CkChareID *pCid = (CkChareID *)
00118 _allocMsg(DeleteVidMsg, sizeof(CkChareID));
00119 int srcPe = iter->second.onPE;
00120 *pCid = iter->second;
00121 register envelope *ret = UsrToEnv(pCid);
00122 ret->setVidPtr(iter->second.objPtr);
00123 ret->setSrcPe(CkMyPe());
00124 CmiSetHandler(ret, _charmHandlerIdx);
00125 CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
00126 CpvAccess(_qd)->create();
00127 CkpvAccess(vmap).erase(iter);
00128 }
00129 }
00130 #endif
00131 }
00132
00133 void Chare::pup(PUP::er &p)
00134 {
00135 p(thishandle.onPE);
00136 thishandle.objPtr=(void *)this;
00137 #ifndef CMK_CHARE_USE_PTR
00138 p(chareIdx);
00139 if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
00140 #endif
00141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00142 if(p.isUnpacking()){
00143 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
00144 mlogData = new ChareMlogData();
00145 }
00146 mlogData->pup(p);
00147 #endif
00148 #if CMK_ERROR_CHECKING
00149 p(magic);
00150 #endif
00151 }
00152
00153 int Chare::ckGetChareType() const {
00154 return -3;
00155 }
00156 char *Chare::ckDebugChareName(void) {
00157 char buf[100];
00158 sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
00159 return strdup(buf);
00160 }
00161 int Chare::ckDebugChareID(char *str, int limit) {
00162
00163 str[0] = 0;
00164 return 1;
00165 }
00166 void Chare::ckDebugPup(PUP::er &p) {
00167 pup(p);
00168 }
00169
00171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
00172 CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
00173 traceAddThreadListeners(th, UsrToEnv(msg));
00174 }
00175
00176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
00177 p.comment("Bytes");
00178 int ts=UsrToEnv(msg)->getTotalsize();
00179 int msgLen=ts-sizeof(envelope);
00180 if (msgLen>0)
00181 p((char*)msg,msgLen);
00182 }
00183
00184 IrrGroup::IrrGroup(void) {
00185 thisgroup = CkpvAccess(_currentGroup);
00186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00187 mlogData->objID.type = TypeGroup;
00188 mlogData->objID.data.group.id = thisgroup;
00189 mlogData->objID.data.group.onPE = CkMyPe();
00190 #endif
00191 }
00192
00193 IrrGroup::~IrrGroup() {
00194
00195 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
00196 CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
00197 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
00198 }
00199
00200 void IrrGroup::pup(PUP::er &p)
00201 {
00202 Chare::pup(p);
00203 p|thisgroup;
00204 }
00205
00206 int IrrGroup::ckGetChareType() const {
00207 return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
00208 }
00209
00210 int IrrGroup::ckDebugChareID(char *str, int limit) {
00211 if (limit<5) return -1;
00212 str[0] = 1;
00213 *((int*)&str[1]) = thisgroup.idx;
00214 return 5;
00215 }
00216
00217 char *IrrGroup::ckDebugChareName() {
00218 return strdup(_chareTable[ckGetChareType()]->name);
00219 }
00220
00221 void IrrGroup::ckJustMigrated(void)
00222 {
00223 }
00224
00225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
00226
00227 }
00228
00229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
00230 Chare::CkAddThreadListeners(th, msg);
00231 CthSetThreadID(th, thisgroup.idx, 0, 0);
00232 }
00233
00234 void Group::pup(PUP::er &p)
00235 {
00236 CkReductionMgr::pup(p);
00237 reductionInfo.pup(p);
00238 }
00239
00240
00241 CkDelegateMgr::~CkDelegateMgr() { }
00242
00243
00244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
00245 { CkSendMsg(ep,m,c); }
00246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
00247 { CkSendMsgBranch(ep,m,onPE,g); }
00248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
00249 { CkBroadcastMsgBranch(ep,m,g); }
00250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
00251 { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
00252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
00253 { CkSendMsgNodeBranch(ep,m,onNode,g); }
00254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
00255 { CkBroadcastMsgNodeBranch(ep,m,g); }
00256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
00257 { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
00258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
00259 {
00260 CProxyElement_ArrayBase ap(a,idx);
00261 ap.ckInsert((CkArrayMessage *)m,ep,onPE);
00262 }
00263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
00264 {
00265 CProxyElement_ArrayBase ap(a,idx);
00266 ap.ckSend((CkArrayMessage *)m,ep);
00267 }
00268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
00269 {
00270 CProxy_ArrayBase ap(a);
00271 ap.ckBroadcast((CkArrayMessage *)m,ep);
00272 }
00273
00274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
00275 {
00276 CmiAbort("ArraySectionSend is not implemented!\n");
00277
00278
00279
00280
00281 }
00282
00283
00284 CkDelegateData::~CkDelegateData() {}
00285
00286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
00287 return pd;
00288 }
00289
00293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
00294 if (dPtr) dPtr->ref();
00295 ckUndelegate();
00296 delegatedMgr = dTo;
00297 delegatedPtr = dPtr;
00298 delegatedGroupId = delegatedMgr->CkGetGroupID();
00299 isNodeGroup = delegatedMgr->isNodeGroup();
00300 }
00301 void CProxy::ckUndelegate(void) {
00302 delegatedMgr=NULL;
00303 delegatedGroupId.setZero();
00304 if (delegatedPtr) delegatedPtr->unref();
00305 delegatedPtr=NULL;
00306 }
00307
00309 CProxy::CProxy(const CProxy &src)
00310 :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId),
00311 isNodeGroup(src.isNodeGroup) {
00312 delegatedPtr = NULL;
00313 if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
00314 delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
00315 }
00316 }
00317
00319 CProxy& CProxy::operator=(const CProxy &src) {
00320 CkDelegateData *oldPtr=delegatedPtr;
00321 ckUndelegate();
00322 delegatedMgr=src.delegatedMgr;
00323 delegatedGroupId = src.delegatedGroupId;
00324 isNodeGroup = src.isNodeGroup;
00325
00326 if(delegatedMgr != NULL && src.delegatedPtr != NULL)
00327 delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
00328 else
00329 delegatedPtr = NULL;
00330
00331
00332 if (oldPtr) oldPtr->unref();
00333 return *this;
00334 }
00335
00336 void CProxy::pup(PUP::er &p) {
00337 if (!p.isUnpacking()) {
00338 if (ckDelegatedTo() != NULL) {
00339 delegatedGroupId = delegatedMgr->CkGetGroupID();
00340 isNodeGroup = delegatedMgr->isNodeGroup();
00341 }
00342 }
00343 p|delegatedGroupId;
00344 if (!delegatedGroupId.isZero()) {
00345 p|isNodeGroup;
00346 if (p.isUnpacking()) {
00347 delegatedMgr = ckDelegatedTo();
00348 }
00349
00350 int migCtor, cIdx;
00351 if (!p.isUnpacking()) {
00352 if (isNodeGroup) {
00353 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00354 cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx();
00355 migCtor = _chareTable[cIdx]->migCtor;
00356 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00357 }
00358 else {
00359 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
00360 cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
00361 migCtor = _chareTable[cIdx]->migCtor;
00362 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
00363 }
00364 }
00365
00366 p|migCtor;
00367
00368
00369
00370 if (delegatedMgr == NULL) {
00371
00372
00373 int objId = _entryTable[migCtor]->chareIdx;
00374 int objSize = _chareTable[objId]->size;
00375 void *obj = malloc(objSize);
00376 _entryTable[migCtor]->call(NULL, obj);
00377 delegatedPtr = static_cast<CkDelegateMgr *> (obj)
00378 ->DelegatePointerPup(p, delegatedPtr);
00379 free(obj);
00380
00381 }
00382 else {
00383
00384
00385 delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
00386
00387 }
00388
00389 if (p.isUnpacking() && delegatedPtr) {
00390 delegatedPtr->ref();
00391 }
00392 }
00393 }
00394
00395
00396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
00397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
00398 _cookie.get_aid() = aid; \
00399 _cookie.get_pe() = CkMyPe(); \
00400 _elems = new CkArrayIndex[nElems]; \
00401 for (int i=0; i<nElems; i++) _elems[i] = elems[i]; \
00402 pelist = NULL; \
00403 npes = 0; \
00404 }
00405
00406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
00407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
00408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
00409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
00410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
00411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
00412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
00413
00414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
00415 pelist = new int[npes];
00416 for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
00417 _cookie.get_aid() = gid;
00418 }
00419
00420 CkSectionID::CkSectionID(const CkSectionID &sid) {
00421 int i;
00422 _cookie = sid._cookie;
00423 _nElems = sid._nElems;
00424 if (_nElems > 0) {
00425 _elems = new CkArrayIndex[_nElems];
00426 for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
00427 } else _elems = NULL;
00428 npes = sid.npes;
00429 if (npes > 0) {
00430 pelist = new int[npes];
00431 for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
00432 } else pelist = NULL;
00433 }
00434
00435 void CkSectionID::operator=(const CkSectionID &sid) {
00436 int i;
00437 _cookie = sid._cookie;
00438 _nElems = sid._nElems;
00439 if (_nElems > 0) {
00440 _elems = new CkArrayIndex[_nElems];
00441 for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
00442 } else _elems = NULL;
00443 npes = sid.npes;
00444 if (npes > 0) {
00445 pelist = new int[npes];
00446 for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
00447 } else pelist = NULL;
00448 }
00449
00450 void CkSectionID::pup(PUP::er &p) {
00451 p | _cookie;
00452 p(_nElems);
00453 if (_nElems > 0) {
00454 if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
00455 for (int i=0; i< _nElems; i++) p | _elems[i];
00456 npes = 0;
00457 pelist = NULL;
00458 } else {
00459
00460 _elems = NULL;
00461 p(npes);
00462 if (p.isUnpacking()) pelist = new int[npes];
00463 p(pelist, npes);
00464 }
00465 }
00466
00467
00468
#ifdef CMK_CUDA
/// Invoke the CkCallback passed as an opaque pointer; NULL is ignored.
void CUDACallbackManager(void *fn)
{
  if (fn == NULL) return;
  CkCallback *callback = (CkCallback *)fn;
  callback->send();
}

#endif
00478
00479 extern "C"
00480 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
00481 {
00482 UsrToEnv(msg)->setRef(ref);
00483 }
00484
00485 extern "C"
00486 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
00487 {
00488 return UsrToEnv(msg)->getRef();
00489 }
00490
00491 extern "C"
00492 int CkGetSrcPe(void *msg)
00493 {
00494 return UsrToEnv(msg)->getSrcPe();
00495 }
00496
00497 extern "C"
00498 int CkGetSrcNode(void *msg)
00499 {
00500 return CmiNodeOf(CkGetSrcPe(msg));
00501 }
00502
00503 extern "C"
00504 void *CkLocalBranch(CkGroupID gID) {
00505 return _localBranch(gID);
00506 }
00507
00508 static
00509 void *_ckLocalNodeBranch(CkGroupID groupID) {
00510 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00511 void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
00512 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00513 return retval;
00514 }
00515
00516 extern "C"
00517 void *CkLocalNodeBranch(CkGroupID groupID)
00518 {
00519 void *retval;
00520
00521 if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
00522 return CkpvAccess(_currentNodeGroupObj);
00523 while (NULL== (retval=_ckLocalNodeBranch(groupID)))
00524 {
00525 CsdScheduler(0);
00526 }
00527 return retval;
00528 }
00529
00530 extern "C"
00531 void *CkLocalChare(const CkChareID *pCid)
00532 {
00533 int pe=pCid->onPE;
00534 if (pe<0) {
00535 if (pe!=(-(CkMyPe()+1)))
00536 return NULL;
00537 #ifdef CMK_CHARE_USE_PTR
00538 VidBlock *v=(VidBlock *)pCid->objPtr;
00539 #else
00540 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
00541 #endif
00542 return v->getLocalChareObj();
00543 }
00544 else
00545 {
00546 if (pe!=CkMyPe())
00547 return NULL;
00548 #ifdef CMK_CHARE_USE_PTR
00549 return pCid->objPtr;
00550 #else
00551 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
00552 #endif
00553 }
00554 }
00555
00556 CkpvDeclare(char**,Ck_argv);
00557
00558 extern "C" char **CkGetArgv(void) {
00559 return CkpvAccess(Ck_argv);
00560 }
00561 extern "C" int CkGetArgc(void) {
00562 return CmiGetArgc(CkpvAccess(Ck_argv));
00563 }
00564
00565
00566 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
00567 {
00568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00569 CpvAccess(_currentObj) = (Chare *)obj;
00570
00571 #endif
00572
00573
00574
00575 #if CMK_CHARMDEBUG
00576 CpdBeforeEp(epIdx, obj, msg);
00577 #endif
00578 _entryTable[epIdx]->call(msg, obj);
00579 #if CMK_CHARMDEBUG
00580 CpdAfterEp(epIdx);
00581 #endif
00582 if (_entryTable[epIdx]->noKeep)
00583 {
00584 _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
00585 }
00586 }
00587 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
00588 {
00589
00590
00591
00592
00593 void *deliverMsg;
00594 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00595 CpvAccess(_currentObj) = (Chare *)obj;
00596 #endif
00597 if (_entryTable[epIdx]->noKeep)
00598 {
00599 deliverMsg=(void *)msg;
00600 } else
00601 {
00602 void *oldMsg=(void *)msg;
00603 deliverMsg=CkCopyMsg(&oldMsg);
00604 #if CMK_ERROR_CHECKING
00605 if (oldMsg!=msg)
00606 CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
00607 #endif
00608 }
00609 #if CMK_CHARMDEBUG
00610 CpdBeforeEp(epIdx, obj, (void*)msg);
00611 #endif
00612 _entryTable[epIdx]->call(deliverMsg, obj);
00613 #if CMK_CHARMDEBUG
00614 CpdAfterEp(epIdx);
00615 #endif
00616 }
00617
00618 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
00619 {
00620 register void *msg = EnvToUsr(env);
00621 _SET_USED(env, 0);
00622 CkDeliverMessageFree(epIdx,msg,obj);
00623 }
00624
00625 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
00626 {
00627
00628 #if CMK_TRACE_ENABLED
00629 if (_entryTable[epIdx]->traceEnabled) {
00630 _TRACE_BEGIN_EXECUTE(env);
00631 _invokeEntryNoTrace(epIdx,env,obj);
00632 _TRACE_END_EXECUTE();
00633 }
00634 else
00635 #endif
00636 _invokeEntryNoTrace(epIdx,env,obj);
00637
00638 }
00639
00640
00641
00642 extern "C"
00643 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
00644 {
00645 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
00646 envelope *env = UsrToEnv(msg);
00647 _CHECK_USED(env);
00648 if(pCid == 0) {
00649 env->setMsgtype(NewChareMsg);
00650 } else {
00651 pCid->onPE = (-(CkMyPe()+1));
00652
00653 pCid->objPtr = (void *) new VidBlock();
00654 _MEMCHECK(pCid->objPtr);
00655 env->setMsgtype(NewVChareMsg);
00656 env->setVidPtr(pCid->objPtr);
00657 #ifndef CMK_CHARE_USE_PTR
00658 CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
00659 int idx = CkpvAccess(vidblocks).size()-1;
00660 pCid->objPtr = (void *)(CmiIntPtr)idx;
00661 env->setVidPtr((void *)(CmiIntPtr)idx);
00662 #endif
00663 }
00664 env->setEpIdx(eIdx);
00665 env->setByPe(CkMyPe());
00666 env->setSrcPe(CkMyPe());
00667 CmiSetHandler(env, _charmHandlerIdx);
00668 _TRACE_CREATION_1(env);
00669 CpvAccess(_qd)->create();
00670 _STATS_RECORD_CREATE_CHARE_1();
00671 _SET_USED(env, 1);
00672 if(destPE == CK_PE_ANY)
00673 env->setForAnyPE(1);
00674 else
00675 env->setForAnyPE(0);
00676 _CldEnqueue(destPE, env, _infoIdx);
00677 _TRACE_CREATION_DONE(1);
00678 }
00679
00680 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
00681 {
00682 register int gIdx = _entryTable[epIdx]->chareIdx;
00683 register void *obj = malloc(_chareTable[gIdx]->size);
00684 _MEMCHECK(obj);
00685 setMemoryTypeChare(obj);
00686 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
00687 CkpvAccess(_groupTable)->find(groupID).setObj(obj);
00688 CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
00689 CkpvAccess(_groupIDTable)->push_back(groupID);
00690 PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
00691 if(ptrq) {
00692 void *pending;
00693 while((pending=ptrq->deq())!=0)
00694 _CldEnqueue(CkMyPe(), pending, _infoIdx);
00695
00696 CkpvAccess(_groupTable)->find(groupID).clearPending();
00697 }
00698 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
00699
00700 CkpvAccess(_currentGroup) = groupID;
00701 CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
00702 #ifndef CMK_CHARE_USE_PTR
00703
00704 CkpvAccess(currentChareIdx) = -1;
00705 #endif
00706 _invokeEntryNoTrace(epIdx,env,obj);
00707 _STATS_RECORD_PROCESS_GROUP_1();
00708 }
00709
00710 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
00711 {
00712 register int gIdx = _entryTable[epIdx]->chareIdx;
00713 int objSize=_chareTable[gIdx]->size;
00714 register void *obj = malloc(objSize);
00715 _MEMCHECK(obj);
00716 setMemoryTypeChare(obj);
00717 CkpvAccess(_currentGroup) = groupID;
00718
00719
00720
00721
00722
00723
00724
00725 CkpvAccess(_currentNodeGroupObj) = obj;
00726 #ifndef CMK_CHARE_USE_PTR
00727
00728 CkpvAccess(currentChareIdx) = -1;
00729 #endif
00730 _invokeEntryNoTrace(epIdx,env,obj);
00731 CkpvAccess(_currentNodeGroupObj) = NULL;
00732 _STATS_RECORD_PROCESS_NODE_GROUP_1();
00733
00734 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00735 CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
00736 CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
00737 CksvAccess(_nodeGroupIDTable).push_back(groupID);
00738
00739 PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
00740 if(ptrq) {
00741 void *pending;
00742 while((pending=ptrq->deq())!=0)
00743 _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
00744
00745 CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
00746 }
00747 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00748 }
00749
00750 void _createGroup(CkGroupID groupID, envelope *env)
00751 {
00752 _CHECK_USED(env);
00753 _SET_USED(env, 1);
00754 register int epIdx = env->getEpIdx();
00755 int gIdx = _entryTable[epIdx]->chareIdx;
00756 CkNodeGroupID rednMgr;
00757 if(_chareTable[gIdx]->isIrr == 0){
00758 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
00759 rednMgr = rednMgrProxy;
00760
00761 }else{
00762 rednMgr.setZero();
00763 }
00764 env->setGroupNum(groupID);
00765 env->setSrcPe(CkMyPe());
00766 env->setRednMgr(rednMgr);
00767 env->setGroupEpoch(CkpvAccess(_charmEpoch));
00768
00769 if(CkNumPes()>1) {
00770 CkPackMessage(&env);
00771 CmiSetHandler(env, _bocHandlerIdx);
00772 _numInitMsgs++;
00773 CmiSyncBroadcast(env->getTotalsize(), (char *)env);
00774 CpvAccess(_qd)->create(CkNumPes()-1);
00775 CkUnpackMessage(&env);
00776 }
00777 _STATS_RECORD_CREATE_GROUP_1();
00778 CkCreateLocalGroup(groupID, epIdx, env);
00779 }
00780
00781 void _createNodeGroup(CkGroupID groupID, envelope *env)
00782 {
00783 _CHECK_USED(env);
00784 _SET_USED(env, 1);
00785 register int epIdx = env->getEpIdx();
00786 env->setGroupNum(groupID);
00787 env->setSrcPe(CkMyPe());
00788 env->setGroupEpoch(CkpvAccess(_charmEpoch));
00789 if(CkNumNodes()>1) {
00790 CkPackMessage(&env);
00791 CmiSetHandler(env, _bocHandlerIdx);
00792 _numInitMsgs++;
00793 if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
00794 CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
00795 CpvAccess(_qd)->create(CkNumNodes()-1);
00796 CkUnpackMessage(&env);
00797 }
00798 _STATS_RECORD_CREATE_NODE_GROUP_1();
00799 CkCreateLocalNodeGroup(groupID, epIdx, env);
00800 }
00801
00802
00803
00804 static CkGroupID _groupCreate(envelope *env)
00805 {
00806 register CkGroupID groupNum;
00807
00808
00809
00810 if(CkMyPe() == 0)
00811 groupNum.idx = CkpvAccess(_numGroups)++;
00812 else
00813 groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
00814 _createGroup(groupNum, env);
00815 return groupNum;
00816 }
00817
00818
00819 static CkGroupID _nodeGroupCreate(envelope *env)
00820 {
00821 register CkGroupID groupNum;
00822 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
00823 if(CkMyNode() == 0)
00824 groupNum.idx = CksvAccess(_numNodeGroups)++;
00825 else
00826 groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
00827 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
00828 _createNodeGroup(groupNum, env);
00829 return groupNum;
00830 }
00831
00832
00833
00834
00835
00836
/// Build a group index that is unique to the creating node/PE.
///
/// The positive value places `myNode` in the high bits and `numGroups` (the
/// creator's running counter) in the low bits; the result is negated.
/// The number of high bits is ceil(log2(numNodes)), computed with integer
/// arithmetic: the former ceil(log(n)/log(2)) could round up by one for exact
/// powers of two because of floating-point error.
///
/// @param numNodes  total number of nodes/PEs.
/// @param myNode    rank of the creator.
/// @param numGroups creator-local serial number.
/// @return the negated combined index.
int _getGroupIdx(int numNodes,int myNode,int numGroups)
{
  // Smallest x such that 2^x >= numNodes.
  int x = 0;
  while ((1LL << x) < (long long)numNodes) ++x;

  // Bits left below the rank field (one bit is reserved beyond the rank).
  int n = 32 - (x+1);
  int idx = (myNode<<n) + numGroups;
  return -idx;
}
00850
00851 extern "C"
00852 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
00853 {
00854 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
00855 register envelope *env = UsrToEnv(msg);
00856 env->setMsgtype(BocInitMsg);
00857 env->setEpIdx(eIdx);
00858 env->setSrcPe(CkMyPe());
00859 _TRACE_CREATION_N(env, CkNumPes());
00860 CkGroupID gid = _groupCreate(env);
00861 _TRACE_CREATION_DONE(1);
00862 return gid;
00863 }
00864
00865 extern "C"
00866 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
00867 {
00868 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
00869 register envelope *env = UsrToEnv(msg);
00870 env->setMsgtype(NodeBocInitMsg);
00871 env->setEpIdx(eIdx);
00872 env->setSrcPe(CkMyPe());
00873 _TRACE_CREATION_N(env, CkNumNodes());
00874 CkGroupID gid = _nodeGroupCreate(env);
00875 _TRACE_CREATION_DONE(1);
00876 return gid;
00877 }
00878
00879 static inline void *_allocNewChare(envelope *env, int &idx)
00880 {
00881 int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
00882 void *tmp=malloc(_chareTable[chareIdx]->size);
00883 _MEMCHECK(tmp);
00884 #ifndef CMK_CHARE_USE_PTR
00885 CkpvAccess(chare_objs).push_back(tmp);
00886 CkpvAccess(chare_types).push_back(chareIdx);
00887 idx = CkpvAccess(chare_objs).size()-1;
00888 #endif
00889 setMemoryTypeChare(tmp);
00890 return tmp;
00891 }
00892
00893 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
00894 {
00895 int idx;
00896 register void *obj = _allocNewChare(env, idx);
00897 #ifndef CMK_CHARE_USE_PTR
00898
00899 CkpvAccess(currentChareIdx) = idx;
00900 #endif
00901 _invokeEntry(env->getEpIdx(),env,obj);
00902 }
00903
00904 void CkCreateLocalChare(int epIdx, envelope *env)
00905 {
00906 env->setEpIdx(epIdx);
00907 _processNewChareMsg(NULL, env);
00908 }
00909
00910 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
00911 {
00912 int idx;
00913 register void *obj = _allocNewChare(env, idx);
00914 register CkChareID *pCid = (CkChareID *)
00915 _allocMsg(FillVidMsg, sizeof(CkChareID));
00916 pCid->onPE = CkMyPe();
00917 #ifndef CMK_CHARE_USE_PTR
00918 pCid->objPtr = (void*)(CmiIntPtr)idx;
00919 #else
00920 pCid->objPtr = obj;
00921 #endif
00922
00923 register envelope *ret = UsrToEnv(pCid);
00924 ret->setVidPtr(env->getVidPtr());
00925 register int srcPe = env->getByPe();
00926 ret->setSrcPe(CkMyPe());
00927 CmiSetHandler(ret, _charmHandlerIdx);
00928 CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
00929 #ifndef CMK_CHARE_USE_PTR
00930
00931 CkChareID vid;
00932 vid.onPE = srcPe;
00933 vid.objPtr = env->getVidPtr();
00934 CkpvAccess(vmap)[idx] = vid;
00935 #endif
00936 CpvAccess(_qd)->create();
00937 #ifndef CMK_CHARE_USE_PTR
00938
00939 CkpvAccess(currentChareIdx) = idx;
00940 #endif
00941 _invokeEntry(env->getEpIdx(),env,obj);
00942 }
00943
00944
00945
00946 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
00947 {
00948 register int epIdx = env->getEpIdx();
00949 register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
00950 register void *obj;
00951 if (mainIdx != -1) {
00952 CmiAssert(CkMyPe()==0);
00953 obj = _mainTable[mainIdx]->getObj();
00954 }
00955 else {
00956 #ifndef CMK_CHARE_USE_PTR
00957 if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
00958 obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
00959 else
00960 obj = env->getObjPtr();
00961 #else
00962 obj = env->getObjPtr();
00963 #endif
00964 }
00965 _invokeEntry(epIdx,env,obj);
00966 }
00967
00968 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
00969 {
00970 register int epIdx = env->getEpIdx();
00971 register void *obj = env->getObjPtr();
00972 _invokeEntry(epIdx,env,obj);
00973 }
00974
00975 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
00976 {
00977 #ifndef CMK_CHARE_USE_PTR
00978 register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
00979 #else
00980 register VidBlock *vptr = (VidBlock *) env->getVidPtr();
00981 _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
00982 #endif
00983 register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
00984 _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
00985 if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
00986 CmiFree(env);
00987 }
00988
00989 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
00990 {
00991 #ifndef CMK_CHARE_USE_PTR
00992 register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
00993 #else
00994 VidBlock *vptr = (VidBlock *) env->getVidPtr();
00995 _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
00996 #endif
00997 _SET_USED(env, 1);
00998 vptr->send(env);
00999 }
01000
01001 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
01002 {
01003 #ifndef CMK_CHARE_USE_PTR
01004 register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
01005 delete vptr;
01006 CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
01007 #endif
01008 CmiFree(env);
01009 }
01010
01011
01012
01022 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
01023 {
01024
01025 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
01026 IrrGroup *obj = ck->localBranch(groupID);
01027 if (obj==NULL) {
01028 ck->getGroupTable()->find(groupID).enqMsg(env);
01029 }
01030 else {
01031 ck->process();
01032 }
01033 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
01034 return obj;
01035 }
01036
01037 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
01038 {
01039 return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
01040 }
01041
01042 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
01043 {
01044 #if CMK_LBDB_ON
01045
01046 LDObjHandle objHandle;
01047 int objstopped = 0;
01048 LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
01049 if (the_lbdb->RunningObject(&objHandle)) {
01050 objstopped = 1;
01051 the_lbdb->ObjectStop(objHandle);
01052 }
01053 #endif
01054 _invokeEntry(epIdx,env,obj);
01055 #if CMK_LBDB_ON
01056 if (objstopped) the_lbdb->ObjectStart(objHandle);
01057 #endif
01058 _STATS_RECORD_PROCESS_BRANCH_1();
01059 }
01060
01061 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
01062 {
01063 register CkGroupID groupID = env->getGroupNum();
01064 register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
01065 if(obj) {
01066 _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
01067 }
01068 }
01069
01070 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
01071 {
01072 env->setMsgtype(ForChareMsg);
01073 env->setObjPtr(obj);
01074 _processForChareMsg(ck,env);
01075 _STATS_RECORD_PROCESS_NODE_BRANCH_1();
01076 }
01077
01078 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
01079 {
01080 env->setEpIdx(epIdx);
01081 _deliverForNodeBocMsg(ck,env, obj);
01082 }
01083
01084 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
01085 {
01086 register CkGroupID groupID = env->getGroupNum();
01087 register void *obj;
01088
01089 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
01090 obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
01091 if(!obj) {
01092 #if CMK_IMMEDIATE_MSG
01093 if (CmiIsImmediate(env))
01094 CmiDelayImmediate();
01095 else
01096 #endif
01097 CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
01098 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
01099 return;
01100 }
01101 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
01102 #if CMK_IMMEDIATE_MSG
01103 if (!CmiIsImmediate(env))
01104 #endif
01105 ck->process();
01106 env->setMsgtype(ForChareMsg);
01107 env->setObjPtr(obj);
01108 _processForChareMsg(ck,env);
01109 _STATS_RECORD_PROCESS_NODE_BRANCH_1();
01110 }
01111
01112 void _processBocInitMsg(CkCoreState *ck,envelope *env)
01113 {
01114 register CkGroupID groupID = env->getGroupNum();
01115 register int epIdx = env->getEpIdx();
01116 if (!env->getGroupDep().isZero()) {
01117 CkGroupID dep = env->getGroupDep();
01118 IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
01119 if (obj == NULL) return;
01120 }
01121 else
01122 ck->process();
01123 CkCreateLocalGroup(groupID, epIdx, env);
01124 }
01125
01126 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
01127 {
01128 register CkGroupID groupID = env->getGroupNum();
01129 register int epIdx = env->getEpIdx();
01130 CkCreateLocalNodeGroup(groupID, epIdx, env);
01131 }
01132
01133
01134
01135 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
01136 CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
01137 if (mgr) {
01138 _SET_USED(env, 0);
01139 mgr->insertElement((CkMessage *)EnvToUsr(env));
01140 }
01141 }
01142 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
01143 CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
01144 if (mgr) {
01145 _SET_USED(env, 0);
01146 mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
01147 }
01148 }
01149
01150
01151 #define TELLMSGTYPE(x) //x
01152
/**
 * Dispatch one incoming message according to the message type stored in its
 * envelope.
 *
 * Sequence:
 *  1. If a message watcher is installed it sees the message first; when its
 *     processMessage() returns false the function returns immediately.
 *  2. In message-logging builds (_FAULT_MLOG_ / _FAULT_CAUSAL_), messages of
 *     type ForBocMsg, ForNodeBocMsg, ForArrayEltMsg and ForChareMsg go through
 *     preProcessReceivedMessage(); a result of 0 ends processing.
 *  3. The switch selects the per-type handler.  Most cases unpack a packed
 *     message first.  Some cases call ck->process() here; the group,
 *     nodegroup and array cases do not call it in this function.
 *  4. In message-logging builds postProcessReceivedMessage() runs when
 *     preprocessing produced an object.
 *
 * TELLMSGTYPE(...) lines are debugging prints that expand to nothing with the
 * macro definition used in this file.
 *
 * @param converseMsg  the received message, interpreted as an envelope
 * @param ck           per-PE core state
 */
void _processHandler(void *converseMsg,CkCoreState *ck)
{
  register envelope *env = (envelope *) converseMsg;

  MESSAGE_PHASE_CHECK(env);

  // Let an installed watcher inspect (and possibly replace, since it receives
  // &env) the message; a false return means "do not process now".
  if (ck->watcher!=NULL) {
    if (!ck->watcher->processMessage(&env,ck)) return;
  }

#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
  Chare *obj=NULL;
  CkObjID sender;
  MCount SN;
  MlogEntry *entry=NULL;
  if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
     env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForChareMsg){
    // Save sender and sequence number before the handler runs; they are
    // passed to postProcessReceivedMessage() below.
    sender = env->sender;
    SN = env->SN;
    int result = preProcessReceivedMessage(env,&obj,&entry);
    if(result == 0){
      return;
    }
  }
#endif

#ifdef USE_CRITICAL_PATH_HEADER_ARRAY
  criticalPath_start(env);
#endif

  switch(env->getMsgtype()) {
    // Group (BOC) and nodegroup messages
    case BocInitMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processBocInitMsg(ck,env);
      break;
    case NodeBocInitMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
      ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
      _processNodeBocInitMsg(ck,env);
      break;
    case ForBocMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processForBocMsg(ck,env);
      break;
    case ForNodeBocMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processForNodeBocMsg(ck,env);
      break;

    // Array messages
    case ArrayEltInitMsg:
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processArrayEltInitMsg(ck,env);
      break;
    case ForArrayEltMsg:
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
      if(env->isPacked()) CkUnpackMessage(&env);
      _processArrayEltMsg(ck,env);
      break;

    // Chare messages
    case NewChareMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
      ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
      _processNewChareMsg(ck,env);
      _STATS_RECORD_PROCESS_CHARE_1();
      break;
    case NewVChareMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
      ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
      _processNewVChareMsg(ck,env);
      _STATS_RECORD_PROCESS_CHARE_1();
      break;
    case ForChareMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
      ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
      _processForPlainChareMsg(ck,env);
      _STATS_RECORD_PROCESS_MSG_1();
      break;
    // The three virtual-ID cases below do not unpack the message here.
    case ForVidMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
      ck->process();
      _processForVidMsg(ck,env);
      break;
    case FillVidMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
      ck->process();
      _processFillVidMsg(ck,env);
      break;
    case DeleteVidMsg :
      TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
      ck->process();
      _processDeleteVidMsg(ck,env);
      break;

    default:
      CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
  }
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
  // obj is only non-NULL if preProcessReceivedMessage() filled it in.
  if(obj != NULL){
    postProcessReceivedMessage(obj,sender,SN,entry);
  }
#endif

#ifdef USE_CRITICAL_PATH_HEADER_ARRAY
  criticalPath_end();
#endif
}
01284
01285
01286
01287
01288 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
01289 int *queueing, int *priobits, unsigned int **prioptr)
01290 {
01291 register envelope *env = (envelope *)converseMsg;
01292 *pfn = (CldPackFn)CkPackMessage;
01293 *len = env->getTotalsize();
01294 *queueing = env->getQueueing();
01295 *priobits = env->getPriobits();
01296 *prioptr = (unsigned int *) env->getPrioPtr();
01297 }
01298
01299 void CkPackMessage(envelope **pEnv)
01300 {
01301 register envelope *env = *pEnv;
01302 if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
01303 register void *msg = EnvToUsr(env);
01304 _TRACE_BEGIN_PACK();
01305 msg = _msgTable[env->getMsgIdx()]->pack(msg);
01306 _TRACE_END_PACK();
01307 env=UsrToEnv(msg);
01308 env->setPacked(1);
01309 *pEnv = env;
01310 }
01311 }
01312
01313 void CkUnpackMessage(envelope **pEnv)
01314 {
01315 register envelope *env = *pEnv;
01316 register int msgIdx = env->getMsgIdx();
01317 if(env->isPacked()) {
01318 register void *msg = EnvToUsr(env);
01319 _TRACE_BEGIN_UNPACK();
01320 msg = _msgTable[msgIdx]->unpack(msg);
01321 _TRACE_END_UNPACK();
01322 env=UsrToEnv(msg);
01323 env->setPacked(0);
01324 *pEnv = env;
01325 }
01326 }
01327
01328
01329
01330
01331 #if CMK_OBJECT_QUEUE_AVAILABLE
01332 static int index_objectQHandler;
01333 #endif
01334 int index_tokenHandler;
01335 int index_skipCldHandler;
01336
01337 void _skipCldHandler(void *converseMsg)
01338 {
01339 register envelope *env = (envelope *)(converseMsg);
01340 CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
01341 #if CMK_GRID_QUEUE_AVAILABLE
01342 if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
01343 CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
01344 env, env->getQueueing (), env->getPriobits (),
01345 (unsigned int *) env->getPrioPtr ());
01346 } else {
01347 CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
01348 env, env->getQueueing (), env->getPriobits (),
01349 (unsigned int *) env->getPrioPtr ());
01350 }
01351 #else
01352 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
01353 env, env->getQueueing(),env->getPriobits(),
01354 (unsigned int *)env->getPrioPtr());
01355 #endif
01356 }
01357
01358
01359
01360
/**
 * Send or enqueue a message without going through _CldEnqueue.
 *
 * - Destination is this PE and no immediate handler is running
 *   (!CmiImmIsRunning()): the message is put directly on an object queue (if
 *   CMK_OBJECT_QUEUE_AVAILABLE and the target object has one) or on the
 *   scheduler queue.
 * - Otherwise: the message is packed when the destination is a broadcast
 *   (pe < 0) or lives on another node, its current handler is saved in the
 *   X-handler field, the handler is replaced by index_skipCldHandler (or
 *   index_objectQHandler), and the message is sent or broadcast.
 *
 * In _FAULT_MLOG_ / _FAULT_CAUSAL_ builds the non-freeing send calls are used;
 * otherwise the "AndFree" variants are used.
 *
 * @param pe      destination PE, CLD_BROADCAST or CLD_BROADCAST_ALL
 * @param env     envelope of the message to send
 * @param infoFn  info-function index stored on the message with CmiSetInfo()
 */
void _skipCldEnqueue(int pe,envelope *env, int infoFn)
{
#if CMK_CHARMDEBUG
  // ConverseDeliver() refused this destination: drop the message.
  if (!ConverseDeliver(pe)) {
    CmiFree(env);
    return;
  }
#endif
  // Diagnostic only: a send-to-self while CmiNodeAlive() is false is reported
  // but processing continues.
  if(pe == CkMyPe() ){
    if(!CmiNodeAlive(CkMyPe())){
      printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
    }
  }
  if (pe == CkMyPe() && !CmiImmIsRunning()) {
#if CMK_OBJECT_QUEUE_AVAILABLE
    Chare *obj = CkFindObjectPtr(env);
    if (obj && obj->CkGetObjQueue().queue()) {
      _enqObjQueue(obj, env);
    }
    else
#endif
    // Without object queues this call is unconditional; with them it is the
    // else-branch of the check above.
    CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
                      env, env->getQueueing(),env->getPriobits(),
                      (unsigned int *)env->getPrioPtr());
#if CMK_PERSISTENT_COMM
    CmiPersistentOneSend();
#endif
  } else {
    // Pack for broadcasts (pe < 0) and for destinations on another node.
    if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
      CkPackMessage(&env);
    int len=env->getTotalsize();
    // Remember the real handler; _skipCldHandler restores it on arrival.
    CmiSetXHandler(env,CmiGetHandler(env));
#if CMK_OBJECT_QUEUE_AVAILABLE
    CmiSetHandler(env,index_objectQHandler);
#else
    CmiSetHandler(env,index_skipCldHandler);
#endif
    CmiSetInfo(env,infoFn);
    if (pe==CLD_BROADCAST) {
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
      CmiSyncBroadcast(len, (char *)env);
#else
      CmiSyncBroadcastAndFree(len, (char *)env);
#endif
    }
    else if (pe==CLD_BROADCAST_ALL) {
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
      CmiSyncBroadcastAll(len, (char *)env);
#else
      CmiSyncBroadcastAllAndFree(len, (char *)env);
#endif
    }
    else{
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
      CmiSyncSend(pe, len, (char *)env);
#else
      CmiSyncSendAndFree(pe, len, (char *)env);
#endif
    }
  }
}
01426
01427 #if CMK_BIGSIM_CHARM
01428 # define _skipCldEnqueue _CldEnqueue
01429 #endif
01430
01431
01432 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
01433 {
01434 #if CMK_CHARMDEBUG
01435 if (!ConverseDeliver(-1)) {
01436 CmiFree(env);
01437 return;
01438 }
01439 #endif
01440 CkPackMessage(&env);
01441 int len=env->getTotalsize();
01442 CmiSyncListSendAndFree(npes, pes, len, (char *)env);
01443 }
01444
01445 static void _noCldEnqueue(int pe, envelope *env)
01446 {
01447
01448
01449
01450
01451
01452 #if CMK_CHARMDEBUG
01453 if (!ConverseDeliver(pe)) {
01454 CmiFree(env);
01455 return;
01456 }
01457 #endif
01458 CkPackMessage(&env);
01459 int len=env->getTotalsize();
01460 if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
01461 else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
01462 else CmiSyncSendAndFree(pe, len, (char *)env);
01463 }
01464
01465
01466
01467 void _noCldNodeEnqueue(int node, envelope *env)
01468 {
01469
01470
01471
01472
01473
01474 #if CMK_CHARMDEBUG
01475 if (!ConverseDeliver(node)) {
01476 CmiFree(env);
01477 return;
01478 }
01479 #endif
01480 CkPackMessage(&env);
01481 int len=env->getTotalsize();
01482 if (node==CLD_BROADCAST) {
01483 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01484 CmiSyncNodeBroadcast(len, (char *)env);
01485 #else
01486 CmiSyncNodeBroadcastAndFree(len, (char *)env);
01487 #endif
01488 }
01489 else if (node==CLD_BROADCAST_ALL) {
01490 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01491 CmiSyncNodeBroadcastAll(len, (char *)env);
01492 #else
01493 CmiSyncNodeBroadcastAllAndFree(len, (char *)env);
01494 #endif
01495
01496 }
01497 else {
01498 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01499 CmiSyncNodeSend(node, len, (char *)env);
01500 #else
01501 CmiSyncNodeSendAndFree(node, len, (char *)env);
01502 #endif
01503 }
01504 }
01505
01506 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
01507 {
01508 register envelope *env = UsrToEnv(msg);
01509 _CHECK_USED(env);
01510 _SET_USED(env, 1);
01511 #if CMK_REPLAYSYSTEM
01512 setEventID(env);
01513 #endif
01514 env->setMsgtype(ForChareMsg);
01515 env->setEpIdx(eIdx);
01516 env->setSrcPe(CkMyPe());
01517 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01518 criticalPath_send(env);
01519 automaticallySetMessagePriority(env);
01520 #endif
01521 #if CMK_CHARMDEBUG
01522 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
01523 #endif
01524 #if CMK_OBJECT_QUEUE_AVAILABLE
01525 CmiSetHandler(env, index_objectQHandler);
01526 #else
01527 CmiSetHandler(env, _charmHandlerIdx);
01528 #endif
01529 if (pCid->onPE < 0) {
01530 register int pe = -(pCid->onPE+1);
01531 if(pe==CkMyPe()) {
01532 #ifndef CMK_CHARE_USE_PTR
01533 VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
01534 #else
01535 VidBlock *vblk = (VidBlock *) pCid->objPtr;
01536 #endif
01537 void *objPtr;
01538 if (NULL!=(objPtr=vblk->getLocalChare()))
01539 {
01540 env->setObjPtr(objPtr);
01541 return pe;
01542 }
01543 else {
01544 vblk->send(env);
01545 return -1;
01546 }
01547 } else {
01548 env->setMsgtype(ForVidMsg);
01549 env->setVidPtr(pCid->objPtr);
01550 return pe;
01551 }
01552 }
01553 else {
01554 env->setObjPtr(pCid->objPtr);
01555 return pCid->onPE;
01556 }
01557 }
01558
01559 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
01560 {
01561 int destPE = _prepareMsg(eIdx, msg, pCid);
01562 if (destPE != -1) {
01563 register envelope *env = UsrToEnv(msg);
01564 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01565 criticalPath_send(env);
01566 automaticallySetMessagePriority(env);
01567 #endif
01568 CmiBecomeImmediate(env);
01569 }
01570 return destPE;
01571 }
01572
01573 extern "C"
01574 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
01575 {
01576 if (opts & CK_MSG_INLINE) {
01577 CkSendMsgInline(entryIdx, msg, pCid, opts);
01578 return;
01579 }
01580 #if CMK_ERROR_CHECKING
01581 if (opts & CK_MSG_IMMEDIATE) {
01582 CmiAbort("Immediate message is not allowed in Chare!");
01583 }
01584 #endif
01585 register envelope *env = UsrToEnv(msg);
01586 int destPE=_prepareMsg(entryIdx,msg,pCid);
01587
01588
01589
01590
01591 #if defined(_FAULT_CAUSAL_)
01592 sendChareMsg(env,destPE,_infoIdx,pCid);
01593 #else
01594 _TRACE_CREATION_1(env);
01595 if (destPE!=-1) {
01596 CpvAccess(_qd)->create();
01597 if (opts & CK_MSG_SKIP_OR_IMM)
01598 _noCldEnqueue(destPE, env);
01599 else
01600 _CldEnqueue(destPE, env, _infoIdx);
01601 }
01602 _TRACE_CREATION_DONE(1);
01603 #endif
01604 }
01605
01606 extern "C"
01607 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
01608 {
01609 if (pCid->onPE==CkMyPe())
01610 {
01611 if(!CmiNodeAlive(CkMyPe())){
01612 return;
01613 }
01614 #if CMK_CHARMDEBUG
01615
01616 _prepareMsg(entryIndex,msg,pCid);
01617 #endif
01618
01619 register envelope *env = UsrToEnv(msg);
01620 if (env->isPacked()) CkUnpackMessage(&env);
01621 _STATS_RECORD_PROCESS_MSG_1();
01622 _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
01623 }
01624 else {
01625
01626 CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
01627 }
01628 }
01629
01630 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
01631 {
01632 register envelope *env = UsrToEnv(msg);
01633 #if CMK_ERROR_CHECKING
01634 CkNodeGroupID nodeRedMgr;
01635 #endif
01636 _CHECK_USED(env);
01637 _SET_USED(env, 1);
01638 #if CMK_REPLAYSYSTEM
01639 setEventID(env);
01640 #endif
01641 env->setMsgtype(type);
01642 env->setEpIdx(eIdx);
01643 env->setGroupNum(gID);
01644 env->setSrcPe(CkMyPe());
01645 #if CMK_ERROR_CHECKING
01646 nodeRedMgr.setZero();
01647 env->setRednMgr(nodeRedMgr);
01648 #endif
01649 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01650 criticalPath_send(env);
01651 automaticallySetMessagePriority(env);
01652 #endif
01653 #if CMK_CHARMDEBUG
01654 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
01655 #endif
01656 CmiSetHandler(env, _charmHandlerIdx);
01657 return env;
01658 }
01659
01660 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
01661 {
01662 envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
01663 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01664 criticalPath_send(env);
01665 automaticallySetMessagePriority(env);
01666 #endif
01667 CmiBecomeImmediate(env);
01668 return env;
01669 }
01670
01671 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
01672 int pe=CLD_BROADCAST_ALL, int opts = 0)
01673 {
01674 int numPes;
01675 register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01676 #if defined(_FAULT_MLOG_)
01677 sendTicketGroupRequest(env,pe,_infoIdx);
01678 #elif defined(_FAULT_CAUSAL_)
01679 sendGroupMsg(env,pe,_infoIdx);
01680 #else
01681 _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
01682 _TRACE_CREATION_N(env, numPes);
01683 if (opts & CK_MSG_SKIP_OR_IMM)
01684 _noCldEnqueue(pe, env);
01685 else
01686 _skipCldEnqueue(pe, env, _infoIdx);
01687 _TRACE_CREATION_DONE(1);
01688 #endif
01689 }
01690
01691 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
01692 int npes, int *pes)
01693 {
01694 register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01695 _TRACE_CREATION_MULTICAST(env, npes, pes);
01696 _CldEnqueueMulti(npes, pes, env, _infoIdx);
01697 _TRACE_CREATION_DONE(1);
01698 }
01699
01700 extern "C"
01701 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
01702 {
01703 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
01704 if (destPE==CkMyPe())
01705 {
01706 CkSendMsgBranchInline(eIdx, msg, destPE, gID);
01707 return;
01708 }
01709
01710 register envelope *env = UsrToEnv(msg);
01711 int numPes;
01712 _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
01713 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
01714 _TRACE_CREATION_N(env, numPes);
01715 _noCldEnqueue(destPE, env);
01716 _STATS_RECORD_SEND_BRANCH_1();
01717 CkpvAccess(_coreState)->create();
01718 _TRACE_CREATION_DONE(1);
01719 #else
01720
01721 CkSendMsgBranchInline(eIdx, msg, destPE, gID);
01722 #endif
01723 }
01724
01725 extern "C"
01726 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
01727 {
01728 if (destPE==CkMyPe())
01729 {
01730 if(!CmiNodeAlive(CkMyPe())){
01731 return;
01732 }
01733 IrrGroup *obj=(IrrGroup *)_localBranch(gID);
01734 if (obj!=NULL)
01735 {
01736 #if CMK_ERROR_CHECKING
01737 envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01738 #else
01739 envelope *env=UsrToEnv(msg);
01740 #endif
01741 _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
01742 return;
01743 }
01744 }
01745
01746 CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
01747 }
01748
01749 extern "C"
01750 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
01751 {
01752 if (opts & CK_MSG_INLINE) {
01753 CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
01754 return;
01755 }
01756 if (opts & CK_MSG_IMMEDIATE) {
01757 CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
01758 return;
01759 }
01760 _sendMsgBranch(eIdx, msg, gID, pe, opts);
01761 _STATS_RECORD_SEND_BRANCH_1();
01762 CkpvAccess(_coreState)->create();
01763 }
01764
01765 extern "C"
01766 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
01767 {
01768 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
01769 register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
01770 _TRACE_CREATION_MULTICAST(env, npes, pes);
01771 _noCldEnqueueMulti(npes, pes, env);
01772 _TRACE_CREATION_DONE(1);
01773 #else
01774 _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
01775 CpvAccess(_qd)->create(-npes);
01776 #endif
01777 _STATS_RECORD_SEND_BRANCH_N(npes);
01778 CpvAccess(_qd)->create(npes);
01779 }
01780
01781 extern "C"
01782 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
01783 {
01784 if (opts & CK_MSG_IMMEDIATE) {
01785 CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
01786 return;
01787 }
01788
01789 _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
01790 _STATS_RECORD_SEND_BRANCH_N(npes);
01791 CpvAccess(_qd)->create(npes);
01792 }
01793
01794 extern "C"
01795 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
01796 {
01797 int npes;
01798 int *pes;
01799 if (opts & CK_MSG_IMMEDIATE) {
01800 CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
01801 return;
01802 }
01803
01804 register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
01805 CmiLookupGroup(grp, &npes, &pes);
01806 _TRACE_CREATION_MULTICAST(env, npes, pes);
01807 _CldEnqueueGroup(grp, env, _infoIdx);
01808 _TRACE_CREATION_DONE(1);
01809 _STATS_RECORD_SEND_BRANCH_N(npes);
01810 CpvAccess(_qd)->create(npes);
01811 }
01812
01813 extern "C"
01814 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
01815 {
01816 _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
01817 _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
01818 CpvAccess(_qd)->create(CkNumPes());
01819 }
01820
01821 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
01822 int node=CLD_BROADCAST_ALL, int opts=0)
01823 {
01824 int numPes;
01825 register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01826 #if defined(_FAULT_MLOG_)
01827 sendTicketNodeGroupRequest(env,node,_infoIdx);
01828 #elif defined(_FAULT_CAUSAL_)
01829 sendNodeGroupMsg(env,node,_infoIdx);
01830 #else
01831 numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
01832 _TRACE_CREATION_N(env, numPes);
01833 if (opts & CK_MSG_SKIP_OR_IMM) {
01834 _noCldNodeEnqueue(node, env);
01835 if (opts & CK_MSG_IMMEDIATE) {
01836 CkpvAccess(_coreState)->create(-numPes);
01837 }
01838 }
01839 else
01840 _CldNodeEnqueue(node, env, _infoIdx);
01841 _TRACE_CREATION_DONE(1);
01842 #endif
01843 }
01844
01845 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
01846 int npes, int *nodes)
01847 {
01848 register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01849 _TRACE_CREATION_N(env, npes);
01850 for (int i=0; i<npes; i++) {
01851 _CldNodeEnqueue(nodes[i], env, _infoIdx);
01852 }
01853 _TRACE_CREATION_DONE(1);
01854 }
01855
01856 extern "C"
01857 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
01858 {
01859 #if CMK_IMMEDIATE_MSG
01860 if (node==CkMyNode())
01861 {
01862 CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
01863 return;
01864 }
01865
01866 register envelope *env = UsrToEnv(msg);
01867 int numPes;
01868 _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
01869 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01870 _TRACE_CREATION_N(env, numPes);
01871 _noCldNodeEnqueue(node, env);
01872 _STATS_RECORD_SEND_BRANCH_1();
01873
01874
01875 _TRACE_CREATION_DONE(1);
01876 #else
01877
01878 CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
01879 #endif
01880 }
01881
01882 extern "C"
01883 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
01884 {
01885 if (node==CkMyNode())
01886 {
01887 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
01888 void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
01889 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
01890 if (obj!=NULL)
01891 {
01892 #if CMK_ERROR_CHECKING
01893 envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01894 #else
01895 envelope *env=UsrToEnv(msg);
01896 #endif
01897 _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
01898 return;
01899 }
01900 }
01901
01902 CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
01903 }
01904
01905 extern "C"
01906 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
01907 {
01908 if (opts & CK_MSG_INLINE) {
01909 CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
01910 return;
01911 }
01912 if (opts & CK_MSG_IMMEDIATE) {
01913 CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
01914 return;
01915 }
01916 _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
01917 _STATS_RECORD_SEND_NODE_BRANCH_1();
01918 CkpvAccess(_coreState)->create();
01919 }
01920
01921 extern "C"
01922 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
01923 {
01924 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
01925 register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
01926 _noCldEnqueueMulti(npes, nodes, env);
01927 #else
01928 _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
01929 CpvAccess(_qd)->create(-npes);
01930 #endif
01931 _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
01932 CpvAccess(_qd)->create(npes);
01933 }
01934
01935 extern "C"
01936 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
01937 {
01938 if (opts & CK_MSG_IMMEDIATE) {
01939 CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
01940 return;
01941 }
01942
01943 _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
01944 _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
01945 CpvAccess(_qd)->create(npes);
01946 }
01947
01948 extern "C"
01949 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
01950 {
01951 _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
01952 _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
01953 CpvAccess(_qd)->create(CkNumNodes());
01954 }
01955
01956
01957 extern "C"
01958 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
01959 { return _prepareMsg(eIdx,msg,pCid); }
01960 extern "C"
01961 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
01962 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
01963 extern "C"
01964 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
01965 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
01966
01967 void _ckModuleInit(void) {
01968 index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
01969 #if CMK_OBJECT_QUEUE_AVAILABLE
01970 index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
01971 #endif
01972 index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
01973 CkpvInitialize(TokenPool*, _tokenPool);
01974 CkpvAccess(_tokenPool) = new TokenPool;
01975 }
01976
01977
01978
01979
01980 extern void CkArrayManagerInsert(int onPe,void *msg);
01981
01982
01983 static void _prepareOutgoingArrayMsg(envelope *env,int type)
01984 {
01985 _CHECK_USED(env);
01986 _SET_USED(env, 1);
01987 env->setMsgtype(type);
01988 #if CMK_CHARMDEBUG
01989 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
01990 #endif
01991 CmiSetHandler(env, _charmHandlerIdx);
01992 CpvAccess(_qd)->create();
01993 }
01994
01995 extern "C"
01996 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
01997 register envelope *env = UsrToEnv(msg);
01998 env->getsetArrayMgr()=aID;
01999 _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
02000 _CldEnqueue(pe, env, _infoIdx);
02001 }
02002
02003 extern "C"
02004 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
02005 register envelope *env = UsrToEnv(msg);
02006 _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
02007 #if defined(_FAULT_MLOG_)
02008 sendTicketArrayRequest(env,pe,_infoIdx);
02009 #elif defined(_FAULT_CAUSAL_)
02010 sendArrayMsg(env,pe,_infoIdx);
02011 #else
02012 if (opts & CK_MSG_IMMEDIATE)
02013 CmiBecomeImmediate(env);
02014 if (opts & CK_MSG_SKIP_OR_IMM)
02015 _noCldEnqueue(pe, env);
02016 else
02017 _skipCldEnqueue(pe, env, _infoIdx);
02018 #endif
02019 }
02020
02021 class ElementDestroyer : public CkLocIterator {
02022 private:
02023 CkLocMgr *locMgr;
02024 public:
02025 ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
02026 void addLocation(CkLocation &loc) {
02027 loc.destroyAll();
02028 }
02029 };
02030
02031 void CkDeleteChares() {
02032 int i;
02033 int numGroups = CkpvAccess(_groupIDTable)->size();
02034
02035
02036 #ifndef CMK_CHARE_USE_PTR
02037 for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
02038 Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
02039 delete obj;
02040 CkpvAccess(chare_objs)[i] = NULL;
02041 }
02042 for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
02043 VidBlock *obj = CkpvAccess(vidblocks)[i];
02044 delete obj;
02045 CkpvAccess(vidblocks)[i] = NULL;
02046 }
02047 #endif
02048
02049
02050 for(i=0;i<numGroups;i++) {
02051 IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
02052 if(obj && obj->isLocMgr()) {
02053 CkLocMgr *mgr = (CkLocMgr*)obj;
02054 ElementDestroyer destroyer(mgr);
02055 mgr->iterate(destroyer);
02056 }
02057 }
02058
02059
02060 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
02061 for(i=0;i<numGroups;i++) {
02062 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
02063 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
02064 if (obj) delete obj;
02065 }
02066 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
02067
02068
02069 if (CkMyRank() == 0) {
02070 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
02071 for(i=0;i<numNodeGroups;i++) {
02072 CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
02073 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
02074 if (obj) delete obj;
02075 }
02076 }
02077 }
02078
02079
02080
02081 #include "crc32.h"
02082
02083 CkpvDeclare(int, envelopeEventID);
02084 int _recplay_crc = 0;
02085 int _recplay_checksum = 0;
02086 int _recplay_logsize = 1024*1024;
02087
02088
02089 #define REPLAYDEBUG(args)
02090
02091 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
02092
02093 #include "trace-common.h"
02094
02095 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
02096
02097 char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
02098 strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
02099 sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
02100 FILE *f=fopen(fName,permissions);
02101 REPLAYDEBUG("openReplayfile "<<fName);
02102 if (f==NULL) {
02103 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
02104 CkMyPe(),fName,permissions);
02105 CkAbort("openReplayFile> Could not open replay file");
02106 }
02107 return f;
02108 }
02109
02110 #include "BaseLB.h"
02111
02112 class CkMessageRecorder : public CkMessageWatcher {
02113 char *buffer;
02114 unsigned int curpos;
02115 bool firstOpen;
02116 public:
02117 CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
02118 ~CkMessageRecorder() {
02119 flushLog(0);
02120 fprintf(f,"-1 -1 -1 ");
02121 fclose(f);
02122 delete[] buffer;
02123 #if 0
02124 FILE *stsfp = fopen("sts", "w");
02125 void traceWriteSTS(FILE *stsfp,int nUserEvents);
02126 traceWriteSTS(stsfp, 0);
02127 fclose(stsfp);
02128 #endif
02129 CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
02130 }
02131
02132 private:
02133 void flushLog(int verbose=1) {
02134 if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
02135 fprintf(f, "%s", buffer);
02136 curpos=0;
02137 }
02138 virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
02139 if ((*envptr)->getEvent()) {
02140 bool wasPacked = (*envptr)->isPacked();
02141 if (!wasPacked) CkPackMessage(envptr);
02142 envelope *env = *envptr;
02143 unsigned int crc1=0, crc2=0;
02144 if (_recplay_crc) {
02145
02146 crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02147 crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02148 } else if (_recplay_checksum) {
02149 crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02150 crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02151 }
02152 curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
02153 if (curpos > _recplay_logsize-128) flushLog();
02154 if (!wasPacked) CkUnpackMessage(envptr);
02155 }
02156 return CmiTrue;
02157 }
02158 virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
02159 curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
02160 if (curpos > _recplay_logsize-128) flushLog();
02161 return CmiTrue;
02162 }
02163
02164 virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
02165 FILE *f;
02166 if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
02167 else f = openReplayFile("ckreplay_",".lb","a");
02168 firstOpen = false;
02169 if (f != NULL) {
02170 PUP::toDisk p(f);
02171 p | (*msg)->n_moves;
02172 (*msg)->pup(p);
02173 fclose(f);
02174 }
02175 return CmiTrue;
02176 }
02177 };
02178
02179 class CkMessageDetailRecorder : public CkMessageWatcher {
02180 public:
02181 CkMessageDetailRecorder(FILE *f_) {
02182 f=f_;
02183
02184
02185
02186 CmiUInt2 little = sizeof(void*);
02187 fwrite(&little, 2, 1, f);
02188 }
02189 ~CkMessageDetailRecorder() {fclose(f);}
02190 private:
02191 virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
02192 bool wasPacked = (*envptr)->isPacked();
02193 if (!wasPacked) CkPackMessage(envptr);
02194 envelope *env = *envptr;
02195 CmiUInt4 size = env->getTotalsize();
02196 fwrite(&size, 4, 1, f);
02197 fwrite(env, env->getTotalsize(), 1, f);
02198 if (!wasPacked) CkUnpackMessage(envptr);
02199 return CmiTrue;
02200 }
02201 };
02202
02203 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
02204 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
02205
02206 #if CMK_BIGSIM_CHARM
02207 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
02208 int pb,unsigned int *prio);
02209 #endif
02210
02211 class CkMessageReplay : public CkMessageWatcher {
02212 int counter;
02213 int nextPE, nextSize, nextEvent, nexttype;
02214 int nextEP;
02215 unsigned int crc1, crc2;
02216 FILE *lbFile;
02218 void getNext(void) {
02219 if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
02220 if (nextSize > 0) {
02221
02222 if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
02223 CkAbort("CkMessageReplay> Syntax error reading replay file");
02224 }
02225 REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
02226 } else if (nextSize == -2) {
02227
02228
02229 REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
02230 } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
02231 CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
02232 CkAbort("CkMessageReplay> Unrecognized input");
02233 }
02234
02235
02236
02237
02238
02239
02240 counter++;
02241 }
02243 CmiBool isNext(envelope *env) {
02244 if (nextPE!=env->getSrcPe()) return CmiFalse;
02245 if (nextEvent!=env->getEvent()) return CmiFalse;
02246 if (nextSize<0) return CmiFalse;
02247 #if 1
02248 if (nextEP != env->getEpIdx()) {
02249 CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
02250 return CmiFalse;
02251 }
02252 #endif
02253 #if ! CMK_BIGSIM_CHARM
02254 if (nextSize!=env->getTotalsize())
02255 {
02256 CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
02257 return CmiFalse;
02258 }
02259 if (_recplay_crc || _recplay_checksum) {
02260 bool wasPacked = env->isPacked();
02261 if (!wasPacked) CkPackMessage(&env);
02262 if (_recplay_crc) {
02263
02264 unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02265 unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02266 if (crcnew1 != crc1) {
02267 CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
02268 }
02269 if (crcnew2 != crc2) {
02270 CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
02271 }
02272 } else if (_recplay_checksum) {
02273 unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
02274 unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
02275 if (crcnew1 != crc1) {
02276 CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
02277 }
02278 if (crcnew2 != crc2) {
02279 CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
02280 }
02281 }
02282 if (!wasPacked) CkUnpackMessage(&env);
02283 }
02284 #endif
02285 return CmiTrue;
02286 }
02287 CmiBool isNext(CthThreadToken *token) {
02288 if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
02289 return CmiFalse;
02290 }
02291
02293 CkQ<envelope *> delayedMessages;
02295 CkQ<CthThreadToken *> delayedTokens;
02296
02298 void flush(void) {
02299 if (nextSize>0) {
02300 int len=delayedMessages.length();
02301 for (int i=0;i<len;i++) {
02302 envelope *env=delayedMessages.deq();
02303 if (isNext(env)) {
02304 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
02305 CsdEnqueueLifo((void*)env);
02306 return;
02307 }
02308 else
02309
02310 {
02311 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
02312 delayedMessages.enq(env);
02313 }
02314 }
02315 } else if (nextSize==-2) {
02316 int len=delayedTokens.length();
02317 for (int i=0;i<len;++i) {
02318 CthThreadToken *token=delayedTokens.deq();
02319 if (isNext(token)) {
02320 REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
02321 #if ! CMK_BIGSIM_CHARM
02322 CsdEnqueueLifo((void*)token);
02323 #else
02324 CthEnqueueBigSimThread(token,0,0,NULL);
02325 #endif
02326 return;
02327 } else {
02328 REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
02329 delayedTokens.enq(token);
02330 }
02331 }
02332 }
02333 }
02334
02335 public:
02336 CkMessageReplay(FILE *f_) : lbFile(NULL) {
02337 counter=0;
02338 f=f_;
02339 getNext();
02340 REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
02341 if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
02342 }
02343 ~CkMessageReplay() {fclose(f);}
02344
02345 private:
02346 virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
02347 bool wasPacked = (*envptr)->isPacked();
02348 if (!wasPacked) CkPackMessage(envptr);
02349 envelope *env = *envptr;
02350
02351 REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
02352 if (env->getEvent() == 0) return CmiTrue;
02353 if (isNext(env)) {
02354 REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
02355 getNext();
02356 flush();
02357 if (!wasPacked) CkUnpackMessage(envptr);
02358 return CmiTrue;
02359 }
02360 #if CMK_SMP
02361 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
02362
02363
02364 int nextpe = CkMyPe()+1;
02365 if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
02366 nextpe = CkNodeFirst(CkMyNode());
02367 CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
02368 return CmiFalse;
02369 }
02370 #endif
02371 else {
02372 REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
02373 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
02374 delayedMessages.enq(env);
02375 flush();
02376 return CmiFalse;
02377 }
02378 }
02379 virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
02380 REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
02381 if (isNext(token)) {
02382 REPLAYDEBUG("Executing token: "<<token->serialNo)
02383 getNext();
02384 flush();
02385 return CmiTrue;
02386 } else {
02387 REPLAYDEBUG("Queueing token: "<<token->serialNo
02388 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
02389 delayedTokens.enq(token);
02390 return CmiFalse;
02391 }
02392 }
02393
02394 virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
02395 if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
02396 if (lbFile != NULL) {
02397 int num_moves;
02398 PUP::fromDisk p(lbFile);
02399 p | num_moves;
02400 if (num_moves != (*msg)->n_moves) {
02401 delete *msg;
02402 *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
02403 }
02404 (*msg)->pup(p);
02405 }
02406 return CmiTrue;
02407 }
02408 };
02409
02410 class CkMessageDetailReplay : public CkMessageWatcher {
02411 void *getNext() {
02412 CmiUInt4 size; size_t nread;
02413 if ((nread=fread(&size, 4, 1, f)) < 1) {
02414 if (feof(f)) return NULL;
02415 CkPrintf("Broken record file (metadata) got %d\n",nread);
02416 CkAbort("");
02417 }
02418 void *env = CmiAlloc(size);
02419 long tell = ftell(f);
02420 if ((nread=fread(env, size, 1, f)) < 1) {
02421 CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
02422 CkAbort("");
02423 }
02424
02425 return env;
02426 }
02427 public:
02428 double starttime;
02429 CkMessageDetailReplay(FILE *f_) {
02430 f=f_;
02431 starttime=CkWallTimer();
02432
02433 CmiUInt2 little;
02434 fread(&little, 2, 1, f);
02435 if (little != sizeof(void*)) {
02436 CkAbort("Replaying on a different architecture from which recording was done!");
02437 }
02438
02439 CsdEnqueue(getNext());
02440
02441 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
02442 }
02443 virtual CmiBool process(envelope **env,CkCoreState *ck) {
02444 void *msg = getNext();
02445 if (msg != NULL) CsdEnqueue(msg);
02446 return CmiTrue;
02447 }
02448 };
02449
02450 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
02451 #if ! CMK_BIGSIM_CHARM
02452 CkPrintf("[%d] Quiescence detected\n",CkMyPe());
02453 #endif
02454 CkMessageReplay *replay = (CkMessageReplay*)rep;
02455
02456 }
02457
02458 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
02459 CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
02460 CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
02461 ConverseExit();
02462 }
02463
02464 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
02465 CkCoreState *ck = CkpvAccess(_coreState);
02466 if (ck->watcher!=NULL) {
02467 return ck->watcher->processThread(token,ck);
02468 }
02469 return CmiTrue;
02470 }
02471
02472 CpvCExtern(int, CthResumeNormalThreadIdx);
02473 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
02474 {
02475 CthThread t = token->thread;
02476
02477 if(t == NULL){
02478 free(token);
02479 return;
02480 }
02481 #if CMK_TRACE_ENABLED
02482 #if ! CMK_TRACE_IN_CHARM
02483 if(CpvAccess(traceOn))
02484 CthTraceResume(t);
02485
02486
02487 #endif
02488 #endif
02489
02490
02491 if (CpdExecuteThreadResume(token)) {
02492 CthResume(t);
02493 }
02494 }
02495
02496 void CpdHandleLBMessage(LBMigrateMsg **msg) {
02497 CkCoreState *ck = CkpvAccess(_coreState);
02498 if (ck->watcher!=NULL) {
02499 ck->watcher->processLBMessage(msg, ck);
02500 }
02501 }
02502
02503 #if CMK_BIGSIM_CHARM
02504 CpvExtern(int , CthResumeBigSimThreadIdx);
02505 #endif
02506
02507 #include "ckliststring.h"
02508 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
02509 CmiArgGroup("Charm++","Record/Replay");
02510 CmiBool forceReplay = CmiFalse;
02511 char *procs = NULL;
02512 _replaySystem = 0;
02513 if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
02514 _recplay_crc = 1;
02515 }
02516 if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
02517 _recplay_checksum = 1;
02518 }
02519 CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
02520 REPLAYDEBUG("CkMessageWatcherInit ");
02521 if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
02522 CkListString list(procs);
02523 if (list.includes(CkMyPe())) {
02524 CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
02525 CpdSetInitializeMemory(1);
02526 ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
02527 }
02528 }
02529 if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
02530 if (CkMyPe() == 0) {
02531 CmiPrintf("Charm++> record mode.\n");
02532 if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
02533 CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
02534 _recplay_crc = _recplay_checksum = 0;
02535 }
02536 }
02537 CpdSetInitializeMemory(1);
02538 CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
02539 ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
02540 }
02541 if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
02542 forceReplay = CmiTrue;
02543 CpdSetInitializeMemory(1);
02544
02545 #if CMK_SHARED_VARS_UNAVAILABLE
02546 _Cmi_mype = atoi(procs);
02547 while (procs[0]!='/') procs++;
02548 procs++;
02549 _Cmi_numpes = atoi(procs);
02550 #else
02551 CkAbort("+replay-detail available only for non-SMP build");
02552 #endif
02553 _replaySystem = 1;
02554 ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
02555 }
02556 if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
02557 if (CkMyPe() == 0) {
02558 CmiPrintf("Charm++> replay mode.\n");
02559 if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
02560 CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
02561 _recplay_crc = _recplay_checksum = 0;
02562 }
02563 }
02564 CpdSetInitializeMemory(1);
02565 #if ! CMK_BIGSIM_CHARM
02566 CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
02567 #else
02568 CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
02569 #endif
02570 ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
02571 }
02572 if (_recplay_crc && _recplay_checksum) {
02573 CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
02574 }
02575 }
02576
02577 extern "C"
02578 int CkMessageToEpIdx(void *msg) {
02579 envelope *env=UsrToEnv(msg);
02580 int ep=env->getEpIdx();
02581 if (ep==CkIndex_CkArray::recvBroadcast(0))
02582 return env->getsetArrayBcastEp();
02583 else
02584 return ep;
02585 }
02586
02587 extern "C"
02588 int getCharmEnvelopeSize() {
02589 return sizeof(envelope);
02590 }
02591
02592 extern "C"
02593 int isCharmEnvelope(void *msg) {
02594
02595 envelope *e = (envelope *)msg;
02596 if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
02597 if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
02598 if (e->getTotalsize() < sizeof(envelope)) return 0;
02599 if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
02600 #if CMK_SMP
02601 if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
02602 #else
02603 if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
02604 #endif
02605 if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
02606 return 1;
02607 }
02608
02609
02610 #include "CkMarshall.def.h"
02611