00001 #ifndef _MESH_STREAMER_H_
00002 #define _MESH_STREAMER_H_
00003
00004 #include <algorithm>
00005 #include "MeshStreamer.decl.h"
00006
00007
// Multiplier used by the MeshStreamer constructor when it derives the
// per-bucket size from the total buffer capacity.
#define BUCKET_SIZE_FACTOR 4

// Routing class assigned to a data item by MeshStreamer::determineLocation():
//   PlaneMessage        - the destination lies in a different plane,
//   ColumnMessage       - same plane, different column,
//   PersonalizedMessage - same plane and same column.
enum MeshStreamerMessageType {
  PlaneMessage,
  ColumnMessage,
  PersonalizedMessage
};
00015
00016 class MeshLocation {
00017 public:
00018 int rowIndex;
00019 int columnIndex;
00020 int planeIndex;
00021 MeshStreamerMessageType msgType;
00022 };
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 template<class dtype>
00050 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
00051 public:
00052 int numDataItems;
00053 int *destinationPes;
00054 dtype *data;
00055
00056 MeshStreamerMessage(): numDataItems(0) {}
00057
00058 int addDataItem(const dtype &dataItem) {
00059 data[numDataItems] = dataItem;
00060 return ++numDataItems;
00061 }
00062
00063 void markDestination(const int index, const int destinationPe) {
00064 destinationPes[index] = destinationPe;
00065 }
00066
00067 dtype &getDataItem(const int index) {
00068 return data[index];
00069 }
00070 };
00071
00072 template <class dtype>
00073 class MeshStreamerClient : public CBase_MeshStreamerClient<dtype> {
00074 public:
00075 virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
00076 virtual void process(dtype &data)=0;
00077 };
00078
00079 template <class dtype>
00080 class MeshStreamer : public CBase_MeshStreamer<dtype> {
00081
00082 private:
00083 int bucketSize_;
00084 int totalBufferCapacity_;
00085 int numDataItemsBuffered_;
00086
00087 int numNodes_;
00088 int numRows_;
00089 int numColumns_;
00090 int numPlanes_;
00091 int planeSize_;
00092
00093 CProxy_MeshStreamerClient<dtype> clientProxy_;
00094 MeshStreamerClient<dtype> *clientObj_;
00095
00096 int myNodeIndex_;
00097 int myPlaneIndex_;
00098 int myColumnIndex_;
00099 int myRowIndex_;
00100
00101 CkCallback userCallback_;
00102 int yieldFlag_;
00103
00104 double progressPeriodInMs_;
00105 bool isPeriodicFlushEnabled_;
00106 double timeOfLastSend_;
00107
00108 MeshStreamerMessage<dtype> **personalizedBuffers_;
00109 MeshStreamerMessage<dtype> **columnBuffers_;
00110 MeshStreamerMessage<dtype> **planeBuffers_;
00111
00112 #ifdef CACHE_LOCATIONS
00113 MeshLocation *cachedLocations;
00114 bool *isCached;
00115 #endif
00116
00117 #ifdef SUPPORT_INCOMPLETE_MESH
00118 int numNodesInLastPlane_;
00119 int numFullRowsInLastPlane_;
00120 int numColumnsInLastRow_;
00121 #endif
00122
00123 void determineLocation(const int destinationPe,
00124 MeshLocation &destinationCoordinates);
00125
00126 void storeMessage(MeshStreamerMessage<dtype> ** const messageBuffers,
00127 const int bucketIndex, const int destinationPe,
00128 const MeshLocation &destinationCoordinates, const dtype &dataItem);
00129
00130 void flushLargestBucket(MeshStreamerMessage<dtype> ** const messageBuffers,
00131 const int numBuffers, const int myIndex,
00132 const int dimensionFactor);
00133
00134 public:
00135
00136 MeshStreamer(int totalBufferCapacity, int numRows,
00137 int numColumns, int numPlanes,
00138 const CProxy_MeshStreamerClient<dtype> &clientProxy,
00139 int yieldFlag = 0, double progressPeriodInMs = -1.0);
00140 ~MeshStreamer();
00141
00142
00143 void insertData(dtype &dataItem, const int destinationPe);
00144 void doneInserting();
00145 void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
00146
00147
00148 void flushBuckets(MeshStreamerMessage<dtype> **messageBuffers, const int numBuffers);
00149 void flushDirect();
00150
00151 bool isPeriodicFlushEnabled() {
00152 return isPeriodicFlushEnabled_;
00153 }
00154
00155 void associateCallback(CkCallback &cb, bool automaticFinish = true) {
00156 userCallback_ = cb;
00157 if (automaticFinish) {
00158 CkStartQD(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), this->thisProxy));
00159 }
00160 }
00161
00162 void registerPeriodicProgressFunction();
00163 void finish(CkReductionMsg *msg);
00164
00165
00166
00167
00168 void enablePeriodicFlushing(){
00169 isPeriodicFlushEnabled_ = true;
00170 registerPeriodicProgressFunction();
00171 }
00172 };
00173
00174 template <class dtype>
00175 void MeshStreamerClient<dtype>::receiveCombinedData(MeshStreamerMessage<dtype> *msg) {
00176 for (int i = 0; i < msg->numDataItems; i++) {
00177 dtype data = ((dtype*)(msg->data))[i];
00178 process(data);
00179 }
00180 delete msg;
00181 }
00182
00183 template <class dtype>
00184 MeshStreamer<dtype>::MeshStreamer(int totalBufferCapacity, int numRows,
00185 int numColumns, int numPlanes,
00186 const CProxy_MeshStreamerClient<dtype> &clientProxy,
00187 int yieldFlag, double progressPeriodInMs): yieldFlag_(yieldFlag) {
00188
00189
00190
00191
00192 bucketSize_ = BUCKET_SIZE_FACTOR * totalBufferCapacity / (numRows + numColumns + numPlanes - 2);
00193 if (bucketSize_ <= 0) {
00194 bucketSize_ = 1;
00195 CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
00196 "is invalid. Defaulting to a single buffer per destination.\n");
00197 }
00198 totalBufferCapacity_ = totalBufferCapacity;
00199 numDataItemsBuffered_ = 0;
00200 numRows_ = numRows;
00201 numColumns_ = numColumns;
00202 numPlanes_ = numPlanes;
00203 numNodes_ = CkNumPes();
00204 clientProxy_ = clientProxy;
00205 clientObj_ = ((MeshStreamerClient<dtype> *)CkLocalBranch(clientProxy_));
00206 progressPeriodInMs_ = progressPeriodInMs;
00207
00208 personalizedBuffers_ = new MeshStreamerMessage<dtype> *[numRows];
00209 for (int i = 0; i < numRows; i++) {
00210 personalizedBuffers_[i] = NULL;
00211 }
00212
00213 columnBuffers_ = new MeshStreamerMessage<dtype> *[numColumns];
00214 for (int i = 0; i < numColumns; i++) {
00215 columnBuffers_[i] = NULL;
00216 }
00217
00218 planeBuffers_ = new MeshStreamerMessage<dtype> *[numPlanes];
00219 for (int i = 0; i < numPlanes; i++) {
00220 planeBuffers_[i] = NULL;
00221 }
00222
00223
00224 myNodeIndex_ = CkMyPe();
00225 planeSize_ = numRows_ * numColumns_;
00226 myPlaneIndex_ = myNodeIndex_ / planeSize_;
00227 int indexWithinPlane = myNodeIndex_ - myPlaneIndex_ * planeSize_;
00228 myRowIndex_ = indexWithinPlane / numColumns_;
00229 myColumnIndex_ = indexWithinPlane - myRowIndex_ * numColumns_;
00230
00231 isPeriodicFlushEnabled_ = false;
00232
00233 #ifdef CACHE_LOCATIONS
00234 cachedLocations = new MeshLocation[numNodes_];
00235 isCached = new bool[numNodes_];
00236 std::fill(isCached, isCached + numNodes_, false);
00237 #endif
00238
00239 #ifdef SUPPORT_INCOMPLETE_MESH
00240 numNodesInLastPlane_ = numNodes_ % planeSize_;
00241 numFullRowsInLastPlane_ = numNodesInLastPlane_ / numColumns_;
00242 numColumnsInLastRow_ = numNodesInLastPlane_ - numFullRowsInLastPlane_ * numColumns_;
00243 #endif
00244 }
00245
00246 template <class dtype>
00247 MeshStreamer<dtype>::~MeshStreamer() {
00248
00249 for (int i = 0; i < numRows_; i++)
00250 delete personalizedBuffers_[i];
00251
00252 for (int i = 0; i < numColumns_; i++)
00253 delete columnBuffers_[i];
00254
00255 for (int i = 0; i < numPlanes_; i++)
00256 delete planeBuffers_[i];
00257
00258 delete[] personalizedBuffers_;
00259 delete[] columnBuffers_;
00260 delete[] planeBuffers_;
00261
00262 }
00263
00264 template <class dtype>
00265 void MeshStreamer<dtype>::determineLocation(const int destinationPe,
00266 MeshLocation &destinationCoordinates) {
00267
00268 int nodeIndex, indexWithinPlane;
00269
00270 #ifdef CACHE_LOCATIONS
00271 if (isCached[destinationPe] == true) {
00272 destinationCoordinates = cachedLocations[destinationPe];
00273 return;
00274 }
00275 #endif
00276
00277 nodeIndex = destinationPe;
00278 destinationCoordinates.planeIndex = nodeIndex / planeSize_;
00279 if (destinationCoordinates.planeIndex != myPlaneIndex_) {
00280 destinationCoordinates.msgType = PlaneMessage;
00281 }
00282 else {
00283 indexWithinPlane =
00284 nodeIndex - destinationCoordinates.planeIndex * planeSize_;
00285 destinationCoordinates.rowIndex = indexWithinPlane / numColumns_;
00286 destinationCoordinates.columnIndex =
00287 indexWithinPlane - destinationCoordinates.rowIndex * numColumns_;
00288 if (destinationCoordinates.columnIndex != myColumnIndex_) {
00289 destinationCoordinates.msgType = ColumnMessage;
00290 }
00291 else {
00292 destinationCoordinates.msgType = PersonalizedMessage;
00293 }
00294 }
00295
00296 #ifdef CACHE_LOCATIONS
00297 cachedLocations[destinationPe] = destinationCoordinates;
00298 #endif
00299
00300 }
00301
00302 template <class dtype>
00303 void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> ** const messageBuffers,
00304 const int bucketIndex, const int destinationPe,
00305 const MeshLocation& destinationCoordinates,
00306 const dtype &dataItem) {
00307
00308
00309 if (messageBuffers[bucketIndex] == NULL) {
00310 if (destinationCoordinates.msgType == PersonalizedMessage) {
00311 messageBuffers[bucketIndex] =
00312 new (0, bucketSize_) MeshStreamerMessage<dtype>();
00313 }
00314 else {
00315 messageBuffers[bucketIndex] =
00316 new (bucketSize_, bucketSize_) MeshStreamerMessage<dtype>();
00317 }
00318 #ifdef DEBUG_STREAMER
00319 CkAssert(messageBuffers[bucketIndex] != NULL);
00320 #endif
00321 }
00322
00323 MeshStreamerMessage<dtype> *destinationBucket = messageBuffers[bucketIndex];
00324
00325 int numBuffered = destinationBucket->addDataItem(dataItem);
00326 if (destinationCoordinates.msgType != PersonalizedMessage) {
00327 destinationBucket->markDestination(numBuffered-1, destinationPe);
00328 }
00329 numDataItemsBuffered_++;
00330
00331 if (numBuffered == bucketSize_) {
00332 int destinationIndex;
00333 switch (destinationCoordinates.msgType) {
00334
00335 case PlaneMessage:
00336 destinationIndex = myNodeIndex_ +
00337 (destinationCoordinates.planeIndex - myPlaneIndex_) * planeSize_;
00338 #ifdef SUPPORT_INCOMPLETE_MESH
00339 if (destinationIndex >= numNodes_) {
00340 int numValidRows = numFullRowsInLastPlane_;
00341 if (numColumnsInLastRow_ > myColumnIndex_) {
00342 numValidRows++;
00343 }
00344 destinationIndex = destinationCoordinates.planeIndex * planeSize_ +
00345 myColumnIndex_ + (myRowIndex_ % numValidRows) * numColumns_;
00346 }
00347 #endif
00348 this->thisProxy[destinationIndex].receiveAggregateData(destinationBucket);
00349 break;
00350 case ColumnMessage:
00351 destinationIndex = myNodeIndex_ +
00352 (destinationCoordinates.columnIndex - myColumnIndex_);
00353 #ifdef SUPPORT_INCOMPLETE_MESH
00354 if (destinationIndex >= numNodes_) {
00355 destinationIndex = destinationCoordinates.planeIndex * planeSize_ +
00356 destinationCoordinates.columnIndex +
00357 (myColumnIndex_ % numFullRowsInLastPlane_) * numColumns_;
00358 }
00359 #endif
00360 this->thisProxy[destinationIndex].receiveAggregateData(destinationBucket);
00361 break;
00362 case PersonalizedMessage:
00363 destinationIndex = myNodeIndex_ +
00364 (destinationCoordinates.rowIndex - myRowIndex_) * numColumns_;
00365 clientProxy_[destinationIndex].receiveCombinedData(destinationBucket);
00366
00367 break;
00368 default:
00369 CkError("Incorrect MeshStreamer message type\n");
00370 break;
00371 }
00372 messageBuffers[bucketIndex] = NULL;
00373 numDataItemsBuffered_ -= numBuffered;
00374
00375 if (isPeriodicFlushEnabled_) {
00376 timeOfLastSend_ = CkWallTimer();
00377 }
00378
00379 }
00380
00381 if (numDataItemsBuffered_ == totalBufferCapacity_) {
00382
00383 flushLargestBucket(personalizedBuffers_, numRows_, myRowIndex_, numColumns_);
00384 flushLargestBucket(columnBuffers_, numColumns_, myColumnIndex_, 1);
00385 flushLargestBucket(planeBuffers_, numPlanes_, myPlaneIndex_, planeSize_);
00386
00387 if (isPeriodicFlushEnabled_) {
00388 timeOfLastSend_ = CkWallTimer();
00389 }
00390
00391 }
00392
00393 }
00394
00395 template <class dtype>
00396 void MeshStreamer<dtype>::insertData(dtype &dataItem, const int destinationPe) {
00397 static int count = 0;
00398
00399 if (destinationPe == CkMyPe()) {
00400 clientObj_->process(dataItem);
00401 return;
00402 }
00403
00404 MeshLocation destinationCoordinates;
00405
00406 determineLocation(destinationPe, destinationCoordinates);
00407
00408
00409 MeshStreamerMessage<dtype> **messageBuffers;
00410 int bucketIndex;
00411
00412 switch (destinationCoordinates.msgType) {
00413 case PlaneMessage:
00414 messageBuffers = planeBuffers_;
00415 bucketIndex = destinationCoordinates.planeIndex;
00416 break;
00417 case ColumnMessage:
00418 messageBuffers = columnBuffers_;
00419 bucketIndex = destinationCoordinates.columnIndex;
00420 break;
00421 case PersonalizedMessage:
00422 messageBuffers = personalizedBuffers_;
00423 bucketIndex = destinationCoordinates.rowIndex;
00424 break;
00425 default:
00426 CkError("Unrecognized MeshStreamer message type\n");
00427 break;
00428 }
00429
00430 storeMessage(messageBuffers, bucketIndex, destinationPe, destinationCoordinates,
00431 dataItem);
00432
00433
00434
00435 if (yieldFlag_ && ++count == 1024) {
00436 count = 0;
00437 CthYield();
00438 }
00439 }
00440
00441 template <class dtype>
00442 void MeshStreamer<dtype>::doneInserting() {
00443 this->contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), this->thisProxy));
00444 }
00445
00446 template <class dtype>
00447 void MeshStreamer<dtype>::finish(CkReductionMsg *msg) {
00448
00449 isPeriodicFlushEnabled_ = false;
00450 flushDirect();
00451
00452 if (!userCallback_.isInvalid()) {
00453 CkStartQD(userCallback_);
00454 userCallback_ = CkCallback();
00455 }
00456
00457
00458 }
00459
00460
00461 template <class dtype>
00462 void MeshStreamer<dtype>::receiveAggregateData(MeshStreamerMessage<dtype> *msg) {
00463
00464 int destinationPe;
00465 MeshStreamerMessageType msgType;
00466 MeshLocation destinationCoordinates;
00467
00468 for (int i = 0; i < msg->numDataItems; i++) {
00469 destinationPe = msg->destinationPes[i];
00470 dtype &dataItem = msg->getDataItem(i);
00471 determineLocation(destinationPe, destinationCoordinates);
00472 #ifdef DEBUG_STREAMER
00473 CkAssert(destinationCoordinates.planeIndex == myPlaneIndex_);
00474
00475 if (destinationCoordinates.msgType == PersonalizedMessage) {
00476 CkAssert(destinationCoordinates.columnIndex == myColumnIndex_);
00477 }
00478 #endif
00479
00480 MeshStreamerMessage<dtype> **messageBuffers;
00481 int bucketIndex;
00482
00483 switch (destinationCoordinates.msgType) {
00484 case ColumnMessage:
00485 messageBuffers = columnBuffers_;
00486 bucketIndex = destinationCoordinates.columnIndex;
00487 break;
00488 case PersonalizedMessage:
00489 messageBuffers = personalizedBuffers_;
00490 bucketIndex = destinationCoordinates.rowIndex;
00491 break;
00492 default:
00493 CkError("Incorrect MeshStreamer message type\n");
00494 break;
00495 }
00496
00497 storeMessage(messageBuffers, bucketIndex, destinationPe,
00498 destinationCoordinates, dataItem);
00499
00500 }
00501
00502 delete msg;
00503
00504 }
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541 template <class dtype>
00542 void MeshStreamer<dtype>::flushLargestBucket(MeshStreamerMessage<dtype> ** const messageBuffers,
00543 const int numBuffers, const int myIndex,
00544 const int dimensionFactor) {
00545
00546 int flushIndex, maxSize, destinationIndex;
00547 MeshStreamerMessage<dtype> *destinationBucket;
00548 maxSize = 0;
00549 for (int i = 0; i < numBuffers; i++) {
00550 if (messageBuffers[i] != NULL && messageBuffers[i]->numDataItems > maxSize) {
00551 maxSize = messageBuffers[i]->numDataItems;
00552 flushIndex = i;
00553 }
00554 }
00555 if (maxSize > 0) {
00556 destinationBucket = messageBuffers[flushIndex];
00557 destinationIndex = myNodeIndex_ + (flushIndex - myIndex) * dimensionFactor;
00558
00559 if (destinationBucket->numDataItems < bucketSize_) {
00560
00561 envelope *env = UsrToEnv(destinationBucket);
00562 env->setTotalsize(env->getTotalsize() - (bucketSize_ - destinationBucket->numDataItems) * sizeof(dtype));
00563 }
00564 numDataItemsBuffered_ -= destinationBucket->numDataItems;
00565
00566 if (messageBuffers == personalizedBuffers_) {
00567 clientProxy_[destinationIndex].receiveCombinedData(destinationBucket);
00568 }
00569 else {
00570 this->thisProxy[destinationIndex].receiveAggregateData(destinationBucket);
00571 }
00572 messageBuffers[flushIndex] = NULL;
00573 }
00574 }
00575
00576 template <class dtype>
00577 void MeshStreamer<dtype>::flushBuckets(MeshStreamerMessage<dtype> **messageBuffers, const int numBuffers)
00578 {
00579
00580 for (int i = 0; i < numBuffers; i++) {
00581 if(messageBuffers[i] == NULL)
00582 continue;
00583
00584 numDataItemsBuffered_ -= messageBuffers[i]->numDataItems;
00585 if (messageBuffers == personalizedBuffers_) {
00586 int destinationPe = myNodeIndex_ + (i - myRowIndex_) * numColumns_;
00587 clientProxy_[destinationPe].receiveCombinedData(messageBuffers[i]);
00588 }
00589 else {
00590 for (int j = 0; j < messageBuffers[i]->numDataItems; j++) {
00591 MeshStreamerMessage<dtype> *directMsg =
00592 new (0, 1) MeshStreamerMessage<dtype>();
00593 #ifdef DEBUG_STREAMER
00594 CkAssert(directMsg != NULL);
00595 #endif
00596 int destinationPe = messageBuffers[i]->destinationPes[j];
00597 dtype dataItem = messageBuffers[i]->getDataItem(j);
00598 directMsg->addDataItem(dataItem);
00599 clientProxy_[destinationPe].receiveCombinedData(directMsg);
00600 }
00601 delete messageBuffers[i];
00602 }
00603 messageBuffers[i] = NULL;
00604 }
00605
00606 }
00607
00608 template <class dtype>
00609 void MeshStreamer<dtype>::flushDirect(){
00610
00611 if (!isPeriodicFlushEnabled_ ||
00612 1000 * (CkWallTimer() - timeOfLastSend_) >= progressPeriodInMs_) {
00613 flushBuckets(planeBuffers_, numPlanes_);
00614 flushBuckets(columnBuffers_, numColumns_);
00615 flushBuckets(personalizedBuffers_, numRows_);
00616 }
00617
00618 if (isPeriodicFlushEnabled_) {
00619 timeOfLastSend_ = CkWallTimer();
00620 }
00621
00622 #ifdef DEBUG_STREAMER
00623
00624 CkAssert(numDataItemsBuffered_ == 0);
00625 #endif
00626
00627 }
00628
00629 template <class dtype>
00630 void periodicProgressFunction(void *MeshStreamerObj, double time) {
00631
00632 MeshStreamer<dtype> *properObj =
00633 static_cast<MeshStreamer<dtype>*>(MeshStreamerObj);
00634
00635 if (properObj->isPeriodicFlushEnabled()) {
00636 properObj->flushDirect();
00637 properObj->registerPeriodicProgressFunction();
00638 }
00639 }
00640
00641 template <class dtype>
00642 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
00643 CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, progressPeriodInMs_);
00644 }
00645
00646
00647 #define CK_TEMPLATES_ONLY
00648 #include "MeshStreamer.def.h"
00649 #undef CK_TEMPLATES_ONLY
00650
00651 #endif