00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 #include <limits>
00051
00052 #include "charm++.h"
00053 #include "ck.h"
00054
00055 #include "pathHistory.h"
00056
// Debug-trace macros for this file. Each tracing macro takes ONE parenthesized
// printf-style argument list, e.g. DEBR((AA "text %d\n" AB, value)).
// When CMK_DEBUG_REDUCTIONS is non-zero the macros forward to CkPrintf;
// otherwise most of them expand to nothing.
00057 #if CMK_DEBUG_REDUCTIONS
00058
00059
// DEBR: trace macro; AA is the format-string prefix and AB supplies the
// matching trailing arguments (PE, node, redNo, nRemote, nContrib, group idx),
// so AA/AB must always be used together inside a DEBR call.
00060 #define DEBR(x) CkPrintf x
00061 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
00062 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
00063
// DEBN / AAN / ABN: same scheme with a shorter prefix that only prints the node.
00064 #define DEBN(x) CkPrintf x
00065 #define AAN "Red Node%d "
00066 #define ABN ,CkMyNode()
00067
00068
// RED_DEB is disabled even in debug builds (its expansion is commented out).
00069 #define RED_DEB(x) //CkPrintf x
00070 #define DEBREVAC(x) CkPrintf x
00071 #define DEB_TUPLE(x) CkPrintf x
00072 #else
00073
// Non-debug build: the trace macros expand to nothing.
00074 #define DEBR(x) // CkPrintf x
// NOTE(review): DEBRMLOG is defined only in this branch and still prints via
// CkPrintf; AAN/ABN are defined only in the debug branch -- confirm intended.
00075 #define DEBRMLOG(x) CkPrintf x
00076 #define AA
00077 #define AB
00078 #define DEBN(x) //CkPrintf x
00079 #define RED_DEB(x) //CkPrintf x
00080 #define DEBREVAC(x) //CkPrintf x
00081 #define DEB_TUPLE(x) //CkPrintf x
00082 #endif
00083
// Fallback definition in case no included header provided INT_MAX.
00084 #ifndef INT_MAX
00085 #define INT_MAX 2147483647
00086 #endif
00087
// Defined elsewhere; Group::Group() aborts when it is set.
00088 extern bool _inrestart;
00089
// File-scope objects related to externally supplied reducers.
// NOTE(review): presumably used by the Python/external bindings -- confirm.
00090 CkReductionTypesExt charm_reducers;
00091 extern int (*PyReductionExt)(char**, int*, int, char**);
00092
00093 Group::Group():thisIndex(CkMyPe())
00094 {
00095 if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
00096
00097 creatingContributors();
00098 contributorStamped(&reductionInfo);
00099 contributorCreated(&reductionInfo);
00100 doneCreatingContributors();
00101 }
00102
00103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg),thisIndex(CkMyPe())
00104 {
00105 creatingContributors();
00106 contributorStamped(&reductionInfo);
00107 contributorCreated(&reductionInfo);
00108 doneCreatingContributors();
00109 }
00110
// Macro-generated member definitions for Group. The macros are defined
// elsewhere (not visible in this file section); each invocation passes the
// reduction manager to use -- the Group object itself, cast to CkReductionMgr.
// NOTE(review): the trailing `false` argument's meaning is defined by the
// macro -- confirm against its definition.
00111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
00112 ((CkReductionMgr *)this),
00113 reductionInfo,false)
// Reduction-client setters for the proxy; they look up the local branch of the
// group identified by _ck_gid.
00114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
00115
// Barrier contribute methods for Group, with the same arguments as above.
00116 CK_BARRIER_CONTRIBUTE_METHODS_DEF(Group,
00117 ((CkReductionMgr *)this),
00118 reductionInfo,false)
00119
00120
00121
00122 CkGroupInitCallback::CkGroupInitCallback(void) {}
00123
00124
00125
00126
00127 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
00128 {
00129 m->call();
00130 delete m;
00131 }
00132
00133
00134
00135
00136
00137 CkGroupReadyCallback::CkGroupReadyCallback(void)
00138 {
00139 _isReady = false;
00140 }
00141 void
00142 CkGroupReadyCallback::callBuffered(void)
00143 {
00144 int n = _msgs.length();
00145 for(int i=0;i<n;i++)
00146 {
00147 CkGroupCallbackMsg *msg = _msgs.deq();
00148 msg->call();
00149 delete msg;
00150 }
00151 }
00152 void
00153 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
00154 {
00155 if(_isReady) {
00156 msg->call();
00157 delete msg;
00158 } else {
00159 _msgs.enq(msg);
00160 }
00161 }
00162
00163 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
00164 :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
00165 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
00166 {
00167 CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
00168 CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
00169 b->fn(b->param,m->getSize(),m->getData());
00170 delete m;
00171 }
00172
00174
00175
00176
00177
00178
00179
00180
00181 CkReductionMgr::CkReductionMgr()
00182 :
00183 thisProxy(thisgroup),
00184 isDestroying(false)
00185 {
00186 #ifdef BINOMIAL_TREE
00187 init_BinomialTree();
00188 #elif CMK_BIGSIM_CHARM
00189 init_BinaryTree();
00190 #else
00191 init_TopoTree();
00192 #endif
00193 redNo=0;
00194 completedRedNo = -1;
00195 inProgress=false;
00196 creating=false;
00197 startRequested=false;
00198 gcount=lcount=0;
00199 nContrib=nRemote=0;
00200 is_inactive = false;
00201 maxStartRequest=0;
00202 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00203 numImmigrantRecObjs = 0;
00204 numEmigrantRecObjs = 0;
00205 #endif
00206 disableNotifyChildrenStart = false;
00207
00208 barrier_gCount=0;
00209 barrier_nSource=0;
00210 barrier_nContrib=barrier_nRemote=0;
00211
00212 DEBR((AA "In reductionMgr constructor at %d \n" AB,this));
00213 }
00214
00215 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
00216 , isDestroying(false)
00217 {
00218 numKids = -1;
00219 redNo=0;
00220 completedRedNo = -1;
00221 inProgress=false;
00222 creating=false;
00223 startRequested=false;
00224 gcount=lcount=0;
00225 nContrib=nRemote=0;
00226 is_inactive = false;
00227 maxStartRequest=0;
00228 DEBR((AA "In reductionMgr migratable constructor at %d \n" AB,this));
00229
00230 barrier_gCount=0;
00231 barrier_nSource=0;
00232 barrier_nContrib=barrier_nRemote=0;
00233
00234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00235 numImmigrantRecObjs = 0;
00236 numEmigrantRecObjs = 0;
00237 #endif
00238
00239 }
00240
00241 CkReductionMgr::~CkReductionMgr()
00242 {
00243 }
00244
00245 void CkReductionMgr::flushStates()
00246 {
00247
00248 redNo=0;
00249 completedRedNo = -1;
00250 inProgress=false;
00251 creating=false;
00252 startRequested=false;
00253 nContrib=nRemote=0;
00254 maxStartRequest=0;
00255
00256 while (!msgs.isEmpty()) { delete msgs.deq(); }
00257 while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
00258 while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
00259 while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
00260
00261 adjVec.clear();
00262
00263 }
00264
00266
00267
00268 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
00269 {
00270 DEBR((AA "Setting reductionClient in ReductionMgr groupid %d\n" AB,thisgroup.idx));
00271
00272 if (CkMyPe()!=0)
00273 CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
00274 storedCallback=*cb;
00275 }
00276
00278
00279
00280
00281 void CkReductionMgr::creatingContributors(void)
00282 {
00283 DEBR((AA "Creating contributors...\n" AB));
00284 creating=true;
00285 }
00286 void CkReductionMgr::doneCreatingContributors(void)
00287 {
00288 DEBR((AA "Done creating contributors...\n" AB));
00289 creating=false;
00290 checkIsActive();
00291 if (startRequested) startReduction(redNo,CkMyPe());
00292 finishReduction();
00293 }
00294
00295
00296 void CkReductionMgr::contributorStamped(contributorInfo *ci)
00297 {
00298 DEBR((AA "Contributor %p stamped\n" AB,ci));
00299
00300 gcount++;
00301 if (inProgress)
00302 {
00303 ci->redNo=redNo+1;
00304 adj(redNo).gcount--;
00305 } else
00306 ci->redNo=redNo;
00307 }
00308
00309
00310 void CkReductionMgr::contributorCreated(contributorInfo *ci)
00311 {
00312 DEBR((AA "Contributor %p created in grp %d\n" AB,ci,thisgroup.idx));
00313
00314 lcount++;
00315
00316 for (int r=redNo;r<ci->redNo;r++)
00317 adj(r).lcount--;
00318 checkIsActive();
00319 }
00320
00321
00322
00323
00324
00325
00326
00327 void CkReductionMgr::contributorDied(contributorInfo *ci)
00328 {
00329 #if CMK_MEM_CHECKPOINT
00330
00331 if (CkInRestarting()) return;
00332 #endif
00333
00334 if (isDestroying) return;
00335
00336 DEBR((AA "Contributor %p(%d) died\n" AB,ci,ci->redNo));
00337
00338 gcount--;
00339
00340 if (ci->redNo<redNo)
00341 {
00342
00343 DEBR((AA "Dying guy %p must have been migrating-- he's at #%d!\n" AB,ci,ci->redNo));
00344 for (int r=ci->redNo;r<redNo;r++)
00345 thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
00346 }
00347
00348
00349 int r;
00350 for (r=redNo;r<ci->redNo;r++)
00351 {
00352 DEBR((AA "Dead guy %p left contribution for #%d\n" AB,ci,r));
00353 adj(r).gcount++;
00354 }
00355
00356 lcount--;
00357
00358 for (r=redNo;r<ci->redNo;r++)
00359 adj(r).lcount++;
00360
00361
00362
00363 if (ci->redNo <= redNo) {
00364 checkIsActive();
00365 }
00366 finishReduction();
00367 }
00368
00369
00370 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
00371 {
00372 DEBR((AA "Contributor %p(%d) migrating away\n" AB,ci,ci->redNo));
00373 lcount--;
00374
00375 for (int r=redNo;r<ci->redNo;r++)
00376 adj(r).lcount++;
00377
00378
00379 if (ci->redNo <= redNo) {
00380 checkIsActive();
00381 }
00382 finishReduction();
00383 }
00384
00385
00386 void CkReductionMgr::contributorArriving(contributorInfo *ci)
00387 {
00388 DEBR((AA "Contributor %p(%d) migrating in\n" AB,ci,ci->redNo));
00389 lcount++;
00390 #if CMK_MEM_CHECKPOINT
00391
00392
00393 if (CkInRestarting()) return;
00394 #endif
00395
00396 for (int r=redNo;r<ci->redNo;r++)
00397 adj(r).lcount--;
00398
00399
00400 if (ci->redNo == redNo) {
00401 checkIsActive();
00402 }
00403 }
00404
00405
00406
00407
00408 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
00409 {
00410 #if CMK_BIGSIM_CHARM
00411 _TRACE_BG_TLINE_END(&(m->log));
00412 #endif
00413 DEBR((AA "Contributor %p contributed for %d in grp %d ismigratable %d \n" AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
00414 m->redNo=ci->redNo++;
00415 m->sourceFlag=-1;
00416 m->gcount=0;
00417
00418 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00419
00420
00421 if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
00422
00423
00424 envelope *env = UsrToEnv(m);
00425 env->flags = env->flags | CK_BYPASS_DET_MLOG;
00426 thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
00427 return;
00428 }
00429
00430 Chare *oldObj = CpvAccess(_currentObj);
00431 CpvAccess(_currentObj) = this;
00432
00433
00434 addContribution(m);
00435
00436 CpvAccess(_currentObj) = oldObj;
00437 #else
00438 addContribution(m);
00439 #endif
00440 }
00441
00442 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00443 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
00444
00445
00446
00447 envelope *env = UsrToEnv(m);
00448 env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
00449
00450
00451 addContribution(m);
00452 }
00453 #else
00454 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
00455 #endif
00456
00457 void CkReductionMgr::checkIsActive() {
00458 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
00459 return;
00460 #endif
00461
00462
00463 std::map<int, int>::iterator it;
00464 int c_inactive = 0;
00465 for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
00466 if (it->second <= redNo) {
00467 DEBR((AA "Kid %d is inactive from redNo %d\n" AB, it->first, it->second));
00468 c_inactive++;
00469 }
00470 }
00471 DEBR((AA "CheckIsActive redNo %d, kids %d(inactive %d), lcount %d\n" AB, redNo,
00472 numKids, c_inactive, lcount));
00473
00474 if(numKids == c_inactive && lcount == 0) {
00475 if(!is_inactive) {
00476 informParentInactive();
00477 }
00478 is_inactive = true;
00479 } else if(is_inactive) {
00480 is_inactive = false;
00481 }
00482 }
00483
00484
00485
00486
00487 void CkReductionMgr::checkAndAddToInactiveList(int id, int red_no) {
00488
00489
00490
00491 if (inProgress && redNo == red_no) {
00492 thisProxy[id].ReductionStarting(new CkReductionNumberMsg(red_no));
00493 }
00494
00495 std::map<int, int>::iterator it;
00496 it = inactiveList.find(id);
00497 if (it == inactiveList.end()) {
00498 inactiveList.insert(std::pair<int, int>(id, red_no));
00499 } else {
00500 it->second = red_no;
00501 }
00502
00503 if (redNo == red_no) {
00504 checkIsActive();
00505 }
00506 }
00507
00508
00509
00510
00511
00512 void CkReductionMgr::checkAndRemoveFromInactiveList(int id, int red_no) {
00513 std::map<int, int>::iterator it;
00514 it = inactiveList.find(id);
00515 if (it == inactiveList.end()) {
00516 return;
00517 }
00518 if (it->second <= red_no) {
00519 inactiveList.erase(it);
00520 DEBR((AA "Parent removing kid %d from inactivelist red_no %d\n" AB,
00521 id, red_no));
00522 }
00523 }
00524
00525
00526 void CkReductionMgr::informParentInactive() {
00527 if (hasParent()) {
00528 DEBR((AA "Inform parent to add to inactivelist red_no %d\n" AB, redNo));
00529 thisProxy[treeParent()].AddToInactiveList(
00530 new CkReductionInactiveMsg(CkMyPe(), redNo));
00531 }
00532 }
00533
00534
00535
00536
00537
00538 void CkReductionMgr::sendReductionStartingToKids(int red_no) {
00539 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
00540 for (int k=0;k<treeKids();k++)
00541 {
00542 DEBR((AA "Asking child PE %d to start #%d\n" AB,kids[k],redNo));
00543 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
00544 }
00545 #else
00546 std::map<int, int>::iterator it;
00547 for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
00548 if (it->second <= red_no) {
00549 DEBR((AA "Parent sending reductionstarting to inactive kid %d\n" AB,
00550 it->first));
00551 thisProxy[it->first].ReductionStarting(new CkReductionNumberMsg(red_no));
00552 }
00553 }
00554 #endif
00555 }
00556
00557
00559
00560 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
00561 {
00562 if(CkMyPe()==0){
00563
00564
00565
00566 }
00567 DEBR((AA " Group ReductionStarting called for redNo %d\n" AB,m->num));
00568 int srcPE = (UsrToEnv(m))->getSrcPe();
00569 if (isPresent(m->num) && !inProgress)
00570 {
00571 DEBR((AA "Starting reduction #%d at parent's request\n" AB,m->num));
00572 startReduction(m->num,srcPE);
00573 finishReduction();
00574 } else if (isFuture(m->num)){
00575
00576 DEBR((AA "Asked to startfuture Reduction %d \n" AB,m->num));
00577 if(maxStartRequest < m->num){
00578 maxStartRequest = m->num;
00579 }
00580
00581
00582 }
00583 else
00584 DEBR((AA "Ignoring parent's late request to start #%d\n" AB,m->num));
00585 delete m;
00586 }
00587
00588
00589
00590 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
00591 {
00592 #if CMK_BIGSIM_CHARM
00593 _TRACE_BG_TLINE_END(&(m->log));
00594 #endif
00595 addContribution(m);
00596 }
00597
00598
00599 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
00600 {
00601 if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
00602 DEBR((AA "Migrant died before contributing to #%d\n" AB,m->num));
00603
00604 adj(m->num).gcount--;
00605 finishReduction();
00606 delete m;
00607 }
00608
00610 void CkReductionMgr::startReduction(int number,int srcPE)
00611 {
00612 if (isFuture(number)){ return;}
00613 if (isPast(number)) {return;}
00614 if (inProgress){
00615 DEBR((AA "This reduction is already in progress\n" AB));
00616 return;
00617 }
00618 if (creating)
00619 {
00620 DEBR((AA "Postponing start request #%d until we're done creating\n" AB,redNo));
00621 startRequested=true;
00622 return;
00623 }
00624
00625
00626 DEBR((AA "Starting reduction #%d %d %d \n" AB,redNo,completedRedNo,number));
00627 inProgress=true;
00628
00629
00630 #if CMK_FAULT_EVAC
00631 if(!CmiNodeAlive(CkMyPe())){
00632 return;
00633 }
00634 #endif
00635
00636
00637
00638
00639 if(disableNotifyChildrenStart) return;
00640
00641
00642 sendReductionStartingToKids(redNo);
00643 }
00644
00645
00646 void CkReductionMgr::addContribution(CkReductionMsg *m)
00647 {
00648 if (isPast(m->redNo))
00649 {
00650 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00651 CmiAbort("this version should not have late migrations");
00652 #else
00653
00654 DEBR((AA "Migrant gives late contribution for #%d!\n" AB,m->redNo));
00655
00656
00657 thisProxy[0].LateMigrantMsg(m);
00658 #endif
00659 }
00660 else if (isFuture(m->redNo)) {
00661 DEBR((AA "Contributor gives early contribution-- for #%d\n" AB,m->redNo));
00662 futureMsgs.enq(m);
00663 } else {
00664 DEBR((AA "Recv'd local contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
00665
00666 startReduction(m->redNo,CkMyPe());
00667 msgs.enq(m);
00668 nContrib++;
00669 finishReduction();
00670 }
00671 }
00672
00676 void CkReductionMgr::finishReduction(void)
00677 {
00678
00679 DEBR((AA "in finishReduction (inProgress=%d) in grp %d\n" AB,inProgress,thisgroup.idx));
00680 if ((!inProgress) || creating){
00681 DEBR((AA "Either not in Progress or creating\n" AB));
00682 return;
00683 }
00684
00685 bool partialReduction = false;
00686
00687
00688 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00689 if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
00690 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
00691 partialReduction = true;
00692 }
00693 else {
00694 DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
00695 return;
00696 }
00697 }
00698 #else
00699 if (nContrib<(lcount+adj(redNo).lcount)){
00700 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
00701 partialReduction = true;
00702 }
00703 else {
00704 DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
00705 return;
00706 }
00707 }
00708 #endif
00709
00710 if (nRemote<treeKids()) {
00711 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
00712 partialReduction = true;
00713 }
00714 else {
00715 DEBR((AA "Need more remote messages %d %d\n" AB,nRemote,treeKids()));
00716 return;
00717 }
00718 }
00719
00720
00721 DEBR((AA "Reducing data... %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
00722 #if CMK_BIGSIM_CHARM
00723 _TRACE_BG_END_EXECUTE(1);
00724 void* _bgParentLog = NULL;
00725 _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
00726 #endif
00727 CkReductionMsg *result=reduceMessages(msgs);
00728 result->fromPE = CkMyPe();
00729 result->redNo=redNo;
00730 DEBR((AA "Reduced gcount=%d; sourceFlag=%d\n" AB,result->gcount,result->sourceFlag));
00731
00732 if (partialReduction) {
00733 msgs.enq(result);
00734 return;
00735 }
00736
00737 if (hasParent())
00738 {
00739 DEBR((AA "Passing reduced data up to parent node %d.\n" AB,treeParent()));
00740 DEBR((AA "Message gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
00741 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00742 result->gcount+=gcount+adj(redNo).gcount;
00743 #else
00744 result->gcount+=gcount+adj(redNo).gcount;
00745 #endif
00746 thisProxy[treeParent()].RecvMsg(result);
00747 }
00748 else
00749 {
00750 DEBR((AA "Final gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
00751 int totalElements=result->gcount+gcount+adj(redNo).gcount;
00752 if (totalElements>result->nSources())
00753 {
00754 DEBR((AA "Only got %d of %d contributions (c'mon, migrators!)\n" AB,result->nSources(),totalElements));
00755 msgs.enq(result);
00756 return;
00757 } else if (totalElements<result->nSources()) {
00758 DEBR((AA "Got %d of %d contributions\n" AB,result->nSources(),totalElements));
00759 #if !defined(_FAULT_CAUSAL_)
00760 CkAbort("ERROR! Too many contributions at root!\n");
00761 #endif
00762 }
00763 DEBR((AA "Passing result to client function\n" AB));
00764 CkSetRefNum(result, result->getUserFlag());
00765 if (!result->callback.isInvalid())
00766 result->callback.send(result);
00767 else if (!storedCallback.isInvalid())
00768 storedCallback.send(result);
00769 else
00770 CkAbort("No reduction client!\n"
00771 "You must register a client with either SetReductionClient or during contribute.\n");
00772 }
00773
00774
00775
00776 redNo++;
00777
00778
00779 checkIsActive();
00780
00781 int i;
00782 completedRedNo++;
00783 adjVec.erase(adjVec.begin());
00784
00785 inProgress=false;
00786 startRequested=false;
00787 nRemote=nContrib=0;
00788
00789
00790 int n=futureMsgs.length();
00791 for (i=0;i<n;i++)
00792 {
00793 CkReductionMsg *m=futureMsgs.deq();
00794 if (m!=NULL)
00795 addContribution(m);
00796 }
00797 n=futureRemoteMsgs.length();
00798 for (i=0;i<n;i++)
00799 {
00800 CkReductionMsg *m=futureRemoteMsgs.deq();
00801 if (m!=NULL) {
00802 RecvMsg(m);
00803 }
00804 }
00805
00806 if(maxStartRequest >= redNo){
00807 startReduction(redNo,CkMyPe());
00808 finishReduction();
00809 }
00810
00811
00812 }
00813
00814
00815 void CkReductionMgr::RecvMsg(CkReductionMsg *m)
00816 {
00817 #if CMK_BIGSIM_CHARM
00818 _TRACE_BG_TLINE_END(&m->log);
00819 #endif
00820 if (isPresent(m->redNo)) {
00821 DEBR((AA "Recv'd remote contribution %d for #%d\n" AB,nRemote,m->redNo));
00822
00823
00824 if (m->nSources() > 0) {
00825 checkAndRemoveFromInactiveList(m->fromPE, m->redNo);
00826 }
00827 startReduction(m->redNo, CkMyPe());
00828 msgs.enq(m);
00829 nRemote++;
00830 finishReduction();
00831 }
00832 else if (isFuture(m->redNo)) {
00833 DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
00834 futureRemoteMsgs.enq(m);
00835 }
00836 else CkAbort("Recv'd late remote contribution!\n");
00837 }
00838
00839 void CkReductionMgr::AddToInactiveList(CkReductionInactiveMsg *m) {
00840 int id = m->id;
00841 int last_redno = m->redno;
00842 delete m;
00843
00844 DEBR((AA "Parent add kid %d to inactive list from redno %d\n" AB,
00845 id, last_redno));
00846 checkAndAddToInactiveList(id, last_redno);
00847
00848 finishReduction();
00849 if (last_redno <= redNo) {
00850 checkIsActive();
00851 }
00852 }
00853
00855
00856
00857 countAdjustment &CkReductionMgr::adj(int number)
00858 {
00859 number-=completedRedNo;
00860 number--;
00861 if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
00862
00863 if (adjVec.size() <= number) { adjVec.resize(number + 1); }
00864 return adjVec[number];
00865 }
00866
00867
00868 CkReductionMsg *CkReductionMgr::reduceMessages(CkMsgQ<CkReductionMsg> &msgs)
00869 {
00870 CkReductionMsg *ret=NULL;
00871
00872
00873 CkReduction::reducerType r=CkReduction::invalid;
00874 int msgs_gcount=0;
00875 int msgs_nSources=0;
00876 CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
00877 CkCallback msgs_callback;
00878 int i;
00879 int nMsgs=0;
00880 CkReductionMsg *m;
00881 std::vector<CkReductionMsg *> msgArr(msgs.length());
00882 bool isMigratableContributor;
00883
00884
00885 while (NULL!=(m=msgs.deq()))
00886 {
00887 DEBR((AA "***** gcount=%d; sourceFlag=%d ismigratable %d \n" AB,m->gcount,m->nSources(),m->isMigratableContributor()));
00888 msgs_gcount+=m->gcount;
00889 if (m->sourceFlag!=0)
00890 {
00891 msgs_nSources+=m->nSources();
00892 #if CMK_BIGSIM_CHARM
00893 _TRACE_BG_ADD_BACKWARD_DEP(m->log);
00894 #endif
00895
00896
00897 if (nMsgs == 0 || m->reducer != CkReduction::nop) {
00898 msgArr[nMsgs++]=m;
00899 if (!m->callback.isInvalid()) {
00900 #if CMK_ERROR_CHECKING
00901 if(nMsgs > 1 && !(msgs_callback == m->callback)) {
00902 CkPrintf("Mismatched callback details: reducers (%s, %s); callback types (%s, %s)\n",
00903 CkReduction::reducerTable()[r].name, CkReduction::reducerTable()[m->reducer].name,
00904 CkCallback::typeName(msgs_callback.type), CkCallback::typeName(m->callback.type));
00905 CkAbort("mis-matched client callbacks in reduction messages\n");
00906 }
00907 #endif
00908 msgs_callback=m->callback;
00909 }
00910 r=m->reducer;
00911 if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
00912 msgs_userFlag=m->userFlag;
00913 isMigratableContributor=m->isMigratableContributor();
00914 } else {
00915 #if CMK_ERROR_CHECKING
00916 if(!(msgs_callback == m->callback)) {
00917 CkPrintf("Mismatched callback details: reducers (%s, %s); callback types (%s, %s)\n",
00918 CkReduction::reducerTable()[r].name, CkReduction::reducerTable()[m->reducer].name,
00919 CkCallback::typeName(msgs_callback.type), CkCallback::typeName(m->callback.type));
00920 CkAbort("mis-matched client callbacks in reduction messages\n");
00921 }
00922 #endif
00923 delete m;
00924 }
00925 }
00926 else
00927 {
00928 delete m;
00929 }
00930 }
00931
00932 if (nMsgs==0||r==CkReduction::invalid)
00933
00934 ret=CkReductionMsg::buildNew(0,NULL);
00935 else
00936 {
00937
00938 if(nMsgs == 1 &&
00939 msgArr[0]->reducer != CkReduction::set &&
00940 msgArr[0]->reducer != CkReduction::tuple) {
00941 ret = msgArr[0];
00942 }else{
00943 if (msgArr[0]->reducer == CkReduction::nop) {
00944
00945
00946
00947 delete msgArr[0];
00948 msgArr[0] = msgArr[nMsgs - 1];
00949 nMsgs--;
00950 }
00951 CkReduction::reducerFn f=CkReduction::reducerTable()[r].fn;
00952 ret=(*f)(nMsgs,msgArr.data());
00953 }
00954 ret->reducer=r;
00955 }
00956
00957 #if USE_CRITICAL_PATH_HEADER_ARRAY
00958 #if CRITICAL_PATH_DEBUG > 3
00959 CkPrintf("[%d] combining critical path information from messages in reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
00960 #endif
00961 MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
00962 path.updateMax(UsrToEnv(ret));
00963
00964 for (i=0;i<nMsgs;i++){
00965 if (msgArr[i]!=ret){
00966
00967 path.updateMax(UsrToEnv(msgArr[i]));
00968 } else {
00969
00970 }
00971 }
00972 #if CRITICAL_PATH_DEBUG > 3
00973 CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
00974 #endif
00975
00976 PathHistoryTableEntry tableEntry(path);
00977 tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
00978
00979 #endif
00980
00981
00982 for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
00983
00984
00985 ret->gcount=msgs_gcount;
00986 ret->userFlag=msgs_userFlag;
00987 ret->callback=msgs_callback;
00988 ret->sourceFlag=msgs_nSources;
00989 ret->setMigratableContributor(isMigratableContributor);
00990
00991 return ret;
00992 }
00993
00994
00995
00996
00997
00998
00999
01000 void CkReductionMgr::pup(PUP::er &p)
01001 {
01002
01003
01004 CkGroupInitCallback::pup(p);
01005 p(redNo);
01006 p(completedRedNo);
01007 p(inProgress); p(creating); p(startRequested);
01008 p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
01009 p|msgs;
01010 p|futureMsgs;
01011 p|futureRemoteMsgs;
01012 p|finalMsgs;
01013 p|adjVec;
01014 p|storedCallback;
01015
01016 if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn)
01017 {
01018 CkReductionClientBundle *bd;
01019 if (p.isUnpacking())
01020 bd = new CkReductionClientBundle;
01021 else
01022 bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
01023 p|*bd;
01024 if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
01025 }
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038 if(p.isUnpacking()){
01039 thisProxy = thisgroup;
01040 maxStartRequest=0;
01041 #ifdef BINOMIAL_TREE
01042 init_BinomialTree();
01043 #elif CMK_BIGSIM_CHARM
01044 init_BinaryTree();
01045 #else
01046 init_TopoTree();
01047 #endif
01048 is_inactive = false;
01049 checkIsActive();
01050 }
01051
01052 DEBR(("[%d,%d] pupping _____________ gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
01053 }
01054
01055 void CkReductionMgr::init_BinaryTree(){
01056 if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
01057 parent = CkNodeFirst(CkMyNode());
01058 numKids = 0;
01059 } else {
01060 int parentNode = (CkMyNode()-1)/TREE_WID;
01061 parent = CkMyNode() > 0 ? CkNodeFirst(parentNode) : -1;
01062
01063 int firstKid = CkMyNode()*TREE_WID+1;
01064 numKids=CkNumNodes()-firstKid;
01065 if (numKids > TREE_WID) numKids = TREE_WID;
01066 if (numKids < 0) numKids = 0;
01067 for (int i = 0; i < numKids; i++) {
01068 kids.push_back(CkNodeFirst(firstKid+i));
01069 #if CMK_FAULT_EVAC
01070 newKids.push_back(CkNodeFirst(firstKid+i));
01071 #endif
01072 }
01073
01074
01075 numKids += CkNodeSize(CkMyNode())-1;
01076 for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
01077 kids.push_back(CkMyPe()+i);
01078 #if CMK_FAULT_EVAC
01079 newKids.push_back(CkMyPe()+i);
01080 #endif
01081 }
01082 }
01083 }
01084
01085 void CkReductionMgr::init_TopoTree() {
01086 if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
01087 parent = CkNodeFirst(CkMyNode());
01088 numKids = 0;
01089 } else {
01090 if (_topoTree == NULL) CkAbort("CkReductionMgr:: topo tree has not been calculated\n");
01091
01092 CmiSpanningTreeInfo &t = *_topoTree;
01093 if (t.parent != -1) parent = CkNodeFirst(t.parent);
01094 else parent = -1;
01095 numKids = t.child_count;
01096 for (int i=0; i < numKids; i++) {
01097 int child = CkNodeFirst(t.children[i]);
01098 kids.push_back(child);
01099 #if CMK_FAULT_EVAC
01100 newKids.push_back(child);
01101 #endif
01102 }
01103
01104
01105 numKids += CkNodeSize(CkMyNode())-1;
01106 for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
01107 kids.push_back(CkMyPe()+i);
01108 #if CMK_FAULT_EVAC
01109 newKids.push_back(CkMyPe()+i);
01110 #endif
01111 }
01112 }
01113 }
01114
01115 void CkReductionMgr::init_BinomialTree(){
01116 if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
01117 parent = CkNodeFirst(CkMyNode());
01118 numKids = 0;
01119 } else {
01120 int depth = (int)ceil((log((double)CkNumNodes())/log((double)2)));
01121 upperSize = (unsigned) 1 << depth;
01122 label = upperSize-CkNodeFirst(CkMyNode())-1;
01123 int p=label;
01124 int count=0;
01125 while( p > 0){
01126 if(p % 2 == 0)
01127 break;
01128 else{
01129 p = p/2;
01130 count++;
01131 }
01132 }
01133 parent = label + (1<<count);
01134 parent = upperSize - 1 - parent;
01135 int temp;
01136 if(count != 0){
01137 numKids = 0;
01138 for(int i=0;i<count;i++){
01139 temp = label - (1<<i);
01140 temp = upperSize - 1 - temp;
01141 if(temp <= CkNumPes()-1){
01142 kids.push_back(temp);
01143 numKids++;
01144 }
01145 }
01146 }else{
01147 numKids = 0;
01148 }
01149 }
01150 }
01151
01152
01153 int CkReductionMgr::treeRoot(void)
01154 {
01155 return 0;
01156 }
01157 bool CkReductionMgr::hasParent(void)
01158 {
01159 return (bool)(CkMyPe()!=treeRoot());
01160 }
01161 int CkReductionMgr::treeParent(void)
01162 {
01163 return parent;
01164 }
01165 int CkReductionMgr::treeKids(void)
01166 {
01167 return numKids;
01168 }
01169
01170
01171
01172
01173
01174 void CkReductionMgr::barrier(CkReductionMsg *m)
01175 {
01176 barrier_nContrib++;
01177 barrier_nSource++;
01178 if(!m->callback.isInvalid())
01179 barrier_storedCallback=m->callback;
01180 finishBarrier();
01181 delete m;
01182 }
01183
01184 void CkReductionMgr::finishBarrier(void)
01185 {
01186 if(barrier_nContrib<lcount){
01187 DEBR(("[%d] current contrib:%d,lcount:%d\n",CkMyPe(),barrier_nContrib,lcount));
01188 return;
01189 }
01190 if(barrier_nRemote<treeKids()){
01191 DEBR(("[%d] current remote:%d,kids:%d\n",CkMyPe(),barrier_nRemote,treeKids()));
01192 return;
01193 }
01194 CkReductionMsg * result = CkReductionMsg::buildNew(0,NULL);
01195 result->callback=barrier_storedCallback;
01196 result->sourceFlag=barrier_nSource;
01197 result->gcount=barrier_gCount;
01198 if(hasParent())
01199 {
01200 DEBR(("[%d]send to parent:%d\n",CkMyPe(),treeParent()));
01201 result->gcount+=gcount;
01202 thisProxy[treeParent()].Barrier_RecvMsg(result);
01203 }
01204 else{
01205 int totalElements=result->gcount+gcount;
01206 DEBR(("[%d]root,totalElements:%d,source:%d\n",CkMyPe(),totalElements,result->nSources()));
01207 if(totalElements<result->nSources()){
01208 CkAbort("ERROR! Too many contributions at barrier root\n");
01209 }
01210 CkSetRefNum(result,result->getUserFlag());
01211 if(!result->callback.isInvalid())
01212 result->callback.send(result);
01213 else if(!barrier_storedCallback.isInvalid())
01214 barrier_storedCallback.send(result);
01215 else
01216 CkAbort("No reduction client!\n");
01217 }
01218 barrier_nRemote=barrier_nContrib=0;
01219 barrier_gCount=0;
01220 barrier_nSource=0;
01221 }
01222
01223 void CkReductionMgr::Barrier_RecvMsg(CkReductionMsg *m)
01224 {
01225 barrier_nRemote++;
01226 barrier_gCount+=m->gcount;
01227 barrier_nSource+=m->nSources();
01228 if(!m->callback.isInvalid())
01229 barrier_storedCallback=m->callback;
01230 finishBarrier();
01231 }
01232
01233
01234
01236
01238
01239
01240
01241
// Size of a CkReductionMsg without its trailing double; CkReductionMsg::alloc
// adds the payload size to this to get the total allocation size.
// NOTE(review): presumably the trailing double is the dataStorage member that
// marks where the payload begins -- confirm against the class declaration.
01242 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
01243
01244
01245
01246 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
01247 CkReduction::reducerType reducer, CkReductionMsg *buf)
01248 {
01249 int len[1] = { NdataSize };
01250 CkReductionMsg *ret = buf ? buf : new(len,0) CkReductionMsg();
01251
01252 ret->dataSize=NdataSize;
01253 if (srcData!=NULL && !buf)
01254 memcpy(ret->data,srcData,NdataSize);
01255 ret->userFlag=(CMK_REFNUM_TYPE)-1;
01256 ret->reducer=reducer;
01257 ret->sourceFlag=std::numeric_limits<int>::min();
01258 ret->gcount=0;
01259 ret->migratableContributor = true;
01260 #if CMK_BIGSIM_CHARM
01261 ret->log = NULL;
01262 #endif
01263 return ret;
01264 }
01265
01266
01267 void *
01268 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits,GroupDepNum groupDepNum)
01269 {
01270 int totalsize=ARM_DATASTART+(*sz);
01271 DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
01272 CkReductionMsg *ret = (CkReductionMsg *)
01273 CkAllocMsg(msgnum,totalsize,priobits,groupDepNum);
01274 ret->data=(void *)(&ret->dataStorage);
01275 return (void *) ret;
01276 }
01277
01278 void *
01279 CkReductionMsg::pack(CkReductionMsg* in)
01280 {
01281 DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
01282
01283 in->data = NULL;
01284 return (void*) in;
01285 }
01286
01287 CkReductionMsg* CkReductionMsg::unpack(void *in)
01288 {
01289 CkReductionMsg *ret = (CkReductionMsg *)in;
01290 DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
01291
01292 ret->data=(void *)(&ret->dataStorage);
01293 return ret;
01294 }
01295
01296
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311
01313
01314
01315
01316
01317
01318 static CkReductionMsg *invalid_reducer_fn(int nMsg,CkReductionMsg **msg)
01319 {
01320 CkAbort("Called the invalid reducer type 0. This probably\n"
01321 "means you forgot to initialize your custom reducer index.\n");
01322 return NULL;
01323 }
01324
01325 static CkReductionMsg *nop_fn(int nMsg,CkReductionMsg **msg)
01326 {
01327 return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
01328 }
01329
// SIMPLE_REDUCTION(name, dataType, typeStr, loop)
// Generates a static reducer function `name` that combines nMsg messages
// element by element, treating each payload as an array of `dataType`.
//   - The element count (nElem) is taken from msg[0] only; the other messages
//     are indexed with the same count (presumably all contributions have equal
//     length -- not checked here).
//   - `ret` aliases msg[0]'s payload and `value` the payload of message m;
//     `loop` is the statement run for every (m,i) pair and is expected to
//     fold value[i] into ret[i]. It may refer to `ret`, `value` and `i`.
//   - The result is msg[0] itself, re-initialized through
//     CkReductionMsg::buildNew with buf=msg[0] (no new allocation).
//   - `typeStr` is the printf conversion used by the RED_DEB trace only.
01330 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
01331 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
01332 {\
01333 RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
01334 int m,i;\
01335 int nElem=msg[0]->getLength()/sizeof(dataType);\
01336 dataType *ret=(dataType *)(msg[0]->getData());\
01337 for (m=1;m<nMsg;m++)\
01338 {\
01339 dataType *value=(dataType *)(msg[m]->getData());\
01340 for (i=0;i<nElem;i++)\
01341 {\
01342 RED_DEB(("|\tmsg%d (from %d) [%d]=" typeStr "\n",m,msg[m]->sourceFlag,i,value[i]));\
01343 loop\
01344 }\
01345 }\
01346 RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
01347 return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
01348 }
01349
01350
// SIMPLE_POLYMORPH_REDUCTION(nameBase, loop)
// Instantiates SIMPLE_REDUCTION once per built-in arithmetic type, producing
// nameBase_char_fn, nameBase_short_fn, ... nameBase_double_fn, all sharing the
// same per-element statement `loop`.
// The third argument is the printf conversion used by the debug trace; the
// entry for `short` uses "%hd" ("%h" alone is a length modifier without a
// conversion specifier and is not a valid format).
#define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
  SIMPLE_REDUCTION(nameBase##_char_fn,char,"%c",loop) \
  SIMPLE_REDUCTION(nameBase##_short_fn,short,"%hd",loop) \
  SIMPLE_REDUCTION(nameBase##_int_fn,int,"%d",loop) \
  SIMPLE_REDUCTION(nameBase##_long_fn,long,"%ld",loop) \
  SIMPLE_REDUCTION(nameBase##_long_long_fn,long long,"%lld",loop) \
  SIMPLE_REDUCTION(nameBase##_uchar_fn,unsigned char,"%c",loop) \
  SIMPLE_REDUCTION(nameBase##_ushort_fn,unsigned short,"%hu",loop) \
  SIMPLE_REDUCTION(nameBase##_uint_fn,unsigned int,"%u",loop) \
  SIMPLE_REDUCTION(nameBase##_ulong_fn,unsigned long,"%lu",loop) \
  SIMPLE_REDUCTION(nameBase##_ulong_long_fn,unsigned long long,"%llu",loop) \
  SIMPLE_REDUCTION(nameBase##_float_fn,float,"%f",loop) \
  SIMPLE_REDUCTION(nameBase##_double_fn,double,"%f",loop)
01364
01365
01366 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
01367
01368
01369 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
01370
01371
01372 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
01373
01374
01375 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
01376
01377
01378
01379
01380 SIMPLE_REDUCTION(logical_and_fn,int,"%d",
01381 if (value[i]==0)
01382 ret[i]=0;
01383 ret[i]=!!ret[i];
01384 )
01385
01386
01387
01388 SIMPLE_REDUCTION(logical_and_int_fn,int,"%d",
01389 if (value[i]==0)
01390 ret[i]=0;
01391 ret[i]=!!ret[i];
01392 )
01393
01394
01395
01396 SIMPLE_REDUCTION(logical_and_bool_fn,bool,"%d",
01397 if (!value[i]) ret[i]=false;
01398 )
01399
01400
01401
01402 SIMPLE_REDUCTION(logical_or_fn,int,"%d",
01403 if (value[i]!=0)
01404 ret[i]=1;
01405 ret[i]=!!ret[i];
01406 )
01407
01408
01409
01410 SIMPLE_REDUCTION(logical_or_int_fn,int,"%d",
01411 if (value[i]!=0)
01412 ret[i]=1;
01413 ret[i]=!!ret[i];
01414 )
01415
01416
01417
01418 SIMPLE_REDUCTION(logical_or_bool_fn,bool,"%d",
01419 if (value[i]) ret[i]=true;
01420 )
01421
01422
01423
01424 SIMPLE_REDUCTION(logical_xor_int_fn,int,"%d",
01425 ret[i] = (!ret[i] != !value[i]);
01426 )
01427
01428
01429
01430 SIMPLE_REDUCTION(logical_xor_bool_fn,bool,"%d",
01431 ret[i] = (ret[i] != value[i]);
01432 )
01433
01434 SIMPLE_REDUCTION(bitvec_and_fn,int,"%d",ret[i]&=value[i];)
01435 SIMPLE_REDUCTION(bitvec_and_int_fn,int,"%d",ret[i]&=value[i];)
01436 SIMPLE_REDUCTION(bitvec_and_bool_fn,bool,"%d",ret[i]&=value[i];)
01437
01438 SIMPLE_REDUCTION(bitvec_or_fn,int,"%d",ret[i]|=value[i];)
01439 SIMPLE_REDUCTION(bitvec_or_int_fn,int,"%d",ret[i]|=value[i];)
01440 SIMPLE_REDUCTION(bitvec_or_bool_fn,bool,"%d",ret[i]|=value[i];)
01441
01442 SIMPLE_REDUCTION(bitvec_xor_fn,int,"%d",ret[i]^=value[i];)
01443 SIMPLE_REDUCTION(bitvec_xor_int_fn,int,"%d",ret[i]^=value[i];)
01444 SIMPLE_REDUCTION(bitvec_xor_bool_fn,bool,"%d",ret[i]^=value[i];)
01445
01446
01447 static CkReductionMsg *random_fn(int nMsg,CkReductionMsg **msg) {
01448 int idx = (int)(CrnDrand()*(nMsg-1) + 0.5);
01449 return CkReductionMsg::buildNew(msg[idx]->getLength(),
01450 (void *)msg[idx]->getData(),
01451 CkReduction::random, msg[idx]);
01452 }
01453
01455
01456
01457
01458
01459 static CkReductionMsg *concat_fn(int nMsg,CkReductionMsg **msg)
01460 {
01461 RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
01462
01463 int i,retSize=0;
01464 for (i=0;i<nMsg;i++)
01465 retSize+=msg[i]->getSize();
01466
01467 RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
01468
01469
01470 CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
01471
01472
01473 char *cur=(char *)(ret->getData());
01474 for (i=0;i<nMsg;i++) {
01475 int messageBytes=msg[i]->getSize();
01476 memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
01477 cur+=messageBytes;
01478 }
01479 RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
01480 return ret;
01481 }
01482
01484
01485
01486
01487
01488
01489
01490
01491
// Alignment granule, in bytes, for elements packed by the set reducer.
static const int alignSize=sizeof(double);

// Rounds x up to a multiple of alignSize (relies on alignSize being a power
// of two, since it works by masking off the low bits).
static int SET_ALIGN(int x)
{
  const int lowBits=alignSize-1;
  return (x+lowBits)&~lowBits;
}

// Storage needed for one set element carrying dataSize payload bytes:
// sizeof(int) plus the payload, rounded up with SET_ALIGN.
static int SET_SIZE(int dataSize)
{
  return SET_ALIGN(static_cast<int>(sizeof(int)+dataSize));
}
01498
01499
01500 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
01501 {
01502 char *next=((char *)cur)+SET_SIZE(cur->dataSize);
01503 return (CkReduction::setElement *)next;
01504 }
01505
01506
01507
01508 static CkReductionMsg *set_fn(int nMsg,CkReductionMsg **msg)
01509 {
01510 RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
01511
01512 int i,retSize=0;
01513 for (i=0;i<nMsg;i++) {
01514 if (!msg[i]->isFromUser())
01515
01516 retSize+=(msg[i]->getSize()-sizeof(int));
01517 else
01518 retSize+=SET_SIZE(msg[i]->getSize());
01519 }
01520 retSize+=sizeof(int);
01521
01522 RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
01523
01524
01525 CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
01526
01527
01528 CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
01529 for (i=0;i<nMsg;i++)
01530 if (!msg[i]->isFromUser())
01531 {
01532 int messageBytes=msg[i]->getSize()-sizeof(int);
01533 RED_DEB(("|\tc msg[%d] is %d bytes\n",i,msg[i]->getSize()));
01534 memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
01535 cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
01536 }
01537 else
01538 {
01539 RED_DEB(("|\tu msg[%d] is %d bytes\n",i,msg[i]->getSize()));
01540 cur->dataSize=msg[i]->getSize();
01541 memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
01542 cur=SET_NEXT(cur);
01543 }
01544 cur->dataSize=-1;
01545 RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
01546 return ret;
01547 }
01548
01549
01550
01551
01552
01553 CkReduction::setElement *CkReduction::setElement::next(void)
01554 {
01555 CkReduction::setElement *n=SET_NEXT(this);
01556 if (n->dataSize==-1)
01557 return NULL;
01558 else
01559 return n;
01560 }
01561
01562
01564
01565 CkReduction::statisticsElement::statisticsElement(double initialValue)
01566 : count(1)
01567 , mean(initialValue)
01568 , m2(0.0)
01569 {}
01570
01571
01572
01573
01574
01575
01576 static CkReductionMsg* statistics_fn(int nMsgs, CkReductionMsg** msg)
01577 {
01578 int nElem = msg[0]->getLength() / sizeof(CkReduction::statisticsElement);
01579 CkReduction::statisticsElement* ret = (CkReduction::statisticsElement*)(msg[0]->getData());
01580 for (int m = 1; m < nMsgs; m++)
01581 {
01582 CkReduction::statisticsElement* value = (CkReduction::statisticsElement*)(msg[m]->getData());
01583 for (int i = 0; i < nElem; i++)
01584 {
01585 double a_count = ret[i].count;
01586 ret[i].count += value[i].count;
01587 double delta = value[i].mean - ret[i].mean;
01588 ret[i].mean += delta * value[i].count / ret[i].count;
01589 ret[i].m2 += value[i].m2 + delta * delta * value[i].count * a_count / ret[i].count;
01590 }
01591 }
01592 return CkReductionMsg::buildNew(
01593 nElem*sizeof(CkReduction::statisticsElement),
01594 (void *)ret,
01595 CkReduction::invalid,
01596 msg[0]);
01597 }
01598
01600
01601 CkReduction::tupleElement::tupleElement()
01602 : dataSize(0)
01603 , data(NULL)
01604 , reducer(CkReduction::invalid)
01605 , owns_data(false)
01606 {}
01607 CkReduction::tupleElement::tupleElement(size_t dataSize_, void* data_, CkReduction::reducerType reducer_)
01608 : dataSize(dataSize_)
01609 , data((char*)data_)
01610 , reducer(reducer_)
01611 , owns_data(false)
01612 {
01613 }
01614 CkReduction::tupleElement::tupleElement(CkReduction::tupleElement&& rhs_move)
01615 : dataSize(rhs_move.dataSize)
01616 , data(rhs_move.data)
01617 , reducer(rhs_move.reducer)
01618 , owns_data(rhs_move.owns_data)
01619 {
01620 rhs_move.dataSize = 0;
01621 rhs_move.data = 0;
01622 rhs_move.reducer = CkReduction::invalid;
01623 rhs_move.owns_data = false;
01624 }
01625 CkReduction::tupleElement& CkReduction::tupleElement::operator=(CkReduction::tupleElement&& rhs_move)
01626 {
01627 if (owns_data)
01628 delete[] data;
01629 dataSize = rhs_move.dataSize;
01630 data = rhs_move.data;
01631 reducer = rhs_move.reducer;
01632 owns_data = rhs_move.owns_data;
01633 rhs_move.dataSize = 0;
01634 rhs_move.data = 0;
01635 rhs_move.reducer = CkReduction::invalid;
01636 rhs_move.owns_data = false;
01637 return *this;
01638 }
01639 CkReduction::tupleElement::~tupleElement()
01640 {
01641 if (owns_data)
01642 delete[] data;
01643 }
01644
01645 void CkReduction::tupleElement::pup(PUP::er &p) {
01646 p|dataSize;
01647
01648
01649
01650 if (p.isUnpacking()) {
01651 data = new char[dataSize];
01652 owns_data = true;
01653 }
01654 PUParray(p, data, dataSize);
01655 if (p.isUnpacking()){
01656 int temp;
01657 p|temp;
01658 reducer=(CkReduction::reducerType)temp;
01659 } else {
01660 int temp=(int)reducer;
01661 p|temp;
01662 }
01663 }
01664
01665 CkReductionMsg* CkReductionMsg::buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions)
01666 {
01667 PUP::sizer ps;
01668 ps|num_reductions;
01669 PUParray(ps, reductions, num_reductions);
01670
01671 CkReductionMsg* msg = CkReductionMsg::buildNew(ps.size(), NULL, CkReduction::tuple);
01672 PUP::toMem p(msg->data);
01673 p|num_reductions;
01674 PUParray(p, reductions, num_reductions);
01675 if (p.size() != ps.size()) CmiAbort("Size mismatch packing CkReduction::tupleElement::tupleToBuffer\n");
01676 return msg;
01677 }
01678
01679 void CkReductionMsg::toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions)
01680 {
01681 PUP::fromMem p(this->getData());
01682 p|(*num_reductions);
01683 *out_reductions = new CkReduction::tupleElement[*num_reductions];
01684 PUParray(p, *out_reductions, *num_reductions);
01685 }
01686
01687
01688 CkReductionMsg* CkReduction::tupleReduction_fn(int num_messages, CkReductionMsg** messages)
01689 {
01690 std::vector<CkReduction::tupleElement*> tuple_data(num_messages);
01691 int num_reductions = 0;
01692 for (int message_idx = 0; message_idx < num_messages; ++message_idx)
01693 {
01694 int itr_num_reductions = 0;
01695 messages[message_idx]->toTuple(&tuple_data[message_idx], &itr_num_reductions);
01696
01697
01698 if (num_reductions == 0)
01699 num_reductions = itr_num_reductions;
01700 else if (num_reductions != itr_num_reductions)
01701 CmiAbort("num_reductions mismatch in CkReduction::tupleReduction");
01702 }
01703
01704 DEB_TUPLE(("tupleReduction {\n num_messages=%d,\n num_reductions=%d,\n length=%d\n",
01705 num_messages, num_reductions, messages[0]->getLength()));
01706
01707 std::vector<CkReduction::tupleElement> return_data(num_reductions);
01708
01709
01710 std::vector<char> simulated_messages_buffer(sizeof(CkReductionMsg) * num_reductions * num_messages);
01711 std::vector<CkReductionMsg*> simulated_messages(num_messages);
01712
01713
01714
01715
01716 std::vector<CkReductionMsg *> msgs_to_delete;
01717 msgs_to_delete.reserve(num_reductions);
01718 for (int reduction_idx = 0; reduction_idx < num_reductions; ++reduction_idx)
01719 {
01720 DEB_TUPLE((" reduction_idx=%d {\n", reduction_idx));
01721 CkReduction::reducerType reducerType = CkReduction::invalid;
01722 for (int message_idx = 0; message_idx < num_messages; ++message_idx)
01723 {
01724 CkReduction::tupleElement* reductions = (CkReduction::tupleElement*)(tuple_data[message_idx]);
01725 CkReduction::tupleElement& element = reductions[reduction_idx];
01726 DEB_TUPLE((" msg %d, sf=%d, length=%d : { dataSize=%d, data=%p, reducer=%d },\n",
01727 message_idx, messages[message_idx]->sourceFlag, messages[message_idx]->getLength(), element.dataSize, element.data, element.reducer));
01728
01729 reducerType = element.reducer;
01730
01731 size_t sim_idx = (reduction_idx * num_messages + message_idx) * sizeof(CkReductionMsg);
01732 CkReductionMsg& simulated_message = *(CkReductionMsg*)&simulated_messages_buffer[sim_idx];
01733 simulated_message.dataSize = element.dataSize;
01734 simulated_message.data = element.data;
01735 simulated_message.reducer = element.reducer;
01736 simulated_message.sourceFlag = messages[message_idx]->sourceFlag;
01737 simulated_message.userFlag = messages[message_idx]->userFlag;
01738 simulated_message.gcount = messages[message_idx]->gcount;
01739 simulated_message.migratableContributor = messages[message_idx]->migratableContributor;
01740 #if CMK_BIGSIM_CHARM
01741 simulated_message.log = NULL;
01742 #endif
01743 simulated_messages[message_idx] = &simulated_message;
01744 }
01745
01746
01747 const auto& reducerFp = CkReduction::reducerTable()[reducerType].fn;
01748 CkReductionMsg* result = reducerFp(num_messages, simulated_messages.data());
01749 DEB_TUPLE((" result_len=%d\n },\n", result->getLength()));
01750 return_data[reduction_idx] = CkReduction::tupleElement(result->getLength(), result->getData(), reducerType);
01751
01752
01753 if (result != simulated_messages[0]) {
01754 msgs_to_delete.push_back(result);
01755 }
01756 }
01757
01758 CkReductionMsg* retval = CkReductionMsg::buildFromTuple(return_data.data(), num_reductions);
01759 DEB_TUPLE(("} tupleReduction msg_size=%d\n", retval->getSize()));
01760
01761 for (auto data : tuple_data) delete[] data;
01762 for (auto msg : msgs_to_delete) delete msg;
01763
01764 return retval;
01765 }
01766
01767
01769 static CkReductionMsg *external_py(int nMsgs, CkReductionMsg **msg)
01770 {
01771
01772 std::vector<char*> msg_data(nMsgs);
01773 std::vector<int> msg_sizes(nMsgs);
01774
01775 for (int i = 0; i < nMsgs; i++)
01776 {
01777 msg_data[i] = (char*)(msg[i]->getData());
01778 msg_sizes[i] = msg[i]->getSize();
01779
01780 }
01781
01782
01783
01784 char* reduction_result;
01785 int reduction_result_size = PyReductionExt(msg_data.data(), msg_sizes.data(), nMsgs, &reduction_result);
01786
01787
01788 return CkReductionMsg::buildNew(reduction_result_size, reduction_result);
01789 }
01790
01791
01792
01794 CkReduction::CkReduction() {}
01795
01796
01797
01798 CkReduction::reducerType CkReduction::addReducer(reducerFn fn, bool streamable, const char* name)
01799 {
01800 CkAssert(CmiMyRank() == 0);
01801 reducerType index = (reducerType)reducerTable().size();
01802 reducerTable().emplace_back(fn, streamable, name);
01803 return index;
01804 }
01805
01806
01807
01808
01809
01810
01811
01812 std::vector<CkReduction::reducerStruct> CkReduction::initReducerTable()
01813 {
01814 std::vector<CkReduction::reducerStruct> vec;
01815
01816 vec.emplace_back(invalid_reducer_fn, true, "CkReduction::invalid");
01817 vec.emplace_back(nop_fn, true, "CkReduction::nop");
01818
01819 vec.emplace_back(sum_char_fn, true, "CkReduction::sum_char");
01820 vec.emplace_back(sum_short_fn, true, "CkReduction::sum_short");
01821 vec.emplace_back(sum_int_fn, true, "CkReduction::sum_int");
01822 vec.emplace_back(sum_long_fn, true, "CkReduction::sum_long");
01823 vec.emplace_back(sum_long_long_fn, true, "CkReduction::sum_long_long");
01824 vec.emplace_back(sum_uchar_fn, true, "CkReduction::sum_uchar");
01825 vec.emplace_back(sum_ushort_fn, true, "CkReduction::sum_ushort");
01826 vec.emplace_back(sum_uint_fn, true, "CkReduction::sum_uint");
01827 vec.emplace_back(sum_ulong_fn, true, "CkReduction::sum_ulong");
01828 vec.emplace_back(sum_ulong_long_fn, true, "CkReduction::sum_ulong_long");
01829 vec.emplace_back(sum_float_fn, true, "CkReduction::sum_float");
01830 vec.emplace_back(sum_double_fn, true, "CkReduction::sum_double");
01831
01832
01833 vec.emplace_back(product_char_fn, true, "CkReduction::product_char");
01834 vec.emplace_back(product_short_fn, true, "CkReduction::product_short");
01835 vec.emplace_back(product_int_fn, true, "CkReduction::product_int");
01836 vec.emplace_back(product_long_fn, true, "CkReduction::product_long");
01837 vec.emplace_back(product_long_long_fn, true, "CkReduction::product_long_long");
01838 vec.emplace_back(product_uchar_fn, true, "CkReduction::product_uchar");
01839 vec.emplace_back(product_ushort_fn, true, "CkReduction::product_ushort");
01840 vec.emplace_back(product_uint_fn, true, "CkReduction::product_uint");
01841 vec.emplace_back(product_ulong_fn, true, "CkReduction::product_ulong");
01842 vec.emplace_back(product_ulong_long_fn, true, "CkReduction::product_ulong_long");
01843 vec.emplace_back(product_float_fn, true, "CkReduction::product_float");
01844 vec.emplace_back(product_double_fn, true, "CkReduction::product_double");
01845
01846
01847 vec.emplace_back(max_char_fn, true, "CkReduction::max_char");
01848 vec.emplace_back(max_short_fn, true, "CkReduction::max_short");
01849 vec.emplace_back(max_int_fn, true, "CkReduction::max_int");
01850 vec.emplace_back(max_long_fn, true, "CkReduction::max_long");
01851 vec.emplace_back(max_long_long_fn, true, "CkReduction::max_long_long");
01852 vec.emplace_back(max_uchar_fn, true, "CkReduction::max_uchar");
01853 vec.emplace_back(max_ushort_fn, true, "CkReduction::max_ushort");
01854 vec.emplace_back(max_uint_fn, true, "CkReduction::max_uint");
01855 vec.emplace_back(max_ulong_fn, true, "CkReduction::max_ulong");
01856 vec.emplace_back(max_ulong_long_fn, true, "CkReduction::max_ulong_long");
01857 vec.emplace_back(max_float_fn, true, "CkReduction::max_float");
01858 vec.emplace_back(max_double_fn, true, "CkReduction::max_double");
01859
01860
01861 vec.emplace_back(min_char_fn, true, "CkReduction::min_char");
01862 vec.emplace_back(min_short_fn, true, "CkReduction::min_short");
01863 vec.emplace_back(min_int_fn, true, "CkReduction::min_int");
01864 vec.emplace_back(min_long_fn, true, "CkReduction::min_long");
01865 vec.emplace_back(min_long_long_fn, true, "CkReduction::min_long_long");
01866 vec.emplace_back(min_uchar_fn, true, "CkReduction::min_uchar");
01867 vec.emplace_back(min_ushort_fn, true, "CkReduction::min_ushort");
01868 vec.emplace_back(min_uint_fn, true, "CkReduction::min_uint");
01869 vec.emplace_back(min_ulong_fn, true, "CkReduction::min_ulong");
01870 vec.emplace_back(min_ulong_long_fn, true, "CkReduction::min_ulong_long");
01871 vec.emplace_back(min_float_fn, true, "CkReduction::min_float");
01872 vec.emplace_back(min_double_fn, true, "CkReduction::min_double");
01873
01874
01875
01876
01877 vec.emplace_back(logical_and_fn, true, "CkReduction::logical_and");
01878 vec.emplace_back(logical_and_int_fn, true, "CkReduction::logical_and_int");
01879 vec.emplace_back(logical_and_bool_fn, true, "CkReduction::logical_and_bool");
01880
01881
01882
01883
01884 vec.emplace_back(logical_or_fn, true, "CkReduction::logical_or");
01885 vec.emplace_back(logical_or_int_fn, true, "CkReduction::logical_or_int");
01886 vec.emplace_back(logical_or_bool_fn, true, "CkReduction::logical_or_bool");
01887
01888
01889
01890
01891 vec.emplace_back(logical_xor_int_fn, true, "CkReduction::logical_xor_int");
01892 vec.emplace_back(logical_xor_bool_fn, true, "CkReduction::logical_xor_bool");
01893
01894
01895
01896 vec.emplace_back(bitvec_and_fn, true, "CkReduction::bitvec_and");
01897 vec.emplace_back(bitvec_and_int_fn, true, "CkReduction::bitvec_and_int");
01898 vec.emplace_back(bitvec_and_bool_fn, true, "CkReduction::bitvec_and_bool");
01899
01900
01901
01902 vec.emplace_back(bitvec_or_fn, true, "CkReduction::bitvec_or");
01903 vec.emplace_back(bitvec_or_int_fn, true, "CkReduction::bitvec_or_int");
01904 vec.emplace_back(bitvec_or_bool_fn, true, "CkReduction::bitvec_or_bool");
01905
01906
01907 vec.emplace_back(bitvec_xor_fn, true, "CkReduction::bitvec_xor");
01908 vec.emplace_back(bitvec_xor_int_fn, true, "CkReduction::bitvec_xor_int");
01909 vec.emplace_back(bitvec_xor_bool_fn, true, "CkReduction::bitvec_xor_bool");
01910
01911
01912 vec.emplace_back(random_fn, true, "CkReduction::random");
01913
01914
01915
01916
01917 vec.emplace_back(concat_fn, false, "CkReduction::concat");
01918
01919
01920
01921
01922
01923 vec.emplace_back(set_fn, false, "CkReduction::set");
01924
01925
01926 vec.emplace_back(statistics_fn, true, "CkReduction::statistics");
01927
01928
01929 vec.emplace_back(CkReduction::tupleReduction_fn, false, "CkReduction::tuple");
01930
01931
01932 vec.emplace_back(CkReduction::reducerStruct(::external_py, false, "CkReduction::custom_python"));
01933
01934 return vec;
01935 }
01936
01937
01938
01939 std::vector<CkReduction::reducerStruct>& CkReduction::reducerTable()
01940 {
01941 static std::vector<CkReduction::reducerStruct> table = initReducerTable();
01942 return table;
01943 }
01944
01945
// Kind of object making an external contribution. Stored in one byte inside
// CkExtContributeInfo; CkExtContributeTo dispatches on it.
enum extContributorType : uint8_t {
  array     = 0,
  group     = 1,
  nodegroup = 2
};
01951
01952
// Parameter bundle passed to the CkExtContributeTo* entry points below.
// NOTE(review): this is handed in through extern "C" functions, so the field
// order and types presumably have to match the external caller's layout --
// do not reorder without checking that side.
01953 struct CkExtContributeInfo
01954 {
01955 int cbEpIdx; // entry-point index given to the CkCallback constructors
01956 int fid; // when > 0, set as the callback's reference number
01957 void* data; // contribution payload (reset to NULL for CkReduction::nop)
01958 int numelems; // not read in this part of the file; presumably element count -- TODO confirm
01959 int dataSize; // payload size passed to contribute() (reset to 0 for nop)
01960 CkReduction::reducerType redtype; // reducer type passed to contribute()
01961 int id; // used as CkGroupID::idx to locate the contributing array/group
01962 int *idx; // array index of the contributor (array contributors only)
01963 int ndims; // number of dimensions in idx
01964 extContributorType contributorType; // selects ArrayElement vs Group handling
01965 };
01966
// Looks up the local contributor object of type T described by
// contribute_params. Only declared here; the explicit specializations for
// ArrayElement and Group follow.
01967 template <typename T>
01968 T* getExtContributor(CkExtContributeInfo* contribute_params);
01969
01970 template <>
01971 ArrayElement* getExtContributor<ArrayElement>(CkExtContributeInfo* contribute_params)
01972 {
01973 CkGroupID gId;
01974 gId.idx = contribute_params->id;
01975 CkArrayIndex arrIndex(contribute_params->ndims, contribute_params->idx);
01976 CProxyElement_ArrayBase meProxy = CProxyElement_ArrayBase(gId, arrIndex);
01977 return meProxy.ckLocal();
01978 }
01979
01980 template <>
01981 Group* getExtContributor<Group>(CkExtContributeInfo* contribute_params)
01982 {
01983 CkGroupID gId;
01984 gId.idx = contribute_params->id;
01985 return (Group*)CkLocalBranch(gId);
01986 }
01987
01988
01989
// C-linkage entry points for external contributions. Each
// CkExtContributeTo{Chare,Array,Group} variant builds a CkCallback for its
// target kind and forwards to CkExtContributeTo.
01990 extern "C" {
01991 void CkExtContributeTo(CkExtContributeInfo* contribute_params, CkCallback& cb);
01992 void CkExtContributeToChare(CkExtContributeInfo* contribute_params, int onPE, void* objPtr);
01993 void CkExtContributeToArray(CkExtContributeInfo* contribute_params, int aid, int* idx, int ndims);
01994 void CkExtContributeToGroup(CkExtContributeInfo* contribute_params, int gid, int pe);
01995 }
01996
01997
01998 template <class T>
01999 void CkExtContribute(CkExtContributeInfo* contribute_params, CkCallback& cb)
02000 {
02001 T* me = getExtContributor<T>(contribute_params);
02002
02003 if (contribute_params->redtype == CkReduction::nop) {
02004 contribute_params->dataSize = 0;
02005 contribute_params->data = NULL;
02006 }
02007
02008 me->contribute(contribute_params->dataSize, contribute_params->data, contribute_params->redtype, cb);
02009 }
02010
02011 void CkExtContributeTo(CkExtContributeInfo* contribute_params, CkCallback& cb)
02012 {
02013 #if CMK_CHARMPY
02014 cb.isCkExtReductionCb = true;
02015
02016 switch (contribute_params->contributorType) {
02017 case extContributorType::array :
02018 CkExtContribute<ArrayElement>(contribute_params, cb);
02019 break;
02020 case extContributorType::group :
02021 CkExtContribute<Group>(contribute_params, cb);
02022 break;
02023 default : CkAbort("Invalid external contributor type!\n");
02024 }
02025 #else
02026 CkAbort("charm4py support must be enabled to use CkExtContributeTo");
02027 #endif
02028 }
02029
02030
02031 void CkExtContributeToChare(CkExtContributeInfo* contribute_params, int onPE, void* objPtr)
02032 {
02033 CkChareID targetChareID;
02034 targetChareID.onPE = onPE;
02035 targetChareID.objPtr = objPtr;
02036
02037 CkCallback cb(contribute_params->cbEpIdx, targetChareID);
02038 if (contribute_params->fid > 0) cb.setRefnum(contribute_params->fid);
02039 CkExtContributeTo(contribute_params, cb);
02040 }
02041
02042
02043 void CkExtContributeToArray(CkExtContributeInfo* contribute_params, int aid, int* idx, int ndims)
02044 {
02045 CkCallback cb;
02046 CkGroupID gId;
02047 gId.idx = aid;
02048
02049 CkArrayID arrayId(gId);
02050
02051 if (ndims > 0) {
02052
02053 CkArrayIndex arrIndex(ndims, idx);
02054 cb = CkCallback(contribute_params->cbEpIdx, arrIndex, arrayId);
02055 }
02056 else {
02057
02058 cb = CkCallback(contribute_params->cbEpIdx, arrayId);
02059 }
02060 if (contribute_params->fid > 0) cb.setRefnum(contribute_params->fid);
02061
02062 CkExtContributeTo(contribute_params, cb);
02063 }
02064
02065
02066 void CkExtContributeToGroup(CkExtContributeInfo* contribute_params, int gid, int pe)
02067 {
02068 CkCallback cb;
02069 CkGroupID groupId;
02070 groupId.idx = gid;
02071
02072 if (pe == -1) {
02073
02074 cb = CkCallback(contribute_params->cbEpIdx, groupId);
02075 }
02076 else {
02077
02078 cb = CkCallback(contribute_params->cbEpIdx, pe, groupId);
02079 }
02080 if (contribute_params->fid > 0) cb.setRefnum(contribute_params->fid);
02081
02082 CkExtContributeTo(contribute_params, cb);
02083 }
02084
02085
02086
02104 NodeGroup::NodeGroup(void):thisIndex(CkMyNode()) {
02105 __nodelock=CmiCreateLock();
02106 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02107 mlogData->objID.type = TypeNodeGroup;
02108 mlogData->objID.data.group.onPE = CkMyNode();
02109 #endif
02110
02111 }
02112 NodeGroup::~NodeGroup() {
02113 CmiDestroyLock(__nodelock);
02114 CkpvAccess(_destroyingNodeGroup) = true;
02115 }
02116 void NodeGroup::pup(PUP::er &p)
02117 {
02118 CkNodeReductionMgr::pup(p);
02119 p|reductionInfo;
02120 }
02121
02122
02123
02124 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
02125 DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
02126 ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
02127
02128 }
02129
// Expands the shared contribute-method definitions for NodeGroup (the same
// macro is used for Group near the top of this file), routing through this
// object's CkNodeReductionMgr base and its reductionInfo.
// NOTE(review): the macro is defined elsewhere; the meaning of the final
// `false` argument is not visible here -- check the macro definition.
02130 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
02131 ((CkNodeReductionMgr *)this),
02132 reductionInfo,false)
02133
02134
02135
02136 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
02137 {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
02138
02139
02140
02141
02142
02143 CkNodeReductionMgr::CkNodeReductionMgr()
02144 : thisProxy(thisgroup)
02145 {
02146 #ifdef BINOMIAL_TREE
02147 init_BinomialTree();
02148 #elif CMK_BIGSIM_CHARM
02149 init_BinaryTree();
02150 #else
02151 init_TopoTree();
02152 #endif
02153 storedCallback=NULL;
02154 redNo=0;
02155 inProgress=false;
02156
02157 startRequested=false;
02158 gcount=CkNumNodes();
02159 lcount=1;
02160 nContrib=nRemote=0;
02161 lockEverything = CmiCreateLock();
02162
02163
02164 creating=false;
02165 interrupt = false;
02166 DEBR((AA "In NodereductionMgr constructor at %d \n" AB,this));
02167 #if CMK_FAULT_EVAC
02168 blocked = false;
02169 maxModificationRedNo = INT_MAX;
02170 killed=false;
02171 additionalGCount = newAdditionalGCount = 0;
02172 #endif
02173 }
02174
02175 CkNodeReductionMgr::~CkNodeReductionMgr()
02176 {
02177 CmiDestroyLock(lockEverything);
02178 }
02179
02180 void CkNodeReductionMgr::flushStates()
02181 {
02182 if(CkMyRank() == 0){
02183
02184 redNo=0;
02185 inProgress=false;
02186
02187 startRequested=false;
02188 gcount=CkNumNodes();
02189 lcount=1;
02190 nContrib=nRemote=0;
02191
02192 creating=false;
02193 interrupt = false;
02194 while (!msgs.isEmpty()) { delete msgs.deq(); }
02195 while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
02196 while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
02197 while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
02198 }
02199 }
02200
02202
02203
02204 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
02205 {
02206 DEBR((AA "Setting reductionClient in NodeReductionMgr %d at %d\n" AB,cb,this));
02207 if(cb->isInvalid()){
02208 DEBR((AA "Invalid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
02209 }else{
02210 DEBR((AA "Valid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
02211 }
02212
02213 if (CkMyNode()!=0)
02214 CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
02215 delete storedCallback;
02216 storedCallback=cb;
02217 }
02218
02219
02220
02221 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
02222 {
02223 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02224 Chare *oldObj =CpvAccess(_currentObj);
02225 CpvAccess(_currentObj) = this;
02226 #endif
02227
02228 m->redNo=ci->redNo++;
02229 m->sourceFlag=-1;
02230 m->gcount=0;
02231 DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
02232 addContribution(m);
02233
02234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02235 CpvAccess(_currentObj) = oldObj;
02236 #endif
02237 }
02238
02239
02240 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
02241 {
02242 #if CMK_BIGSIM_CHARM
02243 _TRACE_BG_TLINE_END(&m->log);
02244 #endif
02245 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02246 Chare *oldObj =CpvAccess(_currentObj);
02247 CpvAccess(_currentObj) = this;
02248 #endif
02249 m->redNo=ci->redNo++;
02250 m->gcount=count;
02251 DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
02252 addContribution(m);
02253 DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
02254
02255 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02256 CpvAccess(_currentObj) = oldObj;
02257 #endif
02258 }
02259
02260
02262
02263 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
02264 DEBR(("[%d,%d] doRecvMsg called for %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02265 #if CMK_FAULT_EVAC
02266 if(blocked){
02267 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
02268 bufferedRemoteMsgs.enq(m);
02269 return;
02270 }
02271 #endif
02272
02273 if (isPresent(m->redNo)) {
02274
02275 startReduction(m->redNo,CkMyNode());
02276 msgs.enq(m);
02277 nRemote++;
02278 finishReduction();
02279 }
02280 else {
02281 if (isFuture(m->redNo)) {
02282
02283 futureRemoteMsgs.enq(m);
02284 }else{
02285 CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);
02286 CkAbort("Recv'd late remote contribution!\n");
02287 }
02288 }
02289 DEBR(("[%d,%d]]]]] doRecvMsg called for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02290 }
02291
02292
02293 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
02294 {
02295 #if CMK_BIGSIM_CHARM
02296 _TRACE_BG_TLINE_END(&m->log);
02297 #endif
02298 #ifndef CMK_CPV_IS_SMP
02299 #if CMK_IMMEDIATE_MSG
02300 if(interrupt == true){
02301
02302 CpvAccess(_qd)->process(-1);
02303 CmiDelayImmediate();
02304 return;
02305 }
02306 #endif
02307 #endif
02308 interrupt = true;
02309 CmiLock(lockEverything);
02310 DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02311 doRecvMsg(m);
02312 CmiUnlock(lockEverything);
02313 interrupt = false;
02314 DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02315 }
02316
02317 void CkNodeReductionMgr::startReduction(int number,int srcNode)
02318 {
02319 if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
02320 if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
02321 if (inProgress){
02322 DEBR((AA "This Node reduction is already in progress\n" AB));
02323 return;
02324 }
02325 if (creating)
02326 {
02327 DEBR((AA " Node Postponing start request #%d until we're done creating\n" AB,redNo));
02328 startRequested=true;
02329 return;
02330 }
02331
02332
02333 DEBR((AA "Starting Node reduction #%d on %p srcNode %d\n" AB,redNo,this,srcNode));
02334 inProgress=true;
02335 }
02336
02337 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
02338 #if CMK_FAULT_EVAC
02339 if(blocked){
02340 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
02341 bufferedMsgs.enq(m);
02342 return;
02343 }
02344 #endif
02345
02346 if (isFuture(m->redNo)) {
02347 DEBR((AA "Contributor gives early node contribution-- for #%d\n" AB,m->redNo));
02348 futureMsgs.enq(m);
02349 } else {
02350 DEBR((AA "Recv'd local node contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
02351
02352 startReduction(m->redNo,CkMyNode());
02353 msgs.enq(m);
02354 nContrib++;
02355 finishReduction();
02356 }
02357 }
02358
02359
02360 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
02361 {
02362 interrupt = true;
02363 CmiLock(lockEverything);
02364 doAddContribution(m);
02365 CmiUnlock(lockEverything);
02366 interrupt = false;
02367 }
02368
02369 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
02370 CmiLock(lockEverything);
02371 #if CMK_FAULT_EVAC
02372 if(blocked){
02373 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
02374 bufferedMsgs.enq(m);
02375 CmiUnlock(lockEverything);
02376 return;
02377 }
02378 #endif
02379
02380 if (isFuture(m->redNo)) {
02381 DEBR((AA "Latemigrant gives early node contribution-- for #%d\n" AB,m->redNo));
02382
02383 futureLateMigrantMsgs.enq(m);
02384 } else {
02385 DEBR((AA "Recv'd late migrant contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
02386
02387 msgs.enq(m);
02388 finishReduction();
02389 }
02390 CmiUnlock(lockEverything);
02391 }
02392
02393
02394
02395
02396
02400 void CkNodeReductionMgr::finishReduction(void)
02401 {
02402 DEBR((AA "in Nodegrp finishReduction %d treeKids %d \n" AB,inProgress,treeKids()));
02403
02404 if ((!inProgress) || creating){
02405 DEBR((AA "Either not in Progress or creating\n" AB));
02406 return;
02407 }
02408
02409 bool partialReduction = false;
02410
02411 if (nContrib<(lcount)){
02412 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
02413 partialReduction = true;
02414 }
02415 else {
02416 DEBR((AA "Nodegrp Need more local messages %d %d\n" AB,nContrib,(lcount)));
02417 return;
02418 }
02419 }
02420 if (nRemote<treeKids()){
02421 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
02422 partialReduction = true;
02423 }
02424 else {
02425 DEBR((AA "Nodegrp Need more Remote messages %d %d\n" AB,nRemote,treeKids()));
02426 return;
02427 }
02428 }
02429 if (nRemote>treeKids()){
02430
02431 interrupt = false;
02432 CkAbort("Nodegrp Excess remote reduction message received!\n");
02433 }
02434
02435 DEBR((AA "Reducing node data...\n" AB));
02436
02438 #if CMK_BIGSIM_CHARM
02439 _TRACE_BG_END_EXECUTE(1);
02440 void* _bgParentLog = NULL;
02441 _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
02442 #endif
02443 CkReductionMsg *result=CkReductionMgr::reduceMessages(msgs);
02444 result->redNo=redNo;
02445 DEBR((AA "Node Reduced gcount=%d; sourceFlag=%d\n" AB,result->gcount,result->sourceFlag));
02446 #if CMK_BIGSIM_CHARM
02447 _TRACE_BG_TLINE_END(&result->log);
02448 #endif
02449
02450 if (partialReduction) {
02451 msgs.enq(result);
02452 return;
02453 }
02454
02455 if (hasParent())
02456 {
02457 #if CMK_FAULT_EVAC
02458 if(CmiNodeAlive(CkMyNode()) || killed == false)
02459 #endif
02460 {
02461 DEBR((AA "Passing reduced data up to parent node %d. \n" AB,treeParent()));
02462 DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
02463
02464 #if CMK_FAULT_EVAC
02465 result->gcount += additionalGCount;
02466 #endif
02467 thisProxy[treeParent()].RecvMsg(result);
02468 }
02469
02470 }
02471 else
02472 {
02473 if(result->isMigratableContributor()
02474 #if CMK_FAULT_EVAC
02475 && result->gcount+additionalGCount != result->sourceFlag
02476 #endif
02477 ){
02478 DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
02479 msgs.enq(result);
02480 return;
02481 }
02482 #if CMK_FAULT_EVAC
02483 result->gcount += additionalGCount;
02484 #endif
02485
02489 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
02490 CkSetRefNum(result, result->getUserFlag());
02491 if (!result->callback.isInvalid()){
02492 DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
02493 result->callback.send(result);
02494 }
02495 else if (storedCallback!=NULL){
02496 DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
02497 storedCallback->send(result);
02498 }
02499 else{
02500 DEBR((AA "Invalid Callback \n" AB));
02501 CkAbort("No reduction client!\n"
02502 "You must register a client with either SetReductionClient or during contribute.\n");
02503 }
02504 }
02505
02506
02507
02508 redNo++;
02509 #if CMK_FAULT_EVAC
02510 updateTree();
02511 #endif
02512 int i;
02513 inProgress=false;
02514 startRequested=false;
02515 nRemote=nContrib=0;
02516
02517
02518 int n=futureMsgs.length();
02519
02520 for (i=0;i<n;i++)
02521 {
02522 interrupt = true;
02523
02524 CkReductionMsg *m=futureMsgs.deq();
02525
02526 interrupt = false;
02527 if (m!=NULL){
02528 DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
02529 doAddContribution(m);
02530 }
02531 }
02532
02533 interrupt = true;
02534
02535 n=futureRemoteMsgs.length();
02536
02537 interrupt = false;
02538 for (i=0;i<n;i++)
02539 {
02540 interrupt = true;
02541
02542 CkReductionMsg *m=futureRemoteMsgs.deq();
02543
02544 interrupt = false;
02545 if (m!=NULL)
02546 doRecvMsg(m);
02547 }
02548
02549 n = futureLateMigrantMsgs.length();
02550 for(i=0;i<n;i++){
02551 CkReductionMsg *m = futureLateMigrantMsgs.deq();
02552 if(m != NULL){
02553 if(m->redNo == redNo){
02554 msgs.enq(m);
02555 }else{
02556 futureLateMigrantMsgs.enq(m);
02557 }
02558 }
02559 }
02560 }
02561
02563
02564 void CkNodeReductionMgr::init_BinaryTree(){
02565 parent = (CkMyNode()-1)/TREE_WID;
02566 int firstkid = CkMyNode()*TREE_WID+1;
02567 numKids=CkNumNodes()-firstkid;
02568 if (numKids>TREE_WID) numKids=TREE_WID;
02569 if (numKids<0) numKids=0;
02570
02571 for(int i=0;i<numKids;i++){
02572 kids.push_back(firstkid+i);
02573 #if CMK_FAULT_EVAC
02574 newKids.push_back(firstkid+i);
02575 #endif
02576 }
02577 }
02578
02579 void CkNodeReductionMgr::init_TopoTree() {
02580 if (_topoTree == NULL) CkAbort("CkNodeReductionMgr:: topo tree has not been calculated\n");
02581 CmiSpanningTreeInfo &t = *_topoTree;
02582 parent = t.parent;
02583 numKids = t.child_count;
02584 for (int i=0; i < numKids; i++) {
02585 kids.push_back(t.children[i]);
02586 #if CMK_FAULT_EVAC
02587 newKids.push_back(t.children[i]);
02588 #endif
02589 }
02590 }
02591
02592 void CkNodeReductionMgr::init_BinomialTree(){
02593 int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
02594
02595 upperSize = (unsigned) 1 << depth;
02596 label = upperSize-CkMyNode()-1;
02597 int p=label;
02598 int count=0;
02599 while( p > 0){
02600 if(p % 2 == 0)
02601 break;
02602 else{
02603 p = p/2;
02604 count++;
02605 }
02606 }
02607
02608 parent = label + (1<<count);
02609 parent = upperSize -1 -parent;
02610 int temp;
02611 if(count != 0){
02612 numKids = 0;
02613 for(int i=0;i<count;i++){
02614
02615 temp = label - (1<<i);
02616 temp = upperSize-1-temp;
02617 if(temp <= CkNumNodes()-1){
02618
02619 kids.push_back(temp);
02620 numKids++;
02621 }
02622 }
02623 }else{
02624 numKids = 0;
02625
02626 }
02627 }
02628
02629
02630 int CkNodeReductionMgr::treeRoot(void)
02631 {
02632 return 0;
02633 }
02634 bool CkNodeReductionMgr::hasParent(void)
02635 {
02636 return (bool)(CkMyNode()!=treeRoot());
02637 }
02638 int CkNodeReductionMgr::treeParent(void)
02639 {
02640 return parent;
02641 }
02642
02643 int CkNodeReductionMgr::firstKid(void)
02644 {
02645 return CkMyNode()*TREE_WID+1;
02646 }
02647 int CkNodeReductionMgr::treeKids(void)
02648 {
02649 #ifdef BINOMIAL_TREE
02650 return numKids;
02651 #else
02652
02653
02654
02655
02656 return numKids;
02657 #endif
02658 }
02659
02660 void CkNodeReductionMgr::pup(PUP::er &p)
02661 {
02662
02663
02664 IrrGroup::pup(p);
02665 p(redNo);
02666 p(inProgress); p(creating); p(startRequested);
02667 p(lcount);
02668 p(nContrib); p(nRemote);
02669 p(interrupt);
02670 p|msgs;
02671 p|futureMsgs;
02672 p|futureRemoteMsgs;
02673 p|futureLateMigrantMsgs;
02674 p|parent;
02675
02676 #if CMK_FAULT_EVAC
02677 p|additionalGCount;
02678 p|newAdditionalGCount;
02679 #endif
02680
02681 if(p.isUnpacking()) {
02682 gcount=CkNumNodes();
02683 thisProxy = thisgroup;
02684 lockEverything = CmiCreateLock();
02685 #ifdef BINOMIAL_TREE
02686 init_BinomialTree();
02687 #elif CMK_BIGSIM_CHARM
02688 init_BinaryTree();
02689 #else
02690 init_TopoTree();
02691 #endif
02692 }
02693
02694 #if CMK_FAULT_EVAC
02695 p | blocked;
02696 p | maxModificationRedNo;
02697 #endif
02698
02699 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
02700 bool isnull = (storedCallback == NULL);
02701 p | isnull;
02702 if (!isnull) {
02703 if (p.isUnpacking()) {
02704 storedCallback = new CkCallback;
02705 }
02706 p|*storedCallback;
02707 }
02708 #endif
02709
02710 }
02711
02712 #if CMK_FAULT_EVAC
02713
02714
02715
02716
02717
02718 void CkNodeReductionMgr::evacuate(){
02719 DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
02720 if(treeKids() == 0){
02721
02722
02723
02724 oldleaf=true;
02725 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
02726
02727
02728
02729
02730
02731
02732
02733 int data[2];
02734 data[0]=CkMyNode();
02735 data[1]=getTotalGCount()+additionalGCount;
02736 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
02737 newParent = treeParent();
02738 }else{
02739 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
02740 oldleaf= false;
02741
02742
02743
02744
02745
02746
02747 newParent = kids[0];
02748 for(int i=numKids-1;i>=0;i--){
02749 newKids.erase(newKids.begin() + i);
02750 }
02751
02752
02753
02754
02755
02756
02757
02758 int oldParentData[2];
02759 oldParentData[0] = CkMyNode();
02760 oldParentData[1] = newParent;
02761 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
02762
02763
02764
02765
02766 int childrenData=newParent;
02767 for(int i=1;i<numKids;i++){
02768 thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
02769 }
02770
02771
02772
02773
02774
02775 std::vector<int> newParentData(numKids+2);
02776 newParentData[0] = CkMyNode();
02777 for(int i=1;i<numKids;i++){
02778 newParentData[i] = kids[i];
02779 }
02780 newParentData[numKids] = parent;
02781 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
02782 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData.data());
02783 }
02784 readyDeletion = false;
02785 blocked = true;
02786 numModificationReplies = 0;
02787 tempModificationRedNo = findMaxRedNo();
02788 }
02789
02790
02791
02792
02793
02794
02795
02796
02797
02798 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
02799 DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
02800 int sender;
02801 newKids = kids;
02802 readyDeletion = false;
02803 newAdditionalGCount = additionalGCount;
02804 switch(code){
02805 case OLDPARENT:
02806 for(int i=0;i<numKids;i++){
02807 if(newKids[i] == data[0]){
02808 newKids[i] = data[1];
02809 break;
02810 }
02811 }
02812 sender = data[0];
02813 newParent = parent;
02814 break;
02815 case OLDCHILDREN:
02816 newParent = data[0];
02817 sender = parent;
02818 break;
02819 case NEWPARENT:
02820 for(int i=0;i<size-2;i++){
02821 newKids.push_back(data[i]);
02822 }
02823 newParent = data[size-2];
02824 newAdditionalGCount += data[size-1];
02825 sender = parent;
02826 break;
02827 case LEAFPARENT:
02828 for(int i=0;i<numKids;i++){
02829 if(newKids[i] == data[0]){
02830 newKids.erase(newKids.begin() + i);
02831 break;
02832 }
02833 }
02834 sender = data[0];
02835 newParent = parent;
02836 newAdditionalGCount += data[1];
02837 break;
02838 };
02839 blocked = true;
02840 int maxRedNo = findMaxRedNo();
02841
02842 thisProxy[sender].collectMaxRedNo(maxRedNo);
02843 }
02844
02845 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
02846
02847
02848
02849
02850 numModificationReplies++;
02851 if(maxRedNo > tempModificationRedNo){
02852 tempModificationRedNo = maxRedNo;
02853 }
02854 if(numModificationReplies == numKids+1){
02855 maxModificationRedNo = tempModificationRedNo;
02856
02857
02858
02859
02860 if(maxModificationRedNo == -1){
02861 printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
02862 }else{
02863 DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
02864 }
02865 thisProxy[parent].unblockNode(maxModificationRedNo);
02866 for(int i=0;i<numKids;i++){
02867 thisProxy[kids[i]].unblockNode(maxModificationRedNo);
02868 }
02869 blocked = false;
02870 updateTree();
02871 clearBlockedMsgs();
02872 }
02873 }
02874
02875 void CkNodeReductionMgr::unblockNode(int maxRedNo){
02876 maxModificationRedNo = maxRedNo;
02877 updateTree();
02878 blocked = false;
02879 clearBlockedMsgs();
02880 }
02881
02882
02883 void CkNodeReductionMgr::clearBlockedMsgs(){
02884 int len = bufferedMsgs.length();
02885 for(int i=0;i<len;i++){
02886 CkReductionMsg *m = bufferedMsgs.deq();
02887 doAddContribution(m);
02888 }
02889 len = bufferedRemoteMsgs.length();
02890 for(int i=0;i<len;i++){
02891 CkReductionMsg *m = bufferedRemoteMsgs.deq();
02892 doRecvMsg(m);
02893 }
02894
02895 }
02896
02897
02898
02899
02900
02901 void CkNodeReductionMgr::updateTree(){
02902 if(redNo > maxModificationRedNo){
02903 parent = newParent;
02904 kids = newKids;
02905 maxModificationRedNo = INT_MAX;
02906 numKids = kids.size();
02907 readyDeletion = true;
02908 additionalGCount = newAdditionalGCount;
02909 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
02910 for(int i=0;i<(int)(newKids.size());i++){
02911 DEBREVAC(("%d ",newKids[i]));
02912 }
02913 DEBREVAC(("\n"));
02914
02915 }else{
02916 if(maxModificationRedNo != INT_MAX){
02917 DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
02918 startReduction(redNo,CkMyNode());
02919 finishReduction();
02920 }
02921 }
02922 }
02923
02924
02925 void CkNodeReductionMgr::doneEvacuate(){
02926 DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
02927
02928
02929
02930
02931
02932
02933
02934
02935
02936
02937
02938
02939
02940
02941
02942
02943
02944
02945
02946
02947
02948
02949
02950
02951
02952 if(readyDeletion){
02953 thisProxy[treeParent()].DeleteChild(CkMyNode());
02954 }else{
02955 thisProxy[newParent].DeleteNewChild(CkMyNode());
02956 }
02957
02958 }
02959
02960 void CkNodeReductionMgr::DeleteChild(int deletedChild){
02961 DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
02962 for(int i=0;i<numKids;i++){
02963 if(kids[i] == deletedChild){
02964 kids.erase(kids.begin() + i);
02965 break;
02966 }
02967 }
02968 numKids = kids.size();
02969 finishReduction();
02970 }
02971
02972 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
02973 for(int i=0;i<(int)(newKids.size());i++){
02974 if(newKids[i] == deletedChild){
02975 newKids.erase(newKids.begin() + i);
02976 break;
02977 }
02978 }
02979 DEBREVAC(("[%d]%d> Deleting new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
02980 for(int i=0;i<(int)(newKids.size());i++){
02981 DEBREVAC(("%d ",newKids[i]));
02982 }
02983 DEBREVAC(("\n"));
02984 finishReduction();
02985 }
02986
02987 int CkNodeReductionMgr::findMaxRedNo(){
02988 int max = redNo;
02989 for(int i=0;i<futureRemoteMsgs.length();i++){
02990 if(futureRemoteMsgs[i]->redNo > max){
02991 max = futureRemoteMsgs[i]->redNo;
02992 }
02993 }
02994
02995
02996
02997
02998 if(redNo == max && msgs.length() == 0){
02999 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
03000 max--;
03001 }
03002 return max;
03003 }
03004 #endif //CMK_FAULT_EVAC
03005
03006 #include "CkReduction.def.h"