00001
00002
00003
00004
00005 #include "charm++.h"
00006 #include "ck.h"
00007 #include "converse.h"
00008 #include "cmirdmautils.h"
00009
00010
00011 #if CMK_SMP
00012 extern CProxy_ckcallback_group _ckcallbackgroup;
00013 #endif
00014
00015
00016
00017 void CkNcpyBuffer::memcpyGet(CkNcpyBuffer &source) {
00018
00019 memcpy((void *)ptr, source.ptr, cnt);
00020 }
00021
#if CMK_USE_CMA
/*
 * Same-host, different-node GET using Cross Memory Attach: read `source`
 * (owned by source.pe) into this buffer (owned by pe).
 *
 * The transfer length is source.cnt. get() only asserts source.cnt <= cnt,
 * so requesting this->cnt bytes could read beyond the source buffer.
 */
void CkNcpyBuffer::cmaGet(CkNcpyBuffer &source) {
  CmiIssueRgetUsingCMA(source.ptr, source.layerInfo, source.pe,  // remote (source) side
                       ptr, layerInfo, pe,                       // local (destination) side
                       source.cnt);
}
#endif
00033
00034 void CkNcpyBuffer::rdmaGet(CkNcpyBuffer &source) {
00035
00036 int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
00037 int ackSize = sizeof(CkCallback);
00038
00039 if(regMode == CK_BUFFER_UNREG) {
00040
00041 CmiSetRdmaBufferInfo(layerInfo + CmiGetRdmaCommonInfoSize(), ptr, cnt, regMode);
00042
00043 isRegistered = true;
00044 }
00045
00046
00047 int ncpyObjSize = getNcpyOpInfoTotalSize(
00048 layerInfoSize,
00049 ackSize,
00050 layerInfoSize,
00051 ackSize);
00052
00053 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyObjSize);
00054
00055 setNcpyOpInfo(source.ptr,
00056 (char *)(source.layerInfo),
00057 layerInfoSize,
00058 (char *)(&source.cb),
00059 ackSize,
00060 source.cnt,
00061 source.regMode,
00062 source.deregMode,
00063 source.isRegistered,
00064 source.pe,
00065 source.ref,
00066 ptr,
00067 (char *)(layerInfo),
00068 layerInfoSize,
00069 (char *)(&cb),
00070 ackSize,
00071 cnt,
00072 regMode,
00073 deregMode,
00074 isRegistered,
00075 pe,
00076 ref,
00077 ncpyOpInfo);
00078
00079 CmiIssueRget(ncpyOpInfo);
00080 }
00081
00082
00083 CkNcpyStatus CkNcpyBuffer::get(CkNcpyBuffer &source){
00084 if(regMode == CK_BUFFER_NOREG || source.regMode == CK_BUFFER_NOREG) {
00085 CkAbort("Cannot perform RDMA operations in CK_BUFFER_NOREG mode\n");
00086 }
00087
00088
00089 CkAssert(source.cnt <= cnt);
00090
00091
00092 CkAssert(CkNodeOf(pe) == CkMyNode());
00093
00094 CkNcpyMode transferMode = findTransferMode(source.pe, pe);
00095
00096
00097 if(transferMode == CkNcpyMode::MEMCPY) {
00098 memcpyGet(source);
00099
00100
00101 source.cb.send(sizeof(CkNcpyBuffer), &source);
00102
00103
00104 cb.send(sizeof(CkNcpyBuffer), this);
00105
00106
00107 return CkNcpyStatus::complete;
00108
00109 #if CMK_USE_CMA
00110 } else if(transferMode == CkNcpyMode::CMA) {
00111
00112 cmaGet(source);
00113
00114
00115 source.cb.send(sizeof(CkNcpyBuffer), &source);
00116
00117
00118 cb.send(sizeof(CkNcpyBuffer), this);
00119
00120
00121 return CkNcpyStatus::complete;
00122
00123 #endif
00124 } else if (transferMode == CkNcpyMode::RDMA) {
00125
00126 int outstandingRdmaOps = 1;
00127
00128 #if CMK_ONESIDED_IMPL
00129 #if CMK_CONVERSE_MPI
00130 outstandingRdmaOps += 1;
00131 #endif
00132 #else
00133 outstandingRdmaOps += 1;
00134 #endif
00135
00136
00137 QdCreate(outstandingRdmaOps);
00138
00139 rdmaGet(source);
00140
00141
00142 return CkNcpyStatus::incomplete;
00143
00144 } else {
00145 CkAbort("CkNcpyBuffer::get : Invalid CkNcpyMode");
00146 }
00147 }
00148
00149
00150 void CkNcpyBuffer::memcpyPut(CkNcpyBuffer &destination) {
00151
00152 memcpy((void *)destination.ptr, ptr, cnt);
00153 }
00154
#if CMK_USE_CMA
/*
 * Same-host, different-node PUT using Cross Memory Attach: write this
 * buffer's cnt bytes into `destination` (owned by destination.pe).
 */
