00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "charm++.h"
00014 #include "envelope.h"
00015 #include "register.h"
00016
00017 #include "ckmulticast.h"
00018 #include "spanningTree.h"
00019 #include "XArraySectionReducer.h"
00020
00021 #include <map>
00022 #include <vector>
00023 #include <unordered_map>
00024
00025 #define DEBUGF(x) // CkPrintf x;
00026
00027
00028 #if CMK_MESSAGE_LOGGING
00029 #define SPLIT_MULTICAST 0
00030 #else
00031 #define SPLIT_MULTICAST 1
00032 #endif
00033
00034
00035
00036
00037 #define MAXFRAGS 100
00038
00039 typedef CkQ<multicastGrpMsg *> multicastGrpMsgBuf;
00040 typedef std::vector<CkArrayIndex> arrayIndexList;
00041 typedef std::vector<int> groupPeList;
00042 typedef std::vector<CkSectionInfo> sectionIdList;
00043 typedef std::vector<CkReductionMsg *> reductionMsgs;
00044 typedef CkQ<int> PieceSize;
00045 typedef std::vector<CmiUInt8> ObjKeyList;
00046 typedef unsigned char byte;
00047
00053 class reductionInfo {
00054 public:
00056 int lcount [MAXFRAGS];
00058 int ccount [MAXFRAGS];
00060 int gcount [MAXFRAGS];
00062 int8_t npProcessed;
00064 CkCallback *storedCallback;
00066 redClientFn storedClient;
00068 void *storedClientParam;
00070 int redNo;
00072 reductionMsgs msgs [MAXFRAGS];
00074 reductionMsgs futureMsgs;
00075
00076 public:
00077 reductionInfo(): npProcessed(0),
00078 storedCallback(NULL),
00079 storedClientParam(NULL),
00080 redNo(0) {
00081 for (int8_t i=0; i<MAXFRAGS; i++)
00082 lcount [i] = ccount [i] = gcount [i] = 0;
00083 }
00084 };
00085
// Lifecycle states stored in mCastEntry::flag:
//   COOKIE_NOTREADY - tree still being set up; recvMsg buffers incoming messages
//   COOKIE_READY    - set by childrenReady once all children have reported
//   COOKIE_OLD      - set by teardown / retire / rebuild when the entry is retired
#define COOKIE_NOTREADY   0
#define COOKIE_READY      1
#define COOKIE_OLD        2
00090
00091
00092
00093
00094 class mCastPacket {
00095 public:
00096 CkSectionInfo cookie;
00097 int offset;
00098 int n;
00099 std::vector<char> data;
00100 int seqno;
00101 int count;
00102 int totalsize;
00103
00104 mCastPacket(CkSectionInfo &_cookie, int _offset, int _n, char *_d, int _s, int _c, int _t):
00105 cookie(_cookie), offset(_offset), n(_n), data(_d, _d+_n), seqno(_s), count(_c), totalsize(_t) {}
00106 };
00107
00108 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
00109
00110 class SectionLocation {
00111 public:
00112 mCastEntry *entry;
00113 int pe;
00114 public:
00115 SectionLocation(): entry(NULL), pe(-1) {}
00116 SectionLocation( mCastEntry *e, int p) { set(e, p); }
00117 inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
00118 inline void clear() { entry = NULL; pe = -1; }
00119 };
00120
00121
00122
00123
00125 class mCastEntry
00126 {
00127 public:
00129 CkArrayID aid;
00131 CkSectionInfo parentGrp;
00133 sectionIdList children;
00135 int bfactor;
00137 int numChild;
00139 arrayIndexList allElem;
00141 groupPeList allGrpElem;
00143 ObjKeyList allObjKeys;
00145 arrayIndexList localElem;
00147 bool localGrpElem;
00149 int pe;
00151 CkSectionInfo rootSid;
00152 multicastGrpMsgBuf msgBuf;
00154 multicastGrpPacketBuf packetBuf;
00156 char *asm_msg;
00157 int asm_fill;
00159 mCastEntry *oldc, *newc;
00161 SectionLocation oldtree;
00162
00163 reductionInfo red;
00164
00165 char needRebuild;
00166 private:
00167 char flag;
00168 char grpSec;
00169 public:
00170 mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), localGrpElem(0), asm_msg(NULL),
00171 asm_fill(0), oldc(NULL), newc(NULL), needRebuild(0),
00172 flag(COOKIE_NOTREADY), grpSec(0) {}
00173 mCastEntry(CkGroupID _gid): aid(_gid), numChild(0), localGrpElem(0), asm_msg(NULL),
00174 asm_fill(0), oldc(NULL), newc(NULL), needRebuild(0),
00175 flag(COOKIE_NOTREADY), grpSec(1) {}
00176 mCastEntry(mCastEntry *);
00178 inline int hasParent() { return parentGrp.get_val()?1:0; }
00180 inline int isObsolete() { return (flag == COOKIE_OLD); }
00182 inline void setObsolete() { flag=COOKIE_OLD; }
00184 inline int notReady() { return (flag == COOKIE_NOTREADY); }
00186 inline void setReady() { flag=COOKIE_READY; }
00188 inline int isGrpSec() { return grpSec; }
00189 inline int getNumLocalElems(){
00190 return (isGrpSec()? localGrpElem : localElem.size());
00191 }
00192 inline void setLocalGrpElem() { localGrpElem = 1; }
00193 inline int getNumAllElems(){
00194 return (isGrpSec()? allGrpElem.size() : allElem.size());
00195 }
00197 inline void incReduceNo() {
00198 red.redNo ++;
00199 for (mCastEntry *next = newc; next; next=next->newc)
00200 next->red.redNo++;
00201 }
00203 inline CkArrayID getAid() { return aid; }
00204 inline int hasOldtree() { return oldtree.entry != NULL; }
00205 inline void print() {
00206 CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
00207 }
00208 };
00209
00210
00211
00212
00213 class cookieMsg: public CMessage_cookieMsg {
00214 public:
00215 CkSectionInfo cookie;
00216 public:
00217 cookieMsg() {};
00218 cookieMsg(CkSectionInfo m): cookie(m) {};
00219 };
00220
00221
00222
00223
00229 class multicastSetupMsg: public CMessage_multicastSetupMsg {
00230 public:
00234 int nIdx;
00239 CkArrayIndex *arrIdx;
00249 int *peElems;
00250 CkSectionInfo parent;
00251 CkSectionInfo rootSid;
00252 int redNo;
00253 int forGrpSec(){
00254 CkAssert(nIdx);
00255 return ((void *)arrIdx == (void *)peElems);
00256 }
00257 int bfactor;
00258 };
00259
00260
00261
00262
00264 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
00265 };
00266
00267
00268 extern void CkPackMessage(envelope **pEnv);
00269 extern void CkUnpackMessage(envelope **pEnv);
00270
00271
00272
/// Module initialization hook for CkMulticast; currently does nothing.
void _ckMulticastInit()
{
}
00280
00281
00282 mCastEntry::mCastEntry (mCastEntry *old):
00283 numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY), grpSec(old->isGrpSec())
00284 {
00285 int i;
00286 aid = old->aid;
00287 parentGrp = old->parentGrp;
00288 allElem = old->allElem;
00289 allGrpElem = old->allGrpElem;
00290 #if CMK_LBDB_ON
00291 allObjKeys = old->allObjKeys;
00292 #endif
00293 pe = old->pe;
00294 red.storedCallback = old->red.storedCallback;
00295 red.storedClient = old->red.storedClient;
00296 red.storedClientParam = old->red.storedClientParam;
00297 red.redNo = old->red.redNo;
00298 needRebuild = 0;
00299 asm_msg = NULL;
00300 asm_fill = 0;
00301 }
00302
00303
00304
00305 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndex *al, int n)
00306 {
00307 setSection(_id, aid, al, n, dfactor);
00308 }
00309
00310 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndex *al, int n, int factor)
00311 {
00312
00313 mCastEntry *entry = new mCastEntry(aid);
00314
00315 entry->allElem.resize(n);
00316 entry->allObjKeys.reserve(n);
00317 for (int i=0; i<n; i++) {
00318 entry->allElem[i] = al[i];
00319 #if CMK_LBDB_ON
00320 CmiUInt8 _key;
00321 if(CProxy_ArrayBase(aid).ckLocMgr()->lookupID(al[i], _key))
00322 entry->allObjKeys.push_back(_key);
00323 #endif
00324 }
00325 entry->allObjKeys.shrink_to_fit();
00326 entry->bfactor = factor;
00327
00328 _id.get_aid() = aid;
00329 _id.get_val() = entry;
00330
00331 initCookie(_id);
00332 }
00333
00334
00335
00336
00337 void CkMulticastMgr::setSection(CkSectionInfo &id)
00338 {
00339 initCookie(id);
00340 }
00341
00342
00343
00344
00346 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
00347 {
00348 CkArrayID aid = proxy.ckGetArrayID();
00349 CkSectionInfo &_id = proxy.ckGetSectionInfo();
00350
00351 mCastEntry *entry = new mCastEntry(aid);
00352
00353 const CkArrayIndex *al = proxy.ckGetArrayElements();
00354 entry->allElem.resize(proxy.ckGetNumElements());
00355 entry->allObjKeys.reserve(proxy.ckGetNumElements());
00356 for (int i=0; i<proxy.ckGetNumElements(); i++) {
00357 entry->allElem[i] = al[i];
00358 #if CMK_LBDB_ON
00359 CmiUInt8 _key;
00360 if(CProxy_ArrayBase(aid).ckLocMgr()->lookupID(al[i], _key))
00361 entry->allObjKeys.push_back(_key);
00362 #endif
00363 }
00364 entry->allObjKeys.shrink_to_fit();
00365 if(proxy.ckGetBfactor() == USE_DEFAULT_BRANCH_FACTOR)
00366 entry->bfactor = dfactor;
00367 else
00368 entry->bfactor = proxy.ckGetBfactor();
00369 _id.get_aid() = aid;
00370 _id.get_val() = entry;
00371 initCookie(_id);
00372 }
00373
00374
00375
00376
00377 void CkMulticastMgr::resetSection(CProxySection_ArrayBase &proxy)
00378 {
00379 CkSectionInfo &info = proxy.ckGetSectionInfo();
00380
00381 int oldpe = info.get_pe();
00382 if (oldpe == CkMyPe()) return;
00383
00384 CkArrayID aid = proxy.ckGetArrayID();
00385 CkSectionID *sid = proxy.ckGetSectionIDs();
00386 mCastEntry *entry = new mCastEntry(aid);
00387
00388 mCastEntry *oldentry = (mCastEntry *)info.get_val();
00389 DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
00390
00391 const std::vector<CkArrayIndex> &al = sid->_elems;
00392 CmiAssert(info.get_aid() == aid);
00393 prepareCookie(entry, *sid, al.data(), sid->_elems.size(), aid);
00394
00395 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00396
00397
00398 entry->oldtree.set(oldentry, oldpe);
00399
00400
00401 mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
00402
00403
00404 mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
00405 }
00406
00407
00409 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndex *al, int count, CkArrayID aid)
00410 {
00411 entry->allElem.resize(count);
00412 entry->allObjKeys.reserve(count);
00413 for (int i=0; i<count; i++) {
00414 entry->allElem[i] = al[i];
00415 #if CMK_LBDB_ON
00416 CmiUInt8 _key;
00417 if(CProxy_ArrayBase(aid).ckLocMgr()->lookupID(al[i], _key))
00418 entry->allObjKeys.push_back(_key);
00419 #endif
00420 }
00421 entry->allObjKeys.shrink_to_fit();
00422 if(sid.bfactor == USE_DEFAULT_BRANCH_FACTOR)
00423 entry->bfactor = dfactor;
00424 else
00425 entry->bfactor = sid.bfactor;
00426
00427 sid._cookie.get_aid() = aid;
00428 sid._cookie.get_val() = entry;
00429 sid._cookie.get_pe() = CkMyPe();
00430 }
00431
00432
00434 void CkMulticastMgr::prepareGrpCookie(mCastEntry *entry, CkSectionID &sid, const int *pelist, int count, CkGroupID gid)
00435 {
00436 entry->allGrpElem.resize(count);
00437 for (int i=0; i<count; i++) {
00438 entry->allGrpElem[i] = pelist[i];
00439 }
00440
00441 if(sid.bfactor == USE_DEFAULT_BRANCH_FACTOR)
00442 entry->bfactor = dfactor;
00443 else
00444 entry->bfactor = sid.bfactor;
00445
00446 sid._cookie.get_aid() = gid;
00447 sid._cookie.get_val() = entry;
00448 sid._cookie.get_pe() = CkMyPe();
00449 DEBUGF(("[%d]In prepareGrpCookie: entry: %p, entry->isGrpSec(): %d\n", CkMyPe(), entry, entry->isGrpSec()));
00450 CkAssert(entry->isGrpSec());
00451 }
00452
00453
00454
00455 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy, int opts)
00456 {
00457 if(opts == GROUP_SECTION_PROXY){
00458 initGrpDelegateMgr((CProxySection_Group *) cproxy, opts);
00459 return;
00460 }
00461
00462 CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
00463 int numSubSections = proxy->ckGetNumSubSections();
00464 CkCallback *sectionCB;
00465
00466 if (numSubSections > 1)
00467 {
00477
00478 ck::impl::XArraySectionReducer *red =
00479 new ck::impl::XArraySectionReducer(numSubSections, nullptr);
00480
00481 sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
00482 }
00483 for (int i=0; i<numSubSections; i++)
00484 {
00485 CkArrayID aid = proxy->ckGetArrayIDn(i);
00486 mCastEntry *entry = new mCastEntry(aid);
00487 CkSectionID *sid = &( proxy->ckGetSectionID(i) );
00488 const CkArrayIndex *al = proxy->ckGetArrayElements(i);
00489 if (numSubSections > 1)
00490 entry->red.storedCallback = sectionCB;
00491 prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
00492 initCookie(sid->_cookie);
00493 }
00494 }
00495
00496
00497
00498 void CkMulticastMgr::initGrpDelegateMgr(CProxySection_Group *proxy, int opts)
00499 {
00500 int numSubSections = proxy->ckGetNumSections();
00501 for (int i=0; i<numSubSections; i++)
00502 {
00503 CkGroupID gid = proxy->ckGetGroupIDn(i);
00504 mCastEntry *entry = new mCastEntry(gid);
00505 CkSectionID *sid = &( proxy->ckGetSectionID(i) );
00506 const int *pelist = proxy->ckGetElements(i);
00507 prepareGrpCookie(entry, *sid, pelist, proxy->ckGetNumElements(i), gid);
00508 initGrpCookie(sid->_cookie);
00509 }
00510 }
00511
00512
00513 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
00514 {
00515 mCastEntry *entry = (mCastEntry *)s.get_val();
00516 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00517 mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
00518 }
00519
00520
00521
00522 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
00523 {
00524 mCastEntry *entry = (mCastEntry *)s.get_val();
00525 entry->red.redNo = red;
00526
00527 initCookie(s);
00528
00529
00530 }
00531
00532
00533
00534
00535 void CkMulticastMgr::initCookie(CkSectionInfo s)
00536 {
00537 mCastEntry *entry = (mCastEntry *)s.get_val();
00538 const int n = entry->allElem.size();
00539 DEBUGF(("init: %d elems %p at %f\n", n, s.get_val(), CkWallTimer()));
00540
00541
00542 std::map<int, std::vector<int>> elemBins;
00543 CkArray *array = CProxy_ArrayBase(s.get_aid()).ckLocalBranch();
00544 for (int i=0; i < n; i++) {
00545 int ape = array->lastKnown(entry->allElem[i]);
00546 CmiAssert(ape >=0 && ape < CkNumPes());
00547 elemBins[ape].push_back(i);
00548 }
00549
00550 multicastSetupMsg *msg = new (n, (elemBins.size()+1)*2, 0) multicastSetupMsg;
00551 msg->nIdx = elemBins.size();
00552 msg->parent = CkSectionInfo(entry->getAid());
00553 msg->rootSid = s;
00554 msg->redNo = entry->red.redNo;
00555 msg->bfactor = entry->bfactor;
00556 int cntElems=0, idx=0;
00557 for (std::map<int, std::vector<int> >::iterator itr = elemBins.begin();
00558 itr != elemBins.end(); ++itr) {
00559 msg->peElems[idx++] = itr->first;
00560 msg->peElems[idx++] = cntElems;
00561 std::vector<int> &elems = itr->second;
00562 for (int j=0; j < elems.size(); j++) {
00563 msg->arrIdx[cntElems++] = entry->allElem[elems[j]];
00564 }
00565 }
00566 msg->peElems[idx++] = -1;
00567 msg->peElems[idx] = cntElems;
00568
00569 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00570 mCastGrp[CkMyPe()].setup(msg);
00571 }
00572
00573
00574
00575 void CkMulticastMgr::initGrpCookie(CkSectionInfo s)
00576 {
00577 mCastEntry *entry = (mCastEntry *)s.get_val();
00578 int n = entry->allGrpElem.size();
00579 DEBUGF(("init: %d elems %p\n", n, s.get_val()));
00580
00581 multicastSetupMsg *msg = new (0, n, 0) multicastSetupMsg;
00582 DEBUGF(("[%d] initGrpCookie: msg->arrIdx:%p, msg->lastKnown: %p \n", CkMyPe(), msg->arrIdx, msg->lastKnown));
00583 msg->nIdx = n;
00584 msg->parent = CkSectionInfo(entry->getAid());
00585 msg->rootSid = s;
00586 msg->redNo = entry->red.redNo;
00587 msg->bfactor = entry->bfactor;
00588
00589 for (int i=0; i<n; i++) {
00590 msg->peElems[i] = entry->allGrpElem[i];
00591 }
00592
00593 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00594 mCastGrp[CkMyPe()].setup(msg);
00595 }
00596
00597
00598
00599
00600 void CkMulticastMgr::teardown(CkSectionInfo cookie)
00601 {
00602 int i;
00603 mCastEntry *sect = (mCastEntry *)cookie.get_val();
00604
00605 sect->setObsolete();
00606
00607 releaseBufferedReduceMsgs(sect);
00608
00609 CProxy_CkMulticastMgr mp(thisgroup);
00610 for (i=0; i<sect->children.size(); i++)
00611 mp[sect->children[i].get_pe()].teardown(sect->children[i]);
00612 }
00613
00614
00615
00616
00617 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
00618 {
00619 int i;
00620 mCastEntry *sect = (mCastEntry *)cookie.get_val();
00621
00622 sect->rootSid = newroot;
00623
00624 sect->setObsolete();
00625
00626 releaseBufferedReduceMsgs(sect);
00627
00628 CProxy_CkMulticastMgr mp(thisgroup);
00629 for (i=0; i<sect->children.size(); i++)
00630 mp[sect->children[i].get_pe()].teardown(sect->children[i]);
00631 }
00632
00633
00634
00635
00636 void CkMulticastMgr::freeup(CkSectionInfo cookie)
00637 {
00638 mCastEntry *sect = (mCastEntry *)cookie.get_val();
00639 CProxy_CkMulticastMgr mp(thisgroup);
00640
00641 while (sect)
00642 {
00643
00644 for (int i=0; i<sect->children.size(); i++)
00645 mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
00646
00647 DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
00648 mCastEntry *oldc= sect->oldc;
00649 delete sect;
00650 sect = oldc;
00651 }
00652 }
00653
00654
// Iterator over the PE list partitioned by the spanning-tree builder in setup().
using TreeIterator = std::vector<int>::iterator;
00656
// Entry method: install this PE's node of a section's multicast spanning
// tree from `msg`, then send one setup message per child subtree.
// - When msg->parent's PE equals this PE, the code treats the call as the
//   root call and reuses the entry referenced by msg->rootSid; otherwise a
//   new (array or group) entry is created.
// - Elements (array sections) or membership (group sections) belonging to
//   this PE are recorded in the entry.
// - With no children the node is ready immediately (childrenReady);
//   otherwise readiness is reached when all children report via recvCookie.
// `msg` is deleted before returning.
00657 void CkMulticastMgr::setup(multicastSetupMsg *msg)
00658 {
00659 mCastEntry *entry;
00660 CkArrayID aid = msg->rootSid.get_aid();
00661 if (msg->parent.get_pe() == CkMyPe())
00662 entry = (mCastEntry *)msg->rootSid.get_val();
00663 else{
00664 if(msg->forGrpSec())
00665 entry = new mCastEntry((CkGroupID) aid);
00666 else
00667 entry = new mCastEntry(aid);
00668 }
00669 entry->aid = aid;
00670 entry->pe = CkMyPe();
00671 entry->rootSid = msg->rootSid;
00672 entry->parentGrp = msg->parent;
00673 int factor = entry->bfactor = msg->bfactor;
00674
00675 DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems, grpSec: %d, factor: %d\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx, entry->isGrpSec(), factor));
00676 entry->red.redNo = msg->redNo;
00677
// numpes = number of PEs described by msg (msg->nIdx).
00678 const int numpes = msg->nIdx;
// Array sections only: PE -> index of its (pe, offset) pair in msg->peElems.
00679 std::unordered_map<int, int> peIdx;
00680 if (!entry->isGrpSec()) peIdx.reserve(numpes);
// PEs of this subtree; this PE always comes first.
00681 std::vector<int> mySubTreePEs;
00682 mySubTreePEs.reserve(numpes);
00683
00684 mySubTreePEs.push_back(CkMyPe());
00685
00686 for (int i1=0; i1 < numpes; i1++) {
00687 if (entry->isGrpSec()) {
00688
// Group section: peElems is a plain PE list; note whether this PE is a member.
00689 if (msg->peElems[i1] != CkMyPe())
00690 mySubTreePEs.push_back(msg->peElems[i1]);
00691 else
00692 entry->setLocalGrpElem();
00693 } else {
00694
// Array section: peElems holds (pe, first element offset) pairs.
00695 int i2 = i1*2;
00696 int pe = msg->peElems[i2];
00697 peIdx[pe] = i2;
00698 if (pe != CkMyPe()) {
00699 mySubTreePEs.push_back(pe);
00700 } else {
// This PE's elements span [offset, next pair's offset); the trailing
// (-1, total) terminator supplies the end for the last PE.
00701 int begin = msg->peElems[i2+1];
00702 int end = msg->peElems[i2+3];
00703 entry->localElem.reserve(entry->localElem.size() + (end - begin));
00704 for (int j=begin; j < end; j++)
00705 entry->localElem.push_back(msg->arrIdx[j]);
00706 }
00707 }
00708 }
00709
00710
// Direct children: every other PE when factor <= 0, else at most `factor`.
00711 int num = mySubTreePEs.size() - 1, numchild = 0;
00712 if (factor <= 0) numchild = num;
00713 else numchild = num<factor?num:factor;
00714
00715 entry->numChild = numchild;
00716
00717
00718 if (numchild)
00719 {
00720
// Partition the subtree PEs; the builder's return value replaces numchild.
// NOTE(review): meaning of the constructor flags is defined in spanningTree.h.
00721 bool isRoot = (msg->parent.get_pe() == CkMyPe());
00722 ST_RecursivePartition<TreeIterator> treeBuilder(false,!isRoot);
00723 numchild = treeBuilder.buildSpanningTree(mySubTreePEs.begin(), mySubTreePEs.end(), numchild);
00724 entry->numChild = numchild;
00725
00726 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00727
00728
00729 for (int i=0; i < numchild; i++)
00730 {
00731 TreeIterator subtreeStart = treeBuilder.begin(i), subtreeEnd = treeBuilder.end(i);
00732
00733
// Size the child's setup message: group sections carry only PEs; array
// sections also carry every element of every PE in the subtree.
00734 int numSubTreeElems = 0;
00735 int numSubTreePes = treeBuilder.subtreeSize(i);
00736 multicastSetupMsg *m;
00737
00738 if (entry->isGrpSec()) {
00739 m = new (0, numSubTreePes, 0) multicastSetupMsg;
00740 } else {
00741 for (TreeIterator j=subtreeStart; j != subtreeEnd; j++) {
00742 int idx = peIdx[*j];
00743 numSubTreeElems += (msg->peElems[idx+3] - msg->peElems[idx+1]);
00744 }
00745 m = new (numSubTreeElems, (numSubTreePes+1)*2, 0) multicastSetupMsg;
00746 }
00747
00748
// This entry becomes the parent of the child subtree.
00749 m->parent = CkSectionInfo(aid, entry);
00750 m->nIdx = numSubTreePes;
00751 m->rootSid = msg->rootSid;
00752 m->redNo = msg->redNo;
00753 m->bfactor = msg->bfactor;
00754
00755
// Re-encode the subtree in the same (pe, offset) ... (-1, total) layout.
00756 int cntElems = 0, i2 = 0;
00757 for (TreeIterator j=subtreeStart; j != subtreeEnd; j++)
00758 {
00759 int childPE = *j;
00760 m->peElems[i2++] = childPE;
00761 if (!entry->isGrpSec()) {
00762 m->peElems[i2++] = cntElems;
00763 int i1 = peIdx[childPE];
00764 for (int k=msg->peElems[i1+1]; k < msg->peElems[i1+3]; k++) {
00765 m->arrIdx[cntElems++] = msg->arrIdx[k];
00766 }
00767 }
00768 }
00769 if (!entry->isGrpSec()) {
00770 m->peElems[i2++] = -1;
00771 m->peElems[i2] = cntElems;
00772 }
00773
// The first PE of each subtree receives its setup message.
00774 int childroot = *subtreeStart;
00775 DEBUGF(("[%d] call set up %d numelem:%d, bfactor: %d\n", CkMyPe(), childroot, numSubTreeElems, m->bfactor));
00776
00777 mCastGrp[childroot].setup(m);
00778 }
00779 }
00780
// Leaf: no children to wait for.
00781 else
00782 childrenReady(entry);
00783 delete msg;
00784 }
00785
00786
00787
00788
00789 void CkMulticastMgr::childrenReady(mCastEntry *entry)
00790 {
00791
00792 entry->setReady();
00793 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00794
00795 DEBUGF(("[%d] childrenReady entry %p groupsection?: %d, Arrayelems: %d, GroupElems: %d, redNo: %d\n", CkMyPe(), entry, entry->isGrpSec(), entry->allElem.size(), entry->allGrpElem.size(), entry->red.redNo));
00796
00797 if (entry->hasParent())
00798 mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
00799 #if SPLIT_MULTICAST
00800
00801 while (!entry->packetBuf.isEmpty())
00802 {
00803 mCastPacket *packet = entry->packetBuf.deq();
00804 packet->cookie.get_val() = entry;
00805 mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->offset, packet->n, packet->data.data(), packet->seqno, packet->count, packet->totalsize, 1);
00806 delete packet;
00807 }
00808 #endif
00809
00810 while (!entry->msgBuf.isEmpty())
00811 {
00812 multicastGrpMsg *newmsg = entry->msgBuf.deq();
00813 DEBUGF(("[%d] release buffer %p ep:%d, msg-used?: %d\n", CkMyPe(), newmsg, newmsg->ep, UsrToEnv(newmsg)->isUsed()));
00814 newmsg->_cookie.get_val() = entry;
00815 mCastGrp[CkMyPe()].recvMsg(newmsg);
00816 }
00817
00818 releaseFutureReduceMsgs(entry);
00819 }
00820
00821
00822
00823
00824 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
00825 {
00826 mCastEntry *entry = (mCastEntry *)sid.get_val();
00827 entry->children.push_back(child);
00828 if (entry->children.size() == entry->numChild) {
00829 childrenReady(entry);
00830 }
00831 }
00832
00833
00834
00835
00836
00837
00838 void CkMulticastMgr::rebuild(CkSectionInfo §Id)
00839 {
00840
00841 mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
00842 CkAssert(curCookie->pe == CkMyPe());
00843
00844 while (curCookie->newc) curCookie = curCookie->newc;
00845 if (curCookie->isObsolete()) return;
00846
00847
00848 mCastEntry *newCookie = new mCastEntry(curCookie);
00849
00850
00851 newCookie->oldc = curCookie;
00852 curCookie->newc = newCookie;
00853
00854 sectId.get_val() = newCookie;
00855
00856 DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
00857
00858 curCookie->setObsolete();
00859
00860 resetCookie(sectId);
00861 }
00862
00863 void CkMulticastMgr::resetCookie(CkSectionInfo s)
00864 {
00865 mCastEntry *newCookie = (mCastEntry*)s.get_val();
00866 mCastEntry *oldCookie = newCookie->oldc;
00867
00868
00869 DEBUGF(("reset: oldc: %p\n", oldCookie));
00870 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00871 int mype = CkMyPe();
00872 mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
00873
00874
00875 initCookie(s);
00876 }
00877
00878 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
00879 {
00880 DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._elems.size()));
00881
00882 ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
00883 for (int i=0; i< sid._elems.size()-1; i++) {
00884 CProxyElement_ArrayBase ap(a, sid._elems[i]);
00885 void *newMsg=CkCopyMsg((void **)&m);
00886 #if CMK_MESSAGE_LOGGING
00887 envelope *env = UsrToEnv(newMsg);
00888 env->flags = env->flags | CK_MULTICAST_MSG_MLOG;
00889 #endif
00890 ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
00891 }
00892 if (!sid._elems.empty()) {
00893 CProxyElement_ArrayBase ap(a, sid._elems[sid._elems.size()-1]);
00894 ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
00895 }
00896 }
00897
00898 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
00899 {
00900 DEBUGF(("ArraySectionSend\n"));
00901 #if CMK_MESSAGE_LOGGING
00902 envelope *env = UsrToEnv(m);
00903 env->flags = env->flags | CK_MULTICAST_MSG_MLOG;
00904 #endif
00905
00906 for (int snum = 0; snum < nsid; snum++) {
00907 void *msgCopy = m;
00908 if (nsid - snum > 1)
00909 msgCopy = CkCopyMsg(&m);
00910 sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
00911 }
00912 }
00913
00914
00915 void CkMulticastMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid)
00916 {
00917 #if CMK_MESSAGE_LOGGING
00918 envelope *env = UsrToEnv(m);
00919 env->flags = env->flags | CK_MULTICAST_MSG_MLOG;
00920 #endif
00921
00922 DEBUGF(("[%d] GroupSectionSend, nsid: %d \n", CkMyPe(), nsid));
00923 for (int snum = 0; snum < nsid; snum++) {
00924 void *msgCopy = m;
00925 if (nsid - snum > 1)
00926 msgCopy = CkCopyMsg(&m);
00927 DEBUGF(("GroupSectionSend, msg-used(m):%d, msg-used(msgCopy):%d\n", UsrToEnv(m)->isUsed(), UsrToEnv(msgCopy)->isUsed()));
00928 sendToSection(pd, ep, msgCopy, &(sid[snum]), 0);
00929 }
00930 }
00931
00932 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
00933 {
00934 multicastGrpMsg *msg = (multicastGrpMsg *)m;
00935 msg->ep = ep;
00936 CkSectionInfo &s = sid->_cookie;
00937 mCastEntry *entry;
00938
00939
00940 if (s.get_pe() == CkMyPe()) {
00941 entry = (mCastEntry *)s.get_val();
00942 if (NULL == entry)
00943 CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
00944
00945
00946 if (entry->newc) {
00947 do { entry=entry->newc; } while (entry->newc);
00948 s.get_val() = entry;
00949 }
00950
00951 #if CMK_LBDB_ON
00952 if(!entry->isGrpSec()){
00953
00954 envelope *env = UsrToEnv(msg);
00955 const LDOMHandle &om = CProxy_ArrayBase(s.get_aid()).ckLocMgr()->getOMHandle();
00956 LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.data(),entry->allObjKeys.size(),env->getTotalsize());
00957 }
00958 #endif
00959
00960
00961 if (entry->needRebuild == 1) {
00962 msg->_cookie = s;
00963 SimpleSend(ep, msg, s.get_aid(), *sid, opts);
00964 entry->needRebuild = 2;
00965 return;
00966 }
00967
00968 else if (entry->needRebuild == 2) rebuild(s);
00969 }
00970
00971 else {
00972
00973 CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
00974 }
00975
00976
00977 msg->_cookie = s;
00978
00979 #if SPLIT_MULTICAST
00980
00981 envelope *env = UsrToEnv(m);
00982 CkPackMessage(&env);
00983 int totalsize = env->getTotalsize();
00984 int packetSize = 0;
00985 int totalcount = 0;
00986 if(totalsize < split_threshold){
00987 packetSize = totalsize;
00988 totalcount = 1;
00989 }else{
00990 packetSize = split_size;
00991 totalcount = totalsize/split_size;
00992 if(totalsize%split_size) totalcount++;
00993
00994
00995
00996 }
00997 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00998 int sizesofar = 0;
00999 char *data = (char*) env;
01000 if (totalcount == 1) {
01001
01002 if (s.get_pe() == CkMyPe()) {
01003 CkUnpackMessage(&env);
01004 msg = (multicastGrpMsg *)EnvToUsr(env);
01005 recvMsg(msg);
01006 }
01007
01008 else {
01009 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01010 msg = (multicastGrpMsg *)EnvToUsr(env);
01011 mCastGrp[s.get_pe()].recvMsg(msg);
01012 }
01013 return;
01014 }
01015 for (int i=0; i<totalcount; i++) {
01016 int mysize = packetSize;
01017 if (mysize + sizesofar > totalsize) {
01018 mysize = totalsize-sizesofar;
01019 }
01020
01021 mCastGrp[s.get_pe()].recvPacket(s, sizesofar, mysize, data, i, totalcount, totalsize, 0);
01022 sizesofar += mysize;
01023 data += mysize;
01024 }
01025 CmiFree(env);
01026 #else
01027 if (s.get_pe() == CkMyPe()) {
01028 DEBUGF((CkPrintf("sendToSection, msg-used :%d\n", UsrToEnv(msg)->isUsed()));
01029 recvMsg(msg);
01030 }
01031 else {
01032 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01033 mCastGrp[s.get_pe()].recvMsg(msg);
01034 }
01035 #endif
01036 }
01037
01038 void CkMulticastMgr::recvPacket(CkSectionInfo &&_cookie, int offset, int n, char *data, int seqno, int count, int totalsize, bool fromBuffer)
01039 {
01040 int i;
01041 mCastEntry *entry = (mCastEntry *)_cookie.get_val();
01042
01043
01044 if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
01045 entry->packetBuf.enq(new mCastPacket(_cookie, offset, n, data, seqno, count, totalsize));
01046
01047 return;
01048 }
01049
01050
01051
01052
01053
01054 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01055 for (i=0; i<entry->children.size(); i++) {
01056 mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], offset, n, data, seqno, count, totalsize, 0);
01057 }
01058
01059 if (entry->asm_msg == NULL) {
01060 CmiAssert(entry->asm_fill == 0);
01061 entry->asm_msg = (char *)CmiAlloc(totalsize);
01062 }
01063 memcpy(entry->asm_msg+offset, data, n);
01064 entry->asm_fill += n;
01065 if (entry->asm_fill == totalsize) {
01066 CkUnpackMessage((envelope **)&entry->asm_msg);
01067 multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
01068 msg->_cookie = _cookie;
01069
01070 sendToLocal(msg);
01071 entry->asm_msg = NULL;
01072 entry->asm_fill = 0;
01073 }
01074
01075 }
01076
01077 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
01078 {
01079 int i;
01080 CkSectionInfo §ionInfo = msg->_cookie;
01081 mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
01082 CmiAssert(entry->getAid() == sectionInfo.get_aid());
01083
01084 if (entry->notReady()) {
01085 DEBUGF(("entry not ready, enq buffer %p, msg-used?: %d\n", msg, UsrToEnv(msg)->isUsed()));
01086 entry->msgBuf.enq(msg);
01087 return;
01088 }
01089
01090
01091
01092 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01093 for (i=0; i<entry->children.size(); i++) {
01094 multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
01095 #if CMK_MESSAGE_LOGGING
01096 envelope *env = UsrToEnv(newmsg);
01097 env->flags = env->flags | CK_MULTICAST_MSG_MLOG;
01098 #endif
01099 newmsg->_cookie = entry->children[i];
01100 mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
01101 }
01102
01103 sendToLocal(msg);
01104 }
01105
01106 void CkMulticastMgr::sendToLocal(multicastGrpMsg *msg)
01107 {
01108 int i;
01109 CkSectionInfo §ionInfo = msg->_cookie;
01110 mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
01111 CmiAssert(entry->getAid() == sectionInfo.get_aid());
01112 CkGroupID aid = sectionInfo.get_aid();
01113
01114
01115 int nLocal;
01116
01117
01118 if(entry->isGrpSec()){
01119 nLocal = entry->localGrpElem;
01120 if(nLocal){
01121 DEBUGF(("[%d] send to local branch, GroupSection\n", CkMyPe()));
01122 int mpe = CkMyPe();
01123 CkAssert(nLocal == 1);
01124 CProxyElement_Group ap(aid, CkMyPe());
01125 if (ap.ckIsDelegated()) {
01126 CkGroupMsgPrep(msg->ep, msg, aid);
01127 (ap.ckDelegatedTo())->GroupSend(ap.ckDelegatedPtr(), msg->ep, msg, CkMyPe(), aid);
01128 }
01129 else{
01130 if (_entryTable[msg->ep]->noKeep)
01131 CkSendMsgBranchInline(msg->ep, msg, CkMyPe(), aid, 0);
01132 else
01133 CkSendMsgBranch(msg->ep, msg, CkMyPe(), aid,0);
01134 }
01135 }
01136 return;
01137 }
01138
01139
01140 nLocal = entry->localElem.size();
01141 DEBUGF(("[%d] send to local %d elems, ArraySection\n", CkMyPe(), nLocal));
01142 for (i=0; i<nLocal-1; i++) {
01143 CProxyElement_ArrayBase ap(aid, entry->localElem[i]);
01144 if (_entryTable[msg->ep]->noKeep) {
01145 CkSendMsgArrayInline(msg->ep, msg, sectionInfo.get_aid(), entry->localElem[i], CK_MSG_KEEP);
01146 }
01147 else {
01148
01149 multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
01150 ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
01151 }
01152
01153
01154
01155
01156
01157
01158 }
01159 if (nLocal) {
01160 CProxyElement_ArrayBase ap(aid, entry->localElem[nLocal-1]);
01161 ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
01162
01163 }
01164 else {
01165 CkAssert (entry->rootSid.get_pe() == CkMyPe());
01166 delete msg;
01167 }
01168 }
01169
01170
01171
01172 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
01173 {
01174 CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
01175 if (!CkMcastBaseMsg::checkMagic(m)) {
01176 CmiPrintf("ERROR: This is not a CkMulticast message!\n");
01177 CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
01178 }
01179
01180 if (m->gpe() != -1) {
01181 id.get_pe() = m->gpe();
01182 id.get_val() = m->entry();
01183 id.get_aid() = m->_cookie.get_aid();
01184 }
01185
01186 }
01187
01188
01189
01190 void CkMulticastMgr::setReductionClient(CProxySection_ArrayBase &proxy, CkCallback *cb)
01191 {
01192 CkCallback *sectionCB;
01193 int numSubSections = proxy.ckGetNumSubSections();
01194
01195 if (numSubSections > 1)
01196 {
01203
01204 CkSectionInfo &sInfo = proxy.ckGetSectionID(0)._cookie;
01205 mCastEntry *entry = (mCastEntry *)sInfo.get_val();
01206 delete entry->red.storedCallback;
01207 ck::impl::XGroupSectionReducer *red =
01208 new ck::impl::XGroupSectionReducer(numSubSections, cb);
01209
01210 sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
01211 }
01212
01213 else
01214 sectionCB = cb;
01215
01216 for (int i=0; i<numSubSections; i++)
01217 {
01218 CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
01219 mCastEntry *entry = (mCastEntry *)sInfo.get_val();
01220 entry->red.storedCallback = sectionCB;
01221 }
01222 }
01223
01224
01225
01226 void CkMulticastMgr::setReductionClient(CProxySection_Group &proxy, CkCallback *cb)
01227 {
01228 CkCallback *sectionCB;
01229 int numSubSections = proxy.ckGetNumSections();
01230 DEBUGF(("[%d]setReductionClient for grpSec, numSubSections: %d \n", CkMyPe(), numSubSections));
01231
01232 if (numSubSections > 1)
01233 {
01240
01241 CkSectionInfo &sInfo = proxy.ckGetSectionID(0)._cookie;
01242 mCastEntry *entry = (mCastEntry *)sInfo.get_val();
01243 delete entry->red.storedCallback;
01244 ck::impl::XGroupSectionReducer *red =
01245 new ck::impl::XGroupSectionReducer(numSubSections, cb);
01246
01247 sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
01248 }
01249
01250 else
01251 sectionCB = cb;
01252
01253 for (int i=0; i<numSubSections; i++)
01254 {
01255 CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
01256 mCastEntry *entry = (mCastEntry *)sInfo.get_val();
01257 entry->red.storedCallback = sectionCB;
01258 }
01259 }
01260
01261
01262 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
01263 {
01264 CkSectionInfo &id = proxy.ckGetSectionInfo();
01265 mCastEntry *entry = (mCastEntry *)id.get_val();
01266 entry->red.storedClient = fn;
01267 entry->red.storedClientParam = param;
01268 }
01269
01270 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
01271 {
01272 CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
01273 msg->reducer = type;
01274 msg->sid = id;
01275 msg->sourceFlag = -1;
01276 msg->redNo = id.get_redNo();
01277 msg->gcount = 1;
01278 msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
01279 msg->callback = cb;
01280 msg->userFlag=userFlag;
01281 #if CMK_MESSAGE_LOGGING
01282 envelope *env = UsrToEnv(msg);
01283 env->flags = env->flags | CK_REDUCTION_MSG_MLOG;
01284 #endif
01285 return msg;
01286 }
01287
01288
01289 void CkMulticastMgr::contribute(CkSectionInfo &id, int userFlag, int fragSize)
01290 {
01291 CkCallback cb;
01292 contribute(0, NULL, CkReduction::nop, id, cb, userFlag, fragSize);
01293 }
01294
01295 void CkMulticastMgr::contribute(CkSectionInfo &id, const CkCallback &cb, int userFlag, int fragSize)
01296 {
01297 contribute(0, NULL, CkReduction::nop, id, cb, userFlag, fragSize);
01298 }
01299
01300 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
01301 {
01302 CkCallback cb;
01303 contribute(dataSize, data, type, id, cb, userFlag, fragSize);
01304 }
01305
01306
01307 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, const CkCallback &cb, int userFlag, int fragSize)
01308 {
01309 if (id.get_val() == NULL || id.get_redNo() == -1)
01310 CmiAbort("contribute: SectionID is not initialized\n");
01311
01312 int8_t nFrags;
01313 if (-1 == fragSize) {
01314 nFrags = 1;
01315 fragSize = dataSize;
01316 }
01317 else {
01318 CmiAssert (dataSize >= fragSize);
01319 nFrags = dataSize/fragSize;
01320 if (dataSize%fragSize) nFrags++;
01321 }
01322
01323 if (MAXFRAGS < nFrags) {
01324 CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
01325 CmiAbort ("frag size too small\n");
01326 }
01327
01328 int mpe = id.get_pe();
01329 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01330
01331
01332 int fSize = fragSize;
01333 for (int8_t i=0; i<nFrags; i++) {
01334 if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
01335 fSize = dataSize%fragSize;
01336 }
01337
01338 CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
01339
01340
01341 msg->reducer = type;
01342 msg->sid = id;
01343 msg->nFrags = nFrags;
01344 msg->fragNo = i;
01345 msg->sourceFlag = -1;
01346 msg->redNo = id.get_redNo();
01347 msg->gcount = 1;
01348 msg->rebuilt = (mpe == CkMyPe())?0:1;
01349 msg->callback = cb;
01350 msg->userFlag = userFlag;
01351
01352 #if CMK_MESSAGE_LOGGING
01353 envelope *env = UsrToEnv(msg);
01354 env->flags = env->flags | CK_REDUCTION_MSG_MLOG;
01355 #endif
01356
01357 mCastGrp[mpe].recvRedMsg(msg);
01358
01359 data = (void*)(((char*)data) + fSize);
01360 }
01361
01362 id.get_redNo()++;
01363 DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
01364 }
01365
01366 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id,
01367 mCastEntry* entry,
01368 reductionInfo& redInfo) {
01369 int8_t i;
01370 int dataSize = 0;
01371 int8_t nFrags = redInfo.msgs[0][0]->nFrags;
01372
01373
01374 if (1 == nFrags) {
01375 CkReductionMsg* msg = redInfo.msgs[0][0];
01376
01377
01378 redInfo.msgs[0].clear();
01379
01380 return msg;
01381 }
01382
01383 for (i=0; i<nFrags; i++) {
01384 dataSize += redInfo.msgs[i][0]->dataSize;
01385 }
01386
01387 CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
01388 #if CMK_MESSAGE_LOGGING
01389 envelope *env = UsrToEnv(msg);
01390 env->flags = env->flags | CK_REDUCTION_MSG_MLOG;
01391 #endif
01392
01393
01394 msg->redNo = redInfo.msgs[0][0]->redNo;
01395 msg->reducer = redInfo.msgs[0][0]->reducer;
01396 msg->sid = id;
01397 msg->nFrags = nFrags;
01398
01399
01400 msg->sourceFlag = 2;
01401 msg->rebuilt = redInfo.msgs[0][0]->rebuilt;
01402 msg->callback = redInfo.msgs[0][0]->callback;
01403 msg->userFlag = redInfo.msgs[0][0]->userFlag;
01404
01405 byte* data = (byte*)msg->getData ();
01406 for (i=0; i<nFrags; i++) {
01407
01408 memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
01409 data += redInfo.msgs[i][0]->dataSize;
01410
01411
01412 delete redInfo.msgs[i][0];
01413 redInfo.msgs[i].clear();
01414 }
01415
01416 return msg;
01417 }
01418
01419
01420
01421 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
01422 mCastEntry* entry, reductionInfo& redInfo,
01423 int currentTreeUp) {
01424
01425 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01426 reductionMsgs& rmsgs = redInfo.msgs[index];
01427 int dataSize = rmsgs[0]->dataSize;
01428 int i;
01429 int oldRedNo = redInfo.redNo;
01430 int nFrags = rmsgs[0]->nFrags;
01431 int fragNo = rmsgs[0]->fragNo;
01432 int userFlag = rmsgs[0]->userFlag;
01433
01434
01435 CkReduction::reducerType reducer = rmsgs[0]->reducer;
01436 CkReduction::reducerFn f= CkReduction::reducerTable()[reducer].fn;
01437 CkAssert(NULL != f);
01438
01439
01440 CkCallback msg_cb;
01441 int8_t rebuilt = 0;
01442 for (i=0; i<rmsgs.size(); i++) {
01443 if (rmsgs[i]->rebuilt) rebuilt = 1;
01444 if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
01445 }
01446
01447
01448 CkReductionMsg *newmsg = (*f)(rmsgs.size(), rmsgs.data());
01449 #if CMK_MESSAGE_LOGGING
01450 envelope *env = UsrToEnv(newmsg);
01451 env->flags = env->flags | CK_REDUCTION_MSG_MLOG;
01452 #endif
01453 newmsg->redNo = redInfo.redNo;
01454 newmsg->nFrags = nFrags;
01455 newmsg->fragNo = fragNo;
01456 newmsg->userFlag = userFlag;
01457 newmsg->reducer = reducer;
01458
01459
01460 redInfo.npProcessed ++;
01461
01462
01463 for (i=0; i<rmsgs.size(); i++)
01464 if (rmsgs[i]!=newmsg) delete rmsgs[i];
01465 rmsgs.clear();
01466
01467
01468 if (entry->hasParent()) {
01469
01470 newmsg->sid = entry->parentGrp;
01471 newmsg->sourceFlag = 2;
01472 newmsg->redNo = oldRedNo;
01473 newmsg->gcount = redInfo.gcount [index];
01474 newmsg->rebuilt = rebuilt;
01475 newmsg->callback = msg_cb;
01476 DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
01477 mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
01478 } else {
01479 newmsg->sid = id;
01480
01481 rmsgs.push_back (newmsg);
01482
01483 if (redInfo.npProcessed == nFrags) {
01484
01485 newmsg = combineFrags (id, entry, redInfo);
01486
01487 CkSetRefNum(newmsg, userFlag);
01488
01489 if (redInfo.storedCallback != NULL)
01490 redInfo.storedCallback->send(newmsg);
01491 else if ( !msg_cb.isInvalid() )
01492 msg_cb.send(newmsg);
01493 else if (redInfo.storedClient != NULL) {
01494 redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
01495 delete newmsg;
01496 }
01497 else
01498 CmiAbort("Did you forget to register a reduction client?");
01499
01500 DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
01501
01502 if (currentTreeUp) {
01503 if (entry->oldc) {
01504
01505 mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
01506 entry->oldc = NULL;
01507 }
01508 if (entry->hasOldtree()) {
01509
01510 int oldpe = entry->oldtree.pe;
01511 mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
01512 entry->oldtree.clear();
01513 }
01514 }
01515
01516 if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
01517 }
01518 }
01519 }
01520
01521
01522
/**
 * Entry point for reduction messages arriving at this PE's manager.
 *
 * @param msg  reduction message; msg->sid's val points at the target mCastEntry
 *
 * If the entry is obsolete, the message is either re-targeted at the newest
 * replacement entry (root case) or forwarded to entry->rootSid.
 *
 * Aborts on a redNo older than the entry's current one. Messages for a later
 * reduction, or arriving while entry->notReady(), are buffered in
 * red.futureMsgs.
 *
 * Otherwise the message is counted and buffered per fragment. The fragment is
 * reduced once the current tree is complete (all local elements and children
 * reported) or, at the root, once every fragment's gcount equals
 * getNumAllElems(). After all fragments are processed, the reduction number
 * is advanced, counters are reset and future messages are released.
 */
