00001 #include "charm++.h" 00002 00003 #ifndef X_ARRAY_SECTION_REDUCER_H 00004 #define X_ARRAY_SECTION_REDUCER_H 00005 00006 namespace ck { 00007 namespace impl { 00008 00018 class XArraySectionReducer 00019 { 00020 public: 00022 XArraySectionReducer(int _numSubSections, CkCallback *_finalCB) 00023 : numSubSections(_numSubSections), finalCB(_finalCB) 00024 { 00025 CkAssert(numSubSections > 0); 00026 } 00027 00029 ~XArraySectionReducer() 00030 { 00031 delete finalCB; 00032 } 00033 00035 void acceptSectionContribution(CkReductionMsg *msg) 00036 { 00037 int redNo = msg->redNo; 00038 msgMap[redNo].push_back(msg); 00039 // Check if partialReduction is possible (streamable) across subsections 00040 if (msgMap[redNo].size() > 1 && CkReduction::reducerTable()[msg->reducer].streamable) { 00041 CkReduction::reducerFn f = CkReduction::reducerTable()[msg->reducer].fn; 00042 CkReductionMsg *intermediateReducedMsg = (*f)(msgMap[redNo].size(), msgMap[redNo].data()); 00043 msgMap[redNo].pop_back(); 00044 // Only the partially reduced message should be remaining in the msgs vector after partialReduction 00045 CkAssert(intermediateReducedMsg == msgMap[redNo][0]); 00046 // Copy the reducer in the newly created message which will be used in the finalReducer() 00047 intermediateReducedMsg->reducer = msg->reducer; 00048 delete msg; 00049 } 00050 numReceivedMap[redNo]++; 00051 if (numReceivedMap[redNo] >= numSubSections) 00052 finalReducer(redNo); 00053 } 00054 00055 private: 00057 void finalReducer(int redNo) 00058 { 00059 // Get a handle on the reduction function for this message 00060 CkReduction::reducerFn f = CkReduction::reducerTable()[ msgMap[redNo][0]->reducer ].fn; 00061 // Perform an extra reduction step on all the subsection reduction msgs 00062 CkReductionMsg *finalMsg = (*f)(msgMap[redNo].size(), msgMap[redNo].data()); 00063 // Send the final reduced msg to the client 00064 if (finalCB == nullptr) 00065 msgMap[redNo][0]->callback.send(finalMsg); 00066 else 00067 finalCB->send(finalMsg); 00068 // Delete the subsection redn msgs, accounting for any msg reuse 00069 auto& msgs = msgMap[redNo]; 00070 for (auto *msg:msgs) { 00071 if (msg != finalMsg) delete msg; 00072 } 00073 // Reset the msg list and counters for the corresponding redNo 00074 msgMap.erase(redNo); 00075 numReceivedMap[redNo] = 0; 00076 } 00077 00078 // The number of subsection redn msgs to expect 00079 const int numSubSections; 00080 // The final client callback after all redn are done 00081 const CkCallback *finalCB; 00082 // A map of counters indexed using redNo to track when all the subsection redns are complete 00083 // Since a single instance of this class is used to stitch the reductions across arrays, 00084 // multiple callbacks to a particular cross section array are tagged using their redNo 00085 std::map<int, int> numReceivedMap; 00086 // A map storing a list of subsection redn msgs indexed using redNo 00087 std::map<int, std::vector<CkReductionMsg *> > msgMap; 00088 }; 00089 00090 00091 typedef XArraySectionReducer XGroupSectionReducer; 00092 00093 00095 void processSectionContribution (void *that, void *msg) 00096 { 00097 CkAssert(that); 00098 reinterpret_cast<XArraySectionReducer*>(that)->acceptSectionContribution(reinterpret_cast<CkReductionMsg*>(msg)); 00099 } 00100 00101 } // end namespace impl 00102 } // end namespace ck 00103 00104 #endif // X_ARRAY_SECTION_REDUCER_H 00105