00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _CKREDUCTION_H
00019 #define _CKREDUCTION_H
00020
00021 #include "CkReduction.decl.h"
00022 #include "CkArrayReductionMgr.decl.h"
00023
// GROUP_LEVEL_REDUCTION gates the reduction-tree members declared inside
// CkReductionMgr. It is enabled on BigSim, multicore, and non-SMP builds.
#if CMK_BIGSIM_CHARM || CMK_MULTICORE || !CMK_SMP
#define GROUP_LEVEL_REDUCTION 1
#endif

// Pipelined allreduce support. FRAG_SIZE is the per-fragment stride that
// AllreduceMgr uses when reassembling fragments. FRAG_THRESHOLD is not used
// in this header -- presumably the size above which a contribution is
// fragmented; confirm at the call site.
#ifdef _PIPELINED_ALLREDUCE_
#define FRAG_SIZE 131072
#define FRAG_THRESHOLD 131072
#endif

// Message-logging fault tolerance. MAX_INT is the fallback value returned by
// CkReductionMgr::numberReductionMessages(). _MLOG_REDUCE_P2P_ is 0, so every
// "#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)" section in this header
// is compiled out.
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
#define MAX_INT 5000000
#define _MLOG_REDUCE_P2P_ 0
#endif
00037
00038
00039
00040 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
00041 public:
00042 typedef void (*callbackType)(void *param);
00043 CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
00044 {callback=Ncallback;param=Nparam;}
00045 void call(void) {(*callback)(param);}
00046 private:
00047 callbackType callback;
00048 void *param;
00049 };
00050
00051 class CkGroupInitCallback : public IrrGroup {
00052 public:
00053 CkGroupInitCallback(void);
00054 CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
00055 void callMeBack(CkGroupCallbackMsg *m);
00056 void pup(PUP::er& p){ IrrGroup::pup(p); }
00057 };
00058
00059
00060 class CkGroupReadyCallback : public IrrGroup {
00061 private:
00062 int _isReady;
00063 CkQ<CkGroupCallbackMsg *> _msgs;
00064 void callBuffered(void);
00065 public:
00066 CkGroupReadyCallback(void);
00067 CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
00068 void callMeBack(CkGroupCallbackMsg *m);
00069 int isReady(void) { return _isReady; }
00070 protected:
00071 void setReady(void) {_isReady = 1; callBuffered(); }
00072 void setNotReady(void) {_isReady = 0; }
00073 };
00074
00075 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
00076 public:
00077 int num;
00078 CkReductionNumberMsg(int n) {num=n;}
00079 };
00080
00081
00083 class contributorInfo {
00084 public:
00085 int redNo;
00086 contributorInfo() {redNo=0;}
00087
00088 void pup(PUP::er &p);
00089 };
00090
00091 class countAdjustment {
00092 public:
00093 int gcount;
00094 int lcount;
00095 int mainRecvd;
00096 countAdjustment(int ignored=0) {gcount=lcount=0;mainRecvd=0;}
00097 void pup(PUP::er& p){ p|gcount; p|lcount; p|mainRecvd; }
00098 };
00099
// Forward declaration: CkReduction and CkReductionMsg both name this class
// as a friend.
namespace ck { namespace impl { class XArraySectionReducer; } }
00104
00105
00106
/// Static registry of reducer functions. The constructor is private; the
/// class exposes only types, the addReducer() entry point, and (to its
/// friends) the reducer table.
class CkReduction {
public:
  /// Identifiers of the built-in reducers. Descriptions below follow the
  /// enumerator names; see the reducer implementations for exact semantics.
  typedef enum {
    // Zero value; also the default `reducer` argument of
    // CkReductionMsg::buildNew().
    invalid=0,
    nop,

    // Sums, one enumerator per element type.
    sum_int,sum_long,sum_float,sum_double,

    // Products, one enumerator per element type.
    product_int,product_long,product_float,product_double,

    // Maxima, one enumerator per element type.
    max_int,max_long,max_float,max_double,

    // Minima, one enumerator per element type.
    min_int,min_long,min_float,min_double,

    logical_and,

    logical_or,

    bitvec_and,

    bitvec_or,

    // Used by the data-less contribute() overloads generated by
    // CK_REDUCTION_CONTRIBUTE_METHODS_DEF.
    random,

    concat,

    // NOTE(review): presumably yields a sequence of setElement records --
    // confirm in the reducer implementation.
    set,

    // Sentinel following the last built-in reducer.
    lastSystemReducer
  } reducerType;

