00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 #include "charm++.h"
00051 #include "ck.h"
00052
00053 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00054 #include "pathHistory.h"
00055 #endif
00056
// Debug-trace helpers for the reduction manager.
//
// DEBR / DEBN / DEBREVAC / RED_DEB take a parenthesised printf-style argument
// list, e.g. DEBR((AA "text %d\n" AB, value)).  AA/AB (and AAN/ABN) supply a
// common format prefix and the matching leading arguments.
//
// Every macro is defined in BOTH build modes so that code using any of them
// compiles regardless of CMK_DEBUG_REDUCTIONS (previously DEBRMLOG existed
// only in the non-debug branch and AAN/ABN only in the debug branch).
#if CMK_DEBUG_REDUCTIONS

#define DEBR(x) CkPrintf x
#define DEBRMLOG(x) CkPrintf x
#define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
#define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx

#define DEBN(x) CkPrintf x
#define AAN "Red Node%d "
#define ABN ,CkMyNode()

#define RED_DEB(x) /* disabled: CkPrintf x */
#define DEBREVAC(x) CkPrintf x

#else

#define DEBR(x) /* disabled: CkPrintf x */
#define DEBRMLOG(x) CkPrintf x  /* note: prints even when reduction debugging is off */
#define AA
#define AB
#define DEBN(x) /* disabled: CkPrintf x */
#define AAN
#define ABN
#define RED_DEB(x) /* disabled: CkPrintf x */
#define DEBREVAC(x) /* disabled: CkPrintf x */

#endif

// Fallback for toolchains whose headers did not already provide INT_MAX.
#ifndef INT_MAX
#define INT_MAX 2147483647
#endif

