OpenAtom  Version1.5a
MessageDataCollator.h
/// If the collator should expect RDMA, then it has to enable the right switches
#ifdef COLLATOR_ENABLE_RDMA
    #define ENABLE_RDMA_HANDSHAKES
    #include "cmidirect.h"
#endif
#include "RDMAMessages.h"
#include "charm++.h"
#include <string>
#include <sstream>

#ifndef MESSAGE_DATA_COLLATOR_H
#define MESSAGE_DATA_COLLATOR_H

/*
 * @addtogroup PairCalculator
 * @{
 */

namespace cp {
namespace paircalc {

#ifdef DEBUG_MESSAGEDATACOLLATOR_ALL
    #define DEBUG_MESSAGEDATACOLLATOR
    #define DEBUG_MESSAGEDATACOLLATOR_CREATION
    #define DEBUG_MESSAGEDATACOLLATOR_RDMA
    #define DEBUG_MESSAGEDATACOLLATOR_EACHARRIVAL
#endif

/** Class that buffers incoming data (via messages/RDMA) until it counts a pre-specified number of arrivals, and then sends the collated data out
 * (packaged in the same incoming message type) via a CkCallback.
 *
 * Assumptions:
 * - All messages carry the same amount of data
 * - Each batch of data arrives entirely via RDMA or entirely via messages (no interleaving of the two methods within a single batch)
 * - Incoming messages are not deleted after processing. This class is not a chare and will let the user code deal with such formalities as needed
 * - If this class manages the data buffer, then the post-collation callback goes to a chare on the same PE so that the output msg is not deleted/moved/resized in any way
 * - The first batch of messages immediately after an RDMA setup (which will involve a bunch of setupRDMA() calls) WILL be via RDMA (and not messages)
 * - expectNext() will be called before every new batch of RDMA arrivals (and only after the current batch is done)
 * - Whenever an imminent batch carries more data than any previous batch, the buffer is reallocated. A fresh RDMA setup is then required for subsequent RDMA arrivals
 * - Incoming data is treated as 2D (nRows x nCols). This is not binding. Higher / lower dimensional data can be sent in as a contiguous array as long as msgType has the same API
 * - Some effort has been made to tolerate different data sizes across different batches, but there is no testing / guarantee as yet
 *
 *
 * @note: Class msgType should provide the following methods:
 * - int numRows() const : returns the number of rows in the dataType array in the message
 * - int numCols() const : returns the number of columns in the dataType array in the message
 * - int sender() const : returns the sender ID or a representation of the sender ID as an int
 * - dataType* data() : returns a pointer to the data held by the message. Does not strictly have to be of dataType as it is only used by CmiMemcpy.
 * - msgType(const int senderID, const int nCols, const int nRows) : used to create the output msg that holds the collated data
 */
template <class msgType, typename dataType>
class MessageDataCollator
{
    public:
        // ----------------------------------------- Cons/Des-truction -----------------------------------------
        /// @warning: Do not use this. This exists only to appease charm migration code which screams for default constructors.
        MessageDataCollator() {}
        /** Collators should be constructed thus. Copying has not been restricted so that collator classes can be passed in as
         * arguments. Hence, be careful when copying collators to make sure multiple collators don't use the same memory region as
         * a data buffer.
         */
        MessageDataCollator(std::string debugID, CkCallback callbk, const unsigned int nExpected, bool manageBuffer=false, const unsigned int offsetStart=0):
            myName(debugID),
            uponCompletion(callbk),
            isBufferMine(manageBuffer),
            outputMsg(0),
            dataBuffer(0),
            bufferSizeInUnits(0),
            numUnitsInMsg(0),
            numExpected(nExpected),
            numReceived(0),
            numBatchesProcessed(0),
            isNextBatchViaRDMA(false),
            isThisBatchViaRDMA(false),
            minSender(offsetStart),
            sampleMsg(0)
        {
            #ifdef DEBUG_MESSAGEDATACOLLATOR_CREATION
                CkPrintf("MessageDataCollator%s instantiated: numExpected = %d and buffer management = %d on(1)/off(0)\n",myName.c_str(),numExpected,isBufferMine);
            #endif
            #ifdef COLLATOR_ENABLE_RDMA
                /** If RDMA is enabled, the client code CANNOT own the buffer and do what it pleases with it. The buffer will
                 * have to be managed by me. Hence, this check.
                 * As an aside, for RDMA, I will choose to never delete / reallocate the buffer on my own, as that will break RDMA. A repercussion is that
                 * setupRDMA() has to be called every time the message size changes, as that will prompt me to delete and reallocate the buffer
                 */
                if (!isBufferMine)
                    CkAbort("MessageDataCollator: You cannot enable RDMA and want to manage the data buffer too. The buffer will have to be mine.");
                /// Reserve enough space for the RDMA handles that will be created for each data sender
                RDMAhandles.reserve(numExpected);
            #endif
        }

        /// Destructor
        ~MessageDataCollator()
        {
            if (isBufferMine)
                delete outputMsg;
            delete sampleMsg;
        }

        /// PUP method @todo: Define a PUP method
        // void pup(PUP::er &p);

        // ---------------------------------------- Message Handling ----------------------------------------
        /// Makes this a functor so that it can be fed messages by any entity that just knows the MessageHandler API.
        void operator() (msgType *msg);

        // ---------------------------------------- RDMA support ----------------------------------------
        /// Called by a data sender to set up an RDMA link with me (data receiver)
        template<class tokenType>
        void setupRDMA(RDMASetupRequestMsg<tokenType> *msg, tokenType *replyToken=0);
        /// RDMA equivalent of msg handling. When data arrives via RDMA, this is called for handling it. Gets called from the CmiDirect code via a callback.
        void operator() (void);
        /// Fed to CmiDirect as a callback to be triggered when some data is deposited by RDMA
        static void collatorCallback(void *thisObj);
        /// Ask me to notify Converse that we're ready for the next batch of RDMA arrivals.
        void expectNext();

        // ---------------------------------------- Utility get methods ----------------------------------------
        /// Get the number of messages this collator has been configured to buffer
        inline unsigned int getNumExpected() const { return numExpected; }
        /// Check the number of messages that have arrived
        inline unsigned int getNumReceived() const { return numReceived; }
        /// Get the number of batches of incoming messages already processed
        inline unsigned int getNumBatchesDone() const { return numBatchesProcessed; }
        /// Get a copy of a sample message from the last completed batch of messages. Copy managed by the user
        inline msgType* getSampleMsg();

    private:
        /// A string that encodes some kind of ID supplied by the owner of an instance
        std::string myName;
        /// Callback that is passed in to trigger the post-collation work.
        CkCallback uponCompletion;
        /// If buffer is mine, I manage it, viz. reuse the same buffer for every batch of msgs where possible. If not, I allocate a fresh buffer and let the client manage it.
        bool isBufferMine;
        /// The message (which has the data buffer) in which the incoming data is collated
        msgType *outputMsg;
        /// A pointer to the memory region (contained in outputMsg) where incoming data is collated
        dataType *dataBuffer;
        /// Handle to a sample message copied from every batch of incoming messages
        msgType *sampleMsg;
        /// Counters to keep track of the messages
        unsigned int numExpected, numReceived, numBatchesProcessed;
        /// Variables related to the message/buffer sizes
        unsigned int numRows, numCols, numUnitsInMsg, bufferSizeInUnits;
        /** Senders should have an ID accessed by msgType::sender(). This stores the minimum value of sender(),
         * so that offsets into the memory buffer can be computed
         *
         * @todo: A way to accomplish this that leaves the collator class less rigid might be better
         */
        unsigned int minSender;
        /// A vector of RDMA handles, one for each sender
        CkVec<rdmaHandleType> RDMAhandles;
        /// Booleans indicating if the next/current batch of messages are via RDMA
        bool isNextBatchViaRDMA, isThisBatchViaRDMA;
};



template <class msgType,typename dataType>
void MessageDataCollator<msgType,dataType>::operator() (msgType *msg)
{
    /** @todo: Debate if some kind of check is needed to ensure that the message received is from the correct iteration etc.
     * For example, if the message defined a uniqID() method that returned a unique tag indicating the group the message
     * belonged to, then all messages that belonged to this group could be buffered together. uniqID() could return the
     * iteration number if messages from a single iteration are to be grouped, or the sender ID if messages from a single
     * sender are to be grouped together. Other scenarios in which messages from separate collation groups can arrive
     * might also occur.
     */

    /// If we're in the middle of a batch of messages
    if (numReceived != 0)
        CkAssert(isThisBatchViaRDMA == false);
    /// else, if this is a fresh batch of messages
    else
    {
        /// First, ensure that I was not expecting the next batch of data via RDMA
        CkAssert(isNextBatchViaRDMA == false);
        /// Since this is the first message in a new batch, flag this whole batch as NOT via RDMA
        isThisBatchViaRDMA = false;
        /// Get the message size
        numRows = msg->numRows();
        numCols = msg->numCols();
        numUnitsInMsg = numRows * numCols;
        /// Get the required buffer size
        int reqdBufferSizeInUnits = numExpected * numUnitsInMsg;
        /// If buffer exists AND I manage it AND it's not big enough for this batch of messages, delete it
        if (dataBuffer && isBufferMine && reqdBufferSizeInUnits > bufferSizeInUnits)
        {
            #ifdef DEBUG_MESSAGEDATACOLLATOR
                CkPrintf("MessageDataCollator%s: Deleting buffer as I need space for %d units. Available = %d units\n",myName.c_str(),reqdBufferSizeInUnits,bufferSizeInUnits);
            #endif
            delete outputMsg;
            outputMsg = 0;
            dataBuffer = 0;
        }
        /// If the buffer is unallocated ...
        if (!dataBuffer)
        {
            /// Get some memory to hold the message data
            bufferSizeInUnits = reqdBufferSizeInUnits;
            #ifdef DEBUG_MESSAGEDATACOLLATOR
                CkPrintf("MessageDataCollator%s: Allocating buffer of size %d units for %d data packets each having %d units in %d rows x %d cols.\n",myName.c_str(),bufferSizeInUnits,numExpected,numUnitsInMsg,numRows,numCols);
            #endif
            outputMsg = new (numExpected*numRows*numCols) msgType(-1,numCols,numRows*numExpected);
            dataBuffer = outputMsg->data();
            /// Zero out the memory region
            bzero(dataBuffer,bufferSizeInUnits * sizeof(dataType));
        } /// endif
    }

    /// Ensure that the output message and its data buffer exist
    CkAssert( (outputMsg != NULL) && (dataBuffer != NULL) );
    /// Ensure that this message has the same size as the others in this batch
    CkAssert( (numRows == msg->numRows()) && (numCols == msg->numCols()) );

    /// Compute the offset for this message. This depends on the sender ID relative to minSender.
    int offset = msg->sender() - minSender;
    /// Copy the data from the message into the contiguous data buffer
    CmiMemcpy(&(dataBuffer[offset*numUnitsInMsg]), msg->data(), numUnitsInMsg *sizeof(dataType));

    /// Increment the tally of received messages
    numReceived++;

    /// If the expected number have arrived ...
    if (numReceived >= numExpected)
    {
        #ifdef DEBUG_MESSAGEDATACOLLATOR
            CkPrintf("MessageDataCollator%s: Collated data from %d of %d messages at %p. Gonna trigger my client\n",myName.c_str(),numReceived,numExpected,dataBuffer);
        #endif
        /// Increment the number of message batches that have been processed
        numBatchesProcessed++;
        /// Create a copy of this message for a sample from this batch
        if (sampleMsg)
            delete sampleMsg;
        sampleMsg = reinterpret_cast<msgType*> ( CkCopyMsg(reinterpret_cast<void**>(&msg)) );
        /// Trigger the post-collation work via the callback
        uponCompletion.send(outputMsg);
        /// Reset the number received so that we can start collating all over again
        numReceived = 0;
        /// If I am not managing the buffer, then I need to forget about it
        if (!isBufferMine)
        {
            outputMsg = 0;
            dataBuffer = 0;
        }
    } /// endif
    else
    {
        #ifdef DEBUG_MESSAGEDATACOLLATOR_EACHARRIVAL
            CkPrintf("MessageDataCollator%s: Received message %d of %d.\n",myName.c_str(),numReceived,numExpected);
        #endif
    }
    /// Do NOT delete the msg; MessageDataCollator is not a chare and doesn't worry about the message formalities. Let the user code handle this
}



/** Useful in case the msg has info other than the data() field. This can then
 * be extracted in a case-specific manner by the client code directly.
 *
 * @note: This class makes no guarantees on which message will be held as a sample msg.
 */
template <class msgType,typename dataType>
inline msgType* MessageDataCollator<msgType,dataType>::getSampleMsg()
{
    if (sampleMsg)
        return reinterpret_cast<msgType*> ( CkCopyMsg(reinterpret_cast<void**>(&sampleMsg)) );
    else
        return 0;
}



template <class msgType,typename dataType> template <class tokenType>
void MessageDataCollator<msgType,dataType>::setupRDMA(RDMASetupRequestMsg<tokenType> *msg, tokenType *replyToken)
{
    #ifndef COLLATOR_ENABLE_RDMA
        CkAbort("MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
    #else
        #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
            std::stringstream dbgStr;
            dbgStr<<"MessageDataCollator"<<myName<<": Received RDMA setup request from sender "<<msg->sender()<<" on proc "<<msg->senderPE()<<"."
                  <<"\n\tRDMA token at Receiver: "<<*replyToken<<" dataSize = "<<msg->numDataUnits();
            CkPrintf("%s\n",dbgStr.str().c_str());
        #endif
        /// You usually will not want to set up an RDMA link in the middle of receiving a batch of messages
        CkAssert(numReceived == 0);
        /// Flag the immediate next batch of messages as via RDMA (which I expect the user will respect :) )
        isNextBatchViaRDMA = true;

        /// Get the message size
        numCols = msg->numDataUnits();
        numRows = 1;
        numUnitsInMsg = numRows * numCols;
        /// Get the required buffer size
        int reqdBufferSizeInUnits = numExpected * numUnitsInMsg;
        /// If buffer exists AND I manage it AND it's not big enough for this batch of data, delete it
        if (dataBuffer && isBufferMine && reqdBufferSizeInUnits > bufferSizeInUnits)
        {
            #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
                CkPrintf("MessageDataCollator%s: Deleting buffer as I need space for %d units. Available = %d units\n",myName.c_str(),reqdBufferSizeInUnits,bufferSizeInUnits);
            #endif
            delete outputMsg;
            outputMsg = 0;
            dataBuffer = 0;
        }
        /// If the buffer is unallocated ...
        if (!dataBuffer)
        {
            /// Get some memory to hold the data
            bufferSizeInUnits = reqdBufferSizeInUnits;
            #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
                CkPrintf("MessageDataCollator%s: Allocating buffer of size %d units for %d data packets each having %d units in %d rows x %d cols.\n",myName.c_str(),bufferSizeInUnits,numExpected,numUnitsInMsg,numRows,numCols);
            #endif
            outputMsg = new (numExpected*numRows*numCols) msgType(-1,numCols,numRows*numExpected);
            dataBuffer = outputMsg->data();
            /// Zero out the memory region
            bzero(dataBuffer,bufferSizeInUnits * sizeof(dataType));
        } /// endif

        /// Compute the offset for this sender. This depends on the sender ID relative to minSender.
        int offset = msg->sender() - minSender;
        /// Get the region of the data buffer that will store data from this sender
        dataType *recvLoc = &(dataBuffer[offset*numUnitsInMsg]);
        /// Create a function pointer to give Converse for a callback
        void (*fnPtr)(void *) = MessageDataCollator<msgType,dataType>::collatorCallback;
        //double const actualQNaN = std::numeric_limits<double>::quiet_NaN();
        double const actualQNaN = 999999999999999999999999999999999999999999999999.9999999999;
        /// Create the RDMA handle for this sender-receiver pair
        rdmaHandleType rHandle = CmiDirect_createHandle(msg->senderPE(),recvLoc,numUnitsInMsg*sizeof(dataType),
                                                        fnPtr,reinterpret_cast<void*>(this),actualQNaN);
        /// Store this handle for reference
        RDMAhandles.push_back(rHandle);
        /// Determine if the handshake token to be sent in the response has been supplied explicitly or should simply be copied from the request msg
        tokenType rToken = msg->token();
        if (replyToken != 0)
            rToken = *replyToken;

        /// Prepare an RDMA request acceptance notification (and include the RDMA handle in the message for the data sender's use)
        /// Notify the data sender of acceptance, to complete the RDMA setup handshake
        msg->senderCallback().send(acceptanceMsg);
        /// Do NOT delete the msg; MessageDataCollator is not a chare and doesn't worry about the message formalities. Let the user code handle this
    #endif // COLLATOR_ENABLE_RDMA
}



template <class msgType,typename dataType>
void MessageDataCollator<msgType,dataType>::operator() (void)
{
    #ifndef COLLATOR_ENABLE_RDMA
        CkAbort("MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
    #else
        /// If we're in the middle of a batch, check that it is via RDMA; else flag this fresh batch as via RDMA
        if (numReceived != 0)
            CkAssert(isThisBatchViaRDMA == true);
        else
        {
            /// Ensure that I was expecting this batch to be via RDMA (@todo: does this hold for the very first batch too?)
            CkAssert(isNextBatchViaRDMA == true);
            /// Flag this batch as via RDMA
            isThisBatchViaRDMA = true;
            /// Next batch is via messages unless I am explicitly told otherwise
            isNextBatchViaRDMA = false;
        }

        /// Increment the tally of completed RDMAs
        numReceived++;

        /// If the expected number have arrived ...
        if (numReceived >= numExpected)
        {
            #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
                CkPrintf("MessageDataCollator%s: Collated %d of %d RDMA data packets. Gonna trigger my client\n",myName.c_str(),numReceived,numExpected);
            #endif
            /// Increment the number of message batches that have been processed
            numBatchesProcessed++;
            /// Batches of RDMA cannot have sample messages. Delete the older one if it exists.
            if (sampleMsg)
                delete sampleMsg;
            sampleMsg = 0;
            /// Trigger the post-collation work via the callback
            uponCompletion.send(outputMsg);
            /// Reset the number received so that we can start collating all over again
            numReceived = 0;
        } /// endif
        else
        {
            #ifdef DEBUG_MESSAGEDATACOLLATOR_EACHARRIVAL
                CkPrintf("MessageDataCollator%s: Someone deposited data via RDMA. Collated %d of %d.\n",myName.c_str(),numReceived,numExpected);
            #endif
        }
    #endif
}



template <class msgType,typename dataType>
void MessageDataCollator<msgType,dataType>::collatorCallback(void *thisObj)
{
    #ifndef COLLATOR_ENABLE_RDMA
        CkAbort("MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
    #else
        MessageDataCollator<msgType,dataType> *thisCollator = reinterpret_cast<MessageDataCollator<msgType,dataType>*>(thisObj);
        (*thisCollator)();
    #endif
}



template <class msgType,typename dataType>
void MessageDataCollator<msgType,dataType>::expectNext()
{
    #ifndef COLLATOR_ENABLE_RDMA
        CkAbort("MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
    #else
        /// Ensure that we're not in the middle of receiving a batch of messages @todo: Should this be a non-assert if condition that's always checked?
        CkAssert( numReceived==0 || numReceived>=numExpected );
        /// Set the flag that will tell me to not allow data via messages for the next batch
        isNextBatchViaRDMA = true;
        #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
            CkPrintf("MessageDataCollator%s: Expecting the next batch of data via RDMA. Notifying the runtime to watch %d receive buffers.\n",myName.c_str(),RDMAhandles.size());
        #endif
        /// Notify the runtime to expect data arrival via RDMA
        for (int i=0;i<RDMAhandles.size();i++)
            CmiDirect_ready(&RDMAhandles[i]);
    #endif
}


} // end namespace paircalc
} // end namespace cp
/*@}*/
#endif // MESSAGE_DATA_COLLATOR_H
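
The message path of the collator (compute an offset from sender() and minSender, copy into one contiguous buffer, fire the callback once numExpected pieces have arrived) can be sketched without Charm++. The block below is a minimal stand-in under stated assumptions, not the OpenAtom implementation: ToyMsg, ToyCollator and collateDemo are hypothetical names, std::function replaces CkCallback, std::memcpy replaces CmiMemcpy, and the buffer is always handed over to the client (the manageBuffer=false case).

```cpp
#include <cassert>
#include <cstring>
#include <functional>
#include <vector>

// Hypothetical stand-in for a Charm++ message type (not part of OpenAtom).
// It offers the methods the collator documentation asks of msgType.
struct ToyMsg {
    int senderID, nRows, nCols;
    std::vector<double> payload;
    ToyMsg(int sender, int cols, int rows)
        : senderID(sender), nRows(rows), nCols(cols),
          payload(static_cast<std::size_t>(rows) * cols, 0.0) {}
    int numRows() const { return nRows; }
    int numCols() const { return nCols; }
    int sender() const { return senderID; }
    double* data() { return payload.data(); }
};

// Simplified collator: messages only, no RDMA, output handed to the client.
class ToyCollator {
public:
    ToyCollator(int nExpected, int offsetStart, std::function<void(ToyMsg*)> done)
        : numExpected(nExpected), numReceived(0), minSender(offsetStart),
          numUnitsInMsg(0), output(nullptr), uponCompletion(done) {}
    ~ToyCollator() { delete output; }

    void operator()(ToyMsg* msg) {
        if (numReceived == 0) {
            // First message of a batch fixes the per-message size and
            // allocates a zero-filled output big enough for all senders.
            numUnitsInMsg = msg->numRows() * msg->numCols();
            output = new ToyMsg(-1, msg->numCols(), msg->numRows() * numExpected);
        }
        // Slot index is the sender ID relative to the smallest sender ID.
        int offset = msg->sender() - minSender;
        std::memcpy(output->data() + offset * numUnitsInMsg, msg->data(),
                    numUnitsInMsg * sizeof(double));
        if (++numReceived >= numExpected) {
            ToyMsg* full = output;   // the client now owns the collated message
            output = nullptr;
            numReceived = 0;
            uponCompletion(full);
        }
        // The incoming msg is deliberately not deleted here.
    }

private:
    int numExpected, numReceived, minSender, numUnitsInMsg;
    ToyMsg* output;
    std::function<void(ToyMsg*)> uponCompletion;
};

// Three senders (IDs 10..12) deliver 1x2 blocks out of order.
std::vector<double> collateDemo() {
    std::vector<double> result;
    ToyCollator collator(3, 10, [&result](ToyMsg* m) { result = m->payload; delete m; });
    const int arrivalOrder[] = {12, 10, 11};
    for (int id : arrivalOrder) {
        ToyMsg msg(id, 2, 1);
        msg.data()[0] = id;
        msg.data()[1] = id + 0.5;
        collator(&msg);
    }
    return result;
}
```

Calling collateDemo() from a main() shows that arrival order does not matter: each sender's block lands in the slot selected by its ID, which is why the real class needs minSender (offsetStart) to be the smallest sender ID.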
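
The rule that a batch arrives entirely via messages or entirely via RDMA is enforced by the two flags isNextBatchViaRDMA and isThisBatchViaRDMA. The sketch below isolates just that bookkeeping, assuming a hypothetical BatchModeTracker class that throws std::logic_error where the real code uses CkAssert; it moves no data and is only meant to illustrate the flag transitions.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical helper (not part of OpenAtom): tracks only the batch-mode flags.
class BatchModeTracker {
public:
    explicit BatchModeTracker(int nExpected)
        : numExpected(nExpected), numReceived(0),
          isNextBatchViaRDMA(false), isThisBatchViaRDMA(false) {}

    // Mirrors the flag handling of expectNext(): only legal between batches.
    void expectNext() {
        require(numReceived == 0, "expectNext() called in the middle of a batch");
        isNextBatchViaRDMA = true;
    }

    // One arrival: a message (viaRDMA == false) or an RDMA deposit (true).
    void arrival(bool viaRDMA) {
        if (numReceived == 0) {
            // The first arrival must match what was announced for this batch.
            require(viaRDMA == isNextBatchViaRDMA, "first arrival uses the wrong transport");
            isThisBatchViaRDMA = viaRDMA;
            // The next batch is via messages unless told otherwise.
            isNextBatchViaRDMA = false;
        } else {
            require(viaRDMA == isThisBatchViaRDMA, "transports interleaved within a batch");
        }
        if (++numReceived >= numExpected)
            numReceived = 0;   // batch complete, start over
    }

private:
    static void require(bool ok, const char* what) {
        if (!ok) throw std::logic_error(what);
    }
    int numExpected, numReceived;
    bool isNextBatchViaRDMA, isThisBatchViaRDMA;
};

// Returns true if interleaving a message into an RDMA batch is rejected.
bool batchModeDemo() {
    BatchModeTracker tracker(2);
    tracker.arrival(false);        // a complete batch of two messages
    tracker.arrival(false);
    tracker.expectNext();          // announce an RDMA batch
    tracker.arrival(true);         // first RDMA deposit
    try {
        tracker.arrival(false);    // a message in the middle of an RDMA batch
    } catch (const std::logic_error&) {
        return true;
    }
    return false;
}
```

In the real class the same transitions are spread across operator()(msgType*), operator()(void), setupRDMA() and expectNext(); setupRDMA() also sets isNextBatchViaRDMA, which is why the first batch after an RDMA setup must arrive via RDMA.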