00001 #ifndef NDMESH_STREAMER_H
00002 #define NDMESH_STREAMER_H
00003
00004 #include <algorithm>
00005 #include <vector>
00006 #include <list>
00007 #include <map>
00008 #include "NDMeshStreamer.decl.h"
00009 #include "DataItemTypes.h"
00010 #include "completion.h"
00011 #include "ckarray.h"
00012 #include "VirtualRouter.h"
00013 #include "pup_stl.h"
00014 #include "debug-charm.h"
00015
00016
00017
00018
00019
00020 #define CMK_TRAM_OVERALLOCATION_FACTOR 4
00021
00022
00023
00024
00025 #define TRAM_BROADCAST (-100)
00026
00027 extern void QdCreate(int n);
00028 extern void QdProcess(int n);
00029
// Per-index-type holder for a special array index. In this file it is only
// read by ArrayMeshStreamer::localDeliver, which compares an item's target
// index against TramBroadcastInstance<CkArrayIndex>::value(dimension) and,
// on a match, treats the item as a broadcast.
// NOTE(review): the static members are only declared here; their
// definitions (and the concrete index values) live elsewhere — confirm.

// Primary template is intentionally declared but not defined: only the
// specializations below are usable.
template<class itype>
struct TramBroadcastInstance;

template<>
struct TramBroadcastInstance<CkArrayIndex1D>{
  static CkArrayIndex1D value;
};

template<>
struct TramBroadcastInstance<CkArrayIndex2D>{
  static CkArrayIndex2D value;
};

template<>
struct TramBroadcastInstance<CkArrayIndex3D>{
  static CkArrayIndex3D value;
};

template<>
struct TramBroadcastInstance<CkArrayIndex4D>{
  static CkArrayIndex4D value;
};

template<>
struct TramBroadcastInstance<CkArrayIndex5D>{
  static CkArrayIndex5D value;
};

template<>
struct TramBroadcastInstance<CkArrayIndex6D>{
  static CkArrayIndex6D value;
};

