00001
00052 #include "charm++.h"
00053 #include "register.h"
00054 #include "ck.h"
00055 #include "pathHistory.h"
00056
00057 #if CMK_LBDB_ON
00058 #include "LBDatabase.h"
00059 #endif // CMK_LBDB_ON
00060
00061 CpvDeclare(int ,serializer);
00062
00063 bool _isAnytimeMigration;
00064 bool _isStaticInsertion;
00065 bool _isNotifyChildInRed;
00066
// Set to 1 to compile the array-manager debug traces into this file.
#define ARRAY_DEBUG_OUTPUT 0

#if ARRAY_DEBUG_OUTPUT
// Each DEBx((fmt, args...)) hands its parenthesised argument list to CkPrintf.
#  define DEB(x)  CkPrintf x  // General debug messages
#  define DEBI(x) CkPrintf x  // Index debug messages
#  define DEBC(x) CkPrintf x  // Construction debug messages
#  define DEBS(x) CkPrintf x  // Send/recv/broadcast debug messages
#  define DEBM(x) CkPrintf x  // Migration debug messages
#  define DEBL(x) CkPrintf x  // Load balancing debug messages
#  define DEBK(x) CkPrintf x  // Spring Cleaning debug messages
#  define DEBB(x) CkPrintf x  // Broadcast debug messages
// AA is the common message prefix; AB supplies the PE number it formats.
#  define AA "ArrayBOC on %d: "
#  define AB ,CkMyPe()
#  define DEBUG(x) x
#else
// Tracing disabled: every debug macro expands to nothing.
#  define DEB(x)
#  define DEBI(x)
#  define DEBC(x)
#  define DEBS(x)
#  define DEBM(x)
#  define DEBL(x)
#  define DEBK(x)
#  define DEBB(x)
#  define str(x)
#  define DEBUG(x)
#endif
00093
00095 class CkArrayBroadcaster : public CkArrayListener {
00096 inline int &getData(ArrayElement *el) {return *ckGetData(el);}
00097 public:
00098 CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
00099 CkArrayBroadcaster(CkMigrateMessage *m);
00100 virtual void pup(PUP::er &p);
00101 virtual ~CkArrayBroadcaster();
00102 PUPable_decl(CkArrayBroadcaster);
00103
00104 virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
00105
00108 virtual CmiBool ckElementCreated(ArrayElement *elt)
00109 { return bringUpToDate(elt); }
00110
00113 virtual CmiBool ckElementArriving(ArrayElement *elt)
00114 { return bringUpToDate(elt); }
00115
00116 void incoming(CkArrayMessage *msg);
00117
00118 CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
00119
00120 void springCleaning(void);
00121
00122 void flushState();
00123 private:
00124 int bcastNo;
00125 int oldBcastNo;
00126
00127
00128 CkQ<CkArrayMessage *> oldBcasts;
00129 bool stableLocations;
00130 bool broadcastViaScheduler;
00131
00132 CmiBool bringUpToDate(ArrayElement *el);
00133 };
00134
00136 class CkArrayReducer : public CkArrayListener {
00137 CkGroupID mgrID;
00138 CkReductionMgr *mgr;
00139 typedef contributorInfo *I;
00140 inline contributorInfo *getData(ArrayElement *el)
00141 {return (I)ckGetData(el);}
00142 public:
00144 CkArrayReducer(CkGroupID mgrID_);
00145 CkArrayReducer(CkMigrateMessage *m);
00146 virtual void pup(PUP::er &p);
00147 virtual ~CkArrayReducer();
00148 PUPable_decl(CkArrayReducer);
00149
00150 void ckBeginInserting(void) {mgr->creatingContributors();}
00151 void ckEndInserting(void) {mgr->doneCreatingContributors();}
00152
00153 void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
00154
00155 void ckElementCreating(ArrayElement *elt)
00156 {mgr->contributorCreated(getData(elt));}
00157 void ckElementDied(ArrayElement *elt)
00158 {mgr->contributorDied(getData(elt));}
00159
00160 void ckElementLeaving(ArrayElement *elt)
00161 {mgr->contributorLeaving(getData(elt));}
00162 CmiBool ckElementArriving(ArrayElement *elt)
00163 {mgr->contributorArriving(getData(elt)); return CmiTrue; }
00164 };
00165
00166
00167
00168
00169
00170
00171
00172 void
00173 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
00174 CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
00175 ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
00176 }
00177
00178
00179 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
00180
00181 CkVerboseListener::CkVerboseListener(void)
00182 :CkArrayListener(0)
00183 {
00184 VL_PRINT<<"INIT Creating listener"<<endl;
00185 }
00186
00187 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
00188 {
00189 CkArrayListener::ckRegister(arrMgr,dataOffset_);
00190 VL_PRINT<<"INIT Registering array manager at offset "<<dataOffset_<<endl;
00191 }
00192 void CkVerboseListener::ckBeginInserting(void)
00193 {
00194 VL_PRINT<<"INIT Begin inserting elements"<<endl;
00195 }
00196 void CkVerboseListener::ckEndInserting(void)
00197 {
00198 VL_PRINT<<"INIT Done inserting elements"<<endl;
00199 }
00200
00201 void CkVerboseListener::ckElementStamp(int *eltInfo)
00202 {
00203 VL_PRINT<<"LIFE Stamping element"<<endl;
00204 }
00205 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
00206 {
00207 VL_PRINT<<"LIFE About to create element "<<idx2str(elt)<<endl;
00208 }
00209 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
00210 {
00211 VL_PRINT<<"LIFE Created element "<<idx2str(elt)<<endl;
00212 return CmiTrue;
00213 }
00214 void CkVerboseListener::ckElementDied(ArrayElement *elt)
00215 {
00216 VL_PRINT<<"LIFE Deleting element "<<idx2str(elt)<<endl;
00217 }
00218
00219 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
00220 {
00221 VL_PRINT<<"MIG Leaving: element "<<idx2str(elt)<<endl;
00222 }
00223 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
00224 {
00225 VL_PRINT<<"MIG Arriving: element "<<idx2str(elt)<<endl;
00226 return CmiTrue;
00227 }
00228
00229
00230
00231 class ArrayElement_initInfo {
00232 public:
00233 CkArray *thisArray;
00234 CkArrayID thisArrayID;
00235 CkArrayIndex numInitial;
00236 int listenerData[CK_ARRAYLISTENER_MAXLEN];
00237 CmiBool fromMigration;
00238 };
00239
00240 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
00241
00242 void ArrayElement::initBasics(void)
00243 {
00244 #if CMK_OUT_OF_CORE
00245 if (CkpvAccess(CkSaveRestorePrefetch))
00246 return;
00247 #endif
00248 #if CMK_GRID_QUEUE_AVAILABLE
00249 grid_queue_interval = 0;
00250 grid_queue_threshold = 0;
00251 msg_count = 0;
00252 msg_count_grid = 0;
00253 border_flag = 0;
00254
00255 grid_queue_interval = CmiGridQueueGetInterval ();
00256 grid_queue_threshold = CmiGridQueueGetThreshold ();
00257 #endif
00258 ArrayElement_initInfo &info=CkpvAccess(initInfo);
00259 thisArray=info.thisArray;
00260 thisArrayID=info.thisArrayID;
00261 numInitialElements=info.numInitial.getCombinedCount();
00262 if (info.listenerData) {
00263 memcpy(listenerData,info.listenerData,sizeof(listenerData));
00264 }
00265 if (!info.fromMigration) {
00266 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00267 l->ckElementCreating(this));
00268 }
00269 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00270 if(mlogData == NULL)
00271 mlogData = new ChareMlogData();
00272 mlogData->objID.type = TypeArray;
00273 mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
00274 #endif
00275 #ifdef _PIPELINED_ALLREDUCE_
00276 allredMgr = NULL;
00277 #endif
00278 }
00279
00280 ArrayElement::ArrayElement(void)
00281 {
00282 initBasics();
00283 #if CMK_MEM_CHECKPOINT
00284 init_checkpt();
00285 #endif
00286 }
00287
00288 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
00289 {
00290 initBasics();
00291 }
00292
00293
00294 void ArrayElement::ckAboutToMigrate(void) {
00295 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00296 l->ckElementLeaving(this));
00297 CkMigratable::ckAboutToMigrate();
00298 }
00299 void ArrayElement::ckJustMigrated(void) {
00300 CkMigratable::ckJustMigrated();
00301 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00302 if (!l->ckElementArriving(this)) return;);
00303 }
00304
00305 void ArrayElement::ckJustRestored(void) {
00306 CkMigratable::ckJustRestored();
00307
00308 }
00309
00310 #ifdef _PIPELINED_ALLREDUCE_
00311 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
00312 CMK_REFNUM_TYPE userFlag)
00313 {
00314 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
00315 msg->setUserFlag(userFlag);
00316 msg->setMigratableContributor(true);
00317 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00318 }
00319 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
00320 const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
00321 {
00322 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
00323 msg->setUserFlag(userFlag);
00324 msg->setCallback(cb);
00325 msg->setMigratableContributor(true);
00326 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00327 }
00328 void ArrayElement::contribute2(CkReductionMsg *msg)
00329 {
00330 msg->setMigratableContributor(true);
00331 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00332 }
00333 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
00334 {
00335 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
00336 msg->setUserFlag(userFlag);
00337 msg->setCallback(cb);
00338 msg->setMigratableContributor(true);
00339 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00340 }
00341 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
00342 {
00343 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
00344 msg->setUserFlag(userFlag);
00345 msg->setMigratableContributor(true);
00346 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00347 }
00348
00349 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
00350 const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
00351 {
00352
00353 if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD)
00354 {
00355 if (!allredMgr) {
00356 allredMgr = new AllreduceMgr();
00357 }
00358
00359 int fragNo = dataSize/FRAG_SIZE;
00360 int size = FRAG_SIZE;
00361
00362 for (int i=0; i<fragNo; i++) {
00363
00364 CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
00365 if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
00366 size = dataSize%FRAG_SIZE;
00367 }
00368 CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
00369
00370 msg->reducer = type;
00371 msg->nFrags = fragNo;
00372 msg->fragNo = i;
00373 msg->callback = defrag_cb;
00374 msg->userFlag = userFlag;
00375 allredMgr->cb = cb;
00376 allredMgr->cb.type = CkCallback::sendArray;
00377 allredMgr->cb.d.array.idx = myIndex;
00378 contribute2(msg);
00379 }
00380 return;
00381 }
00382 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
00383 msg->setUserFlag(userFlag);
00384 msg->setCallback(cb);
00385 msg->setMigratableContributor(true);
00386 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00387 }
00388
00389
00390 #else
00391 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
00392 *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
00393 #endif
00394
00395 void ArrayElement::defrag(CkReductionMsg *msg)
00396 {
00397
00398 #ifdef _PIPELINED_ALLREDUCE_
00399 allredMgr->allreduce_recieve(msg);
00400 #endif
00401 }
00402
00404 void ArrayElement::ckDestroy(void)
00405 {
00406 if(_BgOutOfCoreFlag!=1){
00407 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00408 l->ckElementDied(this));
00409 }
00410 CkMigratable::ckDestroy();
00411 }
00412
00413
00414 ArrayElement::~ArrayElement()
00415 {
00416 #if CMK_OUT_OF_CORE
00417 if (CkpvAccess(CkSaveRestorePrefetch))
00418 return;
00419 #endif
00420
00421 thisArray=(CkArray *)0xDEADa7a1;
00422 }
00423
00424 void ArrayElement::pup(PUP::er &p)
00425 {
00426 DEBM((AA" ArrayElement::pup()\n"AB));
00427 CkMigratable::pup(p);
00428 thisArrayID.pup(p);
00429 if (p.isUnpacking())
00430 thisArray=thisArrayID.ckLocalBranch();
00431 p(listenerData,CK_ARRAYLISTENER_MAXLEN);
00432 #if CMK_MEM_CHECKPOINT
00433 p(budPEs, 2);
00434 #endif
00435 p.syncComment(PUP::sync_last_system,"ArrayElement");
00436 #if CMK_GRID_QUEUE_AVAILABLE
00437 p|grid_queue_interval;
00438 p|grid_queue_threshold;
00439 p|msg_count;
00440 p|msg_count_grid;
00441 p|border_flag;
00442 if (p.isUnpacking ()) {
00443 msg_count = 0;
00444 msg_count_grid = 0;
00445 border_flag = 0;
00446 }
00447 #endif
00448 }
00449
00450 char *ArrayElement::ckDebugChareName(void) {
00451 char buf[200];
00452 const char *className=_chareTable[ckGetChareType()]->name;
00453 const int *d=thisIndexMax.data();
00454 const short int *s=(const short int*)d;
00455 switch (thisIndexMax.dimension) {
00456 case 0: sprintf(buf,"%s",className); break;
00457 case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
00458 case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
00459 case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
00460 case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
00461 case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
00462 case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
00463 default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
00464 };
00465 return strdup(buf);
00466 }
00467
00468 int ArrayElement::ckDebugChareID(char *str, int limit) {
00469 if (limit<21) return -1;
00470 str[0] = 2;
00471 *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
00472 *((CkArrayIndex*)&str[5]) = thisIndexMax;
00473 return 21;
00474 }
00475
00477 void ArrayElement::CkAbort(const char *str) const
00478 {
00479 CkError("[%d] Array element at index %s aborting:\n",
00480 CkMyPe(), idx2str(thisIndexMax));
00481 CkMigratable::CkAbort(str);
00482 }
00483
00484 void ArrayElement::recvBroadcast(CkMessage *m){
00485 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00486 CkArrayMessage *bcast = (CkArrayMessage *)m;
00487 envelope *env = UsrToEnv(m);
00488 int epIdx= env->piggyBcastIdx;
00489 ckInvokeEntry(epIdx,bcast,CmiTrue);
00490 #endif
00491 }
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501 inline void CkArray::springCleaning(void)
00502 {
00503 DEBK((AA"Starting spring cleaning\n"AB));
00504 broadcaster->springCleaning();
00505 }
00506
00507 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
00508 ((CkArray *)forArray)->springCleaning();
00509 }
00510
00511
00512
00513 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
00514 :CProxy(), _aid(e->ckGetArrayID())
00515 {}
00516 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
00517 :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
00518 {}
00519
00520 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
00521 {return ckLocalBranch()->getLocMgr(); }
00522
00523 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
00524
00525 CkArrayOptions::CkArrayOptions(void)
00526 :numInitial(),map(_defaultArrayMapID)
00527 {
00528 init();
00529 }
00530
00531 CkArrayOptions::CkArrayOptions(int ni1)
00532 :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
00533 {
00534 init();
00535 }
00536
00537 CkArrayOptions::CkArrayOptions(int ni1, int ni2)
00538 :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
00539 {
00540 init();
00541 }
00542
00543 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3)
00544 :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
00545 {
00546 init();
00547 }
00548
00549 void CkArrayOptions::init()
00550 {
00551 locMgr.setZero();
00552 anytimeMigration = _isAnytimeMigration;
00553 staticInsertion = _isStaticInsertion;
00554 reductionClient.type = CkCallback::invalid;
00555 disableNotifyChildInRed = !_isNotifyChildInRed;
00556 broadcastViaScheduler = false;
00557 }
00558
00559 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
00560 {
00561 staticInsertion = b;
00562 if (b && map == _defaultArrayMapID)
00563 map = _fastArrayMapID;
00564 return *this;
00565 }
00566
00568 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
00569 {
00570 CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
00571
00572
00573
00574 return setLocationManager(arr->getLocMgr()->getGroupID());
00575 }
00576 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
00577 {
00578 arrayListeners.push_back(listener);
00579 return *this;
00580 }
00581
00582 void CkArrayOptions::pup(PUP::er &p) {
00583 p|numInitial;
00584 p|map;
00585 p|locMgr;
00586 p|arrayListeners;
00587 p|reductionClient;
00588 p|anytimeMigration;
00589 p|disableNotifyChildInRed;
00590 p|staticInsertion;
00591 p|broadcastViaScheduler;
00592 }
00593
00594 CkArrayListener::CkArrayListener(int nInts_)
00595 :nInts(nInts_)
00596 {
00597 dataOffset=-1;
00598 }
00599 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
00600 nInts=-1; dataOffset=-1;
00601 }
00602 void CkArrayListener::pup(PUP::er &p) {
00603 p|nInts;
00604 p|dataOffset;
00605 }
00606
00607 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
00608 {
00609 if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
00610 dataOffset=dataOffset_;
00611 }
00612
00613 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
00614 const CkArrayOptions &opts_)
00615 {
00616 CkArrayOptions opts(opts_);
00617 CkGroupID locMgr = opts.getLocationManager();
00618 if (locMgr.isZero())
00619 {
00620 #if !CMK_LBDB_ON
00621 CkGroupID _lbdb;
00622 #endif
00623 CkEntryOptions e_opts;
00624 e_opts.setGroupDepID(opts.getMap());
00625 locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
00626 opts.setLocationManager(locMgr);
00627 }
00628
00629 m->array_ep()=ctor;
00630 CkMarshalledMessage marsh(m);
00631 CkEntryOptions e_opts;
00632 e_opts.setGroupDepID(locMgr);
00633 #if !GROUP_LEVEL_REDUCTION
00634 CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
00635 CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
00636 nodereductionProxy.setAttachedGroup(ag);
00637 #else
00638 CkNodeGroupID dummyid;
00639 CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
00640 #endif
00641 return (CkArrayID)ag;
00642 }
00643
00644 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
00645 {
00646 return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
00647 }
00648
00649 extern IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID);
00650
00651 struct CkInsertIdxMsg {
00652 char core[CmiReservedHeaderSize];
00653 CkArrayIndex idx;
00654 CkArrayMessage *m;
00655 int ctor;
00656 int onPe;
00657 CkArrayID _aid;
00658 };
00659
00660 static int ckinsertIdxHdl;
00661
00662 void ckinsertIdxFunc(void *m)
00663 {
00664 CkInsertIdxMsg *msg = (CkInsertIdxMsg *)m;
00665 CProxy_ArrayBase ca(msg->_aid);
00666 ca.ckInsertIdx(msg->m, msg->ctor, msg->onPe, msg->idx);
00667 CmiFree(msg);
00668 }
00669
00670 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
00671 const CkArrayIndex &idx)
00672 {
00673 if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
00674 m->array_ep()=ctor;
00675 CkArray *ca = ckLocalBranch();
00676 if (ca == NULL) {
00677 CkInsertIdxMsg *msg = (CkInsertIdxMsg *)CmiAlloc(sizeof(CkInsertIdxMsg));
00678 msg->idx = idx;
00679 msg->m = m;
00680 msg->ctor = ctor;
00681 msg->onPe = onPe;
00682 msg->_aid = _aid;
00683 CmiSetHandler(msg, ckinsertIdxHdl);
00684 ca = (CkArray *)lookupGroupAndBufferIfNotThere(CkpvAccess(_coreState), (envelope*)msg,_aid);
00685 if (ca == NULL) return;
00686 }
00687 ca->prepareCtorMsg(m,onPe,idx);
00688 if (ckIsDelegated()) {
00689 ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
00690 return;
00691 }
00692
00693 DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
00694 CkArrayManagerInsert(onPe,m,_aid);
00695 }
00696
00697 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
00698 {
00699 ckInsertIdx(m,ctorIndex,onPe,_idx);
00700 }
00701
00702 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
00703 {
00704 return ckLocalBranch()->lookup(_idx);
00705 }
00706
00707
00708 void CProxy_ArrayBase::pup(PUP::er &p)
00709 {
00710 CProxy::pup(p);
00711 _aid.pup(p);
00712 }
00713 void CProxyElement_ArrayBase::pup(PUP::er &p)
00714 {
00715 CProxy_ArrayBase::pup(p);
00716 p|_idx.nInts;
00717 p|_idx.dimension;
00718 p(_idx.data(),_idx.nInts);
00719 }
00720
00721 void CProxySection_ArrayBase::pup(PUP::er &p)
00722 {
00723 CProxy_ArrayBase::pup(p);
00724 p | _nsid;
00725 if (p.isUnpacking()) {
00726 if (_nsid == 1) _sid = new CkSectionID;
00727 else if (_nsid > 1) _sid = new CkSectionID[_nsid];
00728 else _sid = NULL;
00729 }
00730 for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
00731 }
00732
00733
00734 void _ckArrayInit(void)
00735 {
00736 CkpvInitialize(ArrayElement_initInfo,initInfo);
00737 CkDisableTracing(CkIndex_CkArray::insertElement(0));
00738 CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
00739
00740 CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
00741
00742 ckinsertIdxHdl = CkRegisterHandler(ckinsertIdxFunc);
00743 }
00744
00745 CkArray::CkArray(CkArrayOptions &opts,
00746 CkMarshalledMessage &initMsg,
00747 CkNodeGroupID nodereductionID)
00748 : CkReductionMgr(),
00749 locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
00750 locMgrID(opts.getLocationManager()),
00751 thisProxy(thisgroup),
00752
00753 elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
00754 stableLocations(opts.staticInsertion && !opts.anytimeMigration),
00755 numInitial(opts.getNumInitial()), isInserting(CmiTrue)
00756 {
00757 if (!stableLocations)
00758 CcdCallOnConditionKeep(CcdPERIODIC_1minute,
00759 staticSpringCleaning, (void *)this);
00760
00761
00762 if(opts.disableNotifyChildInRed)
00763 disableNotifyChildrenStart = CmiTrue;
00764
00765
00766 listenerDataOffset=0;
00767 broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
00768 addListener(broadcaster);
00769 reducer=new CkArrayReducer(thisgroup);
00770 addListener(reducer);
00771
00772
00773
00774
00775
00776 int lNo,nL=opts.getListeners();
00777 for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
00778
00779 for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
00780
00782 locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
00783
00785
00786 #if !GROUP_LEVEL_REDUCTION
00787 CProxy_CkArrayReductionMgr nodetemp(nodereductionID);
00788 nodeProxy = nodetemp;
00789
00790 #endif
00791
00792 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00793
00794 children = (int *) CmiAlloc(sizeof(int) * _MLOG_BCAST_BFACTOR_);
00795 numChildren = 0;
00796
00797
00798
00799
00800 int level = 0;
00801 int aux = CmiMyPe();
00802 int max = CmiNumPes();
00803 int factor = _MLOG_BCAST_BFACTOR_;
00804 int startLevel = 0;
00805 int startNextLevel = 1;
00806 while(aux >= 0){
00807 level++;
00808 startLevel = startNextLevel;
00809 startNextLevel += factor;
00810 aux -= factor;
00811 factor *= _MLOG_BCAST_BFACTOR_;
00812 }
00813
00814
00815 int first = startNextLevel + (CmiMyPe() - startLevel) * _MLOG_BCAST_BFACTOR_;
00816 for(int i=0; i<_MLOG_BCAST_BFACTOR_; i++){
00817 if(first + i >= CmiNumPes())
00818 break;
00819 children[i] = first + i;
00820 numChildren++;
00821 }
00822
00823 #endif
00824
00825
00826 if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
00827 ckSetReductionClient(&opts.reductionClient);
00828 }
00829
00830 CkArray::CkArray(CkMigrateMessage *m)
00831 :CkReductionMgr(m), thisProxy(thisgroup)
00832 {
00833 locMgr=NULL;
00834 isInserting=CmiTrue;
00835 }
00836
00837 #if CMK_ERROR_CHECKING
00838 inline void testPup(PUP::er &p,int shouldBe) {
00839 int a=shouldBe;
00840 p|a;
00841 if (a!=shouldBe)
00842 CkAbort("PUP direction mismatch!");
00843 }
00844 #else
00845 inline void testPup(PUP::er &p,int shouldBe) {}
00846 #endif
00847
00848 void CkArray::pup(PUP::er &p){
00849 CkReductionMgr::pup(p);
00850 p|numInitial;
00851 p|locMgrID;
00852 p|listeners;
00853 p|listenerDataOffset;
00854 p|stableLocations;
00855 testPup(p,1234);
00856 if(p.isUnpacking()){
00857 thisProxy=thisgroup;
00858 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
00859 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
00861 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
00862 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
00864 if (!stableLocations)
00865 CcdCallOnConditionKeep(CcdPERIODIC_1minute,
00866 staticSpringCleaning, (void *)this);
00867 }
00868 }
00869
// Let every listener in `listeners` stamp its slice of `listenerData`.
// Slices are laid out back to back in listener order, each ckGetLen() ints.
#define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
    int stampOffset_=0; \
    for (int stampIdx_=0;stampIdx_<listeners.size();stampIdx_++) { \
      CkArrayListener *stampListener_=listeners[stampIdx_]; \
      stampListener_->ckElementStamp(&listenerData[stampOffset_]); \
      stampOffset_+=stampListener_->ckGetLen(); \
    } \
  } while (0)