  /// Variable-length record: dataSize followed by the payload. `data` is
  /// declared with one element as a placeholder for the real payload.
  class setElement {
  public:
    int dataSize;
    char data[1];

    setElement *next(void);
  };

  /// Reducer signature: takes nMsg input messages and returns one message.
  typedef CkReductionMsg *(*reducerFn)(int nMsg,CkReductionMsg **msgs);

  /// Register a reducer function and return the reducerType assigned to it.
  static reducerType addReducer(reducerFn fn);

private:
  friend class CkReductionMgr;
  friend class CkNodeReductionMgr;
  friend class CkArrayReductionMgr;
  friend class CkMulticastMgr;
  friend class ck::impl::XArraySectionReducer;

  // Capacity of reducerTable.
  enum {MAXREDUCERS=256};

  static reducerFn reducerTable[MAXREDUCERS];
  static int nReducers;

  // Private constructor: not constructible from outside the class/friends.
  CkReduction();
};
PUPbytes(CkReduction::reducerType)
00205
00206
00207
00208 class CkReductionMsg : public CMessage_CkReductionMsg
00209 {
00210 friend class CkReduction;
00211 friend class CkReductionMgr;
00212 friend class CkNodeReductionMgr;
00213 friend class CkArrayReductionMgr;
00214 friend class CkMulticastMgr;
00215 #ifdef _PIPELINED_ALLREDUCE_
00216 friend class ArrayElement;
00217 friend class AllreduceMgr;
00218 #endif
00219 friend class ck::impl::XArraySectionReducer;
00220 public:
00221
00222
00223
00224
00225 static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
00226 CkReduction::reducerType reducer=CkReduction::invalid,
00227 CkReductionMsg *buf = NULL);
00228
00229 inline int getLength(void) const {return dataSize;}
00230 inline int getSize(void) const {return dataSize;}
00231 inline void *getData(void) {return data;}
00232 inline const void *getData(void) const {return data;}
00233
00234 inline int getGcount(void){return gcount;}
00235 inline CkReduction::reducerType getReducer(void){return reducer;}
00236 inline int getRedNo(void){return redNo;}
00237
00238 inline CMK_REFNUM_TYPE getUserFlag(void) const {return userFlag;}
00239 inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
00240
00241 inline void setCallback(const CkCallback &cb) { callback=cb; }
00242
00243
00244
00245 inline int isFromUser(void) const {return sourceFlag==-1;}
00246
00247 inline bool isMigratableContributor(void) const {return migratableContributor;}
00248 inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
00249
00250 ~CkReductionMsg();
00251
00252
00253
00254 static void *alloc(int msgnum, size_t size, int *reqSize, int priobits);
00255 static void *pack(CkReductionMsg *);
00256 static CkReductionMsg *unpack(void *in);
00257
00258 private:
00259 int dataSize;
00260 void *data;
00261 CMK_REFNUM_TYPE userFlag;
00262 CkCallback callback;
00263 CkCallback secondaryCallback;
00264 bool migratableContributor;
00265
00266 int sourceFlag;
00267
00268
00269
00270
00271 int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
00272 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
00273 int sourceProcessorCount;
00274 int fromPE;
00275 #endif
00276 private:
00277 #if CMK_BIGSIM_CHARM
00278 void *log;
00279 #endif
00280 CkReduction::reducerType reducer;
00281
00282 int redNo;
00283 int gcount;
00284
00285 CkSectionInfo sid;
00286 char rebuilt;
00287 int nFrags;
00288 int fragNo;
00289
00290 double dataStorage;
00291
00292 int no;
00293
00294
00295 CkReductionMsg();
00296 };
00297
00298
// Declares the five contribute() overloads inside a class body. The matching
// definitions come from CK_REDUCTION_CONTRIBUTE_METHODS_DEF. A userFlag of
// (CMK_REFNUM_TYPE)-1 is the default in every overload that takes one.
#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
  void contribute(int dataSize, const void *data, \
                  CkReduction::reducerType type, \
                  CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1); \
  void contribute(int dataSize, const void *data, \
                  CkReduction::reducerType type, \
                  const CkCallback &cb, \
                  CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1); \
  void contribute(CkReductionMsg *msg); \
  void contribute(const CkCallback &cb, \
                  CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1); \
  void contribute(CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1);