// Generic index: the sentinel depends on the index dimensionality, so it is
// exposed as a function taking the number of dimensions.
template<>
struct TramBroadcastInstance<CkArrayIndex>{
  static CkArrayIndex& value(int);
};
00067
00068 template<class dtype>
00069 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
00070
00071 public:
00072
00073 int finalMsgCount;
00074 int msgType;
00075 int numDataItems;
00076 int *destinationPes;
00077 dtype *dataItems;
00078
00079 MeshStreamerMessage(int t): numDataItems(0), msgType(t) {
00080 finalMsgCount = -1;
00081 }
00082
00083 inline int addDataItem(const dtype& dataItem) {
00084 dataItems[numDataItems] = dataItem;
00085 return ++numDataItems;
00086 }
00087
00088 inline void markDestination(const int index, const int destinationPe) {
00089 destinationPes[index] = destinationPe;
00090 }
00091
00092 inline const dtype& getDataItem(const int index) {
00093 return dataItems[index];
00094 }
00095 };
00096
00097 template <class dtype, class RouterType>
00098 class MeshStreamer : public CBase_MeshStreamer<dtype, RouterType> {
00099
00100 private:
00101 int bufferSize_;
00102 int maxNumDataItemsBuffered_;
00103 int numDataItemsBuffered_;
00104
00105 CkCallback userCallback_;
00106 bool yieldFlag_;
00107
00108 double progressPeriodInMs_;
00109 bool isPeriodicFlushEnabled_;
00110 bool hasSentRecently_;
00111 std::vector<std::vector<MeshStreamerMessage<dtype> * > > dataBuffers_;
00112
00113 CProxy_CompletionDetector detector_;
00114 int prio_;
00115 int yieldCount_;
00116
00117
00118 std::vector<std::vector<int> > cntMsgSent_;
00119 std::vector<int> cntMsgReceived_;
00120 std::vector<int> cntMsgExpected_;
00121 std::vector<int> cntFinished_;
00122
00123 int numLocalDone_;
00124 int numLocalContributors_;
00125 CompletionStatus myCompletionStatus_;
00126
00127 virtual void localDeliver(const dtype& dataItem) { CkAbort("Called what should be a pure virtual base method"); }
00128 virtual void localBroadcast(const dtype& dataItem) { CkAbort("Called what should be a pure virtual base method"); }
00129
00130 virtual void initLocalClients() { CkAbort("Called what should be a pure virtual base method"); }
00131
00132 void sendLargestBuffer();
00133 void flushToIntermediateDestinations();
00134 void flushDimension(int dimension, bool sendMsgCounts = false);
00135
00136 protected:
00137
00138 RouterType myRouter_;
00139 int numMembers_;
00140 int myIndex_;
00141 int numDimensions_;
00142 bool useStagedCompletion_;
00143 bool stagedCompletionStarted_;
00144 bool useCompletionDetection_;
00145 CompletionDetector *detectorLocalObj_;
00146 virtual int copyDataItemIntoMessage(
00147 MeshStreamerMessage<dtype> *destinationBuffer,
00148 const void *dataItemHandle, bool copyIndirectly = false);
00149 void insertData(const void *dataItemHandle, int destinationPe);
00150 void broadcast(const void *dataItemHandle, int dimension,
00151 bool copyIndirectly);
00152 void storeMessage(int destinationPe,
00153 const Route& destinationCoordinates,
00154 const void *dataItem, bool copyIndirectly = false);
00155
00156 void ctorHelper(int maxNumDataItemsBuffered, int numDimensions,
00157 int *dimensionSizes, int bufferSize,
00158 bool yieldFlag, double progressPeriodInMs);
00159
00160 public:
00161
00162 MeshStreamer() {}
00163 MeshStreamer(CkMigrateMessage *) {}
00164 MeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
00165 int *dimensionSizes, int bufferSize,
00166 bool yieldFlag = 0, double progressPeriodInMs = -1.0);
00167
00168
00169
00170 void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
00171 void enablePeriodicFlushing(){
00172 if (progressPeriodInMs_ <= 0) {
00173 if (myIndex_ == 0) {
00174 CkPrintf("Using periodic flushing for NDMeshStreamer requires"
00175 " setting a valid periodic flush period. Defaulting"
00176 " to 10 ms.\n");
00177 }
00178 progressPeriodInMs_ = 10;
00179 }
00180
00181 isPeriodicFlushEnabled_ = true;
00182 registerPeriodicProgressFunction();
00183 }
00184 void finish();
00185 void init(int numLocalContributors, CkCallback startCb, CkCallback endCb,
00186 int prio, bool usePeriodicFlushing);
00187 void init(int numContributors, CkCallback startCb, CkCallback endCb,
00188 CProxy_CompletionDetector detector,
00189 int prio, bool usePeriodicFlushing);
00190 void init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb,
00191 int prio, bool usePeriodicFlushing);
00192 void init(CkCallback startCb, int prio);
00193
00194 void syncInit();
00195
00196 virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) { CkAbort("Called what should be a pure virtual base method"); }
00197
00198
00199 void flushIfIdle();
00200 inline bool isPeriodicFlushEnabled() {
00201 return isPeriodicFlushEnabled_;
00202 }
00203 virtual void insertData(const dtype& dataItem, int destinationPe);
00204 virtual void broadcast(const dtype& dataItem);
00205
00206 void sendMeshStreamerMessage(MeshStreamerMessage<dtype> *destinationBuffer,
00207 int dimension, int destinationIndex);
00208
00209 void registerPeriodicProgressFunction();
00210
00211 inline void done(int numContributorsFinished = 1) {
00212
00213 if (useStagedCompletion_) {
00214 numLocalDone_ += numContributorsFinished;
00215 CkAssert(numLocalDone_ <= numLocalContributors_);
00216 if (numLocalDone_ == numLocalContributors_) {
00217 startStagedCompletion();
00218 }
00219 }
00220 else if (useCompletionDetection_){
00221 detectorLocalObj_->done(numContributorsFinished);
00222 }
00223 }
00224
00225 inline void startStagedCompletion() {
00226 stagedCompletionStarted_ = true;
00227 myCompletionStatus_.stageIndex = initialCompletionStage;
00228 myRouter_.updateCompletionProgress(myCompletionStatus_);
00229 std::vector<int> &pendingFlushes = myCompletionStatus_.dimensionsToFlush;
00230 for (int i = 0; i < pendingFlushes.size(); i++) {
00231 flushDimension(pendingFlushes[i], true);
00232 }
00233 pendingFlushes.clear();
00234 checkForCompletedStages();
00235 }
00236
00237 inline void markMessageReceived(int msgType, int finalCount) {
00238 cntMsgReceived_[msgType]++;
00239 if (finalCount >= 0) {
00240 cntFinished_[msgType]++;
00241 cntMsgExpected_[msgType] += finalCount;
00242 #ifdef CMK_TRAM_VERBOSE_OUTPUT
00243 CkPrintf("[%d] received msgType: %d finalCount: %d cntFinished: %d "
00244 "cntMsgExpected: %d cntMsgReceived: %d\n", myIndex_, msgType,
00245 finalCount, cntFinished_[msgType], cntMsgExpected_[msgType],
00246 cntMsgReceived_[msgType]);
00247 #endif
00248 }
00249 if (stagedCompletionStarted_) {
00250 checkForCompletedStages();
00251 }
00252 }
00253 inline bool checkAllStagesCompleted() {
00254
00255
00256 if (myCompletionStatus_.stageIndex == finalCompletionStage) {
00257 #ifdef CMK_TRAM_VERBOSE_OUTPUT
00258 CkPrintf("[%d] All done. Reducing to final callback ...\n", myIndex_);
00259 #endif
00260 CkAssert(numDataItemsBuffered_ == 0);
00261 isPeriodicFlushEnabled_ = false;
00262 if (!userCallback_.isInvalid()) {
00263 this->contribute(userCallback_);
00264 userCallback_ = CkCallback();
00265 }
00266 return true;
00267 }
00268 else {
00269 return false;
00270 }
00271 }
00272
00273 inline void checkForCompletedStages() {
00274 int ¤tStage = myCompletionStatus_.stageIndex;
00275 if (checkAllStagesCompleted()) {
00276 return;
00277 }
00278 while (cntFinished_[currentStage] == myCompletionStatus_.numContributors &&
00279 cntMsgExpected_[currentStage] == cntMsgReceived_[currentStage]) {
00280 #ifdef CMK_TRAM_VERBOSE_OUTPUT
00281 CkPrintf("[%d] stage completion finished stage %d, received contributions"
00282 " from %d PEs, cntMsgExpected: %d cntMsgReceived: %d\n",
00283 myIndex_, myCompletionStatus_.stageIndex,
00284 cntFinished_[currentStage], cntMsgExpected_[currentStage],
00285 cntMsgReceived_[currentStage]);
00286 #endif
00287 myRouter_.updateCompletionProgress(myCompletionStatus_);
00288 if (checkAllStagesCompleted()) {
00289 return;
00290 }
00291 else {
00292 std::vector<int> &pendingFlushes =
00293 myCompletionStatus_.dimensionsToFlush;
00294 for (int i = 0; i < pendingFlushes.size(); i++) {
00295 flushDimension(pendingFlushes[i], true);
00296 }
00297 pendingFlushes.clear();
00298 }
00299 }
00300 }
00301
00302 virtual void pup(PUP::er &p);
00303 };
00304
00305 template <class dtype, class RouterType>
00306 MeshStreamer<dtype, RouterType>::
00307 MeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
00308 int *dimensionSizes, int bufferSize, bool yieldFlag,
00309 double progressPeriodInMs) {
00310 ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
00311 bufferSize, yieldFlag, progressPeriodInMs);
00312 }
00313
00314 template <class dtype, class RouterType>
00315 void MeshStreamer<dtype, RouterType>::
00316 ctorHelper(int maxNumDataItemsBuffered, int numDimensions,
00317 int *dimensionSizes, int bufferSize,
00318 bool yieldFlag, double progressPeriodInMs) {
00319
00320 numDimensions_ = numDimensions;
00321 maxNumDataItemsBuffered_ = maxNumDataItemsBuffered;
00322 yieldFlag_ = yieldFlag;
00323 progressPeriodInMs_ = progressPeriodInMs;
00324 bufferSize_ = bufferSize;
00325 numDataItemsBuffered_ = 0;
00326 numMembers_ = CkNumPes();
00327 myIndex_ = CkMyPe();
00328
00329 myRouter_.initializeRouter(numDimensions_, myIndex_, dimensionSizes);
00330 int maxNumBuffers = myRouter_.maxNumAllocatedBuffers();
00331
00332 dataBuffers_.resize(numDimensions_);
00333 for (int i = 0; i < numDimensions; i++) {
00334 dataBuffers_[i].assign(myRouter_.numBuffersPerDimension(i),
00335 (MeshStreamerMessage<dtype> *) NULL);
00336 }
00337
00338
00339 if (bufferSize_ == 0) {
00340 CkAssert(maxNumDataItemsBuffered_ > 0);
00341 bufferSize_ = CMK_TRAM_OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_
00342 / maxNumBuffers;
00343 }
00344 else {
00345 maxNumDataItemsBuffered_ = bufferSize_ * maxNumBuffers
00346 / CMK_TRAM_OVERALLOCATION_FACTOR;
00347 }
00348
00349 if (bufferSize_ <= 0) {
00350 bufferSize_ = 1;
00351 if (myIndex_ == 0) {
00352 CkPrintf("Warning: Argument maxNumDataItemsBuffered to MeshStreamer "
00353 "constructor is set very low, translating to less than a single "
00354 "item per aggregation buffer. Defaulting to a single item "
00355 "per buffer.\n");
00356 }
00357 }
00358
00359 isPeriodicFlushEnabled_ = false;
00360 detectorLocalObj_ = NULL;
00361
00362 #ifdef CMK_TRAM_VERBOSE_OUTPUT
00363 CkPrintf("[%d] Instance initialized. Buffer size: %d, Capacity: %d, "
00364 "Yield: %d, Flush period: %f, Maximum number of buffers: %d\n",
00365 myIndex_, bufferSize_, maxNumDataItemsBuffered_, yieldFlag_,
00366 progressPeriodInMs_, maxNumBuffers);
00367 #endif
00368
00369 useStagedCompletion_ = false;
00370 stagedCompletionStarted_ = false;
00371 useCompletionDetection_ = false;
00372
00373 yieldCount_ = 0;
00374 userCallback_ = CkCallback();
00375 prio_ = -1;
00376
00377 initLocalClients();
00378
00379 hasSentRecently_ = false;
00380
00381 }
00382
00383 template <class dtype, class RouterType>
00384 inline int MeshStreamer<dtype, RouterType>::
00385 copyDataItemIntoMessage(MeshStreamerMessage<dtype> *destinationBuffer,
00386 const void *dataItemHandle, bool copyIndirectly) {
00387 return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle));
00388 }
00389
00390 template <class dtype, class RouterType>
00391 inline void MeshStreamer<dtype, RouterType>::
00392 sendMeshStreamerMessage(MeshStreamerMessage<dtype> *destinationBuffer,
00393 int dimension, int destinationIndex) {
00394
00395 bool personalizedMessage = myRouter_.isMessagePersonalized(dimension);
00396
00397 if (personalizedMessage) {
00398 #ifdef CMK_TRAM_VERBOSE_OUTPUT
00399 CkPrintf("[%d] sending to %d\n", myIndex_, destinationIndex);
00400 #endif
00401 this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
00402 }
00403 else {
00404 #ifdef CMK_TRAM_VERBOSE_OUTPUT
00405 CkPrintf("[%d] sending intermediate to %d\n",
00406 myIndex_, destinationIndex);
00407 #endif
00408 this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
00409 }
00410 }
00411
00412 template <class dtype, class RouterType>
00413 inline void MeshStreamer<dtype, RouterType>::
00414 storeMessage(int destinationPe, const Route& destinationRoute,
00415 const void *dataItem, bool copyIndirectly) {
00416
00417 int dimension = destinationRoute.dimension;
00418 int bufferIndex = destinationRoute.dimensionIndex;
00419 std::vector<MeshStreamerMessage<dtype> *> &messageBuffers
00420 = dataBuffers_[dimension];
00421
00422 bool personalizedMessage = myRouter_.isMessagePersonalized(dimension);
00423
00424
00425 if (messageBuffers[bufferIndex] == NULL) {
00426 int numDestIndices = bufferSize_;
00427
00428 if (personalizedMessage) {
00429 numDestIndices = 0;
00430 }
00431 messageBuffers[bufferIndex] =
00432 new (numDestIndices, bufferSize_, 8 * sizeof(int))
00433 MeshStreamerMessage<dtype>(myRouter_.determineMsgType(dimension));
00434
00435 *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
00436 CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
00437 CkAssert(messageBuffers[bufferIndex] != NULL);
00438 }
00439
00440 MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
00441 int numBuffered =
00442 copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
00443 if (!personalizedMessage) {
00444 destinationBuffer->markDestination(numBuffered-1, destinationPe);
00445 }
00446 numDataItemsBuffered_++;
00447
00448
00449 if (numBuffered == bufferSize_) {
00450
00451 sendMeshStreamerMessage(destinationBuffer, dimension,
00452 destinationRoute.destinationPe);
00453 if (useStagedCompletion_) {
00454 cntMsgSent_[dimension][bufferIndex]++;
00455 }
00456 messageBuffers[bufferIndex] = NULL;
00457 numDataItemsBuffered_ -= numBuffered;
00458 hasSentRecently_ = true;
00459
00460 }
00461
00462 else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
00463 sendLargestBuffer();
00464 hasSentRecently_ = true;
00465 }
00466 }
00467
00468 template <class dtype, class RouterType>
00469 inline void MeshStreamer<dtype, RouterType>::broadcast(const dtype& dataItem) {
00470
00471 const static bool copyIndirectly = true;
00472
00473
00474
00475 CkAssert(stagedCompletionStarted_ == false);
00476
00477
00478 if (useCompletionDetection_) {
00479 detectorLocalObj_->produce(numMembers_);
00480 }
00481 QdCreate(numMembers_);
00482
00483
00484 localBroadcast(dataItem);
00485
00486 broadcast(&dataItem, numDimensions_ - 1, copyIndirectly);
00487 }
00488
00489 template <class dtype, class RouterType>
00490 inline void MeshStreamer<dtype, RouterType>::
00491 broadcast(const void *dataItemHandle, int dimension, bool copyIndirectly) {
00492
00493 if (!myRouter_.isBroadcastSupported()) {
00494 CkAbort("Broadcast is not supported by this virtual routing scheme\n");
00495 }
00496
00497 Route destinationRoute;
00498 destinationRoute.dimension = dimension;
00499
00500 while (destinationRoute.dimension != -1) {
00501 for (int i = 0;
00502 i < myRouter_.numBuffersPerDimension(destinationRoute.dimension);
00503 i++) {
00504
00505 if (!myRouter_.isBufferInUse(destinationRoute.dimension, i)) {
00506 destinationRoute.dimensionIndex = i;
00507 storeMessage(TRAM_BROADCAST, destinationRoute,
00508 dataItemHandle, copyIndirectly);
00509 }
00510
00511
00512 if (yieldFlag_ && ++yieldCount_ == 1024) {
00513 yieldCount_ = 0;
00514 CthYield();
00515 }
00516 }
00517 destinationRoute.dimension--;
00518 }
00519 }
00520
00521 template <class dtype, class RouterType>
00522 inline void MeshStreamer<dtype, RouterType>::
00523 insertData(const void *dataItemHandle, int destinationPe) {
00524
00525 const static bool copyIndirectly = true;
00526
00527 Route destinationRoute;
00528 myRouter_.determineInitialRoute(destinationPe, destinationRoute);
00529 storeMessage(destinationPe, destinationRoute, dataItemHandle,
00530 copyIndirectly);
00531
00532
00533 if (yieldFlag_ && ++yieldCount_ == 1024) {
00534 yieldCount_ = 0;
00535 CthYield();
00536 }
00537 }
00538
00539 template <class dtype, class RouterType>
00540 inline void MeshStreamer<dtype, RouterType>::
00541 insertData(const dtype& dataItem, int destinationPe) {
00542
00543
00544
00545 CkAssert(stagedCompletionStarted_ == false);
00546
00547 if (useCompletionDetection_) {
00548 detectorLocalObj_->produce();
00549 }
00550 QdCreate(1);
00551
00552 insertData((const void *) &dataItem, destinationPe);
00553 }
00554
00555 template <class dtype, class RouterType>
00556 void MeshStreamer<dtype, RouterType>::init(CkCallback startCb, int prio) {
00557
00558 useStagedCompletion_ = false;
00559 stagedCompletionStarted_ = false;
00560 useCompletionDetection_ = false;
00561
00562 yieldCount_ = 0;
00563 userCallback_ = CkCallback();
00564 prio_ = prio;
00565
00566 initLocalClients();
00567
00568 hasSentRecently_ = false;
00569 enablePeriodicFlushing();
00570
00571 this->contribute(startCb);
00572 }
00573
00574 template <class dtype, class RouterType>
00575 void MeshStreamer<dtype, RouterType>::
00576 init(int numLocalContributors, CkCallback startCb, CkCallback endCb, int prio,
00577 bool usePeriodicFlushing) {
00578
00579 useStagedCompletion_ = true;
00580 stagedCompletionStarted_ = false;
00581 useCompletionDetection_ = false;
00582
00583 int dimensionsReceiving = myRouter_.numMsgTypes();
00584
00585 cntMsgSent_.resize(numDimensions_);
00586 for (int i = 0; i < numDimensions_; i++) {
00587 cntMsgSent_[i].assign(myRouter_.numBuffersPerDimension(i), 0);
00588 }
00589 cntMsgReceived_.assign(dimensionsReceiving, 0);
00590 cntMsgExpected_.assign(dimensionsReceiving, 0);
00591 cntFinished_.assign(dimensionsReceiving, 0);
00592
00593 yieldCount_ = 0;
00594 userCallback_ = endCb;
00595 prio_ = prio;
00596 numLocalDone_ = 0;
00597 numLocalContributors_ = numLocalContributors;
00598 initLocalClients();
00599
00600 hasSentRecently_ = false;
00601 if (usePeriodicFlushing) {
00602 enablePeriodicFlushing();
00603 }
00604
00605 CkCallback syncInitCb(CkIndex_MeshStreamer<dtype, RouterType>::syncInit(),
00606 this->thisProxy);
00607 this->contribute(syncInitCb);
00608 this->contribute(startCb);
00609 }
00610
00611 template <class dtype, class RouterType>
00612 void MeshStreamer<dtype, RouterType>::
00613 syncInit() {
00614
00615 if (numLocalContributors_ == 0) {
00616 startStagedCompletion();
00617 }
00618
00619 }
00620
00621 template <class dtype, class RouterType>
00622 void MeshStreamer<dtype, RouterType>::
00623 init(int numContributors, CkCallback startCb, CkCallback endCb,
00624 CProxy_CompletionDetector detector,
00625 int prio, bool usePeriodicFlushing) {
00626
00627 useStagedCompletion_ = false;
00628 stagedCompletionStarted_ = false;
00629 useCompletionDetection_ = true;
00630 yieldCount_ = 0;
00631 prio_ = prio;
00632 userCallback_ = endCb;
00633
00634
00635
00636 CkCallback flushCb(CkIndex_MeshStreamer<dtype, RouterType>::
00637 enablePeriodicFlushing(), this->thisProxy);
00638 CkCallback finish(CkIndex_MeshStreamer<dtype, RouterType>::finish(),
00639 this->thisProxy);
00640 detector_ = detector;
00641 detectorLocalObj_ = detector_.ckLocalBranch();
00642 initLocalClients();
00643
00644 detector_[CkMyPe()].start_detection(numContributors, startCb, flushCb,
00645 finish , 0);
00646
00647 hasSentRecently_ = false;
00648 if (usePeriodicFlushing) {
00649 enablePeriodicFlushing();
00650 }
00651 }
00652
00653 template <class dtype, class RouterType>
00654 void MeshStreamer<dtype, RouterType>::
00655 init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb, int prio,
00656 bool usePeriodicFlushing) {
00657
00658 CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
00659 int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
00660 init(numLocalElements, startCb, endCb, prio, usePeriodicFlushing);
00661 }
00662
00663 template <class dtype, class RouterType>
00664 void MeshStreamer<dtype, RouterType>::finish() {
00665
00666 isPeriodicFlushEnabled_ = false;
00667
00668 if (!userCallback_.isInvalid()) {
00669 this->contribute(userCallback_);
00670 userCallback_ = CkCallback();
00671 }
00672 }
00673
/**
 * Entry method for non-personalized (intermediate) messages.
 *
 * Each item is handled in one of three ways:
 *  - destination is this PE: delivered via localDeliver();
 *  - destination is TRAM_BROADCAST: delivered via localBroadcast() and then
 *    re-broadcast starting at dimension msg->msgType - 1;
 *  - otherwise: re-buffered toward its destination using the router.
 *
 * When staged completion is used, the message is then counted (unless it
 * is tagged -2 as a message to self) and freed.
 */