00878
00879
00880 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
00881 {
00882 envelope *env=UsrToEnv((void *)m);
00883 env->getsetArrayIndex()=idx;
00884 int *listenerData=env->getsetArrayListenerData();
00885 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
00886 if (onPe==-1) onPe=procNum(idx);
00887 if (onPe!=CkMyPe()&&onPe!=-1)
00888 getLocMgr()->inform(idx,onPe);
00889 }
00890
00891 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
00892 CkElementCreation_t type)
00893 {
00894 ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
00895 if (type==CkElementCreation_resume)
00896 {
00897
00898 int *listenerData=ret->listenerData;
00899 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
00900 }
00901 return ret;
00902 }
00903
00904 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
00905 CkMessage *msg,CmiBool fromMigration)
00906 {
00907
00908 ArrayElement_initInfo &init=CkpvAccess(initInfo);
00909 init.numInitial=numInitial;
00910 init.thisArray=this;
00911 init.thisArrayID=thisgroup;
00912 if (msg)
00913 memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
00914 sizeof(init.listenerData));
00915 init.fromMigration=fromMigration;
00916
00917
00918 int elSize=_chareTable[elChareType]->size;
00919 ArrayElement *elem = (ArrayElement *)malloc(elSize);
00920 #ifndef CMK_OPTIMIZE
00921 if (elem!=NULL) setMemoryTypeChare(elem);
00922 #endif
00923 return elem;
00924 }
00925
00927 CmiBool CkArray::insertElement(CkMessage *me)
00928 {
00929 CK_MAGICNUMBER_CHECK
00930 CkArrayMessage *m=(CkArrayMessage *)me;
00931 const CkArrayIndex &idx=m->array_index();
00932 int onPe;
00933 if (locMgr->isRemote(idx,&onPe))
00934 {
00935 CkArrayManagerInsert(onPe,me,thisgroup);
00936 return CmiFalse;
00937 }
00938 int ctorIdx=m->array_ep();
00939 int chareType=_entryTable[ctorIdx]->chareIdx;
00940 ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
00941 #ifndef CMK_CHARE_USE_PTR
00942 ((Chare *)elt)->chareIdx = -1;
00943 #endif
00944 if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
00945 CK_ARRAYLISTENER_LOOP(listeners,
00946 if (!l->ckElementCreated(elt)) return CmiFalse;);
00947 return CmiTrue;
00948 }
00949
00950 void CProxy_ArrayBase::doneInserting(void)
00951 {
00952 DEBC((AA"Broadcasting a doneInserting request\n"AB));
00953
00954 CProxy_CkArray(_aid).remoteDoneInserting();
00955 }
00956
00957 void CkArray::doneInserting(void)
00958 {
00959 thisProxy[CkMyPe()].remoteDoneInserting();
00960 }
00961
00963 void CkArray::remoteDoneInserting(void)
00964 {
00965 CK_MAGICNUMBER_CHECK
00966 if (isInserting) {
00967 isInserting=CmiFalse;
00968 DEBC((AA"Done inserting objects\n"AB));
00969 for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
00970 locMgr->doneInserting();
00971 }
00972 }
00973
00974 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
00975 int onPe,int ctor,CkDeliver_t type)
00976 {
00977 CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
00978 prepareCtorMsg(m,onPe,idx);
00979 m->array_ep()=ctor;
00980
00981 if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
00982 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
00983 CkArrayManagerInsert(onPe,m,thisgroup);
00984 } else {
00985
00986 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
00987 return insertElement(m);
00988 }
00989 return CmiTrue;
00990 }
00991
00992 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
00993 {
00994 CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
00995 if (local) {
00996 int onPe=CkMyPe();
00997 prepareCtorMsg(m,onPe,idx);
00998 #if CMK_BIGSIM_CHARM
00999 BgEntrySplit("split-array-new");
01000 #endif
01001 insertElement(m);
01002 }
01003 else {
01004 int onPe=-1;
01005 prepareCtorMsg(m,onPe,idx);
01006 CkArrayManagerInsert(onPe,m,getGroupID());
01007 }
01008 }
01009
01010
01012 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
01013 {
01014 envelope *env=UsrToEnv((void *)msg);
01015 env->getsetArrayMgr()=aid;
01016 env->getsetArraySrcPe()=CkMyPe();
01017 env->setEpIdx(ep);
01018 env->getsetArrayHops()=0;
01019 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01020 criticalPath_send(env);
01021 automaticallySetMessagePriority(env);
01022 #endif
01023 }
01024
01025
01027 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
01028 {
01029 envelope *env=UsrToEnv((void *)msg);
01030 env->getsetArrayMgr()=aid;
01031 env->getsetArraySrcPe()=CkMyPe();
01032 env->setEpIdx(ep);
01033 env->getsetArrayHops()=0;
01034 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01035 criticalPath_send(env);
01036 automaticallySetMessagePriority(env);
01037 #endif
01038 }
01039
01040 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
01041 {
01042 #if CMK_ERROR_CHECKING
01043
01044 if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
01045 if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
01046 CkAbort("Array index length (nInts) is too long-- did you "
01047 "use bytes instead of integers?\n");
01048 #endif
01049 msg_prepareSend(msg,ep,ckGetArrayID());
01050 msg->array_index()=_idx;
01051 if (ckIsDelegated())
01052 ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
01053 else
01054 {
01055 CkArray *localbranch = ckLocalBranch();
01056 if (localbranch == NULL) {
01057 CkArrayManagerDeliver(CkMyPe(), msg, 0);
01058 }
01059 else {
01060 if (opts & CK_MSG_INLINE)
01061 localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
01062 else
01063 localbranch->deliver(msg, CkDeliver_queue, opts);
01064 }
01065 }
01066 }
01067
01068 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
01069 {
01070 CkFutureID f=CkCreateAttachedFuture(msg);
01071 ckSend(msg,ep);
01072 return CkWaitReleaseFuture(f);
01073 }
01074
01075 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts )
01076 {
01077 CProxySection_ArrayBase sp(sID);
01078 sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
01079 }
01080
01081 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
01082 {
01083 if (ckIsDelegated())
01084 ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
01085 else {
01086
01087 for (int k=0; k<_nsid; ++k) {
01088 for (int i=0; i< _sid[k]._nElems-1; i++) {
01089 CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
01090 void *newMsg=CkCopyMsg((void **)&msg);
01091 ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
01092 }
01093 if (_sid[k]._nElems > 0) {
01094 void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
01095 CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._nElems-1]);
01096 ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
01097 }
01098 }
01099 }
01100 }
01101
01102 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
01103 {
01104 CkArrayMessage *m=(CkArrayMessage *)msg;
01105 m->array_index()=idx;
01106 msg_prepareSend(m,entryIndex,aID);
01107 CkArray *a=(CkArray *)_localBranch(aID);
01108 if (a == NULL)
01109 CkArrayManagerDeliver(CkMyPe(), msg, 0);
01110 else
01111 a->deliver(m,CkDeliver_queue,opts);
01112 }
01113
01114 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
01115 {
01116 CkArrayMessage *m=(CkArrayMessage *)msg;
01117 m->array_index()=idx;
01118 msg_prepareSend(m,entryIndex,aID);
01119 CkArray *a=(CkArray *)_localBranch(aID);
01120 int oldStatus = CkDisableTracing(entryIndex);
01121 a->deliver(m,CkDeliver_inline,opts);
01122 if (oldStatus) CkEnableTracing(entryIndex);
01123 }
01124
01125
01126
01127 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
01128 :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
01129 mgrID(mgrID_)
01130 {
01131 mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
01132 }
01133 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
01134 :CkArrayListener(m)
01135 {
01136 mgr=NULL;
01137 }
01138 void CkArrayReducer::pup(PUP::er &p) {
01139 CkArrayListener::pup(p);
01140 p|mgrID;
01141 if (p.isUnpacking())
01142 mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
01143 }
01144 CkArrayReducer::~CkArrayReducer() {}
01145
01146
01147
01148 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
01149 :CkArrayListener(1),
01150 bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
01151 { }
01152
01153 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
01154 :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
01155 { }
01156
01157 void CkArrayBroadcaster::pup(PUP::er &p) {
01158 CkArrayListener::pup(p);
01159
01160
01161 p|bcastNo;
01162 p|stableLocations;
01163 p|broadcastViaScheduler;
01164 if (p.isUnpacking()) {
01165 oldBcastNo=bcastNo;
01166 }
01167 }
01168
01169 CkArrayBroadcaster::~CkArrayBroadcaster()
01170 {
01171 CkArrayMessage *msg;
01172 while (NULL!=(msg=oldBcasts.deq())) delete msg;
01173 }
01174
01175 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
01176 {
01177 bcastNo++;
01178 DEBB((AA"Received broadcast %d\n"AB,bcastNo));
01179
01180 if (stableLocations)
01181 return;
01182
01183 CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
01184 oldBcasts.enq((CkArrayMessage *)msg);
01185 }
01186
01188 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
01189 CmiBool doFree)
01190 {
01191 int &elBcastNo=getData(el);
01192
01193 if (elBcastNo >= bcastNo) return CmiFalse;
01194 elBcastNo++;
01195 DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
01196 int epIdx=bcast->array_ep_bcast();
01197
01198 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01199 DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
01200 return CmiTrue;
01201 #else
01202 if (!broadcastViaScheduler)
01203 return el->ckInvokeEntry(epIdx, bcast, doFree);
01204 else {
01205 if (!doFree) {
01206 CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
01207 bcast = newMsg;
01208 }
01209 envelope *env = UsrToEnv(bcast);
01210 env->getsetArrayEp() = epIdx;
01211 env->getsetArrayMgr() = el->ckGetArrayID();
01212 env->getsetArrayIndex() = el->ckGetArrayIndex();
01213 CkArrayManagerDeliver(CkMyPe(), bcast, 0);
01214 return true;
01215 }
01216 #endif
01217 }
01218
01220 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
01221 {
01222 if (stableLocations) return CmiTrue;
01223 int &elBcastNo=getData(el);
01224 if (elBcastNo<bcastNo)
01225 {
01226
01227 int i,nDeliver=bcastNo-elBcastNo;
01228 DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
01229
01230
01231 for (i=oldBcasts.length()-1;i>=nDeliver;i--)
01232 oldBcasts.enq(oldBcasts.deq());
01233
01234
01235 for (i=nDeliver-1;i>=0;i--)
01236 {
01237 CkArrayMessage *msg=oldBcasts.deq();
01238 if(msg == NULL)
01239 continue;
01240 oldBcasts.enq(msg);
01241 if (!deliver(msg, el, CmiFalse))
01242 return CmiFalse;
01243 }
01244 }
01245
01246 return CmiTrue;
01247 }
01248
01249
01250 void CkArrayBroadcaster::springCleaning(void)
01251 {
01252
01253 int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
01254 if (nDelete>0) {
01255 DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
01256 for (int i=0;i<nDelete;i++)
01257 delete oldBcasts.deq();
01258 }
01259 oldBcastNo=bcastNo;
01260 }
01261
01262 void CkArrayBroadcaster::flushState()
01263 {
01264 bcastNo = oldBcastNo = 0;
01265 CkArrayMessage *msg;
01266 while (NULL!=(msg=oldBcasts.deq())) delete msg;
01267 }
01268
01269 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
01270 {
01271 CProxy_ArrayBase ap(aID);
01272 ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
01273 }
01274
01275 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
01276 {
01277 msg->array_ep_bcast()=ep;
01278 if (ckIsDelegated())
01279 ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
01280 else
01281 {
01282 _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
01283 int skipsched = opts & CK_MSG_EXPEDITED;
01284
01285 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01286 CProxy_CkArray ap(_aid);
01287 ap[CpvAccess(serializer)].sendBroadcast(msg);
01288 CkGroupID _id = _aid;
01289
01290 #else
01291 if (CkMyPe()==CpvAccess(serializer))
01292 {
01293 DEBB((AA"Sending array broadcast\n"AB));
01294 if (skipsched)
01295 CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
01296 else
01297 CProxy_CkArray(_aid).recvBroadcast(msg);
01298 } else {
01299 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
01300 CProxy_CkArray ap(_aid);
01301 if (skipsched)
01302 ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
01303 else
01304 ap[CpvAccess(serializer)].sendBroadcast(msg);
01305 }
01306 #endif
01307 }
01308 }
01309
01311 void CkArray::sendBroadcast(CkMessage *msg)
01312 {
01313 CK_MAGICNUMBER_CHECK
01314 if(CkMyPe() == CpvAccess(serializer)){
01315 #if _MLOG_BCAST_TREE_
01316
01317 for(int i=0; i<numChildren; i++){
01318 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
01319 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
01320 }
01321
01322
01323 recvBroadcast(msg);
01324 #else
01325
01326 thisProxy.recvBroadcast(msg);
01327 #endif
01328 }else{
01329 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
01330 }
01331 }
01332 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
01333 {
01334 CK_MAGICNUMBER_CHECK
01335
01336 thisProxy.recvExpeditedBroadcast(msg);
01337 }
01338
01339 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Counter for the fault-tolerance broadcast path: reset in
// CkArray::recvBroadcast and incremented once per copy delivered by
// CkArray::broadcastHomeElements.
int _tempBroadcastCount = 0;
01341
01342
01343 void CkArray::recvBroadcastViaTree(CkMessage *msg)
01344 {
01345 CK_MAGICNUMBER_CHECK
01346
01347
01348 for(int i=0; i<numChildren; i++){
01349 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
01350 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
01351 }
01352
01353
01354 recvBroadcast(msg);
01355 }
01356
01357 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
01358 if(homePe(*index)==CmiMyPe()){
01359 CkArrayMessage *bcast = (CkArrayMessage *)data;
01360 int epIdx=bcast->array_ep_bcast();
01361 DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
01362 CkArrayMessage *copy = (CkArrayMessage *) CkCopyMsg((void **)&bcast);
01363 envelope *env = UsrToEnv(copy);
01364 env->sender.data.group.onPE = CkMyPe();
01365 env->TN = env->SN=0;
01366 env->piggyBcastIdx = epIdx;
01367 env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
01368 env->getsetArrayMgr() = thisgroup;
01369 env->getsetArrayIndex() = *index;
01370 env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
01371 env->setSrcPe(CkMyPe());
01372 rec->deliver(copy,CkDeliver_queue);
01373 _tempBroadcastCount++;
01374 }else{
01375 if(locMgr->homeElementCount != -1){
01376 DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
01377 }
01378 }
01379 }
01380
01381 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
01382 arr->broadcastHomeElements(data,rec,index);
01383 }
01384 #else
01385 void CkArray::recvBroadcastViaTree(CkMessage *msg){
01386 }
01387 #endif
01388
01389
01391 void CkArray::recvBroadcast(CkMessage *m)
01392 {
01393 CK_MAGICNUMBER_CHECK
01394 CkArrayMessage *msg=(CkArrayMessage *)m;
01395 broadcaster->incoming(msg);
01396
01397 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01398 _tempBroadcastCount=0;
01399 locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
01400 #else
01401
01402 int idx=0, len=0, count=0;
01403 if (stableLocations) {
01404 len = 0;
01405 while (elements->next(idx)!=NULL) len++;
01406 idx = 0;
01407 }
01408 ArrayElement *el;
01409 #if CMK_BIGSIM_CHARM
01410 void *root;
01411 _TRACE_BG_TLINE_END(&root);
01412 BgSetEntryName("start-broadcast", &root);
01413 CkVec<void *> logs;
01414 extern void stopVTimer();
01415 extern void startVTimer();
01416 #endif
01417 while (NULL!=(el=elements->next(idx))) {
01418 #if CMK_BIGSIM_CHARM
01419
01420 stopVTimer();
01421 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
01422 logs.push_back(curlog);
01423 startVTimer();
01424 #endif
01425 CmiBool doFree = CmiFalse;
01426 if (stableLocations && ++count == len) doFree = CmiTrue;
01427 broadcaster->deliver(msg, el, doFree);
01428 }
01429 #endif
01430
01431 #if CMK_BIGSIM_CHARM
01432
01433 stopVTimer();
01434 BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
01435 startVTimer();
01436 #endif
01437
01438
01439
01440 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01441 if (stableLocations)
01442 delete msg;
01443 #else
01444 if (stableLocations && len == 0)
01445 delete msg;
01446 #endif
01447 }
01448
01449 #include "CkArray.def.h"
01450
01451