// Defined elsewhere; checked by Group's default constructor below.
extern int _inrestart;
00087
00088 Group::Group()
00089 {
00090 if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
00091
00092 creatingContributors();
00093 contributorStamped(&reductionInfo);
00094 contributorCreated(&reductionInfo);
00095 doneCreatingContributors();
00096 DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
00097 #if !GROUP_LEVEL_REDUCTION
00098 CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
00099 nodeProxy = nodetemp;
00100 #endif
00101 }
00102
00103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
00104 {
00105 creatingContributors();
00106 contributorStamped(&reductionInfo);
00107 contributorCreated(&reductionInfo);
00108 doneCreatingContributors();
00109 }
00110
00111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
00112 ((CkReductionMgr *)this),
00113 reductionInfo,false)
00114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
00115
00116
00117
00118 CkGroupInitCallback::CkGroupInitCallback(void) {}
00119
00120
00121
00122
00123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
00124 {
00125 m->call();
00126 delete m;
00127 }
00128
00129
00130
00131
00132
00133 CkGroupReadyCallback::CkGroupReadyCallback(void)
00134 {
00135 _isReady = 0;
00136 }
00137 void
00138 CkGroupReadyCallback::callBuffered(void)
00139 {
00140 int n = _msgs.length();
00141 for(int i=0;i<n;i++)
00142 {
00143 CkGroupCallbackMsg *msg = _msgs.deq();
00144 msg->call();
00145 delete msg;
00146 }
00147 }
00148 void
00149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
00150 {
00151 if(_isReady) {
00152 msg->call();
00153 delete msg;
00154 } else {
00155 _msgs.enq(msg);
00156 }
00157 }
00158
00159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
00160 :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
00161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
00162 {
00163 CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
00164 CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
00165 b->fn(b->param,m->getSize(),m->getData());
00166 delete m;
00167 }
00168
00170
00171
00172
00173
00174
00175
00176
00177 CkReductionMgr::CkReductionMgr()
00178 : thisProxy(thisgroup)
00179 {
00180 #if GROUP_LEVEL_REDUCTION
00181 #ifdef BINOMIAL_TREE
00182 init_BinomialTree();
00183 #else
00184 init_BinaryTree();
00185 #endif
00186 #endif
00187 redNo=0;
00188 completedRedNo = -1;
00189 inProgress=CmiFalse;
00190 creating=CmiFalse;
00191 startRequested=CmiFalse;
00192 gcount=lcount=0;
00193 nContrib=nRemote=0;
00194 maxStartRequest=0;
00195 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
00196 if(CkMyPe() != 0){
00197 perProcessorCounts = NULL;
00198 }else{
00199 perProcessorCounts = new int[CmiNumPes()];
00200 for(int i=0;i<CmiNumPes();i++){
00201 perProcessorCounts[i] = -1;
00202 }
00203 }
00204 totalCount = 0;
00205 processorCount = 0;
00206 #endif
00207 disableNotifyChildrenStart = CmiFalse;
00208 DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
00209 }
00210
00211 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
00212 {
00213 redNo=0;
00214 completedRedNo = -1;
00215 inProgress=CmiFalse;
00216 creating=CmiFalse;
00217 startRequested=CmiFalse;
00218 gcount=lcount=0;
00219 nContrib=nRemote=0;
00220 maxStartRequest=0;
00221 DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
00222 }
00223
00224 void CkReductionMgr::flushStates(int isgroup)
00225 {
00226
00227 redNo=0;
00228 completedRedNo = -1;
00229 inProgress=CmiFalse;
00230 creating=CmiFalse;
00231 if (!isgroup) gcount=lcount=0;
00232 startRequested=CmiFalse;
00233 nContrib=nRemote=0;
00234 maxStartRequest=0;
00235
00236 while (!msgs.isEmpty()) { delete msgs.deq(); }
00237 while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
00238 while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
00239 while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
00240
00241 adjVec.length()=0;
00242
00243 #if ! GROUP_LEVEL_REDUCTION
00244 nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
00245 #endif
00246 }
00247
00249
00250
00251 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
00252 {
00253 DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
00254
00255 if (CkMyPe()!=0)
00256 CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
00257 storedCallback=*cb;
00258 #if ! GROUP_LEVEL_REDUCTION
00259 CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
00260 nodeProxy.ckSetReductionClient(callback);
00261 #endif
00262 }
00263
00265
00266
00267
00268
00269 void contributorInfo::pup(PUP::er &p)
00270 {
00271 p(redNo);
00272 }
00273
00275
00276
00277
00278 void CkReductionMgr::creatingContributors(void)
00279 {
00280 DEBR((AA"Creating contributors...\n"AB));
00281 creating=CmiTrue;
00282 }
00283 void CkReductionMgr::doneCreatingContributors(void)
00284 {
00285 DEBR((AA"Done creating contributors...\n"AB));
00286 creating=CmiFalse;
00287 if (startRequested) startReduction(redNo,CkMyPe());
00288 finishReduction();
00289 }
00290
00291
00292 void CkReductionMgr::contributorStamped(contributorInfo *ci)
00293 {
00294 DEBR((AA"Contributor %p stamped\n"AB,ci));
00295
00296 gcount++;
00297 if (inProgress)
00298 {
00299 ci->redNo=redNo+1;
00300 adj(redNo).gcount--;
00301 } else
00302 ci->redNo=redNo;
00303 }
00304
00305
00306 void CkReductionMgr::contributorCreated(contributorInfo *ci)
00307 {
00308 DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
00309
00310 lcount++;
00311
00312 for (int r=redNo;r<ci->redNo;r++)
00313 adj(r).lcount--;
00314 }
00315
00316
00317
00318
00319
00320
00321
00322 void CkReductionMgr::contributorDied(contributorInfo *ci)
00323 {
00324 #if CMK_MEM_CHECKPOINT
00325
00326 if (CkInRestarting()) return;
00327 #endif
00328 DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
00329
00330 gcount--;
00331
00332 if (ci->redNo<redNo)
00333 {
00334
00335 DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
00336 for (int r=ci->redNo;r<redNo;r++)
00337 thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
00338 }
00339
00340
00341 int r;
00342 for (r=redNo;r<ci->redNo;r++)
00343 {
00344 DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
00345 adj(r).gcount++;
00346 }
00347
00348 lcount--;
00349
00350 for (r=redNo;r<ci->redNo;r++)
00351 adj(r).lcount++;
00352
00353 finishReduction();
00354 }
00355
00356
00357 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
00358 {
00359 DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
00360 lcount--;
00361
00362 for (int r=redNo;r<ci->redNo;r++)
00363 adj(r).lcount++;
00364
00365 finishReduction();
00366 }
00367
00368
00369 void CkReductionMgr::contributorArriving(contributorInfo *ci)
00370 {
00371 DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
00372 lcount++;
00373 #if CMK_MEM_CHECKPOINT
00374
00375
00376 if (CkInRestarting()) return;
00377 #endif
00378
00379 for (int r=redNo;r<ci->redNo;r++)
00380 adj(r).lcount--;
00381
00382 }
00383
00384
00385
00386
00387 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
00388 {
00389 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00390 Chare *oldObj =CpvAccess(_currentObj);
00391 CpvAccess(_currentObj) = this;
00392 #endif
00393
00394 #if CMK_BIGSIM_CHARM
00395 _TRACE_BG_TLINE_END(&(m->log));
00396 #endif
00397 DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
00398
00399 m->redNo=ci->redNo++;
00400 m->sourceFlag=-1;
00401 m->gcount=0;
00402
00403 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
00404 if(lcount == 0){
00405 m->sourceProcessorCount = 1;
00406 }else{
00407 m->sourceProcessorCount = lcount;
00408 }
00409 m->fromPE = CmiMyPe();
00410 Chare *oldObj =CpvAccess(_currentObj);
00411 char currentObjName[100];
00412 DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
00413 thisProxy[0].contributeViaMessage(m);
00414 #else
00415 addContribution(m);
00416 #endif
00417
00418 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00419 CpvAccess(_currentObj) = oldObj;
00420 #endif
00421 }
00422
00423 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
00424 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
00425 CmiAssert(CmiMyPe() == 0);
00426 DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
00427 if(redNo == 0){
00428 if(perProcessorCounts[m->fromPE] == -1){
00429 processorCount++;
00430 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
00431 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
00432 }else{
00433 if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
00434 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
00435 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
00436 }
00437 }
00438 if(processorCount == CmiNumPes()){
00439 totalCount = 0;
00440 for(int i=0;i<CmiNumPes();i++){
00441 CmiAssert(perProcessorCounts[i] != -1);
00442 totalCount += perProcessorCounts[i];
00443 }
00444 DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
00445 }
00446 if(m->sourceProcessorCount == 0){
00447 if(processorCount == CmiNumPes()){
00448 finishReduction();
00449 }
00450 return;
00451 }
00452 }
00453 addContribution(m);
00454 }
00455 #else
00456 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
00457 #endif
00458
00460
00461 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
00462 {
00463 if(CkMyPe()==0){
00464
00465
00466
00467 }
00468 DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
00469 int srcPE = (UsrToEnv(m))->getSrcPe();
00470 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
00471 if (isPresent(m->num) && !inProgress)
00472 {
00473 DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
00474 startReduction(m->num,srcPE);
00475 finishReduction();
00476 } else if (isFuture(m->num)){
00477
00478 DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
00479 if(maxStartRequest < m->num){
00480 maxStartRequest = m->num;
00481 }
00482
00483
00484 }
00485 else
00486 DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
00487 delete m;
00488 #else
00489 if(redNo == 0){
00490 if(lcount == 0){
00491 DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
00492 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
00493 dummy->fromPE = CmiMyPe();
00494 dummy->sourceProcessorCount = 0;
00495 dummy->redNo = 0;
00496 thisProxy[0].contributeViaMessage(dummy);
00497 }
00498 }
00499 #endif
00500 }
00501
00502
00503
00504 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
00505 {
00506 #if GROUP_LEVEL_REDUCTION
00507 #if CMK_BIGSIM_CHARM
00508 _TRACE_BG_TLINE_END(&(m->log));
00509 #endif
00510 addContribution(m);
00511 #else
00512 m->secondaryCallback = m->callback;
00513 m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
00514 CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
00515 nodeMgr->LateMigrantMsg(m);
00516
00517
00518
00519
00520 #endif
00521 }
00522
00523
00524 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
00525 {
00526 if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
00527 DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
00528
00529 adj(m->num).gcount--;
00530 finishReduction();
00531 delete m;
00532 }
00533
00535 void CkReductionMgr::startReduction(int number,int srcPE)
00536 {
00537 if (isFuture(number)){ return;}
00538 if (isPast(number)) {return;}
00539 if (inProgress){
00540 DEBR((AA"This reduction is already in progress\n"AB));
00541 return;
00542 }
00543 if (creating)
00544 {
00545 DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
00546 startRequested=CmiTrue;
00547 return;
00548 }
00549
00550
00551 DEBR((AA"Starting reduction #%d %d %d \n"AB,redNo,completedRedNo,number));
00552 inProgress=CmiTrue;
00553
00554
00555
00556
00557
00558 if(!CmiNodeAlive(CkMyPe())){
00559 return;
00560 }
00561
00562 if(disableNotifyChildrenStart) return;
00563
00564 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
00565 if(CmiMyPe() == 0 && redNo == 0){
00566 for(int j=0;j<CkNumPes();j++){
00567 if(j != CkMyPe() && j != srcPE){
00568 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
00569 }
00570 }
00571 if(lcount == 0){
00572 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
00573 dummy->fromPE = CmiMyPe();
00574 dummy->sourceProcessorCount = 0;
00575 dummy->redNo = 0;
00576 thisProxy[0].contributeViaMessage(dummy);
00577 }
00578 } else{
00579 thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
00580 }
00581
00582
00583 #else
00584
00585 #if GROUP_LEVEL_REDUCTION
00586 for (int k=0;k<treeKids();k++)
00587 {
00588 DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
00589 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
00590 }
00591 #else
00592 nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
00593 #endif
00594 #endif
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642 }
00643
00644
00645 void CkReductionMgr::addContribution(CkReductionMsg *m)
00646 {
00647 if (isPast(m->redNo))
00648 {
00649 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00650 CmiAbort("this version should not have late migrations");
00651 #else
00652
00653 DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
00654
00655
00656 thisProxy[0].LateMigrantMsg(m);
00657 #endif
00658 }
00659 else if (isFuture(m->redNo)) {
00660 DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
00661 futureMsgs.enq(m);
00662 } else {
00663 DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
00664
00665 startReduction(m->redNo,CkMyPe());
00666 msgs.enq(m);
00667 nContrib++;
00668 finishReduction();
00669 }
00670 }
00671
00675 void CkReductionMgr::finishReduction(void)
00676 {
00677
00678 DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
00679 if ((!inProgress) || creating){
00680 DEBR((AA"Either not in Progress or creating\n"AB));
00681 return;
00682 }
00683
00684 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
00685
00686 if (nContrib<(lcount+adj(redNo).lcount)){
00687 DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
00688 return;
00689 }
00690 #if GROUP_LEVEL_REDUCTION
00691 if (nRemote<treeKids()) return;
00692 #endif
00693
00694 #else
00695
00696 if(CkMyPe() != 0){
00697 if(redNo != 0){
00698 return;
00699 }else{
00700 CmiAssert(lcount == 0);
00701 CkReductionMsg *dummy = reduceMessages();
00702 dummy->fromPE = CmiMyPe();
00703 dummy->sourceProcessorCount = 0;
00704 thisProxy[0].contributeViaMessage(dummy);
00705 return;
00706 }
00707 }
00708 if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
00709 DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
00710 return;
00711 }else{
00712 DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
00713 }
00714
00715
00716 #endif
00717
00718 DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
00719 CkReductionMsg *result=reduceMessages();
00720 result->redNo=redNo;
00721 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
00722
00723 #if GROUP_LEVEL_REDUCTION
00724 if (hasParent())
00725 {
00726 DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
00727 DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
00728 result->gcount+=gcount+adj(redNo).gcount;
00729 thisProxy[treeParent()].RecvMsg(result);
00730 }
00731 else
00732 {
00733 DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
00734 int totalElements=result->gcount+gcount+adj(redNo).gcount;
00735 if (totalElements>result->nSources())
00736 {
00737 DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
00738 msgs.enq(result);
00739 return;
00740 } else if (totalElements<result->nSources()) {
00741 DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
00742 CkAbort("ERROR! Too many contributions at root!\n");
00743 }
00744 DEBR((AA"Passing result to client function\n"AB));
00745 CkSetRefNum(result, result->getUserFlag());
00746 if (!result->callback.isInvalid())
00747 result->callback.send(result);
00748 else if (!storedCallback.isInvalid())
00749 storedCallback.send(result);
00750 else
00751 CkAbort("No reduction client!\n"
00752 "You must register a client with either SetReductionClient or during contribute.\n");
00753 }
00754
00755 #else
00756 result->gcount+=gcount+adj(redNo).gcount;
00757
00758 result->secondaryCallback = result->callback;
00759 result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
00760 DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
00761
00762
00763
00764
00765
00766
00767 CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
00768 nodeMgr->contributeArrayReduction(result);
00769 #endif
00770 #else // _FAULT_MLOG_
00771 DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
00772
00773 CkSetRefNum(result, result->getUserFlag());
00774 if (!result->callback.isInvalid())
00775 result->callback.send(result);
00776 else if (!storedCallback.isInvalid())
00777 storedCallback.send(result);
00778 else{
00779 DEBR(("No reduction client for group %d \n",thisgroup.idx));
00780 CkAbort("No reduction client!\n"
00781 "You must register a client with either SetReductionClient or during contribute.\n");
00782 }
00783
00784 DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
00785
00786 #endif // _FAULT_MLOG_
00787
00788
00789 redNo++;
00790
00791 int i;
00792 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_) && !GROUP_LEVEL_REDUCTION
00793
00794 if(CkMyPe()!=0)
00795 #endif
00796 {
00797 completedRedNo++;
00798 for (i=1;i<(int)(adjVec.length());i++){
00799 adjVec[i-1]=adjVec[i];
00800 }
00801 adjVec.length()--;
00802 }
00803
00804 inProgress=CmiFalse;
00805 startRequested=CmiFalse;
00806 nRemote=nContrib=0;
00807
00808
00809 int n=futureMsgs.length();
00810 for (i=0;i<n;i++)
00811 {
00812 CkReductionMsg *m=futureMsgs.deq();
00813 if (m!=NULL)
00814 addContribution(m);
00815 }
00816 #if GROUP_LEVEL_REDUCTION
00817 n=futureRemoteMsgs.length();
00818 for (i=0;i<n;i++)
00819 {
00820 CkReductionMsg *m=futureRemoteMsgs.deq();
00821 if (m!=NULL) {
00822 RecvMsg(m);
00823 }
00824 }
00825 #endif
00826
00827 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
00828 if(maxStartRequest >= redNo){
00829 startReduction(redNo,CkMyPe());
00830 finishReduction();
00831 }
00832
00833 #endif
00834 }
00835
00836
00837 void CkReductionMgr::RecvMsg(CkReductionMsg *m)
00838 {
00839 #if GROUP_LEVEL_REDUCTION
00840 #if CMK_BIGSIM_CHARM
00841 _TRACE_BG_TLINE_END(&m->log);
00842 #endif
00843 if (isPresent(m->redNo)) {
00844 DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
00845 startReduction(m->redNo, CkMyPe());
00846 msgs.enq(m);
00847 nRemote++;
00848 finishReduction();
00849 }
00850 else if (isFuture(m->redNo)) {
00851 DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
00852 futureRemoteMsgs.enq(m);
00853 }
00854 else CkAbort("Recv'd late remote contribution!\n");
00855 #endif
00856 }
00857
00859
00860
00861 countAdjustment &CkReductionMgr::adj(int number)
00862 {
00863 number-=completedRedNo;
00864 number--;
00865 if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
00866
00867 while ((int)(adjVec.length())<=number)
00868 adjVec.push_back(countAdjustment());
00869 return adjVec[number];
00870 }
00871
00872
00873 CkReductionMsg *CkReductionMgr::reduceMessages(void)
00874 {
00875 #if CMK_BIGSIM_CHARM
00876 _TRACE_BG_END_EXECUTE(1);
00877 void* _bgParentLog = NULL;
00878 _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
00879 #endif
00880 CkReductionMsg *ret=NULL;
00881
00882
00883 CkReduction::reducerType r=CkReduction::invalid;
00884 int msgs_gcount=0;
00885 int msgs_nSources=0;
00886 int msgs_userFlag=-1;
00887 CkCallback msgs_callback;
00888 int i;
00889 int nMsgs=0;
00890 CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
00891 CkReductionMsg *m;
00892 bool isMigratableContributor;
00893
00894
00895 while (NULL!=(m=msgs.deq()))
00896 {
00897 msgs_gcount+=m->gcount;
00898 if (m->sourceFlag!=0)
00899 {
00900 msgArr[nMsgs++]=m;
00901 msgs_nSources+=m->nSources();
00902 r=m->reducer;
00903 if (!m->callback.isInvalid())
00904 msgs_callback=m->callback;
00905 if (m->userFlag!=-1)
00906 msgs_userFlag=m->userFlag;
00907
00908 isMigratableContributor=m->isMigratableContributor();
00909 #if CMK_BIGSIM_CHARM
00910 _TRACE_BG_ADD_BACKWARD_DEP(m->log);
00911 #endif
00912 }
00913 else
00914 {
00915 delete m;
00916 }
00917 }
00918
00919 if (nMsgs==0||r==CkReduction::invalid)
00920
00921 ret=CkReductionMsg::buildNew(0,NULL);
00922 else
00923 {
00924
00925 if(nMsgs == 1){
00926 ret = msgArr[0];
00927 }else{
00928 CkReduction::reducerFn f=CkReduction::reducerTable[r];
00929 ret=(*f)(nMsgs,msgArr);
00930 }
00931 ret->reducer=r;
00932 }
00933
00934
00935
00936 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
00937
00938 #if CRITICAL_PATH_DEBUG > 3
00939 CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
00940 #endif
00941
00942 MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
00943 path.updateMax(UsrToEnv(ret));
00944
00945 for (i=0;i<nMsgs;i++){
00946 if (msgArr[i]!=ret){
00947
00948 path.updateMax(UsrToEnv(msgArr[i]));
00949 }
00950 }
00951
00952
00953 #if CRITICAL_PATH_DEBUG > 3
00954 CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
00955 #endif
00956
00957 PathHistoryTableEntry tableEntry(path);
00958 tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
00959
00960 #endif
00961
00962
00963 for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
00964 delete [] msgArr;
00965
00966
00967 ret->redNo=redNo;
00968 ret->gcount=msgs_gcount;
00969 ret->userFlag=msgs_userFlag;
00970 ret->callback=msgs_callback;
00971 ret->sourceFlag=msgs_nSources;
00972 ret->setMigratableContributor(isMigratableContributor);
00973 DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
00974
00975 return ret;
00976 }
00977
00978
00979
00980
00981
00982
00983
00984 void CkReductionMgr::pup(PUP::er &p)
00985 {
00986
00987
00988 CkGroupInitCallback::pup(p);
00989 p(redNo);
00990 p(completedRedNo);
00991 p(inProgress); p(creating); p(startRequested);
00992 p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
00993 p|msgs;
00994 p|futureMsgs;
00995 p|futureRemoteMsgs;
00996 p|finalMsgs;
00997 p|adjVec;
00998 p|nodeProxy;
00999 p|storedCallback;
01000
01001 if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn)
01002 {
01003 CkReductionClientBundle *bd;
01004 if (p.isUnpacking())
01005 bd = new CkReductionClientBundle;
01006 else
01007 bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
01008 p|*bd;
01009 if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
01010 }
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
01021
01022
01023
01024 #endif
01025 if(p.isUnpacking()){
01026 thisProxy = thisgroup;
01027 maxStartRequest=0;
01028 #if GROUP_LEVEL_REDUCTION
01029 #ifdef BINOMIAL_TREE
01030 init_BinomialTree();
01031 #else
01032 init_BinaryTree();
01033 #endif
01034 #endif
01035 }
01036
01037 DEBR(("[%d,%d] pupping _____________ gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
01038 }
01039
01040
01041
01042
01043 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
01044
01045 int total = m->gcount+adj(m->redNo).gcount;
01046 finalMsgs.enq(m);
01047
01048 adj(m->redNo).mainRecvd = 1;
01049 DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
01050 endArrayReduction();
01051 }
01052
01053 void CkReductionMgr :: endArrayReduction(){
01054 CkReductionMsg *ret=NULL;
01055 int nMsgs=finalMsgs.length();
01056
01057
01058
01059 CkReduction::reducerType r=CkReduction::invalid;
01060 int msgs_gcount=0;
01061 int msgs_nSources=0;
01062 int msgs_userFlag=-1;
01063 CkCallback msgs_callback;
01064 CkCallback msgs_secondaryCallback;
01065 CkVec<CkReductionMsg *> tempMsgs;
01066 int i;
01067 int numMsgs = 0;
01068 for (i=0;i<nMsgs;i++)
01069 {
01070 CkReductionMsg *m=finalMsgs.deq();
01071 if(m->redNo == completedRedNo +1){
01072 msgs_gcount+=m->gcount;
01073 if (m->sourceFlag!=0)
01074 {
01075 msgs_nSources+=m->nSources();
01076 r=m->reducer;
01077 if (!m->callback.isInvalid())
01078 msgs_callback=m->callback;
01079 if(!m->secondaryCallback.isInvalid())
01080 msgs_secondaryCallback = m->secondaryCallback;
01081 if (m->userFlag!=-1)
01082 msgs_userFlag=m->userFlag;
01083 tempMsgs.push_back(m);
01084 }
01085 }else{
01086 finalMsgs.enq(m);
01087 }
01088
01089 }
01090 numMsgs = tempMsgs.length();
01091
01092 DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount, adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
01093
01094 if(numMsgs == 0){
01095 return;
01096 }
01097 if(adj(completedRedNo+1).mainRecvd == 0){
01098 for(i=0;i<numMsgs;i++){
01099 finalMsgs.enq(tempMsgs[i]);
01100 }
01101 return;
01102 }
01103
01104
01105
01106
01107
01108
01109
01110
01111
01112
01113 if (nMsgs==0||r==CkReduction::invalid)
01114
01115 ret=CkReductionMsg::buildNew(0,NULL);
01116 else{
01117 CkReduction::reducerFn f=CkReduction::reducerTable[r];
01118
01119 CkReductionMsg **msgArr=&tempMsgs[0];
01120 ret=(*f)(numMsgs,msgArr);
01121 ret->reducer=r;
01122
01123 }
01124
01125
01126 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
01127
01128 #if CRITICAL_PATH_DEBUG > 3
01129 CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
01130 #endif
01131
01132 MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
01133 path.updateMax(UsrToEnv(ret));
01134
01135 for (i=0;i<numMsgs;i++){
01136 if (tempMsgs[i]!=ret){
01137
01138 path.updateMax(UsrToEnv(tempMsgs[i]));
01139 } else {
01140
01141 }
01142 }
01143
01144
01145 #if CRITICAL_PATH_DEBUG > 3
01146 CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
01147 #endif
01148
01149 #endif
01150
01151
01152
01153
01154
01155 for(i = 0;i<numMsgs;i++){
01156 if (tempMsgs[i] != ret) delete tempMsgs[i];
01157 }
01158
01159
01160
01161
01162 ret->redNo=completedRedNo+1;
01163 ret->gcount=msgs_gcount;
01164 ret->userFlag=msgs_userFlag;
01165 ret->callback=msgs_callback;
01166 ret->secondaryCallback = msgs_secondaryCallback;
01167 ret->sourceFlag=msgs_nSources;
01168
01169 DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
01170
01171 CkSetRefNum(ret, ret->getUserFlag());
01172 if (!ret->secondaryCallback.isInvalid())
01173 ret->secondaryCallback.send(ret);
01174 else if (!storedCallback.isInvalid())
01175 storedCallback.send(ret);
01176 else{
01177 DEBR(("No reduction client for group %d \n",thisgroup.idx));
01178 CkAbort("No reduction client!\n"
01179 "You must register a client with either SetReductionClient or during contribute.\n");
01180 }
01181 completedRedNo++;
01182
01183 DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
01184
01185 for (i=1;i<(int)(adjVec.length());i++)
01186 adjVec[i-1]=adjVec[i];
01187 adjVec.length()--;
01188 endArrayReduction();
01189 }
01190
01191 #if GROUP_LEVEL_REDUCTION
01192
01193 void CkReductionMgr::init_BinaryTree(){
01194 parent = (CkMyPe()-1)/TREE_WID;
01195 int firstkid = CkMyPe()*TREE_WID+1;
01196 numKids=CkNumPes()-firstkid;
01197 if (numKids>TREE_WID) numKids=TREE_WID;
01198 if (numKids<0) numKids=0;
01199
01200 for(int i=0;i<numKids;i++){
01201 kids.push_back(firstkid+i);
01202 newKids.push_back(firstkid+i);
01203 }
01204 }
01205
01206 void CkReductionMgr::init_BinomialTree(){
01207 int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
01208
01209 upperSize = (unsigned) 1 << depth;
01210 label = upperSize-CkMyPe()-1;
01211 int p=label;
01212 int count=0;
01213 while( p > 0){
01214 if(p % 2 == 0)
01215 break;
01216 else{
01217 p = p/2;
01218 count++;
01219 }
01220 }
01221
01222 parent = label + (1<<count);
01223 parent = upperSize -1 -parent;
01224 int temp;
01225 if(count != 0){
01226 numKids = 0;
01227 for(int i=0;i<count;i++){
01228
01229 temp = label - (1<<i);
01230 temp = upperSize-1-temp;
01231 if(temp <= CkNumPes()-1){
01232 kids.push_back(temp);
01233 numKids++;
01234 }
01235 }
01236 }else{
01237 numKids = 0;
01238
01239 }
01240 }
01241
01242
01243 int CkReductionMgr::treeRoot(void)
01244 {
01245 return 0;
01246 }
01247 CmiBool CkReductionMgr::hasParent(void)
01248 {
01249 return (CmiBool)(CkMyPe()!=treeRoot());
01250 }
01251 int CkReductionMgr::treeParent(void)
01252 {
01253 return parent;
01254 }
01255
01256 int CkReductionMgr::firstKid(void)
01257 {
01258 return CkMyPe()*TREE_WID+1;
01259 }
01260 int CkReductionMgr::treeKids(void)
01261 {
01262 return numKids;
01263 }
01264
01265 #endif
01266
01268
01270
01271
01272
01273 CkReductionMsg::CkReductionMsg(){}
01274 CkReductionMsg::~CkReductionMsg(){}
01275
01276
01277
// Size of the message header: the whole structure minus one double.
// NOTE(review): presumably the double is the trailing dataStorage placeholder
// where the payload begins -- confirm against the class declaration.
#define ARM_DATASTART (sizeof(CkReductionMsg) - sizeof(double))
01279
01280
01281
01282 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
01283 CkReduction::reducerType reducer, CkReductionMsg *buf)
01284 {
01285 int len[1];
01286 len[0]=NdataSize;
01287 CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
01288
01289 ret->dataSize=NdataSize;
01290 if (srcData!=NULL && !buf)
01291 memcpy(ret->data,srcData,NdataSize);
01292 ret->userFlag=-1;
01293 ret->reducer=reducer;
01294
01295 ret->sourceFlag=-1000;
01296 ret->gcount=0;
01297 ret->migratableContributor = true;
01298 #if CMK_BIGSIM_CHARM
01299 ret->log = NULL;
01300 #endif
01301 return ret;
01302 }
01303
01304
01305 void *
01306 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
01307 {
01308 int totalsize=ARM_DATASTART+(*sz);
01309 DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
01310 CkReductionMsg *ret = (CkReductionMsg *)
01311 CkAllocMsg(msgnum,totalsize,priobits);
01312 ret->data=(void *)(&ret->dataStorage);
01313 return (void *) ret;
01314 }
01315
01316 void *
01317 CkReductionMsg::pack(CkReductionMsg* in)
01318 {
01319 DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
01320
01321 in->data = NULL;
01322 return (void*) in;
01323 }
01324
01325 CkReductionMsg* CkReductionMsg::unpack(void *in)
01326 {
01327 CkReductionMsg *ret = (CkReductionMsg *)in;
01328 DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
01329
01330 ret->data=(void *)(&ret->dataStorage);
01331 return ret;
01332 }
01333
01334
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348
01349
01351
01352
01353
01354
01355
01356 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
01357 {
01358 CkAbort("Called the invalid reducer type 0. This probably\n"
01359 "means you forgot to initialize your custom reducer index.\n");
01360 return NULL;
01361 }
01362
01363 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
01364 {
01365 return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
01366 }
01367
/*
 * SIMPLE_REDUCTION(name, dataType, typeStr, loop)
 *
 * Defines a static reducer `name` that treats every message payload as an
 * array of `dataType` and folds messages 1..nMsg-1 into message 0 in place.
 * `loop` is the per-element statement; it may use `ret` (accumulator array),
 * `value` (current message's array) and `i` (element index). `typeStr` is the
 * printf conversion used by the (normally disabled) RED_DEB trace.
 * The result recycles msg[0] as the output buffer.
 *
 * The string pieces around `typeStr` are separated by spaces: C++11 lexes
 * "..."typeStr as a literal with a user-defined suffix, which breaks the
 * intended literal concatenation.
 */
