00001
00052 #include "charm++.h"
00053 #include "register.h"
00054 #include "ck.h"
00055 #include "pathHistory.h"
00056
00057 CpvDeclare(int ,serializer);
00058
00059 bool _isAnytimeMigration;
00060 bool _isStaticInsertion;
00061 bool _isNotifyChildInRed;
00062
00063 #define ARRAY_DEBUG_OUTPUT 0
00064
00065 #if ARRAY_DEBUG_OUTPUT
00066 # define DEB(x) CkPrintf x //General debug messages
00067 # define DEBI(x) CkPrintf x //Index debug messages
00068 # define DEBC(x) CkPrintf x //Construction debug messages
00069 # define DEBS(x) CkPrintf x //Send/recv/broadcast debug messages
00070 # define DEBM(x) CkPrintf x //Migration debug messages
00071 # define DEBL(x) CkPrintf x //Load balancing debug messages
00072 # define DEBK(x) CkPrintf x //Spring Cleaning debug messages
00073 # define DEBB(x) CkPrintf x //Broadcast debug messages
00074 # define AA "ArrayBOC on %d: "
00075 # define AB ,CkMyPe()
00076 # define DEBUG(x) x
00077 #else
00078 # define DEB(X)
00079 # define DEBI(X)
00080 # define DEBC(X)
00081 # define DEBS(x)
00082 # define DEBM(X)
00083 # define DEBL(X)
00084 # define DEBK(x)
00085 # define DEBB(x)
00086 # define str(x)
00087 # define DEBUG(x)
00088 #endif
00089
00091 class CkArrayBroadcaster : public CkArrayListener {
00092 inline int &getData(ArrayElement *el) {return *ckGetData(el);}
00093 public:
00094 CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
00095 CkArrayBroadcaster(CkMigrateMessage *m);
00096 virtual void pup(PUP::er &p);
00097 virtual ~CkArrayBroadcaster();
00098 PUPable_decl(CkArrayBroadcaster);
00099
00100 virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
00101
00104 virtual bool ckElementCreated(ArrayElement *elt)
00105 { return bringUpToDate(elt); }
00106
00109 virtual bool ckElementArriving(ArrayElement *elt)
00110 { return bringUpToDate(elt); }
00111
00112 void incoming(CkArrayMessage *msg);
00113
00114 bool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
00115
00116 void springCleaning(void);
00117
00118 void flushState();
00119 private:
00120 int bcastNo;
00121 int oldBcastNo;
00122
00123
00124 CkQ<CkArrayMessage *> oldBcasts;
00125 bool stableLocations;
00126 bool broadcastViaScheduler;
00127
00128 bool bringUpToDate(ArrayElement *el);
00129 };
00130
00132 class CkArrayReducer : public CkArrayListener {
00133 CkGroupID mgrID;
00134 CkReductionMgr *mgr;
00135 typedef contributorInfo *I;
00136 inline contributorInfo *getData(ArrayElement *el)
00137 {return (I)ckGetData(el);}
00138 public:
00140 CkArrayReducer(CkGroupID mgrID_);
00141 CkArrayReducer(CkMigrateMessage *m);
00142 virtual void pup(PUP::er &p);
00143 virtual ~CkArrayReducer();
00144 PUPable_decl(CkArrayReducer);
00145
00146 void ckBeginInserting(void) {mgr->creatingContributors();}
00147 void ckEndInserting(void) {mgr->doneCreatingContributors();}
00148
00149 void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
00150
00151 void ckElementCreating(ArrayElement *elt)
00152 {mgr->contributorCreated(getData(elt));}
00153 void ckElementDied(ArrayElement *elt)
00154 {mgr->contributorDied(getData(elt));}
00155
00156 void ckElementLeaving(ArrayElement *elt)
00157 {mgr->contributorLeaving(getData(elt));}
00158 bool ckElementArriving(ArrayElement *elt)
00159 {mgr->contributorArriving(getData(elt)); return true; }
00160 };
00161
00162
00163
00164
00165
00166
00167
00168 void
00169 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
00170 CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
00171 ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
00172 }
00173
00174
00175 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
00176
00177 CkVerboseListener::CkVerboseListener(void)
00178 :CkArrayListener(0)
00179 {
00180 VL_PRINT<<"INIT Creating listener"<<endl;
00181 }
00182
00183 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
00184 {
00185 CkArrayListener::ckRegister(arrMgr,dataOffset_);
00186 VL_PRINT<<"INIT Registering array manager at offset "<<dataOffset_<<endl;
00187 }
00188 void CkVerboseListener::ckBeginInserting(void)
00189 {
00190 VL_PRINT<<"INIT Begin inserting elements"<<endl;
00191 }
00192 void CkVerboseListener::ckEndInserting(void)
00193 {
00194 VL_PRINT<<"INIT Done inserting elements"<<endl;
00195 }
00196
00197 void CkVerboseListener::ckElementStamp(int *eltInfo)
00198 {
00199 VL_PRINT<<"LIFE Stamping element"<<endl;
00200 }
00201 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
00202 {
00203 VL_PRINT<<"LIFE About to create element "<<idx2str(elt)<<endl;
00204 }
00205 bool CkVerboseListener::ckElementCreated(ArrayElement *elt)
00206 {
00207 VL_PRINT<<"LIFE Created element "<<idx2str(elt)<<endl;
00208 return true;
00209 }
00210 void CkVerboseListener::ckElementDied(ArrayElement *elt)
00211 {
00212 VL_PRINT<<"LIFE Deleting element "<<idx2str(elt)<<endl;
00213 }
00214
00215 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
00216 {
00217 VL_PRINT<<"MIG Leaving: element "<<idx2str(elt)<<endl;
00218 }
00219 bool CkVerboseListener::ckElementArriving(ArrayElement *elt)
00220 {
00221 VL_PRINT<<"MIG Arriving: element "<<idx2str(elt)<<endl;
00222 return true;
00223 }
00224
00225
00226 #define CK_ARRAYLISTENER_LOOP(listVec,inside) \
00227 do { \
00228 int lIdx,lMax=listVec.size();\
00229 for (lIdx=0;lIdx<lMax;lIdx++) { \
00230 CkArrayListener *l=listVec[lIdx];\
00231 inside;\
00232 }\
00233 } while(0)
00234
00235
00236 class ArrayElement_initInfo {
00237 public:
00238 CkArray *thisArray;
00239 CkArrayID thisArrayID;
00240 CkArrayIndex numInitial;
00241 int listenerData[CK_ARRAYLISTENER_MAXLEN];
00242 bool fromMigration;
00243 };
00244
00245 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
00246
00247 void ArrayElement::initBasics(void)
00248 {
00249 #if CMK_OUT_OF_CORE
00250 if (CkpvAccess(CkSaveRestorePrefetch))
00251 return;
00252 #endif
00253 #if CMK_GRID_QUEUE_AVAILABLE
00254 grid_queue_interval = 0;
00255 grid_queue_threshold = 0;
00256 msg_count = 0;
00257 msg_count_grid = 0;
00258 border_flag = 0;
00259
00260 grid_queue_interval = CmiGridQueueGetInterval ();
00261 grid_queue_threshold = CmiGridQueueGetThreshold ();
00262 #endif
00263 ArrayElement_initInfo &info=CkpvAccess(initInfo);
00264 thisArray=info.thisArray;
00265 thisArrayID=info.thisArrayID;
00266 numInitialElements=info.numInitial.getCombinedCount();
00267 memcpy(listenerData,info.listenerData,sizeof(listenerData));
00268 if (!info.fromMigration) {
00269 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00270 l->ckElementCreating(this));
00271 }
00272 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00273 if(mlogData == NULL)
00274 mlogData = new ChareMlogData();
00275 mlogData->objID.type = TypeArray;
00276 mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
00277 #endif
00278 #ifdef _PIPELINED_ALLREDUCE_
00279 allredMgr = NULL;
00280 #endif
00281 DEBC((AA "Inserting %llu into PE level hashtable\n" AB, ckGetID().getID()));
00282 CkpvAccess(array_objs)[ckGetID().getID()] = this;
00283 }
00284
00285 ArrayElement::ArrayElement(void)
00286 {
00287 initBasics();
00288 #if CMK_MEM_CHECKPOINT
00289 init_checkpt();
00290 #endif
00291 }
00292
00293 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
00294 {
00295 initBasics();
00296 }
00297
00298
00299 void ArrayElement::ckAboutToMigrate(void) {
00300 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00301 l->ckElementLeaving(this));
00302 CkMigratable::ckAboutToMigrate();
00303 }
00304 void ArrayElement::ckJustMigrated(void) {
00305 CkMigratable::ckJustMigrated();
00306 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00307 if (!l->ckElementArriving(this)) return;);
00308 }
00309
00310 void ArrayElement::ckJustRestored(void) {
00311 CkMigratable::ckJustRestored();
00312
00313 }
00314
00315 #ifdef _PIPELINED_ALLREDUCE_
00316 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
00317 CMK_REFNUM_TYPE userFlag)
00318 {
00319 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
00320 msg->setUserFlag(userFlag);
00321 msg->setMigratableContributor(true);
00322 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00323 }
00324 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
00325 const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
00326 {
00327 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
00328 msg->setUserFlag(userFlag);
00329 msg->setCallback(cb);
00330 msg->setMigratableContributor(true);
00331 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00332 }
00333 void ArrayElement::contribute2(CkReductionMsg *msg)
00334 {
00335 msg->setMigratableContributor(true);
00336 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00337 }
00338 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
00339 {
00340 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);
00341 msg->setUserFlag(userFlag);
00342 msg->setCallback(cb);
00343 msg->setMigratableContributor(true);
00344 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00345 }
00346 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
00347 {
00348 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);
00349 msg->setUserFlag(userFlag);
00350 msg->setMigratableContributor(true);
00351 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00352 }
00353
00354 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
00355 const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
00356 {
00357
00358 if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD)
00359 {
00360 if (!allredMgr) {
00361 allredMgr = new AllreduceMgr();
00362 }
00363
00364 int fragNo = dataSize/FRAG_SIZE;
00365 int size = FRAG_SIZE;
00366
00367 for (int i=0; i<fragNo; i++) {
00368
00369 CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
00370 if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
00371 size = dataSize%FRAG_SIZE;
00372 }
00373 CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
00374
00375 msg->reducer = type;
00376 msg->nFrags = fragNo;
00377 msg->fragNo = i;
00378 msg->callback = defrag_cb;
00379 msg->userFlag = userFlag;
00380 allredMgr->cb = cb;
00381 allredMgr->cb.type = CkCallback::sendArray;
00382 allredMgr->cb.d.array.idx = myIndex;
00383 contribute2(msg);
00384 }
00385 return;
00386 }
00387 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
00388 msg->setUserFlag(userFlag);
00389 msg->setCallback(cb);
00390 msg->setMigratableContributor(true);
00391 thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
00392 }
00393
00394
00395 #else
00396 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
00397 *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
00398 #endif
00399
00400 void ArrayElement::defrag(CkReductionMsg *msg)
00401 {
00402
00403 #ifdef _PIPELINED_ALLREDUCE_
00404 allredMgr->allreduce_recieve(msg);
00405 #endif
00406 }
00407
00408 int ArrayElement::getRedNo(void) const
00409 {
00410 return ((contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()])->redNo;
00411 }
00412
00413
00414
00415
00416 void ArrayElement::ckDestroy(void)
00417 {
00418 if(_BgOutOfCoreFlag!=1){
00419 CK_ARRAYLISTENER_LOOP(thisArray->listeners,
00420 l->ckElementDied(this));
00421 }
00422 thisArray->deleteElt(CkMigratable::ckGetID());
00423 }
00424
00425
00426 ArrayElement::~ArrayElement()
00427 {
00428 #if CMK_OUT_OF_CORE
00429 if (CkpvAccess(CkSaveRestorePrefetch))
00430 return;
00431 #endif
00432
00433 DEBC((AA "Removing %llu from PE level hashtable\n" AB, ckGetID().getID()));
00434 CkpvAccess(array_objs).erase(ckGetID().getID());
00435
00436 thisArray=(CkArray *)(intptr_t)0xDEADa7a1;
00437 }
00438
00439 void ArrayElement::pup(PUP::er &p)
00440 {
00441 DEBM((AA " ArrayElement::pup()\n" AB));
00442 CkMigratable::pup(p);
00443 thisArrayID.pup(p);
00444 if (p.isUnpacking())
00445 thisArray=thisArrayID.ckLocalBranch();
00446 p(listenerData,CK_ARRAYLISTENER_MAXLEN);
00447 #if CMK_MEM_CHECKPOINT
00448 p(budPEs, 2);
00449 #endif
00450 p.syncComment(PUP::sync_last_system,"ArrayElement");
00451 #if CMK_GRID_QUEUE_AVAILABLE
00452 p|grid_queue_interval;
00453 p|grid_queue_threshold;
00454 p|msg_count;
00455 p|msg_count_grid;
00456 p|border_flag;
00457 if (p.isUnpacking ()) {
00458 msg_count = 0;
00459 msg_count_grid = 0;
00460 border_flag = 0;
00461 }
00462 #endif
00463 }
00464
00465 char *ArrayElement::ckDebugChareName(void) {
00466 char buf[200];
00467 const char *className=_chareTable[ckGetChareType()]->name;
00468 const int *d=thisIndexMax.data();
00469 const short int *s=(const short int*)d;
00470 switch (thisIndexMax.dimension) {
00471 case 0: sprintf(buf,"%s",className); break;
00472 case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
00473 case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
00474 case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
00475 case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
00476 case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
00477 case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
00478 default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
00479 };
00480 return strdup(buf);
00481 }
00482
00483 int ArrayElement::ckDebugChareID(char *str, int limit) {
00484 if (limit<21) return -1;
00485 str[0] = 2;
00486 *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
00487 *((CkArrayIndex*)&str[5]) = thisIndexMax;
00488 return 21;
00489 }
00490
00492 void ArrayElement::CkAbort(const char *str) const
00493 {
00494 CkError("[%d] Array element at index %s aborting:\n",
00495 CkMyPe(), idx2str(thisIndexMax));
00496 CkMigratable::CkAbort(str);
00497 }
00498
00499 void ArrayElement::recvBroadcast(CkMessage *m){
00500 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00501 CkArrayMessage *bcast = (CkArrayMessage *)m;
00502 envelope *env = UsrToEnv(m);
00503 int epIdx= env->piggyBcastIdx;
00504 ckInvokeEntry(epIdx,bcast,true);
00505 #endif
00506 }
00507
00508 ArrayElemExt::ArrayElemExt(void *impl_msg)
00509 {
00510 int chareIdx = ckGetChareType();
00511 ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
00512
00513 CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
00514 char *impl_buf = impl_msg_typed->msgBuf;
00515 PUP::fromMem implP(impl_buf);
00516 implP|usesAtSync;
00517 int msgSize; implP|msgSize;
00518 int dcopy_start; implP|dcopy_start;
00519
00520 ArrayMsgRecvExtCallback(((CkGroupID)thisArrayID).idx, int(thisIndexMax.getDimension()),
00521 thisIndexMax.data(), ctorEpIdx,
00522 msgSize, impl_buf+(2*sizeof(int))+sizeof(char), dcopy_start);
00523 }
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533 inline void CkArray::springCleaning(void)
00534 {
00535 DEBK((AA "Starting spring cleaning\n" AB));
00536 broadcaster->springCleaning();
00537 setupSpringCleaning();
00538 }
00539
00540 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
00541 ((CkArray *)forArray)->springCleaning();
00542 }
00543
00544 void CkArray::setupSpringCleaning() {
00545
00546 if (!stableLocations)
00547 springCleaningCcd = CcdCallOnCondition(CcdPERIODIC_1minute,
00548 staticSpringCleaning, (void *)this);
00549 }
00550
00551
00552
00553 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
00554 :CProxy(), _aid(e->ckGetArrayID())
00555 {}
00556 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
00557 :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
00558 {}
00559
00560 CProxySection_ArrayBase::CProxySection_ArrayBase(const CkArrayID &aid, const CkArrayIndex *elems, const int nElems, int factor) :CProxy_ArrayBase(aid) {
00561 _sid.emplace_back(aid, elems, nElems, factor);
00562 }
00563
00564 CProxySection_ArrayBase::CProxySection_ArrayBase(const CkArrayID &aid, const std::vector<CkArrayIndex> &elems, int factor) :CProxy_ArrayBase(aid) {
00565 _sid.emplace_back(aid, elems, factor);
00566 }
00567
00568 CProxySection_ArrayBase::CProxySection_ArrayBase(const int n, const CkArrayID *aid, CkArrayIndex const * const *elems, const int *nElems, int factor) :CProxy_ArrayBase(aid[0]) {
00569 _sid.resize(n);
00570 for (int i=0; i<_sid.size(); i++) {
00571 _sid[i] = CkSectionID(aid[i], elems[i], nElems[i], factor);
00572 }
00573 }
00574
00575 CProxySection_ArrayBase::CProxySection_ArrayBase(const std::vector<CkArrayID> &aid, const std::vector<std::vector<CkArrayIndex> > &elems, int factor) :CProxy_ArrayBase(aid[0]) {
00576 _sid.resize(aid.size());
00577 for (int i=0; i<_sid.size(); i++) {
00578 _sid[i] = CkSectionID(aid[i], elems[i], factor);
00579 }
00580 }
00581
00582
00583 void CProxySection_ArrayBase::ckAutoDelegate(int opts){
00584 if(_sid.empty())
00585 CmiAbort("Auto Delegation before setting up CkSectionID\n");
00586 CkArray *ckarr = CProxy_CkArray(_sid[0].get_aid()).ckLocalBranch();
00587 if(ckarr->isSectionAutoDelegated()){
00588 CkMulticastMgr *mCastGrp = CProxy_CkMulticastMgr(ckarr->getmCastMgr()).ckLocalBranch();
00589 ckSectionDelegate(mCastGrp, opts);
00590 }
00591 }
00592
00593
00594 void CProxySection_ArrayBase::setReductionClient(CkCallback *cb) {
00595 if(_sid.empty())
00596 CmiAbort("setReductionClient before setting up CkSectionID\n");
00597 CkArray *ckarr = CProxy_CkArray(_sid[0].get_aid()).ckLocalBranch();
00598 if(ckarr->isSectionAutoDelegated()){
00599 CkMulticastMgr *mCastGrp = CProxy_CkMulticastMgr(ckarr->getmCastMgr()).ckLocalBranch();
00600 mCastGrp->setReductionClient(*this, cb);
00601 }
00602 else{
00603 CmiAbort("setReductionClient called on section without autoDelegate");
00604 }
00605 }
00606
00607
00608 void CProxySection_ArrayBase::resetSection(){
00609 if(_sid.empty())
00610 CmiAbort("resetSection before setting up CkSectionID\n");
00611 CkArray *ckarr = CProxy_CkArray(_sid[0].get_aid()).ckLocalBranch();
00612 if(ckarr->isSectionAutoDelegated()){
00613 CkMulticastMgr *mCastGrp = CProxy_CkMulticastMgr(ckarr->getmCastMgr()).ckLocalBranch();
00614 mCastGrp->resetSection(*this);
00615 }
00616 else{
00617 CmiAbort("resetSection called on section without autoDelegate");
00618 }
00619 }
00620
00621 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
00622 {return ckLocalBranch()->getLocMgr(); }
00623
00624 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
00625
00626 static CkArrayID CkCreateArray(CkArrayMessage *m, int ctor, CkArrayOptions opts)
00627 {
00628 CkAssert(CkMyPe() == 0);
00629
00630 CkGroupID locMgr = opts.getLocationManager();
00631 if (locMgr.isZero())
00632 {
00633 CkEntryOptions e_opts;
00634 e_opts.setGroupDepID(opts.getMap());
00635 locMgr = CProxy_CkLocMgr::ckNew(opts, &e_opts);
00636 opts.setLocationManager(locMgr);
00637 }
00638 CkGroupID mCastMgr = opts.getMcastManager();
00639 if (opts.isSectionAutoDelegated() && mCastMgr.isZero())
00640 {
00641 CkEntryOptions e_opts;
00642 e_opts.setGroupDepID(locMgr);
00643
00644 mCastMgr = CProxy_CkMulticastMgr::ckNew(2, 8192, 8192, &e_opts);
00645 opts.setMcastManager(mCastMgr);
00646 }
00647
00648 m->array_ep()=ctor;
00649 CkMarshalledMessage marsh(m);
00650 CkEntryOptions e_opts;
00651 e_opts.setGroupDepID(locMgr);
00652 if(opts.isSectionAutoDelegated())
00653 {
00654 e_opts.setGroupDepID(mCastMgr);
00655 }
00656
00657
00658 envelope *env = UsrToEnv(m);
00659 for(int i=0; i<env->getGroupDepNum(); i++) {
00660 e_opts.addGroupDepID(env->getGroupDep(i));
00661 }
00662 CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,&e_opts);
00663 return (CkArrayID)ag;
00664 }
00665
00666 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
00667 const CkArrayOptions &opts)
00668 {
00669 return CkCreateArray(m, ctor, opts);
00670 }
00671
00672 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(CkArrayOptions opts)
00673 {
00674 return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,opts);
00675 }
00676
00677 void CProxy_ArrayBase::ckCreateEmptyArrayAsync(CkCallback cb, CkArrayOptions opts)
00678 {
00679 CkSendAsyncCreateArray(0, cb, opts, (CkArrayMessage *)CkAllocSysMsg());
00680 }
00681
00682 extern IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID);
00683
00684 struct CkInsertIdxMsg {
00685 char core[CmiReservedHeaderSize];
00686 CkArrayIndex idx;
00687 CkArrayMessage *m;
00688 int ctor;
00689 int onPe;
00690 CkArrayID _aid;
00691 };
00692
00693 static int ckinsertIdxHdl;
00694
00695 void ckinsertIdxFunc(void *m)
00696 {
00697 CkInsertIdxMsg *msg = (CkInsertIdxMsg *)m;
00698 CProxy_ArrayBase ca(msg->_aid);
00699 ca.ckInsertIdx(msg->m, msg->ctor, msg->onPe, msg->idx);
00700 CmiFree(msg);
00701 }
00702
00703 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int proposedPe,
00704 const CkArrayIndex &idx)
00705 {
00706 if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
00707 m->array_ep()=ctor;
00708 CkArray *ca = ckLocalBranch();
00709 if (ca == NULL) {
00710 CkInsertIdxMsg *msg = (CkInsertIdxMsg *)CmiAlloc(sizeof(CkInsertIdxMsg));
00711 msg->idx = idx;
00712 msg->m = m;
00713 msg->ctor = ctor;
00714 msg->onPe = proposedPe;
00715 msg->_aid = _aid;
00716 CmiSetHandler(msg, ckinsertIdxHdl);
00717 ca = (CkArray *)lookupGroupAndBufferIfNotThere(CkpvAccess(_coreState), (envelope*)msg,_aid);
00718 CkAssert (ca == NULL);
00719 return;
00720 }
00721
00722 int hostPe = ca->findInitialHostPe(idx, proposedPe);
00723
00724 int listenerData[CK_ARRAYLISTENER_MAXLEN];
00725 ca->prepareCtorMsg(m, listenerData);
00726 if (ckIsDelegated()) {
00727 ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,hostPe,_aid);
00728 return;
00729 }
00730
00731 DEBC((AA "Proxy inserting element %s on Pe %d\n" AB,idx2str(idx),hostPe));
00732 CProxy_CkArray(_aid)[hostPe].insertElement(m, idx, listenerData);
00733 }
00734
00735 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
00736 {
00737 ckInsertIdx(m,ctorIndex,onPe,_idx);
00738 }
00739
00740 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
00741 {
00742 return ckLocalBranch()->lookup(_idx);
00743 }
00744
00745
00746 void CProxy_ArrayBase::pup(PUP::er &p)
00747 {
00748 CProxy::pup(p);
00749 _aid.pup(p);
00750 }
00751 void CProxyElement_ArrayBase::pup(PUP::er &p)
00752 {
00753 CProxy_ArrayBase::pup(p);
00754 p|_idx;
00755 }
00756
00757 void CProxySection_ArrayBase::pup(PUP::er &p)
00758 {
00759 CProxy_ArrayBase::pup(p);
00760 p | _sid;
00761 }
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771 struct CkCreateArrayAsyncMsg : public CMessage_CkCreateArrayAsyncMsg {
00772 int ctor;
00773 CkCallback cb;
00774 CkArrayOptions opts;
00775 char *ctorPayload;
00776
00777 CkCreateArrayAsyncMsg(int ctor_, CkCallback cb_, CkArrayOptions opts_)
00778 : ctor(ctor_), cb(cb_), opts(opts_)
00779 { }
00780 };
00781
00782 static int ckArrayCreationHdl = 0;
00783
00784 void CkSendAsyncCreateArray(int ctor, CkCallback cb, CkArrayOptions opts, void *ctorMsg)
00785 {
00786 CkAssert(ctorMsg);
00787 UsrToEnv(ctorMsg)->setMsgtype(ArrayEltInitMsg);
00788 PUP::sizer ps;
00789 CkPupMessage(ps, &ctorMsg);
00790 CkCreateArrayAsyncMsg *msg = new (ps.size()) CkCreateArrayAsyncMsg(ctor, cb, opts);
00791 PUP::toMem p(msg->ctorPayload);
00792 CkPupMessage(p, &ctorMsg);
00793 CkFreeMsg(ctorMsg);
00794 envelope *env = UsrToEnv(msg);
00795 CmiSetHandler(env, ckArrayCreationHdl);
00796 CkPackMessage(&env);
00797 CmiSyncSendAndFree(0, env->getTotalsize(), (char*)env);
00798 }
00799
00800 static void CkCreateArrayAsync(void *vmsg)
00801 {
00802 envelope *venv = static_cast<envelope*>(vmsg);
00803 CkUnpackMessage(&venv);
00804 CkCreateArrayAsyncMsg *msg = static_cast<CkCreateArrayAsyncMsg*>(EnvToUsr(venv));
00805
00806
00807 PUP::fromMem p(msg->ctorPayload);
00808 void *vm;
00809 CkPupMessage(p, &vm);
00810 CkArrayMessage *m = static_cast<CkArrayMessage*>(vm);
00811
00812 CkArrayID aid = CkCreateArray(m, msg->ctor, msg->opts);
00813
00814
00815 if (!msg->cb.isInvalid())
00816 msg->cb.send(new CkArrayCreatedMsg(aid));
00817 delete msg;
00818 }
00819
00820
00821 void _ckArrayInit(void)
00822 {
00823 CkpvInitialize(ArrayElement_initInfo,initInfo);
00824 CkDisableTracing(CkIndex_CkArray::insertElement(0, CkArrayIndex(), 0));
00825 CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
00826
00827 CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
00828
00829 CmiAssignOnce(&ckinsertIdxHdl, CkRegisterHandler(ckinsertIdxFunc));
00830 CmiAssignOnce(&ckArrayCreationHdl, CkRegisterHandler(CkCreateArrayAsync));
00831 }
00832
00833 CkArray::CkArray(CkArrayOptions &&opts,
00834 CkMarshalledMessage &&initMsg)
00835 : locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
00836 locMgrID(opts.getLocationManager()),
00837 mCastMgrID(opts.getMcastManager()),
00838 sectionAutoDelegate(opts.isSectionAutoDelegated()),
00839 initCallback(opts.getInitCallback()),
00840 thisProxy(thisgroup),
00841 stableLocations(opts.staticInsertion && !opts.anytimeMigration),
00842 numInitial(opts.getNumInitial()), isInserting(true), numPesInited(0)
00843 {
00844
00845 locMgr->addManager(thisgroup,this);
00846
00847 setupSpringCleaning();
00848
00849
00850 if(opts.disableNotifyChildInRed)
00851 disableNotifyChildrenStart = true;
00852
00853
00854 listenerDataOffset=0;
00855 broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
00856 addListener(broadcaster);
00857 reducer=new CkArrayReducer(thisgroup);
00858 addListener(reducer);
00859
00860
00861
00862
00863
00864 int lNo,nL=opts.getListeners();
00865 for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
00866
00867 for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
00868
00870 locMgr->populateInitial(opts,initMsg.getMessage(),this);
00871 if (opts.staticInsertion)
00872 initDone();
00873
00874 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00875
00876 children = (int *) CmiAlloc(sizeof(int) * _MLOG_BCAST_BFACTOR_);
00877 numChildren = 0;
00878
00879
00880
00881
00882 int level = 0;
00883 int aux = CmiMyPe();
00884 int max = CmiNumPes();
00885 int factor = _MLOG_BCAST_BFACTOR_;
00886 int startLevel = 0;
00887 int startNextLevel = 1;
00888 while(aux >= 0){
00889 level++;
00890 startLevel = startNextLevel;
00891 startNextLevel += factor;
00892 aux -= factor;
00893 factor *= _MLOG_BCAST_BFACTOR_;
00894 }
00895
00896
00897 int first = startNextLevel + (CmiMyPe() - startLevel) * _MLOG_BCAST_BFACTOR_;
00898 for(int i=0; i<_MLOG_BCAST_BFACTOR_; i++){
00899 if(first + i >= CmiNumPes())
00900 break;
00901 children[i] = first + i;
00902 numChildren++;
00903 }
00904
00905 #endif
00906
00907
00908 if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
00909 ckSetReductionClient(&opts.reductionClient);
00910 }
00911
00912 CkArray::CkArray(CkMigrateMessage *m)
00913 :CkReductionMgr(m), thisProxy(thisgroup)
00914 {
00915 locMgr=NULL;
00916 isInserting=true;
00917 }
00918
00919 CkArray::~CkArray()
00920 {
00921 if (!stableLocations)
00922 CcdCancelCallOnCondition(CcdPERIODIC_1minute, springCleaningCcd);
00923 }
00924
00925 #if CMK_ERROR_CHECKING
00926 inline void testPup(PUP::er &p,int shouldBe) {
00927 int a=shouldBe;
00928 p|a;
00929 if (a!=shouldBe)
00930 CkAbort("PUP direction mismatch!");
00931 }
00932 #else
00933 inline void testPup(PUP::er &p,int shouldBe) {}
00934 #endif
00935
00936 void CkArray::pup(PUP::er &p){
00937 CkReductionMgr::pup(p);
00938 p|numInitial;
00939 p|locMgrID;
00940 p|mCastMgrID;
00941 p|sectionAutoDelegate;
00942 p|initCallback;
00943 p|listeners;
00944 p|listenerDataOffset;
00945 p|stableLocations;
00946 p|numPesInited;
00947 testPup(p,1234);
00948 if(p.isUnpacking()){
00949 thisProxy=thisgroup;
00950 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
00951 locMgr->addManager(thisgroup,this);
00953 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
00954 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
00955 setupSpringCleaning();
00956 }
00957 }
00958
00959 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
00960 int dataOffset=0; \
00961 for (int lNo=0;lNo<listeners.size();lNo++) { \
00962 CkArrayListener *l=listeners[lNo]; \
00963 l->ckElementStamp(&listenerData[dataOffset]); \
00964 dataOffset+=l->ckGetLen(); \
00965 } \
00966 } while (0)
00967
00968
00969 void CkArray::prepareCtorMsg(CkMessage *m, int *listenerData)
00970 {
00971 envelope *env=UsrToEnv((void *)m);
00972 env->setMsgtype(ArrayEltInitMsg);
00973 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
00974 }
00975
00976 int CkArray::findInitialHostPe(const CkArrayIndex &idx, int proposedPe)
00977 {
00978 int hostPe = locMgr->whichPE(idx);
00979
00980 if (hostPe == -1 && proposedPe == -1)
00981 return procNum(idx);
00982 if (hostPe == -1)
00983 return proposedPe;
00984 if (proposedPe == -1)
00985 return hostPe;
00986 if (hostPe == proposedPe)
00987 return hostPe;
00988
00989 CkAbort("hostPe for a bound element disagrees with an explicit proposedPe");
00990 return -1;
00991 }
00992
00993 void CkArray::stampListenerData(CkMigratable *elt)
00994 {
00995 ArrayElement *elt2 = (ArrayElement *)elt;
00996 CK_ARRAYLISTENER_STAMP_LOOP(elt2->listenerData);
00997 }
00998
00999 CkMigratable *CkArray::allocateMigrated(int elChareType, CkElementCreation_t type)
01000 {
01001 ArrayElement *ret=allocate(elChareType, NULL, true, NULL);
01002 return ret;
01003 }
01004
01005 ArrayElement *CkArray::allocate(int elChareType, CkMessage *msg, bool fromMigration, int *listenerData)
01006 {
01007
01008 ArrayElement_initInfo &init=CkpvAccess(initInfo);
01009 init.numInitial=numInitial;
01010 init.thisArray=this;
01011 init.thisArrayID=thisgroup;
01012 if (listenerData)
01013 memcpy(init.listenerData, listenerData, sizeof(init.listenerData));
01014 init.fromMigration=fromMigration;
01015
01016
01017 size_t elSize=_chareTable[elChareType]->size;
01018 ArrayElement *elem = (ArrayElement *)malloc(elSize);
01019 if (elem!=NULL) setMemoryTypeChare(elem);
01020 return elem;
01021 }
01022
01023 void CkArray::insertElement(CkMarshalledMessage &&m, const CkArrayIndex &idx, int listenerData[CK_ARRAYLISTENER_MAXLEN])
01024 {
01025 insertElement((CkArrayMessage*)m.getMessage(), idx, listenerData);
01026 }
01027
01029 bool CkArray::insertElement(CkArrayMessage *me, const CkArrayIndex &idx, int listenerData[CK_ARRAYLISTENER_MAXLEN])
01030 {
01031 CK_MAGICNUMBER_CHECK
01032 int onPe;
01033 if (locMgr->isRemote(idx,&onPe))
01034 {
01035 thisProxy[onPe].insertElement(me, idx, listenerData);
01036 return false;
01037 }
01038 int ctorIdx = me->array_ep();
01039 int chareType=_entryTable[ctorIdx]->chareIdx;
01040 ArrayElement *elt=allocate(chareType, me, false, listenerData);
01041 if (!locMgr->addElement(thisgroup, idx, elt, ctorIdx, (void *)me)) return false;
01042 CK_ARRAYLISTENER_LOOP(listeners,
01043 if (!l->ckElementCreated(elt)) return false;);
01044 return true;
01045 }
01046
01047 void CkArray::initDone(void) {
01048 if (initCallback.isInvalid())
01049 return;
01050
01051 numPesInited++;
01052 DEBC(("PE %d initDone, numPesInited %d, treeKids %d, parent %d\n",
01053 CkMyPe(), numPesInited, treeKids(), treeParent()));
01054
01055
01056
01057 if (numPesInited == treeKids() + 1) {
01058 if (hasParent())
01059 thisProxy[treeParent()].initDone();
01060 else
01061 initCallback.send(CkReductionMsg::buildNew(0, NULL));
01062 }
01063 }
01064
01065 void CProxy_ArrayBase::doneInserting(void)
01066 {
01067 DEBC((AA "Broadcasting a doneInserting request\n" AB));
01068
01069 CProxy_CkArray(_aid).remoteDoneInserting();
01070 }
01071
01072 void CProxy_ArrayBase::beginInserting(void)
01073 {
01074 DEBC((AA "Broadcasting a beginInserting request\n" AB));
01075 CProxy_CkArray(_aid).remoteBeginInserting();
01076 }
01077
01078 void CkArray::doneInserting(void)
01079 {
01080 thisProxy[CkMyPe()].remoteDoneInserting();
01081 }
01082
01083 void CkArray::beginInserting(void)
01084 {
01085 thisProxy[CkMyPe()].remoteBeginInserting();
01086 }
01087
01089 void CkArray::remoteDoneInserting(void)
01090 {
01091 CK_MAGICNUMBER_CHECK
01092 if (isInserting) {
01093 isInserting=false;
01094 DEBC((AA "Done inserting objects\n" AB));
01095 for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
01096 locMgr->doneInserting();
01097 initDone();
01098 }
01099 }
01100
01101 void CkArray::remoteBeginInserting(void)
01102 {
01103 CK_MAGICNUMBER_CHECK;
01104
01105 if (!isInserting) {
01106 isInserting = true;
01107 DEBC((AA "Begin inserting objects\n" AB));
01108 for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
01109 locMgr->startInserting();
01110 }
01111 }
01112
01113 void CkArray::demandCreateElement(const CkArrayIndex &idx, int ctor, CkDeliver_t type)
01114 {
01115 CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
01116 envelope *env = UsrToEnv(m);
01117 env->setMsgtype(ArrayEltInitMsg);
01118 env->setArrayMgr(thisgroup);
01119 int listenerData[CK_ARRAYLISTENER_MAXLEN];
01120 prepareCtorMsg(m, listenerData);
01121 m->array_ep()=ctor;
01122
01123 DEBC((AA "Demand-creating %s\n" AB,idx2str(idx)));
01124 insertElement(m, idx, listenerData);
01125 }
01126
01127 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg)
01128 {
01129 CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
01130 int listenerData[CK_ARRAYLISTENER_MAXLEN];
01131 prepareCtorMsg(m, listenerData);
01132 #if CMK_BIGSIM_CHARM
01133 BgEntrySplit("split-array-new");
01134 #endif
01135 insertElement(m, idx, listenerData);
01136 }
01137
01138
01140 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
01141 {
01142 envelope *env=UsrToEnv((void *)msg);
01143 env->setMsgtype(ForArrayEltMsg);
01144 env->setArrayMgr(aid);
01145 env->getsetArraySrcPe()=CkMyPe();
01146 env->setRecipientID(ck::ObjID(0));
01147 #if CMK_SMP_TRACE_COMMTHREAD
01148 env->setSrcPe(CkMyPe());
01149 #endif
01150 env->setEpIdx(ep);
01151 env->getsetArrayHops()=0;
01152 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01153 criticalPath_send(env);
01154 automaticallySetMessagePriority(env);
01155 #endif
01156 }
01157
01158
01160 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
01161 {
01162 envelope *env=UsrToEnv((void *)msg);
01163 env->setArrayMgr(aid);
01164 env->getsetArraySrcPe()=CkMyPe();
01165 #if CMK_SMP_TRACE_COMMTHREAD
01166 env->setSrcPe(CkMyPe());
01167 #endif
01168 env->setEpIdx(ep);
01169 env->getsetArrayHops()=0;
01170 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01171 criticalPath_send(env);
01172 automaticallySetMessagePriority(env);
01173 #endif
01174 }
01175
01176 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
01177 {
01178 #if CMK_ERROR_CHECKING
01179
01180 if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
01181 if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
01182 CkAbort("Array index length (nInts) is too long-- did you "
01183 "use bytes instead of integers?\n");
01184 #endif
01185 msg_prepareSend(msg, ep, ckGetArrayID());
01186 if (ckIsDelegated())
01187 ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
01188 else
01189 {
01190 CkArray *localbranch = ckLocalBranch();
01191 if (localbranch == NULL) {
01192 CkAbort("Cannot send a message from an array without a local branch");
01193 }
01194 else {
01195 if (opts & CK_MSG_INLINE)
01196 localbranch->deliver(msg, _idx, CkDeliver_inline, opts & (~CK_MSG_INLINE));
01197 else
01198 localbranch->deliver(msg, _idx, CkDeliver_queue, opts);
01199 }
01200 }
01201 }
01202
01203 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
01204 {
01205 CkFutureID f=CkCreateAttachedFuture(msg);
01206 ckSend(msg,ep);
01207 return CkWaitReleaseFuture(f);
01208 }
01209
01210 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts )
01211 {
01212 CProxySection_ArrayBase sp(sID);
01213 sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
01214 }
01215
01216 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
01217 {
01218 if (ckIsDelegated())
01219 ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _sid.size(), _sid.data(), opts);
01220 else {
01221
01222 for (int k=0; k<_sid.size(); ++k) {
01223 for (int i=0; i< _sid[k]._elems.size()-1; i++) {
01224 CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
01225 void *newMsg=CkCopyMsg((void **)&msg);
01226 ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
01227 }
01228 if (!_sid[k]._elems.empty()) {
01229 void *newMsg= (k<_sid.size()-1) ? CkCopyMsg((void **)&msg) : msg;
01230 CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._elems.size()-1]);
01231 ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
01232 }
01233 }
01234 }
01235 }
01236
01237 void CkSetMsgArrayIfNotThere(void *msg) {
01238 envelope *env = UsrToEnv((void *)msg);
01239 env->setMsgtype(ForArrayEltMsg);
01240 CkArrayMessage *m = (CkArrayMessage *)msg;
01241 m->array_setIfNotThere(CkArray_IfNotThere_buffer);
01242 }
01243
01244 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
01245 {
01246 CkArrayMessage *m=(CkArrayMessage *)msg;
01247 msg_prepareSend(m,entryIndex,aID);
01248 CkArray *a=(CkArray *)_localBranch(aID);
01249 if (a == NULL)
01250 CkAbort("Cannot receive a message for an array without a local branch");
01251 else
01252 a->deliver(m, idx, CkDeliver_queue, opts);
01253 }
01254
01255 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
01256 {
01257 CkArrayMessage *m=(CkArrayMessage *)msg;
01258 msg_prepareSend(m,entryIndex,aID);
01259 CkArray *a=(CkArray *)_localBranch(aID);
01260 int oldStatus = CkDisableTracing(entryIndex);
01261 a->deliver(m, idx, CkDeliver_inline, opts);
01262 if (oldStatus) CkEnableTracing(entryIndex);
01263 }
01264
01265
01266
01267 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
01268 :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
01269 mgrID(mgrID_)
01270 {
01271 mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
01272 }
01273 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
01274 :CkArrayListener(m)
01275 {
01276 mgr=NULL;
01277 }
01278 void CkArrayReducer::pup(PUP::er &p) {
01279 CkArrayListener::pup(p);
01280 p|mgrID;
01281 if (p.isUnpacking())
01282 mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
01283 }
01284 CkArrayReducer::~CkArrayReducer() {}
01285
01286
01287
01288 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
01289 :CkArrayListener(1),
01290 bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
01291 { }
01292
01293 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
01294 :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
01295 { }
01296
01297 void CkArrayBroadcaster::pup(PUP::er &p) {
01298 CkArrayListener::pup(p);
01299
01300
01301 p|bcastNo;
01302 p|stableLocations;
01303 p|broadcastViaScheduler;
01304 if (p.isUnpacking()) {
01305 oldBcastNo=bcastNo;
01306 }
01307 }
01308
01309 CkArrayBroadcaster::~CkArrayBroadcaster()
01310 {
01311 CkArrayMessage *msg;
01312 while (NULL!=(msg=oldBcasts.deq())) delete msg;
01313 }
01314
01315 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
01316 {
01317 bcastNo++;
01318 DEBB((AA "Received broadcast %d\n" AB,bcastNo));
01319
01320 if (stableLocations)
01321 return;
01322
01323 CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
01324 oldBcasts.enq((CkArrayMessage *)msg);
01325 }
01326
01328 bool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
01329 bool doFree)
01330 {
01331 int &elBcastNo=getData(el);
01332
01333 if (elBcastNo >= bcastNo) return false;
01334 elBcastNo++;
01335 DEBB((AA "Delivering broadcast %d to element %s\n" AB,elBcastNo,idx2str(el)));
01336
01337 CkAssert(UsrToEnv(bcast)->getMsgtype() == ForArrayEltMsg);
01338
01339 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01340 DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
01341 return true;
01342 #else
01343 if (!broadcastViaScheduler)
01344 return el->ckInvokeEntry(bcast->array_ep(), bcast, doFree);
01345 else {
01346 if (!doFree) {
01347 CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
01348 bcast = newMsg;
01349 }
01350 envelope *env = UsrToEnv(bcast);
01351 env->setRecipientID(el->ckGetID());
01352 CkArrayManagerDeliver(CkMyPe(), bcast, 0);
01353 return true;
01354 }
01355 #endif
01356 }
01357
01359 bool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
01360 {
01361 if (stableLocations) return true;
01362 int &elBcastNo=getData(el);
01363 if (elBcastNo<bcastNo)
01364 {
01365
01366 int i,nDeliver=bcastNo-elBcastNo;
01367 DEBM((AA "Migrator %s missed %d broadcasts--\n" AB,idx2str(el),nDeliver));
01368
01369
01370 for (i=oldBcasts.length()-1;i>=nDeliver;i--)
01371 oldBcasts.enq(oldBcasts.deq());
01372
01373
01374 for (i=nDeliver-1;i>=0;i--)
01375 {
01376 CkArrayMessage *msg=oldBcasts.deq();
01377 if(msg == NULL)
01378 continue;
01379 oldBcasts.enq(msg);
01380 if (!deliver(msg, el, false))
01381 return false;
01382 }
01383 }
01384
01385 return true;
01386 }
01387
01388
01389 void CkArrayBroadcaster::springCleaning(void)
01390 {
01391
01392 int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
01393 if (nDelete>0) {
01394 DEBK((AA "Cleaning out %d old broadcasts\n" AB,nDelete));
01395 for (int i=0;i<nDelete;i++)
01396 delete oldBcasts.deq();
01397 }
01398 oldBcastNo=bcastNo;
01399 }
01400
01401 void CkArrayBroadcaster::flushState()
01402 {
01403 bcastNo = oldBcastNo = 0;
01404 CkArrayMessage *msg;
01405 while (NULL!=(msg=oldBcasts.deq())) delete msg;
01406 }
01407
01408 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
01409 {
01410 CProxy_ArrayBase ap(aID);
01411 ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
01412 }
01413
01414 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
01415 {
01416 envelope *env = UsrToEnv(msg);
01417 env->setMsgtype(ForBocMsg);
01418 msg->array_ep_bcast()=ep;
01419 if (ckIsDelegated())
01420 ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
01421 else
01422 {
01423 _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
01424 int skipsched = opts & CK_MSG_EXPEDITED;
01425
01426 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01427 CProxy_CkArray ap(_aid);
01428 ap[CpvAccess(serializer)].sendBroadcast(msg);
01429 CkGroupID _id = _aid;
01430
01431 #else
01432 if (CkMyPe()==CpvAccess(serializer))
01433 {
01434 DEBB((AA "Sending array broadcast\n" AB));
01435 if (skipsched)
01436 CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
01437 else
01438 CProxy_CkArray(_aid).recvBroadcast(msg);
01439 } else {
01440 DEBB((AA "Forwarding array broadcast to serializer node %d\n" AB,CpvAccess(serializer)));
01441 CProxy_CkArray ap(_aid);
01442 if (skipsched)
01443 ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
01444 else
01445 ap[CpvAccess(serializer)].sendBroadcast(msg);
01446 }
01447 #endif
01448 }
01449 }
01450
01452 void CkArray::sendBroadcast(CkMessage *msg)
01453 {
01454 CK_MAGICNUMBER_CHECK
01455 if(CkMyPe() == CpvAccess(serializer)){
01456 #if _MLOG_BCAST_TREE_
01457
01458 for(int i=0; i<numChildren; i++){
01459 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
01460 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
01461 }
01462
01463
01464 recvBroadcast(msg);
01465 #else
01466
01467 thisProxy.recvBroadcast(msg);
01468 #endif
01469 }else{
01470 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
01471 }
01472 }
01473 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
01474 {
01475 CK_MAGICNUMBER_CHECK
01476
01477 thisProxy.recvExpeditedBroadcast(msg);
01478 }
01479
01480 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01481 int _tempBroadcastCount=0;
01482
01483
01484 void CkArray::recvBroadcastViaTree(CkMessage *msg)
01485 {
01486 CK_MAGICNUMBER_CHECK
01487
01488
01489 for(int i=0; i<numChildren; i++){
01490 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
01491 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
01492 }
01493
01494
01495 recvBroadcast(msg);
01496 }
01497
01498 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
01499 if(homePe(*index)==CmiMyPe()){
01500 CkArrayMessage *bcast = (CkArrayMessage *)data;
01501 int epIdx=bcast->array_ep();
01502 DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
01503 CkArrayMessage *copy = (CkArrayMessage *) CkCopyMsg((void **)&bcast);
01504 envelope *env = UsrToEnv(copy);
01505 env->sender.data.group.onPE = CkMyPe();
01506 #if defined(_FAULT_CAUSAL_)
01507 env->TN = 0;
01508 #endif
01509 env->SN = 0;
01510 env->piggyBcastIdx = epIdx;
01511 env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
01512 env->setArrayMgr(thisgroup);
01513 env->setRecipientID(ck::ObjID(thisgroup, rec->getID());
01514 env->setSrcPe(CkMyPe());
01515 env->getsetArrayHops() = 0;
01516 deliver(copy,CkDeliver_queue);
01517 _tempBroadcastCount++;
01518 }else{
01519 if(locMgr->homeElementCount != -1){
01520 DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
01521 }
01522 }
01523 }
01524
01525 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
01526 arr->broadcastHomeElements(data,rec,index);
01527 }
01528 #else
01529 void CkArray::recvBroadcastViaTree(CkMessage *msg){
01530 }
01531 #endif
01532
01533
01535 void CkArray::recvBroadcast(CkMessage *m)
01536 {
01537 CK_MAGICNUMBER_CHECK
01538 CkArrayMessage *msg=(CkArrayMessage *)m;
01539
01540 envelope *env = UsrToEnv(msg);
01541
01542
01543 unsigned short ep = msg->array_ep_bcast();
01544 CkAssert(UsrToEnv(msg)->getGroupNum() == thisgroup);
01545
01546 recvBroadcastEpIdx = UsrToEnv(msg)->getEpIdx();
01547
01548 UsrToEnv(msg)->setMsgtype(ForArrayEltMsg);
01549 UsrToEnv(msg)->setArrayMgr(thisgroup);
01550 UsrToEnv(msg)->getsetArrayEp() = ep;
01551
01552 broadcaster->incoming(msg);
01553
01554 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01555 _tempBroadcastCount=0;
01556 locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
01557 #else
01558 #if CMK_BIGSIM_CHARM
01559 void *root;
01560 _TRACE_BG_TLINE_END(&root);
01561 BgSetEntryName("start-broadcast", &root);
01562 std::vector<void *> logs;
01563 extern void stopVTimer();
01564 extern void startVTimer();
01565 #endif
01566 int len = localElemVec.size();
01567
01568 #if CMK_ONESIDED_IMPL
01569
01570
01571 if(CMI_ZC_MSGTYPE(UsrToEnv(msg)) == CMK_ZC_BCAST_RECV_ALL_DONE_MSG) {
01572
01573 unsigned int i=0;
01574 bool doFree = true;
01575 if (stableLocations && i == len-1) doFree = true;
01576 broadcaster->deliver(msg, (ArrayElement*)localElemVec[i], doFree);
01577
01578 } else if(CMI_ZC_MSGTYPE(UsrToEnv(msg)) == CMK_ZC_BCAST_RECV_MSG) {
01579
01580
01581
01582 unsigned int i=0;
01583 bool doFree = false;
01584 if (stableLocations && i == len-1) doFree = true;
01585 broadcaster->deliver(msg, (ArrayElement*)localElemVec[i], doFree);
01586
01587 } else
01588 #endif
01589
01590 {
01591 for (unsigned int i = 0; i < len; ++i) {
01592 #if CMK_BIGSIM_CHARM
01593
01594 stopVTimer();
01595 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
01596 logs.push_back(curlog);
01597 startVTimer();
01598 #endif
01599 bool doFree = false;
01600 if (stableLocations && i == len-1) doFree = true;
01601 #if CMK_ONESIDED_IMPL
01602 if (CMI_ZC_MSGTYPE(UsrToEnv(msg)) == CMK_ZC_BCAST_RECV_DONE_MSG) doFree = false;
01603 #endif
01604 broadcaster->deliver(msg, (ArrayElement*)localElemVec[i], doFree);
01605 }
01606
01607 #if CMK_ONESIDED_IMPL
01608 if (CMI_ZC_MSGTYPE(UsrToEnv(msg)) == CMK_ZC_BCAST_RECV_DONE_MSG) {
01609 CkArray *mgr = getArrayMgrFromMsg(env);
01610
01611
01612 env->setMsgtype(ForBocMsg);
01613 env->getsetArrayEp() = mgr->getRecvBroadcastEpIdx();
01614 }
01615 #endif
01616 #endif
01617 }
01618
01619 #if CMK_BIGSIM_CHARM
01620
01621 stopVTimer();
01622 BgSplitEntry("end-broadcast", logs.data(), logs.size());
01623 startVTimer();
01624 #endif
01625
01626
01627
01628 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01629 if (stableLocations)
01630 delete msg;
01631 #else
01632 if (stableLocations && len == 0)
01633 delete msg;
01634 #endif
01635 }
01636
01637 #if CMK_ONESIDED_IMPL
01638 void CkArray::forwardZCMsgToOtherElems(envelope *env) {
01639 CkArrayMessage *msg=(CkArrayMessage *)(EnvToUsr(env));
01640
01641
01642 unsigned short ep = msg->array_ep_bcast();
01643 env->getsetArrayEp() = ep;
01644
01645 CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_RECV_DONE_MSG;
01646
01647 int len = localElemVec.size();
01648
01649 for (unsigned int i = 1; i < len; ++i) {
01650 bool doFree = false;
01651 if (stableLocations && i == len-1 && CMI_ZC_MSGTYPE(env)!=CMK_ZC_BCAST_RECV_DONE_MSG) doFree = true;
01652 broadcaster->deliver((CkArrayMessage *)EnvToUsr(env), (ArrayElement*)localElemVec[i], doFree);
01653 }
01654 }
01655 #endif
01656
01657 void CkArray::flushStates() {
01658 CkReductionMgr::flushStates();
01659
01660
01661
01662
01663
01664
01665
01666
01667
01668 resetCountersWhenFlushingStates();
01669 CK_ARRAYLISTENER_LOOP(listeners, l->flushState());
01670 }
01671
01672 void CkArray::ckDestroy() {
01673 isDestroying = true;
01674
01675
01676
01677 locMgr->setDuringDestruction(true);
01678
01679
01680 while (localElemVec.size()) {
01681 localElemVec.front()->ckDestroy();
01682 }
01683
01684 locMgr->deleteManager(CkGroupID(thisProxy), this);
01685 if (!mCastMgrID.isZero()) {
01686 delete _localBranch(mCastMgrID);
01687 mCastMgrID.setZero();
01688 }
01689 delete this;
01690 }
01691
01692 #include "CkArray.def.h"
01693
01694