00001
00002
00003
00004
00005
00006
00007 #include "ampiimpl.h"
00008
00009
00010
00011
00012
// Status codes returned by the win_obj methods in this file.
#define WIN_SUCCESS 0
#define WIN_ERROR   (-1)

// Message-size thresholds consulted by the RDMA send paths below.
extern int AMPI_RDMA_THRESHOLD;
extern int AMPI_SMP_RDMA_THRESHOLD;
00018
00019 win_obj::win_obj() noexcept {
00020 baseAddr = NULL;
00021 comm = MPI_COMM_NULL;
00022 initflag = false;
00023 }
00024
00025 win_obj::win_obj(const char *name, void *base, MPI_Aint size, int disp_unit,
00026 MPI_Comm comm) noexcept {
00027 create(name, base, size, disp_unit, comm);
00028 owner = -1;
00029 }
00030
00031 void win_obj::setName(const char *src) noexcept {
00032 CkDDT_SetName(winName, src);
00033 }
00034
00035 void win_obj::getName(char *name, int *len) noexcept {
00036 int length = *len = winName.size();
00037 memcpy(name, winName.data(), length);
00038 name[length] = '\0';
00039 }
00040
00041 win_obj::~win_obj() noexcept {
00042 free();
00043 }
00044
00045
00046
00047 void win_obj::pup(PUP::er &p) noexcept {
00048 #if 0
00049 p|winSize;
00050 p|disp_unit;
00051 p|comm;
00052 p|initflag;
00053
00054 p|winName;
00055 p|keyvals;
00056
00057 int size = 0;
00058 if(baseAddr) size = winSize;
00059 p|size;
00060 if(p.isUnpacking()) baseAddr = new char[size+1];
00061 p(baseAddr, size);
00062 #endif
00063 }
00064
00065 int win_obj::create(const char *name, void *base, MPI_Aint size, int disp_unit, MPI_Comm comm) noexcept {
00066 if (name) setName(name);
00067 baseAddr = base;
00068 winSize = size*disp_unit;
00069 this->disp_unit = disp_unit;
00070 this->comm = comm;
00071
00072 initflag = true;
00073 return WIN_SUCCESS;
00074 }
00075
00076 int win_obj::free() noexcept {
00077
00078 initflag = false;
00079 return WIN_SUCCESS;
00080 }
00081
00082
00083
00084
00085 int win_obj::put(void *orgaddr, int orgcnt, int orgunit, MPI_Aint targdisp,
00086 int targcnt, int targunit) noexcept {
00087 if(!initflag) {
00088 CkAbort("Put to non-existing MPI_Win\n");
00089 return WIN_ERROR;
00090 }
00091 int totalsize = targdisp+targcnt*targunit;
00092 if(totalsize > (winSize)){
00093 CkAbort("Put size exceeds MPI_Win size\n");
00094 return WIN_ERROR;
00095 }
00096
00097 return WIN_SUCCESS;
00098 }
00099
00100 int win_obj::get(void *orgaddr, int orgcnt, int orgunit, MPI_Aint targdisp,
00101 int targcnt, int targunit) noexcept {
00102 if(!initflag) {
00103 CkAbort("Get from non-existing MPI_Win\n");
00104 return WIN_ERROR;
00105 }
00106 int totalsize = targdisp+targcnt*targunit;
00107 if(totalsize > (winSize)){
00108 CkAbort("Get size exceeds MPI_Win size\n");
00109 return WIN_ERROR;
00110 }
00111
00112
00113 return WIN_SUCCESS;
00114 }
00115
00116 int win_obj::iget(int orgcnt, int orgunit, MPI_Aint targdisp,
00117 int targcnt, int targunit) noexcept {
00118 if(!initflag) {
00119 CkAbort("Get from non-existing MPI_Win\n");
00120 return WIN_ERROR;
00121 }
00122
00123 if((targdisp+targcnt*targunit) > (winSize)){
00124 CkAbort("Get size exceeds MPI_Win size\n");
00125 return WIN_ERROR;
00126 }
00127
00128
00129 return WIN_SUCCESS;
00130 }
00131
00132 int win_obj::accumulate(void *orgaddr, int count, MPI_Aint targdisp, MPI_Datatype targtype,
00133 MPI_Op op, ampiParent* pptr) noexcept
00134 {
00135
00136 CkAssert(pptr != NULL);
00137 pptr->applyOp(targtype, op, count, orgaddr, (void*)((char*)baseAddr+disp_unit*targdisp));
00138 return WIN_SUCCESS;
00139 }
00140
00141 int win_obj::fence() noexcept {
00142 return WIN_SUCCESS;
00143 }
00144
00145 int win_obj::lock(int requestRank, int lock_type) noexcept {
00146 owner = requestRank;
00147 return WIN_SUCCESS;
00148 }
00149
00150 int win_obj::unlock(int requestRank) noexcept {
00151 if (owner != requestRank){
00152 CkPrintf(" ERROR: Can't unlock a lock which you don't own.\n");
00153 return WIN_ERROR;
00154 }
00155 owner = -1;
00156
00157
00158 dequeue();
00159
00160 return WIN_SUCCESS;
00161 }
00162
00163 void win_obj::dequeue() noexcept {
00164 lockQueueEntry *lq = lockQueue.deq();
00165 delete lq;
00166 }
00167
00168 void win_obj::enqueue(int requestRank, int lock_type) noexcept {
00169 lockQueueEntry *lq = new lockQueueEntry(requestRank, lock_type);
00170 lockQueue.enq(lq);
00171 }
00172
00173 bool win_obj::emptyQueue() noexcept {
00174 return (lockQueue.length()==0);
00175 }
00176
00177 void win_obj::lockTopQueue() noexcept {
00178 lockQueueEntry *lq = lockQueue.deq();
00179 lock(lq->requestRank, lq->lock_type);
00180 lockQueue.insert(0, lq);
00181 }
00182
00183
00184 int win_obj::wait() noexcept {
00185 return -1;
00186 }
00187
00188 int win_obj::post() noexcept {
00189 return -1;
00190 }
00191
00192 int win_obj::start() noexcept {
00193 return -1;
00194 }
00195
00196 int win_obj::complete() noexcept {
00197 return -1;
00198 }
00199
00200 int ampiParent::addWinStruct(WinStruct* win) noexcept {
00201 winStructList.push_back(win);
00202 return winStructList.size()-1;
00203 }
00204
00205 WinStruct *ampiParent::getWinStruct(MPI_Win win) const noexcept {
00206 return winStructList[(int)win];
00207 }
00208
00209 void ampiParent::removeWinStruct(WinStruct *win) noexcept {}
00210
00211 int ampi::winPut(const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00212 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, WinStruct *win) noexcept {
00213 CkDDT_DataType *ddt = getDDT()->getType(orgtype);
00214 int orgtotalsize = ddt->getSize(orgcnt);
00215 AMPI_DEBUG(" Rank[%d:%d] invoke Remote put at [%d]\n", thisIndex, myRank, rank);
00216
00217 if (ddt->isContig()) {
00218 ampi *destPtr = thisProxy[rank].ckLocal();
00219 if (destPtr != NULL) {
00220 destPtr->winRemotePut(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
00221 targcnt, targtype, win->index);
00222 }
00223 #if AMPI_RDMA_IMPL
00224 else if (orgtotalsize >= AMPI_RDMA_THRESHOLD ||
00225 (orgtotalsize >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(thisProxy, rank)))
00226 {
00227 AmpiRequestList& reqs = getReqs();
00228 SendReq* ampiReq = parent->reqPool.newReq<SendReq>(orgtype, myComm.getComm(), getDDT());
00229 MPI_Request req = reqs.insert(ampiReq);
00230 CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true);
00231 completedSendCB.setRefnum(req);
00232 thisProxy[rank].winRemotePut(orgtotalsize, CkSendBuffer(orgaddr, completedSendCB), orgcnt, orgtype,
00233 targdisp, targcnt, targtype, win->index);
00234 ampiReq->wait(MPI_STATUS_IGNORE);
00235 reqs.free(parent->reqPool, req, getDDT());
00236 }
00237 #endif
00238 else {
00239 thisProxy[rank].winRemotePut(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
00240 targcnt, targtype, win->index);
00241 }
00242 }
00243 else {
00244 vector<char> sorgaddr(orgtotalsize);
00245 int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
00246 ddt->serialize((char*)orgaddr, sorgaddr.data(), orgcnt, orgsize, PACK);
00247 thisProxy[rank].winRemotePut(orgtotalsize, sorgaddr.data(), orgcnt, orgtype, targdisp,
00248 targcnt, targtype, win->index);
00249 }
00250
00251 return MPI_SUCCESS;
00252 }
00253
00254 void ampi::winRemotePut(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
00255 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, int winIndex) noexcept {
00256 win_obj *winobj = winObjects[winIndex];
00257 CkDDT_DataType *tddt = getDDT()->getType(targtype);
00258 int targunit = tddt->getSize();
00259 int orgunit = getDDT()->getSize(orgtype);
00260
00261 winobj->put(sorgaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00262 char* targaddr = ((char*)(winobj->baseAddr)) + winobj->disp_unit*targdisp;
00263 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00264 tddt->serialize(targaddr, (char*)sorgaddr, targcnt, targsize, UNPACK);
00265 }
00266
00267 int ampi::winGet(void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00268 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00269 WinStruct *win) noexcept {
00270
00271 AMPI_DEBUG(" Rank[%d:%d] invoke Remote get at [%d]\n", thisIndex, myRank, rank);
00272 CkDDT_DataType *orgddt = getDDT()->getType(orgtype);
00273 CkDDT_DataType *targddt = getDDT()->getType(targtype);
00274 int orgtotalsize = orgddt->getSize(orgcnt);
00275 int targtotalsize = targddt->getSize(targcnt);
00276
00277
00278
00279 if (orgddt->isContig() || targddt->isContig()) {
00280 ampi *destPtr = thisProxy[rank].ckLocal();
00281 if (destPtr != NULL) {
00282 char* targdata = destPtr->winLocalGet(orgcnt, orgtype, targdisp, targcnt, targtype, win->index);
00283 if (orgddt->isContig()) {
00284 int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
00285 orgddt->serialize((char*)orgaddr, targdata, orgcnt, orgsize, UNPACK);
00286 } else {
00287 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00288 targddt->serialize((char*)orgaddr, targdata, targcnt, targsize, PACK);
00289 }
00290 return MPI_SUCCESS;
00291 }
00292 }
00293
00294 AmpiMsg* msg = thisProxy[rank].winRemoteGet(orgcnt, orgtype, targdisp, targcnt, targtype, win->index);
00295
00296
00297 int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
00298 orgddt->serialize((char*)orgaddr, msg->getData(), orgcnt, orgsize, UNPACK);
00299 AMPI_DEBUG(" Rank[%d] got win [%d] \n", thisIndex, *(int*)msg->getData());
00300 AMPI_DEBUG(" Rank[%d] got win [%d] , size %d\n", thisIndex, *(int*)orgaddr, orgcnt);
00301
00302 delete msg;
00303 return MPI_SUCCESS;
00304 }
00305
00306 char* ampi::winLocalGet(int orgcnt, MPI_Datatype orgtype, MPI_Aint targdisp, int targcnt,
00307 MPI_Datatype targtype, int winIndex) noexcept {
00308 AMPI_DEBUG(" LocalGet invoked at Rank[%d:%d]\n", thisIndex, myRank);
00309
00310 win_obj *winobj = winObjects[winIndex];
00311 CkDDT_DataType *tddt = getDDT()->getType(targtype);
00312 int targunit = tddt->getSize();
00313 int targtotalsize = winobj->disp_unit*targcnt;
00314 int orgunit = getDDT()->getSize(orgtype);
00315 char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00316
00317 winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00318
00319 AMPI_DEBUG(" Rank[%d] local get win [%d] \n", thisIndex, *(int*)(targaddr));
00320 return targaddr;
00321 }
00322
00323 AmpiMsg* ampi::winRemoteGet(int orgcnt, MPI_Datatype orgtype, MPI_Aint targdisp, int targcnt,
00324 MPI_Datatype targtype, int winIndex) noexcept {
00325 AMPI_DEBUG(" RemoteGet invoked at Rank[%d:%d]\n", thisIndex, myRank);
00326
00327 win_obj *winobj = winObjects[winIndex];
00328 CkDDT_DataType *tddt = getDDT()->getType(targtype);
00329 int targunit = tddt->getSize();
00330 int targtotalsize = winobj->disp_unit*targcnt;
00331 int orgunit = getDDT()->getSize(orgtype);
00332 char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00333
00334 winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00335
00336 AMPI_DEBUG(" Rank[%d] get win [%d] \n", thisIndex, *(int*)(targaddr));
00337 AmpiMsg *msg = new (targtotalsize, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, targtotalsize);
00338 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00339 tddt->serialize(targaddr, msg->getData(), targcnt, targsize, PACK);
00340 return msg;
00341 }
00342
00343 int ampi::winIget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype, int rank,
00344 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00345 WinStruct *win, MPI_Request *req) noexcept {
00346
00347 AMPI_DEBUG(" Rank[%d:%d] request Remote iget at [%d]\n", thisIndex, myRank, rank);
00348 *req = thisProxy[rank].winRemoteIget(orgdisp, orgcnt, orgtype, targdisp, targcnt, targtype, win->index);
00349 return MPI_SUCCESS;
00350 }
00351
00352 AmpiMsg* ampi::winRemoteIget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype,
00353 MPI_Aint targdisp, int targcnt,
00354 MPI_Datatype targtype, int winIndex) noexcept {
00355 AMPI_DEBUG(" RemoteIget invoked at Rank[%d:%d]\n", thisIndex, myRank);
00356 win_obj *winobj = winObjects[winIndex];
00357 CkDDT_DataType *tddt = getDDT()->getType(targtype);
00358 int targunit = tddt->getSize();
00359 int targtotalsize = winobj->disp_unit*targcnt;
00360 int orgunit = getDDT()->getSize(orgtype);
00361
00362 winobj->iget(orgcnt, orgunit, targdisp, targcnt, targunit);
00363
00364 AmpiMsg *msg = new (targtotalsize, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, targtotalsize);
00365
00366 char* targaddr = (char*)(winobj->baseAddr) + targdisp*winobj->disp_unit;
00367 AMPI_DEBUG(" Rank[%d] iget win [%d] \n", thisIndex, *(int*)(targaddr));
00368 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00369 tddt->serialize(targaddr, msg->getData(), targcnt, targsize, PACK);
00370 AMPI_DEBUG(" Rank[%d] copy win [%d] \n", thisIndex, *(int*)msg->getData());
00371 return msg;
00372 }
00373
00374 int ampi::winIgetWait(MPI_Request *request, MPI_Status *status) noexcept {
00375
00376 AMPI_DEBUG(" [%d] Iget Waiting\n", thisIndex, *request);
00377 status->msg = (AmpiMsg*)CkWaitReleaseFuture(*request);
00378 AMPI_DEBUG(" [%d] Iget Waiting [%d] awaken\n", thisIndex, *request);
00379 return MPI_SUCCESS;
00380 }
00381
00382 int ampi::winIgetFree(MPI_Request *request, MPI_Status *status) noexcept {
00383 AMPI_DEBUG(" [%d] : Iget [%d] frees buffer\n", thisIndex, *request);
00384
00385 void *data = NULL;
00386 AMPI_Iget_data(&data, *status);
00387 if(!data) {
00388 AMPI_DEBUG(" [%d] Iget [%d] attempt to free NULL buffer \n", thisIndex, *request);
00389 return ampiErrhandler("AMPI_Iget_free", MPI_ERR_BUFFER);
00390 }
00391 else {
00392 delete (status->msg);
00393 return MPI_SUCCESS;
00394 }
00395 }
00396
00397 int ampi::winAccumulate(const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00398 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00399 MPI_Op op, WinStruct *win) noexcept {
00400 CkDDT_DataType *ddt = getDDT()->getType(orgtype);
00401 int orgtotalsize = ddt->getSize(orgcnt);
00402 AMPI_DEBUG(" Rank[%d:%d] invoke Remote accumulate at [%d]\n", thisIndex, myRank, rank);
00403
00404 if (ddt->isContig()) {
00405 ampi *destPtr = thisProxy[rank].ckLocal();
00406 if (destPtr != NULL) {
00407 destPtr->winRemoteAccumulate(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
00408 targcnt, targtype, op, win->index);
00409 }
00410 #if AMPI_RDMA_IMPL
00411 else if (orgtotalsize >= AMPI_RDMA_THRESHOLD ||
00412 (orgtotalsize >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(thisProxy, rank)))
00413 {
00414 AmpiRequestList& reqs = getReqs();
00415 SendReq* ampiReq = parent->reqPool.newReq<SendReq>(orgtype, myComm.getComm(), getDDT());
00416 MPI_Request req = reqs.insert(ampiReq);
00417 CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true);
00418 completedSendCB.setRefnum(req);
00419 thisProxy[rank].winRemoteAccumulate(orgtotalsize, CkSendBuffer(orgaddr, completedSendCB), orgcnt,
00420 orgtype, targdisp, targcnt, targtype, op, win->index);
00421 ampiReq->wait(MPI_STATUS_IGNORE);
00422 reqs.free(parent->reqPool, req, getDDT());
00423 }
00424 #endif
00425 else {
00426 thisProxy[rank].winRemoteAccumulate(orgtotalsize, (char*)orgaddr, orgcnt, orgtype,
00427 targdisp, targcnt, targtype, op, win->index);
00428 }
00429 }
00430 else {
00431 vector<char> sorgaddr(orgtotalsize);
00432 int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
00433 ddt->serialize((char*)orgaddr, sorgaddr.data(), orgcnt, orgsize, PACK);
00434 thisProxy[rank].winRemoteAccumulate(orgtotalsize, sorgaddr.data(), orgcnt, orgtype,
00435 targdisp, targcnt, targtype, op, win->index);
00436 }
00437
00438 return MPI_SUCCESS;
00439 }
00440
00441 void ampi::winRemoteAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt,
00442 MPI_Datatype orgtype, MPI_Aint targdisp,
00443 int targcnt, MPI_Datatype targtype, MPI_Op op,
00444 int winIndex) noexcept {
00445 win_obj *winobj = winObjects[winIndex];
00446 CkDDT_DataType *ddt = getDDT()->getType(targtype);
00447 if (ddt->isContig()) {
00448 winobj->accumulate(sorgaddr, targcnt, targdisp, targtype, op, parent);
00449 }
00450 else {
00451 vector<char> getdata(orgtotalsize);
00452 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00453 ddt->serialize(getdata.data(), sorgaddr, targcnt, targsize, UNPACK);
00454 winobj->accumulate(getdata.data(), targcnt, targdisp, targtype, op, parent);
00455 }
00456 }
00457
00458 int ampi::winGetAccumulate(const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
00459 void *resaddr, int rescnt, MPI_Datatype restype, int rank,
00460 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00461 MPI_Op op, WinStruct *win) noexcept {
00462 CkDDT_DataType *orgddt = getDDT()->getType(orgtype);
00463 CkDDT_DataType *resddt = getDDT()->getType(restype);
00464 int orgtotalsize = orgddt->getSize(orgcnt);
00465 AMPI_DEBUG(" Rank[%d:%d] invoke Remote get at [%d]\n", thisIndex, myRank, rank);
00466
00467 AmpiMsg *msg;
00468 if (orgddt->isContig()) {
00469 ampi *destPtr = thisProxy[rank].ckLocal();
00470 if (destPtr != NULL) {
00471 destPtr->winLocalGetAccumulate(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
00472 targcnt, targtype, op, (char*)resaddr, win->index);
00473 return MPI_SUCCESS;
00474 }
00475 #if AMPI_RDMA_IMPL
00476 else if (orgtotalsize >= AMPI_RDMA_THRESHOLD ||
00477 (orgtotalsize >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(thisProxy, rank)))
00478 {
00479 AmpiRequestList& reqs = getReqs();
00480 SendReq* ampiReq = parent->reqPool.newReq<SendReq>(orgtype, myComm.getComm(), getDDT());
00481 MPI_Request req = reqs.insert(ampiReq);
00482 CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true);
00483 completedSendCB.setRefnum(req);
00484 msg = thisProxy[rank].winRemoteGetAccumulate(orgtotalsize, CkSendBuffer(orgaddr, completedSendCB), orgcnt,
00485 orgtype, targdisp, targcnt, targtype, op, win->index);
00486 ampiReq->wait(MPI_STATUS_IGNORE);
00487 reqs.free(parent->reqPool, req, getDDT());
00488 }
00489 #endif
00490 else {
00491 msg = thisProxy[rank].winRemoteGetAccumulate(orgtotalsize, CkSendBuffer(orgaddr), orgcnt, orgtype, targdisp,
00492 targcnt, targtype, op, win->index);
00493 }
00494 }
00495 else {
00496 vector<char> sorgaddr(orgtotalsize);
00497 int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
00498 orgddt->serialize((char*)orgaddr, sorgaddr.data(), orgcnt, orgsize, PACK);
00499 msg = thisProxy[rank].winRemoteGetAccumulate(orgtotalsize, sorgaddr.data(), orgcnt, orgtype, targdisp,
00500 targcnt, targtype, op, win->index);
00501 }
00502
00503 int ressize = getDDT()->getType(restype)->getSize(rescnt);
00504 resddt->serialize((char*)resaddr, msg->getData(), rescnt, ressize, UNPACK);
00505 delete msg;
00506
00507 return MPI_SUCCESS;
00508 }
00509
00510 void ampi::winLocalGetAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
00511 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Op op,
00512 char *resaddr, int winIndex) noexcept {
00513 win_obj *winobj = winObjects[winIndex];
00514 CkDDT_DataType *tddt = getDDT()->getType(targtype);
00515 int targunit = tddt->getSize();
00516 int targtotalsize = winobj->disp_unit*targcnt;
00517 int orgunit = getDDT()->getSize(orgtype);
00518 char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00519
00520
00521 winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00522 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00523 tddt->serialize(targaddr, resaddr, targcnt, targsize, PACK);
00524
00525
00526 if (tddt->isContig()) {
00527 winobj->accumulate(sorgaddr, targcnt, targdisp, targtype, op, parent);
00528 }
00529 else {
00530 vector<char> getdata(orgtotalsize);
00531 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00532 tddt->serialize(getdata.data(), sorgaddr, targcnt, targsize, UNPACK);
00533 winobj->accumulate(getdata.data(), targcnt, targdisp, targtype, op, parent);
00534 }
00535 }
00536
00537 AmpiMsg* ampi::winRemoteGetAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
00538 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Op op,
00539 int winIndex) noexcept {
00540 win_obj *winobj = winObjects[winIndex];
00541 CkDDT_DataType *tddt = getDDT()->getType(targtype);
00542 int targunit = tddt->getSize();
00543 int targtotalsize = winobj->disp_unit*targcnt;
00544 int orgunit = getDDT()->getSize(orgtype);
00545 char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00546
00547
00548 winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00549 AmpiMsg *msg = new (targtotalsize, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, targtotalsize);
00550 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00551 tddt->serialize(targaddr, msg->getData(), targcnt, targsize, PACK);
00552
00553
00554 if (tddt->isContig()) {
00555 winobj->accumulate(sorgaddr, targcnt, targdisp, targtype, op, parent);
00556 }
00557 else {
00558 vector<char> getdata(orgtotalsize);
00559 int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00560 tddt->serialize(getdata.data(), sorgaddr, targcnt, targsize, UNPACK);
00561 winobj->accumulate(getdata.data(), targcnt, targdisp, targtype, op, parent);
00562 }
00563
00564 return msg;
00565 }
00566
00567 int ampi::winCompareAndSwap(const void *orgaddr, const void *compaddr, void *resaddr, MPI_Datatype type,
00568 int rank, MPI_Aint targdisp, WinStruct *win) noexcept {
00569 CkDDT_DataType *ddt = getDDT()->getType(type);
00570
00571 if (ddt->isContig()) {
00572 ampi *destPtr = thisProxy[rank].ckLocal();
00573 if (destPtr != NULL) {
00574 char* targaddr = destPtr->winLocalCompareAndSwap(ddt->getSize(), (char*)orgaddr,
00575 (char*)compaddr, type, targdisp, win->index);
00576 int targsize = getDDT()->getType(type)->getSize(1);
00577 ddt->serialize((char*)resaddr, targaddr, 1, targsize, PACK);
00578 return MPI_SUCCESS;
00579 }
00580 }
00581
00582 AmpiMsg* msg = thisProxy[rank].winRemoteCompareAndSwap(getDDT()->getType(type)->getSize(1), (char*)orgaddr,
00583 (char*)compaddr, type, targdisp, win->index);
00584 int ressize = getDDT()->getType(type)->getSize(1);
00585 ddt->serialize((char*)resaddr, msg->getData(), 1, ressize, PACK);
00586
00587 delete msg;
00588 return MPI_SUCCESS;
00589 }
00590
00591 char* ampi::winLocalCompareAndSwap(int size, char* sorgaddr, char* compaddr, MPI_Datatype type,
00592 MPI_Aint targdisp, int winIndex) noexcept {
00593 win_obj *winobj = winObjects[winIndex];
00594 winobj->put(sorgaddr, 1, size, targdisp, 1, size);
00595
00596 CkDDT_DataType *ddt = getDDT()->getType(type);
00597 char* targaddr = ((char*)(winobj->baseAddr)) + ddt->getSize(targdisp);
00598
00599 if (*targaddr == *compaddr) {
00600 int size = ddt->getSize(1);
00601 ddt->serialize(targaddr, (char*)sorgaddr, 1, size, UNPACK);
00602 }
00603
00604 return targaddr;
00605 }
00606
00607 AmpiMsg* ampi::winRemoteCompareAndSwap(int size, char* sorgaddr, char* compaddr, MPI_Datatype type,
00608 MPI_Aint targdisp, int winIndex) noexcept {
00609 win_obj *winobj = winObjects[winIndex];
00610 winobj->put(sorgaddr, 1, size, targdisp, 1, size);
00611
00612 CkDDT_DataType *ddt = getDDT()->getType(type);
00613 char* targaddr = ((char*)(winobj->baseAddr)) + ddt->getSize(targdisp);
00614
00615 AmpiMsg *msg = new (size, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, size);
00616 ddt->serialize(targaddr, msg->getData(), 1, msg->getLength(), PACK);
00617
00618 if (*targaddr == *compaddr) {
00619 ddt->serialize(targaddr, (char*)sorgaddr, 1, ddt->getSize(1), UNPACK);
00620 }
00621
00622 return msg;
00623 }
00624
00625 int ampi::winLock(int lock_type, int rank, WinStruct *win) noexcept {
00626 AMPI_DEBUG(" [%d] Lock: invoke Remote lock at [%d]\n", thisIndex, rank);
00627 thisProxy[rank].winRemoteLock(lock_type, win->index, thisIndex);
00628 return MPI_SUCCESS;
00629 }
00630
00631 void ampi::winRemoteLock(int lock_type, int winIndex, int requestRank) noexcept {
00632 AMPI_DEBUG(" [%d] RemoteLock: invoked \n", thisIndex);
00633 win_obj *winobj = winObjects[winIndex];
00634
00635
00636 if(winobj->owner > -1 && !(winobj->emptyQueue())) {
00637
00638 winobj->enqueue(requestRank, lock_type);
00639 AMPI_DEBUG(" [%d] RemoteLock: queue lock from [%d] \n", thisIndex, requestRank);
00640 }
00641
00642 else {
00643 winobj->lock(requestRank, lock_type);
00644 winobj->enqueue(requestRank, lock_type);
00645 AMPI_DEBUG(" [%d] RemoteLock: give lock to [%d] \n", thisIndex, requestRank);
00646 }
00647 }
00648
00649 int ampi::winUnlock(int rank, WinStruct *win) noexcept {
00650 AMPI_DEBUG(" [%d] Unlock: invoke Remote lock at [%d]\n", thisIndex, rank);
00651 thisProxy[rank].winRemoteUnlock(win->index, thisIndex);
00652 return MPI_SUCCESS;
00653 }
00654
00655 void ampi::winRemoteUnlock(int winIndex, int requestRank) noexcept {
00656 AMPI_DEBUG(" [%d] RemoteUnlock: invoked \n", thisIndex);
00657 win_obj *winobj = winObjects[winIndex];
00658 winobj->unlock(requestRank);
00659 AMPI_DEBUG(" [%d] RemoteUnlock: [%d] release lock\n", thisIndex, requestRank);
00660
00661
00662 if(!(winobj->emptyQueue())) {
00663 AMPI_DEBUG(" [%d] RemoteUnlock: queue non-empty, give lock to \n", thisIndex);
00664 winobj->lockTopQueue();
00665 }
00666 }
00667
00668 MPI_Win ampi::createWinInstance(void *base, MPI_Aint size, int disp_unit, MPI_Info info) noexcept {
00669 AMPI_DEBUG(" Creating win obj {%d, %p}\n ", myComm.getComm(), base);
00670 win_obj *newobj = new win_obj((char*)(NULL), base, size, disp_unit, myComm.getComm());
00671 winObjects.push_back(newobj);
00672 WinStruct *newwin = new WinStruct(myComm.getComm(),winObjects.size()-1);
00673 AMPI_DEBUG(" Creating MPI_WIN at (%p) with {%d, %d}\n", &newwin, myComm.getComm(), winObjects.size()-1);
00674 return (parent->addWinStruct(newwin));
00675 }
00676
00677 int ampi::deleteWinInstance(MPI_Win win) noexcept {
00678 WinStruct *winStruct = parent->getWinStruct(win);
00679 win_obj *winobj = winObjects[winStruct->index];
00680 parent->removeWinStruct(winStruct);
00681 winobj->free();
00682 return MPI_SUCCESS;
00683 }
00684
00685 int ampi::winGetGroup(WinStruct *win, MPI_Group *group) const noexcept {
00686 *group = parent->comm2group(win->comm);
00687 return MPI_SUCCESS;
00688 }
00689
00690 void ampi::winSetName(WinStruct *win, const char *name) noexcept {
00691 win_obj *winobj = winObjects[win->index];
00692 winobj->setName(name);
00693 }
00694
00695 void ampi::winGetName(WinStruct *win, char *name, int *length) const noexcept {
00696 win_obj *winobj = winObjects[win->index];
00697 winobj->getName(name, length);
00698 }
00699
00700 win_obj* ampi::getWinObjInstance(WinStruct *win) const noexcept {
00701 return winObjects[win->index];
00702 }
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724 AMPI_API_IMPL(int, MPI_Win_create, void *base, MPI_Aint size, int disp_unit,
00725 MPI_Info info, MPI_Comm comm, MPI_Win *newwin)
00726 {
00727 AMPI_API("AMPI_Win_create");
00728 ampiParent *parent = getAmpiParent();
00729 ampi *ptr = getAmpiInstance(comm);
00730 *newwin = ptr->createWinInstance(base, size, disp_unit, info);
00731
00732 WinStruct *winStruct = parent->getWinStruct(*newwin);
00733 vector<int>& keyvals = ptr->getWinObjInstance(winStruct)->getKeyvals();
00734 parent->setAttr(*newwin, keyvals, MPI_WIN_BASE, &base);
00735 parent->setAttr(*newwin, keyvals, MPI_WIN_SIZE, &size);
00736 parent->setAttr(*newwin, keyvals, MPI_WIN_DISP_UNIT, &disp_unit);
00737 ptr->barrier();
00738 return MPI_SUCCESS;
00739 }
00740
00741
00742
00743
00744
00745
00746
00747 AMPI_API_IMPL(int, MPI_Win_free, MPI_Win *win)
00748 {
00749 AMPI_API("AMPI_Win_free");
00750 if(win==NULL) { return ampiErrhandler("AMPI_Win_free", MPI_ERR_WIN); }
00751
00752 WinStruct *winStruct = getAmpiParent()->getWinStruct(*win);
00753 ampi *ptr = getAmpiInstance(winStruct->comm);
00754 ptr->deleteWinInstance(*win);
00755
00756 ptr->barrier();
00757 *win = MPI_WIN_NULL;
00758 return MPI_SUCCESS;
00759 }
00760
00761
00762
00763
00764
00765
00766 AMPI_API_IMPL(int, MPI_Put, const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00767 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Win win)
00768 {
00769 AMPI_API("AMPI_Put");
00770 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00771 handle_MPI_BOTTOM((void*&)orgaddr, orgtype);
00772 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00773 ampi *ptr = getAmpiInstance(winStruct->comm);
00774 return ptr->winPut(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00775 }
00776
00777
00778
00779
00780
00781
00782 AMPI_API_IMPL(int, MPI_Get, void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00783 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00784 MPI_Win win)
00785 {
00786 AMPI_API("AMPI_Get");
00787 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00788 handle_MPI_BOTTOM(orgaddr, orgtype);
00789 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00790 ampi *ptr = getAmpiInstance(winStruct->comm);
00791
00792 return ptr->winGet(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00793 }
00794
00795
00796
00797
00798
00799
00800
00801
00802
00803
00804
00805 AMPI_API_IMPL(int, MPI_Accumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
00806 int rank, MPI_Aint targdisp, int targcnt,
00807 MPI_Datatype targtype, MPI_Op op, MPI_Win win)
00808 {
00809 AMPI_API("AMPI_Accumulate");
00810 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00811 handle_MPI_BOTTOM((void*&)orgaddr, orgtype);
00812 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00813 ampi *ptr = getAmpiInstance(winStruct->comm);
00814 return ptr->winAccumulate(orgaddr, orgcnt, orgtype, rank,
00815 targdisp, targcnt, targtype, op, winStruct);
00816 }
00817
00818
00819
00820
00821
00822
00823
00824
00825 AMPI_API_IMPL(int, MPI_Get_accumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
00826 void *resaddr, int rescnt, MPI_Datatype restype,
00827 int rank, MPI_Aint targdisp, int targcnt,
00828 MPI_Datatype targtype, MPI_Op op, MPI_Win win)
00829 {
00830 AMPI_API("AMPI_Get_accumulate");
00831 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00832 handle_MPI_BOTTOM((void*&)orgaddr, orgtype);
00833 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00834 ampi *ptr = getAmpiInstance(winStruct->comm);
00835 return ptr->winGetAccumulate(orgaddr, orgcnt, orgtype, resaddr, rescnt, restype,
00836 rank, targdisp, targcnt, targtype, op, winStruct);
00837 }
00838
00839
00840
00841
00842
00843
00844
00845
00846
00847 AMPI_API_IMPL(int, MPI_Rput, const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00848 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Win win,
00849 MPI_Request *request)
00850 {
00851 AMPI_API("AMPI_Rput");
00852 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00853 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00854 ampi *ptr = getAmpiInstance(winStruct->comm);
00855 *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00856 return ptr->winPut(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00857 }
00858
00859
00860
00861
00862
00863
00864
00865
00866 AMPI_API_IMPL(int, MPI_Rget, void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00867 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00868 MPI_Win win, MPI_Request *request)
00869 {
00870 AMPI_API("AMPI_Rget");
00871 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00872 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00873 ampi *ptr = getAmpiInstance(winStruct->comm);
00874 *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00875 return ptr->winGet(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00876 }
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886 AMPI_API_IMPL(int, MPI_Raccumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00887 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00888 MPI_Op op, MPI_Win win, MPI_Request *request)
00889 {
00890 AMPI_API("AMPI_Raccumulate");
00891 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00892 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00893 ampi *ptr = getAmpiInstance(winStruct->comm);
00894 *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00895 return ptr->winAccumulate(orgaddr, orgcnt, orgtype, rank,
00896 targdisp, targcnt, targtype, op, winStruct);
00897 }
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907 AMPI_API_IMPL(int, MPI_Rget_accumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
00908 void *resaddr, int rescnt, MPI_Datatype restype,
00909 int rank, MPI_Aint targdisp, int targcnt,
00910 MPI_Datatype targtype, MPI_Op op, MPI_Win win,
00911 MPI_Request *request)
00912 {
00913 AMPI_API("AMPI_Rget_accumulate");
00914 if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00915 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00916 ampi *ptr = getAmpiInstance(winStruct->comm);
00917 *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00918 return ptr->winGetAccumulate(orgaddr, orgcnt, orgtype, resaddr, rescnt, restype,
00919 rank, targdisp, targcnt, targtype, op, winStruct);
00920 }
00921
00922
00923
00924
00925
00926
00927 AMPI_API_IMPL(int, MPI_Fetch_and_op, const void *orgaddr, void *resaddr, MPI_Datatype type,
00928 int rank, MPI_Aint targdisp, MPI_Op op, MPI_Win win)
00929 {
00930 AMPI_API("AMPI_Fetch_and_op");
00931 #if AMPI_ERROR_CHECKING
00932 if (type > AMPI_MAX_PREDEFINED_TYPE)
00933 {
00934 return ampiErrhandler("AMPI_Fetch_and_op", MPI_ERR_UNSUPPORTED_OPERATION);
00935 }
00936 #endif
00937 handle_MPI_BOTTOM((void*&)orgaddr, type);
00938 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00939 ampi *ptr = getAmpiInstance(winStruct->comm);
00940
00941 return ptr->winGetAccumulate(orgaddr, 1, type, resaddr, 1, type,
00942 rank, targdisp, 1, type, op, winStruct);
00943 }
00944
00945
00946
00947
00948
00949
00950 AMPI_API_IMPL(int, MPI_Compare_and_swap, const void *orgaddr, const void *compaddr, void *resaddr,
00951 MPI_Datatype type, int rank, MPI_Aint targdisp, MPI_Win win)
00952 {
00953 AMPI_API("AMPI_Compare_and_swap");
00954 #if AMPI_ERROR_CHECKING
00955 if (type > AMPI_MAX_PREDEFINED_TYPE)
00956 {
00957 return ampiErrhandler("AMPI_Compare_and_swap", MPI_ERR_UNSUPPORTED_OPERATION);
00958 }
00959 #endif
00960 handle_MPI_BOTTOM((void*&)orgaddr, type);
00961 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00962 ampi *ptr = getAmpiInstance(winStruct->comm);
00963 return ptr->winCompareAndSwap(orgaddr, compaddr, resaddr, type, rank, targdisp, winStruct);
00964 }
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975 AMPI_API_IMPL(int, MPI_Win_fence, int assertion, MPI_Win win)
00976 {
00977 AMPI_API("AMPI_Win_fence");
00978 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00979 MPI_Comm comm = winStruct->comm;
00980 ampi *ptr = getAmpiInstance(comm);
00981
00982
00983 ptr->barrier();
00984
00985
00986
00987 return MPI_SUCCESS;
00988 }
00989
00990
00991
00992
00993
00994
00995
00996
00997
00998
00999 AMPI_API_IMPL(int, MPI_Win_lock, int lock_type, int rank, int assertion, MPI_Win win)
01000 {
01001 AMPI_API("AMPI_Win_lock");
01002 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01003 ampi *ptr = getAmpiInstance(winStruct->comm);
01004
01005
01006
01007 ptr->winLock(lock_type, rank, winStruct);
01008 return MPI_SUCCESS;
01009 }
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020 AMPI_API_IMPL(int, MPI_Win_unlock, int rank, MPI_Win win)
01021 {
01022 AMPI_API("AMPI_Win_unlock");
01023 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01024 ampi *ptr = getAmpiInstance(winStruct->comm);
01025
01026
01027
01028 ptr->winUnlock(rank, winStruct);
01029 return MPI_SUCCESS;
01030 }
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043
01044 AMPI_API_IMPL(int, MPI_Win_post, MPI_Group group, int assertion, MPI_Win win)
01045 {
01046 AMPI_API("AMPI_Win_post");
01047
01048 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01049 if (winStruct->isInEpoch()) {
01050 return ampiErrhandler("AMPI_Win_post", MPI_ERR_RMA_SYNC);
01051 } else {
01052 winStruct->setInEpoch(true);
01053 }
01054
01055 int parentGroupSize;
01056 MPI_Group parentGroup;
01057 MPI_Comm_group(winStruct->comm, &parentGroup);
01058 MPI_Group_size(parentGroup, &parentGroupSize);
01059
01060 std::vector<int> parentGroupRanks(parentGroupSize);
01061 std::vector<int> subsetGroupRanks(parentGroupSize);
01062 for (int i=0; i<parentGroupSize; i++) {
01063 parentGroupRanks[i] = i;
01064 }
01065 MPI_Group_translate_ranks(parentGroup, parentGroupSize, parentGroupRanks.data(), group, subsetGroupRanks.data());
01066
01067 ampi *ptr = getAmpiInstance(winStruct->comm);
01068 int actualRanks = 0;
01069 for (int i=0; i<subsetGroupRanks.size(); i++) {
01070 if (subsetGroupRanks[i] != MPI_UNDEFINED) {
01071 subsetGroupRanks[actualRanks++] = i;
01072 ptr->send(MPI_EPOCH_START_TAG, ptr->getRank(), NULL, 0, MPI_INT, subsetGroupRanks[actualRanks-1], winStruct->comm);
01073 }
01074 }
01075
01076 for (int i=actualRanks; i<subsetGroupRanks.size(); i++) {
01077 subsetGroupRanks.pop_back();
01078 }
01079 winStruct->setExposureRankList(subsetGroupRanks);
01080 std::vector<MPI_Request> &reqList = winStruct->getRequestList();
01081 reqList.resize(subsetGroupRanks.size());
01082
01083 return MPI_SUCCESS;
01084 }
01085
01086 AMPI_API_IMPL(int, MPI_Win_wait, MPI_Win win)
01087 {
01088 AMPI_API("AMPI_Win_wait");
01089
01090 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01091 if (!winStruct->isInEpoch()) {
01092 return ampiErrhandler("AMPI_Win_wait", MPI_ERR_RMA_SYNC);
01093 }
01094
01095 ampi* ptr = getAmpiInstance(winStruct->comm);
01096 const std::vector<int> &exposureRankList = winStruct->getExposureRankList();
01097 std::vector<MPI_Request> &requestList = winStruct->getRequestList();
01098
01099
01100 if (!winStruct->AreRecvsPosted()) {
01101 for (int i=0; i<exposureRankList.size(); i++) {
01102 ptr->irecv(NULL, 0, MPI_INT, exposureRankList[i], MPI_EPOCH_END_TAG, winStruct->comm, &requestList[i]);
01103 }
01104 }
01105
01106 MPI_Waitall(requestList.size(), requestList.data(), MPI_STATUSES_IGNORE);
01107 winStruct->clearEpochExposure();
01108
01109 return MPI_SUCCESS;
01110 }
01111
01112 AMPI_API_IMPL(int, MPI_Win_start, MPI_Group group, int assertion, MPI_Win win)
01113 {
01114 AMPI_API("AMPI_Win_start");
01115
01116 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01117 if (winStruct->isInEpoch()) {
01118 return ampiErrhandler("AMPI_Win_start", MPI_ERR_RMA_SYNC);
01119 } else {
01120 winStruct->setInEpoch(true);
01121 }
01122
01123 int parentGroupSize;
01124 MPI_Group parentGroup;
01125 MPI_Comm_group(winStruct->comm, &parentGroup);
01126 MPI_Group_size(parentGroup, &parentGroupSize);
01127
01128 std::vector<int> subsetGroupRanks(parentGroupSize);
01129 std::vector<int> parentGroupRanks(parentGroupSize);
01130 for (int i=0; i<parentGroupSize; i++) {
01131 parentGroupRanks[i] = i;
01132 }
01133
01134 MPI_Group_translate_ranks(parentGroup, parentGroupSize, parentGroupRanks.data(), group, subsetGroupRanks.data());
01135
01136 ampi *ptr = getAmpiInstance(winStruct->comm);
01137 int actualRanks = 0;
01138 for (int i=0; i<subsetGroupRanks.size(); i++) {
01139 if (subsetGroupRanks[i] != MPI_UNDEFINED) {
01140 subsetGroupRanks[actualRanks++] = i;
01141 ptr->recv(MPI_EPOCH_START_TAG, subsetGroupRanks[actualRanks-1], NULL, 0, MPI_INT, winStruct->comm, MPI_STATUS_IGNORE);
01142 }
01143 }
01144
01145 for (int i=actualRanks; i<subsetGroupRanks.size(); i++) {
01146 subsetGroupRanks.pop_back();
01147 }
01148 winStruct->setAccessRankList(subsetGroupRanks);
01149
01150 return MPI_SUCCESS;
01151 }
01152
01153 AMPI_API_IMPL(int, MPI_Win_complete, MPI_Win win)
01154 {
01155 AMPI_API("AMPI_Win_complete");
01156
01157 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01158 if (!winStruct->isInEpoch()) {
01159 return ampiErrhandler("AMPI_Win_complete", MPI_ERR_RMA_SYNC);
01160 } else {
01161 winStruct->setInEpoch(true);
01162 }
01163
01164 std::vector<int> &accessGroupRanks = winStruct->getAccessRankList();
01165 ampi *ptr = getAmpiInstance(winStruct->comm);
01166
01167 for (int i=0; i<accessGroupRanks.size(); i++) {
01168 ptr->send(MPI_EPOCH_END_TAG, ptr->getRank(), NULL, 0, MPI_INT, accessGroupRanks[i], winStruct->comm);
01169 }
01170 winStruct->clearEpochAccess();
01171
01172 return MPI_SUCCESS;
01173 }
01174
01175 AMPI_API_IMPL(int, MPI_Win_test, MPI_Win win, int *flag)
01176 {
01177 AMPI_API("AMPI_Win_test");
01178
01179 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01180 if (!winStruct->isInEpoch()) {
01181 return ampiErrhandler("AMPI_Win_test", MPI_ERR_RMA_SYNC);
01182 }
01183
01184 std::vector<MPI_Request> &reqList = winStruct->getRequestList();
01185 const std::vector<int> &exposureRankList = winStruct->getExposureRankList();
01186 ampi* ptr = getAmpiInstance(winStruct->comm);
01187
01188
01189 if (!winStruct->AreRecvsPosted()) {
01190 for (int i=0; i<reqList.size(); i++) {
01191 ptr->irecv(NULL, 0, MPI_INT, exposureRankList[i], MPI_EPOCH_END_TAG, winStruct->comm, reqList.data());
01192 }
01193 winStruct->setAreRecvsPosted(true);
01194 }
01195
01196 MPI_Testall(reqList.size(), reqList.data(), flag, MPI_STATUSES_IGNORE);
01197 if (*flag) {
01198 winStruct->clearEpochExposure();
01199 }
01200
01201 return MPI_SUCCESS;
01202 }
01203
01204
01205 CLINKAGE
01206 int AMPI_Iget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype, int rank,
01207 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Win win,
01208 MPI_Request *request) {
01209 AMPI_API("AMPI_Iget");
01210 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01211 ampi *ptr = getAmpiInstance(winStruct->comm);
01212
01213 return ptr->winIget(orgdisp, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct,
01214 request);
01215 }
01216
01217 CLINKAGE
01218 int AMPI_Iget_wait(MPI_Request *request, MPI_Status *status, MPI_Win win) {
01219 AMPI_API("AMPI_Iget_wait");
01220 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01221 ampi *ptr = getAmpiInstance(winStruct->comm);
01222
01223 return ptr->winIgetWait(request,status);
01224 }
01225
01226 CLINKAGE
01227 int AMPI_Iget_free(MPI_Request *request, MPI_Status *status, MPI_Win win) {
01228 AMPI_API("AMPI_Iget_free");
01229 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01230 ampi *ptr = getAmpiInstance(winStruct->comm);
01231
01232 return ptr->winIgetFree(request, status);
01233 }
01234
01235 CLINKAGE
01236 int AMPI_Iget_data(void *data, MPI_Status status) {
01237 *((char**)data) = ((AmpiMsg*)status.msg)->data;
01238 return MPI_SUCCESS;
01239 }
01240
01241
01242
01243
01244
01245
01246
01247
01248
01249
01250
01251
01252
01253 AMPI_API_IMPL(int, MPI_Alloc_mem, MPI_Aint size, MPI_Info info, void *baseptr)
01254 {
01255
01256 *(void **)baseptr = malloc(size);
01257 return MPI_SUCCESS;
01258 }
01259
01260
01261
01262
01263
01264 AMPI_API_IMPL(int, MPI_Free_mem, void *baseptr)
01265 {
01266
01267 free(baseptr);
01268 return MPI_SUCCESS;
01269 }
01270
01271 AMPI_API_IMPL(int, MPI_Win_get_group, MPI_Win win, MPI_Group *group)
01272 {
01273 AMPI_API("AMPI_Win_get_group");
01274 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01275 ampi *ptr = getAmpiInstance(winStruct->comm);
01276 ptr->winGetGroup(winStruct, group);
01277 return MPI_SUCCESS;
01278 }
01279
01280 AMPI_API_IMPL(int, MPI_Win_delete_attr, MPI_Win win, int key)
01281 {
01282 AMPI_API("AMPI_Win_delete_attr");
01283 ampiParent *parent = getAmpiParent();
01284 WinStruct *winStruct = parent->getWinStruct(win);
01285 vector<int>& keyvals = getAmpiInstance(winStruct->comm)->getWinObjInstance(winStruct)->getKeyvals();
01286 return parent->deleteAttr(win, keyvals, key);
01287 }
01288
01289 AMPI_API_IMPL(int, MPI_Win_get_attr, MPI_Win win, int key, void* value, int* flag)
01290 {
01291 AMPI_API("AMPI_Win_get_attr");
01292 ampiParent *parent = getAmpiParent();
01293 WinStruct *winStruct = parent->getWinStruct(win);
01294 vector<int>& keyvals = getAmpiInstance(winStruct->comm)->getWinObjInstance(winStruct)->getKeyvals();
01295 return parent->getAttr(win, keyvals, key, value, flag);
01296 }
01297
01298 AMPI_API_IMPL(int, MPI_Win_set_attr, MPI_Win win, int key, void* value)
01299 {
01300 AMPI_API("AMPI_Win_set_attr");
01301 ampiParent *parent = getAmpiParent();
01302 WinStruct *winStruct = parent->getWinStruct(win);
01303 vector<int>& keyvals = getAmpiInstance(winStruct->comm)->getWinObjInstance(winStruct)->getKeyvals();
01304 return parent->setAttr(win, keyvals, key, value);
01305 }
01306
01307 AMPI_API_IMPL(int, MPI_Win_set_name, MPI_Win win, const char *name)
01308 {
01309 AMPI_API("AMPI_Win_set_name");
01310 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01311 ampi *ptr = getAmpiInstance(winStruct->comm);
01312 ptr->winSetName(winStruct, name);
01313 return MPI_SUCCESS;
01314 }
01315
01316 AMPI_API_IMPL(int, MPI_Win_set_info, MPI_Win win, MPI_Info info)
01317 {
01318 AMPI_API("AMPI_Win_set_info");
01319
01320 return MPI_SUCCESS;
01321 }
01322
01323 AMPI_API_IMPL(int, MPI_Win_get_info, MPI_Win win, MPI_Info *info)
01324 {
01325 AMPI_API("AMPI_Win_get_info");
01326
01327 *info = MPI_INFO_NULL;
01328 return MPI_SUCCESS;
01329 }
01330
01331 AMPI_API_IMPL(int, MPI_Win_create_errhandler, MPI_Win_errhandler_function *win_errhandler_fn,
01332 MPI_Errhandler *errhandler)
01333 {
01334 AMPI_API("AMPI_Win_create_errhandler");
01335 return MPI_SUCCESS;
01336 }
01337
01338 AMPI_API_IMPL(int, MPI_Win_call_errhandler, MPI_Win win, int errorcode)
01339 {
01340 AMPI_API("AMPI_Win_call_errhandler");
01341 CkPrintf("WARNING: AMPI does not support MPI_Win_call_errhandler (errorcode = %d)\n", errorcode);
01342 return MPI_SUCCESS;
01343 }
01344
01345 AMPI_API_IMPL(int, MPI_Win_get_errhandler, MPI_Win win, MPI_Errhandler *errhandler)
01346 {
01347 AMPI_API("AMPI_Win_get_errhandler");
01348 return MPI_SUCCESS;
01349 }
01350
01351 AMPI_API_IMPL(int, MPI_Win_set_errhandler, MPI_Win win, MPI_Errhandler errhandler)
01352 {
01353 AMPI_API("AMPI_Win_set_errhandler");
01354 return MPI_SUCCESS;
01355 }
01356
01357 int MPI_win_null_copy_fn(MPI_Win win, int keyval, void *extra_state,
01358 void *attr_in, void *attr_out, int *flag){
01359 (*flag) = 0;
01360 return MPI_SUCCESS;
01361 }
01362
01363 int MPI_win_dup_fn(MPI_Win win, int keyval, void *extra_state,
01364 void *attr_in, void *attr_out, int *flag){
01365 (*(void **)attr_out) = attr_in;
01366 (*flag) = 1;
01367 return MPI_SUCCESS;
01368 }
01369
01370 int MPI_win_null_delete_fn(MPI_Win win, int keyval, void *attr, void *extra_state){
01371 return MPI_SUCCESS;
01372 }
01373
01374 AMPI_API_IMPL(int, MPI_Win_create_keyval, MPI_Win_copy_attr_function *copy_fn,
01375 MPI_Win_delete_attr_function *delete_fn,
01376 int *keyval, void *extra_state)
01377 {
01378 AMPI_API("AMPI_Win_create_keyval");
01379 return getAmpiParent()->createKeyval(copy_fn,delete_fn,keyval,extra_state);
01380 }
01381
01382 AMPI_API_IMPL(int, MPI_Win_free_keyval, int *keyval)
01383 {
01384 AMPI_API("AMPI_Win_free_keyval");
01385 ampiParent *parent = getAmpiParent();
01386 ampiCommStruct& cs = *(ampiCommStruct*)&getAmpiInstance(MPI_COMM_WORLD)->comm2CommStruct(MPI_COMM_WORLD);
01387 return getAmpiParent()->freeUserKeyval(MPI_COMM_WORLD, cs.getKeyvals(), keyval);
01388 }
01389
01390 AMPI_API_IMPL(int, MPI_Win_get_name, MPI_Win win, char *name, int *length)
01391 {
01392 AMPI_API("AMPI_Win_get_name");
01393 WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01394 ampi *ptr = getAmpiInstance(winStruct->comm);
01395 ptr->winGetName(winStruct, name, length);
01396 return MPI_SUCCESS;
01397 }