00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _CKREDUCTION_H
00019 #define _CKREDUCTION_H
00020
00021 #include "CkReduction.decl.h"
00022
00023 #ifdef _PIPELINED_ALLREDUCE_
00024 #define FRAG_SIZE 131072
00025 #define FRAG_THRESHOLD 131072
00026 #endif
00027
00028 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00029 #define MAX_INT 5000000
00030 #define _MLOG_REDUCE_P2P_ 0
00031 #endif
00032
00033
00034
00035 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
00036 public:
00037 typedef void (*callbackType)(void *param);
00038 CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
00039 {callback=Ncallback;param=Nparam;}
00040 void call(void) {(*callback)(param);}
00041 private:
00042 callbackType callback;
00043 void *param;
00044 };
00045
00046 class CkGroupInitCallback : public IrrGroup {
00047 public:
00048 CkGroupInitCallback(void);
00049 CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
00050 void callMeBack(CkGroupCallbackMsg *m);
00051 void pup(PUP::er& p){ IrrGroup::pup(p); }
00052 };
00053
00054
00055 class CkGroupReadyCallback : public IrrGroup {
00056 private:
00057 bool _isReady;
00058 CkQ<CkGroupCallbackMsg *> _msgs;
00059 void callBuffered(void);
00060 public:
00061 CkGroupReadyCallback(void);
00062 CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
00063 void callMeBack(CkGroupCallbackMsg *m);
00064 bool isReady(void) { return _isReady; }
00065 protected:
00066 void setReady(void) {_isReady = true; callBuffered(); }
00067 void setNotReady(void) {_isReady = false; }
00068 };
00069
00070 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
00071 public:
00072 int num;
00073 CkReductionNumberMsg(int n) {num=n;}
00074 };
00075
00076
00077 class CkReductionInactiveMsg:public CMessage_CkReductionInactiveMsg {
00078 public:
00079 int id, redno;
00080 CkReductionInactiveMsg(int i, int r) {id=i; redno = r;}
00081 };
00082
00083
00085 class contributorInfo {
00086 public:
00087 int redNo;
00088 contributorInfo() {redNo=0;}
00089 inline void pup(PUP::er& p) {
00090 p((char *)this, sizeof(contributorInfo));
00091 }
00092 };
00093 PUPbytes(contributorInfo)
00094
00095 class countAdjustment {
00096 public:
00097 int gcount;
00098 int lcount;
00099 countAdjustment(int ignored=0) {(void)ignored; gcount=0; lcount=0;}
00100 inline void pup(PUP::er& p) {
00101 p((char *)this, sizeof(countAdjustment));
00102 }
00103 };
00104 PUPbytes(countAdjustment)
00105
00106
00109 namespace ck { namespace impl { class XArraySectionReducer; } }
00110
00111
00112
00113 class CkReduction {
00114 public:
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 typedef enum {
00128
00129 invalid=0,
00130 nop,
00131
00132 sum_char,sum_short,sum_int,sum_long,sum_long_long,
00133 sum_uchar,sum_ushort,sum_uint,sum_ulong,
00134 sum_ulong_long,sum_float,sum_double,
00135
00136
00137 product_char,product_short,product_int,product_long,product_long_long,
00138 product_uchar,product_ushort,product_uint,product_ulong,
00139 product_ulong_long,product_float,product_double,
00140
00141
00142 max_char,max_short,max_int,max_long,max_long_long,
00143 max_uchar,max_ushort,max_uint,max_ulong,
00144 max_ulong_long,max_float,max_double,
00145
00146
00147 min_char,min_short,min_int,min_long,min_long_long,
00148 min_uchar,min_ushort,min_uint,min_ulong,
00149 min_ulong_long,min_float,min_double,
00150
00151
00152
00153 logical_and,
00154 logical_and_int,logical_and_bool,
00155
00156
00157
00158 logical_or,
00159 logical_or_int,logical_or_bool,
00160
00161
00162
00163
00164 logical_xor_int,logical_xor_bool,
00165
00166
00167 bitvec_and,
00168 bitvec_and_int,bitvec_and_bool,
00169
00170
00171 bitvec_or,
00172 bitvec_or_int,bitvec_or_bool,
00173
00174
00175 bitvec_xor,
00176 bitvec_xor_int,bitvec_xor_bool,
00177
00178
00179 random,
00180
00181
00182 concat,
00183
00184
00185
00186 set,
00187
00188
00189 statistics,
00190
00191
00192 tuple,
00193
00194
00195 external_py
00196 } reducerType;
00197
00198
00199
00200 class setElement {
00201 public:
00202 int dataSize;
00203 char data[1];
00204
00205
00206 setElement *next(void);
00207 };
00208
00209
00210 struct statisticsElement {
00211 int count;
00212 double mean;
00213 double m2;
00214 statisticsElement(double initialValue);
00215 double variance() const { return count > 1 ? m2 / (double(count) - 1.0) : 0.0; }
00216 double stddev() const { return sqrt(variance()); }
00217 };
00218
00219 struct tupleElement {
00220 size_t dataSize;
00221 char* data;
00222 CkReduction::reducerType reducer;
00223 bool owns_data;
00224 tupleElement();
00225 tupleElement(size_t dataSize, void* data, CkReduction::reducerType reducer);
00226 tupleElement(CkReduction::tupleElement&& rhs_move);
00227 tupleElement& operator=(CkReduction::tupleElement&& rhs_move);
00228 ~tupleElement();
00229
00230 inline void* getData(void) { return data; }
00231 void pup(PUP::er &p);
00232 };
00233
00234
00235
00236
00237
00238
00239 typedef CkReductionMsg *(*reducerFn)(int nMsg,CkReductionMsg **msgs);
00240
00241 struct reducerStruct {
00242 reducerFn fn;
00243 bool streamable;
00244 #if CMK_ERROR_CHECKING
00245 const char *name;
00246 #endif
00247 reducerStruct(reducerFn f=NULL, bool s=false, const char *n=NULL) : fn(f), streamable(s)
00248 #if CMK_ERROR_CHECKING
00249 ,name(n)
00250 #endif
00251 {}
00252 };
00253
00254
00255
00256 static reducerType addReducer(reducerFn fn, bool streamable=false, const char* name=NULL);
00257
00258 private:
00259 friend class CkReductionMgr;
00260 friend class CkNodeReductionMgr;
00261 friend class CkMulticastMgr;
00262 friend class ck::impl::XArraySectionReducer;
00263
00264
00265
00266 static std::vector<reducerStruct>& reducerTable();
00267 static std::vector<reducerStruct> initReducerTable();
00268
00269
00270
00271 static CkReductionMsg* tupleReduction_fn(int nMsgs, CkReductionMsg** msgs);
00272
00273
00274 CkReduction();
00275 };
00276 PUPbytes(CkReduction::reducerType)
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288 struct CkReductionTypesExt {
00289
00290 int nop = CkReduction::nop;
00291
00292 int sum_char = CkReduction::sum_char;
00293 int sum_short = CkReduction::sum_short;
00294 int sum_int = CkReduction::sum_int;
00295 int sum_long = CkReduction::sum_long;
00296 int sum_long_long = CkReduction::sum_long_long;
00297 int sum_uchar = CkReduction::sum_uchar;
00298 int sum_ushort = CkReduction::sum_ushort;
00299 int sum_uint = CkReduction::sum_uint;
00300 int sum_ulong = CkReduction::sum_ulong;
00301 int sum_ulong_long = CkReduction::sum_ulong_long;
00302 int sum_float = CkReduction::sum_float;
00303 int sum_double = CkReduction::sum_double;
00304
00305 int product_char = CkReduction::product_char;
00306 int product_short = CkReduction::product_short;
00307 int product_int = CkReduction::product_int;
00308 int product_long = CkReduction::product_long;
00309 int product_long_long = CkReduction::product_long_long;
00310 int product_uchar = CkReduction::product_uchar;
00311 int product_ushort = CkReduction::product_ushort;
00312 int product_uint = CkReduction::product_uint;
00313 int product_ulong = CkReduction::product_ulong;
00314 int product_ulong_long = CkReduction::product_ulong_long;
00315 int product_float = CkReduction::product_float;
00316 int product_double = CkReduction::product_double;
00317
00318 int max_char = CkReduction::max_char;
00319 int max_short = CkReduction::max_short;
00320 int max_int = CkReduction::max_int;
00321 int max_long = CkReduction::max_long;
00322 int max_long_long = CkReduction::max_long_long;
00323 int max_uchar = CkReduction::max_uchar;
00324 int max_ushort = CkReduction::max_ushort;
00325 int max_uint = CkReduction::max_uint;
00326 int max_ulong = CkReduction::max_ulong;
00327 int max_ulong_long = CkReduction::max_ulong_long;
00328 int max_float = CkReduction::max_float;
00329 int max_double = CkReduction::max_double;
00330
00331 int min_char = CkReduction::min_char;
00332 int min_short = CkReduction::min_short;
00333 int min_int = CkReduction::min_int;
00334 int min_long = CkReduction::min_long;
00335 int min_long_long = CkReduction::min_long_long;
00336 int min_uchar = CkReduction::min_uchar;
00337 int min_ushort = CkReduction::min_ushort;
00338 int min_uint = CkReduction::min_uint;
00339 int min_ulong = CkReduction::min_ulong;
00340 int min_ulong_long = CkReduction::min_ulong_long;
00341 int min_float = CkReduction::min_float;
00342 int min_double = CkReduction::min_double;
00343
00344 int external_py = CkReduction::external_py;
00345 };
00346
00347 extern "C" CkReductionTypesExt charm_reducers;
00348
00349
00350
00351 class CkReductionMsg : public CMessage_CkReductionMsg
00352 {
00353 friend class CkReduction;
00354 friend class CkReductionMgr;
00355 friend class CkNodeReductionMgr;
00356 friend class CkMulticastMgr;
00357 #ifdef _PIPELINED_ALLREDUCE_
00358 friend class ArrayElement;
00359 friend class AllreduceMgr;
00360 #endif
00361 friend class ck::impl::XArraySectionReducer;
00362 public:
00363
00364
00365
00366
00367 static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
00368 CkReduction::reducerType reducer=CkReduction::invalid,
00369 CkReductionMsg *buf = NULL);
00370
00371 inline int getLength() const {return dataSize;}
00372 inline int getSize() const {return dataSize;}
00373 inline void *getData() {return data;}
00374 inline const void *getData() const {return data;}
00375
00376 inline int getGcount() const {return gcount;}
00377 inline CkReduction::reducerType getReducer() const {return reducer;}
00378 inline int getRedNo() const {return redNo;}
00379
00380 inline CMK_REFNUM_TYPE getUserFlag() const {return userFlag;}
00381 inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
00382
00383 inline void setCallback(const CkCallback &cb) { callback=cb; }
00384
00385
00386
00387 inline bool isFromUser() const {return sourceFlag==-1;}
00388
00389 inline bool isMigratableContributor() const {return migratableContributor;}
00390 inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
00391
00392
00393 static CkReductionMsg* buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions);
00394 void toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions);
00395
00396 ~CkReductionMsg() {}
00397
00398
00399
00400 static void *alloc(int msgnum, size_t size, int *reqSize, int priobits, GroupDepNum groupDepNum=GroupDepNum{});
00401 static void *pack(CkReductionMsg *);
00402 static CkReductionMsg *unpack(void *in);
00403
00404 private:
00405 inline int nSources() const {return std::abs(sourceFlag);}
00406
00407
00408 CkReductionMsg() {}
00409
00410 private:
00411 int dataSize;
00412 int sourceFlag;
00413
00414
00415
00416
00417 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
00418 int sourceProcessorCount;
00419 #endif
00420 int fromPE;
00421 int redNo;
00422 int gcount;
00423 CkReduction::reducerType reducer;
00424 CMK_REFNUM_TYPE userFlag;
00425 bool migratableContributor;
00426
00427 int8_t rebuilt;
00428 int8_t nFrags;
00429 int8_t fragNo;
00430
00431 CkSectionInfo sid;
00432 CkCallback callback;
00433 #if CMK_BIGSIM_CHARM
00434 public:
00435
00436 void *event;
00437 int eventPe;
00438 private:
00439 void *log;
00440 #endif
00441 void *data;
00442 double dataStorage;
00443 };
00444
00445
00446 #define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
00447 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
00448 CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
00449 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
00450 const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
00451 template <typename T> \
00452 void contribute(const std::vector<T> &data,CkReduction::reducerType type, \
00453 const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1) \
00454 { contribute(sizeof(T)*data.size(), data.data(), type, cb, userFlag); } \
00455 void contribute(CkReductionMsg *msg); \
00456 void contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
00457 void contribute(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);
00458
00459 #define CK_BARRIER_CONTRIBUTE_METHODS_DECL \
00460 void barrier(const CkCallback &cb);\
00461
00462
00468 class CkNodeReductionMgr : public IrrGroup {
00469 public:
00470 CProxy_CkNodeReductionMgr thisProxy;
00471 public:
00472 CkNodeReductionMgr(void);
00473 CkNodeReductionMgr(CkMigrateMessage *m) : IrrGroup(m) {
00474 storedCallback = NULL;
00475 }
00476 ~CkNodeReductionMgr();
00477
00478 typedef CkReductionClientFn clientFn;
00479
00484 void ckSetReductionClient(CkCallback *cb);
00485
00486
00487
00488
00489 void contribute(contributorInfo *ci,CkReductionMsg *msg);
00490 void contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count);
00491
00492
00493 void RecvMsg(CkReductionMsg *m);
00494 void doRecvMsg(CkReductionMsg *m);
00495 void LateMigrantMsg(CkReductionMsg *m);
00496
00497 virtual void flushStates();
00498
00499 virtual int getTotalGCount(){return 0;};
00500
00501 private:
00502
00503
00504 CkCallback *storedCallback;
00505
00506 int redNo;
00507 bool inProgress;
00508 bool creating;
00509 bool startRequested;
00510 int gcount;
00511 int lcount;
00512
00513
00514 int nContrib,nRemote;
00515
00516 CkMsgQ<CkReductionMsg> msgs;
00517
00518 CkMsgQ<CkReductionMsg> futureMsgs;
00519
00520 CkMsgQ<CkReductionMsg> futureRemoteMsgs;
00521
00522 CkMsgQ<CkReductionMsg> futureLateMigrantMsgs;
00523
00524
00525 CmiNodeLock lockEverything;
00526
00527 bool interrupt;
00528
00529
00530 std::vector<int> kids;
00531
00532
00533 void startReduction(int number,int srcPE);
00534 void doAddContribution(CkReductionMsg *m);
00535 void finishReduction(void);
00536 protected:
00537 void addContribution(CkReductionMsg *m);
00538
00539 private:
00540
00541
00542
00543 unsigned upperSize;
00544 unsigned label;
00545 int parent;
00546 int numKids;
00547
00548 void init_BinomialTree();
00549
00550 void init_TopoTree();
00551 void init_BinaryTree();
00552 enum {TREE_WID=2};
00553 int treeRoot(void);
00554 bool hasParent(void);
00555 int treeParent(void);
00556 int firstKid(void);
00557 int treeKids(void);
00558
00559
00560 bool isPast(int num) const {return (bool)(num<redNo);}
00561 bool isPresent(int num) const {return (bool)(num==redNo);}
00562 bool isFuture(int num) const {return (bool)(num>redNo);}
00563
00564 #if CMK_FAULT_EVAC
00565 bool oldleaf;
00566 bool blocked;
00567 int newParent;
00568 int additionalGCount,newAdditionalGCount;
00569 std::vector<int> newKids;
00570 CkMsgQ<CkReductionMsg> bufferedMsgs;
00571 CkMsgQ<CkReductionMsg> bufferedRemoteMsgs;
00572 enum {OLDPARENT,OLDCHILDREN,NEWPARENT,LEAFPARENT};
00573 int numModificationReplies;
00574 int maxModificationRedNo;
00575 int tempModificationRedNo;
00576 bool readyDeletion;
00577 bool killed;
00578 #endif
00579
00580
00581 public:
00582 virtual void pup(PUP::er &p);
00583 #if CMK_FAULT_EVAC
00584 virtual void evacuate();
00585 virtual void doneEvacuate();
00586 void DeleteChild(int deletedChild);
00587 void DeleteNewChild(int deletedChild);
00588 void collectMaxRedNo(int maxRedNo);
00589 void unblockNode(int maxRedNo);
00590 void modifyTree(int code,int size,int *data);
00591 private:
00592 int findMaxRedNo();
00593 void updateTree();
00594 void clearBlockedMsgs();
00595 #endif
00596 };
00597
00598
00599
00600
00601 class NodeGroup : public CkNodeReductionMgr {
00602 protected:
00603 contributorInfo reductionInfo;
00604 public:
00605 CmiNodeLock __nodelock;
00606 const int thisIndex;
00607 NodeGroup();
00608 NodeGroup(CkMigrateMessage* m):CkNodeReductionMgr(m),thisIndex(CkMyNode()) { __nodelock=CmiCreateLock(); }
00609
00610 ~NodeGroup();
00611 inline const CkGroupID &ckGetGroupID(void) const {return thisgroup;}
00612 inline CkGroupID CkGetNodeGroupID(void) const {return thisgroup;}
00613 virtual bool isNodeGroup() { return true; }
00614
00615 virtual void pup(PUP::er &p);
00616 virtual void flushStates() {
00617 CkNodeReductionMgr::flushStates();
00618 reductionInfo.redNo = 0;
00619 }
00620
00621 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
00622 void contributeWithCounter(CkReductionMsg *msg,int count);
00623 };
00624
00625
00626 class CkReductionMgr : public CkGroupInitCallback {
00627 public:
00628 CProxy_CkReductionMgr thisProxy;
00629
00630 public:
00631 CkReductionMgr();
00632 CkReductionMgr(CkMigrateMessage *m);
00633 ~CkReductionMgr();
00634
00635 typedef CkReductionClientFn clientFn;
00636
00641 void ckSetReductionClient(CkCallback *cb);
00642
00643
00644
00645
00646
00647
00648
00649
00650 void creatingContributors(void);
00651 void doneCreatingContributors(void);
00652
00653 void contributorStamped(contributorInfo *ci);
00654 void contributorCreated(contributorInfo *ci);
00655 void contributorDied(contributorInfo *ci);
00656
00657 void contributorLeaving(contributorInfo *ci);
00658
00659 void contributorArriving(contributorInfo *ci);
00660
00661
00662
00663
00664 void contribute(contributorInfo *ci,CkReductionMsg *msg);
00665
00666
00667
00668 void ReductionStarting(CkReductionNumberMsg *m);
00669
00670 void LateMigrantMsg(CkReductionMsg *m);
00671
00672 void MigrantDied(CkReductionNumberMsg *m);
00673
00674 void RecvMsg(CkReductionMsg *m);
00675 void AddToInactiveList(CkReductionInactiveMsg *m);
00676
00677
00678 void barrier(CkReductionMsg * msg);
00679 void Barrier_RecvMsg(CkReductionMsg *m);
00680 void addBarrier(CkReductionMsg *m);
00681 void finishBarrier(void);
00682
00683 virtual bool isReductionMgr(void){ return true; }
00684 virtual void flushStates();
00685
00686
00687
00688
00689
00690
00691
00692
00693 int getGCount(){return gcount;};
00694 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00695 void decGCount(){gcount--;}
00696 void incNumImmigrantRecObjs(){
00697 numImmigrantRecObjs++;
00698 }
00699 void decNumImmigrantRecObjs(){
00700 numImmigrantRecObjs--;
00701 }
00702 void incNumEmigrantRecObjs(){
00703 numEmigrantRecObjs++;
00704 }
00705 void decNumEmigrantRecObjs(){
00706 numEmigrantRecObjs--;
00707 }
00708
00709 #endif
00710
00711
00712 static CkReductionMsg *reduceMessages(CkMsgQ<CkReductionMsg> &msgs);
00713
00714 private:
00715
00716 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00717 int numImmigrantRecObjs;
00718 int numEmigrantRecObjs;
00719 #endif
00720
00721
00722
00723 CkCallback storedCallback;
00724
00725 int redNo;
00726 int completedRedNo;
00727 bool inProgress;
00728 bool creating;
00729 bool startRequested;
00730 int gcount;
00731 int lcount;
00732 int maxStartRequest;
00733
00734
00735 int nContrib,nRemote;
00736
00737 bool is_inactive;
00738
00739
00740 CkCallback barrier_storedCallback;
00741 int barrier_gCount;
00742 int barrier_nSource;
00743 int barrier_nContrib,barrier_nRemote;
00744
00745
00746 CkMsgQ<CkReductionMsg> msgs;
00747
00748
00749 CkMsgQ<CkReductionMsg> futureMsgs;
00750
00751 CkMsgQ<CkReductionMsg> futureRemoteMsgs;
00752
00753 CkMsgQ<CkReductionMsg> finalMsgs;
00754 std::map<int, int> inactiveList;
00755
00756
00757 void startReduction(int number,int srcPE);
00758 void addContribution(CkReductionMsg *m);
00759 void finishReduction(void);
00760 void checkIsActive();
00761 void informParentInactive();
00762 void checkAndAddToInactiveList(int id, int red_no);
00763 void checkAndRemoveFromInactiveList(int id, int red_no);
00764 void sendReductionStartingToKids(int red_no);
00765
00766
00767 unsigned upperSize;
00768 unsigned label;
00769 int parent;
00770 int numKids;
00771
00772 std::vector<int> newKids;
00773 std::vector<int> kids;
00774 void init_BinomialTree();
00775
00776 void init_TopoTree();
00777 void init_BinaryTree();
00778 enum {TREE_WID=2};
00779 int treeRoot(void);
00780
00781
00782 bool isPast(int num) const {return (bool)(num<redNo);}
00783 bool isPresent(int num) const {return (bool)(num==redNo);}
00784 bool isFuture(int num) const {return (bool)(num>redNo);}
00785
00786
00787
00788
00789 std::vector<countAdjustment> adjVec;
00790
00791 countAdjustment &adj(int number);
00792
00793 protected:
00794 bool hasParent(void);
00795 int treeParent(void);
00796 int firstKid(void);
00797 int treeKids(void);
00798
00799
00800 bool disableNotifyChildrenStart;
00801 void resetCountersWhenFlushingStates() { gcount = lcount = 0; }
00802 bool isDestroying;
00803
00804
00805 public:
00806 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
00807 int *perProcessorCounts;
00808 int processorCount;
00809 int totalCount;
00810 int numberReductionMessages(){
00811 if(totalCount != 0){
00812 return totalCount;
00813 }else{
00814 return MAX_INT;
00815 }
00816 }
00817 #endif
00818 virtual void pup(PUP::er &p);
00819 static bool isIrreducible(){ return false;}
00820 void contributeViaMessage(CkReductionMsg *m);
00821 };
00822
00823
00824
00825
00826
00827
00828
00829
00830
00831
00832 #define CkReductionTarget(me, method) \
00833 CkIndex_##me::redn_wrapper_##method(NULL)
00834
00835 #define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
00836 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
00837 CMK_REFNUM_TYPE userFlag)\
00838 {\
00839 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
00840 msg->setUserFlag(userFlag);\
00841 msg->setMigratableContributor(migratable);\
00842 myRednMgr->contribute(&myRednInfo,msg);\
00843 }\
00844 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
00845 const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
00846 {\
00847 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
00848 msg->setUserFlag(userFlag);\
00849 msg->setCallback(cb);\
00850 msg->setMigratableContributor(migratable);\
00851 myRednMgr->contribute(&myRednInfo,msg);\
00852 }\
00853 void me::contribute(CkReductionMsg *msg) \
00854 {\
00855 msg->setMigratableContributor(migratable);\
00856 myRednMgr->contribute(&myRednInfo,msg);\
00857 }\
00858 void me::contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
00859 {\
00860 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
00861 msg->setUserFlag(userFlag);\
00862 msg->setCallback(cb);\
00863 msg->setMigratableContributor(migratable);\
00864 myRednMgr->contribute(&myRednInfo,msg);\
00865 }\
00866 void me::contribute(CMK_REFNUM_TYPE userFlag)\
00867 {\
00868 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
00869 msg->setUserFlag(userFlag);\
00870 msg->setMigratableContributor(migratable);\
00871 myRednMgr->contribute(&myRednInfo,msg);\
00872 }\
00873
00874 #define CK_BARRIER_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
00875 void me::barrier(const CkCallback &cb)\
00876 {\
00877 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
00878 msg->setCallback(cb);\
00879 msg->setMigratableContributor(migratable);\
00880 myRednMgr->barrier(msg);\
00881 }\
00882
00883
00884
00885 class Group : public CkReductionMgr
00886 {
00887 contributorInfo reductionInfo;
00888 public:
00889 const int thisIndex;
00890 Group();
00891 Group(CkMigrateMessage *msg);
00892 virtual bool isNodeGroup() { return false; }
00893 virtual void pup(PUP::er &p);
00894 virtual void flushStates() {
00895 CkReductionMgr::flushStates();
00896 reductionInfo.redNo = 0;
00897 }
00898 virtual void CkAddThreadListeners(CthThread tid, void *msg);
00899
00900 int getRedNo() const { return reductionInfo.redNo; }
00901
00902 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
00903 CK_BARRIER_CONTRIBUTE_METHODS_DECL
00904 };
00905
00906 #ifdef _PIPELINED_ALLREDUCE_
00907 class AllreduceMgr
00908 {
00909 public:
00910 AllreduceMgr() { fragsRecieved=0; size=0; }
00911 friend class ArrayElement;
00912
00913 void allreduce_recieve(CkReductionMsg* msg)
00914 {
00915
00916 fragsRecieved++;
00917 if(fragsRecieved==1)
00918 {
00919 data = new char[FRAG_SIZE*msg->nFrags];
00920 }
00921 memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
00922 size += msg->dataSize;
00923
00924 if(fragsRecieved==msg->nFrags) {
00925 CkReductionMsg* ret = CkReductionMsg::buildNew(size, data);
00926 cb.send(ret);
00927 fragsRecieved=0; size=0;
00928 delete [] data;
00929 }
00930
00931 }
00932
00933 CkCallback cb;
00934 int size;
00935 char* data;
00936 int fragsRecieved;
00937
00938 };
00939 #endif // _PIPELINED_ALLREDUCE_
00940
00941 #endif //_CKREDUCTION_H