void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
{
  int8_t i;
  CkSectionInfo id = msg->sid;
  mCastEntry *entry = (mCastEntry *)id.get_val();
  CmiAssert(entry!=NULL);

  CProxy_CkMulticastMgr mCastGrp(thisgroup);

  // Set when a message is redirected to a newer root entry; the new redNo is
  // then pushed down via updateRedNo after reducing.
  int updateReduceNo = 0;


  if (entry->isObsolete()) {

    DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.get_pe()));

    // At the root, follow the newc chain to the most recent entry.
    if (!entry->hasParent()) {
      mCastEntry *newentry = entry->newc;
      while (newentry && newentry->newc) newentry=newentry->newc;
      if (newentry) entry = newentry;
      CmiAssert(entry!=NULL);
    }

    // A live root entry was found: handle the message here.
    if (!entry->hasParent() && !entry->isObsolete()) {
      msg->sourceFlag = 0;
      updateReduceNo = 1;
    }
    // Otherwise forward it to the section root and stop.
    else {

      CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);

      msg->sid = entry->rootSid;
      msg->sourceFlag = 0;

      mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
      return;
    }
  }

  reductionInfo &redInfo = entry->red;


  DEBUGF(("[%d] RecvRedMsg, entry: %p, lcount: %d, cccount: %d, #localelems: %d, #children: %d \n", CkMyPe(), entry, redInfo.lcount[msg->fragNo], redInfo.ccount[msg->fragNo], entry->getNumLocalElems(), entry->children.size()));


  // Messages for an already-completed reduction are a protocol error.
  if (msg->redNo < redInfo.redNo) {
    CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
    CmiAbort("CkMulticast received a reduction msg with redNo less than the current redn number. Should never happen! \n");
  }


  // Buffer messages for a later reduction, or while the entry is not ready;
  // releaseFutureReduceMsgs() re-submits them.
  if (entry->notReady() || msg->redNo > redInfo.redNo) {
    DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
    redInfo.futureMsgs.push_back(msg);
    return;
  }


  const int index = msg->fragNo;

  // lcount: messages reporting isFromUser(); ccount: messages with
  // sourceFlag 2 (sent up by reduceFragment from a child); gcount: sum of the
  // messages' gcount fields.
  if (msg->isFromUser()) {
    redInfo.lcount [index] ++;
  }

  if (msg->sourceFlag == 2) {
    redInfo.ccount [index] ++;
  }

  redInfo.gcount [index] += msg->gcount;



  redInfo.msgs [index].push_back(msg);


  // Every local element and every child has reported for this fragment.
  int currentTreeUp = 0;
  if (redInfo.lcount [index] == entry->getNumLocalElems() && redInfo.ccount [index] == entry->children.size())
    currentTreeUp = 1;

  // Root only: every fragment's gcount has reached the total element count.
  int mixTreeUp = 0;
  if (!entry->hasParent()) {
    mixTreeUp = 1;
    for (int8_t i=0; i<msg->nFrags; i++)
      if (entry->getNumAllElems() != redInfo.gcount [i])
        mixTreeUp = 0;
  }


  // Not ready to finish, but the reducer is streamable: fold the buffered
  // messages now so only one stays in redInfo.msgs[index].
  // NOTE(review): this appears to rely on the streamable reducer writing its
  // result into rmsgs[0] and returning it; if it returned a fresh message,
  // newmsg would leak and msg's contribution would be lost. Confirm against
  // the reducer implementations.
  if (!currentTreeUp && !mixTreeUp && redInfo.msgs[index].size() > 1 && CkReduction::reducerTable()[msg->reducer].streamable) {
    reductionMsgs& rmsgs = redInfo.msgs[index];
    CkReduction::reducerType reducer = rmsgs[0]->reducer;
    CkReduction::reducerFn f= CkReduction::reducerTable()[msg->reducer].fn;
    CkAssert(f != NULL);

    int oldRedNo = redInfo.redNo;
    int nFrags = rmsgs[0]->nFrags;
    int fragNo = rmsgs[0]->fragNo;
    int userFlag = rmsgs[0]->userFlag;
    CkSectionInfo oldId = rmsgs[0]->sid;
    CkCallback msg_cb;
    int8_t rebuilt = 0;
    if (msg->rebuilt) rebuilt = 1;
    if (!msg->callback.isInvalid()) msg_cb = msg->callback;

    CkReductionMsg *newmsg = (*f)(rmsgs.size(), rmsgs.data());
#if CMK_MESSAGE_LOGGING
    envelope *env = UsrToEnv(newmsg);
    env->flags = env->flags | CK_REDUCTION_MSG_MLOG;
#endif
    newmsg->redNo = oldRedNo;
    newmsg->nFrags = nFrags;
    newmsg->fragNo = fragNo;
    newmsg->userFlag = userFlag;
    newmsg->reducer = reducer;
    if (rebuilt) newmsg->rebuilt = 1;
    if (!msg_cb.isInvalid()) newmsg->callback = msg_cb;
    newmsg->gcount = redInfo.gcount[index];
    newmsg->sid = oldId;

    // Drop the just-arrived message (now folded in). msg must not be used
    // after this point; the branch below is mutually exclusive with this one.
    rmsgs.pop_back();
    delete msg;

    CkAssert(rmsgs.size() == 1);
  }


  if (currentTreeUp || mixTreeUp)
  {
    // Read before reduceFragment(), which deletes the buffered inputs
    // (including msg) unless the reducer returned it as the result.
    const int8_t nFrags = msg->nFrags;
    reduceFragment (index, id, entry, redInfo, currentTreeUp);



    if (updateReduceNo)
      mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);

    // All fragments of this reduction are done: advance and reset.
    if (redInfo.npProcessed == nFrags) {

      entry->incReduceNo();

      for (i=0; i<nFrags; i++) {
        redInfo.lcount [i] = 0;
        redInfo.ccount [i] = 0;
        redInfo.gcount [i] = 0;
      }
      redInfo.npProcessed = 0;
      releaseFutureReduceMsgs(entry);
    }
  }
}
01700
01701
01702
01703 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
01704 {
01705 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01706
01707 for (int i=0; i<entry->red.futureMsgs.size(); i++) {
01708 DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
01709 mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
01710 }
01711 entry->red.futureMsgs.clear();
01712 }
01713
01714
01715
01716
01717 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
01718 {
01719 int i;
01720 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01721
01722 for (int j=0; j<MAXFRAGS; j++) {
01723 for (i=0; i<entry->red.msgs[j].size(); i++) {
01724 CkReductionMsg *msg = entry->red.msgs[j][i];
01725 DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
01726 msg->sid = entry->rootSid;
01727 msg->sourceFlag = 0;
01728 mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
01729 }
01730 entry->red.msgs[j].clear();
01731 }
01732
01733
01734 for (i=0; i<entry->red.futureMsgs.size(); i++) {
01735 CkReductionMsg *msg = entry->red.futureMsgs[i];
01736 DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
01737 msg->sid = entry->rootSid;
01738 msg->sourceFlag = 0;
01739 mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
01740 }
01741 entry->red.futureMsgs.clear();
01742 }
01743
01744
01745
01746 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
01747 {
01748 DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
01749 if (entry->red.redNo < red)
01750 entry->red.redNo = red;
01751
01752 CProxy_CkMulticastMgr mp(thisgroup);
01753 for (int i=0; i<entry->children.size(); i++) {
01754 mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
01755 }
01756
01757 releaseFutureReduceMsgs(entry);
01758 }
01759
01760 #include "CkMulticast.def.h"
01761