template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {

  int destinationPe, lastDestinationPe;
  Route destinationRoute;

  lastDestinationPe = -1;
  for (int i = 0; i < msg->numDataItems; i++) {
    destinationPe = msg->destinationPes[i];
    const dtype& dataItem = msg->getDataItem(i);
    if (destinationPe == myIndex_) {
      localDeliver(dataItem);
    }
    else if (destinationPe != TRAM_BROADCAST) {
      // Consecutive items for the same PE reuse the route computed for the
      // previous item.
      if (destinationPe != lastDestinationPe) {

        myRouter_.determineRoute(destinationPe,
                                 myRouter_.dimensionReceived(msg->msgType),
                                 destinationRoute);
      }
      storeMessage(destinationPe, destinationRoute, &dataItem);
    }
    else {
      localBroadcast(dataItem);
      // NOTE(review): msgType - 1 is used as the starting dimension; this
      // assumes message types and dimensions are numbered alike — confirm
      // against the router.
      broadcast(&dataItem, msg->msgType - 1, false);
    }
    lastDestinationPe = destinationPe;
  }

#ifdef CMK_TRAM_VERBOSE_OUTPUT
  envelope *env = UsrToEnv(msg);
  CkPrintf("[%d] received along route from %d %d items finalMsgCount: %d"
           " msgType: %d\n", myIndex_, env->getSrcPe(),
           msg->numDataItems, msg->finalMsgCount, msg->msgType);
#endif

  if (useStagedCompletion_) {
    // -2 marks a message this PE sent to itself; it is not counted.
    if (msg->finalMsgCount != -2) {
      markMessageReceived(msg->msgType, msg->finalMsgCount);
    }
    // In builds other than multicore/UTH, a self message only triggers a
    // re-check of stage completion.
#if !CMK_MULTICORE && !CMK_UTH_VERSION
    else if (stagedCompletionStarted_) {
      checkForCompletedStages();
    }
#endif
  }

  delete msg;
}
00724
00725 template <class dtype, class RouterType>
00726 inline void MeshStreamer<dtype, RouterType>::sendLargestBuffer() {
00727
00728 int flushDimension, flushIndex, maxSize, destinationIndex;
00729 MeshStreamerMessage<dtype> *destinationBuffer;
00730
00731 for (int i = 0; i < numDimensions_; i++) {
00732 std::vector<MeshStreamerMessage<dtype> *> &messageBuffers = dataBuffers_[i];
00733
00734 flushDimension = i;
00735 maxSize = 0;
00736 for (int j = 0; j < messageBuffers.size(); j++) {
00737 if (messageBuffers[j] != NULL &&
00738 messageBuffers[j]->numDataItems > maxSize) {
00739 maxSize = messageBuffers[j]->numDataItems;
00740 flushIndex = j;
00741 }
00742 }
00743
00744 if (maxSize > 0) {
00745
00746 messageBuffers = dataBuffers_[flushDimension];
00747 destinationBuffer = messageBuffers[flushIndex];
00748
00749
00750 envelope *env = UsrToEnv(destinationBuffer);
00751 env->shrinkUsersize((bufferSize_ - destinationBuffer->numDataItems)
00752 * sizeof(dtype));
00753 numDataItemsBuffered_ -= destinationBuffer->numDataItems;
00754
00755 destinationIndex =
00756 myRouter_.nextPeAlongRoute(flushDimension, flushIndex);
00757
00758 if (destinationIndex == myIndex_) {
00759 destinationBuffer->finalMsgCount = -2;
00760 }
00761
00762 sendMeshStreamerMessage(destinationBuffer, flushDimension,
00763 destinationIndex);
00764
00765 if (useStagedCompletion_ && destinationIndex != myIndex_) {
00766 cntMsgSent_[i][flushIndex]++;
00767 }
00768
00769 messageBuffers[flushIndex] = NULL;
00770 }
00771 }
00772 }
00773
00774 template <class dtype, class RouterType>
00775 inline void MeshStreamer<dtype, RouterType>::flushToIntermediateDestinations() {
00776 for (int i = 0; i < numDimensions_; i++) {
00777 flushDimension(i);
00778 }
00779 }
00780
/**
 * Sends the buffers of one dimension to their next hops.
 *
 * @param dimension     index into dataBuffers_.
 * @param sendMsgCounts if false, empty (NULL) slots are skipped. If true,
 *                      an empty message is created for each NULL slot, and
 *                      under staged completion each message sent to
 *                      another PE carries this PE's running sent count for
 *                      that slot in finalMsgCount.
 *
 * Under staged completion, messages whose next hop is this PE are tagged
 * finalMsgCount = -2 and are not counted as sent.
 */
