00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "charm++.h"
00014 #include "envelope.h"
00015 #include "register.h"
00016
00017 #include "ckmulticast.h"
00018 #include "spanningTreeStrategy.h"
00019 #include "XArraySectionReducer.h"
00020
00021 #define DEBUGF(x) // CkPrintf x;
00022
00023
00024 #define SPLIT_MULTICAST 1
00025
00026
00027 #define MAXFRAGS 100
00028
00029 typedef CkQ<multicastGrpMsg *> multicastGrpMsgBuf;
00030 typedef CkVec<CkArrayIndex> arrayIndexList;
00031 typedef CkVec<CkSectionInfo> sectionIdList;
00032 typedef CkVec<CkReductionMsg *> reductionMsgs;
00033 typedef CkQ<int> PieceSize;
00034 typedef CkVec<LDObjid> ObjKeyList;
00035 typedef unsigned char byte;
00036
00042 class reductionInfo {
00043 public:
00045 int lcount [MAXFRAGS];
00047 int ccount [MAXFRAGS];
00049 int gcount [MAXFRAGS];
00051 int npProcessed;
00053 CkCallback *storedCallback;
00055 redClientFn storedClient;
00057 void *storedClientParam;
00059 int redNo;
00061 reductionMsgs msgs [MAXFRAGS];
00063 reductionMsgs futureMsgs;
00064
00065 public:
00066 reductionInfo(): storedCallback(NULL),
00067 storedClientParam(NULL),
00068 redNo(0),
00069 npProcessed(0) {
00070 for (int i=0; i<MAXFRAGS; i++)
00071 lcount [i] = ccount [i] = gcount [i] = 0;
00072 }
00073 };
00074
00076 #define COOKIE_NOTREADY 0
00077 #define COOKIE_READY 1
00078 #define COOKIE_OLD 2
00079
00080 class mCastPacket {
00081 public:
00082 CkSectionInfo cookie;
00083 int offset;
00084 int n;
00085 char *data;
00086 int seqno;
00087 int count;
00088 int totalsize;
00089
00090 mCastPacket(CkSectionInfo &_cookie, int _offset, int _n, char *_d, int _s, int _c, int _t):
00091 cookie(_cookie), offset(_offset), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
00092 };
00093
00094 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
00095
00096 class SectionLocation {
00097 public:
00098 mCastEntry *entry;
00099 int pe;
00100 public:
00101 SectionLocation(): entry(NULL), pe(-1) {}
00102 SectionLocation( mCastEntry *e, int p) { set(e, p); }
00103 inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
00104 inline void clear() { entry = NULL; pe = -1; }
00105 };
00106
00107
00108
00109
00111 class mCastEntry
00112 {
00113 public:
00115 CkArrayID aid;
00117 CkSectionInfo parentGrp;
00119 sectionIdList children;
00121 int numChild;
00123 arrayIndexList allElem;
00125 ObjKeyList allObjKeys;
00127 arrayIndexList localElem;
00129 int pe;
00131 CkSectionInfo rootSid;
00132 multicastGrpMsgBuf msgBuf;
00134 multicastGrpPacketBuf packetBuf;
00136 char *asm_msg;
00137 int asm_fill;
00139 mCastEntry *oldc, *newc;
00141 SectionLocation oldtree;
00142
00143 reductionInfo red;
00144
00145 char needRebuild;
00146 private:
00147 char flag;
00148 public:
00149 mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
00150 oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
00151 mCastEntry(mCastEntry *);
00153 inline int hasParent() { return parentGrp.get_val()?1:0; }
00155 inline int isObsolete() { return (flag == COOKIE_OLD); }
00157 inline void setObsolete() { flag=COOKIE_OLD; }
00159 inline int notReady() { return (flag == COOKIE_NOTREADY); }
00161 inline void setReady() { flag=COOKIE_READY; }
00163 inline void incReduceNo() {
00164 red.redNo ++;
00165 for (mCastEntry *next = newc; next; next=next->newc)
00166 next->red.redNo++;
00167 }
00169 inline CkArrayID getAid() { return aid; }
00170 inline int hasOldtree() { return oldtree.entry != NULL; }
00171 inline void print() {
00172 CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
00173 }
00174 };
00175
00176
00177
00178
00179 class cookieMsg: public CMessage_cookieMsg {
00180 public:
00181 CkSectionInfo cookie;
00182 public:
00183 cookieMsg() {};
00184 cookieMsg(CkSectionInfo m): cookie(m) {};
00185 };
00186
00187
00188
00189
00191 class multicastSetupMsg: public CMessage_multicastSetupMsg {
00192 public:
00193 int nIdx;
00194 CkArrayIndex *arrIdx;
00195 int *lastKnown;
00196 CkSectionInfo parent;
00197 CkSectionInfo rootSid;
00198 int redNo;
00199 };
00200
00201
00202
00203
00205 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
00206 };
00207
00208
00209 extern void CkPackMessage(envelope **pEnv);
00210 extern void CkUnpackMessage(envelope **pEnv);
00211
00212
00213
/// Module initialization hook for the multicast library; intentionally empty.
void _ckMulticastInit(void) {}
00221
00222
00223 mCastEntry::mCastEntry (mCastEntry *old):
00224 numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
00225 {
00226 int i;
00227 aid = old->aid;
00228 parentGrp = old->parentGrp;
00229 for (i=0; i<old->allElem.length(); i++)
00230 allElem.push_back(old->allElem[i]);
00231 #if CMK_LBDB_ON
00232 CmiAssert(old->allElem.length() == old->allObjKeys.length());
00233 for (i=0; i<old->allObjKeys.length(); i++)
00234 allObjKeys.push_back(old->allObjKeys[i]);
00235 #endif
00236 pe = old->pe;
00237 red.storedCallback = old->red.storedCallback;
00238 red.storedClient = old->red.storedClient;
00239 red.storedClientParam = old->red.storedClientParam;
00240 red.redNo = old->red.redNo;
00241 needRebuild = 0;
00242 asm_msg = NULL;
00243 asm_fill = 0;
00244 }
00245
00246 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);
00247
00248
00249
00250
00251 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndex *al, int n)
00252 {
00253
00254 mCastEntry *entry = new mCastEntry(aid);
00255
00256 for (int i=0; i<n; i++) {
00257 entry->allElem.push_back(al[i]);
00258 #if CMK_LBDB_ON
00259 const LDObjid key = idx2LDObjid(al[i]);
00260 entry->allObjKeys.push_back(key);
00261 #endif
00262 }
00263
00264 _id.get_aid() = aid;
00265 _id.get_val() = entry;
00266
00267 initCookie(_id);
00268 }
00269
00270
00271
00272
00273 void CkMulticastMgr::setSection(CkSectionInfo &id)
00274 {
00275 initCookie(id);
00276 }
00277
00278
00279
00280
00282 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
00283 {
00284 CkArrayID aid = proxy.ckGetArrayID();
00285 CkSectionInfo &_id = proxy.ckGetSectionInfo();
00286
00287 mCastEntry *entry = new mCastEntry(aid);
00288
00289 const CkArrayIndex *al = proxy.ckGetArrayElements();
00290 for (int i=0; i<proxy.ckGetNumElements(); i++) {
00291 entry->allElem.push_back(al[i]);
00292 #if CMK_LBDB_ON
00293 const LDObjid key = idx2LDObjid(al[i]);
00294 entry->allObjKeys.push_back(key);
00295 #endif
00296 }
00297 _id.get_type() = MulticastMsg;
00298 _id.get_aid() = aid;
00299 _id.get_val() = entry;
00300 initCookie(_id);
00301 }
00302
00303
00304
00305
00306 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
00307 {
00308 CkSectionInfo &info = proxy.ckGetSectionInfo();
00309
00310 int oldpe = info.get_pe();
00311 if (oldpe == CkMyPe()) return;
00312
00313 CkArrayID aid = proxy.ckGetArrayID();
00314 CkSectionID *sid = proxy.ckGetSectionIDs();
00315 mCastEntry *entry = new mCastEntry(aid);
00316
00317 mCastEntry *oldentry = (mCastEntry *)info.get_val();
00318 DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
00319
00320 const CkArrayIndex *al = sid->_elems;
00321 CmiAssert(info.get_aid() == aid);
00322 prepareCookie(entry, *sid, al, sid->_nElems, aid);
00323
00324 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00325
00326
00327 entry->oldtree.set(oldentry, oldpe);
00328
00329
00330 mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
00331
00332
00333 mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
00334 }
00335
00336
00337
00338
00340 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndex *al, int count, CkArrayID aid)
00341 {
00342 for (int i=0; i<count; i++) {
00343 entry->allElem.push_back(al[i]);
00344 #if CMK_LBDB_ON
00345 const LDObjid key = idx2LDObjid(al[i]);
00346 entry->allObjKeys.push_back(key);
00347 #endif
00348 }
00349 sid._cookie.get_type() = MulticastMsg;
00350 sid._cookie.get_aid() = aid;
00351 sid._cookie.get_val() = entry;
00352 sid._cookie.get_pe() = CkMyPe();
00353 }
00354
00355
00356
00357
00358
00359 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
00360 {
00361 CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
00362 int numSubSections = proxy->ckGetNumSubSections();
00363 for (int i=0; i<numSubSections; i++)
00364 {
00365 CkArrayID aid = proxy->ckGetArrayIDn(i);
00366 mCastEntry *entry = new mCastEntry(aid);
00367 CkSectionID *sid = &( proxy->ckGetSectionID(i) );
00368 const CkArrayIndex *al = proxy->ckGetArrayElements(i);
00369 prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
00370 initCookie(sid->_cookie);
00371 }
00372 }
00373
00374
00375
00376
00377 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
00378 {
00379 mCastEntry *entry = (mCastEntry *)s.get_val();
00380 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00381 mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
00382 }
00383
00384
00385
00386 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
00387 {
00388 mCastEntry *entry = (mCastEntry *)s.get_val();
00389 entry->red.redNo = red;
00390
00391 initCookie(s);
00392
00393
00394 }
00395
00396
00397
00398
00399 void CkMulticastMgr::initCookie(CkSectionInfo s)
00400 {
00401 mCastEntry *entry = (mCastEntry *)s.get_val();
00402 int n = entry->allElem.length();
00403 DEBUGF(("init: %d elems %p\n", n, s.get_val()));
00404
00405 multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
00406 msg->nIdx = n;
00407 msg->parent = CkSectionInfo(entry->getAid());
00408 msg->rootSid = s;
00409 msg->redNo = entry->red.redNo;
00410
00411 CkArray *array = CProxy_ArrayBase(s.get_aid()).ckLocalBranch();
00412 for (int i=0; i<n; i++) {
00413 msg->arrIdx[i] = entry->allElem[i];
00414 int ape = array->lastKnown(entry->allElem[i]);
00415 CmiAssert(ape >=0 && ape < CkNumPes());
00416 msg->lastKnown[i] = ape;
00417 }
00418
00419 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00420 mCastGrp[CkMyPe()].setup(msg);
00421 }
00422
00423
00424
00425
00426 void CkMulticastMgr::teardown(CkSectionInfo cookie)
00427 {
00428 int i;
00429 mCastEntry *sect = (mCastEntry *)cookie.get_val();
00430
00431 sect->setObsolete();
00432
00433 releaseBufferedReduceMsgs(sect);
00434
00435 CProxy_CkMulticastMgr mp(thisgroup);
00436 for (i=0; i<sect->children.length(); i++)
00437 mp[sect->children[i].get_pe()].teardown(sect->children[i]);
00438 }
00439
00440
00441
00442
00443 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
00444 {
00445 int i;
00446 mCastEntry *sect = (mCastEntry *)cookie.get_val();
00447
00448 sect->rootSid = newroot;
00449
00450 sect->setObsolete();
00451
00452 releaseBufferedReduceMsgs(sect);
00453
00454 CProxy_CkMulticastMgr mp(thisgroup);
00455 for (i=0; i<sect->children.length(); i++)
00456 mp[sect->children[i].get_pe()].teardown(sect->children[i]);
00457 }
00458
00459
00460
00461
00462 void CkMulticastMgr::freeup(CkSectionInfo cookie)
00463 {
00464 mCastEntry *sect = (mCastEntry *)cookie.get_val();
00465 CProxy_CkMulticastMgr mp(thisgroup);
00466
00467 while (sect)
00468 {
00469
00470 for (int i=0; i<sect->children.length(); i++)
00471 mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
00472
00473 DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
00474 mCastEntry *oldc= sect->oldc;
00475 delete sect;
00476 sect = oldc;
00477 }
00478 }
00479
00480
00481
00482
00483 void CkMulticastMgr::setup(multicastSetupMsg *msg)
00484 {
00485 int i,j;
00486 mCastEntry *entry;
00487 CkArrayID aid = msg->rootSid.get_aid();
00488 if (msg->parent.get_pe() == CkMyPe())
00489 entry = (mCastEntry *)msg->rootSid.get_val();
00490 else
00491 entry = new mCastEntry(aid);
00492 entry->aid = aid;
00493 entry->pe = CkMyPe();
00494 entry->rootSid = msg->rootSid;
00495 entry->parentGrp = msg->parent;
00496
00497 DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
00498 entry->red.redNo = msg->redNo;
00499
00500
00501 int numpes = CkNumPes();
00502 arrayIndexPosList *lists = new arrayIndexPosList[numpes];
00503
00504 for (i=0; i<msg->nIdx; i++)
00505 {
00506 int lastKnown = msg->lastKnown[i];
00507
00508 if (lastKnown == CkMyPe())
00509 entry->localElem.insertAtEnd(msg->arrIdx[i]);
00510
00511 else
00512 lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
00513 }
00514
00515 CkVec<int> mySubTreePEs;
00516 mySubTreePEs.reserve(numpes);
00517
00518 mySubTreePEs.push_back(CkMyPe());
00519
00520 for (i=0; i<numpes; i++)
00521 {
00522 if (i==CkMyPe()) continue;
00523 if (lists[i].size())
00524 mySubTreePEs.push_back(i);
00525 }
00526
00527 int num = mySubTreePEs.size() - 1, numchild = 0;
00528 if (factor <= 0) numchild = num;
00529 else numchild = num<factor?num:factor;
00530
00531 entry->numChild = numchild;
00532
00533
00534 if (numchild)
00535 {
00536
00537 int *peListPtr = mySubTreePEs.getVec();
00538 topo::SpanningTreeVertex *nextGenInfo;
00539 nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
00540
00541 numchild = nextGenInfo->childIndex.size();
00542 entry->numChild = numchild;
00543
00544
00545
00546 arrayIndexPosList *slots = new arrayIndexPosList[numchild];
00547
00548
00549 for (i=0; i < numchild; i++)
00550 {
00551
00552 int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
00553 if (i < numchild-1)
00554 childEndIndex = nextGenInfo->childIndex[i+1];
00555 else
00556 childEndIndex = mySubTreePEs.size();
00557
00558 for (j = childStartIndex; j < childEndIndex; j++)
00559 {
00560 int pe = mySubTreePEs[j];
00561 for (int k=0; k<lists[pe].size(); k++)
00562 slots[i].push_back(lists[pe][k]);
00563 }
00564 }
00565
00566
00567 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00568 for (i=0; i<numchild; i++)
00569 {
00570
00571 int n = slots[i].length();
00572 multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
00573 m->parent = CkSectionInfo(aid, entry);
00574 m->nIdx = slots[i].length();
00575 m->rootSid = msg->rootSid;
00576 m->redNo = msg->redNo;
00577 for (j=0; j<slots[i].length(); j++)
00578 {
00579 m->arrIdx[j] = slots[i][j].idx;
00580 m->lastKnown[j] = slots[i][j].pe;
00581 }
00582 int childroot = slots[i][0].pe;
00583 DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
00584
00585 mCastGrp[childroot].setup(m);
00586 }
00587 delete [] slots;
00588 delete nextGenInfo;
00589 }
00590
00591 else
00592 {
00593 childrenReady(entry);
00594 }
00595 delete [] lists;
00596 delete msg;
00597 }
00598
00599
00600
00601
00602 void CkMulticastMgr::childrenReady(mCastEntry *entry)
00603 {
00604
00605 entry->setReady();
00606 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00607 DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
00608 if (entry->hasParent())
00609 mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
00610 #if SPLIT_MULTICAST
00611
00612 while (!entry->packetBuf.isEmpty())
00613 {
00614 mCastPacket *packet = entry->packetBuf.deq();
00615 packet->cookie.get_val() = entry;
00616 mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->offset, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
00617 delete [] packet->data;
00618 delete packet;
00619 }
00620 #endif
00621
00622 while (!entry->msgBuf.isEmpty())
00623 {
00624 multicastGrpMsg *newmsg = entry->msgBuf.deq();
00625 DEBUGF(("[%d] release buffer %p ep:%d\n", CkMyPe(), newmsg, newmsg->ep));
00626 newmsg->_cookie.get_val() = entry;
00627 mCastGrp[CkMyPe()].recvMsg(newmsg);
00628 }
00629
00630 releaseFutureReduceMsgs(entry);
00631 }
00632
00633
00634
00635
00636 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
00637 {
00638 mCastEntry *entry = (mCastEntry *)sid.get_val();
00639 entry->children.push_back(child);
00640 if (entry->children.length() == entry->numChild) {
00641 childrenReady(entry);
00642 }
00643 }
00644
00645
00646
00647
00648
00649
00650 void CkMulticastMgr::rebuild(CkSectionInfo §Id)
00651 {
00652
00653 mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
00654 CkAssert(curCookie->pe == CkMyPe());
00655
00656 while (curCookie->newc) curCookie = curCookie->newc;
00657 if (curCookie->isObsolete()) return;
00658
00659
00660 mCastEntry *newCookie = new mCastEntry(curCookie);
00661
00662
00663 newCookie->oldc = curCookie;
00664 curCookie->newc = newCookie;
00665
00666 sectId.get_val() = newCookie;
00667
00668 DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
00669
00670 curCookie->setObsolete();
00671
00672 resetCookie(sectId);
00673 }
00674
00675 void CkMulticastMgr::resetCookie(CkSectionInfo s)
00676 {
00677 mCastEntry *newCookie = (mCastEntry*)s.get_val();
00678 mCastEntry *oldCookie = newCookie->oldc;
00679
00680
00681 DEBUGF(("reset: oldc: %p\n", oldCookie));
00682 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00683 int mype = CkMyPe();
00684 mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
00685
00686
00687 initCookie(s);
00688 }
00689
00690 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
00691 {
00692 DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
00693
00694 ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
00695 for (int i=0; i< sid._nElems-1; i++) {
00696 CProxyElement_ArrayBase ap(a, sid._elems[i]);
00697 void *newMsg=CkCopyMsg((void **)&m);
00698 ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
00699 }
00700 if (sid._nElems > 0) {
00701 CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
00702 ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
00703 }
00704 }
00705
00706 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
00707 {
00708 for (int snum = 0; snum < nsid; snum++) {
00709 void *msgCopy = m;
00710 if (nsid - snum > 1)
00711 msgCopy = CkCopyMsg(&m);
00712 sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
00713 }
00714 }
00715
00716
00717
00718 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
00719 {
00720 DEBUGF(("ArraySectionSend\n"));
00721 multicastGrpMsg *msg = (multicastGrpMsg *)m;
00722 msg->ep = ep;
00723 CkSectionInfo &s = sid->_cookie;
00724 mCastEntry *entry;
00725
00726
00727 if (s.get_pe() == CkMyPe()) {
00728 entry = (mCastEntry *)s.get_val();
00729 if (NULL == entry)
00730 CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
00731
00732
00733 if (entry->newc) {
00734 do { entry=entry->newc; } while (entry->newc);
00735 s.get_val() = entry;
00736 }
00737
00738 #if CMK_LBDB_ON
00739
00740 envelope *env = UsrToEnv(msg);
00741 const LDOMHandle &om = CProxy_ArrayBase(s.get_aid()).ckLocMgr()->getOMHandle();
00742 LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
00743 #endif
00744
00745
00746 if (entry->needRebuild == 1) {
00747 msg->_cookie = s;
00748 SimpleSend(ep, msg, s.get_aid(), *sid, opts);
00749 entry->needRebuild = 2;
00750 return;
00751 }
00752
00753 else if (entry->needRebuild == 2) rebuild(s);
00754 }
00755
00756 else {
00757
00758 CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
00759 }
00760
00761
00762 msg->_cookie = s;
00763
00764 #if SPLIT_MULTICAST
00765
00766 register envelope *env = UsrToEnv(m);
00767 CkPackMessage(&env);
00768 int totalsize = env->getTotalsize();
00769 int packetSize = 0;
00770 int totalcount = 0;
00771 if(totalsize < split_threshold){
00772 packetSize = totalsize;
00773 totalcount = 1;
00774 }else{
00775 packetSize = split_size;
00776 totalcount = totalsize/split_size;
00777 if(totalsize%split_size) totalcount++;
00778
00779
00780
00781 }
00782 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00783 int sizesofar = 0;
00784 char *data = (char*) env;
00785 if (totalcount == 1) {
00786
00787 if (s.get_pe() == CkMyPe()) {
00788 CkUnpackMessage(&env);
00789 msg = (multicastGrpMsg *)EnvToUsr(env);
00790 recvMsg(msg);
00791 }
00792
00793 else {
00794 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00795 msg = (multicastGrpMsg *)EnvToUsr(env);
00796 mCastGrp[s.get_pe()].recvMsg(msg);
00797 }
00798 return;
00799 }
00800 for (int i=0; i<totalcount; i++) {
00801 int mysize = packetSize;
00802 if (mysize + sizesofar > totalsize) {
00803 mysize = totalsize-sizesofar;
00804 }
00805
00806 mCastGrp[s.get_pe()].recvPacket(s, sizesofar, mysize, data, i, totalcount, totalsize, 0);
00807 sizesofar += mysize;
00808 data += mysize;
00809 }
00810 CmiFree(env);
00811 #else
00812 if (s.get_pe() == CkMyPe()) {
00813 recvMsg(msg);
00814 }
00815 else {
00816 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00817 mCastGrp[s.get_pe()].recvMsg(msg);
00818 }
00819 #endif
00820 }
00821
00822 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int offset, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
00823 {
00824 int i;
00825 mCastEntry *entry = (mCastEntry *)_cookie.get_val();
00826
00827
00828 if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
00829 char *newdata = new char[n];
00830 memcpy(newdata, data, n);
00831 entry->packetBuf.enq(new mCastPacket(_cookie, offset, n, newdata, seqno, count, totalsize));
00832
00833 return;
00834 }
00835
00836
00837
00838
00839
00840 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00841 for (i=0; i<entry->children.length(); i++) {
00842 mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], offset, n, data, seqno, count, totalsize, 0);
00843 }
00844
00845 if (entry->asm_msg == NULL) {
00846 CmiAssert(entry->asm_fill == 0);
00847 entry->asm_msg = (char *)CmiAlloc(totalsize);
00848 }
00849 memcpy(entry->asm_msg+offset, data, n);
00850 entry->asm_fill += n;
00851 if (entry->asm_fill == totalsize) {
00852 CkUnpackMessage((envelope **)&entry->asm_msg);
00853 multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
00854 msg->_cookie = _cookie;
00855
00856 sendToLocal(msg);
00857 entry->asm_msg = NULL;
00858 entry->asm_fill = 0;
00859 }
00860
00861 }
00862
00863 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
00864 {
00865 int i;
00866 CkSectionInfo §ionInfo = msg->_cookie;
00867 mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
00868 CmiAssert(entry->getAid() == sectionInfo.get_aid());
00869
00870 if (entry->notReady()) {
00871 DEBUGF(("entry not ready, enq buffer %p\n", msg));
00872 entry->msgBuf.enq(msg);
00873 return;
00874 }
00875
00876
00877
00878 CProxy_CkMulticastMgr mCastGrp(thisgroup);
00879 for (i=0; i<entry->children.length(); i++) {
00880 multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
00881 newmsg->_cookie = entry->children[i];
00882 mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
00883 }
00884
00885 sendToLocal(msg);
00886 }
00887
00888 void CkMulticastMgr::sendToLocal(multicastGrpMsg *msg)
00889 {
00890 int i;
00891 CkSectionInfo §ionInfo = msg->_cookie;
00892 mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
00893 CmiAssert(entry->getAid() == sectionInfo.get_aid());
00894 CkGroupID aid = sectionInfo.get_aid();
00895
00896
00897 int nLocal = entry->localElem.length();
00898 DEBUGF(("[%d] send to local %d elems\n", CkMyPe(), nLocal));
00899 for (i=0; i<nLocal-1; i++) {
00900 CProxyElement_ArrayBase ap(aid, entry->localElem[i]);
00901 if (_entryTable[msg->ep]->noKeep) {
00902 CkSendMsgArrayInline(msg->ep, msg, sectionInfo.get_aid(), entry->localElem[i], CK_MSG_KEEP);
00903 }
00904 else {
00905
00906 multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
00907 ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
00908 }
00909
00910
00911
00912
00913
00914
00915 }
00916 if (nLocal) {
00917 CProxyElement_ArrayBase ap(aid, entry->localElem[nLocal-1]);
00918 ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
00919
00920 }
00921 else {
00922 CkAssert (entry->rootSid.get_pe() == CkMyPe());
00923 delete msg;
00924 }
00925 }
00926
00927
00928
00929 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
00930 {
00931 CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
00932 if (CkMcastBaseMsg::checkMagic(m) == 0) {
00933 CmiPrintf("ERROR: This is not a CkMulticast message!\n");
00934 CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
00935 }
00936
00937 if (m->gpe() != -1) {
00938 id.get_type() = MulticastMsg;
00939 id.get_pe() = m->gpe();
00940 id.get_val() = m->cookie();
00941 }
00942
00943 }
00944
00945
00946
00947 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
00948 {
00949 CkCallback *sectionCB;
00950 int numSubSections = proxy.ckGetNumSubSections();
00951
00952 if (numSubSections > 1)
00953 {
00963
00964 ck::impl::XArraySectionReducer *red =
00965 new ck::impl::XArraySectionReducer(numSubSections, cb);
00966
00967 sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
00968 }
00969
00970 else
00971 sectionCB = cb;
00972
00973 for (int i=0; i<numSubSections; i++)
00974 {
00975 CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
00976 mCastEntry *entry = (mCastEntry *)sInfo.get_val();
00977 entry->red.storedCallback = sectionCB;
00978 }
00979 }
00980
00981 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
00982 {
00983 CkSectionInfo &id = proxy.ckGetSectionInfo();
00984 mCastEntry *entry = (mCastEntry *)id.get_val();
00985 entry->red.storedClient = fn;
00986 entry->red.storedClientParam = param;
00987 }
00988
00989 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
00990 {
00991 CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
00992 msg->reducer = type;
00993 msg->sid = id;
00994 msg->sourceFlag = 1;
00995 msg->redNo = id.get_redNo();
00996 msg->gcount = 1;
00997 msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
00998 msg->callback = cb;
00999 msg->userFlag=userFlag;
01000 return msg;
01001 }
01002
01003
01004
01005 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
01006 {
01007 CkCallback cb;
01008 contribute(dataSize, data, type, id, cb, userFlag, fragSize);
01009 }
01010
01011
01012 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
01013 {
01014 if (id.get_val() == NULL || id.get_redNo() == -1)
01015 CmiAbort("contribute: SectionID is not initialized\n");
01016
01017 int nFrags;
01018 if (-1 == fragSize) {
01019 nFrags = 1;
01020 fragSize = dataSize;
01021 }
01022 else {
01023 CmiAssert (dataSize >= fragSize);
01024 nFrags = dataSize/fragSize;
01025 if (dataSize%fragSize) nFrags++;
01026 }
01027
01028 if (MAXFRAGS < nFrags) {
01029 CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
01030 CmiAbort ("frag size too small\n");
01031 }
01032
01033 int mpe = id.get_pe();
01034 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01035
01036
01037 int fSize = fragSize;
01038 for (int i=0; i<nFrags; i++) {
01039 if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
01040 fSize = dataSize%fragSize;
01041 }
01042
01043 CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
01044
01045
01046 msg->reducer = type;
01047 msg->sid = id;
01048 msg->nFrags = nFrags;
01049 msg->fragNo = i;
01050 msg->sourceFlag = 1;
01051 msg->redNo = id.get_redNo();
01052 msg->gcount = 1;
01053 msg->rebuilt = (mpe == CkMyPe())?0:1;
01054 msg->callback = cb;
01055 msg->userFlag = userFlag;
01056
01057 mCastGrp[mpe].recvRedMsg(msg);
01058
01059 data = (void*)(((char*)data) + fSize);
01060 }
01061
01062 id.get_redNo()++;
01063 DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
01064 }
01065
01066 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id,
01067 mCastEntry* entry,
01068 reductionInfo& redInfo) {
01069 int i;
01070 int dataSize = 0;
01071 int nFrags = redInfo.msgs[0][0]->nFrags;
01072
01073
01074 if (1 == nFrags) {
01075 CkReductionMsg* msg = redInfo.msgs[0][0];
01076
01077
01078 redInfo.msgs[0].length() = 0;
01079
01080 return msg;
01081 }
01082
01083 for (i=0; i<nFrags; i++) {
01084 dataSize += redInfo.msgs[i][0]->dataSize;
01085 }
01086
01087 CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
01088
01089
01090 msg->redNo = redInfo.msgs[0][0]->redNo;
01091 msg->reducer = redInfo.msgs[0][0]->reducer;
01092 msg->sid = id;
01093 msg->nFrags = nFrags;
01094
01095
01096 msg->sourceFlag = 2;
01097 msg->rebuilt = redInfo.msgs[0][0]->rebuilt;
01098 msg->callback = redInfo.msgs[0][0]->callback;
01099 msg->userFlag = redInfo.msgs[0][0]->userFlag;
01100
01101 byte* data = (byte*)msg->getData ();
01102 for (i=0; i<nFrags; i++) {
01103
01104 memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
01105 data += redInfo.msgs[i][0]->dataSize;
01106
01107
01108 delete redInfo.msgs[i][0];
01109 redInfo.msgs[i].length() = 0;
01110 }
01111
01112 return msg;
01113 }
01114
01115
01116
01117 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
01118 mCastEntry* entry, reductionInfo& redInfo,
01119 int currentTreeUp) {
01120
01121 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01122 reductionMsgs& rmsgs = redInfo.msgs[index];
01123 int dataSize = rmsgs[0]->dataSize;
01124 int i;
01125 int oldRedNo = redInfo.redNo;
01126 int nFrags = rmsgs[0]->nFrags;
01127 int fragNo = rmsgs[0]->fragNo;
01128 int userFlag = rmsgs[0]->userFlag;
01129
01130
01131 CkReduction::reducerType reducer = rmsgs[0]->reducer;
01132 CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
01133 CkAssert(NULL != f);
01134
01135
01136 CkCallback msg_cb;
01137 int rebuilt = 0;
01138 for (i=0; i<rmsgs.length(); i++) {
01139 if (rmsgs[i]->rebuilt) rebuilt = 1;
01140 if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
01141 }
01142
01143
01144 CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec());
01145 newmsg->redNo = redInfo.redNo;
01146 newmsg->nFrags = nFrags;
01147 newmsg->fragNo = fragNo;
01148 newmsg->userFlag = userFlag;
01149 newmsg->reducer = reducer;
01150
01151
01152 redInfo.npProcessed ++;
01153
01154
01155 for (i=0; i<rmsgs.length(); i++)
01156 if (rmsgs[i]!=newmsg) delete rmsgs[i];
01157 rmsgs.length() = 0;
01158
01159
01160 if (entry->hasParent()) {
01161
01162 newmsg->sid = entry->parentGrp;
01163 newmsg->sourceFlag = 2;
01164 newmsg->redNo = oldRedNo;
01165 newmsg->gcount = redInfo.gcount [index];
01166 newmsg->rebuilt = rebuilt;
01167 newmsg->callback = msg_cb;
01168 DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
01169 mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
01170 } else {
01171 newmsg->sid = id;
01172
01173 rmsgs.push_back (newmsg);
01174
01175 if (redInfo.npProcessed == nFrags) {
01176
01177 newmsg = combineFrags (id, entry, redInfo);
01178
01179 CkSetRefNum(newmsg, userFlag);
01180
01181 if ( !msg_cb.isInvalid() )
01182 msg_cb.send(newmsg);
01183 else if (redInfo.storedCallback != NULL)
01184 redInfo.storedCallback->send(newmsg);
01185 else if (redInfo.storedClient != NULL) {
01186 redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
01187 delete newmsg;
01188 }
01189 else
01190 CmiAbort("Did you forget to register a reduction client?");
01191
01192 DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
01193
01194 if (currentTreeUp) {
01195 if (entry->oldc) {
01196
01197 mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
01198 entry->oldc = NULL;
01199 }
01200 if (entry->hasOldtree()) {
01201
01202 int oldpe = entry->oldtree.pe;
01203 mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
01204 entry->oldtree.clear();
01205 }
01206 }
01207
01208 if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
01209 }
01210 }
01211 }
01212
01213
01214
01223 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
01224 {
01225 int i;
01227 CkSectionInfo id = msg->sid;
01229 mCastEntry *entry = (mCastEntry *)id.get_val();
01230 CmiAssert(entry!=NULL);
01231
01232 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01233
01234 int updateReduceNo = 0;
01235
01236
01238 if (entry->isObsolete()) {
01239
01240 DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.get_pe()));
01241
01243 if (!entry->hasParent()) {
01244 mCastEntry *newentry = entry->newc;
01245 while (newentry && newentry->newc) newentry=newentry->newc;
01246 if (newentry) entry = newentry;
01247 CmiAssert(entry!=NULL);
01248 }
01249
01251 if (!entry->hasParent() && !entry->isObsolete()) {
01253 msg->sourceFlag = 0;
01255 updateReduceNo = 1;
01256 }
01258 else {
01259
01260 CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
01261
01262 msg->sid = entry->rootSid;
01263 msg->sourceFlag = 0;
01264
01265 mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
01266 return;
01267 }
01268 }
01269
01271 reductionInfo &redInfo = entry->red;
01272
01273
01275 if (msg->redNo < redInfo.redNo) {
01276 CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
01277 CmiAbort("Could never happen! \n");
01278 }
01279
01280
01282 if (entry->notReady() || msg->redNo > redInfo.redNo) {
01283 DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
01284 redInfo.futureMsgs.push_back(msg);
01285 return;
01286 }
01287
01288
01289 const int index = msg->fragNo;
01290
01291 if (msg->sourceFlag == 1) {
01292 redInfo.lcount [index] ++;
01293 }
01294
01295 if (msg->sourceFlag == 2) {
01296 redInfo.ccount [index] ++;
01297 }
01298
01299 redInfo.gcount [index] += msg->gcount;
01300
01301
01302 if ((0 != redInfo.msgs[index].length()) && (msg->dataSize != (redInfo.msgs [index][0]->dataSize)))
01303 CmiAbort("Reduction data are not of same length!");
01304
01305
01306
01307 redInfo.msgs [index].push_back(msg);
01308
01309
01311 int currentTreeUp = 0;
01312 if (redInfo.lcount [index] == entry->localElem.length() && redInfo.ccount [index] == entry->children.length())
01313 currentTreeUp = 1;
01314
01316 int mixTreeUp = 0;
01317 if (!entry->hasParent()) {
01318 mixTreeUp = 1;
01319 for (int i=0; i<msg->nFrags; i++)
01320 if (entry->allElem.length() != redInfo.gcount [i])
01321 mixTreeUp = 0;
01322 }
01323
01324
01326 if (currentTreeUp || mixTreeUp)
01327 {
01328 const int nFrags = msg->nFrags;
01330 reduceFragment (index, id, entry, redInfo, currentTreeUp);
01331
01332
01333
01334 if (updateReduceNo)
01335 mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
01336
01338 if (redInfo.npProcessed == nFrags) {
01339
01341 entry->incReduceNo();
01342
01344 for (i=0; i<nFrags; i++) {
01345 redInfo.lcount [i] = 0;
01346 redInfo.ccount [i] = 0;
01347 redInfo.gcount [i] = 0;
01348 }
01349 redInfo.npProcessed = 0;
01351 releaseFutureReduceMsgs(entry);
01352 }
01353 }
01354 }
01355
01356
01357
01358 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
01359 {
01360 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01361
01362 for (int i=0; i<entry->red.futureMsgs.length(); i++) {
01363 DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
01364 mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
01365 }
01366 entry->red.futureMsgs.length() = 0;
01367 }
01368
01369
01370
01371
01372 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
01373 {
01374 int i;
01375 CProxy_CkMulticastMgr mCastGrp(thisgroup);
01376
01377 for (int j=0; j<MAXFRAGS; j++) {
01378 for (i=0; i<entry->red.msgs[j].length(); i++) {
01379 CkReductionMsg *msg = entry->red.msgs[j][i];
01380 DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
01381 msg->sid = entry->rootSid;
01382 msg->sourceFlag = 0;
01383 mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
01384 }
01385 entry->red.msgs[j].length() = 0;
01386 }
01387
01388
01389 for (i=0; i<entry->red.futureMsgs.length(); i++) {
01390 CkReductionMsg *msg = entry->red.futureMsgs[i];
01391 DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
01392 msg->sid = entry->rootSid;
01393 msg->sourceFlag = 0;
01394 mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
01395 }
01396 entry->red.futureMsgs.length() = 0;
01397 }
01398
01399
01400
01401 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
01402 {
01403 DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
01404 if (entry->red.redNo < red)
01405 entry->red.redNo = red;
01406
01407 CProxy_CkMulticastMgr mp(thisgroup);
01408 for (int i=0; i<entry->children.length(); i++) {
01409 mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
01410 }
01411
01412 releaseFutureReduceMsgs(entry);
01413 }
01414
01415 #include "CkMulticast.def.h"
01416