00001 #include "charm++.h"
00002 #include "ck.h"
00003 #include "CkArrayReductionMgr.decl.h"
00004 #define ARRREDDEBUG 0
00005
00006 void noopitar(const char*, ...)
00007 {}
00008
00009 #if ARRREDDEBUG
00010 #define ARPRINT CkPrintf
00011 #else
00012 #define ARPRINT noopitar
00013 #endif
00014
00015 void CkArrayReductionMgr::init()
00016 {
00017
00018 redNo=0;
00019 size = CkMyNodeSize();
00020 count = 0;
00021 lockCount = CmiCreateLock();
00022 ctorDoneFlag = 1;
00023 alreadyStarted = -1;
00024 }
00025
00026 CkArrayReductionMgr::CkArrayReductionMgr(){
00027 init();
00028 attachedGroup.setZero();
00029 }
00030
00031 CkArrayReductionMgr::CkArrayReductionMgr(int dummy, CkGroupID gid){
00032 init();
00033 attachedGroup = gid;
00034 }
00035
00036 void CkArrayReductionMgr::flushStates(){
00037 if(CkMyRank()== 0){
00038
00039 redNo=0;
00040 count = 0;
00041 while (!my_msgs.isEmpty()) delete my_msgs.deq();
00042 while (!my_futureMsgs.isEmpty()) delete my_futureMsgs.deq();
00043 reductionInfo.redNo = 0;
00044 CkNodeReductionMgr::flushStates();
00045 }
00046 }
00047
00048 void CkArrayReductionMgr::collectAllMessages(){
00049 if(count == size){
00050 ARPRINT("[%d] CollectAll messages for %d with %d on %p\n",CkMyNode(),redNo,count,this);
00051 CkReductionMsg *result = reduceMessages();
00052 result->redNo = redNo;
00054 contributeWithCounter(result,result->gcount);
00055 count=0;
00056 redNo++;
00057 int n=my_futureMsgs.length();
00058 for(int i=0;i<n;i++){
00059 CkReductionMsg *elementMesg = my_futureMsgs.deq();
00060 if(elementMesg->getRedNo() == redNo){
00061 my_msgs.enq(elementMesg);
00062 count++;
00063 collectAllMessages();
00064 }else{
00065 my_futureMsgs.enq(elementMesg);
00066 }
00067 }
00068 }
00069 }
00070
00071 void CkArrayReductionMgr::contributeArrayReduction(CkReductionMsg *m){
00072 ARPRINT("[%d]Contribute Array Reduction called for RedNo %d group %d \n",CkMyNode(),m->getRedNo(),thisgroup.idx);
00075 #if CMK_BIGSIM_CHARM
00076 _TRACE_BG_TLINE_END(&(m->log));
00077 #endif
00078 CmiLock(lockCount);
00079 if(m->getRedNo() == redNo){
00080 my_msgs.enq(m);
00081 count++;
00082 collectAllMessages();
00083 }else{
00084
00085 my_futureMsgs.enq(m);
00086 }
00087 CmiUnlock(lockCount);
00088 }
00089
00090 CkReductionMsg *CkArrayReductionMgr::reduceMessages(void){
00091 #if CMK_BIGSIM_CHARM
00092 _TRACE_BG_END_EXECUTE(1);
00093 void* _bgParentLog = NULL;
00094 _TRACE_BG_BEGIN_EXECUTE_NOMSG("ArrayReduce", &_bgParentLog, 0);
00095 #endif
00096 CkReductionMsg *ret=NULL;
00097
00098
00099 CkReduction::reducerType r=CkReduction::invalid;
00100 int msgs_gcount=0;
00101 int msgs_nSources=0;
00102 int msgs_userFlag=-1;
00103 CkCallback msgs_callback;
00104 CkCallback msgs_secondaryCallback;
00105 int i;
00106 int nMsgs=0;
00107 CkReductionMsg *m;
00108 CkReductionMsg **msgArr=new CkReductionMsg*[my_msgs.length()];
00109 bool isMigratableContributor;
00110
00111 while(NULL!=(m=my_msgs.deq()))
00112 {
00113
00114 msgs_gcount+=m->gcount;
00115 if (m->sourceFlag!=0)
00116 {
00117 msgArr[nMsgs++]=m;
00118 msgs_nSources+=m->nSources();
00119 r=m->reducer;
00120 if (!m->callback.isInvalid())
00121 msgs_callback=m->callback;
00122 if(!m->secondaryCallback.isInvalid())
00123 msgs_secondaryCallback = m->secondaryCallback;
00124 if (m->userFlag!=-1)
00125 msgs_userFlag=m->userFlag;
00126
00127 isMigratableContributor=m->isMigratableContributor();
00128 #if CMK_BIGSIM_CHARM
00129 _TRACE_BG_ADD_BACKWARD_DEP(m->log);
00130 #endif
00131
00132 }
00133 else
00134 {
00135 delete m;
00136 }
00137 }
00138
00139 if (nMsgs==0||r==CkReduction::invalid)
00140
00141 ret=CkReductionMsg::buildNew(0,NULL);
00142 else
00143 {
00144 if(nMsgs == 1){
00145 ret = msgArr[0];
00146 }else{
00147 CkReduction::reducerFn f=CkReduction::reducerTable[r];
00148 ret=(*f)(nMsgs,msgArr);
00149 }
00150 ret->reducer=r;
00151 }
00152
00153
00154 for (i=0;i<nMsgs;i++) {
00155 if (msgArr[i] != ret) delete msgArr[i];
00156 }
00157 delete [] msgArr;
00158
00159
00160 ret->redNo=redNo;
00161 ret->gcount=msgs_gcount;
00162 ret->userFlag=msgs_userFlag;
00163 ret->callback=msgs_callback;
00164 ret->secondaryCallback = msgs_secondaryCallback;
00165 ret->sourceFlag=msgs_nSources;
00166 ret->setMigratableContributor(isMigratableContributor);
00167 return ret;
00168 }
00169
00170 void CkArrayReductionMgr::pup(PUP::er &p){
00171 NodeGroup::pup(p);
00172 p(redNo);p(count);
00173 p|my_msgs;
00174 p|my_futureMsgs;
00175 p|attachedGroup;
00176 if(p.isUnpacking()) {
00177 size = CkMyNodeSize();
00178 lockCount = CmiCreateLock();
00179 }
00180 }
00181
00182 void CkArrayReductionMgr::setAttachedGroup(CkGroupID groupID){
00183 attachedGroup = groupID;
00184 ARPRINT("[%d] setAttachedGroup called with attachedGroup %d \n",CkMyNode(),attachedGroup);
00185 if (alreadyStarted != -1) {
00186 ((CkNodeReductionMgr *)this)->restartLocalGroupReductions(alreadyStarted);
00187 alreadyStarted = -1;
00188 }
00189 }
00190
00191
00192 void CkArrayReductionMgr::startNodeGroupReduction(int number,CkGroupID groupID){
00193 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00194 Chare *oldObj =CpvAccess(_currentObj);
00195 CpvAccess(_currentObj) = this;
00196 #endif
00197 ARPRINT("[%d] startNodeGroupReductions for red No %d my group %d attached group %d on %p \n",CkMyNode(),number,thisgroup.idx, attachedGroup.idx,this);
00198 if(attachedGroup.isZero()){
00199 setAttachedGroup(groupID);
00200 }
00201 startLocalGroupReductions(number);
00202 CkReductionNumberMsg *msg = new CkReductionNumberMsg(number);
00203 envelope::setSrcPe((char *)UsrToEnv(msg),CkMyNode());
00204 ((CkNodeReductionMgr *)this)->ReductionStarting(msg);
00205 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00206 CpvAccess(_currentObj) = oldObj;
00207 #endif
00208 }
00209
00210 int CkArrayReductionMgr::startLocalGroupReductions(int number){
00211 ARPRINT("[%d] startLocalGroupReductions for red No %d my group %d attached group %d number of rednMgrs %d on %p \n",CkMyNode(),number,thisgroup.idx, attachedGroup.idx,size,this);
00212 if(attachedGroup.isZero()){
00213 alreadyStarted = number;
00214 return 0;
00215 }
00216 int firstPE = CkNodeFirst(CkMyNode());
00217 for(int i=0;i<size;i++){
00218 CProxy_CkReductionMgr reductionMgrProxy(attachedGroup);
00219 reductionMgrProxy[firstPE+i].ReductionStarting(new CkReductionNumberMsg(number));
00220 }
00221 return 1;
00222 }
00223
00224 int CkArrayReductionMgr::getTotalGCount(){
00225 int firstPE = CkNodeFirst(CkMyNode());
00226 int totalGCount=0;
00227 for(int i=0;i<size;i++){
00228 CProxy_CkReductionMgr reductionMgrProxy(attachedGroup);
00229 CkReductionMgr *mgrPtr = reductionMgrProxy[firstPE+i].ckLocalBranch();
00230 CkAssert(mgrPtr != NULL);
00231 totalGCount += mgrPtr->getGCount();
00232 }
00233 return totalGCount;
00234 }
00235
00236
00237 #include "CkArrayReductionMgr.def.h"