00307
00308
00309 class CkNodeReductionMgr : public IrrGroup {
00310 public:
00311 CProxy_CkNodeReductionMgr thisProxy;
00312 public:
00313 CkNodeReductionMgr(void);
00314 CkNodeReductionMgr(CkMigrateMessage *m) : IrrGroup(m) {
00315 storedCallback = NULL;
00316 }
00317
00318 typedef CkReductionClientFn clientFn;
00319
00324 void ckSetReductionClient(CkCallback *cb);
00325
00326
00327
00328
00329 void contribute(contributorInfo *ci,CkReductionMsg *msg);
00330 void contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count);
00331
00332 void restartLocalGroupReductions(int number);
00333
00334 void ReductionStarting(CkReductionNumberMsg *m);
00335
00336 void RecvMsg(CkReductionMsg *m);
00337 void doRecvMsg(CkReductionMsg *m);
00338 void LateMigrantMsg(CkReductionMsg *m);
00339
00340 virtual void flushStates();
00341 virtual int startLocalGroupReductions(int number){ return 1;}
00342
00343
00344
00345 virtual int getTotalGCount(){return 0;};
00346
00347 private:
00348
00349
00350 CkCallback *storedCallback;
00351
00352 int redNo;
00353 CmiBool inProgress;
00354 CmiBool creating;
00355 CmiBool startRequested;
00356 int gcount;
00357 int lcount;
00358
00359
00360 int nContrib,nRemote;
00361
00362 CkMsgQ<CkReductionMsg> msgs;
00363
00364 CkMsgQ<CkReductionMsg> futureMsgs;
00365
00366 CkMsgQ<CkReductionMsg> futureRemoteMsgs;
00367
00368 CkMsgQ<CkReductionMsg> futureLateMigrantMsgs;
00369
00370
00371 CmiNodeLock lockEverything;
00372
00373 int interrupt;
00374
00375
00376 CkVec<int> kids;
00377
00378
00379 void startReduction(int number,int srcPE);
00380 void doAddContribution(CkReductionMsg *m);
00381 void finishReduction(void);
00382 protected:
00383 void addContribution(CkReductionMsg *m);
00384
00385 private:
00386
00387
00388
00389 unsigned upperSize;
00390 unsigned label;
00391 int parent;
00392 int numKids;
00393
00394 void init_BinomialTree();
00395
00396
00397 void init_BinaryTree();
00398 enum {TREE_WID=2};
00399 int treeRoot(void);
00400 CmiBool hasParent(void);
00401 int treeParent(void);
00402 int firstKid(void);
00403 int treeKids(void);
00404
00405
00406 CkReductionMsg *reduceMessages(void);
00407
00408
00409 CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
00410 CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
00411 CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
00412
00413
00414 bool oldleaf;
00415 bool blocked;
00416 int newParent;
00417 int additionalGCount,newAdditionalGCount;
00418 CkVec<int> newKids;
00419 CkMsgQ<CkReductionMsg> bufferedMsgs;
00420 CkMsgQ<CkReductionMsg> bufferedRemoteMsgs;
00421 enum {OLDPARENT,OLDCHILDREN,NEWPARENT,LEAFPARENT};
00422 int numModificationReplies;
00423 int maxModificationRedNo;
00424 int tempModificationRedNo;
00425 bool readyDeletion;
00426 int killed;
00427
00428
00429 public:
00430 virtual void pup(PUP::er &p);
00431
00432 virtual void evacuate();
00433 virtual void doneEvacuate();
00434 void DeleteChild(int deletedChild);
00435 void DeleteNewChild(int deletedChild);
00436 void collectMaxRedNo(int maxRedNo);
00437 void unblockNode(int maxRedNo);
00438 void modifyTree(int code,int size,int *data);
00439 private:
00440 int findMaxRedNo();
00441 void updateTree();
00442 void clearBlockedMsgs();
00443 };
00444
00445
00446
00447
00448 class NodeGroup : public CkNodeReductionMgr {
00449 protected:
00450 contributorInfo reductionInfo;
00451 public:
00452 CmiNodeLock __nodelock;
00453 NodeGroup();
00454 NodeGroup(CkMigrateMessage* m):CkNodeReductionMgr(m) { __nodelock=CmiCreateLock(); }
00455
00456 ~NodeGroup();
00457 inline const CkGroupID &ckGetGroupID(void) const {return thisgroup;}
00458 inline CkGroupID CkGetNodeGroupID(void) const {return thisgroup;}
00459 virtual int isNodeGroup() { return 1; }
00460
00461 virtual void pup(PUP::er &p);
00462 virtual void flushStates() {
00463 CkNodeReductionMgr::flushStates();
00464 reductionInfo.redNo = 0;
00465 }
00466
00467 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
00468 void contributeWithCounter(CkReductionMsg *msg,int count);
00469 };
00470
00471
00478 class CkNodeReductionMgr;
00479
00480 class CProxy_CkArrayReductionMgr;
00481 class CkReductionMgr : public CkGroupInitCallback {
00482 public:
00483 CProxy_CkReductionMgr thisProxy;
00484
00485 public:
00486 CProxy_CkArrayReductionMgr nodeProxy;
00487 CkReductionMgr(void);
00488 CkReductionMgr(CkMigrateMessage *m);
00489
00490 typedef CkReductionClientFn clientFn;
00491
00496 void ckSetReductionClient(CkCallback *cb);
00497
00498
00499
00500
00501
00502
00503
00504
00505 void creatingContributors(void);
00506 void doneCreatingContributors(void);
00507
00508 void contributorStamped(contributorInfo *ci);
00509 void contributorCreated(contributorInfo *ci);
00510 void contributorDied(contributorInfo *ci);
00511
00512 void contributorLeaving(contributorInfo *ci);
00513
00514 void contributorArriving(contributorInfo *ci);
00515
00516
00517
00518
00519 void contribute(contributorInfo *ci,CkReductionMsg *msg);
00520
00521
00522
00523 void ReductionStarting(CkReductionNumberMsg *m);
00524
00525 void LateMigrantMsg(CkReductionMsg *m);
00526
00527 void MigrantDied(CkReductionNumberMsg *m);
00528
00529 void RecvMsg(CkReductionMsg *m);
00530
00531
00532 void ArrayReductionHandler(CkReductionMsg *m);
00533 void endArrayReduction();
00534
00535 virtual CmiBool isReductionMgr(void){ return CmiTrue; }
00536 virtual void flushStates(int isgroup);
00537
00538
00539
00540
00541
00542
00543
00544
00545 int getGCount(){return gcount;};
00546 static void sanitycheck();
00547 private:
00548
00549
00550
00551
00552 CkCallback storedCallback;
00553
00554 CkCallback *secondaryStoredCallback;
00555
00556 int redNo;
00557 int completedRedNo;
00558 CmiBool inProgress;
00559 CmiBool creating;
00560 CmiBool startRequested;
00561 int gcount;
00562 int lcount;
00563 int maxStartRequest;
00564
00565
00566 int nContrib,nRemote;
00567
00568 CkMsgQ<CkReductionMsg> msgs;
00569
00570
00571 CkMsgQ<CkReductionMsg> futureMsgs;
00572
00573 CkMsgQ<CkReductionMsg> futureRemoteMsgs;
00574
00575 CkMsgQ<CkReductionMsg> finalMsgs;
00576
00577
00578 void startReduction(int number,int srcPE);
00579 void addContribution(CkReductionMsg *m);
00580 void finishReduction(void);
00581
00582 #if GROUP_LEVEL_REDUCTION
00583
00584 unsigned upperSize;
00585 unsigned label;
00586 int parent;
00587 int numKids;
00588
00589 CkVec<int> newKids;
00590 CkVec<int> kids;
00591 void init_BinomialTree();
00592
00593 void init_BinaryTree();
00594 enum {TREE_WID=2};
00595 int treeRoot(void);
00596 CmiBool hasParent(void);
00597 int treeParent(void);
00598 int firstKid(void);
00599 int treeKids(void);
00600 #endif
00601
00602
00603 CkReductionMsg *reduceMessages(void);
00604
00605
00606 CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
00607 CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
00608 CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
00609
00610
00611
00612
00613 CkVec<countAdjustment> adjVec;
00614
00615 countAdjustment &adj(int number);
00616
00617 void shiftAdjVec(void);
00618
00619 protected:
00620
00621 CmiBool disableNotifyChildrenStart;
00622
00623
00624 public:
00625 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
00626 int *perProcessorCounts;
00627 int processorCount;
00628 int totalCount;
00629 int numberReductionMessages(){
00630 if(totalCount != 0){
00631 return totalCount;
00632 }else{
00633 return MAX_INT;
00634 }
00635 }
00636 #endif
00637 virtual void pup(PUP::er &p);
00638 static int isIrreducible(){ return 0;}
00639 void contributeViaMessage(CkReductionMsg *m);
00640 };
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
// Expands to a call of CkIndex_<me>::redn_wrapper_<method>(NULL), i.e. the
// `redn_wrapper_` member of the CkIndex_ class for `me` that corresponds to
// `method`. NOTE(review): presumably yields the entry-method index used as a
// reduction target -- confirm against the generated CkIndex_ classes.
#define CkReductionTarget(me, method) \
        CkIndex_##me::redn_wrapper_##method(NULL)