template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
flushDimension(int dimension, bool sendMsgCounts) {

  std::vector<MeshStreamerMessage<dtype> *>
    &messageBuffers = dataBuffers_[dimension];
#ifdef CMK_TRAM_VERBOSE_OUTPUT
  CkPrintf("[%d] flushDimension: %d, num buffered: %d, sendMsgCounts: %d\n",
           myIndex_, dimension, numDataItemsBuffered_, sendMsgCounts);
#endif

  for (int j = 0; j < messageBuffers.size(); j++) {

    // Plain flush: nothing to send for an empty slot.
    if (messageBuffers[j] == NULL && !sendMsgCounts) {
      continue;
    }
    if(messageBuffers[j] == NULL && sendMsgCounts) {
      // Count-carrying flush: create a zero-item message so the count can
      // still be delivered on this route.
      messageBuffers[j] = new (0, 0, 8 * sizeof(int))
        MeshStreamerMessage<dtype>(myRouter_.determineMsgType(dimension));
      *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
      CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
    }
    else {
      // Partially filled buffer: trim the unused dataItems slots. The shrink
      // is skipped unless the user size exceeds the amount to remove.
      envelope *env = UsrToEnv(messageBuffers[j]);
      const UInt s = (bufferSize_ - messageBuffers[j]->numDataItems) * sizeof(dtype);
      if (env->getUsersize() > s) {
        env->shrinkUsersize(s);
      }
    }

    MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[j];
    int destinationIndex = myRouter_.nextPeAlongRoute(dimension, j);
    numDataItemsBuffered_ -= destinationBuffer->numDataItems;
    if (useStagedCompletion_) {
      if (destinationIndex == myIndex_) {
        destinationBuffer->finalMsgCount = -2;
      } else {
        cntMsgSent_[dimension][j]++;
        if (sendMsgCounts) {
          destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
        }
      }
      // A count-carrying flush must never leave the default "no count" tag.
      CkAssert(!sendMsgCounts || destinationBuffer->finalMsgCount != -1);
    }
    sendMeshStreamerMessage(destinationBuffer, dimension, destinationIndex);
    messageBuffers[j] = NULL;
  }
}
00830
00831 template <class dtype, class RouterType>
00832 void MeshStreamer<dtype, RouterType>::flushIfIdle(){
00833
00834
00835
00836
00837 if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
00838
00839 if (numDataItemsBuffered_ != 0) {
00840 flushToIntermediateDestinations();
00841 }
00842 CkAssert(numDataItemsBuffered_ == 0);
00843
00844 }
00845
00846 hasSentRecently_ = false;
00847 }
00848
00849 template <class dtype, class RouterType>
00850 void periodicProgressFunction(void *MeshStreamerObj, double time) {
00851
00852 MeshStreamer<dtype, RouterType> *properObj =
00853 static_cast<MeshStreamer<dtype, RouterType>*>(MeshStreamerObj);
00854
00855 if (properObj->isPeriodicFlushEnabled()) {
00856 properObj->flushIfIdle();
00857 properObj->registerPeriodicProgressFunction();
00858 }
00859 }
00860
00861 template <class dtype, class RouterType>
00862 void MeshStreamer<dtype, RouterType>::registerPeriodicProgressFunction() {
00863 CcdCallFnAfter(periodicProgressFunction<dtype, RouterType>, (void *) this,
00864 progressPeriodInMs_);
00865 }
00866
/**
 * Serializes the streamer's state, including any partially filled
 * aggregation buffers.
 *
 * The order of the p| calls defines the serialized layout. Packing and
 * unpacking must visit the same fields in the same order.
 */