#define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
{\
  RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
  int m,i;\
  int nElem=msg[0]->getLength()/sizeof(dataType);\
  dataType *ret=(dataType *)(msg[0]->getData());\
  for (m=1;m<nMsg;m++)\
  {\
    dataType *value=(dataType *)(msg[m]->getData());\
    for (i=0;i<nElem;i++)\
    {\
      RED_DEB(("|\tmsg%d (from %d) [%d]=" typeStr "\n",m,msg[m]->sourceFlag,i,value[i]));\
      loop\
    }\
  }\
  RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
  return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
}
01387
01388
/*
 * SIMPLE_POLYMORPH_REDUCTION(nameBase, loop)
 *
 * Stamps out four reducers -- nameBase_int, nameBase_long, nameBase_float and
 * nameBase_double -- that all share the same per-element statement `loop`.
 * NOTE(review): the _long variant pairs CmiInt8 with "%d"; this only matters
 * if the RED_DEB trace is ever enabled -- confirm the intended conversion.
 */
#define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
  SIMPLE_REDUCTION(nameBase##_int,    int,     "%d", loop) \
  SIMPLE_REDUCTION(nameBase##_long,   CmiInt8, "%d", loop) \
  SIMPLE_REDUCTION(nameBase##_float,  float,   "%f", loop) \
  SIMPLE_REDUCTION(nameBase##_double, double,  "%f", loop)
01394
01395
01396
01397 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
01398
01399
01400 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
01401
01402
01403 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
01404
01405
01406 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
01407
01408
01409
01410
01411 SIMPLE_REDUCTION(logical_and,int,"%d",
01412 if (value[i]==0)
01413 ret[i]=0;
01414 ret[i]=!!ret[i];
01415 )
01416
01417
01418
01419 SIMPLE_REDUCTION(logical_or,int,"%d",
01420 if (value[i]!=0)
01421 ret[i]=1;
01422 ret[i]=!!ret[i];
01423 )
01424
01425 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
01426 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
01427
01428
01429 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
01430 return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
01431 }
01432
01434
01435
01436
01437
01438 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
01439 {
01440 RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
01441
01442 int i,retSize=0;
01443 for (i=0;i<nMsg;i++)
01444 retSize+=msg[i]->getSize();
01445
01446 RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
01447
01448
01449 CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
01450
01451
01452 char *cur=(char *)(ret->getData());
01453 for (i=0;i<nMsg;i++) {
01454 int messageBytes=msg[i]->getSize();
01455 memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
01456 cur+=messageBytes;
01457 }
01458 RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
01459 return ret;
01460 }
01461
01463
01464
01465
01466
01467
01468
01469
01470
// Set-reduction elements are padded to a multiple of this many bytes.
static const int alignSize=sizeof(double);

/// Round x up to the next multiple of alignSize (a power of two).
static int SET_ALIGN(int x)
{
  const int lowBits = alignSize-1;
  return (x + lowBits) & ~lowBits;
}

/// Bytes occupied by one set element holding dataSize payload bytes:
/// an int length header plus the payload, padded with SET_ALIGN.
static int SET_SIZE(int dataSize)
{
  const int withHeader = sizeof(int)+dataSize;
  return SET_ALIGN(withHeader);
}
01477
01478
01479 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
01480 {
01481 char *next=((char *)cur)+SET_SIZE(cur->dataSize);
01482 return (CkReduction::setElement *)next;
01483 }
01484
01485
01486
01487 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
01488 {
01489 RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
01490
01491 int i,retSize=0;
01492 for (i=0;i<nMsg;i++) {
01493 if (!msg[i]->isFromUser())
01494
01495 retSize+=(msg[i]->getSize()-sizeof(int));
01496 else
01497 retSize+=SET_SIZE(msg[i]->getSize());
01498 }
01499 retSize+=sizeof(int);
01500
01501 RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
01502
01503
01504 CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
01505
01506
01507 CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
01508 for (i=0;i<nMsg;i++)
01509 if (!msg[i]->isFromUser())
01510 {
01511 int messageBytes=msg[i]->getSize()-sizeof(int);
01512 RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
01513 memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
01514 cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
01515 }
01516 else
01517 {
01518 RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
01519 cur->dataSize=msg[i]->getSize();
01520 memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
01521 cur=SET_NEXT(cur);
01522 }
01523 cur->dataSize=-1;
01524 RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
01525 return ret;
01526 }
01527
01528
01529
01530
01531
01532 CkReduction::setElement *CkReduction::setElement::next(void)
01533 {
01534 CkReduction::setElement *n=SET_NEXT(this);
01535 if (n->dataSize==-1)
01536 return NULL;
01537 else
01538 return n;
01539 }
01540
01542 CkReduction::CkReduction() {}
01543
01544
01545
01546 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
01547 {
01548 reducerTable[nReducers]=fn;
01549 return (reducerType)nReducers++;
01550 }
01551
01552
01553
01554
01555
01556
01557 int CkReduction::nReducers=CkReduction::lastSystemReducer;
01558
01559 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
01560 ::invalid_reducer,
01561 ::nop,
01562
01563 ::sum_int,::sum_long,::sum_float,::sum_double,
01564
01565
01566 ::product_int,::product_long,::product_float,::product_double,
01567
01568
01569 ::max_int,::max_long,::max_float,::max_double,
01570
01571
01572 ::min_int,::min_long,::min_float,::min_double,
01573
01574
01575
01576 ::logical_and,
01577
01578
01579
01580 ::logical_or,
01581
01582
01583 ::bitvec_and,
01584
01585
01586 ::bitvec_or,
01587
01588
01589 ::random,
01590
01591
01592 ::concat,
01593
01594
01595
01596 ::set
01597 };
01598
01599
01600
01601
01602
01603
01604
01605
01606
01624 NodeGroup::NodeGroup(void) {
01625 __nodelock=CmiCreateLock();
01626 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01627 mlogData->objID.type = TypeNodeGroup;
01628 mlogData->objID.data.group.onPE = CkMyNode();
01629 #endif
01630
01631 }
01632 NodeGroup::~NodeGroup() {
01633 CmiDestroyLock(__nodelock);
01634 }
01635 void NodeGroup::pup(PUP::er &p)
01636 {
01637 CkNodeReductionMgr::pup(p);
01638 p|reductionInfo;
01639 }
01640
01641
01642
01643 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
01644 DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
01645 ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
01646
01647 }
01648
01649 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
01650 ((CkNodeReductionMgr *)this),
01651 reductionInfo,false)
01652
01653
01654
01655 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
01656 {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
01657
01658
01659
01660
01661
01662 CkNodeReductionMgr::CkNodeReductionMgr()
01663 : thisProxy(thisgroup)
01664 {
01665 #ifdef BINOMIAL_TREE
01666 init_BinomialTree();
01667 #else
01668 init_BinaryTree();
01669 #endif
01670 storedCallback=NULL;
01671 redNo=0;
01672 inProgress=CmiFalse;
01673
01674 startRequested=CmiFalse;
01675 gcount=CkNumNodes();
01676 lcount=1;
01677 nContrib=nRemote=0;
01678 lockEverything = CmiCreateLock();
01679
01680
01681 creating=CmiFalse;
01682 interrupt = 0;
01683 DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
01684
01685
01686
01687 blocked = false;
01688 maxModificationRedNo = INT_MAX;
01689 killed=0;
01690 additionalGCount = newAdditionalGCount = 0;
01691 }
01692
01693 void CkNodeReductionMgr::flushStates()
01694 {
01695 if(CkMyRank() == 0){
01696
01697 redNo=0;
01698 inProgress=CmiFalse;
01699
01700 startRequested=CmiFalse;
01701 gcount=CkNumNodes();
01702 lcount=1;
01703 nContrib=nRemote=0;
01704
01705 creating=CmiFalse;
01706 interrupt = 0;
01707 while (!msgs.isEmpty()) { delete msgs.deq(); }
01708 while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
01709 while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
01710 while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
01711 }
01712 }
01713
01715
01716
01717 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
01718 {
01719 DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
01720 if(cb->isInvalid()){
01721 DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
01722 }else{
01723 DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
01724 }
01725
01726 if (CkMyNode()!=0)
01727 CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
01728 delete storedCallback;
01729 storedCallback=cb;
01730 }
01731
01732
01733
01734 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
01735 {
01736 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01737 Chare *oldObj =CpvAccess(_currentObj);
01738 CpvAccess(_currentObj) = this;
01739 #endif
01740
01741
01742 m->redNo=ci->redNo++;
01743 m->sourceFlag=-1;
01744 m->gcount=0;
01745 DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
01746 addContribution(m);
01747
01748 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01749 CpvAccess(_currentObj) = oldObj;
01750 #endif
01751 }
01752
01753
01754 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
01755 {
01756 #if CMK_BIGSIM_CHARM
01757 _TRACE_BG_TLINE_END(&m->log);
01758 #endif
01759 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01760 Chare *oldObj =CpvAccess(_currentObj);
01761 CpvAccess(_currentObj) = this;
01762 #endif
01763
01764 m->redNo=ci->redNo++;
01765 m->gcount=count;
01766 DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
01767 addContribution(m);
01768 DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
01769
01770 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01771 CpvAccess(_currentObj) = oldObj;
01772 #endif
01773 }
01774
01775
01777
01778
01779 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
01780 {
01781 CmiLock(lockEverything);
01782 if(blocked){
01783 delete m;
01784 CmiUnlock(lockEverything);
01785 return ;
01786 }
01787 int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
01788 if (isPresent(m->num) && !inProgress)
01789 {
01790 DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
01791 startReduction(m->num,srcNode);
01792 finishReduction();
01793 } else if (isFuture(m->num)){
01794 DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
01795 }
01796 else
01797 DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
01798 CmiUnlock(lockEverything);
01799 delete m;
01800
01801 }
01802
01803
01804 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
01805 DEBR(("[%d,%d] doRecvMsg called for %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
01806
01807
01808
01809 if(blocked){
01810 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
01811 bufferedRemoteMsgs.enq(m);
01812 return;
01813 }
01814
01815 if (isPresent(m->redNo)) {
01816
01817 startReduction(m->redNo,CkMyNode());
01818 msgs.enq(m);
01819 nRemote++;
01820 finishReduction();
01821 }
01822 else {
01823 if (isFuture(m->redNo)) {
01824
01825 futureRemoteMsgs.enq(m);
01826 }else{
01827 CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);
01828 CkAbort("Recv'd late remote contribution!\n");
01829 }
01830 }
01831 DEBR(("[%d,%d]]]]] doRecvMsg called for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
01832 }
01833
01834
01835 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
01836 {
01837 #if CMK_BIGSIM_CHARM
01838 _TRACE_BG_TLINE_END(&m->log);
01839 #endif
01840 #ifndef CMK_CPV_IS_SMP
01841 #if CMK_IMMEDIATE_MSG
01842 if(interrupt == 1){
01843
01844 CpvAccess(_qd)->process(-1);
01845 CmiDelayImmediate();
01846 return;
01847 }
01848 #endif
01849 #endif
01850 interrupt = 1;
01851 CmiLock(lockEverything);
01852 DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
01853 doRecvMsg(m);
01854 CmiUnlock(lockEverything);
01855 interrupt = 0;
01856 DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
01857 }
01858
01859 void CkNodeReductionMgr::startReduction(int number,int srcNode)
01860 {
01861 if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
01862 if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
01863 if (inProgress){
01864 DEBR((AA"This Node reduction is already in progress\n"AB));
01865 return;
01866 }
01867 if (creating)
01868 {
01869 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
01870 startRequested=CmiTrue;
01871 return;
01872 }
01873
01874
01875 DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
01876 inProgress=CmiTrue;
01877
01878 if(!_isNotifyChildInRed) return;
01879
01880
01881
01882 for (int k=0;k<treeKids();k++)
01883 {
01884 #ifdef BINOMIAL_TREE
01885 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
01886 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
01887 #else
01888 if(kids[k] != srcNode){
01889 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
01890 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
01891 }
01892 #endif
01893 }
01894
01895 DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
01896
01897
01898 startLocalGroupReductions(number);
01899
01900
01901
01902
01903 }
01904
01905
01906 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
01907 CmiLock(lockEverything);
01908 if (startLocalGroupReductions(number) == 0)
01909 thisProxy[CkMyNode()].restartLocalGroupReductions(number);
01910 CmiUnlock(lockEverything);
01911 }
01912
01913 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
01914
01915
01916
01917 if(blocked){
01918 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
01919 bufferedMsgs.enq(m);
01920 return;
01921 }
01922
01923 if (isFuture(m->redNo)) {
01924 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
01925 futureMsgs.enq(m);
01926 } else {
01927 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
01928
01929 startReduction(m->redNo,CkMyNode());
01930 msgs.enq(m);
01931 nContrib++;
01932 finishReduction();
01933 }
01934 }
01935
01936
01937 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
01938 {
01939 interrupt = 1;
01940 CmiLock(lockEverything);
01941 doAddContribution(m);
01942 CmiUnlock(lockEverything);
01943 interrupt = 0;
01944 }
01945
01946 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
01947 if(blocked){
01948 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
01949 bufferedMsgs.enq(m);
01950 return;
01951 }
01952
01953 if (isFuture(m->redNo)) {
01954 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
01955
01956 futureLateMigrantMsgs.enq(m);
01957 } else {
01958 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
01959
01960 msgs.enq(m);
01961 finishReduction();
01962 }
01963 }
01964
01965
01966
01967
01968
01972 void CkNodeReductionMgr::finishReduction(void)
01973 {
01974 DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
01975
01976 if ((!inProgress) || creating){
01977 DEBR((AA"Either not in Progress or creating\n"AB));
01978 return;
01979 }
01980
01981 if (nContrib<(lcount)){
01982 DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
01983
01984 return;
01985 }
01986 if (nRemote<treeKids()){
01987 DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
01988
01989 return;
01990 }
01991 if (nRemote>treeKids()){
01992
01993 interrupt = 0;
01994 CkAbort("Nodegrp Excess remote reduction message received!\n");
01995 }
01996
01997 DEBR((AA"Reducing node data...\n"AB));
01998
02000 CkReductionMsg *result=reduceMessages();
02001
02002 if (hasParent())
02003 {
02004 if(CmiNodeAlive(CkMyNode()) || killed == 0){
02005 DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
02006 DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
02007
02008
02009
02010 result->gcount += additionalGCount;
02011 thisProxy[treeParent()].RecvMsg(result);
02012 }
02013
02014 }
02015 else
02016 {
02017 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
02018 DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
02019 msgs.enq(result);
02020 return;
02021 }
02022 result->gcount += additionalGCount;
02027 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
02028 CkSetRefNum(result, result->getUserFlag());
02029 if (!result->callback.isInvalid()){
02030 DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
02031 result->callback.send(result);
02032 }
02033 else if (storedCallback!=NULL){
02034 DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
02035 storedCallback->send(result);
02036 }
02037 else{
02038 DEBR((AA"Invalid Callback \n"AB));
02039 CkAbort("No reduction client!\n"
02040 "You must register a client with either SetReductionClient or during contribute.\n");
02041 }
02042 }
02043
02044
02045
02046 redNo++;
02047 updateTree();
02048 int i;
02049 inProgress=CmiFalse;
02050 startRequested=CmiFalse;
02051 nRemote=nContrib=0;
02052
02053
02054 int n=futureMsgs.length();
02055
02056 for (i=0;i<n;i++)
02057 {
02058 interrupt = 1;
02059
02060 CkReductionMsg *m=futureMsgs.deq();
02061
02062 interrupt = 0;
02063 if (m!=NULL){
02064 DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
02065 doAddContribution(m);
02066 }
02067 }
02068
02069 interrupt = 1;
02070
02071 n=futureRemoteMsgs.length();
02072
02073 interrupt = 0;
02074 for (i=0;i<n;i++)
02075 {
02076 interrupt = 1;
02077
02078 CkReductionMsg *m=futureRemoteMsgs.deq();
02079
02080 interrupt = 0;
02081 if (m!=NULL)
02082 doRecvMsg(m);
02083 }
02084
02085 n = futureLateMigrantMsgs.length();
02086 for(i=0;i<n;i++){
02087 CkReductionMsg *m = futureLateMigrantMsgs.deq();
02088 if(m != NULL){
02089 if(m->redNo == redNo){
02090 msgs.enq(m);
02091 }else{
02092 futureLateMigrantMsgs.enq(m);
02093 }
02094 }
02095 }
02096 }
02097
02099
02100 void CkNodeReductionMgr::init_BinaryTree(){
02101 parent = (CkMyNode()-1)/TREE_WID;
02102 int firstkid = CkMyNode()*TREE_WID+1;
02103 numKids=CkNumNodes()-firstkid;
02104 if (numKids>TREE_WID) numKids=TREE_WID;
02105 if (numKids<0) numKids=0;
02106
02107 for(int i=0;i<numKids;i++){
02108 kids.push_back(firstkid+i);
02109 newKids.push_back(firstkid+i);
02110 }
02111 }
02112
02113 void CkNodeReductionMgr::init_BinomialTree(){
02114 int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
02115
02116 upperSize = (unsigned) 1 << depth;
02117 label = upperSize-CkMyNode()-1;
02118 int p=label;
02119 int count=0;
02120 while( p > 0){
02121 if(p % 2 == 0)
02122 break;
02123 else{
02124 p = p/2;
02125 count++;
02126 }
02127 }
02128
02129 parent = label + (1<<count);
02130 parent = upperSize -1 -parent;
02131 int temp;
02132 if(count != 0){
02133 numKids = 0;
02134 for(int i=0;i<count;i++){
02135
02136 temp = label - (1<<i);
02137 temp = upperSize-1-temp;
02138 if(temp <= CkNumNodes()-1){
02139
02140 kids.push_back(temp);
02141 numKids++;
02142 }
02143 }
02144 }else{
02145 numKids = 0;
02146
02147 }
02148 }
02149
02150
02151 int CkNodeReductionMgr::treeRoot(void)
02152 {
02153 return 0;
02154 }
02155 CmiBool CkNodeReductionMgr::hasParent(void)
02156 {
02157 return (CmiBool)(CkMyNode()!=treeRoot());
02158 }
02159 int CkNodeReductionMgr::treeParent(void)
02160 {
02161 return parent;
02162 }
02163
02164 int CkNodeReductionMgr::firstKid(void)
02165 {
02166 return CkMyNode()*TREE_WID+1;
02167 }
02168 int CkNodeReductionMgr::treeKids(void)
02169 {
02170 #ifdef BINOMIAL_TREE
02171 return numKids;
02172 #else
02173
02174
02175
02176
02177 return numKids;
02178 #endif
02179 }
02180
02181
02182 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
02183 {
02184 #if CMK_BIGSIM_CHARM
02185 _TRACE_BG_END_EXECUTE(1);
02186 void* _bgParentLog = NULL;
02187 _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
02188 #endif
02189 CkReductionMsg *ret=NULL;
02190
02191
02192 CkReduction::reducerType r=CkReduction::invalid;
02193 int msgs_gcount=0;
02194 int msgs_nSources=0;
02195 int msgs_userFlag=-1;
02196 CkCallback msgs_callback;
02197 CkCallback msgs_secondaryCallback;
02198 int i;
02199 int nMsgs=0;
02200 CkReductionMsg *m;
02201 CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
02202 bool isMigratableContributor;
02203
02204
02205 while(NULL!=(m=msgs.deq()))
02206 {
02207 DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));
02208 msgs_gcount+=m->gcount;
02209 if (m->sourceFlag!=0)
02210 {
02211 msgArr[nMsgs++]=m;
02212 msgs_nSources+=m->nSources();
02213 r=m->reducer;
02214 if (!m->callback.isInvalid())
02215 msgs_callback=m->callback;
02216 if(!m->secondaryCallback.isInvalid()){
02217 msgs_secondaryCallback = m->secondaryCallback;
02218 }
02219 if (m->userFlag!=-1)
02220 msgs_userFlag=m->userFlag;
02221
02222 isMigratableContributor= m->isMigratableContributor();
02223 #if CMK_BIGSIM_CHARM
02224 _TRACE_BG_ADD_BACKWARD_DEP(m->log);
02225 #endif
02226
02227 }
02228 else
02229 {
02230 delete m;
02231 }
02232 }
02233
02234 if (nMsgs==0||r==CkReduction::invalid)
02235
02236 ret=CkReductionMsg::buildNew(0,NULL);
02237 else
02238 {
02239 if(nMsgs == 1){
02240 ret = msgArr[0];
02241 }else{
02242 CkReduction::reducerFn f=CkReduction::reducerTable[r];
02243 ret=(*f)(nMsgs,msgArr);
02244 }
02245 ret->reducer=r;
02246 }
02247
02248
02249 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
02250 #if CRITICAL_PATH_DEBUG > 3
02251 CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
02252 #endif
02253 MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
02254 path.updateMax(UsrToEnv(ret));
02255
02256 for (i=0;i<nMsgs;i++){
02257 if (msgArr[i]!=ret){
02258
02259 path.updateMax(UsrToEnv(msgArr[i]));
02260 } else {
02261
02262 }
02263 }
02264 #if CRITICAL_PATH_DEBUG > 3
02265 CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
02266 #endif
02267
02268 #endif
02269
02270
02271
02272 for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
02273 delete [] msgArr;
02274
02275 ret->redNo=redNo;
02276 ret->gcount=msgs_gcount;
02277 ret->userFlag=msgs_userFlag;
02278 ret->callback=msgs_callback;
02279 ret->secondaryCallback = msgs_secondaryCallback;
02280 ret->sourceFlag=msgs_nSources;
02281 ret->setMigratableContributor(isMigratableContributor);
02282 DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
02283 #if CMK_BIGSIM_CHARM
02284 _TRACE_BG_TLINE_END(&ret->log);
02285 #endif
02286
02287 return ret;
02288 }
02289
02290 void CkNodeReductionMgr::pup(PUP::er &p)
02291 {
02292
02293
02294 IrrGroup::pup(p);
02295 p(redNo);
02296 p(inProgress); p(creating); p(startRequested);
02297 p(lcount);
02298 p(nContrib); p(nRemote);
02299 p(interrupt);
02300 p|msgs;
02301 p|futureMsgs;
02302 p|futureRemoteMsgs;
02303 p|futureLateMigrantMsgs;
02304 p|parent;
02305 p|additionalGCount;
02306 p|newAdditionalGCount;
02307 if(p.isUnpacking()) {
02308 gcount=CkNumNodes();
02309 thisProxy = thisgroup;
02310 lockEverything = CmiCreateLock();
02311 #ifdef BINOMIAL_TREE
02312 init_BinomialTree();
02313 #else
02314 init_BinaryTree();
02315 #endif
02316 }
02317 p | blocked;
02318 p | maxModificationRedNo;
02319
02320 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
02321 int isnull = (storedCallback == NULL);
02322 p | isnull;
02323 if (!isnull) {
02324 if (p.isUnpacking()) {
02325 storedCallback = new CkCallback;
02326 }
02327 p|*storedCallback;
02328 }
02329 #endif
02330
02331 }
02332
02333
02334
02335
02336
02337
02338
02339 void CkNodeReductionMgr::evacuate(){
02340 DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
02341 if(treeKids() == 0){
02342
02343
02344
02345 oldleaf=true;
02346 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
02347
02348
02349
02350
02351
02352
02353
02354 int data[2];
02355 data[0]=CkMyNode();
02356 data[1]=getTotalGCount()+additionalGCount;
02357 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
02358 newParent = treeParent();
02359 }else{
02360 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
02361 oldleaf= false;
02362
02363
02364
02365
02366
02367
02368 newParent = kids[0];
02369 for(int i=numKids-1;i>=0;i--){
02370 newKids.remove(i);
02371 }
02372
02373
02374
02375
02376
02377
02378
02379 int oldParentData[2];
02380 oldParentData[0] = CkMyNode();
02381 oldParentData[1] = newParent;
02382 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
02383
02384
02385
02386
02387 int childrenData=newParent;
02388 for(int i=1;i<numKids;i++){
02389 thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
02390 }
02391
02392
02393
02394
02395
02396 int *newParentData = new int[numKids+2];
02397 for(int i=1;i<numKids;i++){
02398 newParentData[i] = kids[i];
02399 }
02400 newParentData[0] = CkMyNode();
02401 newParentData[numKids] = parent;
02402 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
02403 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
02404 }
02405 readyDeletion = false;
02406 blocked = true;
02407 numModificationReplies = 0;
02408 tempModificationRedNo = findMaxRedNo();
02409 }
02410
02411
02412
02413
02414
02415
02416
02417
02418
02419 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
02420 DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
02421 int sender;
02422 newKids = kids;
02423 readyDeletion = false;
02424 newAdditionalGCount = additionalGCount;
02425 switch(code){
02426 case OLDPARENT:
02427 for(int i=0;i<numKids;i++){
02428 if(newKids[i] == data[0]){
02429 newKids[i] = data[1];
02430 break;
02431 }
02432 }
02433 sender = data[0];
02434 newParent = parent;
02435 break;
02436 case OLDCHILDREN:
02437 newParent = data[0];
02438 sender = parent;
02439 break;
02440 case NEWPARENT:
02441 for(int i=0;i<size-2;i++){
02442 newKids.push_back(data[i]);
02443 }
02444 newParent = data[size-2];
02445 newAdditionalGCount += data[size-1];
02446 sender = parent;
02447 break;
02448 case LEAFPARENT:
02449 for(int i=0;i<numKids;i++){
02450 if(newKids[i] == data[0]){
02451 newKids.remove(i);
02452 break;
02453 }
02454 }
02455 sender = data[0];
02456 newParent = parent;
02457 newAdditionalGCount += data[1];
02458 break;
02459 };
02460 blocked = true;
02461 int maxRedNo = findMaxRedNo();
02462
02463 thisProxy[sender].collectMaxRedNo(maxRedNo);
02464 }
02465
02466 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
02467
02468
02469
02470
02471 numModificationReplies++;
02472 if(maxRedNo > tempModificationRedNo){
02473 tempModificationRedNo = maxRedNo;
02474 }
02475 if(numModificationReplies == numKids+1){
02476 maxModificationRedNo = tempModificationRedNo;
02477
02478
02479
02480
02481 if(maxModificationRedNo == -1){
02482 printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
02483 }else{
02484 DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
02485 }
02486 thisProxy[parent].unblockNode(maxModificationRedNo);
02487 for(int i=0;i<numKids;i++){
02488 thisProxy[kids[i]].unblockNode(maxModificationRedNo);
02489 }
02490 blocked = false;
02491 updateTree();
02492 clearBlockedMsgs();
02493 }
02494 }
02495
02496 void CkNodeReductionMgr::unblockNode(int maxRedNo){
02497 maxModificationRedNo = maxRedNo;
02498 updateTree();
02499 blocked = false;
02500 clearBlockedMsgs();
02501 }
02502
02503
02504 void CkNodeReductionMgr::clearBlockedMsgs(){
02505 int len = bufferedMsgs.length();
02506 for(int i=0;i<len;i++){
02507 CkReductionMsg *m = bufferedMsgs.deq();
02508 doAddContribution(m);
02509 }
02510 len = bufferedRemoteMsgs.length();
02511 for(int i=0;i<len;i++){
02512 CkReductionMsg *m = bufferedRemoteMsgs.deq();
02513 doRecvMsg(m);
02514 }
02515
02516 }
02517
02518
02519
02520
02521
02522 void CkNodeReductionMgr::updateTree(){
02523 if(redNo > maxModificationRedNo){
02524 parent = newParent;
02525 kids = newKids;
02526 maxModificationRedNo = INT_MAX;
02527 numKids = kids.size();
02528 readyDeletion = true;
02529 additionalGCount = newAdditionalGCount;
02530 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
02531 for(int i=0;i<(int)(newKids.size());i++){
02532 DEBREVAC(("%d ",newKids[i]));
02533 }
02534 DEBREVAC(("\n"));
02535
02536 }else{
02537 if(maxModificationRedNo != INT_MAX){
02538 DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
02539 startReduction(redNo,CkMyNode());
02540 finishReduction();
02541 }
02542 }
02543 }
02544
02545
02546 void CkNodeReductionMgr::doneEvacuate(){
02547 DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
02548
02549
02550
02551
02552
02553
02554
02555
02556
02557
02558
02559
02560
02561
02562
02563
02564
02565
02566
02567
02568
02569
02570
02571
02572
02573 if(readyDeletion){
02574 thisProxy[treeParent()].DeleteChild(CkMyNode());
02575 }else{
02576 thisProxy[newParent].DeleteNewChild(CkMyNode());
02577 }
02578
02579 }
02580
02581 void CkNodeReductionMgr::DeleteChild(int deletedChild){
02582 DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
02583 for(int i=0;i<numKids;i++){
02584 if(kids[i] == deletedChild){
02585 kids.remove(i);
02586 break;
02587 }
02588 }
02589 numKids = kids.length();
02590 finishReduction();
02591 }
02592
02593 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
02594 for(int i=0;i<(int)(newKids.length());i++){
02595 if(newKids[i] == deletedChild){
02596 newKids.remove(i);
02597 break;
02598 }
02599 }
02600 DEBREVAC(("[%d]%d> Deleting new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
02601 for(int i=0;i<(int)(newKids.size());i++){
02602 DEBREVAC(("%d ",newKids[i]));
02603 }
02604 DEBREVAC(("\n"));
02605 finishReduction();
02606 }
02607
02608 int CkNodeReductionMgr::findMaxRedNo(){
02609 int max = redNo;
02610 for(int i=0;i<futureRemoteMsgs.length();i++){
02611 if(futureRemoteMsgs[i]->redNo > max){
02612 max = futureRemoteMsgs[i]->redNo;
02613 }
02614 }
02615
02616
02617
02618
02619 if(redNo == max && msgs.length() == 0){
02620 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
02621 max--;
02622 }
02623 return max;
02624 }
02625
02626
02627 void CkReductionMgr::sanitycheck()
02628 {
02629 #if CMK_ERROR_CHECKING
02630 int count = 0;
02631 while (CkReduction::reducerTable[count] != NULL) count++;
02632 CmiAssert(CkReduction::nReducers == count);
02633 #endif
02634 }
02635
02636 #include "CkReduction.def.h"