00001 #ifndef NDMESH_STREAMER_H
00002 #define NDMESH_STREAMER_H
00003
00004 #include <algorithm>
00005 #include "NDMeshStreamer.decl.h"
00006 #include "DataItemTypes.h"
00007 #include "completion.h"
00008 #include "ckarray.h"
00009
00010
00011
00012
00013
// Multiplier applied to maxNumDataItemsBuffered when the MeshStreamer
// constructor sizes each per-destination buffer. Guarded so that a build can
// override it (e.g. -DOVERALLOCATION_FACTOR=8) without a redefinition
// diagnostic; the default remains 4.
#ifndef OVERALLOCATION_FACTOR
#define OVERALLOCATION_FACTOR 4
#endif
00015
00016
00017
00018
00019
00020
00021
00022
00023
/// Identifies the message buffer a data item should be placed in: the mesh
/// dimension to send along and the slot inside that dimension's buffer array.
struct MeshLocation {
  /// Index of the dimension along which the item travels next.
  int dimension;
  /// Index of the buffer within that dimension's array of buffers.
  int bufferIndex;
};
00028
00029 template<class dtype>
00030 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
00031
00032 public:
00033
00034 #ifdef STAGED_COMPLETION
00035 int finalMsgCount;
00036 #endif
00037 int numDataItems;
00038 int *destinationPes;
00039 dtype *dataItems;
00040
00041 MeshStreamerMessage(): numDataItems(0) {
00042 #ifdef STAGED_COMPLETION
00043 finalMsgCount = -1;
00044 #endif
00045 }
00046
00047 int addDataItem(const dtype &dataItem) {
00048 dataItems[numDataItems] = dataItem;
00049 return ++numDataItems;
00050 }
00051
00052 void markDestination(const int index, const int destinationPe) {
00053 destinationPes[index] = destinationPe;
00054 }
00055
00056 dtype &getDataItem(const int index) {
00057 return dataItems[index];
00058 }
00059
00060 };
00061
00062 template <class dtype>
00063 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
00064 private:
00065 CompletionDetector *detectorLocalObj_;
00066 public:
00067 MeshStreamerArrayClient(){}
00068 MeshStreamerArrayClient(CkMigrateMessage *msg) {}
00069
00070
00071 virtual void process(dtype &data) {
00072 CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
00073 "This virtual function should have been defined by the user.\n");
00074 };
00075 void setDetector(CompletionDetector *detectorLocalObj) {
00076 detectorLocalObj_ = detectorLocalObj;
00077 }
00078 void receiveRedeliveredItem(dtype data) {
00079 #ifdef STREAMER_VERBOSE_OUTPUT
00080 CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
00081 #endif
00082 detectorLocalObj_->consume();
00083 process(data);
00084 }
00085
00086 void pup(PUP::er &p) {
00087 CBase_MeshStreamerArrayClient<dtype>::pup(p);
00088 }
00089
00090 };
00091
00092 template <class dtype>
00093 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
00094
00095 public:
00096 virtual void process(dtype &data) = 0;
00097
00098 };
00099
00100 template <class dtype>
00101 class MeshStreamer : public CBase_MeshStreamer<dtype> {
00102
00103 private:
00104 int bufferSize_;
00105 int maxNumDataItemsBuffered_;
00106 int numDataItemsBuffered_;
00107
00108 int numMembers_;
00109 int numDimensions_;
00110 int *individualDimensionSizes_;
00111 int *combinedDimensionSizes_;
00112
00113 int myIndex_;
00114 int *myLocationIndex_;
00115
00116 CkCallback userCallback_;
00117 bool yieldFlag_;
00118
00119 double progressPeriodInMs_;
00120 bool isPeriodicFlushEnabled_;
00121 bool hasSentRecently_;
00122 #ifdef STREAMER_EXPERIMENTAL
00123 bool hasSentPreviously_;
00124 bool immediateMode_;
00125 #endif
00126 MeshStreamerMessage<dtype> ***dataBuffers_;
00127
00128 CProxy_CompletionDetector detector_;
00129 int prio_;
00130 int yieldCount_;
00131
00132 #ifdef CACHE_LOCATIONS
00133 MeshLocation *cachedLocations_;
00134 bool *isCached_;
00135 #endif
00136
00137
00138
00139 int **cntMsgSent_;
00140 int *cntMsgReceived_;
00141 int *cntMsgExpected_;
00142 int *cntFinished_;
00143 int dimensionToFlush_;
00144 int numLocalDone_;
00145
00146
00147 void storeMessage(int destinationPe,
00148 const MeshLocation &destinationCoordinates,
00149 void *dataItem, bool copyIndirectly = false);
00150 virtual void localDeliver(dtype &dataItem) = 0;
00151
00152 virtual int numElementsInClient() = 0;
00153 virtual int numLocalElementsInClient() = 0;
00154
00155 virtual void initLocalClients() = 0;
00156
00157 void sendLargestBuffer();
00158 void flushToIntermediateDestinations();
00159 void flushDimension(int dimension, bool sendMsgCounts = false);
00160
00161 protected:
00162
00163 CompletionDetector *detectorLocalObj_;
00164 virtual int copyDataItemIntoMessage(
00165 MeshStreamerMessage<dtype> *destinationBuffer,
00166 void *dataItemHandle, bool copyIndirectly = false);
00167 MeshLocation determineLocation(int destinationPe);
00168 public:
00169
00170 MeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
00171 int *dimensionSizes,
00172 bool yieldFlag = 0, double progressPeriodInMs = -1.0);
00173 ~MeshStreamer();
00174
00175
00176 void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
00177 virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
00178 void flushDirect();
00179 void finish();
00180
00181
00182 bool isPeriodicFlushEnabled() {
00183 return isPeriodicFlushEnabled_;
00184 }
00185 virtual void insertData(dtype &dataItem, int destinationPe);
00186 void insertData(void *dataItemHandle, int destinationPe);
00187 void associateCallback(int numContributors,
00188 CkCallback startCb, CkCallback endCb,
00189 CProxy_CompletionDetector detector,
00190 int prio);
00191 void flushAllBuffers();
00192 void registerPeriodicProgressFunction();
00193
00194
00195
00196 void enablePeriodicFlushing(){
00197 isPeriodicFlushEnabled_ = true;
00198 registerPeriodicProgressFunction();
00199 }
00200
00201 void done(int numContributorsFinished = 1) {
00202 #ifdef STAGED_COMPLETION
00203 numLocalDone_ += numContributorsFinished;
00204 if (numLocalDone_ == numLocalElementsInClient()) {
00205 startStagedCompletion();
00206 }
00207 #else
00208 detectorLocalObj_->done(numContributorsFinished);
00209 #endif
00210 }
00211
00212 void init(CkCallback startCb, CkCallback endCb, int prio);
00213
00214 void startStagedCompletion() {
00215 if (individualDimensionSizes_[dimensionToFlush_] != 1) {
00216 flushDimension(dimensionToFlush_, true);
00217 }
00218 dimensionToFlush_--;
00219
00220 checkForCompletedStages();
00221 }
00222
00223 void markMessageReceived(int dimension, int finalCount) {
00224 cntMsgReceived_[dimension]++;
00225 if (finalCount != -1) {
00226 cntFinished_[dimension]++;
00227 cntMsgExpected_[dimension] += finalCount;
00228 #ifdef STREAMER_VERBOSE_OUTPUT
00229 CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
00230 "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension,
00231 finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
00232 cntMsgReceived_[dimension]);
00233 #endif
00234 }
00235 if (dimensionToFlush_ != numDimensions_ - 1) {
00236 checkForCompletedStages();
00237 }
00238 }
00239
00240 void checkForCompletedStages() {
00241
00242 while (cntFinished_[dimensionToFlush_ + 1] ==
00243 individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
00244 cntMsgExpected_[dimensionToFlush_ + 1] ==
00245 cntMsgReceived_[dimensionToFlush_ + 1]) {
00246 if (dimensionToFlush_ == -1) {
00247 #ifdef STREAMER_VERBOSE_OUTPUT
00248 CkPrintf("[%d] contribute\n", CkMyPe());
00249 #endif
00250 #ifdef DEBUG_STREAMER
00251 CkAssert(numDataItemsBuffered_ == 0);
00252 #endif
00253 this->contribute(userCallback_);
00254 return;
00255 }
00256 else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
00257 flushDimension(dimensionToFlush_, true);
00258 }
00259 dimensionToFlush_--;
00260 }
00261 }
00262
00263 };
00264
00265 template <class dtype>
00266 MeshStreamer<dtype>::MeshStreamer(
00267 int maxNumDataItemsBuffered, int numDimensions,
00268 int *dimensionSizes,
00269 bool yieldFlag,
00270 double progressPeriodInMs)
00271 :numDimensions_(numDimensions),
00272 maxNumDataItemsBuffered_(maxNumDataItemsBuffered),
00273 yieldFlag_(yieldFlag),
00274 progressPeriodInMs_(progressPeriodInMs)
00275 {
00276
00277 int sumAlongAllDimensions = 0;
00278 individualDimensionSizes_ = new int[numDimensions_];
00279 combinedDimensionSizes_ = new int[numDimensions_ + 1];
00280 myLocationIndex_ = new int[numDimensions_];
00281 memcpy(individualDimensionSizes_, dimensionSizes,
00282 numDimensions * sizeof(int));
00283 combinedDimensionSizes_[0] = 1;
00284 for (int i = 0; i < numDimensions; i++) {
00285 sumAlongAllDimensions += individualDimensionSizes_[i];
00286 combinedDimensionSizes_[i + 1] =
00287 combinedDimensionSizes_[i] * individualDimensionSizes_[i];
00288 }
00289
00290
00291
00292 bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_
00293 / (sumAlongAllDimensions - numDimensions_ + 1);
00294 if (bufferSize_ <= 0) {
00295 bufferSize_ = 1;
00296 CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
00297 "is invalid. Defaulting to a single buffer per destination.\n");
00298 }
00299 numDataItemsBuffered_ = 0;
00300 numMembers_ = CkNumPes();
00301
00302 dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_];
00303 for (int i = 0; i < numDimensions; i++) {
00304 int numMembersAlongDimension = individualDimensionSizes_[i];
00305 dataBuffers_[i] =
00306 new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
00307 for (int j = 0; j < numMembersAlongDimension; j++) {
00308 dataBuffers_[i][j] = NULL;
00309 }
00310 }
00311
00312 myIndex_ = CkMyPe();
00313 int remainder = myIndex_;
00314 for (int i = numDimensions_ - 1; i >= 0; i--) {
00315 myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
00316 remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
00317 }
00318
00319 isPeriodicFlushEnabled_ = false;
00320 detectorLocalObj_ = NULL;
00321 #ifdef STREAMER_EXPERIMENTAL
00322 immediateMode_ = false;
00323 #endif
00324
00325 #ifdef CACHE_LOCATIONS
00326 cachedLocations_ = new MeshLocation[numMembers_];
00327 isCached_ = new bool[numMembers_];
00328 std::fill(isCached_, isCached_ + numMembers_, false);
00329 #endif
00330
00331 #ifdef STAGED_COMPLETION
00332
00333 cntMsgSent_ = new int*[numDimensions_];
00334 cntMsgReceived_ = new int[numDimensions_];
00335 cntMsgExpected_ = new int[numDimensions_];
00336 cntFinished_ = new int[numDimensions_];
00337
00338 for (int i = 0; i < numDimensions_; i++) {
00339 cntMsgSent_[i] = new int[individualDimensionSizes_[i]];
00340 }
00341
00342 #endif
00343
00344 }
00345
00346 template <class dtype>
00347 MeshStreamer<dtype>::~MeshStreamer() {
00348
00349 for (int i = 0; i < numDimensions_; i++) {
00350 for (int j=0; j < individualDimensionSizes_[i]; j++) {
00351 delete[] dataBuffers_[i][j];
00352 }
00353 delete[] dataBuffers_[i];
00354 }
00355
00356 delete[] individualDimensionSizes_;
00357 delete[] combinedDimensionSizes_;
00358 delete[] myLocationIndex_;
00359
00360 #ifdef CACHE_LOCATIONS
00361 delete[] cachedLocations_;
00362 delete[] isCached_;
00363 #endif
00364
00365 #ifdef STAGED_COMPLETION
00366 for (int i = 0; i < numDimensions_; i++) {
00367 delete[] cntMsgSent_[i];
00368 }
00369 delete[] cntMsgSent_;
00370 delete[] cntMsgReceived_;
00371 delete[] cntMsgExpected_;
00372 delete[] cntFinished_;
00373 #endif
00374
00375 }
00376
00377
00378 template <class dtype>
00379 inline
00380 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) {
00381
00382 #ifdef CACHE_LOCATIONS
00383 if (isCached_[destinationPe]) {
00384 return cachedLocations_[destinationPe];
00385 }
00386 #endif
00387
00388 MeshLocation destinationLocation;
00389 int remainder = destinationPe;
00390 int dimensionIndex;
00391 for (int i = numDimensions_ - 1; i >= 0; i--) {
00392 dimensionIndex = remainder / combinedDimensionSizes_[i];
00393
00394 if (dimensionIndex != myLocationIndex_[i]) {
00395 destinationLocation.dimension = i;
00396 destinationLocation.bufferIndex = dimensionIndex;
00397 #ifdef CACHE_LOCATIONS
00398 cachedLocations_[destinationPe] = destinationLocation;
00399 isCached_[destinationPe] = true;
00400 #endif
00401 return destinationLocation;
00402 }
00403
00404 remainder -= combinedDimensionSizes_[i] * dimensionIndex;
00405 }
00406
00407
00408 destinationLocation.dimension = 0;
00409 destinationLocation.bufferIndex = myLocationIndex_[0];
00410 return destinationLocation;
00411 }
00412
00413 template <class dtype>
00414 inline
00415 int MeshStreamer<dtype>::copyDataItemIntoMessage(
00416 MeshStreamerMessage<dtype> *destinationBuffer,
00417 void *dataItemHandle, bool copyIndirectly) {
00418 return destinationBuffer->addDataItem(*((dtype *)dataItemHandle));
00419 }
00420
00421 template <class dtype>
00422 inline
00423 void MeshStreamer<dtype>::storeMessage(
00424 int destinationPe,
00425 const MeshLocation& destinationLocation,
00426 void *dataItem, bool copyIndirectly) {
00427
00428 int dimension = destinationLocation.dimension;
00429 int bufferIndex = destinationLocation.bufferIndex;
00430 MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];
00431
00432
00433 if (messageBuffers[bufferIndex] == NULL) {
00434 if (dimension == 0) {
00435
00436 messageBuffers[bufferIndex] =
00437 new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
00438 }
00439 else {
00440 messageBuffers[bufferIndex] =
00441 new (bufferSize_, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
00442 }
00443 *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
00444 CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
00445 #ifdef DEBUG_STREAMER
00446 CkAssert(messageBuffers[bufferIndex] != NULL);
00447 #endif
00448 }
00449
00450 MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
00451 int numBuffered =
00452 copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
00453 if (dimension != 0) {
00454 destinationBuffer->markDestination(numBuffered-1, destinationPe);
00455 }
00456 numDataItemsBuffered_++;
00457
00458
00459 if (numBuffered == bufferSize_) {
00460
00461 int destinationIndex;
00462
00463 destinationIndex = myIndex_ +
00464 (bufferIndex - myLocationIndex_[dimension]) *
00465 combinedDimensionSizes_[dimension];
00466
00467 if (dimension == 0) {
00468 #ifdef STREAMER_VERBOSE_OUTPUT
00469 CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex);
00470 #endif
00471 this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
00472 }
00473 else {
00474 #ifdef STREAMER_VERBOSE_OUTPUT
00475 CkPrintf("[%d] sending intermediate to %d\n", CkMyPe(), destinationIndex);
00476 #endif
00477 this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
00478 }
00479
00480 #ifdef STAGED_COMPLETION
00481 cntMsgSent_[dimension][bufferIndex]++;
00482 #endif
00483
00484 messageBuffers[bufferIndex] = NULL;
00485 numDataItemsBuffered_ -= numBuffered;
00486 hasSentRecently_ = true;
00487
00488 }
00489
00490 else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
00491 sendLargestBuffer();
00492 hasSentRecently_ = true;
00493 }
00494
00495 }
00496
00497 template <class dtype>
00498 inline
00499 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
00500 const static bool copyIndirectly = true;
00501
00502 MeshLocation destinationLocation = determineLocation(destinationPe);
00503 storeMessage(destinationPe, destinationLocation, dataItemHandle,
00504 copyIndirectly);
00505
00506
00507 if (yieldFlag_ && ++yieldCount_ == 1024) {
00508 yieldCount_ = 0;
00509 CthYield();
00510 }
00511
00512 }
00513
00514 template <class dtype>
00515 inline
00516 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
00517 #ifndef STAGED_COMPLETION
00518 detectorLocalObj_->produce();
00519 #endif
00520 if (destinationPe == CkMyPe()) {
00521
00522
00523 dtype dataItemCopy = dataItem;
00524 localDeliver(dataItemCopy);
00525 return;
00526 }
00527
00528 insertData((void *) &dataItem, destinationPe);
00529 }
00530
00531 template <class dtype>
00532 void MeshStreamer<dtype>::init(CkCallback startCb, CkCallback endCb,
00533 int prio) {
00534
00535 for (int i = 0; i < numDimensions_; i++) {
00536 std::fill(cntMsgSent_[i],
00537 cntMsgSent_[i] + individualDimensionSizes_[i], 0);
00538 cntMsgReceived_[i] = 0;
00539 cntMsgExpected_[i] = 0;
00540 cntFinished_[i] = 0;
00541 }
00542 dimensionToFlush_ = numDimensions_ - 1;
00543
00544 yieldCount_ = 0;
00545 userCallback_ = endCb;
00546 prio_ = prio;
00547
00548 numLocalDone_ = 0;
00549 initLocalClients();
00550 this->contribute(startCb);
00551 }
00552
00553 template <class dtype>
00554 void MeshStreamer<dtype>::associateCallback(
00555 int numContributors,
00556 CkCallback startCb, CkCallback endCb,
00557 CProxy_CompletionDetector detector,
00558 int prio) {
00559
00560 #ifdef STREAMER_EXPERIMENTAL
00561 immediateMode_ = false;
00562 hasSentPreviously_ = false;
00563 #endif
00564 yieldCount_ = 0;
00565 prio_ = prio;
00566 userCallback_ = endCb;
00567 CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushDirect(),
00568 this->thisProxy);
00569 CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(),
00570 this->thisProxy);
00571 detector_ = detector;
00572 detectorLocalObj_ = detector_.ckLocalBranch();
00573 initLocalClients();
00574
00575 detectorLocalObj_->start_detection(numContributors, startCb, flushCb, finish , 0);
00576
00577 if (progressPeriodInMs_ <= 0) {
00578 CkPrintf("Using completion detection in NDMeshStreamer requires"
00579 " setting a valid periodic flush period. Defaulting"
00580 " to 10 ms\n");
00581 progressPeriodInMs_ = 10;
00582 }
00583
00584 hasSentRecently_ = false;
00585 enablePeriodicFlushing();
00586
00587 }
00588
00589 template <class dtype>
00590 void MeshStreamer<dtype>::finish() {
00591 isPeriodicFlushEnabled_ = false;
00592
00593 if (!userCallback_.isInvalid()) {
00594 this->contribute(userCallback_);
00595 userCallback_ = CkCallback();
00596 }
00597
00598 }
00599
00600 template <class dtype>
00601 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
00602
00603 int destinationPe, lastDestinationPe;
00604 MeshLocation destinationLocation;
00605
00606 lastDestinationPe = -1;
00607 for (int i = 0; i < msg->numDataItems; i++) {
00608 destinationPe = msg->destinationPes[i];
00609 dtype &dataItem = msg->getDataItem(i);
00610 if (destinationPe == CkMyPe()) {
00611 localDeliver(dataItem);
00612 }
00613 else {
00614 if (destinationPe != lastDestinationPe) {
00615
00616 destinationLocation = determineLocation(destinationPe);
00617 }
00618 storeMessage(destinationPe, destinationLocation, &dataItem);
00619 }
00620 lastDestinationPe = destinationPe;
00621 }
00622
00623 #ifdef STREAMER_EXPERIMENTAL
00624 if (immediateMode_) {
00625 flushToIntermediateDestinations();
00626 }
00627 #endif
00628
00629 #ifdef STAGED_COMPLETION
00630 envelope *env = UsrToEnv(msg);
00631 MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
00632 markMessageReceived(sourceLocation.dimension, msg->finalMsgCount);
00633 #endif
00634
00635 delete msg;
00636
00637 }
00638
00639 template <class dtype>
00640 void MeshStreamer<dtype>::sendLargestBuffer() {
00641
00642 int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
00643 MeshStreamerMessage<dtype> ** messageBuffers;
00644 MeshStreamerMessage<dtype> *destinationBuffer;
00645
00646 for (int i = 0; i < numDimensions_; i++) {
00647
00648 messageBuffers = dataBuffers_[i];
00649 numBuffers = individualDimensionSizes_[i];
00650
00651 flushDimension = i;
00652 maxSize = 0;
00653 for (int j = 0; j < numBuffers; j++) {
00654 if (messageBuffers[j] != NULL &&
00655 messageBuffers[j]->numDataItems > maxSize) {
00656 maxSize = messageBuffers[j]->numDataItems;
00657 flushIndex = j;
00658 }
00659 }
00660
00661 if (maxSize > 0) {
00662
00663 messageBuffers = dataBuffers_[flushDimension];
00664 destinationBuffer = messageBuffers[flushIndex];
00665 destinationIndex = myIndex_ +
00666 (flushIndex - myLocationIndex_[flushDimension]) *
00667 combinedDimensionSizes_[flushDimension] ;
00668
00669 if (destinationBuffer->numDataItems < bufferSize_) {
00670
00671 envelope *env = UsrToEnv(destinationBuffer);
00672 env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
00673 (bufferSize_ - destinationBuffer->numDataItems));
00674 *((int *) env->getPrioPtr()) = prio_;
00675 }
00676 numDataItemsBuffered_ -= destinationBuffer->numDataItems;
00677
00678 if (flushDimension == 0) {
00679 #ifdef STREAMER_VERBOSE_OUTPUT
00680 CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex);
00681 #endif
00682 this->thisProxy[destinationIndex].
00683 receiveAtDestination(destinationBuffer);
00684 }
00685 else {
00686 #ifdef STREAMER_VERBOSE_OUTPUT
00687 CkPrintf("[%d] sending intermediate flush to %d\n", CkMyPe(), destinationIndex);
00688 #endif
00689 this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
00690 }
00691
00692 #ifdef STAGED_COMPLETION
00693 cntMsgSent_[i][flushIndex]++;
00694 #endif
00695
00696 messageBuffers[flushIndex] = NULL;
00697
00698 }
00699
00700 }
00701 }
00702
00703 template <class dtype>
00704 void MeshStreamer<dtype>::flushAllBuffers() {
00705
00706 MeshStreamerMessage<dtype> **messageBuffers;
00707 int numBuffers;
00708
00709 for (int i = 0; i < numDimensions_; i++) {
00710
00711 messageBuffers = dataBuffers_[i];
00712 numBuffers = individualDimensionSizes_[i];
00713
00714 for (int j = 0; j < numBuffers; j++) {
00715
00716 if(messageBuffers[j] == NULL) {
00717 continue;
00718 }
00719
00720 numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
00721
00722 if (i == 0) {
00723 int destinationPe = myIndex_ + j - myLocationIndex_[i];
00724 this->thisProxy[destinationPe].receiveAtDestination(messageBuffers[j]);
00725 }
00726 else {
00727
00728 for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
00729
00730 MeshStreamerMessage<dtype> *directMsg =
00731 new (0, 1, sizeof(int)) MeshStreamerMessage<dtype>();
00732 *(int *) CkPriorityPtr(directMsg) = prio_;
00733 CkSetQueueing(directMsg, CK_QUEUEING_IFIFO);
00734
00735 #ifdef DEBUG_STREAMER
00736 CkAssert(directMsg != NULL);
00737 #endif
00738 int destinationPe = messageBuffers[j]->destinationPes[k];
00739 dtype &dataItem = messageBuffers[j]->getDataItem(k);
00740 directMsg->addDataItem(dataItem);
00741 this->thisProxy[destinationPe].receiveAtDestination(directMsg);
00742 }
00743 delete messageBuffers[j];
00744 }
00745 messageBuffers[j] = NULL;
00746 }
00747 }
00748 }
00749
00750 template <class dtype>
00751 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
00752
00753 for (int i = 0; i < numDimensions_; i++) {
00754 flushDimension(i);
00755 }
00756 }
00757
00758 template <class dtype>
00759 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
00760 #ifdef STREAMER_VERBOSE_OUTPUT
00761 CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", CkMyPe(), dimension, sendMsgCounts);
00762 #endif
00763 MeshStreamerMessage<dtype> **messageBuffers;
00764 MeshStreamerMessage<dtype> *destinationBuffer;
00765 int destinationIndex, numBuffers;
00766
00767 messageBuffers = dataBuffers_[dimension];
00768 numBuffers = individualDimensionSizes_[dimension];
00769
00770 for (int j = 0; j < numBuffers; j++) {
00771
00772 if(messageBuffers[j] == NULL) {
00773 if (sendMsgCounts && j != myLocationIndex_[dimension]) {
00774 messageBuffers[j] =
00775 new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>();
00776 *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
00777 CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
00778 }
00779 else {
00780 continue;
00781 }
00782 }
00783
00784 destinationBuffer = messageBuffers[j];
00785 destinationIndex = myIndex_ +
00786 (j - myLocationIndex_[dimension]) *
00787 combinedDimensionSizes_[dimension] ;
00788
00789 if (destinationBuffer->numDataItems < bufferSize_) {
00790 #ifdef STAGED_COMPLETION
00791 if (destinationBuffer->numDataItems != 0) {
00792 #endif
00793
00794 envelope *env = UsrToEnv(destinationBuffer);
00795 env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
00796 (bufferSize_ - destinationBuffer->numDataItems));
00797 *((int *) env->getPrioPtr()) = prio_;
00798 #ifdef STAGED_COMPLETION
00799 }
00800 #endif
00801 }
00802 numDataItemsBuffered_ -= destinationBuffer->numDataItems;
00803
00804 #ifdef STAGED_COMPLETION
00805 destinationBuffer->finalMsgCount = ++cntMsgSent_[dimension][j];
00806 #endif
00807
00808 if (dimension == 0) {
00809 #ifdef STREAMER_VERBOSE_OUTPUT
00810 CkPrintf("[%d] sending dimension flush to %d\n", CkMyPe(), destinationIndex);
00811 #endif
00812 this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
00813 }
00814 else {
00815 #ifdef STREAMER_VERBOSE_OUTPUT
00816 CkPrintf("[%d] sending intermediate dimension flush to %d\n", CkMyPe(), destinationIndex);
00817 #endif
00818 this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
00819 }
00820 messageBuffers[j] = NULL;
00821 }
00822
00823 }
00824
00825
00826 template <class dtype>
00827 void MeshStreamer<dtype>::flushDirect(){
00828
00829
00830
00831 if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
00832
00833 if (numDataItemsBuffered_ != 0) {
00834 flushAllBuffers();
00835 }
00836 #ifdef DEBUG_STREAMER
00837 CkAssert(numDataItemsBuffered_ == 0);
00838 #endif
00839
00840 }
00841
00842 #ifdef STREAMER_EXPERIMENTAL
00843
00844
00845
00846 if (hasSentPreviously_ &&
00847 (numDataItemsBuffered_ < .1 * maxNumDataItemsBuffered_)) {
00848 immediateMode_ = true;
00849 }
00850
00851 if (!hasSentPreviously_) {
00852 hasSentPreviously_ = hasSentRecently_;
00853 }
00854 #endif
00855
00856 hasSentRecently_ = false;
00857
00858 }
00859
00860 template <class dtype>
00861 void periodicProgressFunction(void *MeshStreamerObj, double time) {
00862
00863 MeshStreamer<dtype> *properObj =
00864 static_cast<MeshStreamer<dtype>*>(MeshStreamerObj);
00865
00866 if (properObj->isPeriodicFlushEnabled()) {
00867 properObj->flushDirect();
00868 properObj->registerPeriodicProgressFunction();
00869 }
00870 }
00871
00872 template <class dtype>
00873 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
00874 CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this,
00875 progressPeriodInMs_);
00876 }
00877
00878
00879 template <class dtype>
00880 class GroupMeshStreamer : public MeshStreamer<dtype> {
00881 private:
00882
00883 CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
00884 MeshStreamerGroupClient<dtype> *clientObj_;
00885
00886 void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
00887 for (int i = 0; i < msg->numDataItems; i++) {
00888 dtype &data = msg->getDataItem(i);
00889 clientObj_->process(data);
00890 }
00891 #ifdef STAGED_COMPLETION
00892 envelope *env = UsrToEnv(msg);
00893 MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
00894 #ifdef DEBUG_STREAMER
00895 CkAssert(env->getSrcPe() >= 0 && env->getSrcPe() < CkNumPes());
00896 #endif
00897 #ifdef STREAMER_VERBOSE_OUTPUT
00898 CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);
00899 #endif
00900 markMessageReceived(sourceLocation.dimension, msg->finalMsgCount);
00901 #else
00902 this->detectorLocalObj_->consume(msg->numDataItems);
00903 #endif
00904 delete msg;
00905 }
00906
00907 void localDeliver(dtype &dataItem) {
00908 clientObj_->process(dataItem);
00909 #ifndef STAGED_COMPLETION
00910 MeshStreamer<dtype>::detectorLocalObj_->consume();
00911 #endif
00912 }
00913
00914 int numElementsInClient() {
00915
00916 return CkNumPes();
00917 }
00918
00919 int numLocalElementsInClient() {
00920 return 1;
00921 }
00922
00923 void initLocalClients() {
00924
00925 }
00926
00927 public:
00928
00929 GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
00930 int *dimensionSizes,
00931 const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
00932 bool yieldFlag = 0, double progressPeriodInMs = -1.0)
00933 :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
00934 yieldFlag, progressPeriodInMs)
00935 {
00936 clientProxy_ = clientProxy;
00937 clientObj_ =
00938 ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
00939 }
00940
00941 };
00942
00943 template <class dtype>
00944 class MeshStreamerClientIterator : public CkLocIterator {
00945
00946 public:
00947
00948 CompletionDetector *detectorLocalObj_;
00949 CkArray *clientArrMgr_;
00950 MeshStreamerClientIterator(CompletionDetector *detectorObj,
00951 CkArray *clientArrMgr)
00952 : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
00953
00954
00955 void addLocation(CkLocation &loc) {
00956
00957 MeshStreamerArrayClient<dtype> *clientObj =
00958 (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
00959
00960 #ifdef DEBUG_STREAMER
00961 CkAssert(clientObj != NULL);
00962 #endif
00963 clientObj->setDetector(detectorLocalObj_);
00964 }
00965
00966 };
00967
00968 template <class dtype, class itype>
00969 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
00970
00971 private:
00972
00973 CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
00974 CkArray *clientArrayMgr_;
00975 int numArrayElements_;
00976 int numLocalArrayElements_;
00977 #ifdef CACHE_ARRAY_METADATA
00978 MeshStreamerArrayClient<dtype> **clientObjs_;
00979 int *destinationPes_;
00980 bool *isCachedArrayMetadata_;
00981 #endif
00982
00983 void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
00984 itype arrayId = packedDataItem.arrayIndex;
00985
00986 MeshStreamerArrayClient<dtype> *clientObj;
00987 #ifdef CACHE_ARRAY_METADATA
00988 clientObj = clientObjs_[arrayId];
00989 #else
00990 clientObj = clientProxy_[arrayId].ckLocal();
00991 #endif
00992
00993 if (clientObj != NULL) {
00994 clientObj->process(packedDataItem.dataItem);
00995 #ifndef STAGED_COMPLETION
00996 MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
00997 #endif
00998 }
00999 else {
01000
01001 clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
01002 }
01003 }
01004
01005 int numElementsInClient() {
01006 return numArrayElements_;
01007 }
01008
01009 int numLocalElementsInClient() {
01010 return numLocalArrayElements_;
01011 }
01012
01013 void initLocalClients() {
01014 #ifndef STAGED_COMPLETION
01015
01016 #ifdef CACHE_ARRAY_METADATA
01017 std::fill(isCachedArrayMetadata_,
01018 isCachedArrayMetadata_ + numArrayElements_, false);
01019
01020 for (int i = 0; i < numArrayElements_; i++) {
01021 clientObjs_[i] = clientProxy_[i].ckLocal();
01022 if (clientObjs_[i] != NULL) {
01023 clientObjs_[i]->setDetector(
01024 MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
01025 }
01026 }
01027 #else
01028
01029 CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr();
01030 MeshStreamerClientIterator<dtype> clientIterator(
01031 MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_,
01032 clientProxy_.ckLocalBranch());
01033 clientLocMgr->iterate(clientIterator);
01034 #endif
01035
01036 #else
01037 numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
01038 #endif
01039 }
01040
01041 public:
01042
01043 struct DataItemHandle {
01044 itype arrayIndex;
01045 dtype *dataItem;
01046 };
01047
01048 ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
01049 int *dimensionSizes,
01050 const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
01051 bool yieldFlag = 0, double progressPeriodInMs = -1.0)
01052 :MeshStreamer<ArrayDataItem<dtype, itype> >(
01053 maxNumDataItemsBuffered, numDimensions, dimensionSizes, yieldFlag,
01054 progressPeriodInMs)
01055 {
01056 clientProxy_ = clientProxy;
01057 clientArrayMgr_ = clientProxy_.ckLocalBranch();
01058
01059 #ifdef CACHE_ARRAY_METADATA
01060 numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
01061 clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
01062 destinationPes_ = new int[numArrayElements_];
01063 isCachedArrayMetadata_ = new bool[numArrayElements_];
01064 std::fill(isCachedArrayMetadata_,
01065 isCachedArrayMetadata_ + numArrayElements_, false);
01066 #endif
01067 }
01068
01069 ~ArrayMeshStreamer() {
01070 #ifdef CACHE_ARRAY_METADATA
01071 delete [] clientObjs_;
01072 delete [] destinationPes_;
01073 delete [] isCachedArrayMetadata_;
01074 #endif
01075 }
01076
01077 void receiveAtDestination(
01078 MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
01079
01080 for (int i = 0; i < msg->numDataItems; i++) {
01081 ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
01082 localDeliver(packedData);
01083 }
01084 #ifdef STAGED_COMPLETION
01085 envelope *env = UsrToEnv(msg);
01086 MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
01087 markMessageReceived(sourceLocation.dimension, msg->finalMsgCount);
01088 #endif
01089
01090 delete msg;
01091 }
01092
01093 void insertData(dtype &dataItem, itype arrayIndex) {
01094 #ifndef STAGED_COMPLETION
01095 MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
01096 #endif
01097 int destinationPe;
01098 #ifdef CACHE_ARRAY_METADATA
01099 if (isCachedArrayMetadata_[arrayIndex]) {
01100 destinationPe = destinationPes_[arrayIndex];
01101 }
01102 else {
01103 destinationPe =
01104 clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
01105 isCachedArrayMetadata_[arrayIndex] = true;
01106 destinationPes_[arrayIndex] = destinationPe;
01107 }
01108 #else
01109 destinationPe =
01110 clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
01111 #endif
01112
01113 ArrayDataItem<dtype, itype> packedDataItem;
01114 if (destinationPe == CkMyPe()) {
01115
01116
01117 packedDataItem.arrayIndex = arrayIndex;
01118 packedDataItem.dataItem = dataItem;
01119 localDeliver(packedDataItem);
01120 return;
01121 }
01122
01123
01124
01125 DataItemHandle tempHandle;
01126 tempHandle.arrayIndex = arrayIndex;
01127 tempHandle.dataItem = &dataItem;
01128
01129 MeshStreamer<ArrayDataItem<dtype, itype> >::
01130 insertData(&tempHandle, destinationPe);
01131
01132 }
01133
01134 int copyDataItemIntoMessage(
01135 MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer,
01136 void *dataItemHandle, bool copyIndirectly) {
01137
01138 if (copyIndirectly == true) {
01139
01140 int numDataItems = destinationBuffer->numDataItems;
01141 DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
01142 (destinationBuffer->dataItems)[numDataItems].dataItem =
01143 *(tempHandle->dataItem);
01144 (destinationBuffer->dataItems)[numDataItems].arrayIndex =
01145 tempHandle->arrayIndex;
01146 return ++destinationBuffer->numDataItems;
01147 }
01148 else {
01149
01150
01151 return MeshStreamer<ArrayDataItem<dtype, itype> >::
01152 copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
01153 }
01154 }
01155
01156 };
01157
01158 #define CK_TEMPLATES_ONLY
01159 #include "NDMeshStreamer.def.h"
01160 #undef CK_TEMPLATES_ONLY
01161
01162 #endif