template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::pup(PUP::er &p) {

  p|bufferSize_;
  p|maxNumDataItemsBuffered_;
  p|numDataItemsBuffered_;

  p|userCallback_;
  p|yieldFlag_;

  p|progressPeriodInMs_;
  p|isPeriodicFlushEnabled_;
  p|hasSentRecently_;

  p|detector_;
  p|prio_;
  p|yieldCount_;

  // Staged-completion counters.
  p|cntMsgSent_;
  p|cntMsgReceived_;
  p|cntMsgExpected_;
  p|cntFinished_;

  p|numLocalDone_;
  p|numLocalContributors_;
  p|myCompletionStatus_;

  p|myRouter_;
  p|numMembers_;
  p|myIndex_;
  p|numDimensions_;
  p|useStagedCompletion_;
  p|stagedCompletionStarted_;
  p|useCompletionDetection_;
  // The raw branch pointer is not serialized; it is re-resolved from the
  // restored proxy.
  if (p.isUnpacking()) detectorLocalObj_ = detector_.ckLocalBranch();

  // dataBuffers_ is serialized in two passes. First comes its shape (outer
  // size and each inner size), so the unpacker can size the vectors. Then
  // each slot is serialized as a message pointer.
  size_t outervec_size;
  std::vector<size_t> innervec_sizes;

  if (p.isPacking()) {
    outervec_size = dataBuffers_.size();
    for (int i = 0; i < outervec_size; i++) {
      innervec_sizes.push_back(dataBuffers_[i].size());
    }
  }

  p|outervec_size;
  p|innervec_sizes;

  if (p.isUnpacking()) {
    dataBuffers_.resize(outervec_size);
    for (int i = 0; i < outervec_size; i++) {
      dataBuffers_[i].resize(innervec_sizes[i]);
    }
  }

  // NOTE(review): many slots are NULL; this relies on CkPupMessage
  // round-tripping NULL pointers — confirm.
  for (int i = 0; i < outervec_size; i++) {
    for (int j = 0; j < innervec_sizes[i]; j++) {
      CkPupMessage(p, (void**) &dataBuffers_[i][j]);
    }
  }

}
00933
00934 template <class dtype, class ClientType, class RouterType, int (*EntryMethod)(char *, void *) = defaultMeshStreamerDeliver<dtype, ClientType> >
00935 class GroupMeshStreamer :
00936 public CBase_GroupMeshStreamer<dtype, ClientType, RouterType, EntryMethod> {
00937 private:
00938
00939 CkGroupID clientGID_;
00940 ClientType *clientObj_;
00941
00942 void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
00943 for (int i = 0; i < msg->numDataItems; i++) {
00944 const dtype& data = msg->getDataItem(i);
00945 EntryMethod((char *) &data, clientObj_);
00946 }
00947
00948 if (this->useStagedCompletion_) {
00949 #ifdef CMK_TRAM_VERBOSE_OUTPUT
00950 envelope *env = UsrToEnv(msg);
00951 CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d"
00952 " msgType: %d\n", this->myIndex_, env->getSrcPe(),
00953 msg->numDataItems, msg->finalMsgCount, msg->msgType);
00954 #endif
00955 this->markMessageReceived(msg->msgType, msg->finalMsgCount);
00956 }
00957 else if (this->useCompletionDetection_){
00958 this->detectorLocalObj_->consume(msg->numDataItems);
00959 }
00960 QdProcess(msg->numDataItems);
00961 delete msg;
00962 }
00963
00964 inline void localDeliver(const dtype& dataItem) {
00965 EntryMethod((char *) &dataItem, clientObj_);
00966 if (this->useCompletionDetection_) {
00967 this->detectorLocalObj_->consume();
00968 }
00969 QdProcess(1);
00970 }
00971
00972 inline void localBroadcast(const dtype& dataItem) {
00973 localDeliver(dataItem);
00974 }
00975
00976 inline void initLocalClients() {
00977
00978 }
00979
00980 public:
00981
00982 GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
00983 int *dimensionSizes,
00984 CkGroupID clientGID,
00985 bool yieldFlag = 0, double progressPeriodInMs = -1.0) {
00986 this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
00987 0, yieldFlag, progressPeriodInMs);
00988 clientGID_ = clientGID;
00989 clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
00990
00991 }
00992
00993 GroupMeshStreamer(int numDimensions, int *dimensionSizes,
00994 CkGroupID clientGID,
00995 int bufferSize, bool yieldFlag = 0,
00996 double progressPeriodInMs = -1.0) {
00997 this->ctorHelper(0, numDimensions, dimensionSizes, bufferSize,
00998 yieldFlag, progressPeriodInMs);
00999 clientGID_ = clientGID;
01000 clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
01001
01002 }
01003
01004 GroupMeshStreamer(CkMigrateMessage *) {}
01005
01006 void pup(PUP::er &p) {
01007 p|clientGID_;
01008 if (p.isUnpacking()) {
01009 clientObj_ = (ClientType*) CkLocalBranch(clientGID_);
01010 }
01011 }
01012 };
01013
01014
01015 template <class dtype, class ClientType, int (*EntryMethod)(char *, void *) = defaultMeshStreamerDeliver<dtype,ClientType> >
01016 class LocalBroadcaster : public CkLocIterator {
01017
01018 public:
01019 CkArray *clientArrMgr_;
01020 const dtype *dataItem_;
01021
01022 LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem)
01023 : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
01024
01025 void addLocation(CkLocation& loc) {
01026 ClientType *clientObj =
01027 (ClientType *) clientArrMgr_->lookup(loc.getIndex());
01028 CkAssert(clientObj != NULL);
01029 EntryMethod((char *) dataItem_, clientObj);
01030 }
01031
01032 };
01033
01034 template <class dtype, class itype, class ClientType, class RouterType, int (*EntryMethod)(char *, void *) = defaultMeshStreamerDeliver<dtype,ClientType> >
01035 class ArrayMeshStreamer :
01036 public CBase_ArrayMeshStreamer<dtype, itype, ClientType, RouterType, EntryMethod> {
01037
01038 private:
01039
01040 CkArrayID clientAID_;
01041 CkArray *clientArrayMgr_;
01042 CkLocMgr *clientLocMgr_;
01043 int numArrayElements_;
01044 int numLocalArrayElements_;
01045 std::map<itype, std::vector<ArrayDataItem<dtype, itype> > > misdeliveredItems;
01046 #ifdef CMK_TRAM_CACHE_ARRAY_METADATA
01047 std::vector<ClientType *> clientObjs_;
01048 std::vector<int> destinationPes_;
01049 std::vector<bool> isCachedArrayMetadata_;
01050 #endif
01051
01052 inline
01053 void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
01054
01055 itype arrayId = packedDataItem.arrayIndex;
01056 if (arrayId == TramBroadcastInstance<CkArrayIndex>::value(arrayId.dimension)) {
01057 localBroadcast(packedDataItem);
01058 return;
01059 }
01060 ClientType *clientObj;
01061 #ifdef CMK_TRAM_CACHE_ARRAY_METADATA
01062 clientObj = clientObjs_[arrayId];
01063 #else
01064 clientObj = (ClientType *) clientArrayMgr_->lookup((CkArrayIndex)arrayId);
01065 #endif
01066
01067 if (clientObj != NULL) {
01068 EntryMethod((char *) &packedDataItem.dataItem, clientObj);
01069 if (this->useCompletionDetection_) {
01070 this->detectorLocalObj_->consume();
01071 }
01072 QdProcess(1);
01073 }
01074 else {
01075
01076
01077
01078 if (this->useStagedCompletion_) {
01079 CkAbort("Using staged completion when array locations"
01080 " are not guaranteed to be correct is currently"
01081 " not supported.");
01082 }
01083 misdeliveredItems[arrayId].push_back(packedDataItem);
01084 if (misdeliveredItems[arrayId].size() == 1) {
01085 int homePe = clientLocMgr_->homePe(arrayId);
01086 this->thisProxy[homePe].
01087 processLocationRequest(arrayId, this->myIndex_,
01088 packedDataItem.sourcePe);
01089 }
01090 }
01091 }
01092
01093 inline
01094 void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
01095
01096 LocalBroadcaster<dtype, ClientType, EntryMethod>
01097 clientIterator(clientArrayMgr_, &packedDataItem.dataItem);
01098 clientLocMgr_->iterate(clientIterator);
01099
01100 if (this->useCompletionDetection_) {
01101 this->detectorLocalObj_->consume();
01102 }
01103 QdProcess(1);
01104 }
01105
01106 inline void initLocalClients() {
01107
01108 if (this->useCompletionDetection_) {
01109 #ifdef CMK_TRAM_CACHE_ARRAY_METADATA
01110 numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
01111 clientObjs_.resize(numArrayElements_);
01112 destinationPes_.resize(numArrayElements_);
01113 isCachedArrayMetadata_.assign(numArrayElements_, false);
01114
01115 for (int i = 0; i < numArrayElements_; i++) {
01116 clientObjs_[i] =
01117 (ClientType*) ( clientArrayMgr_->lookup(CkArrayIndex1D(i)) );
01118 }
01119 #endif
01120 }
01121 }
01122
01123 public:
01124
01125 struct DataItemHandle {
01126 itype arrayIndex;
01127 const dtype *dataItem;
01128 };
01129
01130 ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
01131 int *dimensionSizes, CkArrayID clientAID,
01132 bool yieldFlag = 0, double progressPeriodInMs = -1.0) {
01133
01134 this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes, 0,
01135 yieldFlag, progressPeriodInMs);
01136 clientAID_ = clientAID;
01137 clientArrayMgr_ = clientAID_.ckLocalBranch();
01138 clientLocMgr_ = clientArrayMgr_->getLocMgr();
01139 }
01140
01141 ArrayMeshStreamer(int numDimensions, int *dimensionSizes,
01142 CkArrayID clientAID, int bufferSize, bool yieldFlag = 0,
01143 double progressPeriodInMs = -1.0) {
01144
01145 this->ctorHelper(0, numDimensions, dimensionSizes, bufferSize, yieldFlag,
01146 progressPeriodInMs);
01147 clientAID_ = clientAID;
01148 clientArrayMgr_ = clientAID_.ckLocalBranch();
01149 clientLocMgr_ = clientArrayMgr_->getLocMgr();
01150 }
01151
01152 ArrayMeshStreamer(CkMigrateMessage *) {}
01153
01154 void receiveAtDestination(
01155 MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
01156
01157 for (int i = 0; i < msg->numDataItems; i++) {
01158 const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
01159 localDeliver(packedData);
01160 }
01161 if (this->useStagedCompletion_) {
01162 this->markMessageReceived(msg->msgType, msg->finalMsgCount);
01163 }
01164
01165 delete msg;
01166 }
01167
01168 inline void broadcast(const dtype& dataItem) {
01169 const static bool copyIndirectly = true;
01170
01171
01172
01173 CkAssert(this->stagedCompletionStarted_ == false);
01174
01175 if (this->useCompletionDetection_) {
01176 this->detectorLocalObj_->produce(this->numMembers_);
01177 }
01178 QdCreate(this->numMembers_);
01179
01180
01181 ArrayDataItem<dtype, itype> packedDataItem(TRAM_BROADCAST, this->myIndex_,
01182 dataItem);
01183 localBroadcast(packedDataItem);
01184
01185 DataItemHandle tempHandle;
01186 tempHandle.dataItem = &dataItem;
01187 tempHandle.arrayIndex = TRAM_BROADCAST;
01188
01189 MeshStreamer<ArrayDataItem<dtype, itype>, RouterType>::
01190 broadcast(&tempHandle, this->numDimensions_ - 1, copyIndirectly);
01191 }
01192
01193 template <bool deliverInline = false>
01194 inline void insertData(const dtype& dataItem, itype arrayIndex) {
01195
01196
01197
01198 CkAssert(this->stagedCompletionStarted_ == false);
01199
01200 if (this->useCompletionDetection_) {
01201 this->detectorLocalObj_->produce();
01202 }
01203 QdCreate(1);
01204 int destinationPe;
01205 #ifdef CMK_TRAM_CACHE_ARRAY_METADATA
01206 if (isCachedArrayMetadata_[arrayIndex]) {
01207 destinationPe = destinationPes_[arrayIndex];
01208 }
01209 else {
01210 destinationPe = clientArrayMgr_->lastKnown((CkArrayIndex)arrayIndex);
01211 isCachedArrayMetadata_[arrayIndex] = true;
01212 destinationPes_[arrayIndex] = destinationPe;
01213 }
01214 #else
01215 destinationPe =
01216 clientArrayMgr_->lastKnown((CkArrayIndex)arrayIndex);
01217 #endif
01218
01219 if (deliverInline && destinationPe == this->myIndex_) {
01220 ArrayDataItem<dtype, itype>
01221 packedDataItem(arrayIndex, this->myIndex_, dataItem);
01222 localDeliver(packedDataItem);
01223 return;
01224 }
01225
01226
01227 DataItemHandle tempHandle;
01228 tempHandle.arrayIndex = arrayIndex;
01229 tempHandle.dataItem = &dataItem;
01230
01231 MeshStreamer<ArrayDataItem<dtype, itype>, RouterType>::
01232 insertData(&tempHandle, destinationPe);
01233
01234 }
01235
01236 inline int copyDataItemIntoMessage(
01237
01238 MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer,
01239 const void *dataItemHandle, bool copyIndirectly) {
01240
01241 if (copyIndirectly == true) {
01242
01243 int numDataItems = destinationBuffer->numDataItems;
01244 const DataItemHandle *tempHandle =
01245 (const DataItemHandle *) dataItemHandle;
01246 (destinationBuffer->dataItems)[numDataItems].arrayIndex =
01247 tempHandle->arrayIndex;
01248 (destinationBuffer->dataItems)[numDataItems].sourcePe = this->myIndex_;
01249 (destinationBuffer->dataItems)[numDataItems].dataItem =
01250 *(tempHandle->dataItem);
01251 return ++destinationBuffer->numDataItems;
01252 }
01253 else {
01254
01255
01256 return MeshStreamer<ArrayDataItem<dtype, itype>, RouterType>::
01257 copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
01258 }
01259 }
01260
01261
01262 void processLocationRequest(itype arrayId, int deliveredToPe, int sourcePe) {
01263 int ownerPe = clientArrayMgr_->lastKnown((CkArrayIndex)arrayId);
01264 this->thisProxy[deliveredToPe].resendMisdeliveredItems(arrayId, ownerPe);
01265 this->thisProxy[sourcePe].updateLocationAtSource(arrayId, ownerPe);
01266 }
01267
01268 void resendMisdeliveredItems(itype arrayId, int destinationPe) {
01269
01270 clientLocMgr_->updateLocation(arrayId, clientLocMgr_->lookupID(arrayId),destinationPe);
01271
01272 std::vector<ArrayDataItem<dtype, itype> > &bufferedItems
01273 = misdeliveredItems[arrayId];
01274
01275 Route destinationRoute;
01276 this->myRouter_.determineInitialRoute(destinationPe, destinationRoute);
01277 for (int i = 0; i < bufferedItems.size(); i++) {
01278 this->storeMessage(destinationPe, destinationRoute, &bufferedItems[i]);
01279 }
01280
01281 bufferedItems.clear();
01282 }
01283
01284 void updateLocationAtSource(itype arrayId, int destinationPe) {
01285
01286 int prevOwner = clientArrayMgr_->lastKnown((CkArrayIndex)arrayId);
01287
01288 if (prevOwner != destinationPe) {
01289 clientLocMgr_->updateLocation(arrayId,clientLocMgr_->lookupID(arrayId), destinationPe);
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305 }
01306 }
01307
01308 void pup(PUP::er &p) {
01309 p|clientAID_;
01310 if (p.isUnpacking()) {
01311 clientArrayMgr_ = clientAID_.ckLocalBranch();
01312 clientLocMgr_ = clientArrayMgr_->getLocMgr();
01313 }
01314
01315 p|numArrayElements_;
01316 p|numLocalArrayElements_;
01317 p|misdeliveredItems;
01318 #ifdef CMK_TRAM_CACHE_ARRAY_METADATA
01319 size_t clientObjsSize;
01320
01321 if (p.isPacking()) {
01322 clientObjsSize = clientObjs_.size();
01323 }
01324 p|clientObjsSize;
01325
01326 if (p.isUnpacking()) {
01327 clientObjs_.resize(clientObjsSize);
01328 }
01329 for (int i = 0; i < clientObjsSize; i++) {
01330 p|*clientObjs_[i];
01331 }
01332
01333 p|destinationPes_;
01334 p|isCachedArrayMetadata_;
01335 #endif
01336 }
01337
01338 };
01339
01340 struct ChunkReceiveBuffer {
01341 int bufferNumber;
01342 int receivedChunks;
01343 char *buffer;
01344
01345 void pup(PUP::er &p) {
01346 p|bufferNumber;
01347 p|receivedChunks;
01348 if(p.isUnpacking()) {
01349 buffer = new char[receivedChunks * CHUNK_SIZE];
01350 }
01351 PUParray(p, buffer, receivedChunks*CHUNK_SIZE);
01352 }
01353 };
01354
01355 struct ChunkOutOfOrderBuffer {
01356 int bufferNumber;
01357 int receivedChunks;
01358 int sourcePe;
01359 char *buffer;
01360
01361 ChunkOutOfOrderBuffer() {}
01362
01363 ChunkOutOfOrderBuffer(int b, int r, int s, char *buf)
01364 : bufferNumber(b), receivedChunks(r), sourcePe(s), buffer(buf) {}
01365
01366 bool operator==(const ChunkDataItem &chunk) {
01367 return ( (chunk.bufferNumber == bufferNumber) &&
01368 (chunk.sourcePe == sourcePe) );
01369 }
01370
01371 void pup(PUP::er &p) {
01372 p|bufferNumber;
01373 p|receivedChunks;
01374 p|sourcePe;
01375 if(p.isUnpacking()) {
01376 buffer = new char[receivedChunks * CHUNK_SIZE];
01377 }
01378 PUParray(p, buffer, receivedChunks*CHUNK_SIZE);
01379 }
01380 };
01381
01382 template <class dtype, class ClientType, class RouterType, int (*EntryMethod)(char *, void *) = defaultMeshStreamerDeliver<dtype,ClientType> >
01383 class GroupChunkMeshStreamer
01384 : public CBase_GroupChunkMeshStreamer<dtype, ClientType, RouterType, EntryMethod> {
01385
01386 private:
01387
01388
01389 std::list<ChunkOutOfOrderBuffer> outOfOrderBuffers_;
01390 std::vector<ChunkReceiveBuffer> lastReceived_;
01391 std::vector<int> currentBufferNumbers_;
01392
01393 CkGroupID clientGID_;
01394 ClientType *clientObj_;
01395
01396 bool userHandlesFreeing_;
01397 public:
01398
01399 GroupChunkMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
01400 int *dimensionSizes, CkGroupID clientGID,
01401 bool yieldFlag = 0, double progressPeriodInMs = -1.0,
01402 bool userHandlesFreeing = false) {
01403
01404 this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
01405 0, yieldFlag, progressPeriodInMs);
01406 clientGID_ = clientGID;
01407 clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
01408 userHandlesFreeing_ = userHandlesFreeing;
01409 commonInit();
01410 }
01411
01412 GroupChunkMeshStreamer(int numDimensions, int *dimensionSizes,
01413 CkGroupID clientGID, int bufferSize,
01414 bool yieldFlag = 0, double progressPeriodInMs = -1.0,
01415 bool userHandlesFreeing = false) {
01416
01417 this->ctorHelper(0, numDimensions, dimensionSizes, bufferSize, yieldFlag,
01418 progressPeriodInMs);
01419 clientGID_ = clientGID;
01420 clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
01421 userHandlesFreeing_ = userHandlesFreeing;
01422 commonInit();
01423 }
01424
01425 GroupChunkMeshStreamer(CkMigrateMessage *) {}
01426
01427 inline void commonInit() {
01428 lastReceived_.resize(this->numMembers_);
01429 memset(&lastReceived_.front(), 0,
01430 this->numMembers_ * sizeof(ChunkReceiveBuffer));
01431 currentBufferNumbers_.assign(this->numMembers_, 0);
01432 }
01433
01434 inline void insertData(dtype *dataArray, int numElements, int destinationPe,
01435 void *extraData = NULL, int extraDataSize = 0) {
01436
01437 char *inputData = (char *) dataArray;
01438 int arraySizeInBytes = numElements * sizeof(dtype);
01439 int totalSizeInBytes = arraySizeInBytes + extraDataSize;
01440 ChunkDataItem chunk;
01441 int offset;
01442 int chunkNumber = 0;
01443 chunk.bufferNumber = currentBufferNumbers_[destinationPe]++;
01444 chunk.sourcePe = this->myIndex_;
01445 chunk.chunkNumber = 0;
01446 chunk.chunkSize = CHUNK_SIZE;
01447 chunk.numChunks = (int) ceil ( (float) totalSizeInBytes / CHUNK_SIZE);
01448 chunk.numItems = numElements;
01449
01450
01451 for (offset = 0; offset < arraySizeInBytes - CHUNK_SIZE;
01452 offset += CHUNK_SIZE) {
01453 memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
01454 MeshStreamer<ChunkDataItem, RouterType>::
01455 insertData(chunk, destinationPe);
01456 chunk.chunkNumber++;
01457 }
01458
01459
01460 chunk.chunkSize = arraySizeInBytes - offset;
01461 memset(chunk.rawData, 0, CHUNK_SIZE);
01462 memcpy(chunk.rawData, inputData + offset, chunk.chunkSize);
01463
01464
01465 int remainingToSend = extraDataSize;
01466 int tempOffset = chunk.chunkSize;
01467 int extraOffset = 0;
01468 do {
01469 chunk.chunkSize = std::min(tempOffset + remainingToSend, CHUNK_SIZE);
01470 memcpy(chunk.rawData + tempOffset, (char *) extraData + extraOffset,
01471 chunk.chunkSize - tempOffset);
01472
01473 MeshStreamer<ChunkDataItem, RouterType>::insertData(chunk, destinationPe);
01474 chunk.chunkNumber++;
01475 offset += CHUNK_SIZE;
01476 extraOffset += (chunk.chunkSize - tempOffset);
01477 remainingToSend -= (chunk.chunkSize - tempOffset);
01478 tempOffset = 0;
01479 } while (offset < totalSizeInBytes);
01480
01481 }
01482
01483 inline void processChunk(const ChunkDataItem& chunk) {
01484
01485 ChunkReceiveBuffer &last = lastReceived_[chunk.sourcePe];
01486
01487 if (last.buffer == NULL) {
01488 if (outOfOrderBuffers_.size() == 0) {
01489
01490 last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
01491 last.receivedChunks = 0;
01492 }
01493 else {
01494
01495 std::list<ChunkOutOfOrderBuffer>::iterator storedBuffer =
01496 find(outOfOrderBuffers_.begin(), outOfOrderBuffers_.end(), chunk);
01497 if (storedBuffer != outOfOrderBuffers_.end()) {
01498 last.buffer = storedBuffer->buffer;
01499 last.receivedChunks = storedBuffer->receivedChunks;
01500 outOfOrderBuffers_.erase(storedBuffer);
01501 }
01502 else {
01503 last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
01504 last.receivedChunks = 0;
01505 }
01506 }
01507 last.bufferNumber = chunk.bufferNumber;
01508 }
01509 else if (last.bufferNumber != chunk.bufferNumber) {
01510
01511 ChunkOutOfOrderBuffer lastOutOfOrderBuffer(last.bufferNumber,
01512 last.receivedChunks,
01513 chunk.sourcePe, last.buffer);
01514 outOfOrderBuffers_.push_front(lastOutOfOrderBuffer);
01515
01516
01517 std::list<ChunkOutOfOrderBuffer >::iterator storedBuffer =
01518 find(outOfOrderBuffers_.begin(), outOfOrderBuffers_.end(), chunk);
01519
01520 if (storedBuffer == outOfOrderBuffers_.end() ) {
01521
01522 last.bufferNumber = chunk.bufferNumber;
01523 last.receivedChunks = 0;
01524 last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
01525 }
01526 else {
01527
01528 last.bufferNumber = storedBuffer->bufferNumber;
01529 last.receivedChunks = storedBuffer->receivedChunks;
01530 last.buffer = storedBuffer->buffer;
01531 outOfOrderBuffers_.erase(storedBuffer);
01532 }
01533 }
01534
01535 char *receiveBuffer = last.buffer;
01536
01537 memcpy(receiveBuffer + chunk.chunkNumber * CHUNK_SIZE,
01538 chunk.rawData, chunk.chunkSize);
01539 if (++last.receivedChunks == chunk.numChunks) {
01540 clientObj_->receiveArray(
01541 (dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
01542 last.receivedChunks = 0;
01543 if (!userHandlesFreeing_) {
01544 delete [] last.buffer;
01545 }
01546 last.buffer = NULL;
01547 }
01548
01549 }
01550
01551 inline void localDeliver(const ChunkDataItem& chunk) {
01552 processChunk(chunk);
01553 if (this->useCompletionDetection_) {
01554 this->detectorLocalObj_->consume();
01555 }
01556 QdProcess(1);
01557 }
01558
01559 void receiveAtDestination(
01560 MeshStreamerMessage<ChunkDataItem> *msg) {
01561
01562 for (int i = 0; i < msg->numDataItems; i++) {
01563 const ChunkDataItem& chunk = msg->getDataItem(i);
01564 processChunk(chunk);
01565 }
01566
01567 if (this->useStagedCompletion_) {
01568 #ifdef CMK_TRAM_VERBOSE_OUTPUT
01569 envelope *env = UsrToEnv(msg);
01570 CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n",
01571 this->myIndex_, env->getSrcPe(), msg->numDataItems,
01572 msg->finalMsgCount);
01573 #endif
01574 this->markMessageReceived(msg->msgType, msg->finalMsgCount);
01575 }
01576 else if (this->useCompletionDetection_){
01577 this->detectorLocalObj_->consume(msg->numDataItems);
01578 }
01579 QdProcess(msg->numDataItems);
01580 delete msg;
01581
01582 }
01583
01584 inline void localBroadcast(const ChunkDataItem& dataItem) {
01585 localDeliver(dataItem);
01586 }
01587
01588 inline void initLocalClients() {
01589
01590 }
01591
01592 void pup(PUP::er &p) {
01593 p|outOfOrderBuffers_;
01594 p|lastReceived_;
01595 p|currentBufferNumbers_;
01596
01597 p|clientGID_;
01598 if (p.isUnpacking())
01599 clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
01600
01601 p|userHandlesFreeing_;
01602 }
01603
01604 };
01605
01606 template <typename dtype, typename RouterType>
01607 struct recursive_pup_impl<MeshStreamer<dtype, RouterType>, 1> {
01608 typedef MeshStreamer<dtype, RouterType> T;
01609 void operator()(T *obj, PUP::er &p) {
01610 obj->parent_pup(p);
01611 obj->T::pup(p);
01612 }
01613 };
01614
01615 #define CK_TEMPLATES_ONLY
01616 #include "NDMeshStreamer.def.h"
01617 #undef CK_TEMPLATES_ONLY
01618
01619 #endif