void CkNcpyBuffer::cmaPut(CkNcpyBuffer &destination) {
  CmiIssueRputUsingCMA(destination.ptr, destination.layerInfo, destination.pe,  // remote (destination) side
                       ptr, layerInfo, pe,                                      // local (source) side
                       cnt);
}
#endif
00166
00167 void CkNcpyBuffer::rdmaPut(CkNcpyBuffer &destination) {
00168
00169 int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
00170 int ackSize = sizeof(CkCallback);
00171
00172 if(regMode == CK_BUFFER_UNREG) {
00173
00174 CmiSetRdmaBufferInfo(layerInfo + CmiGetRdmaCommonInfoSize(), ptr, cnt, regMode);
00175
00176 isRegistered = true;
00177 }
00178
00179
00180 int ncpyObjSize = getNcpyOpInfoTotalSize(
00181 layerInfoSize,
00182 ackSize,
00183 layerInfoSize,
00184 ackSize);
00185
00186 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyObjSize);
00187
00188 setNcpyOpInfo(ptr,
00189 (char *)(layerInfo),
00190 layerInfoSize,
00191 (char *)(&cb),
00192 ackSize,
00193 cnt,
00194 regMode,
00195 deregMode,
00196 isRegistered,
00197 pe,
00198 ref,
00199 destination.ptr,
00200 (char *)(destination.layerInfo),
00201 layerInfoSize,
00202 (char *)(&destination.cb),
00203 ackSize,
00204 destination.cnt,
00205 destination.regMode,
00206 destination.deregMode,
00207 destination.isRegistered,
00208 destination.pe,
00209 destination.ref,
00210 ncpyOpInfo);
00211
00212 CmiIssueRput(ncpyOpInfo);
00213 }
00214
00215
00216 CkNcpyStatus CkNcpyBuffer::put(CkNcpyBuffer &destination){
00217 if(regMode == CK_BUFFER_NOREG || destination.regMode == CK_BUFFER_NOREG) {
00218 CkAbort("Cannot perform RDMA operations in CK_BUFFER_NOREG mode\n");
00219 }
00220
00221 CkAssert(cnt <= destination.cnt);
00222
00223
00224 CkAssert(CkNodeOf(pe) == CkMyNode());
00225
00226 CkNcpyMode transferMode = findTransferMode(pe, destination.pe);
00227
00228
00229 if(transferMode == CkNcpyMode::MEMCPY) {
00230 memcpyPut(destination);
00231
00232
00233 destination.cb.send(sizeof(CkNcpyBuffer), &destination);
00234
00235
00236 cb.send(sizeof(CkNcpyBuffer), this);
00237
00238
00239 return CkNcpyStatus::complete;
00240
00241 #if CMK_USE_CMA
00242 } else if(transferMode == CkNcpyMode::CMA) {
00243 cmaPut(destination);
00244
00245
00246 destination.cb.send(sizeof(CkNcpyBuffer), &destination);
00247
00248
00249 cb.send(sizeof(CkNcpyBuffer), this);
00250
00251
00252 return CkNcpyStatus::complete;
00253
00254 #endif
00255 } else if (transferMode == CkNcpyMode::RDMA) {
00256
00257 int outstandingRdmaOps = 1;
00258
00259 #if CMK_ONESIDED_IMPL
00260 #if CMK_CONVERSE_MPI
00261 outstandingRdmaOps += 1;
00262 #endif
00263 #else
00264 outstandingRdmaOps += 1;
00265 #endif
00266
00267
00268 QdCreate(outstandingRdmaOps);
00269
00270 rdmaPut(destination);
00271
00272
00273 return CkNcpyStatus::incomplete;
00274
00275 } else {
00276 CkAbort("CkNcpyBuffer::put : Invalid CkNcpyMode");
00277 }
00278 }
00279
00280
00281 void constructSourceBufferObject(NcpyOperationInfo *info, CkNcpyBuffer &src) {
00282 src.ptr = info->srcPtr;
00283 src.pe = info->srcPe;
00284 src.cnt = info->srcSize;
00285 src.ref = info->srcRef;
00286 src.regMode = info->srcRegMode;
00287 src.deregMode = info->srcDeregMode;
00288 src.isRegistered = info->isSrcRegistered;
00289 memcpy((char *)(&src.cb), info->srcAck, info->srcAckSize);
00290 memcpy((char *)(src.layerInfo), info->srcLayerInfo, info->srcLayerSize);
00291 }
00292
00293
00294 void constructDestinationBufferObject(NcpyOperationInfo *info, CkNcpyBuffer &dest) {
00295 dest.ptr = info->destPtr;
00296 dest.pe = info->destPe;
00297 dest.cnt = info->destSize;
00298 dest.ref = info->destRef;
00299 dest.regMode = info->destRegMode;
00300 dest.deregMode = info->destDeregMode;
00301 dest.isRegistered = info->isDestRegistered;
00302 memcpy((char *)(&dest.cb), info->destAck, info->destAckSize);
00303 memcpy((char *)(dest.layerInfo), info->destLayerInfo, info->destLayerSize);
00304 }
00305
00306 void invokeSourceCallback(NcpyOperationInfo *info) {
00307 CkCallback *srcCb = (CkCallback *)(info->srcAck);
00308 if(srcCb->requiresMsgConstruction()) {
00309 if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_SRC_ACK) {
00310 CkNcpyBuffer src;
00311 constructSourceBufferObject(info, src);
00312
00313 invokeCallback(info->srcAck, info->srcPe, src);
00314 }
00315 }
00316 }
00317
00318 void invokeDestinationCallback(NcpyOperationInfo *info) {
00319 CkCallback *destCb = (CkCallback *)(info->destAck);
00320 if(destCb->requiresMsgConstruction()) {
00321 if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_DEST_ACK) {
00322 CkNcpyBuffer dest;
00323 constructDestinationBufferObject(info, dest);
00324
00325 invokeCallback(info->destAck, info->destPe, dest);
00326 }
00327 }
00328 }
00329
00330 void handleDirectApiCompletion(NcpyOperationInfo *info) {
00331 invokeSourceCallback(info);
00332 invokeDestinationCallback(info);
00333
00334 if(info->freeMe == CMK_FREE_NCPYOPINFO)
00335 CmiFree(info);
00336 }
00337
00338
00339 void CkRdmaDirectAckHandler(void *ack) {
00340
00341
00342 QdProcess(1);
00343
00344 NcpyOperationInfo *info = (NcpyOperationInfo *)(ack);
00345
00346 CkCallback *srcCb = (CkCallback *)(info->srcAck);
00347 CkCallback *destCb = (CkCallback *)(info->destAck);
00348
00349 switch(info->opMode) {
00350 case CMK_DIRECT_API : handleDirectApiCompletion(info);
00351 break;
00352 #if CMK_ONESIDED_IMPL
00353 case CMK_EM_API : handleEntryMethodApiCompletion(info);
00354 break;
00355
00356 case CMK_EM_API_SRC_ACK_INVOKE: invokeSourceCallback(info);
00357 break;
00358
00359 case CMK_EM_API_REVERSE : handleReverseEntryMethodApiCompletion(info);
00360 break;
00361
00362 case CMK_BCAST_EM_API : handleBcastEntryMethodApiCompletion(info);
00363 break;
00364
00365 case CMK_BCAST_EM_API_REVERSE : handleBcastReverseEntryMethodApiCompletion(info);
00366 break;
00367 case CMK_READONLY_BCAST : readonlyGetCompleted(info);
00368 break;
00369 #endif
00370 default : CkAbort("CkRdmaDirectAckHandler: Unknown ncpyOpInfo->opMode");
00371 break;
00372 }
00373 }
00374
00375
00376 void invokeCallback(void *cb, int pe, CkNcpyBuffer &buff) {
00377
00378 #if CMK_SMP
00379
00380
00381 _ckcallbackgroup[pe].call(*(CkCallback *)(cb), sizeof(CkNcpyBuffer), (const char *)(&buff));
00382 #else
00383
00384 ((CkCallback *)(cb))->send(sizeof(CkNcpyBuffer), &buff);
00385 #endif
00386 }
00387
00388
00389
00390
00391 CkNcpyMode findTransferMode(int srcPe, int destPe) {
00392 if(CmiNodeOf(srcPe)==CmiNodeOf(destPe))
00393 return CkNcpyMode::MEMCPY;
00394 #if CMK_USE_CMA
00395 else if(CmiDoesCMAWork() && CmiPeOnSamePhysicalNode(srcPe, destPe))
00396 return CkNcpyMode::CMA;
00397 #endif
00398 else
00399 return CkNcpyMode::RDMA;
00400 }
00401
00402 void enqueueNcpyMessage(int destPe, void *msg){
00403
00404 #if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRES
00405 if(destPe == CkMyPe())
00406 CmiHandleMessage(msg);
00407 else
00408 CmiPushPE(CmiRankOf(destPe), msg);
00409 #else
00410
00411
00412 CmiHandleMessage(msg);
00413 #endif
00414 }
00415
00416
00417 #if CMK_ONESIDED_IMPL
00418
00419
00420
00421
00422
00423 void performRgets(char *ref, int numops, int extraSize) {
00424
00425 for(int i=0; i<numops; i++){
00426 NcpyEmBufferInfo *ncpyEmBufferInfo = (NcpyEmBufferInfo *)(ref + sizeof(NcpyEmInfo) + i *(sizeof(NcpyEmBufferInfo) + extraSize));
00427 NcpyOperationInfo *ncpyOpInfo = &(ncpyEmBufferInfo->ncpyOpInfo);
00428 CmiIssueRget(ncpyOpInfo);
00429 }
00430 }
00431
00432
00433 void CkRdmaEMAckHandler(int destPe, void *ack) {
00434
00435 if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
00436 CmiSpanningTreeInfo &t = *_topoTree;
00437
00438 NcpyEmBufferInfo *emBuffInfo = (NcpyEmBufferInfo *)(ack);
00439
00440 char *ref = (char *)(emBuffInfo);
00441
00442 int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
00443 int ncpyObjSize = getNcpyOpInfoTotalSize(
00444 layerInfoSize,
00445 sizeof(CkCallback),
00446 layerInfoSize,
00447 0);
00448
00449
00450 NcpyEmInfo *ncpyEmInfo = (NcpyEmInfo *)(ref - (emBuffInfo->index) * (sizeof(NcpyEmBufferInfo) + ncpyObjSize - sizeof(NcpyOperationInfo)) - sizeof(NcpyEmInfo));
00451 ncpyEmInfo->counter++;
00452
00453 #if CMK_REG_REQUIRED
00454 if(ncpyEmInfo->mode == ncpyEmApiMode::P2P_SEND ||
00455 (ncpyEmInfo->mode == ncpyEmApiMode::BCAST_SEND && t.child_count == 0)) {
00456
00457 NcpyOperationInfo *ncpyOpInfo = &(emBuffInfo->ncpyOpInfo);
00458
00459
00460 CmiDeregisterMem(ncpyOpInfo->destPtr, ncpyOpInfo->destLayerInfo + CmiGetRdmaCommonInfoSize(), ncpyOpInfo->destPe, ncpyOpInfo->destRegMode);
00461
00462 } else if(ncpyEmInfo->mode == ncpyEmApiMode::P2P_RECV ||
00463 (ncpyEmInfo->mode == ncpyEmApiMode::BCAST_RECV && t.child_count == 0)) {
00464 NcpyOperationInfo *ncpyOpInfo = &(emBuffInfo->ncpyOpInfo);
00465
00466
00467 if(ncpyOpInfo->destDeregMode == CK_BUFFER_DEREG) {
00468 CmiDeregisterMem(ncpyOpInfo->destPtr, ncpyOpInfo->destLayerInfo + CmiGetRdmaCommonInfoSize(), ncpyOpInfo->destPe, ncpyOpInfo->destRegMode);
00469 }
00470 }
00471 #endif
00472
00473 if(ncpyEmInfo->counter == ncpyEmInfo->numOps) {
00474
00475
00476 switch(ncpyEmInfo->mode) {
00477 case ncpyEmApiMode::P2P_SEND : enqueueNcpyMessage(destPe, ncpyEmInfo->msg);
00478 break;
00479
00480 case ncpyEmApiMode::P2P_RECV : enqueueNcpyMessage(destPe, ncpyEmInfo->msg);
00481 CmiFree(ncpyEmInfo);
00482 break;
00483
00484 case ncpyEmApiMode::BCAST_SEND : processBcastSendEmApiCompletion(ncpyEmInfo, destPe);
00485 break;
00486
00487 case ncpyEmApiMode::BCAST_RECV : processBcastRecvEmApiCompletion(ncpyEmInfo, destPe);
00488 break;
00489
00490 default : CmiAbort("Invalid operation mode");
00491 break;
00492 }
00493 }
00494 }
00495
00496 void performEmApiMemcpy(CkNcpyBuffer &source, CkNcpyBuffer &dest, ncpyEmApiMode emMode) {
00497 dest.memcpyGet(source);
00498
00499 if(emMode == ncpyEmApiMode::P2P_SEND || emMode == ncpyEmApiMode::P2P_RECV) {
00500
00501 source.cb.send(sizeof(CkNcpyBuffer), &source);
00502 }
00503 else if (emMode == ncpyEmApiMode::BCAST_SEND || emMode == ncpyEmApiMode::BCAST_RECV) {
00504
00505 CkRdmaEMBcastAckHandler((void *)source.bcastAckInfo);
00506 }
00507 }
00508
#if CMK_USE_CMA
/*
 * EM API transfer between nodes on the same host via CMA.
 * P2P modes acknowledge the source immediately. In broadcast modes, a node
 * with children in the spanning tree sets up the layer info of an
 * unregistered landing buffer (presumably so children can fetch from it).
 */
