2 #ifdef COLLATOR_ENABLE_RDMA
3 #define ENABLE_RDMA_HANDSHAKES
6 #include "RDMAMessages.h"
11 #ifndef MESSAGE_DATA_COLLATOR_H
12 #define MESSAGE_DATA_COLLATOR_H
22 #ifdef DEBUG_MESSAGEDATACOLLATOR_ALL
23 #define DEBUG_MESSAGEDATACOLLATOR
24 #define DEBUG_MESSAGEDATACOLLATOR_CREATION
25 #define DEBUG_MESSAGEDATACOLLATOR_RDMA
26 #define DEBUG_MESSAGEDATACOLLATOR_EACHARRIVAL
52 template <
class msgType,
typename dataType>
63 MessageDataCollator(std::string debugID, CkCallback callbk,
const unsigned int nExpected,
bool manageBuffer=
false,
const unsigned int offsetStart=0):
73 numBatchesProcessed(0),
75 isThisBatchViaRDMA(false),
79 #ifdef DEBUG_MESSAGEDATACOLLATOR_CREATION
80 CkPrintf(
"MessageDataCollator%s instantiated: numExpected = %d and buffer management = %d on(1)/off(0)\n",
myName.c_str(),
numExpected,
isBufferMine);
82 #ifdef COLLATOR_ENABLE_RDMA
89 CkAbort(
"MessageDataCollator: You cannot enable RDMA and want to manage the data buffer too. The buffer will have to be mine.");
112 template<
class tokenType>
147 unsigned int numRows, numCols, numUnitsInMsg, bufferSizeInUnits;
162 template <
class msgType,
typename dataType>
174 if (numReceived != 0)
175 CkAssert(isThisBatchViaRDMA ==
false);
180 CkAssert(isNextBatchViaRDMA ==
false);
182 isThisBatchViaRDMA =
false;
184 numRows = msg->numRows();
185 numCols = msg->numCols();
186 numUnitsInMsg = numRows * numCols;
188 int reqdBufferSizeInUnits = numExpected * numUnitsInMsg;
190 if (dataBuffer && isBufferMine && reqdBufferSizeInUnits > bufferSizeInUnits)
192 #ifdef DEBUG_MESSAGEDATACOLLATOR
193 CkPrintf(
"MessageDataCollator%s: Deleting buffer as I need space for %d units. Available = %d units\n",myName.c_str(),reqdBufferSizeInUnits,bufferSizeInUnits);
203 bufferSizeInUnits = reqdBufferSizeInUnits;
204 #ifdef DEBUG_MESSAGEDATACOLLATOR
205 CkPrintf(
"MessageDataCollator%s: Allocating buffer of size %d units for %d data packets each having %d units in %d rows x %d cols.\n",myName.c_str(),bufferSizeInUnits,numExpected,numUnitsInMsg,numRows,numCols);
207 outputMsg =
new (numExpected*numRows*numCols) msgType(-1,numCols,numRows*numExpected);
208 dataBuffer = outputMsg->data();
210 bzero(dataBuffer,bufferSizeInUnits *
sizeof(dataType));
215 CkAssert( (outputMsg != NULL) && (dataBuffer != NULL) );
217 CkAssert( (numRows == msg->numRows()) && (numCols == msg->numCols()) );
220 int offset = msg->sender() - minSender;
222 CmiMemcpy(&(dataBuffer[offset*numUnitsInMsg]), msg->data(), numUnitsInMsg *
sizeof(dataType));
228 if (numReceived >= numExpected)
230 #ifdef DEBUG_MESSAGEDATACOLLATOR
231 CkPrintf(
"MessageDataCollator%s: Collated data from %d of %d messages at %p. Gonna trigger my client\n",myName.c_str(),numReceived,numExpected,dataBuffer);
234 numBatchesProcessed++;
238 sampleMsg =
reinterpret_cast<msgType*
> ( CkCopyMsg(reinterpret_cast<void**>(&msg)) );
240 uponCompletion.send(outputMsg);
252 #ifdef DEBUG_MESSAGEDATACOLLATOR_EACHARRIVAL
253 CkPrintf(
"MessageDataCollator%s: Received message %d of %d.\n",myName.c_str(),numReceived,numExpected);
266 template <
class msgType,
typename dataType>
270 return reinterpret_cast<msgType*
> ( CkCopyMsg(reinterpret_cast<void**>(&sampleMsg)) );
277 template <
class msgType,
typename dataType>
template <
class tokenType>
280 #ifndef COLLATOR_ENABLE_RDMA
281 CkAbort(
"MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
283 #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
284 std::stringstream dbgStr;
285 dbgStr<<
"MessageDataCollator"<<myName<<
": Received RDMA setup request from sender "<<msg->
sender()<<
" on proc "<<msg->
senderPE()<<
"."
286 <<
"\n\tRDMA token at Receiver: "<<*replyToken<<
" dataSize = "<<msg->
numDataUnits();
287 CkPrintf(
"%s\n",dbgStr.str().c_str());
290 CkAssert(numReceived == 0);
292 isNextBatchViaRDMA =
true;
297 numUnitsInMsg = numRows * numCols;
299 int reqdBufferSizeInUnits = numExpected * numUnitsInMsg;
301 if (dataBuffer && isBufferMine && reqdBufferSizeInUnits > bufferSizeInUnits)
303 #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
304 CkPrintf(
"MessageDataCollator%s: Deleting buffer as I need space for %d units. Available = %d units\n",myName.c_str(),reqdBufferSizeInUnits,bufferSizeInUnits);
314 bufferSizeInUnits = reqdBufferSizeInUnits;
315 #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
316 CkPrintf(
"MessageDataCollator%s: Allocating buffer of size %d units for %d data packets each having %d units in %d rows x %d cols.\n",myName.c_str(),bufferSizeInUnits,numExpected,numUnitsInMsg,numRows,numCols);
318 outputMsg =
new (numExpected*numRows*numCols) msgType(-1,numCols,numRows*numExpected);
319 dataBuffer = outputMsg->data();
321 bzero(dataBuffer,bufferSizeInUnits *
sizeof(dataType));
325 int offset = msg->
sender() - minSender;
327 dataType *recvLoc = &(dataBuffer[offset*numUnitsInMsg]);
331 double const actualQNaN = 999999999999999999999999999999999999999999999999.9999999999;
334 fnPtr,reinterpret_cast<void*>(
this),actualQNaN);
336 RDMAhandles.push_back(rHandle);
338 tokenType rToken = msg->
token();
340 rToken = *replyToken;
347 #endif // COLLATOR_ENABLE_RDMA
352 template <
class msgType,
typename dataType>
355 #ifndef COLLATOR_ENABLE_RDMA
356 CkAbort(
"MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
359 if (numReceived != 0)
360 CkAssert(isThisBatchViaRDMA ==
true);
364 CkAssert(isNextBatchViaRDMA ==
true);
366 isThisBatchViaRDMA =
true;
368 isNextBatchViaRDMA =
false;
375 if (numReceived >= numExpected)
377 #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
378 CkPrintf(
"MessageDataCollator%s: Collated %d of %d RDMA data packets. Gonna trigger my client\n",myName.c_str(),numReceived,numExpected);
381 numBatchesProcessed++;
387 uponCompletion.send(outputMsg);
393 #ifdef DEBUG_MESSAGEDATACOLLATOR_EACHARRIVAL
394 CkPrintf(
"MessageDataCollator%s: Someone deposited data via RDMA. Collated %d of %d.\n",myName.c_str(),numReceived,numExpected);
402 template <
class msgType,
typename dataType>
405 #ifndef COLLATOR_ENABLE_RDMA
406 CkAbort(
"MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
415 template <
class msgType,
typename dataType>
418 #ifndef COLLATOR_ENABLE_RDMA
419 CkAbort(
"MessageDataCollator aborting because someone called an RDMA method when it has been turned off for me...\n");
422 CkAssert( numReceived==0 || numReceived>=numExpected );
424 isNextBatchViaRDMA =
true;
425 #ifdef DEBUG_MESSAGEDATACOLLATOR_RDMA
426 CkPrintf(
"MessageDataCollator%s: Expecting the next batch of data via RDMA. Notifying the runtime to watch %d receive buffers.\n",myName.c_str(),RDMAhandles.size());
429 for (
int i=0;i<RDMAhandles.size();i++)
430 CmiDirect_ready(&RDMAhandles[i]);
438 #endif // MESSAGE_DATA_COLLATOR_H
bool isBufferMine
If buffer is mine, I manage it, viz. reuse the same buffer for every batch of msgs where possible...
const CkCallback & senderCallback() const
Get hold of the callback the sender has configured.
CkVec< rdmaHandleType > RDMAhandles
A vector of RDMA handles, one for each sender.
unsigned int minSender
Senders should have an ID accessed by msgType::sender().
unsigned int getNumBatchesDone() const
Get the number of batches of incoming messages already processed.
unsigned int getNumExpected() const
Get the number of messages this collator has been configured to buffer.
int numDataUnits() const
Get the number of records of data that will be sent by RDMA.
unsigned int getNumReceived() const
Check the number of messages that have arrived.
unsigned int numExpected
Counters to keep track of the messages.
msgType * sampleMsg
Handle to a sample message copied from every batch of incoming messages.
CkCallback uponCompletion
Callback that is passed in to trigger the post-collation work.
int senderPE() const
Get the PE number of the sender.
~MessageDataCollator()
Destructor.
Class that buffers incoming data (via messages/RDMA) till it counts a pre-specified number of arrival...
A request from a data sender to set up an RDMA link. Initiates the sender-receiver handshake required ...
MessageDataCollator(std::string debugID, CkCallback callbk, const unsigned int nExpected, bool manageBuffer=false, const unsigned int offsetStart=0)
Collators should be constructed thus.
Based on whether RDMA is enabled, the handle type is either the actual handle or just an empty struct...
void setupRDMA(RDMASetupRequestMsg< tokenType > *msg, tokenType *replyToken=0)
Called by a data sender to set up an RDMA link with me (the data receiver).
std::string myName
A string that encodes some kind of ID supplied by the owner of an instance.
bool isNextBatchViaRDMA
Booleans indicating whether the next/current batch of messages is via RDMA.
tokenType token() const
Get a copy of the handshake token the sender created.
dataType * dataBuffer
A pointer to the memory region (contained in outputMsg) where incoming data is collated.
int sender() const
Get an integer representation of the sender's ID.
msgType * getSampleMsg()
Get a copy of a sample message from the last completed batch of messages. Copy managed by the user...
static void collatorCallback(void *thisObj)
Fed to CmiDirect as a callback to be triggered when some data is deposited by RDMA.
void operator()(void)
RDMA equivalent of msg handling. When data arrives via RDMA, this is called for handling it...
Reply from data receiver to the data sender indicating completion of setup on the receiver side...
msgType * outputMsg
The message (which has the data buffer) in which the incoming data is collated.
unsigned int numRows
Variables related to the message/buffer sizes.
void expectNext()
Ask me to notify Converse that we're ready for the next batch of messages.