00653
// Defines the five contribute() overloads declared by
// CK_REDUCTION_CONTRIBUTE_METHODS_DECL for class `me`.
//   myRednMgr   pointer-like expression whose contribute() receives the message
//   myRednInfo  contributorInfo lvalue; its address is passed along
//   migratable  value stored via setMigratableContributor() on every message
// Overloads without data build an empty CkReduction::random message.
#define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
void me::contribute(int dataSize, const void *data, \
                    CkReduction::reducerType type, \
                    CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data, type); \
  msg->setUserFlag(userFlag); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(int dataSize, const void *data, \
                    CkReduction::reducerType type, \
                    const CkCallback &cb, CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data, type); \
  msg->setUserFlag(userFlag); \
  msg->setCallback(cb); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(CkReductionMsg *msg) \
{ \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(const CkCallback &cb, CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(0, NULL, CkReduction::random); \
  msg->setUserFlag(userFlag); \
  msg->setCallback(cb); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(0, NULL, CkReduction::random); \
  msg->setUserFlag(userFlag); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
}
00692
00693
00694
00695 class Group : public CkReductionMgr
00696 {
00697 contributorInfo reductionInfo;
00698 public:
00699 Group();
00700 Group(CkMigrateMessage *msg);
00701 virtual int isNodeGroup() { return 0; }
00702 virtual void pup(PUP::er &p);
00703 virtual void flushStates() {
00704 CkReductionMgr::flushStates(1);
00705 reductionInfo.redNo = 0;
00706 }
00707 virtual void CkAddThreadListeners(CthThread tid, void *msg);
00708
00709 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
00710 };
00711
#ifdef _PIPELINED_ALLREDUCE_
/// Reassembles the fragments of a pipelined allreduce into one buffer and,
/// once every fragment has arrived, sends the combined result through `cb`.
class AllreduceMgr
{
public:
  /// Start with no fragments, no bytes, and no buffer.
  AllreduceMgr() : size(0), data(NULL), fragsRecieved(0) {}
  friend class ArrayElement;

  /// Accept one fragment.
  /// The buffer is allocated on the first fragment, sized FRAG_SIZE bytes per
  /// expected fragment. Each fragment is copied to offset fragNo*FRAG_SIZE.
  /// After the last fragment the assembled bytes are sent via `cb` and the
  /// manager is reset for the next round.
  /// Assumes msg->dataSize <= FRAG_SIZE and msg->fragNo < msg->nFrags; this
  /// is not checked here.
  void allreduce_recieve(CkReductionMsg* msg)
  {
    ++fragsRecieved;
    if (fragsRecieved == 1)
    {
      data = new char[FRAG_SIZE * msg->nFrags];
    }
    memcpy(data + msg->fragNo * FRAG_SIZE, msg->data, msg->dataSize);
    size += msg->dataSize;

    if (fragsRecieved == msg->nFrags) {
      // The assembled buffer is handed to buildNew() and released right
      // after the send below.
      CkReductionMsg* ret = CkReductionMsg::buildNew(size, data);
      cb.send(ret);
      fragsRecieved = 0;
      size = 0;
      delete [] data;
      data = NULL;  // do not keep a dangling pointer between rounds
    }
  }

  CkCallback cb;       // where the assembled result is sent
  int size;            // bytes received so far in this round
  char* data;          // reassembly buffer; NULL between rounds
  int fragsRecieved;   // fragments received so far in this round
};
#endif // _PIPELINED_ALLREDUCE_
00746
00747 #endif //_CKREDUCTION_H