void performEmApiCmaTransfer(CkNcpyBuffer &source, CkNcpyBuffer &dest, int child_count, ncpyEmApiMode emMode) {
  dest.cmaGet(source);

  switch(emMode) {
    case ncpyEmApiMode::P2P_SEND:
    case ncpyEmApiMode::P2P_RECV:
      source.cb.send(sizeof(CkNcpyBuffer), &source);
      break;

    case ncpyEmApiMode::BCAST_SEND:
    case ncpyEmApiMode::BCAST_RECV:
      if(child_count != 0 && dest.regMode == CK_BUFFER_UNREG) {
        CmiSetRdmaBufferInfo(dest.layerInfo + CmiGetRdmaCommonInfoSize(), dest.ptr, dest.cnt, dest.regMode);
        dest.isRegistered = true;
      }
      break;

    default:
      break;
  }
}
#endif
00528
00529 void performEmApiRget(CkNcpyBuffer &source, CkNcpyBuffer &dest, int opIndex, char *ref, int extraSize, ncpyEmApiMode emMode) {
00530
00531 int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
00532 if(dest.regMode == CK_BUFFER_UNREG) {
00533
00534 CmiSetRdmaBufferInfo(dest.layerInfo + CmiGetRdmaCommonInfoSize(), dest.ptr, dest.cnt, dest.regMode);
00535
00536 dest.isRegistered = true;
00537 }
00538
00539 NcpyEmBufferInfo *ncpyEmBufferInfo = (NcpyEmBufferInfo *)(ref + sizeof(NcpyEmInfo) + opIndex *(sizeof(NcpyEmBufferInfo) + extraSize));
00540 ncpyEmBufferInfo->index = opIndex;
00541
00542 NcpyOperationInfo *ncpyOpInfo = &(ncpyEmBufferInfo->ncpyOpInfo);
00543 setNcpyOpInfo(source.ptr,
00544 (char *)(source.layerInfo),
00545 layerInfoSize,
00546 (char *)(&source.cb),
00547 sizeof(CkCallback),
00548 source.cnt,
00549 source.regMode,
00550 source.deregMode,
00551 source.isRegistered,
00552 source.pe,
00553 source.ref,
00554 dest.ptr,
00555 (char *)(dest.layerInfo),
00556 layerInfoSize,
00557 NULL,
00558 0,
00559 dest.cnt,
00560 dest.regMode,
00561 dest.deregMode,
00562 dest.isRegistered,
00563 dest.pe,
00564 (char *)(ncpyEmBufferInfo),
00565 ncpyOpInfo);
00566
00567
00568 if(emMode == ncpyEmApiMode::BCAST_SEND || emMode == ncpyEmApiMode::BCAST_RECV)
00569 ncpyOpInfo->opMode = CMK_BCAST_EM_API;
00570 else if(emMode == ncpyEmApiMode::P2P_SEND || emMode == ncpyEmApiMode::P2P_RECV)
00571 ncpyOpInfo->opMode = CMK_EM_API;
00572 else
00573 CmiAbort("Invalid Mode\n");
00574
00575 ncpyOpInfo->freeMe = CMK_DONT_FREE_NCPYOPINFO;
00576
00577 ncpyOpInfo->refPtr = ncpyEmBufferInfo;
00578
00579
00580
00581
00582 }
00583
00584 void performEmApiNcpyTransfer(CkNcpyBuffer &source, CkNcpyBuffer &dest, int opIndex, int child_count, char *ref, int extraSize, CkNcpyMode ncpyMode, ncpyEmApiMode emMode){
00585
00586 switch(ncpyMode) {
00587 case CkNcpyMode::MEMCPY: performEmApiMemcpy(source, dest, emMode);
00588 break;
00589 #if CMK_USE_CMA
00590 case CkNcpyMode::CMA : performEmApiCmaTransfer(source, dest, child_count, emMode);
00591 break;
00592 #endif
00593 case CkNcpyMode::RDMA : performEmApiRget(source, dest, opIndex, ref, extraSize, emMode);
00594 break;
00595
00596 default : CkAbort("Invalid Mode");
00597 break;
00598 }
00599 }
00600
00601
00602 void preprocessRdmaCaseForRgets(int &layerInfoSize, int &ncpyObjSize, int &extraSize, int &totalMsgSize, int &numops) {
00603 layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
00604
00605 ncpyObjSize = getNcpyOpInfoTotalSize(
00606 layerInfoSize,
00607 sizeof(CkCallback),
00608 layerInfoSize,
00609 0);
00610
00611 extraSize = ncpyObjSize - sizeof(NcpyOperationInfo);
00612
00613 totalMsgSize += sizeof(NcpyEmInfo) + numops*(sizeof(NcpyEmBufferInfo) + extraSize);
00614 }
00615
00616 void setNcpyEmInfo(char *ref, envelope *env, int &msgsize, int &numops, void *forwardMsg, ncpyEmApiMode emMode) {
00617
00618 NcpyEmInfo *ncpyEmInfo = (NcpyEmInfo *)ref;
00619 ncpyEmInfo->numOps = numops;
00620 ncpyEmInfo->counter = 0;
00621 ncpyEmInfo->msg = env;
00622
00623 ncpyEmInfo->forwardMsg = forwardMsg;
00624 ncpyEmInfo->pe = CkMyPe();
00625 ncpyEmInfo->mode = emMode;
00626 }
00627
00628
00629
00630 void CkPackRdmaPtrs(char *msgBuf){
00631 PUP::toMem p((void *)msgBuf);
00632 PUP::fromMem up((void *)msgBuf);
00633 int numops;
00634 up|numops;
00635 p|numops;
00636
00637
00638 for(int i=0; i<numops; i++){
00639 CkNcpyBuffer w;
00640 up|w;
00641 w.ptr = (void *)((char *)w.ptr - (char *)msgBuf);
00642 p|w;
00643 }
00644 }
00645
00646
00647 void CkUnpackRdmaPtrs(char *msgBuf){
00648 PUP::toMem p((void *)msgBuf);
00649 PUP::fromMem up((void *)msgBuf);
00650 int numops;
00651 up|numops;
00652 p|numops;
00653
00654
00655 for(int i=0; i<numops; i++){
00656 CkNcpyBuffer w;
00657 up|w;
00658 w.ptr = (void *)((char *)msgBuf + (size_t)w.ptr);
00659 p|w;
00660 }
00661 }
00662
00663
00664
00665
00666 void getRdmaNumopsAndBufsize(envelope *env, int &numops, int &bufsize) {
00667 numops = 0;
00668 bufsize = 0;
00669 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
00670 up|numops;
00671 for(int i=0; i<numops; i++){
00672 CkNcpyBuffer w;
00673 up|w;
00674 bufsize += CK_ALIGN(w.cnt, 16);
00675 }
00676 }
00677
00678 void handleEntryMethodApiCompletion(NcpyOperationInfo *info) {
00679
00680 #if CMK_REG_REQUIRED
00681
00682 if(info->srcDeregMode == CK_BUFFER_DEREG)
00683 CmiInvokeRemoteDeregAckHandler(info->srcPe, info);
00684 else
00685 #endif
00686 invokeSourceCallback(info);
00687
00688 if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_DEST_ACK) {
00689
00690 CkRdmaEMAckHandler(info->destPe, info->refPtr);
00691 }
00692 }
00693
00694 void handleReverseEntryMethodApiCompletion(NcpyOperationInfo *info) {
00695
00696 if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_DEST_ACK) {
00697
00698 CmiInvokeRemoteAckHandler(info->destPe, info->refPtr);
00699 }
00700
00701 #if CMK_REG_REQUIRED
00702
00703 if(info->srcDeregMode == CK_BUFFER_DEREG) {
00704 CmiDeregisterMem(info->srcPtr, info->srcLayerInfo + CmiGetRdmaCommonInfoSize(), info->srcPe, info->srcRegMode);
00705 info->isSrcRegistered = 0;
00706 }
00707 #endif
00708
00709 invokeSourceCallback(info);
00710
00711 if(info->freeMe == CMK_FREE_NCPYOPINFO)
00712 CmiFree(info);
00713 }
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723 envelope* CkRdmaIssueRgets(envelope *env, ncpyEmApiMode emMode, void *forwardMsg){
00724
00725 int numops=0, bufsize=0, msgsize=0;
00726
00727 CkUnpackMessage(&env);
00728 getRdmaNumopsAndBufsize(env, numops, bufsize);
00729 CkPackMessage(&env);
00730
00731 msgsize = env->getTotalsize();
00732 int totalMsgSize = CK_ALIGN(msgsize, 16) + bufsize;
00733 char *ref;
00734 int layerInfoSize, ncpyObjSize, extraSize;
00735
00736 CkNcpyMode ncpyMode = findTransferMode(env->getSrcPe(), CkMyPe());
00737 if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
00738 CmiSpanningTreeInfo &t = *_topoTree;
00739
00740 layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
00741
00742 if(ncpyMode == CkNcpyMode::RDMA) {
00743 preprocessRdmaCaseForRgets(layerInfoSize, ncpyObjSize, extraSize, totalMsgSize, numops);
00744 }
00745
00746
00747 envelope *copyenv = (envelope *)CmiAlloc(totalMsgSize);
00748
00749
00750 memcpy(copyenv, env, msgsize);
00751
00752
00753
00754
00755 copyenv->setTotalsize(totalMsgSize);
00756
00757
00758
00759 CMI_ZC_MSGTYPE(copyenv) = CMK_REG_NO_ZC_MSG;
00760
00761 if(ncpyMode == CkNcpyMode::RDMA) {
00762 ref = (char *)copyenv + CK_ALIGN(msgsize, 16) + bufsize;
00763 setNcpyEmInfo(ref, copyenv, msgsize, numops, forwardMsg, emMode);
00764 }
00765
00766 char *buf = (char *)copyenv + CK_ALIGN(msgsize, 16);
00767
00768 CkUnpackMessage(©env);
00769 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(copyenv))->msgBuf));
00770 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(copyenv))->msgBuf);
00771 up|numops;
00772 p|numops;
00773
00774
00775 CkNcpyBuffer source;
00776
00777 for(int i=0; i<numops; i++){
00778 up|source;
00779
00780
00781 CkNcpyBuffer dest((const void *)buf, source.cnt, CK_BUFFER_UNREG);
00782
00783 performEmApiNcpyTransfer(source, dest, i, t.child_count, ref, extraSize, ncpyMode, emMode);
00784
00785
00786 source.ptr = buf;
00787
00788 memcpy(source.layerInfo, dest.layerInfo, layerInfoSize);
00789
00790
00791 buf += CK_ALIGN(source.cnt, 16);
00792 p|source;
00793 }
00794
00795
00796 CkPackRdmaPtrs(((CkMarshallMsg *)EnvToUsr(copyenv))->msgBuf);
00797
00798 if(emMode == ncpyEmApiMode::P2P_SEND) {
00799 switch(ncpyMode) {
00800 case CkNcpyMode::MEMCPY:
00801 case CkNcpyMode::CMA : return copyenv;
00802 break;
00803
00804 case CkNcpyMode::RDMA : performRgets(ref, numops, extraSize);
00805 break;
00806
00807 default : CmiAbort("Invalid transfer mode\n");
00808 break;
00809 }
00810 } else if(emMode == ncpyEmApiMode::BCAST_SEND) {
00811 switch(ncpyMode) {
00812 case CkNcpyMode::MEMCPY: CkPackMessage(©env);
00813 forwardMessageToPeerNodes(copyenv, copyenv->getMsgtype());
00814 return copyenv;
00815 break;
00816
00817 case CkNcpyMode::CMA : CkPackMessage(©env);
00818 handleMsgUsingCMAPostCompletionForSendBcast(copyenv, env, source);
00819 break;
00820
00821 case CkNcpyMode::RDMA : performRgets(ref, numops, extraSize);
00822 break;
00823
00824 default : CmiAbort("Invalid transfer mode\n");
00825 break;
00826 }
00827 } else {
00828 CmiAbort("Invalid operation mode\n");
00829 }
00830 return NULL;
00831 }
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841 void CkRdmaIssueRgets(envelope *env, ncpyEmApiMode emMode, void *forwardMsg, int numops, void **arrPtrs, CkNcpyBufferPost *postStructs){
00842
00843 if(emMode == ncpyEmApiMode::BCAST_SEND)
00844 CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
00845
00846
00847 int msgsize = env->getTotalsize();
00848
00849 int refSize = 0;
00850 char *ref;
00851 int layerInfoSize, ncpyObjSize, extraSize;
00852
00853 CkNcpyMode ncpyMode = findTransferMode(env->getSrcPe(), CkMyPe());
00854 if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
00855 CmiSpanningTreeInfo &t = *_topoTree;
00856
00857 layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
00858
00859 if(ncpyMode == CkNcpyMode::RDMA) {
00860 preprocessRdmaCaseForRgets(layerInfoSize, ncpyObjSize, extraSize, refSize, numops);
00861 ref = (char *)CmiAlloc(refSize);
00862 }
00863
00864
00865
00866 if(emMode == ncpyEmApiMode::P2P_RECV)
00867 CMI_ZC_MSGTYPE(env) = CMK_ZC_P2P_RECV_DONE_MSG;
00868
00869 if(ncpyMode == CkNcpyMode::RDMA) {
00870 setNcpyEmInfo(ref, env, msgsize, numops, forwardMsg, emMode);
00871 }
00872
00873 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
00874 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
00875 up|numops;
00876 p|numops;
00877
00878
00879 CkNcpyBuffer source;
00880
00881 for(int i=0; i<numops; i++){
00882 up|source;
00883
00884
00885 CkNcpyBuffer dest((const void *)arrPtrs[i], source.cnt, postStructs[i].regMode, postStructs[i].deregMode);
00886
00887 performEmApiNcpyTransfer(source, dest, i, t.child_count, ref, extraSize, ncpyMode, emMode);
00888
00889
00890 source.ptr = arrPtrs[i];
00891
00892 memcpy(source.layerInfo, dest.layerInfo, layerInfoSize);
00893
00894 p|source;
00895 }
00896
00897
00898 if(emMode == ncpyEmApiMode::P2P_RECV) {
00899 switch(ncpyMode) {
00900 case CkNcpyMode::MEMCPY:
00901 case CkNcpyMode::CMA : enqueueNcpyMessage(CkMyPe(), env);
00902 break;
00903
00904 case CkNcpyMode::RDMA : performRgets(ref, numops, extraSize);
00905 break;
00906
00907 default : CmiAbort("Invalid transfer mode\n");
00908 break;
00909 }
00910 } else if(emMode == ncpyEmApiMode::BCAST_RECV) {
00911 switch(ncpyMode) {
00912 case CkNcpyMode::MEMCPY: handleMsgOnChildPostCompletionForRecvBcast(env);
00913 break;
00914
00915 case CkNcpyMode::CMA : if(t.child_count == 0) {
00916 sendAckMsgToParent(env);
00917 handleMsgOnChildPostCompletionForRecvBcast(env);
00918 } else {
00919
00920 NcpyBcastInterimAckInfo *bcastAckInfo = allocateInterimNodeAckObj(env, NULL, CkMyPe());
00921 handleMsgOnInterimPostCompletionForRecvBcast(env, bcastAckInfo, CkMyPe());
00922 }
00923 break;
00924
00925 case CkNcpyMode::RDMA : performRgets(ref, numops, extraSize);
00926 break;
00927
00928 default : CmiAbort("Invalid transfer mode\n");
00929 break;
00930 }
00931 } else {
00932 CmiAbort("Invalid operation mode\n");
00933 }
00934 }
00935
00936
00937
00938
00939
00940
00941 void CkRdmaPrepareBcastMsg(envelope *env) {
00942
00943 int numops;
00944 CkUnpackMessage(&env);
00945 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
00946 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
00947
00948 up|numops;
00949 p|numops;
00950
00951 NcpyBcastRootAckInfo *bcastAckInfo = (NcpyBcastRootAckInfo *)CmiAlloc(sizeof(NcpyBcastRootAckInfo) + numops * sizeof(CkNcpyBuffer));
00952
00953 CmiSpanningTreeInfo &t = *_topoTree;
00954 bcastAckInfo->numChildren = t.child_count + 1;
00955 bcastAckInfo->counter = 0;
00956 bcastAckInfo->isRoot = true;
00957 bcastAckInfo->numops = numops;
00958 bcastAckInfo->pe = CkMyPe();
00959
00960 for(int i=0; i<numops; i++) {
00961 CkNcpyBuffer source;
00962 up|source;
00963
00964 bcastAckInfo->src[i] = source;
00965
00966 source.bcastAckInfo = bcastAckInfo;
00967
00968 p|source;
00969 }
00970 CkPackMessage(&env);
00971 }
00972
00973
00974
00975 const void *getParentBcastAckInfo(void *msg, int &srcPe) {
00976 int numops;
00977 CkNcpyBuffer source;
00978 envelope *env = (envelope *)msg;
00979 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
00980 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
00981
00982 up|numops;
00983 p|numops;
00984
00985 CkAssert(numops >= 1);
00986
00987 up|source;
00988 p|source;
00989
00990 srcPe = source.pe;
00991 return source.bcastAckInfo;
00992 }
00993
00994
00995
00996 NcpyBcastInterimAckInfo *allocateInterimNodeAckObj(envelope *myEnv, envelope *myChildEnv, int pe) {
00997 CmiSpanningTreeInfo &t = *_topoTree;
00998
00999
01000 NcpyBcastInterimAckInfo *bcastAckInfo = (NcpyBcastInterimAckInfo *)CmiAlloc(sizeof(NcpyBcastInterimAckInfo));
01001
01002
01003 bcastAckInfo->numChildren = t.child_count;
01004 bcastAckInfo->counter = 0;
01005 bcastAckInfo->isRoot = false;
01006 bcastAckInfo->pe = pe;
01007
01008
01009 bcastAckInfo->isRecv = (myChildEnv == NULL);
01010 bcastAckInfo->isArray = (myEnv->getMsgtype() == ForArrayEltMsg);
01011
01012
01013 bcastAckInfo->msg = myEnv;
01014
01015 return bcastAckInfo;
01016 }
01017
01018
01019 void CkRdmaEMBcastAckHandler(void *ack) {
01020 NcpyBcastAckInfo *bcastAckInfo = (NcpyBcastAckInfo *)ack;
01021
01022 bcastAckInfo->counter++;
01023
01024 if(bcastAckInfo->counter == bcastAckInfo->numChildren) {
01025
01026
01027
01028 if(bcastAckInfo->isRoot) {
01029
01030 NcpyBcastRootAckInfo *bcastRootAckInfo = (NcpyBcastRootAckInfo *)(bcastAckInfo);
01031
01032 for(int i=0; i<bcastRootAckInfo->numops; i++) {
01033 #if CMK_REG_REQUIRED
01034
01035 if(bcastRootAckInfo->src[i].deregMode == CK_BUFFER_DEREG)
01036 bcastRootAckInfo->src[i].deregisterMem();
01037 #endif
01038
01039 invokeCallback(&(bcastRootAckInfo->src[i].cb),
01040 bcastRootAckInfo->pe,
01041 bcastRootAckInfo->src[i]);
01042 }
01043
01044 CmiFree(bcastRootAckInfo);
01045
01046 } else {
01047 CmiSpanningTreeInfo &t = *_topoTree;
01048
01049 NcpyBcastInterimAckInfo *bcastInterimAckInfo = (NcpyBcastInterimAckInfo *)(bcastAckInfo);
01050
01051 if(bcastInterimAckInfo->isRecv) {
01052
01053 envelope *myMsg = (envelope *)(bcastInterimAckInfo->msg);
01054
01055
01056 #if CMK_REG_REQUIRED
01057 deregisterMemFromMsg(myMsg, true);
01058 #endif
01059
01060 int srcPe;
01061 CkArray *mgr = NULL;
01062 envelope *env = (envelope *)bcastInterimAckInfo->msg;
01063 CkUnpackMessage(&env);
01064 char *ref = (char *)(getParentBcastAckInfo(bcastInterimAckInfo->msg, srcPe));
01065 CkPackMessage(&env);
01066
01067 NcpyBcastInterimAckInfo *ncpyBcastAck = (NcpyBcastInterimAckInfo *)ref;
01068 CmiInvokeBcastAckHandler(ncpyBcastAck->origPe, ncpyBcastAck->parentBcastAckInfo);
01069
01070 CMI_ZC_MSGTYPE(myMsg) = CMK_ZC_BCAST_RECV_DONE_MSG;
01071
01072 CkUnpackMessage(&myMsg);
01073
01074 if(bcastInterimAckInfo->isArray) {
01075 myMsg->setMsgtype(ForArrayEltMsg);
01076
01077 mgr = getArrayMgrFromMsg(myMsg);
01078 mgr->forwardZCMsgToOtherElems(myMsg);
01079 }
01080 #if CMK_SMP
01081 if(CmiMyNodeSize() > 1 && myMsg->getMsgtype() != ForNodeBocMsg) {
01082 sendRecvDoneMsgToPeers(myMsg, mgr);
01083 } else {
01084 if(myMsg->getMsgtype() == ForArrayEltMsg) {
01085 myMsg->setMsgtype(ForBocMsg);
01086 myMsg->getsetArrayEp() = mgr->getRecvBroadcastEpIdx();
01087 }
01088 enqueueNcpyMessage(bcastAckInfo->pe, myMsg);
01089 }
01090 #else
01091 CMI_ZC_MSGTYPE(myMsg) = CMK_ZC_BCAST_RECV_ALL_DONE_MSG;
01092
01093 if(myMsg->getMsgtype() == ForArrayEltMsg) {
01094 myMsg->setMsgtype(ForBocMsg);
01095 myMsg->getsetArrayEp() = mgr->getRecvBroadcastEpIdx();
01096 }
01097 enqueueNcpyMessage(bcastAckInfo->pe, myMsg);
01098 #endif
01099 } else {
01100
01101 envelope *myMsg = (envelope *)(bcastInterimAckInfo->msg);
01102
01103
01104 #if CMK_REG_REQUIRED
01105 deregisterMemFromMsg(myMsg, false);
01106 #endif
01107
01108 envelope *env = (envelope *)bcastInterimAckInfo->msg;
01109 CkUnpackMessage(&env);
01110 sendAckMsgToParent(env);
01111 CkPackMessage(&env);
01112
01113 forwardMessageToPeerNodes(myMsg, myMsg->getMsgtype());
01114
01115
01116 enqueueNcpyMessage(bcastAckInfo->pe, bcastInterimAckInfo->msg);
01117
01118 CmiFree(bcastInterimAckInfo);
01119 }
01120 }
01121 }
01122 }
01123
01124
01125
01126 void forwardMessageToChildNodes(envelope *myChildrenMsg, UChar msgType) {
01127 #if CMK_SMP && CMK_NODE_QUEUE_AVAILABLE
01128 if(msgType == ForNodeBocMsg) {
01129
01130 CmiForwardNodeBcastMsg(myChildrenMsg->getTotalsize(), (char *)myChildrenMsg);
01131 } else
01132 #endif
01133
01134 CmiForwardProcBcastMsg(myChildrenMsg->getTotalsize(), (char *)myChildrenMsg);
01135 }
01136
01137
01138 void forwardMessageToPeerNodes(envelope *myMsg, UChar msgType) {
01139 #if CMK_SMP
01140 #if CMK_NODE_QUEUE_AVAILABLE
01141 if(msgType == ForBocMsg)
01142 #endif // CMK_NODE_QUEUE_AVAILABLE
01143 CmiForwardMsgToPeers(myMsg->getTotalsize(), (char *)myMsg);
01144 #endif
01145 }
01146
01147 void handleBcastEntryMethodApiCompletion(NcpyOperationInfo *info){
01148 if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_DEST_ACK) {
01149
01150
01151 CkRdmaEMAckHandler(info->destPe, info->refPtr);
01152 }
01153 }
01154
01155 void handleBcastReverseEntryMethodApiCompletion(NcpyOperationInfo *info) {
01156 if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_DEST_ACK) {
01157
01158 CmiInvokeRemoteAckHandler(info->destPe, info->refPtr);
01159 }
01160 if(info->freeMe == CMK_FREE_NCPYOPINFO)
01161 CmiFree(info);
01162 }
01163
01164 void deregisterMemFromMsg(envelope *env, bool isRecv) {
01165 CkUnpackMessage(&env);
01166 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
01167 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
01168 int numops;
01169 up|numops;
01170 p|numops;
01171
01172 CkNcpyBuffer dest;
01173
01174 for(int i=0; i<numops; i++){
01175 up|dest;
01176
01177
01178
01179 if( (!isRecv) || (isRecv && dest.deregMode == CMK_BUFFER_DEREG) ) {
01180 CmiDeregisterMem(dest.ptr, (char *)dest.layerInfo + CmiGetRdmaCommonInfoSize(), dest.pe, dest.regMode);
01181 }
01182
01183 p|dest;
01184 }
01185 CkPackMessage(&env);
01186 }
01187
01188
01189
01190 void handleMsgUsingCMAPostCompletionForSendBcast(envelope *copyenv, envelope *env, CkNcpyBuffer &source) {
01191 CmiSpanningTreeInfo &t = *_topoTree;
01192
01193 if(t.child_count == 0) {
01194
01195
01196 CmiInvokeBcastAckHandler(source.pe, (void *)source.bcastAckInfo);
01197
01198
01199 forwardMessageToPeerNodes(copyenv, copyenv->getMsgtype());
01200
01201
01202 enqueueNcpyMessage(CkMyPe(), copyenv);
01203
01204 } else {
01205
01206
01207 NcpyBcastInterimAckInfo *bcastAckInfo = allocateInterimNodeAckObj(copyenv, env, CkMyPe());
01208
01210 CkReplaceSourcePtrsInBcastMsg(env, copyenv, bcastAckInfo, CkMyPe());
01211
01212
01213 forwardMessageToChildNodes(env, copyenv->getMsgtype());
01214 }
01215 }
01216
01217 void processBcastSendEmApiCompletion(NcpyEmInfo *ncpyEmInfo, int destPe) {
01218 CmiSpanningTreeInfo &t = *_topoTree;
01219 envelope *myEnv = (envelope *)(ncpyEmInfo->msg);
01220
01221 if(t.child_count == 0) {
01222
01223 CkUnpackMessage(&myEnv);
01224 sendAckMsgToParent(myEnv);
01225 CkPackMessage(&myEnv);
01226
01227
01228
01229 forwardMessageToPeerNodes(myEnv, myEnv->getMsgtype());
01230
01231
01232 enqueueNcpyMessage(destPe, myEnv);
01233
01234 } else {
01235
01236 envelope *myChildEnv = (envelope *)(ncpyEmInfo->forwardMsg);
01237
01238
01239 NcpyBcastInterimAckInfo *bcastAckInfo = allocateInterimNodeAckObj(myEnv, myChildEnv, ncpyEmInfo->pe);
01240
01241
01242 CkReplaceSourcePtrsInBcastMsg(myChildEnv, myEnv, bcastAckInfo, ncpyEmInfo->pe);
01243
01244
01245 forwardMessageToChildNodes(myChildEnv, myEnv->getMsgtype());
01246 }
01247 }
01248
01249
// Rewrite the CkNcpyBuffer sources of a broadcast message before it is forwarded to
// child nodes. Within this chunk, callers pass the child-bound message as prevEnv and
// the locally kept message as env.
//
// For each op i:
//   - source i is read from env (with RDMA pointers unpacked),
//   - a copy with bcastAckInfo := bcastAckInfo and pe := origPe is written into prevEnv
//     (presumably so children fetch from this node's copy and ack back here - confirm),
//   - the same source is written back into env with its original bcastAckInfo/pe.
//
// NOTE(review): the buffers written into prevEnv come from env, not prevEnv.
// prev_source is read only to advance up_prev, so both messages must have identically
// laid-out op lists - TODO confirm.
// NOTE(review): prevEnv and env are by-value pointers. CkUnpackMessage/CkPackMessage take
// their address, and any update they make is not visible to the caller.
void CkReplaceSourcePtrsInBcastMsg(envelope *prevEnv, envelope *env, void *bcastAckInfo, int origPe) {

  int numops;

  // Reader/writer pair over prevEnv's marshalled buffer (in-place rewrite).
  CkUnpackMessage(&prevEnv);
  PUP::toMem p_prev((void *)(((CkMarshallMsg *)EnvToUsr(prevEnv))->msgBuf));
  PUP::fromMem up_prev((void *)((CkMarshallMsg *)EnvToUsr(prevEnv))->msgBuf);

  // Reader/writer pair over env's marshalled buffer, with RDMA pointers unpacked.
  CkUnpackMessage(&env);
  CkUnpackRdmaPtrs((((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
  PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
  PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);

  // The value read from prevEnv is overwritten by env's count; both reads only advance
  // the readers past the count field.
  up_prev|numops;
  up|numops;

  // Re-emit the count so each writer stays aligned with its reader.
  p|numops;
  p_prev|numops;

  for(int i=0; i<numops; i++){

    CkNcpyBuffer prev_source, source;

    // Advance up_prev past prevEnv's i-th descriptor (its value is not used).
    up_prev|prev_source;

    // Read env's i-th descriptor.
    up|source;

    const void *bcastAckInfoTemp = source.bcastAckInfo;
    int orig_source_pe = source.pe;

    // Child-bound copy: point ack info and source PE at this node.
    source.bcastAckInfo = bcastAckInfo;
    source.pe = origPe;

    p_prev|source;

    // Restore the original ack info and source PE for the local copy.
    source.bcastAckInfo = bcastAckInfoTemp;
    source.pe = orig_source_pe;

    p|source;
  }

  CkPackMessage(&prevEnv);

  CkPackRdmaPtrs((((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
  CkPackMessage(&env);
}
01300
01301
01302
01303 void processBcastRecvEmApiCompletion(NcpyEmInfo *ncpyEmInfo, int destPe) {
01304 CmiSpanningTreeInfo &t = *_topoTree;
01305 envelope *myEnv = (envelope *)(ncpyEmInfo->msg);
01306
01307 if(t.child_count == 0) {
01308
01309
01310 #if CMK_SMP
01311 CmiInvokeBcastPostAckHandler(destPe, ncpyEmInfo->msg);
01312 #else
01313 CkRdmaEMBcastPostAckHandler(ncpyEmInfo->msg);
01314 #endif
01315 CmiFree(ncpyEmInfo);
01316
01317 } else {
01318
01319
01320
01321 #if CMK_SMP
01322 CmiInvokeBcastPostAckHandler(destPe, ncpyEmInfo);
01323 #else
01324 CkRdmaEMBcastPostAckHandler(ncpyEmInfo);
01325 #endif
01326 }
01327 }
01328
01329 void CkRdmaEMBcastPostAckHandler(void *msg) {
01330
01331 CmiSpanningTreeInfo &t = *_topoTree;
01332
01333
01334 if(t.child_count == 0) {
01335
01336
01337 envelope *env = (envelope *)(msg);
01338 sendAckMsgToParent(env);
01339 handleMsgOnChildPostCompletionForRecvBcast(env);
01340
01341 } else if(t.child_count !=0 && t.parent != -1) {
01342
01343 NcpyEmInfo *ncpyEmInfo = (NcpyEmInfo *)(msg);
01344 envelope *env = (envelope *)(ncpyEmInfo->msg);
01345
01346
01347 NcpyBcastInterimAckInfo *bcastAckInfo = allocateInterimNodeAckObj(env, NULL, ncpyEmInfo->pe);
01348 handleMsgOnInterimPostCompletionForRecvBcast(env, bcastAckInfo, ncpyEmInfo->pe);
01349
01350 CmiFree(ncpyEmInfo);
01351
01352 } else {
01353 CmiAbort("parent node reaching CkRdmaEMBcastPostAckHandler\n");
01354 }
01355
01356 }
01357
01358 void CkReplaceSourcePtrsInBcastMsg(envelope *env, NcpyBcastInterimAckInfo *bcastAckInfo, int origPe) {
01359
01360 int numops;
01361 CkUnpackMessage(&env);
01362
01363 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
01364 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
01365
01366 up|numops;
01367 p|numops;
01368
01369
01370 CkNcpyBuffer source;
01371
01372 for(int i=0; i<numops; i++){
01373
01374 up|source;
01375
01376 const void *bcastAckInfoTemp = source.bcastAckInfo;
01377 int orig_source_pe = source.pe;
01378
01379 bcastAckInfo->parentBcastAckInfo = (void *)bcastAckInfoTemp;
01380 bcastAckInfo->origPe = orig_source_pe;
01381
01382 source.bcastAckInfo = bcastAckInfo;
01383 source.pe = origPe;
01384
01385
01386 p|source;
01387 }
01388
01389
01390 CkPackMessage(&env);
01391 }
01392
01393 #if CMK_SMP
01394 void updatePeerCounterAndPush(envelope *env) {
01395 int pe;
01396 int numops;
01397
01398 CkUnpackMessage(&env);
01399 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
01400 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
01401
01402 up|numops;
01403 p|numops;
01404
01405 CkNcpyBuffer source;
01406
01407 up|source;
01408
01409 pe = CmiNodeFirst(CmiMyNode());
01410
01411 void *ref = (void *)source.bcastAckInfo;
01412 NcpyBcastRecvPeerAckInfo *peerAckInfo = (NcpyBcastRecvPeerAckInfo *)ref;
01413 source.bcastAckInfo = peerAckInfo->bcastAckInfo;
01414
01415 p|source;
01416 CkPackMessage(&env);
01417 CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_RECV_ALL_DONE_MSG;
01418 CmiSpanningTreeInfo &t = *_topoTree;
01419 peerAckInfo->decNumPeers();
01420 if(peerAckInfo->getNumPeers() == 0) {
01421 CmiPushPE(CmiRankOf(peerAckInfo->peerParentPe), env);
01422 }
01423 }
01424
01425 void sendRecvDoneMsgToPeers(envelope *env, CkArray *mgr) {
01426
01427 CmiSpanningTreeInfo &t = *_topoTree;
01428
01429
01430 NcpyBcastRecvPeerAckInfo *peerAckInfo = new NcpyBcastRecvPeerAckInfo();
01431
01432
01433 peerAckInfo->setNumPeers(CmiMyNodeSize() - 1);
01434 peerAckInfo->msg = (void *)env;
01435 peerAckInfo->peerParentPe = CmiMyPe();
01436
01437 int numops;
01438
01439
01440 CkUnpackMessage(&env);
01441 PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
01442 PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
01443
01444 up|numops;
01445 p|numops;
01446
01447 CkNcpyBuffer source;
01448
01449 up|source;
01450
01451 peerAckInfo->bcastAckInfo = (void *)source.bcastAckInfo;
01452 source.bcastAckInfo = peerAckInfo;
01453
01454 p|source;
01455
01456 CkPackMessage(&env);
01457
01458 if(env->getMsgtype() == ForArrayEltMsg) {
01459 env->setMsgtype(ForBocMsg);
01460 env->getsetArrayEp() = mgr->getRecvBroadcastEpIdx();
01461 }
01462 CmiForwardMsgToPeers(env->getTotalsize(), (char *)env);
01463 }
01464 #endif
01465
01466
01467 void sendAckMsgToParent(envelope *env) {
01468 int srcPe;
01469
01470
01471 char *ref = (char *)getParentBcastAckInfo(env,srcPe);
01472
01473
01474 CmiInvokeBcastAckHandler(srcPe, ref);
01475 }
01476
01477 CkArray* getArrayMgrFromMsg(envelope *env) {
01478 CkArray *mgr = NULL;
01479 CkGroupID gId = env->getArrayMgr();
01480 IrrGroup *obj = _getCkLocalBranchFromGroupID(gId);
01481 CkAssert(obj!=NULL);
01482 mgr = (CkArray *)obj;
01483 return mgr;
01484 }
01485
01486 void handleArrayMsgOnChildPostCompletionForRecvBcast(envelope *env) {
01487 CkArray *mgr = getArrayMgrFromMsg(env);
01488 mgr->forwardZCMsgToOtherElems(env);
01489
01490 #if CMK_SMP
01491 if(CmiMyNodeSize() > 1) {
01492 sendRecvDoneMsgToPeers(env, mgr);
01493 } else
01494 #endif
01495 {
01496 CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_RECV_ALL_DONE_MSG;
01497 env->setMsgtype(ForBocMsg);
01498 env->getsetArrayEp() = mgr->getRecvBroadcastEpIdx();
01499 CmiHandleMessage(env);
01500 }
01501 }
01502
01503 void handleGroupMsgOnChildPostCompletionForRecvBcast(envelope *env) {
01504 CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_RECV_DONE_MSG;
01505 #if CMK_SMP
01506 if(CmiMyNodeSize() > 1) {
01507 sendRecvDoneMsgToPeers(env, NULL);
01508 } else
01509 #endif
01510 {
01511 CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_RECV_ALL_DONE_MSG;
01512 CmiHandleMessage(env);
01513 }
01514 }
01515
01516 void handleNGMsgOnChildPostCompletionForRecvBcast(envelope *env) {
01517 CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_RECV_ALL_DONE_MSG;
01518 CmiHandleMessage(env);
01519 }
01520
01521 void handleMsgOnChildPostCompletionForRecvBcast(envelope *env) {
01522 switch(env->getMsgtype()) {
01523
01524 case ForArrayEltMsg : handleArrayMsgOnChildPostCompletionForRecvBcast(env);
01525 break;
01526 case ForBocMsg : handleGroupMsgOnChildPostCompletionForRecvBcast(env);
01527 break;
01528 case ForNodeBocMsg : handleNGMsgOnChildPostCompletionForRecvBcast(env);
01529 break;
01530 default : CmiAbort("Type of message currently not supported\n");
01531 break;
01532 }
01533 }
01534
01535 void handleMsgOnInterimPostCompletionForRecvBcast(envelope *env, NcpyBcastInterimAckInfo *bcastAckInfo, int pe) {
01536
01537 CkReplaceSourcePtrsInBcastMsg(env, bcastAckInfo, pe);
01538
01539 CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_RECV_MSG;
01540
01541 if(env->getMsgtype() == ForArrayEltMsg) {
01542 CkArray *mgr = getArrayMgrFromMsg(env);
01543 env->setMsgtype(ForBocMsg);
01544 env->getsetArrayEp() = mgr->getRecvBroadcastEpIdx();
01545 }
01546
01547
01548 forwardMessageToChildNodes(env, env->getMsgtype());
01549 }
01550
01551
01552
01553
// State for the readonly zero-copy broadcast, defined elsewhere.
// _roRdmaDoneHandlerIdx is the handler set on readonly completion messages below.
extern int _roRdmaDoneHandlerIdx,_initHandlerIdx;
// Count of outstanding readonly transfers; decremented as each one completes.
CksvExtern(int, _numPendingRORdmaTransfers);
// numZerocopyROops sizes roBcastAckInfo; curROIndex is the next free buffAckInfo slot.
extern UInt numZerocopyROops, curROIndex;
// Ack bookkeeping for readonly buffers that act as sources for child nodes.
extern NcpyROBcastAckInfo *roBcastAckInfo;
01558
01559 void readonlyUpdateNumops() {
01560
01561 numZerocopyROops++;
01562 }
01563
01564
01565 void readonlyAllocateOnSource() {
01566
01567 if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
01568 CmiSpanningTreeInfo &t = *_topoTree;
01569
01570
01571 roBcastAckInfo = (NcpyROBcastAckInfo *)CmiAlloc(sizeof(NcpyROBcastAckInfo) + numZerocopyROops * sizeof(NcpyROBcastBuffAckInfo));
01572
01573 roBcastAckInfo->counter = 0;
01574 roBcastAckInfo->isRoot = (t.parent == -1);
01575 roBcastAckInfo->numChildren = t.child_count;
01576 roBcastAckInfo->numops = numZerocopyROops;
01577 }
01578
01579
01580 void readonlyCreateOnSource(CkNcpyBuffer &src) {
01581 src.bcastAckInfo = roBcastAckInfo;
01582
01583 NcpyROBcastBuffAckInfo *buffAckInfo = &(roBcastAckInfo->buffAckInfo[curROIndex]);
01584
01585 buffAckInfo->ptr = src.ptr;
01586 buffAckInfo->regMode = src.regMode;
01587 buffAckInfo->pe = src.pe;
01588
01589
01590 memcpy(buffAckInfo->layerInfo, src.layerInfo, CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES);
01591
01592 curROIndex++;
01593 }
01594
01595
01596
01597
// Issue the zero-copy transfer of one readonly buffer (src -> dest) on this node.
// Must be called on rank 0 (asserted below).
//
// src    : descriptor of the sending buffer.
// dest   : descriptor of this node's destination buffer. On nodes with children in the
//          spanning tree, dest is also registered via readonlyCreateOnSource.
// refPtr : in the CMA path, the envelope forwarded to child nodes once all pending
//          readonly transfers have finished. In the RDMA path it is stored in
//          ncpyOpInfo->refPtr for the completion handler.
//
// Transfer modes:
//   MEMCPY - treated as impossible here; aborts.
//   CMA    - the get completes synchronously, so completion bookkeeping runs inline.
//   other  - an NcpyOperationInfo tagged CMK_READONLY_BCAST is built and an RDMA get is
//            issued. Completion is handled asynchronously (presumably by
//            readonlyGetCompleted, which mirrors the CMA bookkeeping - confirm).
//
// NOTE(review): on leaf nodes only the dest of the transfer that finishes last is passed
// to CmiDeregisterMem. Confirm that the other readonly dest buffers are deregistered
// elsewhere.
void readonlyGet(CkNcpyBuffer &src, CkNcpyBuffer &dest, void *refPtr) {

  CkAssert(CkMyRank() == 0);

  CmiSpanningTreeInfo &t = *_topoTree;

  CkNcpyMode transferMode = findTransferMode(src.pe, dest.pe);
  if(transferMode == CkNcpyMode::MEMCPY) {
    CmiAbort("memcpy: should not happen\n");
  }
#if CMK_USE_CMA
  else if(transferMode == CkNcpyMode::CMA) {
    dest.cmaGet(src);

    // This transfer has already finished.
    CksvAccess(_numPendingRORdmaTransfers)--;

    // Interior node: dest becomes a source for the children.
    if(t.child_count != 0)
      readonlyCreateOnSource(dest);

    // Last outstanding readonly transfer on this node.
    if(CksvAccess(_numPendingRORdmaTransfers) == 0) {

      if(t.child_count != 0) {

        // Interior node: forward the readonly message down the tree.
        envelope *env = (envelope *)(refPtr);
        CmiForwardProcBcastMsg(env->getTotalsize(), (char *)env);

      } else {

        // Leaf node: deregister dest, then tell src.pe that this child is done.
        CmiDeregisterMem(dest.ptr, dest.layerInfo + CmiGetRdmaCommonInfoSize(), dest.pe, dest.regMode);

        envelope *compEnv = _allocEnv(ROChildCompletionMsg);
        compEnv->setSrcPe(CkMyPe());
        CmiSetHandler(compEnv, _roRdmaDoneHandlerIdx);
        CmiSyncSendAndFree(src.pe, compEnv->getTotalsize(), (char *)compEnv);
      }

      checkForInitDone(true);
    }
  }
#endif
  else {

    // No ack payload is attached: ackSize is 0 and both ack pointers are NULL.
    int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
    int ackSize = 0;
    int ncpyObjSize = getNcpyOpInfoTotalSize(
                      layerInfoSize,
                      ackSize,
                      layerInfoSize,
                      ackSize);
    NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyObjSize);
    setNcpyOpInfo(src.ptr,
                  (char *)(src.layerInfo),
                  layerInfoSize,
                  NULL,
                  ackSize,
                  src.cnt,
                  src.regMode,
                  src.deregMode,
                  src.isRegistered,
                  src.pe,
                  src.ref,
                  dest.ptr,
                  (char *)(dest.layerInfo),
                  layerInfoSize,
                  NULL,
                  ackSize,
                  dest.cnt,
                  dest.regMode,
                  dest.deregMode,
                  dest.isRegistered,
                  dest.pe,
                  dest.ref,
                  ncpyOpInfo);

    // Tag the operation as part of the readonly broadcast and carry refPtr along.
    ncpyOpInfo->opMode = CMK_READONLY_BCAST;
    ncpyOpInfo->refPtr = refPtr;

    // Interior node: dest becomes a source for the children.
    if(t.child_count != 0)
      readonlyCreateOnSource(dest);

    CmiIssueRget(ncpyOpInfo);
  }
}
01690
01691
01692
01693
01694 void readonlyGetCompleted(NcpyOperationInfo *ncpyOpInfo) {
01695
01696 if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
01697 CmiSpanningTreeInfo &t = *_topoTree;
01698
01699
01700 CksvAccess(_numPendingRORdmaTransfers)--;
01701
01702
01703 if(CksvAccess(_numPendingRORdmaTransfers) == 0) {
01704
01705 if(t.child_count != 0) {
01706
01707 envelope *env = (envelope *)(ncpyOpInfo->refPtr);
01708
01709
01710 CmiForwardProcBcastMsg(env->getTotalsize(), (char *)env);
01711
01712
01713
01714 } else {
01715
01716
01717 CmiDeregisterMem(ncpyOpInfo->destPtr, ncpyOpInfo->destLayerInfo + CmiGetRdmaCommonInfoSize(), ncpyOpInfo->destPe, ncpyOpInfo->destRegMode);
01718
01719
01720 envelope *compEnv = _allocEnv(ROChildCompletionMsg);
01721 compEnv->setSrcPe(CkMyPe());
01722 CmiSetHandler(compEnv, _roRdmaDoneHandlerIdx);
01723 CmiSyncSendAndFree(ncpyOpInfo->srcPe, compEnv->getTotalsize(), (char *)compEnv);
01724 }
01725
01726 #if CMK_SMP
01727
01728 envelope *sigEnv = _allocEnv(ROPeerCompletionMsg);
01729 sigEnv->setSrcPe(CkMyPe());
01730 CmiSetHandler(sigEnv, _roRdmaDoneHandlerIdx);
01731 CmiSyncSendAndFree(CmiNodeFirst(CmiMyNode()), sigEnv->getTotalsize(), (char *)sigEnv);
01732 #else
01733
01734 checkForInitDone(true);
01735 #endif
01736
01737 }
01738
01739
01740 CmiFree(ncpyOpInfo);
01741 }
01742 #endif
01743