00001
00002 #define AMPIMSGLOG 0
00003
00004 #define exit exit
00005 #include "ampiimpl.h"
00006 #include "tcharm.h"
00007 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
00008 #include "ampiEvents.h"
00009 #include "ampiProjections.h"
00010 #endif
00011
00012 #if CMK_BIGSIM_CHARM
00013 #include "bigsim_logs.h"
00014 #endif
00015
00016 #define CART_TOPOL 1
00017 #define AMPI_PRINT_IDLE 0
00018
00019
00020 #define MSG_ORDER_DEBUG(x) //x
00021
00022 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl;
00023 #define STARTUP_DEBUG(x) //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl;
00024 #define FUNCCALL_DEBUG(x) //x
00025
00026 static CkDDT *getDDT(void) {
00027 return getAmpiParent()->myDDT;
00028 }
00029
00030 inline int checkCommunicator(MPI_Comm comm) {
00031 if(comm == MPI_COMM_NULL)
00032 return MPI_ERR_COMM;
00033 return MPI_SUCCESS;
00034 }
00035
00036 inline int checkCount(int count) {
00037 if(count < 0)
00038 return MPI_ERR_COUNT;
00039 return MPI_SUCCESS;
00040 }
00041
00042 inline int checkData(MPI_Datatype data) {
00043 if(data == MPI_DATATYPE_NULL)
00044 return MPI_ERR_TYPE;
00045 return MPI_SUCCESS;
00046 }
00047
00048 inline int checkTag(int tag) {
00049 if(tag != MPI_ANY_TAG && tag < 0)
00050 return MPI_ERR_TAG;
00051 return MPI_SUCCESS;
00052 }
00053
00054 inline int checkRank(int rank, MPI_Comm comm) {
00055 int size;
00056 AMPI_Comm_size(comm, &size);
00057 if(((rank >= 0) && (rank < size)) || (rank == MPI_ANY_SOURCE) || (rank ==
00058 MPI_PROC_NULL))
00059 return MPI_SUCCESS;
00060 return MPI_ERR_RANK;
00061 }
00062
00063 inline int checkBuf(void *buf, int count) {
00064 if((count != 0 && buf == NULL) || buf == MPI_IN_PLACE)
00065 return MPI_ERR_BUFFER;
00066 return MPI_SUCCESS;
00067 }
00068
00069 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount,
00070 MPI_Datatype data, int ifData, int tag, int ifTag,
00071 int rank, int ifRank,
00072 void *buf1, int ifBuf1, void *buf2 = 0, int ifBuf2 = 0) {
00073 int ret;
00074 if(ifComm) {
00075 ret = checkCommunicator(comm);
00076 if(ret != MPI_SUCCESS)
00077 return ret;
00078 }
00079 if(ifCount) {
00080 ret = checkCount(count);
00081 if(ret != MPI_SUCCESS)
00082 return ret;
00083 }
00084 if(ifData) {
00085 ret = checkData(data);
00086 if(ret != MPI_SUCCESS)
00087 return ret;
00088 }
00089 if(ifTag) {
00090 ret = checkTag(tag);
00091 if(ret != MPI_SUCCESS)
00092 return ret;
00093 }
00094 if(ifRank) {
00095 ret = checkRank(rank,comm);
00096 if(ret != MPI_SUCCESS)
00097 return ret;
00098 }
00099 if(ifBuf1) {
00100 ret = checkBuf(buf1,count);
00101 if(ret != MPI_SUCCESS)
00102 return ret;
00103 }
00104 if(ifBuf2) {
00105 ret = checkBuf(buf2,count);
00106 if(ret != MPI_SUCCESS)
00107 return ret;
00108 }
00109 return MPI_SUCCESS;
00110 }
00111
00112
// Table of registered comm_worlds. ampiWorlds::add stores each world at index
// (world communicator - MPI_COMM_WORLD).
static mpi_comm_worlds mpi_worlds;

// Count of comm_worlds registered so far (reset to 0 in ampiNodeInit).
int _mpi_nworlds;
// Per-world handles; ampiNodeInit sets entry i to MPI_COMM_WORLD+1+i.
int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS];
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 class AmpiComplex {
00128 public:
00129 double re, im;
00130 void operator+=(const AmpiComplex &a) {
00131 re+=a.re;
00132 im+=a.im;
00133 }
00134 void operator*=(const AmpiComplex &a) {
00135 double nu_re=re*a.re-im*a.im;
00136 im=re*a.im+im*a.re;
00137 re=nu_re;
00138 }
00139 int operator>(const AmpiComplex &a) {
00140 CkAbort("Cannot compare complex numbers with MPI_MAX");
00141 return 0;
00142 }
00143 int operator<(const AmpiComplex &a) {
00144 CkAbort("Cannot compare complex numbers with MPI_MIN");
00145 return 0;
00146 }
00147 };
// {value, index} element layouts for the MPI_MAXLOC / MPI_MINLOC pair datatypes.
struct FloatInt      { float val;       int idx;    };
struct DoubleInt     { double val;      int idx;    };
struct LongInt       { long val;        int idx;    };
struct IntInt        { int val;         int idx;    };
struct ShortInt      { short val;       int idx;    };
struct LongdoubleInt { long double val; int idx;    };
struct FloatFloat    { float val;       float idx;  };
struct DoubleDouble  { double val;      double idx; };
00156
00157
/* Expands, inside a built-in reduction function, to a switch on *datatype.
 * Each case applies MPI_OP_IMPL(type) to the *len elements, using loop index `i`.
 * The caller must #define MPI_OP_IMPL(type) first, and `invec`, `inoutvec`,
 * `len` and `datatype` must be in scope.
 * Each case's C type must have the same size as the MPI datatype, because the
 * buffers are reinterpreted element by element. MPI_UNSIGNED_LONG therefore
 * uses `unsigned long`, matching MPI_LONG -> long. (CmiUInt8 is 8 bytes even
 * where long is 4 bytes.)
 * NOTE(review): assumes the DDT sizes MPI_UNSIGNED_LONG as sizeof(unsigned long). */
#define MPI_OP_SWITCH(OPNAME) \
  int i; \
  switch (*datatype) { \
  case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
  case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
  case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
  case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
  case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
  case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
  case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
  case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
  case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
  case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
  case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
  case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
  case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
  default: \
    ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
    CmiAbort("Unsupported MPI datatype for MPI Op"); \
  };

00179 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00180 #define MPI_OP_IMPL(type) \
00181 if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
00182 MPI_OP_SWITCH(MPI_MAX)
00183 #undef MPI_OP_IMPL
00184 }
00185
00186 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00187 #define MPI_OP_IMPL(type) \
00188 if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
00189 MPI_OP_SWITCH(MPI_MIN)
00190 #undef MPI_OP_IMPL
00191 }
00192
00193 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00194 #define MPI_OP_IMPL(type) \
00195 ((type *)inoutvec)[i] += ((type *)invec)[i];
00196 MPI_OP_SWITCH(MPI_SUM)
00197 #undef MPI_OP_IMPL
00198 }
00199
00200 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00201 #define MPI_OP_IMPL(type) \
00202 ((type *)inoutvec)[i] *= ((type *)invec)[i];
00203 MPI_OP_SWITCH(MPI_PROD)
00204 #undef MPI_OP_IMPL
00205 }
00206
00207 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00208 int i;
00209 switch (*datatype) {
00210 case MPI_INT:
00211 case MPI_LOGICAL:
00212 for(i=0;i<(*len);i++)
00213 ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
00214 break;
00215 default:
00216 ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
00217 CmiAbort("exiting");
00218 }
00219 }
00220 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00221 int i;
00222 switch (*datatype) {
00223 case MPI_INT:
00224 for(i=0;i<(*len);i++)
00225 ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
00226 break;
00227 case MPI_BYTE:
00228 for(i=0;i<(*len);i++)
00229 ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
00230 break;
00231 default:
00232 ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
00233 CmiAbort("exiting");
00234 }
00235 }
00236 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00237 int i;
00238 switch (*datatype) {
00239 case MPI_INT:
00240 case MPI_LOGICAL:
00241 for(i=0;i<(*len);i++)
00242 ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
00243 break;
00244 default:
00245 ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
00246 CmiAbort("exiting");
00247 }
00248 }
00249 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00250 int i;
00251 switch (*datatype) {
00252 case MPI_INT:
00253 for(i=0;i<(*len);i++)
00254 ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
00255 break;
00256 case MPI_BYTE:
00257 for(i=0;i<(*len);i++)
00258 ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
00259 break;
00260 default:
00261 ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
00262 CmiAbort("exiting");
00263 }
00264 }
00265 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00266 int i;
00267 switch (*datatype) {
00268 case MPI_INT:
00269 case MPI_LOGICAL:
00270 for(i=0;i<(*len);i++)
00271 ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]);
00272 break;
00273 default:
00274 ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
00275 CmiAbort("exiting");
00276 }
00277 }
00278 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00279 int i;
00280 switch (*datatype) {
00281 case MPI_INT:
00282 for(i=0;i<(*len);i++)
00283 ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
00284 break;
00285 case MPI_BYTE:
00286 for(i=0;i<(*len);i++)
00287 ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
00288 break;
00289 case MPI_UNSIGNED:
00290 for(i=0;i<(*len);i++)
00291 ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
00292 break;
00293 default:
00294 ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
00295 CmiAbort("exiting");
00296 }
00297 }
00298
/* Smaller of two values. Each argument is parenthesized so that expressions
 * containing lower-precedence operators (&, ?:, ...) expand correctly.
 * Arguments may be evaluated twice; pass only side-effect-free expressions. */
#ifndef MIN
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#endif
00302
00303 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00304 int i;
00305
00306 switch (*datatype) {
00307 case MPI_FLOAT_INT:
00308 for(i=0;i<(*len);i++)
00309 if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
00310 ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
00311 else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
00312 ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
00313 break;
00314 case MPI_DOUBLE_INT:
00315 for(i=0;i<(*len);i++)
00316 if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
00317 ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
00318 else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
00319 ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
00320
00321 break;
00322 case MPI_LONG_INT:
00323 for(i=0;i<(*len);i++)
00324 if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
00325 ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
00326 else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
00327 ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
00328 break;
00329 case MPI_2INT:
00330 for(i=0;i<(*len);i++)
00331 if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
00332 ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
00333 else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
00334 ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
00335 break;
00336 case MPI_SHORT_INT:
00337 for(i=0;i<(*len);i++)
00338 if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
00339 ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
00340 else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
00341 ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
00342 break;
00343 case MPI_LONG_DOUBLE_INT:
00344 for(i=0;i<(*len);i++)
00345 if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
00346 ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
00347 else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
00348 ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
00349 break;
00350 case MPI_2FLOAT:
00351 for(i=0;i<(*len);i++)
00352 if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
00353 ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
00354 else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
00355 ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
00356 break;
00357 case MPI_2DOUBLE:
00358 for(i=0;i<(*len);i++)
00359 if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
00360 ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
00361 else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
00362 ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
00363 break;
00364 default:
00365 ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
00366 CmiAbort("exiting");
00367 }
00368 }
00369 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
00370 int i;
00371 switch (*datatype) {
00372 case MPI_FLOAT_INT:
00373 for(i=0;i<(*len);i++)
00374 if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
00375 ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
00376 else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
00377 ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
00378 break;
00379 case MPI_DOUBLE_INT:
00380 for(i=0;i<(*len);i++)
00381 if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
00382 ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
00383 else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
00384 ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
00385 break;
00386 case MPI_LONG_INT:
00387 for(i=0;i<(*len);i++)
00388 if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
00389 ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
00390 else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
00391 ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
00392 break;
00393 case MPI_2INT:
00394 for(i=0;i<(*len);i++)
00395 if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
00396 ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
00397 else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
00398 ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
00399 break;
00400 case MPI_SHORT_INT:
00401 for(i=0;i<(*len);i++)
00402 if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
00403 ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
00404 else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
00405 ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
00406 break;
00407 case MPI_LONG_DOUBLE_INT:
00408 for(i=0;i<(*len);i++)
00409 if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
00410 ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
00411 else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
00412 ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
00413 break;
00414 case MPI_2FLOAT:
00415 for(i=0;i<(*len);i++)
00416 if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
00417 ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
00418 else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
00419 ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
00420 break;
00421 case MPI_2DOUBLE:
00422 for(i=0;i<(*len);i++)
00423 if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
00424 ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
00425 else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
00426 ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
00427 break;
00428 default:
00429 ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
00430 CmiAbort("exiting");
00431 }
00432 }
00433
00434
00435
00436 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
00437 AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
00438 MPI_Datatype dtype;
00439 int szhdr, szdata, len;
00440 MPI_User_function* func;
00441 func = hdr->func;
00442 dtype = hdr->dtype;
00443 szdata = hdr->szdata;
00444 len = hdr->len;
00445 szhdr = sizeof(AmpiOpHeader);
00446
00447
00448 void *ret = malloc(szhdr+szdata);
00449 memcpy(ret,msgs[0]->getData(),szhdr+szdata);
00450 for(int i=1;i<nMsg;i++){
00451 (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
00452 }
00453 CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
00454 free(ret);
00455 return retmsg;
00456 }
00457
00458 CkReduction::reducerType AmpiReducer;
00459
00460 class Builtin_kvs{
00461 public:
00462 int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
00463 Builtin_kvs(){
00464 tag_ub = MPI_TAG_UB_VALUE;
00465 host = MPI_PROC_NULL;
00466 io = 0;
00467 wtime_is_global = 0;
00468 keyval_mype = CkMyPe();
00469 keyval_numpes = CkNumPes();
00470 keyval_mynode = CkMyNode();
00471 keyval_numnodes = CkNumNodes();
00472 }
00473 };
00474
00475
// Reset to 0 by AMPI_Setup_Switch before it calls both setup hooks. A value of
// 2 afterwards makes it register AMPI_Fallback_Main as the default main.
// NOTE(review): presumably incremented by library-default (fallback)
// definitions of AMPI_Setup / ampi_setup; confirm.
int _ampi_fallback_setup_count;
// C and Fortran setup hooks, called by AMPI_Setup_Switch.
CDECL void AMPI_Setup(void);
FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);

// Fortran main entry point, called from AMPI_Fallback_Main.
FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
00481
00482
/// Default per-thread entry point; AMPI_Setup_Switch registers it under the
/// name "default".
/// Calls AMPI_Main_cpp, then AMPI_Main, then the Fortran mpi_main, in that order.
/// NOTE(review): presumably the entry points the user did not define resolve to
/// library stubs; confirm.
CDECL void AMPI_Fallback_Main(int argc,char **argv)
{
  AMPI_Main_cpp(argc,argv);
  AMPI_Main(argc,argv);
  FTN_NAME(MPI_MAIN,mpi_main)();
}
00489
00490 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
00491
00492
00493
00494 CDECL void AMPI_Setup_Switch(void) {
00495 _ampi_fallback_setup_count=0;
00496 FTN_NAME(AMPI_SETUP,ampi_setup)();
00497 AMPI_Setup();
00498 if (_ampi_fallback_setup_count==2)
00499 {
00500 ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
00501 }
00502 }
00503
// Set to 1 at the end of ampiNodeInit.
static int nodeinit_has_been_called=0;
// Per-user-thread (Ctv) state:
CtvDeclare(ampiParent*, ampiPtr);   // this thread's ampiParent (set in ampiParent::prepareCtv)
CtvDeclare(int, ampiInitDone);      // nonzero once ampiInit has completed on this thread
CtvDeclare(void*,stackBottom);      // reference address recorded by MPI_threadstart_t::start
CtvDeclare(int, ampiFinalized);     // set to 0 by ampiInit
// Per-PE (Ckpv) state:
CkpvDeclare(Builtin_kvs, bikvs);    // built-in attribute values, initialized in ampiProcInit
CkpvDeclare(int,argvExtracted);     // initialized to 0 in ampiProcInit
// Set from the +ampi_streaming flag in ampiProcInit (only when AMPI_COMLIB is enabled).
static int enableStreaming = 0;
00512
00513 CDECL long ampiCurrentStackUsage(){
00514 int localVariable;
00515
00516 unsigned long p1 = (unsigned long)((void*)&localVariable);
00517 unsigned long p2 = (unsigned long)(CtvAccess(stackBottom));
00518
00519
00520 if(p1 > p2)
00521 return p1 - p2;
00522 else
00523 return p2 - p1;
00524
00525 }
00526
00527 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
00528 long usage = ampiCurrentStackUsage();
00529 CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
00530 }
00531
00532
00533 CDECL void AMPI_threadstart(void *data);
00534 static int AMPI_threadstart_idx = -1;
00535
00536 static void ampiNodeInit(void)
00537 {
00538 _mpi_nworlds=0;
00539 for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
00540 {
00541 MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
00542 }
00543 TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
00544
00545 AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
00546
00547 CmiAssert(AMPI_threadstart_idx == -1);
00548 AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
00549
00550 nodeinit_has_been_called=1;
00551
00552
00553 _isAnytimeMigration = false;
00554 _isStaticInsertion = true;
00555 }
00556
// Idle-time accounting, used by AMPI_Install_Idle_Timer / AMPI_Uninstall_Idle_Timer.
// Guarded by AMPI_PRINT_IDLE, the same macro those functions test. Previously
// it was guarded by PRINT_IDLE, so enabling AMPI_PRINT_IDLE failed to compile
// (beginHandle, BeginIdle, ... were undeclared).
#if AMPI_PRINT_IDLE
static double totalidle=0.0, startT=0.0;
static int beginHandle, endHandle;
/// Records the wall-clock time at which the processor went idle.
static void BeginIdle(void *dummy,double curWallTime)
{
  startT = curWallTime;
}
/// Adds the length of the idle period that just ended to totalidle.
static void EndIdle(void *dummy,double curWallTime)
{
  totalidle += curWallTime - startT;
}
#endif
00569
00570
// Fixed-capacity (128-slot) table of MPI_Op handles.
typedef MPI_Op MPI_Op_Array[128];
// Number of mpi_ops slots in use; init_operations sets it to the built-in count.
CtvDeclare(int, mpi_opc);
// Per-thread operation table; init_operations fills its first slots with the built-in ops.
CtvDeclare(MPI_Op_Array, mpi_ops);
00574
/// Per-processor AMPI initialization. It:
///  - registers the Ctv/Ckpv slots declared in this file,
///  - fills in the built-in attribute values,
///  - performs the tracing registration,
///  - parses AMPI command-line flags; this only happens when the AMPI_COMLIB or
///    AMPIMSGLOG compile-time features are enabled.
static void ampiProcInit(void){
  CtvInitialize(ampiParent*, ampiPtr);
  CtvInitialize(int,ampiInitDone);
  CtvInitialize(int,ampiFinalized);
  CtvInitialize(void*,stackBottom);


  CtvInitialize(MPI_Op_Array, mpi_ops);
  CtvInitialize(int, mpi_opc);

  CkpvInitialize(Builtin_kvs, bikvs);
  CkpvAccess(bikvs) = Builtin_kvs();

  CkpvInitialize(int, argvExtracted);
  CkpvAccess(argvExtracted) = 0;

#if CMK_TRACE_ENABLED && CMK_PROJECTOR
  REGISTER_AMPI
#endif
  initAmpiProjections();

  // argv is read only by the AMPI_COMLIB / AMPIMSGLOG sections below.
  char **argv=CkGetArgv();
#if AMPI_COMLIB
  // argvExtracted was set to 0 just above, so this test always passes here.
  if(CkpvAccess(argvExtracted)==0){
    enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
  }
#endif

#if AMPIMSGLOG
  // Message-log record/replay options:
  //   +msgLogWrite                 record message order
  //   +msgLogRead <rank>           replay the log of one rank
  //   +msgLogRanks <list>          ranks to record
  //   +msgLogFilename <name>       log file base name
  msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");

  if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
    msgLogRead = 1;
  }

  char *procs = NULL;
  if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
    msgLogRanks.set(procs);
  }
  CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
  if (CkMyPe() == 0) {
    if (msgLogWrite) CmiPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
    if (msgLogRead) CmiPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
  }
#endif


}
00623
#if AMPIMSGLOG
/// Returns whether `rank` is in the +msgLogRanks set, i.e. whether its messages are recorded.
static inline int record_msglog(int rank){
  return msgLogRanks.includes(rank);
}
#endif
00629
/// When AMPI_PRINT_IDLE is enabled, registers persistent callbacks:
/// BeginIdle on CcdPROCESSOR_BEGIN_IDLE and EndIdle on CcdPROCESSOR_END_IDLE.
/// Otherwise this does nothing.
void AMPI_Install_Idle_Timer(){
#if AMPI_PRINT_IDLE
  beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
  endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
#endif
}
00636
/// Cancels the idle callbacks registered by AMPI_Install_Idle_Timer (when AMPI_PRINT_IDLE is on).
/// Each handle is cancelled under the same condition it was registered with.
/// endHandle was previously cancelled under CcdPROCESSOR_BEGIN_BUSY, not the
/// CcdPROCESSOR_END_IDLE it was registered with.
void AMPI_Uninstall_Idle_Timer(){
#if AMPI_PRINT_IDLE
  CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
  CcdCancelCallOnConditionKeep(CcdPROCESSOR_END_IDLE,endHandle);
#endif
}
00643
// Allow MPI_MainFn function pointers to be PUP'd (packed into / out of buffers).
PUPfunctionpointer(MPI_MainFn)

/// Start-up record for one AMPI user thread.
/// ampiCreateMain packs it and AMPI_threadstart unpacks it. start() calls the
/// user's main function with a private copy of the program arguments.
class MPI_threadstart_t {
public:
  MPI_MainFn fn;
  MPI_threadstart_t() {}
  MPI_threadstart_t(MPI_MainFn fn_)
    :fn(fn_) {}
  void start(void) {
    char **argv=CmiCopyArgs(CkGetArgv());
    int argc=CkGetArgc();



    // Record the address of a local in this thread-entry frame. It is the
    // reference point for ampiCurrentStackUsage().
    CtvAccess(stackBottom) = &argv;

#if CMK_AMPI_FNPTR_HACK
    // With CMK_AMPI_FNPTR_HACK, fn is ignored and the fallback main always runs.
    AMPI_Fallback_Main(argc,argv);
#else
    (fn)(argc,argv);
#endif
  }
  void pup(PUP::er &p) {
    p|fn;
  }
};
PUPmarshall(MPI_threadstart_t)
00671
/// TCharm thread function (registered in ampiNodeInit).
/// Unpacks the MPI_threadstart_t that ampiCreateMain packed into `data`, then runs it.
CDECL void AMPI_threadstart(void *data)
{
  STARTUP_DEBUG("MPI_threadstart")
  MPI_threadstart_t t;
  pupFromBuf(data,t);
#if CMK_TRACE_IN_CHARM
  if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
#endif
  t.start();
}
00682
00683 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
00684 {
00685 STARTUP_DEBUG("ampiCreateMain")
00686 int _nchunks=TCHARM_Get_num_chunks();
00687
00688 MPI_threadstart_t s(mainFn);
00689 memBuf b; pupIntoBuf(b,s);
00690 TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
00691 b.getData(), b.getSize());
00692 }
00693
00694
// TCharm semaphore IDs:
//   AMPI_TCHARM_SEMAID  - carries the world ampi* from ampiParent::registerAmpi to ampiInit.
//   AMPI_BARRIER_SEMAID - rendezvous token put by the ampiParent constructor and read by ampiInit.
#define AMPI_TCHARM_SEMAID 0x00A34100
#define AMPI_BARRIER_SEMAID 0x00A34200

// Group holding the registered comm_worlds. ampiInit creates it (element 0) on
// the first world; the ampiWorlds constructor also assigns it.
static CProxy_ampiWorlds ampiWorldsGroup;
00699
00700 static void init_operations()
00701 {
00702 CtvInitialize(MPI_Op_Array, mpi_ops);
00703 int i = 0;
00704 MPI_Op *tab = CtvAccess(mpi_ops);
00705 tab[i++] = MPI_MAX;
00706 tab[i++] = MPI_MIN;
00707 tab[i++] = MPI_SUM;
00708 tab[i++] = MPI_PROD;
00709 tab[i++] = MPI_LAND;
00710 tab[i++] = MPI_BAND;
00711 tab[i++] = MPI_LOR;
00712 tab[i++] = MPI_BOR;
00713 tab[i++] = MPI_LXOR;
00714 tab[i++] = MPI_BXOR;
00715 tab[i++] = MPI_MAXLOC;
00716 tab[i++] = MPI_MINLOC;
00717
00718 CtvInitialize(int, mpi_opc);
00719 CtvAccess(mpi_opc) = i;
00720 }
00721
00722
00723
00724
00725
00726
00727 static ampi *ampiInit(char **argv)
00728 {
00729 FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
00730 if (CtvAccess(ampiInitDone)) return NULL;
00731 STARTUP_DEBUG("ampiInit> begin")
00732
00733 MPI_Comm new_world;
00734 int _nchunks;
00735 CkArrayOptions opts;
00736 CProxy_ampiParent parent;
00737 if (TCHARM_Element()==0)
00738 {
00739 STARTUP_DEBUG("ampiInit> creating arrays")
00740
00741
00742
00743 if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
00744 {
00745 CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
00746 }
00747 int new_idx=_mpi_nworlds;
00748 new_world=MPI_COMM_WORLD+new_idx;
00749
00750
00751 CkArrayID threads;
00752 opts=TCHARM_Attach_start(&threads,&_nchunks);
00753 parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
00754 STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
00755 }
00756 int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
00757
00758 FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
00759
00760 if (TCHARM_Element()==0)
00761 {
00762
00763 CkArrayID empty;
00764
00765 ampiCommStruct worldComm(new_world,empty,_nchunks);
00766 CProxy_ampi arr;
00767
00768
00769 #if AMPI_COMLIB
00770
00771 ComlibInstanceHandle ciStreaming = 1;
00772 ComlibInstanceHandle ciBcast = 2;
00773 ComlibInstanceHandle ciAllgather = 3;
00774 ComlibInstanceHandle ciAlltoall = 4;
00775
00776 arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
00777
00778
00779 CkPrintf("Using untested comlib code in ampi.C\n");
00780
00781 Strategy *sStreaming = new StreamingStrategy(1,10);
00782 CkAssert(ciStreaming == ComlibRegister(sStreaming));
00783
00784 Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
00785 CkAssert(ciBcast = ComlibRegister(sBcast));
00786
00787 Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
00788 CkAssert(ciAllgather = ComlibRegister(sAllgather));
00789
00790 Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
00791 CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
00792
00793 CmiPrintf("Created AMPI comlib strategies in new manner\n");
00794
00795
00796 CkpvAccess(conv_com_object).doneCreating();
00797 #else
00798 arr=CProxy_ampi::ckNew(parent,worldComm,opts);
00799 #endif
00800
00801
00802
00803 ampiCommStruct newComm(new_world,arr,_nchunks);
00804
00805 if (ampiWorldsGroup.ckGetGroupID().isZero())
00806 ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
00807 else
00808 ampiWorldsGroup.add(newComm);
00809 STARTUP_DEBUG("ampiInit> arrays created")
00810
00811 }
00812
00813
00814 ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
00815 CtvAccess(ampiInitDone)=1;
00816 CtvAccess(ampiFinalized)=0;
00817 STARTUP_DEBUG("ampiInit> complete")
00818 #if CMK_BIGSIM_CHARM
00819
00820 TRACE_BG_ADD_TAG("AMPI_START");
00821 #endif
00822
00823 init_operations();
00824
00825 getAmpiParent()->ampiInitCallDone = 0;
00826
00827 CProxy_ampi cbproxy = ptr->getProxy();
00828 CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
00829 ptr->contribute(0, NULL, CkReduction::sum_int, cb);
00830
00831 ampiParent *thisParent = getAmpiParent();
00832 while(thisParent->ampiInitCallDone!=1){
00833
00834 thisParent->getTCharmThread()->stop();
00835
00836
00837
00838
00839 thisParent = getAmpiParent();
00840 }
00841
00842 #ifdef CMK_BIGSIM_CHARM
00843 BgSetStartOutOfCore();
00844 #endif
00845
00846 return ptr;
00847 }
00848
00850 class ampiWorlds : public CBase_ampiWorlds {
00851 public:
00852 ampiWorlds(const ampiCommStruct &nextWorld) {
00853 ampiWorldsGroup=thisgroup;
00854
00855 add(nextWorld);
00856 }
00857 ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
00858 void pup(PUP::er &p) { CBase_ampiWorlds::pup(p); }
00859 void add(const ampiCommStruct &nextWorld) {
00860 int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD);
00861 mpi_worlds[new_idx].comm=nextWorld;
00862 if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
00863 STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
00864 }
00865 };
00866
00867
00868 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
00869 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
00870 {
00871 int barrier = 0x1234;
00872 STARTUP_DEBUG("ampiParent> starting up")
00873 thread=NULL;
00874 worldPtr=NULL;
00875 myDDT=&myDDTsto;
00876 prepareCtv();
00877
00878 init();
00879
00880 thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
00881 AsyncEvacuate(CmiFalse);
00882 }
00883
00884 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
00885 thread=NULL;
00886 worldPtr=NULL;
00887 myDDT=&myDDTsto;
00888
00889 init();
00890
00891 AsyncEvacuate(CmiFalse);
00892 }
00893
/// Packs/unpacks this ampiParent for migration and checkpointing.
/// The sequence of p| calls defines the packed layout, so fields must not be reordered.
/// `thread` and `worldPtr` are not packed: the migration constructor resets
/// them, and prepareCtv / registerAmpi rebuild them.
/// NOTE(review): selfStruct is also not packed here; confirm it is rebuilt by registerAmpi.
void ampiParent::pup(PUP::er &p) {
  ArrayElement1D::pup(p);
  p|threads;
  p|worldNo;
  p|worldStruct;
  myDDT->pup(p);   // datatype table (myDDT points at myDDTsto in both constructors)
  p|splitComm;
  p|groupComm;
  p|groups;







  p|ampiReqs;







  p|RProxyCnt;
  p|tmpRProxy;
  p|winStructList;
  p|infos;

  p|ampiInitCallDone;
}
00925 void ampiParent::prepareCtv(void) {
00926 thread=threads[thisIndex].ckLocal();
00927 if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
00928 CtvAccessOther(thread->getThread(),ampiPtr) = this;
00929 STARTUP_DEBUG("ampiParent> found TCharm")
00930 }
00931
00932 void ampiParent::init(){
00933 CkAssert(groups.size() == 0);
00934 groups.push_back(new groupStruct);
00935
00936 #if AMPIMSGLOG
00937 if(msgLogWrite && record_msglog(thisIndex)){
00938 char fname[128];
00939 sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
00940 #if CMK_PROJECTIONS_USE_ZLIB && 0
00941 fMsgLog = gzopen(fname,"wb");
00942 toPUPer = new PUP::tozDisk(fMsgLog);
00943 #else
00944 fMsgLog = fopen(fname,"wb");
00945 CmiAssert(fMsgLog != NULL);
00946 toPUPer = new PUP::toDisk(fMsgLog);
00947 #endif
00948 }else if(msgLogRead){
00949 char fname[128];
00950 sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
00951 #if CMK_PROJECTIONS_USE_ZLIB && 0
00952 fMsgLog = gzopen(fname,"rb");
00953 fromPUPer = new PUP::fromzDisk(fMsgLog);
00954 #else
00955 fMsgLog = fopen(fname,"rb");
00956 CmiAssert(fMsgLog != NULL);
00957 fromPUPer = new PUP::fromDisk(fMsgLog);
00958 #endif
00959 CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
00960 }
00961 #endif
00962 }
00963
/// Closes the message log opened by init(), if any. Only compiled when AMPIMSGLOG is enabled.
/// The branch conditions mirror the ones in init(), so the same PUPer/file pair is released.
void ampiParent::finalize(){
#if AMPIMSGLOG
  if(msgLogWrite && record_msglog(thisIndex)){
    delete toPUPer;
#if CMK_PROJECTIONS_USE_ZLIB && 0
    gzclose(fMsgLog);
#else
    fclose(fMsgLog);
#endif
  }else if(msgLogRead){
    delete fromPUPer;
#if CMK_PROJECTIONS_USE_ZLIB && 0
    gzclose(fMsgLog);
#else
    fclose(fMsgLog);
#endif
  }
#endif
}
00983
/// Called after this element arrives on a new PE.
/// Rebinds `thread` and the thread's ampiPtr slot, neither of which pup() packs.
void ampiParent::ckJustMigrated(void) {
  ArrayElement1D::ckJustMigrated();
  prepareCtv();
}
00988
/// Called after this element is restored from a checkpoint.
/// Rebinds `thread` and the thread's ampiPtr slot, as ckJustMigrated does.
void ampiParent::ckJustRestored(void) {
  FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
  ArrayElement1D::ckJustRestored();
  prepareCtv();




}
00998
/// Destructor: releases the message-log resources via finalize().
ampiParent::~ampiParent() {
  STARTUP_DEBUG("ampiParent> destructor called");
  finalize();
}
01003
01004
01005 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
01006 {
01007 if (thread==NULL) prepareCtv();
01008
01009 if (s.getComm()>=MPI_COMM_WORLD)
01010 {
01011
01012
01013 if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
01014 worldPtr=ptr;
01015 worldStruct=s;
01016
01017
01018 CkVec<int> _indices;
01019 _indices.push_back(thisIndex);
01020 selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
01021 }
01022
01023 if (!forMigration)
01024 {
01025 MPI_Comm comm = s.getComm();
01026 STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
01027 if (comm>=MPI_COMM_WORLD) {
01028
01029 thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
01030 } else if (isSplit(comm)) {
01031 splitChildRegister(s);
01032 } else if (isGroup(comm)) {
01033 groupChildRegister(s);
01034 } else if (isCart(comm)) {
01035 cartChildRegister(s);
01036 } else if (isGraph(comm)) {
01037 graphChildRegister(s);
01038 } else if (isInter(comm)) {
01039 interChildRegister(s);
01040 } else if (isIntra(comm)) {
01041 intraChildRegister(s);
01042 }else
01043 CkAbort("ampiParent recieved child with bad communicator");
01044 }
01045
01046 return thread;
01047 }
01048
01049
01050
01051
01052
01053
01054
01055
01056
01057
01058
01059
01060
01061
01062
01063
01064
01065
01066
01067
01068
01069
01070
01071
01072
01073
01074
01075
01076
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087 class ckptClientStruct {
01088 public:
01089 const char *dname;
01090 ampiParent *ampiPtr;
01091 ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
01092 };
01093
01094 static void checkpointClient(void *param,void *msg)
01095 {
01096 ckptClientStruct *client = (ckptClientStruct*)param;
01097 const char *dname = client->dname;
01098 ampiParent *ampiPtr = client->ampiPtr;
01099 ampiPtr->Checkpoint(strlen(dname), dname);
01100 delete client;
01101 }
01102
01103 void ampiParent::startCheckpoint(const char* dname){
01104
01105 if (thisIndex==0) {
01106 ckptClientStruct *clientData = new ckptClientStruct(dname, this);
01107 CkCallback cb(checkpointClient, clientData);
01108 contribute(0, NULL, CkReduction::sum_int, cb);
01109 }
01110 else
01111 contribute(0, NULL, CkReduction::sum_int);
01112
01113 #if 0
01114 #if CMK_BIGSIM_CHARM
01115 void *curLog;
01116 _TRACE_BG_TLINE_END(&curLog);
01117 TRACE_BG_AMPI_SUSPEND();
01118 #endif
01119 #endif
01120
01121 thread->stop();
01122
01123 #if CMK_BIGSIM_CHARM
01124
01125 TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
01126 #endif
01127 }
01128
01129 void ampiParent::Checkpoint(int len, const char* dname){
01130 if (len == 0) {
01131
01132 CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
01133 CkStartMemCheckpoint(cb);
01134 }
01135 else {
01136 char dirname[256];
01137 strncpy(dirname,dname,len);
01138 dirname[len]='\0';
01139 CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
01140 CkStartCheckpoint(dirname,cb);
01141 }
01142 }
01143 void ampiParent::ResumeThread(void){
01144 thread->resume();
01145 }
01146
01147 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
01148 int *keyval, void* extra_state){
01149 KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
01150 int idx = kvlist.size();
01151 kvlist.resize(idx+1);
01152 kvlist[idx] = newnode;
01153 *keyval = idx;
01154 return 0;
01155 }
01156 int ampiParent::freeKeyval(int *keyval){
01157 if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
01158 return -1;
01159 delete kvlist[*keyval];
01160 kvlist[*keyval] = NULL;
01161 *keyval = MPI_KEYVAL_INVALID;
01162 return 0;
01163 }
01164
01165 int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
01166 if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
01167 return -1;
01168 ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
01169
01170 while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
01171 cs.getKeyvals()[keyval]=attribute_val;
01172 return 0;
01173 }
01174
01175 int ampiParent::kv_is_builtin(int keyval) {
01176 switch(keyval) {
01177 case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
01178 case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
01179 case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
01180 case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
01181 case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
01182 case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
01183 case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
01184 case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
01185 default: return 0;
01186 };
01187 }
01188
01189 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
01190 *flag = false;
01191 if (kv_is_builtin(keyval)) {
01192 *flag=true;
01193 *(int **)attribute_val = kv_builtin_storage;
01194 return 0;
01195 }
01196 if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
01197 return -1;
01198
01199 ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
01200 if (keyval>=cs.getKeyvals().size())
01201 return 0;
01202 if (cs.getKeyvals()[keyval]==0)
01203 return 0;
01204
01205 *flag = true;
01206 *(void **)attribute_val = cs.getKeyvals()[keyval];
01207 return 0;
01208 }
01209 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
01210
01211 return putAttr(comm,keyval,0);
01212 }
01213
01214
01215 void ampi::init(void) {
01216 parent=NULL;
01217 thread=NULL;
01218 msgs=NULL;
01219 posted_ireqs=NULL;
01220 resumeOnRecv=false;
01221 AsyncEvacuate(CmiFalse);
01222 }
01223
01224 ampi::ampi()
01225 {
01226
01227 CkAbort("Default ampi constructor should never be called");
01228 }
01229
01230 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
01231 :parentProxy(parent_)
01232 {
01233 init();
01234
01235 myComm=s; myComm.setArrayID(thisArrayID);
01236 myRank=myComm.getRankForIndex(thisIndex);
01237
01238 findParent(false);
01239
01240 msgs = CmmNew();
01241 posted_ireqs = CmmNew();
01242 nbcasts = 0;
01243
01244 #if AMPI_COMLIB
01245 comlibProxy = thisProxy;
01246 #endif
01247
01248 seqEntries=parent->ckGetArraySize();
01249 oorder.init (seqEntries);
01250 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01251 if(thisIndex == 0){
01252
01253
01254
01255
01256
01257
01258 }
01259 #endif
01260 }
01261
01262 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
01263 ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
01264 :parentProxy(parent_)
01265 {
01266 #if AMPI_COMLIB
01267 ciStreaming = ciStreaming_;
01268 ciBcast = ciBcast_;
01269 ciAllgather = ciAllgather_;
01270 ciAlltoall = ciAlltoall_;
01271
01272 init();
01273
01274 myComm=s; myComm.setArrayID(thisArrayID);
01275 myRank=myComm.getRankForIndex(thisIndex);
01276
01277 findParent(false);
01278
01279 msgs = CmmNew();
01280 posted_ireqs = CmmNew();
01281 nbcasts = 0;
01282
01283 comlibProxy = thisProxy;
01284 CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
01285
01286 #if AMPI_COMLIB
01287
01288 #endif
01289
01290 seqEntries=parent->ckGetArraySize();
01291 oorder.init (seqEntries);
01292 #endif
01293 }
01294
01295 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
01296 {
01297 init();
01298
01299 seqEntries=-1;
01300 }
01301
01302 void ampi::ckJustMigrated(void)
01303 {
01304 findParent(true);
01305 ArrayElement1D::ckJustMigrated();
01306 }
01307
01308 void ampi::ckJustRestored(void)
01309 {
01310 FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
01311 findParent(true);
01312 ArrayElement1D::ckJustRestored();
01313
01314
01315
01316
01317 }
01318
01319 void ampi::findParent(bool forMigration) {
01320 STARTUP_DEBUG("ampi> finding my parent")
01321 parent=parentProxy[thisIndex].ckLocal();
01322 if (parent==NULL) CkAbort("AMPI can't find its parent!");
01323 thread=parent->registerAmpi(this,myComm,forMigration);
01324 if (thread==NULL) CkAbort("AMPI can't find its thread!");
01325
01326 }
01327
01328
01329
01330 void ampi::allInitDone(CkReductionMsg *m){
01331 FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
01332 thisProxy.setInitDoneFlag();
01333 delete m;
01334 }
01335
01336 void ampi::setInitDoneFlag(){
01337
01338 parent->ampiInitCallDone=1;
01339 parent->getTCharmThread()->start();
01340 }
01341
01342 static void cmm_pup_ampi_message(pup_er p,void **msg) {
01343 CkPupMessage(*(PUP::er *)p,msg,1);
01344 if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
01345
01346 }
01347
01348 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
01349
01350 pup_int(p, (int *)msg);
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361
01362
01363
01364
01365
01366
01367
01368
01369
01370 }
01371
/* Serialize/deserialize this ampi element for migration and checkpointing.
 * The sequence of p| calls defines the wire format, so packing and
 * unpacking must stay in lockstep. parent and thread are not pupped; they
 * are re-established by findParent() in ckJustMigrated/ckJustRestored.
 */
void ampi::pup(PUP::er &p)
{
// User-level pupping skips the array-element base-class state.
if(!p.isUserlevel())
ArrayElement1D::pup(p);
p|parentProxy;
p|myComm;
p|myRank;
p|nbcasts;
p|tmpVec;
p|remoteProxy;
p|resumeOnRecv;
#if AMPI_COMLIB
p|comlibProxy;
p|ciStreaming;
p|ciBcast;
p|ciAllgather;
p|ciAlltoall;

if(p.isUnpacking()){
// No unpack-time comlib fixup is currently performed.
}
#endif

// Unexpected-message queue; entries are pupped (and freed when deleting) by cmm_pup_ampi_message.
msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);

// Posted-receive table; entries are integer request indices pupped by cmm_pup_posted_ireq.
posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);

// Out-of-order delivery bookkeeping (see makeAmpiMsg / generic).
p|seqEntries;
p|oorder;
}
01414
01415 ampi::~ampi()
01416 {
01417 if (CkInRestarting() || _BgOutOfCoreFlag==1) {
01418
01419 int tags[3], sts[3];
01420 tags[0] = tags[1] = tags[2] = CmmWildCard;
01421 AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
01422 while (msg) {
01423 delete msg;
01424 msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
01425 }
01426 }
01427
01428 CmmFree(msgs);
01429 CmmFreeAll(posted_ireqs);
01430 }
01431
01432
/* One rank's contribution to a communicator split (concatenated by a reduction). */
class ampiSplitKey {
public:
  int nextSplitComm;  // communicator id this rank proposes for the result
  int color;          // split color
  int key;            // ordering key within a color
  int rank;           // contributor's rank in the parent communicator

  ampiSplitKey() {}
  ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
  {
    nextSplitComm = nextSplitComm_;
    color = color_;
    key = key_;
    rank = rank_;
  }
};
01443
01444
01445
01446 void ampi::split(int color,int key,MPI_Comm *dest, int type)
01447 {
01448 #if CMK_BIGSIM_CHARM
01449 void *curLog;
01450 _TRACE_BG_TLINE_END(&curLog);
01451
01452 #endif
01453 if (type == CART_TOPOL) {
01454 ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
01455 int rootIdx=myComm.getIndexForRank(0);
01456 CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
01457 contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
01458
01459 thread->suspend();
01460 MPI_Comm newComm=parent->getNextCart()-1;
01461 *dest=newComm;
01462 } else {
01463 ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
01464 int rootIdx=myComm.getIndexForRank(0);
01465 CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
01466 contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
01467
01468 thread->suspend();
01469 MPI_Comm newComm=parent->getNextSplit()-1;
01470 *dest=newComm;
01471 }
01472 #if CMK_BIGSIM_CHARM
01473
01474
01475 _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
01476 #endif
01477 }
01478
01479 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
01480 const ampiSplitKey *a=(const ampiSplitKey *)a_;
01481 const ampiSplitKey *b=(const ampiSplitKey *)b_;
01482 if (a->color!=b->color) return a->color-b->color;
01483 if (a->key!=b->key) return a->key-b->key;
01484 return a->rank-b->rank;
01485 }
01486
/* Root-side handler for a communicator split; ampi::split targets rank 0 of myComm.
 * msg holds one ampiSplitKey per rank of myComm (concat reduction); any
 * other count aborts. Keys are sorted by (color, key, rank). Each contiguous
 * run of equal color becomes one new ampi array. Within a run, new rank i is
 * the i-th key in sorted order, and elements are inserted at that rank's
 * array index. All arrays created here share one communicator id: the
 * largest nextSplitComm proposed by any rank. msg is deleted at the end.
 * NOTE(review): every color value (including any "undefined" color) gets a
 * communicator here — confirm how MPI_UNDEFINED is handled by callers.
 */
void ampi::splitPhase1(CkReductionMsg *msg)
{

int nKeys=msg->getSize()/sizeof(ampiSplitKey);
ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);

// Agree on a single new communicator id: the maximum proposed by any rank.
MPI_Comm newComm = -1;
for(int i=0;i<nKeys;i++)
if(keys[i].nextSplitComm>newComm)
newComm = keys[i].nextSplitComm;


// Seed with a color different from the first key so iteration 0 opens a new group.
// NOTE(review): underflows if keys[0].color == INT_MIN.
int lastColor=keys[0].color-1;
CProxy_ampi lastAmpi;
int lastRoot=0;  // sorted position where the current color run starts
ampiCommStruct lastComm;
for (int c=0;c<nKeys;c++) {
if (keys[c].color!=lastColor)
{
// New color run: create an empty ampi array bound to the parent array.
lastColor=keys[c].color;
lastRoot=c;
CkArrayOptions opts;
opts.bindTo(parentProxy);
opts.setNumInitial(0);
CkArrayID unusedAID; ampiCommStruct unusedComm;
lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
lastAmpi.doneInserting();

// Collect the array indices of every member of this color run, in sorted order.
CkVec<int> indices;
for (int i=c;i<nKeys;i++) {
if (keys[i].color!=lastColor) break;
int idx=myComm.getIndexForRank(keys[i].rank);
indices.push_back(idx);
}



lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
}
// Rank within the new communicator = position inside the current color run.
int newRank=c-lastRoot;
int newIdx=lastComm.getIndexForRank(newRank);


lastAmpi[newIdx].insert(parentProxy,lastComm);
}

delete msg;
}
01537
01538
01539 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
01540 int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
01541 if (splitComm.size()<=idx) splitComm.resize(idx+1);
01542 splitComm[idx]=new ampiCommStruct(s);
01543 thread->resume();
01544 }
01545
01546
01547
01548
01549
01550
01551
01552 class vecStruct {
01553 public:
01554 int nextgroup;
01555 groupStruct vec;
01556 vecStruct():nextgroup(-1){}
01557 vecStruct(int nextgroup_, groupStruct vec_)
01558 : nextgroup(nextgroup_), vec(vec_) { }
01559 };
01560
01561 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
01562 int rootIdx=vec[0];
01563 tmpVec = vec;
01564 CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
01565 MPI_Comm nextgroup = parent->getNextGroup();
01566 contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
01567
01568 if(getPosOp(thisIndex,vec)>=0){
01569 thread->suspend();
01570 MPI_Comm retcomm = parent->getNextGroup()-1;
01571 *newcomm = retcomm;
01572 }else{
01573 *newcomm = MPI_COMM_NULL;
01574 }
01575 }
01576
01577 void ampi::commCreatePhase1(CkReductionMsg *msg){
01578 MPI_Comm *nextGroupComm = (int *)msg->getData();
01579
01580 CkArrayOptions opts;
01581 opts.bindTo(parentProxy);
01582 opts.setNumInitial(0);
01583 CkArrayID unusedAID;
01584 ampiCommStruct unusedComm;
01585 CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
01586 newAmpi.doneInserting();
01587
01588 groupStruct indices = tmpVec;
01589 ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
01590 for(int i=0;i<indices.size();i++){
01591 int newIdx=indices[i];
01592 newAmpi[newIdx].insert(parentProxy,newCommstruct);
01593 }
01594 delete msg;
01595 }
01596
01597 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
01598 int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
01599 if (groupComm.size()<=idx) groupComm.resize(idx+1);
01600 groupComm[idx]=new ampiCommStruct(s);
01601 thread->resume();
01602 }
01603
01604
01605 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
01606 int rootIdx=vec[0];
01607 tmpVec = vec;
01608 CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
01609
01610 MPI_Comm nextcart = parent->getNextCart();
01611 contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
01612
01613 if(getPosOp(thisIndex,vec)>=0){
01614 thread->suspend();
01615 MPI_Comm retcomm = parent->getNextCart()-1;
01616 *newcomm = retcomm;
01617 }else
01618 *newcomm = MPI_COMM_NULL;
01619 }
01620
01621 void ampi::cartCreatePhase1(CkReductionMsg *msg){
01622 MPI_Comm *nextCartComm = (int *)msg->getData();
01623
01624 CkArrayOptions opts;
01625 opts.bindTo(parentProxy);
01626 opts.setNumInitial(0);
01627 CkArrayID unusedAID;
01628 ampiCommStruct unusedComm;
01629 CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
01630 newAmpi.doneInserting();
01631
01632 groupStruct indices = tmpVec;
01633 ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
01634 size(),indices);
01635 for(int i=0;i<indices.size();i++){
01636 int newIdx=indices[i];
01637 newAmpi[newIdx].insert(parentProxy,newCommstruct);
01638 }
01639 delete msg;
01640 }
01641
01642 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
01643 int idx=s.getComm()-MPI_COMM_FIRST_CART;
01644 if (cartComm.size()<=idx) {
01645 cartComm.resize(idx+1);
01646 cartComm.length()=idx+1;
01647 }
01648 cartComm[idx]=new ampiCommStruct(s);
01649 thread->resume();
01650 }
01651
01652 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
01653 int rootIdx=vec[0];
01654 tmpVec = vec;
01655 CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
01656 myComm.getProxy());
01657 MPI_Comm nextgraph = parent->getNextGraph();
01658 contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
01659
01660 if(getPosOp(thisIndex,vec)>=0){
01661 thread->suspend();
01662 MPI_Comm retcomm = parent->getNextGraph()-1;
01663 *newcomm = retcomm;
01664 }else
01665 *newcomm = MPI_COMM_NULL;
01666 }
01667
01668 void ampi::graphCreatePhase1(CkReductionMsg *msg){
01669 MPI_Comm *nextGraphComm = (int *)msg->getData();
01670
01671 CkArrayOptions opts;
01672 opts.bindTo(parentProxy);
01673 opts.setNumInitial(0);
01674 CkArrayID unusedAID;
01675 ampiCommStruct unusedComm;
01676 CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
01677 newAmpi.doneInserting();
01678
01679 groupStruct indices = tmpVec;
01680 ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
01681 .size(),indices);
01682 for(int i=0;i<indices.size();i++){
01683 int newIdx=indices[i];
01684 newAmpi[newIdx].insert(parentProxy,newCommstruct);
01685 }
01686 delete msg;
01687 }
01688
01689 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
01690 int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
01691 if (graphComm.size()<=idx) {
01692 graphComm.resize(idx+1);
01693 graphComm.length()=idx+1;
01694 }
01695 graphComm[idx]=new ampiCommStruct(s);
01696 thread->resume();
01697 }
01698
01699 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
01700 if(thisIndex==root) {
01701 tmpVec = rvec;
01702 }
01703 CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
01704 MPI_Comm nextinter = parent->getNextInter();
01705 contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
01706
01707 thread->suspend();
01708 MPI_Comm newcomm=parent->getNextInter()-1;
01709 *ncomm=newcomm;
01710 }
01711
01712 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
01713 MPI_Comm *nextInterComm = (int *)msg->getData();
01714
01715 groupStruct lgroup = myComm.getIndices();
01716 CkArrayOptions opts;
01717 opts.bindTo(parentProxy);
01718 opts.setNumInitial(0);
01719 CkArrayID unusedAID;
01720 ampiCommStruct unusedComm;
01721 CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
01722 newAmpi.doneInserting();
01723
01724 ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
01725 for(int i=0;i<lgroup.size();i++){
01726 int newIdx=lgroup[i];
01727 newAmpi[newIdx].insert(parentProxy,newCommstruct);
01728 }
01729
01730 parentProxy[0].ExchangeProxy(newAmpi);
01731 delete msg;
01732 }
01733
01734 void ampiParent::interChildRegister(const ampiCommStruct &s) {
01735 int idx=s.getComm()-MPI_COMM_FIRST_INTER;
01736 if (interComm.size()<=idx) interComm.resize(idx+1);
01737 interComm[idx]=new ampiCommStruct(s);
01738
01739 }
01740
01741 void ampi::intercommMerge(int first, MPI_Comm *ncomm){
01742 if(myRank == 0 && first == 1){
01743 groupStruct lvec = myComm.getIndices();
01744 groupStruct rvec = myComm.getRemoteIndices();
01745 int rsize = rvec.size();
01746 tmpVec = lvec;
01747 for(int i=0;i<rsize;i++)
01748 tmpVec.push_back(rvec[i]);
01749 if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
01750 }else{
01751 tmpVec.resize(0);
01752 }
01753
01754 int rootIdx=myComm.getIndexForRank(0);
01755 CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
01756 MPI_Comm nextintra = parent->getNextIntra();
01757 contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
01758
01759 thread->suspend();
01760 MPI_Comm newcomm=parent->getNextIntra()-1;
01761 *ncomm=newcomm;
01762 }
01763
01764 void ampi::intercommMergePhase1(CkReductionMsg *msg){
01765 if(tmpVec.size()==0) { delete msg; return; }
01766 MPI_Comm *nextIntraComm = (int *)msg->getData();
01767 CkArrayOptions opts;
01768 opts.bindTo(parentProxy);
01769 opts.setNumInitial(0);
01770 CkArrayID unusedAID;
01771 ampiCommStruct unusedComm;
01772 CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
01773 newAmpi.doneInserting();
01774
01775 ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
01776 for(int i=0;i<tmpVec.size();i++){
01777 int newIdx=tmpVec[i];
01778 newAmpi[newIdx].insert(parentProxy,newCommstruct);
01779 }
01780 delete msg;
01781 }
01782
01783 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
01784 int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
01785 if (intraComm.size()<=idx) intraComm.resize(idx+1);
01786 intraComm[idx]=new ampiCommStruct(s);
01787 thread->resume();
01788 }
01789
01790
01791 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
01792 {
01793 if (universeNo>MPI_COMM_WORLD) {
01794 int worldDex=universeNo-MPI_COMM_WORLD-1;
01795 if (worldDex>=_mpi_nworlds)
01796 CkAbort("Bad world communicator passed to universeComm2CommStruct");
01797 return mpi_worlds[worldDex].comm;
01798 }
01799 CkAbort("Bad communicator passed to universeComm2CommStruct");
01800 return mpi_worlds[0].comm;
01801 }
01802
01803 void ampi::block(void){
01804 thread->suspend();
01805 }
01806
01807 void ampi::yield(void){
01808 thread->schedule();
01809 }
01810
01811 void ampi::unblock(void){
01812 thread->resume();
01813 }
01814
01815 void ampi::ssend_ack(int sreq_idx){
01816 if (sreq_idx == 1)
01817 thread->resume();
01818 else {
01819 sreq_idx -= 2;
01820 AmpiRequestList *reqs = &(parent->ampiReqs);
01821 SReq *sreq = (SReq *)(*reqs)[sreq_idx];
01822 sreq->statusIreq = true;
01823 if (resumeOnRecv) {
01824 thread->resume();
01825 }
01826 }
01827 }
01828
01829 void
01830 ampi::generic(AmpiMsg* msg)
01831 {
01832 MSG_ORDER_DEBUG(
01833 CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d (from %d, seq %d) resumeOnRecv %d\n",
01834 thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
01835 )
01836 #if CMK_BIGSIM_CHARM
01837 TRACE_BG_ADD_TAG("AMPI_generic");
01838 msg->event = NULL;
01839 #endif
01840
01841 int sync = UsrToEnv(msg)->getRef();
01842 int srcIdx;
01843 if (sync) srcIdx = msg->srcIdx;
01844
01845
01846 if(msg->seq != -1) {
01847 int srcIdx=msg->srcIdx;
01848 int n=oorder.put(srcIdx,msg);
01849 if (n>0) {
01850 inorder(msg);
01851 if (n>1) {
01852 while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
01853 inorder(msg);
01854 }
01855 }
01856 }
01857 } else {
01858 inorder(msg);
01859 }
01860
01861
01862 if (sync>0) {
01863 CProxy_ampi pa(thisArrayID);
01864 pa[srcIdx].ssend_ack(sync);
01865 }
01866
01867 if(resumeOnRecv){
01868
01869 thread->resume();
01870 }
01871 }
01872
// Forward declaration of a file-local helper returning an AmpiRequestList*;
// its definition is elsewhere in this translation unit (not visible in this chunk).
inline static AmpiRequestList *getReqs(void);
01874
01875 void
01876 ampi::inorder(AmpiMsg* msg)
01877 {
01878 MSG_ORDER_DEBUG(
01879 CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d (from %d, seq %d)\n",
01880 thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
01881 )
01882
01883 int tags[3], sts[3];
01884 tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
01885 IReq *ireq = NULL;
01886 if (CpvAccess(CmiPICMethod) != 2) {
01887 #if 0
01888
01889 ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
01890 #else
01891 #if CMK_BIGSIM_CHARM
01892 _TRACE_BG_TLINE_END(&msg->event);
01893 msg->eventPe = CmiMyPe();
01894 #endif
01895
01896
01897 AmpiRequestList *reqL = &(parent->ampiReqs);
01898
01899
01900 int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
01901 if(reqL->size()>0 && ireqIdx>0)
01902 ireq = (IReq *)(*reqL)[ireqIdx-1];
01903
01904 #endif
01905
01906 if (ireq) {
01907 ireq->receive(this, msg);
01908
01909
01910
01911
01912
01913
01914 } else {
01915 CmmPut(msgs, 3, tags, msg);
01916 }
01917 }
01918 else
01919 CmmPut(msgs, 3, tags, msg);
01920 }
01921
01922 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
01923 {
01924 int tags[3];
01925 tags[0] = t; tags[1] = s; tags[2] = comm;
01926 AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
01927 return msg;
01928 }
01929
01930 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
01931 int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
01932 {
01933 CkDDT_DataType *ddt = getDDT()->getType(type);
01934 int len = ddt->getSize(count);
01935 int sIdx=thisIndex;
01936 int seq = -1;
01937 if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_ATA_SEQ_TAG)
01938 seq = oorder.nextOutgoing(destIdx);
01939 AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
01940 if (sync) UsrToEnv(msg)->setRef(sync);
01941 TCharm::activateVariable(buf);
01942 ddt->serialize((char*)buf, (char*)msg->data, count, 1);
01943 TCharm::deactivateVariable(buf);
01944 return msg;
01945 }
01946
#if AMPI_COMLIB
/* Point-to-point send routed through comlibProxy instead of the
 * communicator's own proxy (sync takes delesend's default). */
void
ampi::comlibsend(int t, int sRank, const void* buf, int count, int type, int rank, MPI_Comm destcomm)
{
  delesend(t, sRank, buf, count, type, rank, destcomm, comlibProxy);
}
#endif
01954
01955 void
01956 ampi::send(int t, int sRank, const void* buf, int count, int type, int rank, MPI_Comm destcomm, int sync)
01957 {
01958 #if CMK_TRACE_IN_CHARM
01959 TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
01960 #endif
01961
01962 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
01963 MPI_Comm disComm = myComm.getComm();
01964 ampi *dis = getAmpiInstance(disComm);
01965 CpvAccess(_currentObj) = dis;
01966 #endif
01967
01968 const ampiCommStruct &dest=comm2CommStruct(destcomm);
01969 delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
01970
01971 #if CMK_TRACE_IN_CHARM
01972 TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
01973 #endif
01974
01975 if (sync == 1) {
01976
01977 resumeOnRecv = false;
01978 block();
01979 }
01980 }
01981
01982 void
01983 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
01984 {
01985 AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
01986 memcpy(msg->data, buf, len);
01987 CProxy_ampi pa(aid);
01988 pa[idx].generic(msg);
01989 }
01990
01991 void
01992 ampi::delesend(int t, int sRank, const void* buf, int count, int type, int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
01993 {
01994 if(rank==MPI_PROC_NULL) return;
01995 const ampiCommStruct &dest=comm2CommStruct(destcomm);
01996 int destIdx = dest.getIndexForRank(rank);
01997 if(isInter()){
01998 sRank = parent->thisIndex;
01999 destcomm = MPI_COMM_FIRST_INTER;
02000 destIdx = dest.getIndexForRemoteRank(rank);
02001 arrproxy = remoteProxy;
02002 }
02003 MSG_ORDER_DEBUG(
02004 CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
02005 )
02006
02007 arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
02008
02009 #if 0
02010 #if CMK_TRACE_ENABLED
02011 int size=0;
02012 MPI_Type_size(type,&size);
02013 _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
02014 #endif
02015 #endif
02016 }
02017
02018 int
02019 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
02020 {
02021 CkDDT_DataType *ddt = getDDT()->getType(type);
02022 int len = ddt->getSize(count);
02023
02024 if(msg->length < len){
02025 count = msg->length/(ddt->getSize(1));
02026 }
02027
02028 TCharm::activateVariable(buf);
02029 if (t==MPI_REDUCE_TAG) {
02030 ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
02031 } else {
02032 ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
02033 }
02034 TCharm::deactivateVariable(buf);
02035 return 0;
02036 }
02037
02038 int
02039 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
02040 {
02041 MPI_Comm disComm = myComm.getComm();
02042 if(s==MPI_PROC_NULL) {
02043 ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
02044 ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
02045 ((MPI_Status *)sts)->MPI_LENGTH = 0;
02046 return 0;
02047 }
02048 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
02049 _LOG_E_END_AMPI_PROCESSING(thisIndex)
02050 #endif
02051 #if CMK_BIGSIM_CHARM
02052 void *curLog;
02053 _TRACE_BG_TLINE_END(&curLog);
02054
02055 #if CMK_TRACE_IN_CHARM
02056 if(CpvAccess(traceOn)) traceSuspend();
02057 #endif
02058 #endif
02059
02060 if(isInter()){
02061 s = myComm.getIndexForRemoteRank(s);
02062 comm = MPI_COMM_FIRST_INTER;
02063 }
02064
02065 int tags[3];
02066 AmpiMsg *msg = 0;
02067
02068 MSG_ORDER_DEBUG(
02069 CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
02070 )
02071
02072 ampi *dis = getAmpiInstance(disComm);
02073 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02074
02075
02076 #endif
02077 int dosuspend = 0;
02078 while(1) {
02079
02080
02081 tags[0] = t; tags[1] = s; tags[2] = comm;
02082 msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
02083 if (msg) break;
02084 dis->resumeOnRecv=true;
02085 dis->thread->suspend();
02086 dosuspend = 1;
02087 dis = getAmpiInstance(disComm);
02088 }
02089
02090 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02091 CpvAccess(_currentObj) = dis;
02092 MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
02093 #endif
02094
02095 dis->resumeOnRecv=false;
02096
02097 if(sts)
02098 ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
02099 int status = dis->processMessage(msg, t, s, buf, count, type);
02100 if (status != 0) return status;
02101
02102 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
02103 _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
02104 #endif
02105
02106 #if CMK_BIGSIM_CHARM
02107 #if CMK_TRACE_IN_CHARM
02108
02109
02110
02111 if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
02112 #endif
02113
02114
02115
02116 #if 0
02117 #if 1
02118 if (!dosuspend) {
02119 TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
02120 if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
02121 }
02122 else
02123 #endif
02124 TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
02125 #else
02126 TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
02127 if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
02128 #endif
02129 #endif
02130
02131 delete msg;
02132 return 0;
02133 }
02134
02135 void
02136 ampi::probe(int t, int s, int comm, int *sts)
02137 {
02138 int tags[3];
02139 #if CMK_BIGSIM_CHARM
02140 void *curLog;
02141 _TRACE_BG_TLINE_END(&curLog);
02142
02143 #endif
02144
02145 AmpiMsg *msg = 0;
02146 resumeOnRecv=true;
02147 while(1) {
02148 tags[0] = t; tags[1] = s; tags[2] = comm;
02149 msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
02150 if (msg) break;
02151 thread->suspend();
02152 }
02153 resumeOnRecv=false;
02154 if(sts)
02155 ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
02156 #if CMK_BIGSIM_CHARM
02157
02158 _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME", &curLog, 1);
02159 #endif
02160 }
02161
02162 int
02163 ampi::iprobe(int t, int s, int comm, int *sts)
02164 {
02165 int tags[3];
02166 AmpiMsg *msg = 0;
02167 tags[0] = t; tags[1] = s; tags[2] = comm;
02168 msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
02169 if (msg) {
02170 if(sts)
02171 ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
02172 return 1;
02173 }
02174 #if CMK_BIGSIM_CHARM
02175 void *curLog;
02176 _TRACE_BG_TLINE_END(&curLog);
02177
02178 #endif
02179 thread->schedule();
02180 #if CMK_BIGSIM_CHARM
02181
02182 _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME", &curLog, 1);
02183 #endif
02184 return 0;
02185 }
02186
02187
02188 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
02189 void
02190 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
02191 {
02192 const ampiCommStruct &dest=comm2CommStruct(destcomm);
02193 int rootIdx=dest.getIndexForRank(root);
02194 if(rootIdx==thisIndex) {
02195 #if 0//AMPI_COMLIB
02196 ciBcast.beginIteration();
02197 comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
02198 #else
02199 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02200 CpvAccess(_currentObj) = this;
02201 #endif
02202 thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
02203 #endif
02204 }
02205 if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
02206 nbcasts++;
02207 }
02208
02209 void
02210 ampi::bcastraw(void* buf, int len, CkArrayID aid)
02211 {
02212 AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, MPI_COMM_WORLD);
02213 memcpy(msg->data, buf, len);
02214 CProxy_ampi pa(aid);
02215 pa.generic(msg);
02216 }
02217
02218
02219 AmpiMsg*
02220 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
02221 {
02222 CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
02223 int unit;
02224 CkDDT_DataType *ddt = getDDT()->getType(type);
02225 unit = ddt->getSize(1);
02226 int totalsize = unit*cnt;
02227
02228 AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
02229 char* addr = (char*)Alltoallbuff+disp*unit;
02230 ddt->serialize((char*)msg->data, addr, cnt, (-1));
02231 return msg;
02232 }
02233
02234 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
02235 void *attr_in, void *attr_out, int *flag){
02236 (*flag) = 0;
02237 return (MPI_SUCCESS);
02238 }
02239 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
02240 void *attr_in, void *attr_out, int *flag){
02241 (*(void **)attr_out) = attr_in;
02242 (*flag) = 1;
02243 return (MPI_SUCCESS);
02244 }
02245 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
02246 return (MPI_SUCCESS);
02247 }
02248
02249
02250 void AmpiSeqQ::init(int numP)
02251 {
02252 elements.init(numP);
02253 }
02254
02255 AmpiSeqQ::~AmpiSeqQ () {
02256 }
02257
02258 void AmpiSeqQ::pup(PUP::er &p) {
02259 p|out;
02260 p|elements;
02261 }
02262
02263 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
02264 {
02265 AmpiOtherElement &el=elements[srcIdx];
02266 #if CMK_ERROR_CHECKING
02267 if (msg->seq<el.seqIncoming)
02268 CkAbort("AMPI Logic error: received late out-of-order message!\n");
02269 #endif
02270 out.enq(msg);
02271 el.nOut++;
02272 }
02273
02274 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
02275 {
02276 AmpiOtherElement &el=elements[srcIdx];
02277 if (el.nOut==0) return 0;
02278
02279 for (int i=0;i<out.length();i++) {
02280 AmpiMsg *msg=out.deq();
02281 if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
02282 el.seqIncoming++;
02283 el.nOut--;
02284 return msg;
02285 }
02286 else
02287 out.enq(msg);
02288 }
02289
02290 return 0;
02291 }
02292
02293
02294 void AmpiRequest::print(){
02295 CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
02296 }
02297
02298 void PersReq::print(){
02299 AmpiRequest::print();
02300 CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
02301 }
02302
02303 void IReq::print(){
02304 AmpiRequest::print();
02305 CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
02306 }
02307
02308 void ATAReq::print(){
02309 AmpiRequest::print();
02310 CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
02311 }
02312
02313 void SReq::print(){
02314 AmpiRequest::print();
02315 CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
02316 }
02317
02318 void AmpiRequestList::pup(PUP::er &p) {
02319 if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
02320 return;
02321 }
02322
02323 p(blklen);
02324 p(len);
02325 if(p.isUnpacking()){
02326 makeBlock(blklen,len);
02327 }
02328 int count=0;
02329 for(int i=0;i<len;i++){
02330 char nonnull;
02331 if(!p.isUnpacking()){
02332 if(block[i] == NULL){
02333 nonnull = 0;
02334 }else{
02335 nonnull = block[i]->getType();
02336 }
02337 }
02338 p(nonnull);
02339 if(nonnull != 0){
02340 if(p.isUnpacking()){
02341 switch(nonnull){
02342 case 1:
02343 block[i] = new PersReq;
02344 break;
02345 case 2:
02346 block[i] = new IReq;
02347 break;
02348 case 3:
02349 block[i] = new ATAReq;
02350 break;
02351 }
02352 }
02353 block[i]->pup(p);
02354 count++;
02355 }else{
02356 block[i] = 0;
02357 }
02358 }
02359 if(p.isDeleting()){
02360 freeBlock();
02361 }
02362 }
02363
02364
02365 ampiParent *getAmpiParent(void) {
02366 ampiParent *p = CtvAccess(ampiPtr);
02367 #if CMK_ERROR_CHECKING
02368 if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
02369 #endif
02370 return p;
02371 }
02372
02373 ampi *getAmpiInstance(MPI_Comm comm) {
02374 ampi *ptr=getAmpiParent()->comm2ampi(comm);
02375 #if CMK_ERROR_CHECKING
02376 if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
02377 #endif
02378 return ptr;
02379 }
02380
02381 inline static AmpiRequestList *getReqs(void) {
02382 return &(getAmpiParent()->ampiReqs);
02383 }
02384
02385 inline void checkComm(MPI_Comm comm){
02386 #if CMK_ERROR_CHECKING
02387 getAmpiParent()->checkComm(comm);
02388 #endif
02389 }
02390
02391 inline void checkRequest(MPI_Request req){
02392 #if CMK_ERROR_CHECKING
02393 getReqs()->checkRequest(req);
02394 #endif
02395 }
02396
02397 inline void checkRequests(int n, MPI_Request* reqs){
02398 #if CMK_ERROR_CHECKING
02399 AmpiRequestList* reqlist = getReqs();
02400 for(int i=0;i<n;i++)
02401 reqlist->checkRequest(reqs[i]);
02402 #endif
02403 }
02404
02405 CDECL void AMPI_Migrate(void)
02406 {
02407
02408 #if 0
02409 #if CMK_BIGSIM_CHARM
02410 TRACE_BG_AMPI_SUSPEND();
02411 #endif
02412 #endif
02413 TCHARM_Migrate();
02414
02415 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02416 ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
02417 CpvAccess(_currentObj) = currentAmpi;
02418 #endif
02419
02420 #if CMK_BIGSIM_CHARM
02421
02422 TRACE_BG_ADD_TAG("AMPI_MIGRATE");
02423 #endif
02424 }
02425
02426
02427 CDECL void AMPI_Evacuate(void)
02428 {
02429 TCHARM_Evacuate();
02430 }
02431
02432
02433
02434 CDECL void AMPI_Migrateto(int destPE)
02435 {
02436 AMPIAPI("AMPI_MigrateTo");
02437 #if 0
02438 #if CMK_BIGSIM_CHARM
02439 TRACE_BG_AMPI_SUSPEND();
02440 #endif
02441 #endif
02442 TCHARM_Migrate_to(destPE);
02443 #if CMK_BIGSIM_CHARM
02444
02445 TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
02446 #endif
02447 }
02448
02449 CDECL void AMPI_MigrateTo(int destPE)
02450 {
02451 AMPI_Migrateto(destPE);
02452 }
02453
02454 CDECL void AMPI_Async_Migrate(void)
02455 {
02456 AMPIAPI("AMPI_Async_Migrate");
02457 #if 0
02458 #if CMK_BIGSIM_CHARM
02459 TRACE_BG_AMPI_SUSPEND();
02460 #endif
02461 #endif
02462 TCHARM_Async_Migrate();
02463 #if CMK_BIGSIM_CHARM
02464
02465 TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
02466 #endif
02467 }
02468
02469 CDECL void AMPI_Allow_Migrate(void)
02470 {
02471 AMPIAPI("AMPI_Allow_Migrate");
02472 #if 0
02473 #if CMK_BIGSIM_CHARM
02474 TRACE_BG_AMPI_SUSPEND();
02475 #endif
02476 #endif
02477 TCHARM_Allow_Migrate();
02478 #if CMK_BIGSIM_CHARM
02479 TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
02480 #endif
02481 }
02482
02483 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
02484 #if CMK_LBDB_ON
02485
02486 ampi *ptr=getAmpiInstance(comm);
02487 ptr->setMigratable(mig);
02488 #else
02489 CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
02490 #endif
02491 }
02492
02493 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
02494 {
02495
02496 if (nodeinit_has_been_called) {
02497 AMPIAPI("AMPI_Init");
02498 char **argv;
02499 if (p_argv) argv=*p_argv;
02500 else argv=CkGetArgv();
02501 ampiInit(argv);
02502 if (p_argc) *p_argc=CmiGetArgc(argv);
02503 }
02504 else
02505 {
02506 CkAbort("AMPI_Init> Charm is not initialized!");
02507 }
02508
02509 return 0;
02510 }
02511
02512 CDECL int AMPI_Initialized(int *isInit)
02513 {
02514 if (nodeinit_has_been_called) {
02515 AMPIAPI("AMPI_Initialized");
02516 *isInit=CtvAccess(ampiInitDone);
02517 }
02518 else {
02519 *isInit=nodeinit_has_been_called;
02520 }
02521 return 0;
02522 }
02523
02524 CDECL int AMPI_Finalized(int *isFinalized)
02525 {
02526 AMPIAPI("AMPI_Initialized");
02527 *isFinalized=CtvAccess(ampiFinalized);
02528 return 0;
02529 }
02530
02531 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
02532 {
02533
02534
02535 #if CMK_ERROR_CHECKING
02536 if(checkCommunicator(comm) != MPI_SUCCESS)
02537 return checkCommunicator(comm);
02538 #endif
02539
02540 #if AMPIMSGLOG
02541 ampiParent* pptr = getAmpiParent();
02542 if(msgLogRead){
02543 PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
02544 return 0;
02545 }
02546 #endif
02547
02548 *rank = getAmpiInstance(comm)->getRank(comm);
02549
02550 #if AMPIMSGLOG
02551 if(msgLogWrite && record_msglog(pptr->thisIndex)){
02552 PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
02553 }
02554 #endif
02555 return 0;
02556 }
02557
02558 CDECL
02559 int AMPI_Comm_size(MPI_Comm comm, int *size)
02560 {
02561
02562
02563 #if CMK_ERROR_CHECKING
02564 if(checkCommunicator(comm) != MPI_SUCCESS)
02565 return checkCommunicator(comm);
02566 #endif
02567
02568 #if AMPIMSGLOG
02569 ampiParent* pptr = getAmpiParent();
02570 if(msgLogRead){
02571 PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
02572 return 0;
02573 }
02574 #endif
02575
02576 *size = getAmpiInstance(comm)->getSize(comm);
02577
02578 #if AMPIMSGLOG
02579 if(msgLogWrite && record_msglog(pptr->thisIndex)){
02580 PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
02581 }
02582 #endif
02583
02584 return 0;
02585 }
02586
02587 CDECL
02588 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
02589 {
02590
02591 #if CMK_ERROR_CHECKING
02592 if(checkCommunicator(comm1) != MPI_SUCCESS)
02593 return checkCommunicator(comm1);
02594 if(checkCommunicator(comm2) != MPI_SUCCESS)
02595 return checkCommunicator(comm2);
02596 #endif
02597
02598 AMPIAPI("AMPI_Comm_compare");
02599 if(comm1==comm2) *result=MPI_IDENT;
02600 else{
02601 int equal=1;
02602 CkVec<int> ind1, ind2;
02603 ind1 = getAmpiInstance(comm1)->getIndices();
02604 ind2 = getAmpiInstance(comm2)->getIndices();
02605 if(ind1.size()==ind2.size()){
02606 for(int i=0;i<ind1.size();i++)
02607 if(ind1[i] != ind2[i]) { equal=0; break; }
02608 }
02609 if(equal==1) *result=MPI_CONGRUENT;
02610 else *result=MPI_UNEQUAL;
02611 }
02612 return 0;
02613 }
02614
02615 CDECL void AMPI_Exit(int )
02616 {
02617 AMPIAPI("AMPI_Exit");
02618
02619 TCHARM_Done();
02620 }
02621 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
02622 {
02623 AMPI_Exit(*exitCode);
02624 }
02625
02626 CDECL
02627 int AMPI_Finalize(void)
02628 {
02629 AMPIAPI("AMPI_Finalize");
02630 #if PRINT_IDLE
02631 CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
02632 #endif
02633 #if AMPI_COUNTER
02634 getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
02635 #endif
02636 CtvAccess(ampiFinalized)=1;
02637
02638 #if CMK_BIGSIM_CHARM
02639 #if 0
02640 TRACE_BG_AMPI_SUSPEND();
02641 #endif
02642 #if CMK_TRACE_IN_CHARM
02643 if(CpvAccess(traceOn)) traceSuspend();
02644 #endif
02645 #endif
02646
02647
02648 AMPI_Exit(0);
02649 return 0;
02650 }
02651
02652 CDECL
02653 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
02654 int tag, MPI_Comm comm) {
02655
02656 #if CMK_ERROR_CHECKING
02657 int ret;
02658 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
02659 if(ret != MPI_SUCCESS)
02660 return ret;
02661 #endif
02662
02663 AMPIAPI("AMPI_Send");
02664 #if AMPIMSGLOG
02665 if(msgLogRead){
02666 return 0;
02667 }
02668 #endif
02669
02670 ampi *ptr = getAmpiInstance(comm);
02671 #if AMPI_COMLIB
02672 if(enableStreaming){
02673
02674 ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
02675 } else
02676 #endif
02677 ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
02678 #if AMPI_COUNTER
02679 getAmpiParent()->counters.send++;
02680 #endif
02681 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02682
02683
02684 #endif
02685 return 0;
02686 }
02687
02688 CDECL
02689 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
02690 int tag, MPI_Comm comm)
02691 {
02692 #if CMK_ERROR_CHECKING
02693 int ret;
02694 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
02695 if(ret != MPI_SUCCESS)
02696 return ret;
02697 #endif
02698
02699 AMPIAPI("AMPI_Ssend");
02700 #if AMPIMSGLOG
02701 if(msgLogRead){
02702 return 0;
02703 }
02704 #endif
02705
02706 ampi *ptr = getAmpiInstance(comm);
02707 #if AMPI_COMLIB
02708 if(enableStreaming){
02709 ptr->getStreaming().beginIteration();
02710 ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
02711 } else
02712 #endif
02713 ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
02714 #if AMPI_COUNTER
02715 getAmpiParent()->counters.send++;
02716 #endif
02717
02718 return 0;
02719 }
02720
02721 CDECL
02722 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
02723 int tag, MPI_Comm comm, MPI_Request *request)
02724 {
02725 AMPIAPI("AMPI_Issend");
02726
02727 #if CMK_ERROR_CHECKING
02728 int ret;
02729 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
02730 if(ret != MPI_SUCCESS)
02731 {
02732 *request = MPI_REQUEST_NULL;
02733 return ret;
02734 }
02735 #endif
02736
02737 #if AMPIMSGLOG
02738 ampiParent* pptr = getAmpiParent();
02739 if(msgLogRead){
02740 PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
02741 return 0;
02742 }
02743 #endif
02744
02745 USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
02746 ampi *ptr = getAmpiInstance(comm);
02747 AmpiRequestList* reqs = getReqs();
02748 SReq *newreq = new SReq(comm);
02749 *request = reqs->insert(newreq);
02750
02751
02752 ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
02753 #if AMPI_COUNTER
02754 getAmpiParent()->counters.isend++;
02755 #endif
02756
02757 #if AMPIMSGLOG
02758 if(msgLogWrite && record_msglog(pptr->thisIndex)){
02759 PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
02760 }
02761 #endif
02762
02763 return 0;
02764 }
02765
02766 CDECL
02767 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
02768 MPI_Comm comm, MPI_Status *status)
02769 {
02770 AMPIAPI("AMPI_Recv");
02771
02772 #if CMK_ERROR_CHECKING
02773 int ret;
02774 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, msg, 1);
02775 if(ret != MPI_SUCCESS)
02776 return ret;
02777 #endif
02778
02779 #if AMPIMSGLOG
02780 ampiParent* pptr = getAmpiParent();
02781 if(msgLogRead){
02782 (*(pptr->fromPUPer))|(pptr->pupBytes);
02783 PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
02784 PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
02785 return 0;
02786 }
02787 #endif
02788
02789 ampi *ptr = getAmpiInstance(comm);
02790 if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
02791
02792 #if AMPI_COUNTER
02793 getAmpiParent()->counters.recv++;
02794 #endif
02795
02796 #if AMPIMSGLOG
02797 if(msgLogWrite && record_msglog(pptr->thisIndex)){
02798 (pptr->pupBytes) = getDDT()->getSize(type) * count;
02799 (*(pptr->toPUPer))|(pptr->pupBytes);
02800 PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
02801 PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
02802 }
02803 #endif
02804
02805 return 0;
02806 }
02807
02808 CDECL
02809 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
02810 {
02811
02812 #if CMK_ERROR_CHECKING
02813 int ret;
02814 ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
02815 if(ret != MPI_SUCCESS)
02816 return ret;
02817 #endif
02818
02819 AMPIAPI("AMPI_Probe");
02820 ampi *ptr = getAmpiInstance(comm);
02821 ptr->probe(tag,src, comm, (int*) status);
02822 return 0;
02823 }
02824
02825 CDECL
02826 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
02827 {
02828 AMPIAPI("AMPI_Iprobe");
02829
02830 #if CMK_ERROR_CHECKING
02831 int ret;
02832 ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
02833 if(ret != MPI_SUCCESS)
02834 return ret;
02835 #endif
02836
02837 ampi *ptr = getAmpiInstance(comm);
02838 *flag = ptr->iprobe(tag,src,comm,(int*) status);
02839 return 0;
02840 }
02841
02842 CDECL
02843 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
02844 int stag, void *rbuf, int rcount, int rtype,
02845 int src, int rtag, MPI_Comm comm, MPI_Status *sts)
02846 {
02847 AMPIAPI("AMPI_Sendrecv");
02848
02849 #if CMK_ERROR_CHECKING
02850 if (sbuf == MPI_IN_PLACE || rbuf == MPI_IN_PLACE)
02851 CmiAbort("MPI_sendrecv does not accept MPI_IN_PLACE; use MPI_Sendrecv_replace instead");
02852
02853 int ret;
02854 ret = errorCheck(comm, 1, scount, 1, stype, 1, stag, 1, dest, 1, sbuf, 1);
02855 if(ret != MPI_SUCCESS)
02856 return ret;
02857 ret = errorCheck(comm, 1, rcount, 1, rtype, 1, rtag, 1, src, 1, rbuf, 1);
02858 if(ret != MPI_SUCCESS)
02859 return ret;
02860 #endif
02861
02862 int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
02863 int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
02864 if (se) return se;
02865 else return re;
02866 }
02867
02868 CDECL
02869 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
02870 int dest, int sendtag, int source, int recvtag,
02871 MPI_Comm comm, MPI_Status *status)
02872 {
02873 AMPIAPI("AMPI_Sendrecv_replace");
02874 return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
02875 buf, count, datatype, source, recvtag, comm, status);
02876 }
02877
02878
02879 CDECL
02880 int AMPI_Barrier(MPI_Comm comm)
02881 {
02882 AMPIAPI("AMPI_Barrier");
02883
02884 #if CMK_ERROR_CHECKING
02885 if(checkCommunicator(comm) != MPI_SUCCESS)
02886 return checkCommunicator(comm);
02887 #endif
02888
02889 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
02890
02891 TRACE_BG_AMPI_LOG(MPI_BARRIER, 0);
02892
02893
02894 AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
02895
02896
02897
02898 #if AMPI_COUNTER
02899 getAmpiParent()->counters.barrier++;
02900 #endif
02901 return 0;
02902 }
02903
02904 CDECL
02905 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
02906 MPI_Comm comm)
02907 {
02908 AMPIAPI("AMPI_Bcast");
02909
02910 #if CMK_ERROR_CHECKING
02911 int ret;
02912 ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, buf, 1);
02913 if(ret != MPI_SUCCESS)
02914 return ret;
02915 #endif
02916
02917 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
02918 if(comm==MPI_COMM_SELF) return 0;
02919
02920 #if AMPIMSGLOG
02921 ampiParent* pptr = getAmpiParent();
02922 if(msgLogRead){
02923 (*(pptr->fromPUPer))|(pptr->pupBytes);
02924 PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
02925 return 0;
02926 }
02927 #endif
02928
02929 ampi* ptr = getAmpiInstance(comm);
02930 ptr->bcast(root, buf, count, type,comm);
02931 #if AMPI_COUNTER
02932 getAmpiParent()->counters.bcast++;
02933 #endif
02934
02935 #if AMPIMSGLOG
02936 if(msgLogWrite && record_msglog(pptr->thisIndex)) {
02937 (pptr->pupBytes) = getDDT()->getSize(type) * count;
02938 (*(pptr->toPUPer))|(pptr->pupBytes);
02939 PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
02940 }
02941 #endif
02942 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02943
02944
02945 #endif
02946
02947 return 0;
02948 }
02949
02951 const int MPI_REDUCE_SOURCE=0;
02952 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
02953 void ampi::reduceResult(CkReductionMsg *msg)
02954 {
02955 MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
02956 ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
02957 thisArrayID,thisIndex);
02958 delete msg;
02959 }
02960
02961 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
02962 {
02963 int szdata = ddt->getSize(count);
02964 int szhdr = sizeof(AmpiOpHeader);
02965 AmpiOpHeader newhdr(op,type,count,szdata);
02966 CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
02967 memcpy(msg->getData(),&newhdr,szhdr);
02968 if (count > 0) {
02969 TCharm::activateVariable(inbuf);
02970 ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
02971 TCharm::deactivateVariable(inbuf);
02972 }
02973 return msg;
02974 }
02975
02976
02977 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
02978
02979
02980 ampi *ptr = getAmpiInstance(comm);
02981 CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
02982 int len=ddt->getSize(count);
02983 char *serialized=new char[len];
02984 TCharm::activateVariable(inbuf);
02985 TCharm::activateVariable(outbuf);
02986 ddt->serialize((char*)inbuf,(char*)serialized,count,1);
02987 ddt->serialize((char*)outbuf,(char*)serialized,count,-1);
02988 TCharm::deactivateVariable(outbuf);
02989 TCharm::deactivateVariable(inbuf);
02990 delete [] serialized;
02991
02992 return MPI_SUCCESS;
02993 }
02994
02995 static void handle_MPI_IN_PLACE(void* &inbuf, void* &outbuf)
02996 {
02997 if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
02998 if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
02999 CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
03000 }
03001
/// When nonzero, AMPI_Reduce adds a completion notice: after receiving the
/// result, the root sends an empty MPI_REDUCE_TAG message to the whole
/// array, and every rank (root included) waits for it before returning.
#define SYNCHRONOUS_REDUCE 0

/**
 * MPI_Reduce: combine `count` elements of `type` from every rank with `op`.
 * The result is written to outbuf on `root` only.
 *
 * Implemented as a Charm++ reduction:
 *   - each rank contributes a message built by makeRednMsg;
 *   - the callback targets the root's array element (ampi::reduceResult);
 *   - reduceResult re-injects the result, which the root then receives
 *     on MPI_REDUCE_TAG.
 *
 * MPI_COMM_SELF short-circuits to a local copy.
 * Aborts for MPI_OP_NULL or inter-communicators.
 * Returns the argument-check error code (when error checking), else 0.
 */
CDECL
int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
                int root, MPI_Comm comm)
{
  AMPIAPI("AMPI_Reduce");

  // Turn MPI_IN_PLACE into a real buffer pointer before anything reads it.
  handle_MPI_IN_PLACE(inbuf, outbuf);

#if CMK_ERROR_CHECKING
  int ret;
  // outbuf is only validated on the root, the only rank that writes it.
  // NOTE(review): getAmpiInstance(comm) runs before errorCheck has validated comm.
  ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, inbuf, 1,
                   outbuf, getAmpiInstance(comm)->getRank(comm) == root);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);

#if AMPIMSGLOG
  ampiParent* pptr = getAmpiParent();
  if(msgLogRead){
    (*(pptr->fromPUPer))|(pptr->pupBytes);
    PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
    return 0;
  }
#endif

  ampi *ptr = getAmpiInstance(comm);
  // Array index of the root element; the reduction callback is aimed at it.
  int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
  if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
  if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");

  CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);

  CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
  msg->setCallback(reduceCB);
  MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
  ptr->contribute(msg);

  if (ptr->thisIndex == rootIdx){
    // Root: wait for reduceResult to deliver the combined data.
    if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
      CkAbort("AMPI>MPI_Reduce called with different values on different processors!");

#if SYNCHRONOUS_REDUCE
    // Note: this `msg` shadows the reduction message above.
    AmpiMsg *msg = new (0, 0) AmpiMsg(-1, MPI_REDUCE_TAG, -1, rootIdx, 0, MPI_REDUCE_COMM);
    CProxy_ampi pa(ptr->getProxy());
    pa.generic(msg);
#endif
  }
#if SYNCHRONOUS_REDUCE
  // Every rank waits for the root's empty completion notice.
  ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, NULL, 0, type, MPI_REDUCE_COMM);
#endif

#if AMPI_COUNTER
  getAmpiParent()->counters.reduce++;
#endif

#if AMPIMSGLOG
  // NOTE(review): this records outbuf on every rank, though only the root's outbuf was written.
  if(msgLogWrite && record_msglog(pptr->thisIndex)){
    (pptr->pupBytes) = getDDT()->getSize(type) * count;
    (*(pptr->toPUPer))|(pptr->pupBytes);
    PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
  }
#endif

  return 0;
}
03072
03073 CDECL
03074 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
03075 MPI_Op op, MPI_Comm comm)
03076 {
03077 AMPIAPI("AMPI_Allreduce");
03078
03079 handle_MPI_IN_PLACE(inbuf, outbuf);
03080
03081 #if CMK_ERROR_CHECKING
03082 int ret;
03083 ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
03084 if(ret != MPI_SUCCESS)
03085 return ret;
03086 #endif
03087
03088 ampi *ptr = getAmpiInstance(comm);
03089
03090 CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
03091
03092 #if CMK_BIGSIM_CHARM
03093 TRACE_BG_AMPI_LOG(MPI_ALLREDUCE, ddt_type->getSize(count));
03094 #endif
03095
03096 if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
03097
03098 #if AMPIMSGLOG
03099 ampiParent* pptr = getAmpiParent();
03100 if(msgLogRead){
03101 (*(pptr->fromPUPer))|(pptr->pupBytes);
03102 PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
03103
03104 return 0;
03105 }
03106 #endif
03107
03108 if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
03109 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
03110
03111 CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
03112 CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
03113 msg->setCallback(allreduceCB);
03114 ptr->contribute(msg);
03115
03116
03117 if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
03118 CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
03119 #if AMPI_COUNTER
03120 getAmpiParent()->counters.allreduce++;
03121 #endif
03122
03123 #if AMPIMSGLOG
03124 if(msgLogWrite && record_msglog(pptr->thisIndex)){
03125 (pptr->pupBytes) = getDDT()->getSize(type) * count;
03126 (*(pptr->toPUPer))|(pptr->pupBytes);
03127 PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
03128
03129 }
03130 #endif
03131
03132 return 0;
03133 }
03134
03135 CDECL
03136 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
03137 MPI_Op op, MPI_Comm comm, MPI_Request* request)
03138 {
03139 AMPIAPI("AMPI_Iallreduce");
03140
03141 handle_MPI_IN_PLACE(inbuf, outbuf);
03142
03143 #if CMK_ERROR_CHECKING
03144 int ret;
03145 ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
03146 if(ret != MPI_SUCCESS)
03147 {
03148 *request = MPI_REQUEST_NULL;
03149 return ret;
03150 }
03151 #endif
03152
03153 if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
03154
03155 checkRequest(*request);
03156 if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
03157 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
03158 ampi *ptr = getAmpiInstance(comm);
03159
03160 CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
03161 CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
03162 msg->setCallback(allreduceCB);
03163 ptr->contribute(msg);
03164
03165
03166 AmpiRequestList* reqs = getReqs();
03167 IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
03168 *request = reqs->insert(newreq);
03169 return 0;
03170 }
03171
03172 CDECL
03173 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
03174 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
03175 {
03176 AMPIAPI("AMPI_Reduce_scatter");
03177
03178 handle_MPI_IN_PLACE(sendbuf, recvbuf);
03179
03180 #if CMK_ERROR_CHECKING
03181 int ret;
03182 ret = errorCheck(comm, 1, 0, 0, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
03183 if(ret != MPI_SUCCESS)
03184 return ret;
03185 #endif
03186
03187 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
03188 if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
03189 ampi *ptr = getAmpiInstance(comm);
03190 int size = ptr->getSize(comm);
03191 int count=0;
03192 int *displs = new int [size];
03193 int len;
03194 void *tmpbuf;
03195
03196
03197 for(int i=0;i<size;i++){
03198 displs[i] = count;
03199 count+= recvcounts[i];
03200 }
03201 len = ptr->getDDT()->getType(datatype)->getSize(count);
03202 tmpbuf = malloc(len);
03203 AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
03204 AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
03205 recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
03206 free(tmpbuf);
03207 delete [] displs;
03208 return 0;
03209 }
03210
03211
03212
03213
03214
03215
03216
03217
03218
03219
03220
03221
03222
03223
03224
03225
03226
03227
03228
03229
03230
03231
03232
03233
03234
03235
03236
03237 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) {
03238 (op)(invec,inoutvec,&count,&datatype);
03239 }
/**
 * MPI_Scan: inclusive prefix reduction.
 * On return, recvbuf on rank r holds sendbuf(0) op ... op sendbuf(r).
 *
 * Uses recursive doubling: in round k each rank exchanges its running group
 * total (partial_scan) with partner rank ^ (1<<k), if that partner exists.
 * Data from lower-ranked partners is folded into recvbuf; partial_scan
 * grows either way.
 * Operands are applied with lower ranks on the left, assuming MPI
 * user-function semantics (inoutvec = invec op inoutvec), so
 * non-commutative ops are ordered correctly.
 *
 * MPI_IN_PLACE is not supported (aborts when error checking).
 * Aborts on inter-communicators.
 */
CDECL
int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
  AMPIAPI("AMPI_Scan");

#if CMK_ERROR_CHECKING
  if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
    CmiAbort("AMPI_Scan does not implement MPI_IN_PLACE");

  int ret;
  ret = errorCheck(comm, 1, count, 1, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
  MPI_Status sts;
  ampi *ptr = getAmpiInstance(comm);
  int size = ptr->getSize(comm);
  // Bytes for count elements. The memcpy calls below treat this as a flat
  // byte copy, which presumably assumes a contiguous datatype — confirm for derived types.
  int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
  int rank = ptr->getRank(comm);
  int mask = 0x1;
  int dst;
  void* tmp_buf = malloc(blklen);       // partner's running total received this round
  void* partial_scan = malloc(blklen);  // running total over the ranks exchanged with so far

  // Both start as this rank's own contribution.
  memcpy(recvbuf, sendbuf, blklen);
  memcpy(partial_scan, sendbuf, blklen);
  while(mask < size){
    dst = rank^mask;
    if(dst < size){
      AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
                    tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
      if(rank > dst){
        // Partner is lower-ranked: its total goes on the left of both the
        // running total and this rank's prefix result.
        (op)(tmp_buf,partial_scan,&count,&datatype);
        (op)(tmp_buf,recvbuf,&count,&datatype);
      }else {
        // Partner is higher-ranked: only the running total grows.
        // Compute partial op tmp into tmp_buf, then copy it back so that
        // this rank's data stays on the left.
        (op)(partial_scan,tmp_buf,&count,&datatype);
        memcpy(partial_scan,tmp_buf,blklen);
      }
    }
    mask <<= 1;

  }

  free(tmp_buf);
  free(partial_scan);
#if AMPI_COUNTER
  getAmpiParent()->counters.scan++;
#endif
  return 0;
}
03291
03292 CDECL
03293 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
03294
03295 *op = function;
03296 return 0;
03297 }
03298
03299 CDECL
03300 int AMPI_Op_free(MPI_Op *op){
03301
03302 *op = MPI_OP_NULL;
03303 return 0;
03304 }
03305
03306
03307 CDECL
03308 double AMPI_Wtime(void)
03309 {
03310
03311
03312 #if AMPIMSGLOG
03313 double ret=TCHARM_Wall_timer();
03314 ampiParent* pptr = getAmpiParent();
03315 if(msgLogRead){
03316 (*(pptr->fromPUPer))|ret;
03317 return ret;
03318 }
03319
03320 if(msgLogWrite && record_msglog(pptr->thisIndex)){
03321 (*(pptr->toPUPer))|ret;
03322 }
03323 #endif
03324
03325 #if CMK_BIGSIM_CHARM
03326 return BgGetTime();
03327 #else
03328 return TCHARM_Wall_timer();
03329 #endif
03330 }
03331
03332 CDECL
03333 double AMPI_Wtick(void){
03334
03335 return 1e-6;
03336 }
03337
03338
03339 int PersReq::start(){
03340 if(sndrcv == 1 || sndrcv == 3) {
03341 ampi *ptr=getAmpiInstance(comm);
03342 ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
03343 }
03344 return 0;
03345 }
03346
03347 CDECL
03348 int AMPI_Start(MPI_Request *request)
03349 {
03350 AMPIAPI("AMPI_Start");
03351 checkRequest(*request);
03352 AmpiRequestList *reqs = getReqs();
03353 if(-1==(*reqs)[*request]->start()) {
03354 CkAbort("MPI_Start could be used only on persistent communication requests!");
03355 }
03356 return 0;
03357 }
03358
03359 CDECL
03360 int AMPI_Startall(int count, MPI_Request *requests){
03361 AMPIAPI("AMPI_Startall");
03362 checkRequests(count,requests);
03363 AmpiRequestList *reqs = getReqs();
03364 for(int i=0;i<count;i++){
03365 if(-1==(*reqs)[requests[i]]->start())
03366 CkAbort("MPI_Start could be used only on persistent communication requests!");
03367 }
03368 return 0;
03369 }
03370
03371
03372
03373
03374
03375
03376
03377 inline int areInactiveReqs(int count, MPI_Request* reqs){
03378 for(int i=0;i<count;i++){
03379 if(reqs[i]!=MPI_REQUEST_NULL)
03380 return 0;
03381 }
03382 return 1;
03383 }
03384 inline int matchReq(MPI_Request ia, MPI_Request ib){
03385 checkRequest(ia);
03386 checkRequest(ib);
03387 AmpiRequestList* reqs = getReqs();
03388 AmpiRequest *a, *b;
03389 if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
03390 if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
03391 a=(*reqs)[ia]; b=(*reqs)[ib];
03392 if(a->tag != b->tag) return 0;
03393 if(a->src != b->src) return 0;
03394 if(a->comm != b->comm) return 0;
03395 return 1;
03396 }
/* Exchange the values of the two referenced ints (safe when a and b alias). */
inline void swapInt(int& a,int& b){
  const int saved = b;
  b = a;
  a = saved;
}
03401 inline void sortedIndex(int n, int* arr, int* idx){
03402 int i,j;
03403 for(i=0;i<n;i++)
03404 idx[i]=i;
03405 for (i=0; i<n-1; i++)
03406 for (j=0; j<n-1-i; j++)
03407 if (arr[idx[j+1]] < arr[idx[j]])
03408 swapInt(idx[j+1],idx[j]);
03409 }
03410 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
03411 CkAssert(count!=0);
03412 int *newidx = new int [count];
03413 int flag;
03414 sortedIndex(count,arr,newidx);
03415 CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
03416 CkVec<int> slot;
03417 vec->push_back(slot);
03418 (*vec)[0].push_back(newidx[0]);
03419 for(int i=1;i<count;i++){
03420 flag=0;
03421 for(int j=0;j<vec->size();j++){
03422 if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
03423 ((*vec)[j]).push_back(newidx[i]);
03424 flag++;
03425 }
03426 }
03427 if(!flag){
03428 CkVec<int> newslot;
03429 newslot.push_back(newidx[i]);
03430 vec->push_back(newslot);
03431 }else{
03432 CkAssert(flag==1);
03433 }
03434 }
03435 delete [] newidx;
03436 return vec;
03437 }
03438 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
03439 printf("vec content: ");
03440 for(int i=0;i<vec.size();i++){
03441 printf("{");
03442 for(int j=0;j<(vec[i]).size();j++){
03443 printf(" %d ",arr[(vec[i])[j]]);
03444 }
03445 printf("} ");
03446 }
03447 printf("\n");
03448 }
03449
03450 int PersReq::wait(MPI_Status *sts){
03451 if(sndrcv == 2) {
03452 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
03453 CkAbort("AMPI> Error in persistent request wait");
03454 #if CMK_BIGSIM_CHARM
03455 _TRACE_BG_TLINE_END(&event);
03456 #endif
03457 }
03458 return 0;
03459 }
03460
03461 int IReq::wait(MPI_Status *sts){
03462 if(CpvAccess(CmiPICMethod) == 2) {
03463 AMPI_DEBUG("In weird clause of IReq::wait\n");
03464 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
03465 CkAbort("AMPI> Error in non-blocking request wait");
03466
03467 return 0;
03468 }
03469
03470
03471
03472
03473
03474
03475
03476 ampi *ptr = getAmpiInstance(comm);
03477
03478
03479
03480
03481
03482
03483 while (statusIreq == false) {
03484
03485
03486
03487
03488 ptr->resumeOnRecv=true;
03489 ptr->block();
03490
03491
03492
03493
03494
03495
03496 #if CMK_BIGSIM_CHARM
03497
03498
03499
03500 if(_BgInOutOfCoreMode)
03501 return -1;
03502 #endif
03503 }
03504 ptr->resumeOnRecv=false;
03505
03506 AMPI_DEBUG("IReq::wait has resumed\n");
03507
03508 if(sts) {
03509 AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait this=%p\n", (int)this->tag, this);
03510 sts->MPI_TAG = tag;
03511 sts->MPI_SOURCE = src;
03512 sts->MPI_COMM = comm;
03513 sts->MPI_LENGTH = length;
03514 }
03515
03516 return 0;
03517 }
03518
/* Wait on an all-to-all style request (ATAReq) by performing each pending
 * sub-receive in order. Every sub-receive writes into the same 'sts', so on
 * return it presumably reflects the last one. Aborts if any receive reports
 * -1. Always returns 0. */
int ATAReq::wait(MPI_Status *sts){
  int i;
  for(i=0;i<count;i++){
    if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
          myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
      CkAbort("AMPI> Error in alltoall request wait");
#if CMK_BIGSIM_CHARM
    _TRACE_BG_TLINE_END(&myreqs[i].event);
#endif
  }
#if CMK_BIGSIM_CHARM
  // BigSim tracing: open a new timeline segment and record a dependency on
  // each sub-receive's event (presumably so the simulator orders them).
  TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
  for (i=0; i<count; i++)
    _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
  _TRACE_BG_TLINE_END(&event);
#endif
  return 0;
}
03538
03539 int SReq::wait(MPI_Status *sts){
03540 ampi *ptr = getAmpiInstance(comm);
03541 while (statusIreq == false) {
03542 ptr->resumeOnRecv = true;
03543 ptr->block();
03544 ptr = getAmpiInstance(comm);
03545 ptr->resumeOnRecv = false;
03546 }
03547 return 0;
03548 }
03549
/* MPI_Wait: block until *request completes, then free it (unless its
 * getType() is 1, presumably a persistent request that may be restarted)
 * and reset the handle to MPI_REQUEST_NULL.
 * A null request returns immediately with an empty status. Returns 0. */
CDECL
int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
{
  AMPIAPI("AMPI_Wait");
  if(*request == MPI_REQUEST_NULL){
    stsempty(*sts);
    return 0;
  }
  checkRequest(*request);
  AmpiRequestList* reqs = getReqs();

#if AMPIMSGLOG
  // Replay mode: restore the payload and status from the log instead of
  // actually waiting.
  ampiParent* pptr = getAmpiParent();
  if(msgLogRead){
    (*(pptr->fromPUPer))|(pptr->pupBytes);
    PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
    PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
    return 0;
  }
#endif

  AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
  AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);

  // wait() returns -1 to request a retry (e.g. IReq::wait under BigSim
  // out-of-core mode). In that mode the request list is re-fetched,
  // presumably because it may have been reloaded.
  int waitResult = -1;
  do{
    AmpiRequest *waitReq = (*reqs)[*request];
    waitResult = waitReq->wait(sts);
    if(_BgInOutOfCoreMode){
      reqs = getReqs();
    }
  }while(waitResult==-1);


  AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );


#if AMPIMSGLOG
  // Record mode: log the payload and status for later replay.
  if(msgLogWrite && record_msglog(pptr->thisIndex)){
    (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
    (*(pptr->toPUPer))|(pptr->pupBytes);
    PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
    PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
  }
#endif

  // Requests with getType() == 1 are kept alive; all others are released.
  if((*reqs)[*request]->getType() != 1) {
    reqs->free(*request);
    *request = MPI_REQUEST_NULL;
  }

  AMPI_DEBUG("End of AMPI_Wait\n");

  return 0;
}
03605
03606 CDECL
03607 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
03608 {
03609 AMPIAPI("AMPI_Waitall");
03610 if(count==0) return MPI_SUCCESS;
03611 checkRequests(count,request);
03612 int i,j,oldPe;
03613 AmpiRequestList* reqs = getReqs();
03614 CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
03615
03616 #if AMPIMSGLOG
03617 ampiParent* pptr = getAmpiParent();
03618 if(msgLogRead){
03619 for(i=0;i<reqvec->size();i++){
03620 for(j=0;j<((*reqvec)[i]).size();j++){
03621 if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
03622 stsempty(sts[((*reqvec)[i])[j]]);
03623 continue;
03624 }
03625 AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
03626 (*(pptr->fromPUPer))|(pptr->pupBytes);
03627 PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
03628 PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
03629 }
03630 }
03631 return 0;
03632 }
03633 #endif
03634
03635 #if CMK_BIGSIM_CHARM
03636 void *curLog;
03637 _TRACE_BG_TLINE_END(&curLog);
03638 #if 0
03639 TRACE_BG_AMPI_SUSPEND();
03640 #endif
03641 #endif
03642 for(i=0;i<reqvec->size();i++){
03643 for(j=0;j<((*reqvec)[i]).size();j++){
03644
03645 if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
03646 stsempty(sts[((*reqvec)[i])[j]]);
03647 continue;
03648 }
03649 oldPe = CkMyPe();
03650
03651 int waitResult = -1;
03652 do{
03653 AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
03654 waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
03655 if(_BgInOutOfCoreMode){
03656 reqs = getReqs();
03657 reqvec = vecIndex(count, request);
03658 }
03659
03660 #if AMPIMSGLOG
03661 if(msgLogWrite && record_msglog(pptr->thisIndex)){
03662 (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
03663 (*(pptr->toPUPer))|(pptr->pupBytes);
03664 PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
03665 PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
03666 }
03667 #endif
03668
03669 }while(waitResult==-1);
03670
03671 #if 1
03672 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
03673
03674 if(oldPe != CkMyPe()){
03675 #endif
03676 reqs = getReqs();
03677 reqvec = vecIndex(count,request);
03678 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
03679 }
03680 #endif
03681 #endif
03682 }
03683 }
03684 #if CMK_BIGSIM_CHARM
03685 TRACE_BG_AMPI_WAITALL(reqs);
03686 #endif
03687
03688 for(i=0;i<count;i++){
03689 if(request[i] == MPI_REQUEST_NULL)
03690 continue;
03691 if((*reqs)[request[i]]->getType() != 1) {
03692 reqs->free(request[i]);
03693 request[i] = MPI_REQUEST_NULL;
03694 }
03695 }
03696 delete reqvec;
03697 return 0;
03698 }
03699
03700 CDECL
03701 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
03702 {
03703 AMPIAPI("AMPI_Waitany");
03704
03705 USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
03706 if(count == 0) return MPI_SUCCESS;
03707 checkRequests(count,request);
03708 if(areInactiveReqs(count,request)){
03709 *idx=MPI_UNDEFINED;
03710 stsempty(*sts);
03711 return MPI_SUCCESS;
03712 }
03713 int flag=0;
03714 CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
03715 while(count>0){
03716 for(int i=0;i<reqvec->size();i++){
03717 AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
03718 if(flag == 1 && sts->MPI_COMM != 0){
03719 *idx = ((*reqvec)[i])[0];
03720 USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
03721 return 0;
03722 }
03723 }
03724
03725 AMPI_Yield(MPI_COMM_WORLD);
03726 }
03727 *idx = MPI_UNDEFINED;
03728 USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
03729 delete reqvec;
03730 return 0;
03731 }
03732
03733 CDECL
03734 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
03735 int *array_of_indices, MPI_Status *array_of_statuses)
03736 {
03737 AMPIAPI("AMPI_Waitsome");
03738 checkRequests(incount,array_of_requests);
03739 if(areInactiveReqs(incount,array_of_requests)){
03740 *outcount=MPI_UNDEFINED;
03741 return MPI_SUCCESS;
03742 }
03743 MPI_Status sts;
03744 int i;
03745 int flag=0, realflag=0;
03746 CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
03747 *outcount = 0;
03748 while(1){
03749 for(i=0;i<reqvec->size();i++){
03750 AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
03751 if(flag == 1){
03752 array_of_indices[(*outcount)]=((*reqvec)[i])[0];
03753 array_of_statuses[(*outcount)++]=sts;
03754 if(sts.MPI_COMM != 0)
03755 realflag=1;
03756 }
03757 }
03758 if(realflag && outcount>0) break;
03759 }
03760 delete reqvec;
03761 return 0;
03762 }
03763
03764 CmiBool PersReq::test(MPI_Status *sts){
03765 if(sndrcv == 2)
03766 return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
03767 else
03768 return 1;
03769
03770 }
03771 void PersReq::complete(MPI_Status *sts){
03772 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
03773 CkAbort("AMPI> Error in persistent request complete");
03774 }
03775
03776 CmiBool IReq::test(MPI_Status *sts){
03777 if (statusIreq == true) {
03778 if(sts)
03779 sts->MPI_LENGTH = length;
03780 return true;
03781 }
03782 else {
03783 getAmpiInstance(comm)->yield();
03784 return false;
03785 }
03786
03787
03788
03789 }
03790
03791 CmiBool SReq::test(MPI_Status *sts){
03792 if (statusIreq == true) {
03793 return true;
03794 }
03795 else {
03796 getAmpiInstance(comm)->yield();
03797 return false;
03798 }
03799 }
03800
03801 void IReq::complete(MPI_Status *sts){
03802 wait(sts);
03803
03804
03805
03806
03807 }
03808
03809 void SReq::complete(MPI_Status *sts){
03810 wait(sts);
03811 }
03812
/* Deliver an arrived message into this non-blocking receive.
 * processMessage() is called with the request's posted tag/src (possibly
 * wildcards), and the request is marked complete when it returns 0.
 * Afterwards the request's tag, src and comm are overwritten with the
 * message's actual values so IReq::wait() can report them in the status.
 * Deletes 'msg' (this function takes ownership of it). */
void IReq::receive(ampi *ptr, AmpiMsg *msg)
{
  int sts = ptr->processMessage(msg, tag, src, buf, count, type);
  statusIreq = (sts == 0);
  length = msg->length;
  this->tag = msg->tag;
  src = msg->srcRank;
  comm = msg->comm;
  AMPI_DEBUG("Setting this->tag to %d in IReq::receive this=%p\n", (int)this->tag, this);
#if CMK_BIGSIM_CHARM
  event = msg->event;
#endif
  delete msg;
}
03831
03832 CmiBool ATAReq::test(MPI_Status *sts){
03833 int i, flag=1;
03834 for(i=0;i<count;i++){
03835 flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
03836 myreqs[i].comm, (int*) sts);
03837 }
03838 return flag;
03839 }
03840 void ATAReq::complete(MPI_Status *sts){
03841 int i;
03842 for(i=0;i<count;i++){
03843 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
03844 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
03845 CkAbort("AMPI> Error in alltoall request complete");
03846 }
03847 }
03848
03849 CDECL
03850 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
03851 {
03852 AMPIAPI("AMPI_Test");
03853 if(*request==MPI_REQUEST_NULL) {
03854 *flag = 1;
03855 stsempty(*sts);
03856 return 0;
03857 }
03858 checkRequest(*request);
03859 AmpiRequestList* reqs = getReqs();
03860 if(1 == (*flag = (*reqs)[*request]->test(sts))){
03861 (*reqs)[*request]->complete(sts);
03862 if((*reqs)[*request]->getType() != 1) {
03863 reqs->free(*request);
03864 *request = MPI_REQUEST_NULL;
03865 }
03866 }
03867 return 0;
03868 }
03869
03870 CDECL
03871 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
03872 AMPIAPI("AMPI_Testany");
03873 checkRequests(count,request);
03874 if(areInactiveReqs(count,request)){
03875 *flag=1;
03876 *index=MPI_UNDEFINED;
03877 stsempty(*sts);
03878 return MPI_SUCCESS;
03879 }
03880 CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
03881 *flag=0;
03882 for(int i=0;i<reqvec->size();i++){
03883 AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
03884 if(*flag==1 && sts->MPI_COMM!=0){
03885 *index = ((*reqvec)[i])[0];
03886 return 0;
03887 }
03888 }
03889 *index = MPI_UNDEFINED;
03890 delete reqvec;
03891 return 0;
03892 }
03893
03894 CDECL
03895 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
03896 {
03897 AMPIAPI("AMPI_Testall");
03898 if(count==0) return MPI_SUCCESS;
03899 checkRequests(count,request);
03900 int tmpflag;
03901 int i,j;
03902 AmpiRequestList* reqs = getReqs();
03903 CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
03904 *flag = 1;
03905 for(i=0;i<reqvec->size();i++){
03906 for(j=0;j<((*reqvec)[i]).size();j++){
03907 if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
03908 continue;
03909 tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
03910 *flag *= tmpflag;
03911 }
03912 }
03913 if(flag)
03914 MPI_Waitall(count,request,sts);
03915 delete reqvec;
03916 return 0;
03917 }
03918
03919 CDECL
03920 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
03921 int *array_of_indices, MPI_Status *array_of_statuses)
03922 {
03923 AMPIAPI("AMPI_Testsome");
03924 checkRequests(incount,array_of_requests);
03925 if(areInactiveReqs(incount,array_of_requests)){
03926 *outcount=MPI_UNDEFINED;
03927 return MPI_SUCCESS;
03928 }
03929 MPI_Status sts;
03930 int flag;
03931 int i;
03932 CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
03933 *outcount = 0;
03934 for(i=0;i<reqvec->size();i++){
03935 AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
03936 if(flag == 1){
03937 array_of_indices[(*outcount)]=((*reqvec)[i])[0];
03938 array_of_statuses[(*outcount)++]=sts;
03939 }
03940 }
03941 delete reqvec;
03942 return 0;
03943 }
03944
03945 CDECL
03946 int AMPI_Request_free(MPI_Request *request){
03947 AMPIAPI("AMPI_Request_free");
03948 if(*request==MPI_REQUEST_NULL) return 0;
03949 checkRequest(*request);
03950 AmpiRequestList* reqs = getReqs();
03951 reqs->free(*request);
03952 *request = MPI_REQUEST_NULL;
03953 return 0;
03954 }
03955
03956 CDECL
03957 int AMPI_Cancel(MPI_Request *request){
03958 AMPIAPI("AMPI_Cancel");
03959 return AMPI_Request_free(request);
03960 }
03961
/* MPI_Test_cancelled: always sets *flag = 1 without inspecting 'status'.
 * NOTE(review): this reports every operation as cancelled, including ones
 * that completed normally. AMPI_Cancel only frees the request, so there is
 * presumably no cancellation record to consult. Confirm whether callers
 * rely on this. */
CDECL
int AMPI_Test_cancelled(MPI_Status* status, int* flag) {

  *flag = 1;
  return 0;
}
03968
03969 CDECL
03970 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
03971 MPI_Comm comm, MPI_Request *req)
03972 {
03973 AMPIAPI("AMPI_Recv_init");
03974
03975 #if CMK_ERROR_CHECKING
03976 int ret;
03977 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, buf, 1);
03978 if(ret != MPI_SUCCESS)
03979 {
03980 *req = MPI_REQUEST_NULL;
03981 return ret;
03982 }
03983 #endif
03984
03985 AmpiRequestList* reqs = getReqs();
03986 PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
03987 *req = reqs->insert(newreq);
03988 return 0;
03989 }
03990
03991 CDECL
03992 int AMPI_Send_init(void *buf, int count, int type, int dest, int tag,
03993 MPI_Comm comm, MPI_Request *req)
03994 {
03995 AMPIAPI("AMPI_Send_init");
03996
03997 #if CMK_ERROR_CHECKING
03998 int ret;
03999 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
04000 if(ret != MPI_SUCCESS)
04001 {
04002 *req = MPI_REQUEST_NULL;
04003 return ret;
04004 }
04005 #endif
04006
04007 AmpiRequestList* reqs = getReqs();
04008 PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,1);
04009 *req = reqs->insert(newreq);
04010 return 0;
04011 }
04012
04013 CDECL
04014 int AMPI_Ssend_init(void *buf, int count, int type, int dest, int tag,
04015 MPI_Comm comm, MPI_Request *req)
04016 {
04017 AMPIAPI("AMPI_Ssend_init");
04018
04019 #if CMK_ERROR_CHECKING
04020 int ret;
04021 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
04022 if(ret != MPI_SUCCESS)
04023 {
04024 *req = MPI_REQUEST_NULL;
04025 return ret;
04026 }
04027 #endif
04028
04029 AmpiRequestList* reqs = getReqs();
04030 PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,3);
04031 *req = reqs->insert(newreq);
04032 return 0;
04033 }
04034
04035 CDECL
04036 int AMPI_Type_contiguous(int count, MPI_Datatype oldtype,
04037 MPI_Datatype *newtype)
04038 {
04039 AMPIAPI("AMPI_Type_contiguous");
04040 getDDT()->newContiguous(count, oldtype, newtype);
04041 return 0;
04042 }
04043
04044 CDECL
04045 int AMPI_Type_vector(int count, int blocklength, int stride,
04046 MPI_Datatype oldtype, MPI_Datatype* newtype)
04047 {
04048 AMPIAPI("AMPI_Type_vector");
04049 getDDT()->newVector(count, blocklength, stride, oldtype, newtype);
04050 return 0 ;
04051 }
04052
04053 CDECL
04054 int AMPI_Type_hvector(int count, int blocklength, MPI_Aint stride,
04055 MPI_Datatype oldtype, MPI_Datatype* newtype)
04056 {
04057 AMPIAPI("AMPI_Type_hvector");
04058 getDDT()->newHVector(count, blocklength, stride, oldtype, newtype);
04059 return 0 ;
04060 }
04061
04062 CDECL
04063 int AMPI_Type_indexed(int count, int* arrBlength, int* arrDisp,
04064 MPI_Datatype oldtype, MPI_Datatype* newtype)
04065 {
04066 AMPIAPI("AMPI_Type_indexed");
04067 getDDT()->newIndexed(count, arrBlength, arrDisp, oldtype, newtype);
04068 return 0 ;
04069 }
04070
04071 CDECL
04072 int AMPI_Type_hindexed(int count, int* arrBlength, MPI_Aint* arrDisp,
04073 MPI_Datatype oldtype, MPI_Datatype* newtype)
04074 {
04075 AMPIAPI("AMPI_Type_hindexed");
04076 getDDT()->newHIndexed(count, arrBlength, arrDisp, oldtype, newtype);
04077 return 0 ;
04078 }
04079
04080 CDECL
04081 int AMPI_Type_struct(int count, int* arrBlength, int* arrDisp,
04082 MPI_Datatype* oldtype, MPI_Datatype* newtype)
04083 {
04084 AMPIAPI("AMPI_Type_struct");
04085 getDDT()->newStruct(count, arrBlength, arrDisp, oldtype, newtype);
04086 return 0 ;
04087 }
04088
04089 CDECL
04090 int AMPI_Type_commit(MPI_Datatype *datatype)
04091 {
04092 AMPIAPI("AMPI_Type_commit");
04093 return 0;
04094 }
04095
04096 CDECL
04097 int AMPI_Type_free(MPI_Datatype *datatype)
04098 {
04099 AMPIAPI("AMPI_Type_free");
04100 getDDT()->freeType(datatype);
04101 return 0;
04102 }
04103
04104
04105 CDECL
04106 int AMPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent)
04107 {
04108 AMPIAPI("AMPI_Type_extent");
04109 *extent = getDDT()->getExtent(datatype);
04110 return 0;
04111 }
04112
04113 CDECL
04114 int AMPI_Type_size(MPI_Datatype datatype, int *size)
04115 {
04116 AMPIAPI("AMPI_Type_size");
04117 *size=getDDT()->getSize(datatype);
04118 return 0;
04119 }
04120
/* MPI_Isend: the message is handed to ptr->send() (or comlibsend() when
 * comlib streaming is enabled) immediately, and no request object is
 * created. *request is set to MPI_REQUEST_NULL, so a later MPI_Wait/MPI_Test
 * on it returns at once. With CMK_ERROR_CHECKING, invalid arguments return
 * the error code and set *request to MPI_REQUEST_NULL. With AMPIMSGLOG, the
 * request handle is replayed from / recorded to the message log.
 * Returns 0 on success. */
CDECL
int AMPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
               int tag, MPI_Comm comm, MPI_Request *request)
{
  AMPIAPI("AMPI_Isend");

#if CMK_ERROR_CHECKING
  int ret;
  ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
  if(ret != MPI_SUCCESS)
  {
    *request = MPI_REQUEST_NULL;
    return ret;
  }
#endif

#if AMPIMSGLOG
  ampiParent* pptr = getAmpiParent();
  if(msgLogRead){
    PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
    return 0;
  }
#endif

  USER_CALL_DEBUG("AMPI_Isend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
  ampi *ptr = getAmpiInstance(comm);
#if AMPI_COMLIB
  if(enableStreaming){
    // Route through the comlib streaming strategy.
    ptr->comlibsend(tag,ptr->getRank(comm),buf,count,type,dest,comm);
  } else
#endif
    ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm);
  // No request object is kept for the send.
  *request = MPI_REQUEST_NULL;
#if AMPI_COUNTER
  getAmpiParent()->counters.isend++;
#endif

#if AMPIMSGLOG
  if(msgLogWrite && record_msglog(pptr->thisIndex)){
    PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
  }
#endif

  return 0;
}
04167
04168 CDECL
04169 int AMPI_Irecv(void *buf, int count, MPI_Datatype type, int src,
04170 int tag, MPI_Comm comm, MPI_Request *request)
04171 {
04172 AMPIAPI("AMPI_Irecv");
04173
04174 #if CMK_ERROR_CHECKING
04175 int ret;
04176 ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, buf, 1);
04177 if(ret != MPI_SUCCESS)
04178 return ret;
04179 #endif
04180
04181 if(src==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return 0; }
04182 USER_CALL_DEBUG("AMPI_Irecv("<<type<<","<<src<<","<<tag<<","<<comm<<")");
04183 AmpiRequestList* reqs = getReqs();
04184 IReq *newreq = new IReq(buf,count,type,src,tag,comm);
04185 *request = reqs->insert(newreq);
04186
04187 #if AMPIMSGLOG
04188 ampiParent* pptr = getAmpiParent();
04189 if(msgLogRead){
04190 PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
04191 return 0;
04192 }
04193 #endif
04194
04195
04196
04197
04198 ampi *ptr = getAmpiInstance(comm);
04199 AmpiMsg *msg = NULL;
04200 if (CpvAccess(CmiPICMethod) != 2)
04201 {
04202 msg = ptr->getMessage(tag, src, comm, &newreq->tag);
04203 }
04204 if (msg) {
04205 newreq->receive(ptr, msg);
04206 } else {
04207
04208 int tags[3];
04209 tags[0] = tag; tags[1] = src; tags[2] = comm;
04210 #if 0
04211 CmmPut(ptr->posted_ireqs, 3, tags, newreq);
04212 #else
04213
04214
04215
04216
04217
04218
04219
04220
04221
04222 CmmPut(ptr->posted_ireqs, 3, tags, (void *)(CmiIntPtr)((*request)+1));
04223 #endif
04224 }
04225
04226 #if AMPI_COUNTER
04227 getAmpiParent()->counters.irecv++;
04228 #endif
04229
04230 #if AMPIMSGLOG
04231 if(msgLogWrite && record_msglog(pptr->thisIndex)){
04232 PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
04233 }
04234 #endif
04235
04236 return 0;
04237 }
04238
04239 CDECL
04240 int AMPI_Ireduce(void *sendbuf, void *recvbuf, int count, int type, MPI_Op op,
04241 int root, MPI_Comm comm, MPI_Request *request)
04242 {
04243 AMPIAPI("AMPI_Ireduce");
04244
04245 handle_MPI_IN_PLACE(sendbuf, recvbuf);
04246
04247 #if CMK_ERROR_CHECKING
04248 int ret;
04249 ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, sendbuf, 1,
04250 recvbuf, getAmpiInstance(comm)->getRank(comm) == root);
04251 if(ret != MPI_SUCCESS)
04252 {
04253 *request = MPI_REQUEST_NULL;
04254 return ret;
04255 }
04256 #endif
04257
04258 if(op == MPI_OP_NULL) CkAbort("MPI_Ireduce called with MPI_OP_NULL!!!");
04259 if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,sendbuf,recvbuf);
04260 ampi *ptr = getAmpiInstance(comm);
04261 CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),sendbuf,count,type,op);
04262 int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
04263 CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
04264 msg->setCallback(reduceCB);
04265 ptr->contribute(msg);
04266
04267 if (ptr->thisIndex == rootIdx){
04268
04269 AmpiRequestList* reqs = getReqs();
04270 IReq *newreq = new IReq(recvbuf,count,type,0,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
04271 *request = reqs->insert(newreq);
04272 }
04273 return 0;
04274 }
04275
/* MPI_Allgather: every rank sends its sendbuf to every rank (including
 * itself) with MPI_GATHER_TAG, then receives one block from each rank i
 * into recvbuf at offset i * dttype->getSize(recvcount).
 * MPI_IN_PLACE and inter-communicators abort. On MPI_COMM_SELF the data is
 * simply copied. With CMK_ERROR_CHECKING, invalid arguments return the error
 * code. Returns 0 on success.
 * NOTE(review): the block offset uses getSize() (data size), not the
 * datatype extent that MPI specifies; these may differ for datatypes with
 * gaps. Confirm against CkDDT semantics. */
CDECL
int AMPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                   void *recvbuf, int recvcount, MPI_Datatype recvtype,
                   MPI_Comm comm)
{
  AMPIAPI("AMPI_Allgather");

  if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
    CmiAbort("AMPI_Allgather does not implement MPI_IN_PLACE");

#if CMK_ERROR_CHECKING
  int ret;
  ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
  ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgather not allowed for Inter-communicator!");
  if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);

  ampi *ptr = getAmpiInstance(comm);
  int size = ptr->getSize(comm);
  int i;

#if AMPI_COMLIB
  // On MPI_COMM_WORLD, route the sends through the comlib allgather strategy.
  if(comm == MPI_COMM_WORLD) {


    for(i=0;i<size;i++) {
      ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
                    sendtype, i, comm, ptr->getComlibProxy());
    }
    ptr->getAllgatherStrategy()->doneInserting();

  } else
#endif

    // Send my contribution to every rank, including myself.
    for(i=0;i<size;i++) {
      ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
                sendtype, i, comm);
    }


  MPI_Status status;
  CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
  int itemsize = dttype->getSize(recvcount) ;

  // Receive rank i's block into slot i.
  for(i=0;i<size;i++) {
    AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
              i, MPI_GATHER_TAG, comm, &status);
  }
#if AMPI_COUNTER
  getAmpiParent()->counters.allgather++;
#endif
  return 0;
}
04336
04337 CDECL
04338 int AMPI_Iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
04339 void *recvbuf, int recvcount, MPI_Datatype recvtype,
04340 MPI_Comm comm, MPI_Request* request)
04341 {
04342 AMPIAPI("AMPI_Iallgather");
04343
04344 if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
04345 CmiAbort("AMPI_Iallgather does not implement MPI_IN_PLACE");
04346
04347 #if CMK_ERROR_CHECKING
04348 int ret;
04349 ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
04350 if(ret != MPI_SUCCESS)
04351 {
04352 *request = MPI_REQUEST_NULL;
04353 return ret;
04354 }
04355 ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
04356 if(ret != MPI_SUCCESS)
04357 {
04358 *request = MPI_REQUEST_NULL;
04359 return ret;
04360 }
04361 #endif
04362
04363 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallgather not allowed for Inter-communicator!");
04364 if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
04365
04366 ampi *ptr = getAmpiInstance(comm);
04367 int size = ptr->getSize(comm);
04368 int i;
04369 #if AMPI_COMLIB
04370 if(comm == MPI_COMM_WORLD) {
04371
04372
04373 for(i=0;i<size;i++) {
04374 ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
04375 sendtype, i, comm, ptr->getComlibProxy());
04376 }
04377 ptr->getAllgatherStrategy()->doneInserting();
04378 } else
04379 #endif
04380 for(i=0;i<size;i++) {
04381 ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
04382 sendtype, i, comm);
04383 }
04384
04385 CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
04386 int itemsize = dttype->getSize(recvcount) ;
04387
04388
04389 AmpiRequestList* reqs = getReqs();
04390 ATAReq *newreq = new ATAReq(size);
04391 for(i=0;i<size;i++){
04392 if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_GATHER_TAG,comm)!=(i+1))
04393 CkAbort("MPI_Iallgather: Error adding requests into ATAReq!");
04394 }
04395 *request = reqs->insert(newreq);
04396 AMPI_DEBUG("MPI_Iallgather: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
04397
04398 return 0;
04399 }
04400
/* MPI_Allgatherv: every rank sends its sendbuf to every rank (including
 * itself) with MPI_GATHER_TAG, then receives recvcounts[i] elements from
 * rank i into recvbuf at byte offset displs[i] * dttype->getSize().
 * MPI_IN_PLACE and inter-communicators abort. On MPI_COMM_SELF, sendbuf is
 * copied to the start of recvbuf (displs is not consulted). With
 * CMK_ERROR_CHECKING, only the send arguments are validated.
 * Returns 0 on success.
 * NOTE(review): MPI defines displs in units of the recvtype extent, while
 * this uses getSize(); the two may differ for datatypes with gaps. Confirm. */
CDECL
int AMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                    void *recvbuf, int *recvcounts, int *displs,
                    MPI_Datatype recvtype, MPI_Comm comm)
{
  AMPIAPI("AMPI_Allgatherv");

  if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
    CmiAbort("AMPI_Allgatherv does not implement MPI_IN_PLACE");

#if CMK_ERROR_CHECKING
  int ret;
  ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgatherv not allowed for Inter-communicator!");
  if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);

  ampi *ptr = getAmpiInstance(comm);
  int size = ptr->getSize(comm);
  int i;
#if AMPI_COMLIB
  // On MPI_COMM_WORLD, route the sends through the comlib allgather strategy.
  if(comm == MPI_COMM_WORLD) {


    for(i=0;i<size;i++) {
      ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
                    sendtype, i, comm, ptr->getComlibProxy());
    }
    ptr->getAllgatherStrategy()->doneInserting();
  } else
#endif
    // Send my contribution to every rank, including myself.
    for(i=0;i<size;i++) {
      ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
                sendtype, i, comm);
    }

  MPI_Status status;
  CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
  int itemsize = dttype->getSize() ;

  // Receive rank i's contribution at its displacement.
  for(i=0;i<size;i++) {
    AMPI_Recv(((char*)recvbuf)+(itemsize*displs[i]), recvcounts[i], recvtype,
              i, MPI_GATHER_TAG, comm, &status);
  }
#if AMPI_COUNTER
  getAmpiParent()->counters.allgather++;
#endif
  return 0;
}
04453
/* MPI_Gather: every rank sends sendbuf to 'root' with MPI_GATHER_TAG. The
 * root then receives one block from each rank i into recvbuf at offset
 * i * dttype->getSize(recvcount), aborting on a receive failure.
 * MPI_IN_PLACE and inter-communicators abort. On MPI_COMM_SELF the data is
 * simply copied. With CMK_ERROR_CHECKING, invalid send arguments (and, on
 * the root, receive arguments) return the error code. With AMPIMSGLOG, the
 * gathered recvbuf is replayed from / recorded to the message log.
 * Returns 0 on success. */
CDECL
int AMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                void *recvbuf, int recvcount, MPI_Datatype recvtype,
                int root, MPI_Comm comm)
{
  AMPIAPI("AMPI_Gather");

  if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
    CmiAbort("AMPI_Gather does not implement MPI_IN_PLACE");

#if CMK_ERROR_CHECKING
  int ret;
  ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);

#if AMPIMSGLOG
  // Replay mode: restore the gathered buffer from the log.
  ampiParent* pptr = getAmpiParent();
  if(msgLogRead){
    (*(pptr->fromPUPer))|(pptr->pupBytes);
    PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
    return 0;
  }
#endif

  if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gather not allowed for Inter-communicator!");

  ampi *ptr = getAmpiInstance(comm);

#if CMK_ERROR_CHECKING
  // The receive buffer is only checked on the root (the last argument
  // enables the buffer check).
  ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0,
                   recvbuf, ptr->getRank(comm) == root);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  int size = ptr->getSize(comm);
  int i;
  AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);

  if(ptr->getRank(comm)==root) {
    MPI_Status status;
    CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
    int itemsize = dttype->getSize(recvcount) ;

    // Receive rank i's block into slot i.
    for(i=0;i<size;i++) {
      if(-1==ptr->recv(MPI_GATHER_TAG, i, (void*)(((char*)recvbuf)+(itemsize*i)), recvcount, recvtype, comm, (int*)(&status)))
        CkAbort("AMPI> Error in MPI_Gather recv");
    }
  }
#if AMPI_COUNTER
  getAmpiParent()->counters.gather++;
#endif

#if AMPIMSGLOG
  // Record mode: log the gathered buffer for later replay.
  if(msgLogWrite && record_msglog(pptr->thisIndex)){
    (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount * size;
    (*(pptr->toPUPer))|(pptr->pupBytes);
    PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
  }
#endif

  return 0;
}
04521
04522 CDECL
04523 int AMPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
04524 void *recvbuf, int *recvcounts, int *displs,
04525 MPI_Datatype recvtype, int root, MPI_Comm comm)
04526 {
04527 AMPIAPI("AMPI_Gatherv");
04528
04529 if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
04530 CmiAbort("AMPI_Gatherv does not implement MPI_IN_PLACE");
04531
04532 #if CMK_ERROR_CHECKING
04533 int ret;
04534 ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
04535 if(ret != MPI_SUCCESS)
04536 return ret;
04537 #endif
04538
04539 if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
04540
04541 int itemsize = getDDT()->getSize(recvtype);
04542
04543 #if AMPIMSGLOG
04544 ampiParent* pptr = getAmpiParent();
04545 if(msgLogRead){
04546 int commsize;
04547 (*(pptr->fromPUPer))|commsize;
04548 for(int i=0;i<commsize;i++){
04549 (*(pptr->fromPUPer))|(pptr->pupBytes);
04550 PUParray(*(pptr->fromPUPer), (char *)(((char*)recvbuf)+(itemsize*displs[i])), (pptr->pupBytes));
04551 }
04552 return 0;
04553 }
04554 #endif
04555
04556 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gatherv not allowed for Inter-communicator!");
04557
04558 AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
04559
04560 ampi *ptr = getAmpiInstance(comm);
04561 int size = ptr->getSize(comm);
04562 if(ptr->getRank(comm) == root) {
04563 MPI_Status status;
04564 for(int i=0;i<size;i++) {
04565 if(-1==ptr->recv(MPI_GATHER_TAG, i, (void*)(((char*)recvbuf)+(itemsize*displs[i])), recvcounts[i], recvtype, comm, (int*)(&status)))
04566 CkAbort("AMPI> Error in MPI_Gatherv recv");
04567 }
04568 }
04569 #if AMPI_COUNTER
04570 getAmpiParent()->counters.gather++;
04571 #endif
04572
04573 #if AMPIMSGLOG
04574 if(msgLogWrite && record_msglog(pptr->thisIndex)){
04575 for(int i=0;i<size;i++){
04576 (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcounts[i];
04577 (*(pptr->toPUPer))|(pptr->pupBytes);
04578 PUParray(*(pptr->toPUPer), (char *)(((char*)recvbuf)+(itemsize*displs[i])), (pptr->pupBytes));
04579 }
04580 }
04581 #endif
04582
04583 return 0;
04584 }
04585
04586 CDECL
04587 int AMPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
04588 void *recvbuf, int recvcount, MPI_Datatype recvtype,
04589 int root, MPI_Comm comm)
04590 {
04591 AMPIAPI("AMPI_Scatter");
04592
04593 if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
04594 CmiAbort("AMPI_Scatter does not implement MPI_IN_PLACE");
04595
04596 #if CMK_ERROR_CHECKING
04597 int ret;
04598 ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
04599 if(ret != MPI_SUCCESS)
04600 return ret;
04601 ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
04602 if(ret != MPI_SUCCESS)
04603 return ret;
04604 #endif
04605
04606 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatter not allowed for Inter-communicator!");
04607 if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
04608
04609 #if AMPIMSGLOG
04610 ampiParent* pptr = getAmpiParent();
04611 if(msgLogRead){
04612 (*(pptr->fromPUPer))|(pptr->pupBytes);
04613 PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
04614 return 0;
04615 }
04616 #endif
04617
04618 ampi *ptr = getAmpiInstance(comm);
04619 int size = ptr->getSize(comm);
04620 int i;
04621
04622 if(ptr->getRank(comm)==root) {
04623 CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
04624 int itemsize = dttype->getSize(sendcount) ;
04625 for(i=0;i<size;i++) {
04626 ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i),
04627 sendcount, sendtype, i, comm);
04628 }
04629
04630 }
04631
04632 MPI_Status status;
04633 if(-1==ptr->recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, comm, (int*)(&status)))
04634 CkAbort("AMPI> Error in MPI_Scatter recv");
04635
04636 #if AMPI_COUNTER
04637 getAmpiParent()->counters.scatter++;
04638 #endif
04639
04640 #if AMPIMSGLOG
04641 if(msgLogWrite && record_msglog(pptr->thisIndex)){
04642 (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount;
04643 (*(pptr->toPUPer))|(pptr->pupBytes);
04644 PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
04645 }
04646 #endif
04647
04648 return 0;
04649 }
04650
04651 CDECL
04652 int AMPI_Scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype,
04653 void *recvbuf, int recvcount, MPI_Datatype recvtype,
04654 int root, MPI_Comm comm)
04655 {
04656 AMPIAPI("AMPI_Scatterv");
04657
04658 if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
04659 CmiAbort("AMPI_Scatterv does not implement MPI_IN_PLACE");
04660
04661 #if CMK_ERROR_CHECKING
04662 int ret;
04663 ret = errorCheck(comm, 1, 0, 0, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
04664 if(ret != MPI_SUCCESS)
04665 return ret;
04666 ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
04667 if(ret != MPI_SUCCESS)
04668 return ret;
04669 #endif
04670
04671 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatterv not allowed for Inter-communicator!");
04672 if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcounts[0],sendbuf,recvbuf);
04673
04674 #if AMPIMSGLOG
04675 ampiParent* pptr = getAmpiParent();
04676 if(msgLogRead){
04677 (*(pptr->fromPUPer))|(pptr->pupBytes);
04678 PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
04679 return 0;
04680 }
04681 #endif
04682
04683 ampi *ptr = getAmpiInstance(comm);
04684 int size = ptr->getSize(comm);
04685 int i;
04686
04687 if(ptr->getRank(comm) == root) {
04688 CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
04689 int itemsize = dttype->getSize() ;
04690 for(i=0;i<size;i++) {
04691 ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*displs[i]),
04692 sendcounts[i], sendtype, i, comm);
04693 }
04694
04695 }
04696
04697 MPI_Status status;
04698 if(-1==ptr->recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, comm, (int*)(&status)))
04699 CkAbort("AMPI> Error in MPI_Scatterv recv");
04700
04701 #if AMPI_COUNTER
04702 getAmpiParent()->counters.scatter++;
04703 #endif
04704
04705 #if AMPIMSGLOG
04706 if(msgLogWrite && record_msglog(pptr->thisIndex)){
04707 (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount;
04708 (*(pptr->toPUPer))|(pptr->pupBytes);
04709 PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
04710 }
04711 #endif
04712
04713 return 0;
04714 }
04715
/*
 * AMPI_Alltoall: every rank sends block i of sendbuf (sendcount items of
 * sendtype) to rank i and receives rank i's block into slot i of recvbuf.
 *
 * The algorithm is chosen by `itemsize`, the byte size of one outgoing block:
 *   - itemsize <= AMPI_ALLTOALL_SHORT_MSG: each rank collects every rank's
 *     whole send buffer into tmp_buf by a recursive-doubling exchange
 *     (MPI_Sendrecv with partner rank ^ mask, plus a forwarding phase when the
 *     partner subtree runs past comm_size), then copies out the blocks that
 *     are addressed to this rank.
 *   - itemsize <= AMPI_ALLTOALL_MEDIUM_MSG: post all sends (starting at
 *     (rank+i)%size), then receive from every rank.
 *   - otherwise: `size` rounds of pairwise MPI_Sendrecv (XOR pairing when size
 *     is a power of two, otherwise a shifted ring).
 * The `#if 0` section is an older Irecv/Wait variant that is not compiled.
 * Returns 0; argument-check failures return the MPI error code.
 */
CDECL
int AMPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
                  MPI_Comm comm)
{
  AMPIAPI("AMPI_Alltoall");

#if CMK_ERROR_CHECKING
  if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
    CmiAbort("MPI_Alltoall does not accept MPI_IN_PLACE");

  int ret;
  ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
  ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoall not allowed for Inter-communicator!");
  // Single-rank communicator: the exchange reduces to a local copy.
  if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
  ampi *ptr = getAmpiInstance(comm);
  int size = ptr->getSize(comm);
  CkDDT_DataType *dttype;
  int itemsize;
  int i;

#if 0
  // Disabled variant: pre-post one Irecv per rank, send (optionally via
  // Comlib), then wait on all receives.
  dttype = ptr->getDDT()->getType(recvtype) ;
  itemsize = dttype->getSize(recvcount) ;
  int rank = ptr->getRank(comm);

  MPI_Request *reqs = new MPI_Request[size];
  for(i=0;i<size;i++) {
    int dst = (rank+i) % size;
    AMPI_Irecv(((char*)recvbuf)+(itemsize*dst), recvcount, recvtype,
               dst, MPI_ATA_TAG, comm, &reqs[i]);
  }

  dttype = ptr->getDDT()->getType(sendtype) ;
  itemsize = dttype->getSize(sendcount) ;
#if AMPI_COMLIB
  if(comm == MPI_COMM_WORLD) {
    ptr->getAlltoall().beginIteration();
    for(i=0;i<size;i++) {
      ptr->delesend(MPI_ATA_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
                    sendtype, i, comm, ptr->getComlibProxy());
    }
    ptr->getAlltoall().endIteration();
  } else
#endif
  {
    for(i=0;i<size;i++) {
      int dst = (rank+i) % size;
      ptr->send(MPI_ATA_TAG, rank, ((char*)sendbuf)+(itemsize*dst), sendcount,
                sendtype, dst, comm);
    }
  }

  MPI_Status status;
  for (i=0;i<size;i++) AMPI_Wait(&reqs[i], &status);

  delete [] reqs;
#else
  // itemsize = bytes in one outgoing block (sendcount items of sendtype).
  dttype = ptr->getDDT()->getType(sendtype) ;
  itemsize = dttype->getSize(sendcount) ;
  int rank = ptr->getRank(comm);
  int comm_size = size;
  MPI_Status status;

#if CMK_BIGSIM_CHARM
  TRACE_BG_AMPI_LOG(MPI_ALLTOALL, itemsize);
#endif

  if( itemsize <= AMPI_ALLTOALL_SHORT_MSG ){
    // Short messages. tmp_buf has comm_size slots of sendbuf_extent bytes;
    // slot r is meant to end up holding rank r's entire send buffer.
    int sendtype_extent = getDDT()->getExtent(sendtype);
    int recvtype_extent = getDDT()->getExtent(recvtype);
    int sendbuf_extent = sendcount * comm_size * sendtype_extent;

    // NOTE(review): the malloc result is not checked for NULL.
    void* tmp_buf = malloc(sendbuf_extent*comm_size);

    // Seed slot `rank` with this rank's own send buffer.
    int curr_cnt = sendcount*comm_size;
    copyDatatype(comm, sendtype, curr_cnt, sendbuf,
                 ((char *)tmp_buf + rank*sendbuf_extent));

    int mask = 0x1;
    // NOTE(review): `src` and `tree_root`'s sibling names are declared here;
    // `src` is never used in this branch.
    int src,dst,tree_root,dst_tree_root,my_tree_root;
    int last_recv_cnt,nprocs_completed;
    int j,k,tmp_mask;
    i = 0;
    while (mask < comm_size) {
      // Partner at distance `mask`. The tree roots are the first ranks of the
      // aligned groups of 2^i ranks that contain dst and this rank.
      dst = rank ^ mask;

      dst_tree_root = dst >> i;
      dst_tree_root <<= i;

      my_tree_root = rank >> i;
      my_tree_root <<= i;

      if (dst < comm_size) {
        // Exchange everything accumulated so far with the partner.
        MPI_Sendrecv(((char *)tmp_buf +
                      my_tree_root*sendbuf_extent),
                     curr_cnt, sendtype,
                     dst, MPI_ATA_SEQ_TAG,
                     ((char *)tmp_buf +
                      dst_tree_root*sendbuf_extent),
                     sendcount*comm_size*mask,
                     sendtype, dst, MPI_ATA_SEQ_TAG,
                     comm, &status);

        // Track how many items actually arrived.
        MPI_Get_count(&status, sendtype, &last_recv_cnt);
        curr_cnt += last_recv_cnt;
      }

      // The partner's subtree extends past comm_size (non-power-of-two
      // sizes). Ranks that received data in this round forward it to ranks
      // in the same subtree that did not.
      if (dst_tree_root + mask > comm_size) {
        nprocs_completed = comm_size - my_tree_root - mask;

        // k = log2(mask)
        j = mask;
        k = 0;
        while (j) {
          j >>= 1;
          k++;
        }
        k--;

        tmp_mask = mask >> 1;
        while (tmp_mask) {
          dst = rank ^ tmp_mask;

          tree_root = rank >> k;
          tree_root <<= k;

          // This rank holds the data and dst does not: forward it.
          if ((dst > rank) &&
              (rank < tree_root + nprocs_completed)
              && (dst >= tree_root + nprocs_completed)) {

            MPI_Send(((char *)tmp_buf +
                      dst_tree_root*sendbuf_extent),
                     last_recv_cnt, sendtype,
                     dst, MPI_ATA_SEQ_TAG,
                     comm);
          }

          // dst holds the data and this rank does not: receive it.
          else if ((dst < rank) &&
                   (dst < tree_root + nprocs_completed) &&
                   (rank >= tree_root + nprocs_completed)) {
            MPI_Recv(((char *)tmp_buf +
                      dst_tree_root*sendbuf_extent),
                     sendcount*comm_size*mask,
                     sendtype,
                     dst, MPI_ATA_SEQ_TAG,
                     comm, &status);
            MPI_Get_count(&status, sendtype, &last_recv_cnt);
            curr_cnt += last_recv_cnt;
          }
          tmp_mask >>= 1;
          k--;
        }
      }

      mask <<= 1;
      i++;
    }

    // From each rank p's send buffer in slot p, copy the block addressed to
    // this rank into slot p of recvbuf.
    for (int p=0; p<comm_size; p++) {

      copyDatatype(comm,sendtype,sendcount,
                   ((char *)tmp_buf +
                    p*sendbuf_extent +
                    rank*sendcount*sendtype_extent),
                   ((char*)recvbuf +
                    p*recvcount*recvtype_extent));
    }

    free((char *)tmp_buf);

  }else if ( itemsize <= AMPI_ALLTOALL_MEDIUM_MSG ) {
    // Medium messages: post every send first, then receive from every rank.
#if AMPI_COMLIB
    if(comm == MPI_COMM_WORLD) {

      for(i=0;i<size;i++) {
        CmiPrintf("delesend\n");
        ptr->delesend(MPI_ATA_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
                      sendtype, i, comm, ptr->getComlibProxy());
      }
      ptr->getAlltoallStrategy()->doneInserting();
    } else
#endif
    {
      for(i=0;i<size;i++) {
        int dst = (rank+i) % size;
        ptr->send(MPI_ATA_TAG, rank, ((char*)sendbuf)+(itemsize*dst), sendcount,
                  sendtype, dst, comm);
      }
    }
    // itemsize now becomes bytes per incoming block, for the receive offsets.
    dttype = ptr->getDDT()->getType(recvtype) ;
    itemsize = dttype->getSize(recvcount) ;
    MPI_Status status;
    for(i=0;i<size;i++) {
      int dst = (rank+i) % size;
      AMPI_Recv(((char*)recvbuf)+(itemsize*dst), recvcount, recvtype,
                dst, MPI_ATA_TAG, comm, &status);
    }
  } else {
    // Large messages: `size` rounds of pairwise exchange.
    // NOTE(review): recvbuf offsets below use the send-side block size
    // (itemsize); this assumes sendcount*size(sendtype) equals
    // recvcount*size(recvtype).
    int pof2;
    int src, dst;

    // pof2 = 1 iff size is a power of two.
    i = 1;
    while (i < size)
      i *= 2;
    if (i == size)
      pof2 = 1;
    else
      pof2 = 0;

    for (i=0; i<size; i++) {
      if (pof2 == 1) {
        // XOR pairing: the send and receive partners coincide.
        src = dst = rank ^ i;
      }
      else {
        src = (rank - i + size) % size;
        dst = (rank + i) % size;
      }

      MPI_Status status;
      MPI_Sendrecv(((char *)sendbuf + dst*itemsize),
                   sendcount, sendtype, dst,
                   MPI_ATA_TAG,
                   ((char *)recvbuf + src*itemsize),
                   recvcount, recvtype, src,
                   MPI_ATA_TAG, comm, &status);
    }
  }
#endif

#if AMPI_COUNTER
  getAmpiParent()->counters.alltoall++;
#endif
  return 0;
}
05002
05003 CDECL
05004 int AMPI_Alltoall2(void *sendbuf, int sendcount, MPI_Datatype sendtype,
05005 void *recvbuf, int recvcount, MPI_Datatype recvtype,
05006 MPI_Comm comm)
05007 {
05008 AMPIAPI("AMPI_Alltoall2");
05009
05010 #if CMK_ERROR_CHECKING
05011 if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
05012 CmiAbort("AMPI_Alltoall2 does not accept MPI_IN_PLACE");
05013
05014 int ret;
05015 ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
05016 if(ret != MPI_SUCCESS)
05017 return ret;
05018 ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
05019 if(ret != MPI_SUCCESS)
05020 return ret;
05021 #endif
05022
05023 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoall not allowed for Inter-communicator!");
05024 if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
05025 ampi *ptr = getAmpiInstance(comm);
05026 CProxy_ampi pa(ptr->ckGetArrayID());
05027 int size = ptr->getSize(comm);
05028 CkDDT_DataType *dttype;
05029 int itemsize;
05030 int recvdisp;
05031 int myrank;
05032 int i;
05033
05034 ptr->setA2AIGetFlag((void*)sendbuf);
05035 MPI_Comm_rank(comm,&myrank);
05036 recvdisp = myrank*recvcount;
05037
05038 AMPI_Barrier(comm);
05039
05040 MPI_Request *reqs = new MPI_Request[size];
05041 for(i=0;i<size;i++) {
05042 reqs[i] = pa[i].Alltoall_RemoteIGet(recvdisp, recvcount, recvtype,
05043 MPI_ATA_TAG);
05044 }
05045
05046 dttype = ptr->getDDT()->getType(recvtype) ;
05047 itemsize = dttype->getSize(recvcount) ;
05048 AmpiMsg *msg;
05049 for(i=0;i<size;i++) {
05050 msg = (AmpiMsg*)CkWaitReleaseFuture(reqs[i]);
05051 memcpy((char*)recvbuf+(itemsize*i), msg->data,itemsize);
05052 delete msg;
05053 }
05054
05055 delete [] reqs;
05056 AMPI_Barrier(comm);
05057
05058
05059 ptr->resetA2AIGetFlag();
05060
05061 #if AMPI_COUNTER
05062 getAmpiParent()->counters.alltoall++;
05063 #endif
05064 return 0;
05065 }
05066
05067 CDECL
05068 int AMPI_Ialltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
05069 void *recvbuf, int recvcount, MPI_Datatype recvtype,
05070 MPI_Comm comm, MPI_Request *request)
05071 {
05072 AMPIAPI("AMPI_Ialltoall");
05073
05074 #if CMK_ERROR_CHECKING
05075 if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
05076 CmiAbort("AMPI_Ialltoall does not accept MPI_IN_PLACE");
05077
05078 int ret;
05079 ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
05080 if(ret != MPI_SUCCESS)
05081 {
05082 *request = MPI_REQUEST_NULL;
05083 return ret;
05084 }
05085 ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
05086 if(ret != MPI_SUCCESS)
05087 {
05088 *request = MPI_REQUEST_NULL;
05089 return ret;
05090 }
05091 #endif
05092
05093 if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Ialltoall not allowed for Inter-communicator!");
05094 if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
05095 ampi *ptr = getAmpiInstance(comm);
05096 AmpiRequestList* reqs = getReqs();
05097 int size = ptr->getSize(comm);
05098 int reqsSize = reqs->size();
05099 CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
05100 int itemsize = dttype->getSize(sendcount) ;
05101 int i;
05102
05103 #if AMPI_COMLIB
05104 if(comm == MPI_COMM_WORLD) {
05105
05106
05107 for(i=0;i<size;i++) {
05108 ptr->delesend(MPI_ATA_TAG+reqsSize, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
05109 sendtype, i, comm, ptr->getComlibProxy());
05110 }
05111 ptr->getAlltoallStrategy()->doneInserting();
05112 } else
05113 #endif
05114 for(i=0;i<size;i++) {
05115 ptr->send(MPI_ATA_TAG+reqsSize, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
05116 sendtype, i, comm);
05117 }
05118
05119
05120 ATAReq *newreq = new ATAReq(size);
05121 for(i=0;i<size;i++){
05122 if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_ATA_TAG+reqsSize,comm)!=(i+1))
05123 CkAbort("MPI_Ialltoall: Error adding requests into ATAReq!");
05124 }
05125 *request = reqs->insert(newreq);
05126 AMPI_DEBUG("MPI_Ialltoall: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
05127 return 0;
05128 }
05129
/*
 * AMPI_Alltoallv: rank i receives sendcounts[i] items of sendtype taken from
 * sendbuf at byte offset sdispls[i] * (byte size of one sendtype item). This
 * rank receives recvcounts[j] items of recvtype from rank j into recvbuf at
 * byte offset rdispls[j] * (byte size of one recvtype item).
 * Returns 0, or the MPI error code produced by argument checking.
 */
CDECL
int AMPI_Alltoallv(void *sendbuf, int *sendcounts_, int *sdispls_,
                   MPI_Datatype sendtype, void *recvbuf, int *recvcounts_,
                   int *rdispls_, MPI_Datatype recvtype, MPI_Comm comm)
{
  if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoallv not allowed for Inter-communicator!");
  // NOTE(review): on MPI_COMM_SELF this returns without copying
  // sendbuf[sdispls[0]] into recvbuf[rdispls[0]]; the other collectives in
  // this file perform a local copy instead. Confirm whether this is intended.
  if(comm==MPI_COMM_SELF) return 0;

#if CMK_ERROR_CHECKING
  if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
    CmiAbort("MPI_Alltoallv does not accept MPI_IN_PLACE");

  // Counts are per rank, so count checking is disabled here.
  int ret;
  ret = errorCheck(comm, 1, 0, 0, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
  ret = errorCheck(comm, 1, 0, 0, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
  if(ret != MPI_SUCCESS)
    return ret;
#endif

  ampi *ptr = getAmpiInstance(comm);
  int size = ptr->getSize(comm);
  int *sendcounts = sendcounts_;
  int *sdispls = sdispls_;
  int *recvcounts = recvcounts_;
  int *rdispls = rdispls_;
  // When CmiPICMethod == 2, private heap copies of the four count and
  // displacement arrays are used and freed at the end. The reason is not
  // visible here. NOTE(review): confirm why the user arrays cannot be used
  // directly in this mode.
  if (CpvAccess(CmiPICMethod) == 2)
  {
    sendcounts = new int[size];
    sdispls = new int[size];
    recvcounts = new int[size];
    rdispls = new int[size];
    for (int i=0; i<size; i++) {
      sendcounts[i] = sendcounts_[i];
      sdispls[i] = sdispls_[i];
      recvcounts[i] = recvcounts_[i];
      rdispls[i] = rdispls_[i];
    }
  }
  // NOTE(review): unlike the other routines, AMPIAPI is invoked only after
  // the checks and array copies above.
  AMPIAPI("AMPI_Alltoallv");
  // Displacements are in units of one datatype item (getSize() without a count).
  CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
  int itemsize = dttype->getSize() ;
  int i;
#if AMPI_COMLIB
  if(comm == MPI_COMM_WORLD) {

    for(i=0;i<size;i++) {
      ptr->delesend(MPI_GATHER_TAG,ptr->getRank(comm),((char*)sendbuf)+(itemsize*sdispls[i]),sendcounts[i],
                    sendtype, i, comm, ptr->getComlibProxy());
    }
    ptr->getAlltoallStrategy()->doneInserting();
  } else
#endif
  {
    for(i=0;i<size;i++) {
      ptr->send(MPI_GATHER_TAG,ptr->getRank(comm),((char*)sendbuf)+(itemsize*sdispls[i]),sendcounts[i],
                sendtype, i, comm);
    }
  }
  MPI_Status status;
  // Switch to the receive type's item size for the rdispls offsets.
  dttype = ptr->getDDT()->getType(recvtype) ;
  itemsize = dttype->getSize() ;

  for(i=0;i<size;i++) {
    AMPI_Recv(((char*)recvbuf)+(itemsize*rdispls[i]), recvcounts[i], recvtype,
              i, MPI_GATHER_TAG, comm, &status);
  }
#if AMPI_COUNTER
  getAmpiParent()->counters.alltoall++;
#endif
  // Free the private copies made above under the same condition.
  if (CpvAccess(CmiPICMethod) == 2)
  {
    delete [] sendcounts;
    delete [] sdispls;
    delete [] recvcounts;
    delete [] rdispls;
  }
  return 0;
}
05212
05213 CDECL
05214 int AMPI_Comm_dup(int comm, int *newcomm)
05215 {
05216 AMPIAPI("AMPI_Comm_dup");
05217 *newcomm = comm;
05218 return 0;
05219 }
05220
05221 CDECL
05222 int AMPI_Comm_split(int src,int color,int key,int *dest)
05223 {
05224 AMPIAPI("AMPI_Comm_split");
05225
05226 {
05227 getAmpiInstance(src)->split(color,key,dest, 0);
05228 AMPI_Barrier(src);
05229 }
05230 if (color == MPI_UNDEFINED) *dest = MPI_COMM_NULL;
05231
05232 #if AMPIMSGLOG
05233 ampiParent* pptr = getAmpiParent();
05234 if(msgLogRead){
05235 PUParray(*(pptr->fromPUPer), (char *)dest, sizeof(int));
05236 return 0;
05237 }
05238 else if(msgLogWrite && record_msglog(pptr->thisIndex)){
05239 PUParray(*(pptr->toPUPer), (char *)dest, sizeof(int));
05240 }
05241 #endif
05242
05243 return 0;
05244 }
05245
05246 CDECL
05247 int AMPI_Comm_free(int *comm)
05248 {
05249 AMPIAPI("AMPI_Comm_free");
05250 return 0;
05251 }
05252
05253 CDECL
05254 int AMPI_Comm_test_inter(MPI_Comm comm, int *flag){
05255 AMPIAPI("AMPI_Comm_test_inter");
05256 *flag = getAmpiParent()->isInter(comm);
05257 return 0;
05258 }
05259
05260 CDECL
05261 int AMPI_Comm_remote_size(MPI_Comm comm, int *size){
05262 AMPIAPI("AMPI_Comm_remote_size");
05263 *size = getAmpiParent()->getRemoteSize(comm);
05264 return 0;
05265 }
05266
05267 CDECL
05268 int AMPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group){
05269 AMPIAPI("AMPI_Comm_remote_group");
05270 *group = getAmpiParent()->getRemoteGroup(comm);
05271 return 0;
05272 }
05273
05274 CDECL
05275 int AMPI_Intercomm_create(MPI_Comm lcomm, int lleader, MPI_Comm rcomm, int rleader, int tag, MPI_Comm *newintercomm){
05276 AMPIAPI("AMPI_Intercomm_create");
05277 ampi *ptr = getAmpiInstance(lcomm);
05278 int root = ptr->getIndexForRank(lleader);
05279 CkVec<int> rvec;
05280 int lrank;
05281 AMPI_Comm_rank(lcomm,&lrank);
05282
05283 if(lrank==lleader){
05284 int lsize, rsize;
05285 lsize = ptr->getSize(lcomm);
05286 int *larr = new int [lsize];
05287 int *rarr;
05288 CkVec<int> lvec = ptr->getIndices();
05289 MPI_Status sts;
05290
05291
05292 int i;
05293 for(i=0;i<lsize;i++)
05294 larr[i] = lvec[i];
05295 AMPI_Send(&lsize,1,MPI_INT,rleader,tag,rcomm);
05296 AMPI_Recv(&rsize,1,MPI_INT,rleader,tag,rcomm,&sts);
05297
05298 rarr = new int [rsize];
05299 AMPI_Send(larr,lsize,MPI_INT,rleader,tag+1,rcomm);
05300 AMPI_Recv(rarr,rsize,MPI_INT,rleader,tag+1,rcomm,&sts);
05301 for(i=0;i<rsize;i++)
05302 rvec.push_back(rarr[i]);
05303
05304 delete [] larr;
05305 delete [] rarr;
05306
05307 if(rsize==0) CkAbort("MPI_Intercomm_create: remote size = 0! Does it really make sense to create an empty communicator?\n");
05308 }
05309
05310 ptr->intercommCreate(rvec,root,newintercomm);
05311 return 0;
05312 }
05313
05314 CDECL
05315 int AMPI_Intercomm_merge(MPI_Comm intercomm, int high, MPI_Comm *newintracomm){
05316 AMPIAPI("AMPI_Intercomm_merge");
05317 ampi *ptr = getAmpiInstance(intercomm);
05318 int lroot, rroot, lrank, lhigh, rhigh, first;
05319 lroot = ptr->getIndexForRank(0);
05320 rroot = ptr->getIndexForRemoteRank(0);
05321 lhigh = high;
05322 lrank = ptr->getRank(intercomm);
05323
05324 if(lrank==0){
05325 MPI_Status sts;
05326 AMPI_Send(&lhigh,1,MPI_INT,0,10010,intercomm);
05327 AMPI_Recv(&rhigh,1,MPI_INT,0,10010,intercomm,&sts);
05328
05329 if((lhigh && rhigh) || (!lhigh && !rhigh)){
05330 first = (lroot < rroot);
05331 }else{
05332 first = (lhigh == false);
05333 }
05334 }
05335
05336 ptr->intercommMerge(first, newintracomm);
05337 return 0;
05338 }
05339
05340 CDECL
05341 int AMPI_Abort(int comm, int errorcode)
05342 {
05343 AMPIAPI("AMPI_Abort");
05344 CkAbort("AMPI: User called MPI_Abort!\n");
05345 return errorcode;
05346 }
05347
05348 CDECL
05349 int AMPI_Get_count(MPI_Status *sts, MPI_Datatype dtype, int *count){
05350 AMPIAPI("AMPI_Get_count");
05351 CkDDT_DataType* dttype = getDDT()->getType(dtype);
05352 int itemsize = dttype->getSize() ;
05353 if (itemsize == 0) {
05354 *count = 0;
05355 } else {
05356 *count = sts->MPI_LENGTH/itemsize;
05357 }
05358 return 0;
05359 }
05360
05361 CDECL
05362 int AMPI_Type_lb(MPI_Datatype dtype, MPI_Aint* displacement){
05363 AMPIAPI("AMPI_Type_lb");
05364 *displacement = getDDT()->getLB(dtype);
05365 return 0;
05366 }
05367
05368 CDECL
05369 int AMPI_Type_ub(MPI_Datatype dtype, MPI_Aint* displacement){
05370 AMPIAPI("AMPI_Type_ub");
05371 *displacement = getDDT()->getUB(dtype);
05372 return 0;
05373 }
05374
05375 CDECL
05376 int AMPI_Address(void* location, MPI_Aint *address){
05377 AMPIAPI("AMPI_Address");
05378 *address = (MPI_Aint)(unsigned long)(char *)location;
05379 return 0;
05380 }
05381
05382 CDECL
05383 int AMPI_Get_elements(MPI_Status *sts, MPI_Datatype dtype, int *count){
05384 AMPIAPI("AMPI_Get_elements");
05385 CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
05386 int basesize = dttype->getBaseSize() ;
05387 if(basesize==0) basesize=dttype->getSize();
05388 *count = sts->MPI_LENGTH/basesize;
05389 return 0;
05390 }
05391
05392 CDECL
05393 int AMPI_Pack(void *inbuf, int incount, MPI_Datatype dtype, void *outbuf,
05394 int outsize, int *position, MPI_Comm comm)
05395 {
05396 AMPIAPI("AMPI_Pack");
05397 CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
05398 int itemsize = dttype->getSize();
05399 dttype->serialize((char*)inbuf, ((char*)outbuf)+(*position), incount, 1);
05400 *position += (itemsize*incount);
05401 return 0;
05402 }
05403
05404 CDECL
05405 int AMPI_Unpack(void *inbuf, int insize, int *position, void *outbuf,
05406 int outcount, MPI_Datatype dtype, MPI_Comm comm)
05407 {
05408 AMPIAPI("AMPI_Unpack");
05409 CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
05410 int itemsize = dttype->getSize();
05411 dttype->serialize((char*)outbuf, ((char*)inbuf+(*position)), outcount, -1);
05412 *position += (itemsize*outcount);
05413 return 0;
05414 }
05415
05416 CDECL
05417 int AMPI_Pack_size(int incount,MPI_Datatype datatype,MPI_Comm comm,int *sz)
05418 {
05419 AMPIAPI("AMPI_Pack_size");
05420 CkDDT_DataType* dttype = getDDT()->getType(datatype) ;
05421 *sz = incount*dttype->getSize() ;
05422 return 0;
05423 }
05424
05425 CDECL
05426 int AMPI_Get_processor_name(char *name, int *resultlen){
05427 AMPIAPI("AMPI_Get_processor_name");
05428 ampiParent *ptr = getAmpiParent();
05429 sprintf(name,"AMPI_VP[%d]_PE[%d]",ptr->thisIndex,ptr->getMyPe());
05430 *resultlen = strlen(name);
05431 return 0;
05432 }
05433
05434
// Forward declaration of error_handler. The variadic prototype is chosen when
// USE_STDARG is defined. No definition is visible in this part of the file.
#if defined(USE_STDARG)
void error_handler(MPI_Comm *, int *, ...);
#else
void error_handler ( MPI_Comm *, int * );
#endif
05440
05441 CDECL
05442 int AMPI_Errhandler_create(MPI_Handler_function *function, MPI_Errhandler *errhandler){
05443 AMPIAPI("AMPI_Errhandler_create");
05444 return MPI_SUCCESS;
05445 }
05446
05447 CDECL
05448 int AMPI_Errhandler_set(MPI_Comm comm, MPI_Errhandler errhandler){
05449 AMPIAPI("AMPI_Errhandler_set");
05450 return MPI_SUCCESS;
05451 }
05452
05453 CDECL
05454 int AMPI_Errhandler_get(MPI_Comm comm, MPI_Errhandler *errhandler){
05455 AMPIAPI("AMPI_Errhandler_get");
05456 return MPI_SUCCESS;
05457 }
05458
05459 CDECL
05460 int AMPI_Errhandler_free(MPI_Errhandler *errhandler){
05461 AMPIAPI("AMPI_Errhandler_free");
05462 return MPI_SUCCESS;
05463 }
05464
05465 CDECL
05466 int AMPI_Error_class(int errorcode, int *errorclass){
05467 AMPIAPI("AMPI_Error_class");
05468 *errorclass = errorcode;
05469 return MPI_SUCCESS;
05470 }
05471
05472 CDECL
05473 int AMPI_Error_string(int errorcode, char *string, int *resultlen)
05474 {
05475 AMPIAPI("AMPI_Error_string");
05476 const char *ret="";
05477 switch(errorcode) {
05478 case MPI_SUCCESS:
05479 ret="Success";
05480 break;
05481 default:
05482 return 1;
05483 };
05484 *resultlen=strlen(ret);
05485 strcpy(string,ret);
05486 return MPI_SUCCESS;
05487 }
05488
05489
05490
05491 CDECL
05492 int AMPI_Comm_group(MPI_Comm comm, MPI_Group *group)
05493 {
05494 AMPIAPI("AMPI_Comm_Group");
05495 *group = getAmpiParent()->comm2group(comm);
05496 return 0;
05497 }
05498
05499 CDECL
05500 int AMPI_Group_union(MPI_Group group1, MPI_Group group2, MPI_Group *newgroup)
05501 {
05502 AMPIAPI("AMPI_Group_union");
05503 groupStruct vec1, vec2, newvec;
05504 ampiParent *ptr = getAmpiParent();
05505 vec1 = ptr->group2vec(group1);
05506 vec2 = ptr->group2vec(group2);
05507 newvec = unionOp(vec1,vec2);
05508 *newgroup = ptr->saveGroupStruct(newvec);
05509 return 0;
05510 }
05511
05512 CDECL
05513 int AMPI_Group_intersection(MPI_Group group1, MPI_Group group2, MPI_Group *newgroup)
05514 {
05515 AMPIAPI("AMPI_Group_intersection");
05516 groupStruct vec1, vec2, newvec;
05517 ampiParent *ptr = getAmpiParent();
05518 vec1 = ptr->group2vec(group1);
05519 vec2 = ptr->group2vec(group2);
05520 newvec = intersectOp(vec1,vec2);
05521 *newgroup = ptr->saveGroupStruct(newvec);
05522 return 0;
05523 }
05524
05525 CDECL
05526 int AMPI_Group_difference(MPI_Group group1, MPI_Group group2, MPI_Group *newgroup)
05527 {
05528 AMPIAPI("AMPI_Group_difference");
05529 groupStruct vec1, vec2, newvec;
05530 ampiParent *ptr = getAmpiParent();
05531 vec1 = ptr->group2vec(group1);
05532 vec2 = ptr->group2vec(group2);
05533 newvec = diffOp(vec1,vec2);
05534 *newgroup = ptr->saveGroupStruct(newvec);
05535 return 0;
05536 }
05537
05538 CDECL
05539 int AMPI_Group_size(MPI_Group group, int *size)
05540 {
05541 AMPIAPI("AMPI_Group_size");
05542 *size = (getAmpiParent()->group2vec(group)).size();
05543 return 0;
05544 }
05545
05546 CDECL
05547 int AMPI_Group_rank(MPI_Group group, int *rank)
05548 {
05549 AMPIAPI("AMPI_Group_rank");
05550 *rank = getAmpiParent()->getRank(group);
05551 return 0;
05552 }
05553
05554 CDECL
05555 int AMPI_Group_translate_ranks (MPI_Group group1, int n, int *ranks1, MPI_Group group2, int *ranks2)
05556 {
05557 AMPIAPI("AMPI_Group_translate_ranks");
05558 ampiParent *ptr = getAmpiParent();
05559 groupStruct vec1, vec2;
05560 vec1 = ptr->group2vec(group1);
05561 vec2 = ptr->group2vec(group2);
05562 translateRanksOp(n, vec1, ranks1, vec2, ranks2);
05563 return 0;
05564 }
05565
05566 CDECL
05567 int AMPI_Group_compare(MPI_Group group1,MPI_Group group2, int *result)
05568 {
05569 AMPIAPI("AMPI_Group_compare");
05570 ampiParent *ptr = getAmpiParent();
05571 groupStruct vec1, vec2;
05572 vec1 = ptr->group2vec(group1);
05573 vec2 = ptr->group2vec(group2);
05574 *result = compareVecOp(vec1, vec2);
05575 return 0;
05576 }
05577
05578 CDECL
05579 int AMPI_Group_incl(MPI_Group group, int n, int *ranks, MPI_Group *newgroup)
05580 {
05581 AMPIAPI("AMPI_Group_incl");
05582 groupStruct vec, newvec;
05583 ampiParent *ptr = getAmpiParent();
05584 vec = ptr->group2vec(group);
05585 newvec = inclOp(n,ranks,vec);
05586 *newgroup = ptr->saveGroupStruct(newvec);
05587 return 0;
05588 }
05589 CDECL
05590 int AMPI_Group_excl(MPI_Group group, int n, int *ranks, MPI_Group *newgroup)
05591 {
05592 AMPIAPI("AMPI_Group_excl");
05593 groupStruct vec, newvec;
05594 ampiParent *ptr = getAmpiParent();
05595 vec = ptr->group2vec(group);
05596 newvec = exclOp(n,ranks,vec);
05597 *newgroup = ptr->saveGroupStruct(newvec);
05598
05599 return 0;
05600 }
05601 CDECL
05602 int AMPI_Group_range_incl(MPI_Group group, int n, int ranges[][3], MPI_Group *newgroup)
05603 {
05604 AMPIAPI("AMPI_Group_range_incl");
05605 groupStruct vec, newvec;
05606 ampiParent *ptr = getAmpiParent();
05607 vec = ptr->group2vec(group);
05608 newvec = rangeInclOp(n,ranges,vec);
05609 *newgroup = ptr->saveGroupStruct(newvec);
05610 return 0;
05611 }
05612 CDECL
05613 int AMPI_Group_range_excl(MPI_Group group, int n, int ranges[][3], MPI_Group *newgroup)
05614 {
05615 AMPIAPI("AMPI_Group_range_excl");
05616 groupStruct vec, newvec;
05617 ampiParent *ptr = getAmpiParent();
05618 vec = ptr->group2vec(group);
05619 newvec = rangeExclOp(n,ranges,vec);
05620 *newgroup = ptr->saveGroupStruct(newvec);
05621 return 0;
05622 }
05623 CDECL
05624 int AMPI_Group_free(MPI_Group *group)
05625 {
05626 AMPIAPI("AMPI_Group_free");
05627 return 0;
05628 }
05629 CDECL
05630 int AMPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm* newcomm)
05631 {
05632 AMPIAPI("AMPI_Comm_create");
05633 groupStruct vec = getAmpiParent()->group2vec(group);
05634 if(vec.size()==0) CkAbort("AMPI> Abort: Does it really make sense to create an empty communicator?");
05635 getAmpiInstance(comm)->commCreate(vec, newcomm);
05636 AMPI_Barrier(comm);
05637 return 0;
05638 }
05639
05640
05641 CDECL
05642 void AMPI_Checkpoint(char *dname)
05643 {
05644 AMPI_Barrier(MPI_COMM_WORLD);
05645 AMPIAPI("AMPI_Checkpoint");
05646 getAmpiParent()->startCheckpoint(dname);
05647 }
05648
05649 CDECL
05650 void AMPI_MemCheckpoint()
05651 {
05652 #if CMK_MEM_CHECKPOINT
05653 AMPI_Barrier(MPI_COMM_WORLD);
05654 AMPIAPI("AMPI_Checkpoint");
05655 getAmpiParent()->startCheckpoint("");
05656 #else
05657 CmiPrintf("Error: In memory checkpoint/restart is not on! \n");
05658 CmiAbort("Error: recompile Charm++ with CMK_MEM_CHECKPOINT. \n");
05659 #endif
05660 }
05661
05662 CDECL
05663 void AMPI_Print(char *str)
05664 {
05665 AMPIAPI("AMPI_Print");
05666 ampiParent *ptr = getAmpiParent();
05667 CkPrintf("[%d] %s\n", ptr->thisIndex, str);
05668 }
05669
05670 CDECL
05671 int AMPI_Register(void *d, MPI_PupFn f)
05672 {
05673 AMPIAPI("AMPI_Register");
05674 return TCHARM_Register(d,f);
05675 }
05676
05677 CDECL
05678 void *MPI_Get_userdata(int idx)
05679 {
05680 AMPIAPI("AMPI_Get_userdata");
05681 return TCHARM_Get_userdata(idx);
05682 }
05683
05684 CDECL
05685 void AMPI_Start_measure()
05686 {
05687 AMPIAPI("AMPI_Start_measure");
05688 ampiParent *ptr = getAmpiParent();
05689 ptr->start_measure();
05690 }
05691
05692 CDECL
05693 void AMPI_Stop_measure()
05694 {
05695 AMPIAPI("AMPI_Stop_measure");
05696 ampiParent *ptr = getAmpiParent();
05697 ptr->stop_measure();
05698 }
05699
05700 CDECL
05701 void AMPI_Set_load(double load)
05702 {
05703 AMPIAPI("AMPI_Set_load");
05704 ampiParent *ptr = getAmpiParent();
05705 ptr->setObjTime(load);
05706 }
05707
05708 CDECL
05709 void AMPI_Register_main(MPI_MainFn mainFn,const char *name)
05710 {
05711 AMPIAPI("AMPI_Register_main");
05712 if (TCHARM_Element()==0)
05713 {
05714 ampiCreateMain(mainFn,name,strlen(name));
05715 }
05716 }
05717 FDECL
05718 void FTN_NAME(MPI_REGISTER_MAIN,mpi_register_main)
05719 (MPI_MainFn mainFn,const char *name,int nameLen)
05720 {
05721 AMPIAPI("AMPI_register_main");
05722 if (TCHARM_Element()==0)
05723 {
05724 ampiCreateMain(mainFn,name,nameLen);
05725 }
05726 }
05727
05728 CDECL
05729 int AMPI_Keyval_create(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn, int *keyval, void* extra_state){
05730 AMPIAPI("AMPI_Keyval_create");
05731 return getAmpiParent()->createKeyval(copy_fn,delete_fn,keyval,extra_state);
05732 }
05733
05734 CDECL
05735 int AMPI_Keyval_free(int *keyval){
05736 AMPIAPI("AMPI_Keyval_free");
05737 return getAmpiParent()->freeKeyval(keyval);
05738 }
05739
05740 CDECL
05741 int AMPI_Attr_put(MPI_Comm comm, int keyval, void* attribute_val){
05742 AMPIAPI("AMPI_Attr_put");
05743 return getAmpiParent()->putAttr(comm,keyval,attribute_val);
05744 }
05745
05746 CDECL
05747 int AMPI_Attr_get(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
05748 AMPIAPI("AMPI_Attr_get");
05749 return getAmpiParent()->getAttr(comm,keyval,attribute_val,flag);
05750 }
05751
05752 CDECL
05753 int AMPI_Attr_delete(MPI_Comm comm, int keyval){
05754 AMPIAPI("AMPI_Attr_delete");
05755 return getAmpiParent()->deleteAttr(comm,keyval);
05756 }
05757
05758 CDECL
05759 int AMPI_Cart_map(MPI_Comm comm, int ndims, int *dims, int *periods,
05760 int *newrank) {
05761 AMPIAPI("AMPI_Cart_map");
05762
05763 AMPI_Comm_rank(comm, newrank);
05764
05765 return 0;
05766 }
05767
05768 CDECL
05769 int AMPI_Graph_map(MPI_Comm comm, int nnodes, int *index, int *edges,
05770 int *newrank) {
05771 AMPIAPI("AMPI_Graph_map");
05772 AMPI_Comm_rank(comm, newrank);
05773
05774 return 0;
05775 }
05776
05777 CDECL
05778 int AMPI_Cart_create(MPI_Comm comm_old, int ndims, int *dims, int *periods,
05779 int reorder, MPI_Comm *comm_cart) {
05780
05781 AMPIAPI("AMPI_Cart_create");
05782
05783
05784
05785
05786
05787
05788
05789
05790
05791 int newrank;
05792 AMPI_Cart_map(comm_old, ndims, dims, periods, &newrank);
05793
05794 ampiParent *ptr = getAmpiParent();
05795 groupStruct vec = ptr->group2vec(ptr->comm2group(comm_old));
05796 getAmpiInstance(comm_old)->cartCreate(vec, comm_cart);
05797 ampiCommStruct &c = ptr->getCart(*comm_cart);
05798 c.setndims(ndims);
05799
05800 CkVec<int> dimsv;
05801 CkVec<int> periodsv;
05802
05803 for (int i = 0; i < ndims; i++) {
05804 dimsv.push_back(dims[i]);
05805 periodsv.push_back(periods[i]);
05806 if ((periods[i] != 0) && (periods[i] != 1))
05807 CkAbort("MPI_Cart_create: periods should be all booleans\n");
05808 }
05809
05810 c.setdims(dimsv);
05811 c.setperiods(periodsv);
05812
05813 return 0;
05814 }
05815
05816 CDECL
05817 int AMPI_Graph_create(MPI_Comm comm_old, int nnodes, int *index, int *edges,
05818 int reorder, MPI_Comm *comm_graph) {
05819 AMPIAPI("AMPI_Graph_create");
05820
05821
05822 int newrank;
05823 AMPI_Graph_map(comm_old, nnodes, index, edges, &newrank);
05824
05825 ampiParent *ptr = getAmpiParent();
05826 groupStruct vec = ptr->group2vec(ptr->comm2group(comm_old));
05827 getAmpiInstance(comm_old)->graphCreate(vec, comm_graph);
05828
05829 ampiCommStruct &c = ptr->getGraph(*comm_graph);
05830 c.setnvertices(nnodes);
05831
05832 CkVec<int> index_;
05833 CkVec<int> edges_;
05834
05835 int i;
05836 for (i = 0; i < nnodes; i++)
05837 index_.push_back(index[i]);
05838
05839 c.setindex(index_);
05840
05841 for (i = 0; i < index[nnodes - 1]; i++)
05842 edges_.push_back(edges[i]);
05843
05844 c.setedges(edges_);
05845
05846 return 0;
05847 }
05848
05849 CDECL
05850 int AMPI_Topo_test(MPI_Comm comm, int *status) {
05851 AMPIAPI("AMPI_Topo_test");
05852
05853 ampiParent *ptr = getAmpiParent();
05854
05855 if (ptr->isCart(comm))
05856 *status = MPI_CART;
05857 else if (ptr->isGraph(comm))
05858 *status = MPI_GRAPH;
05859 else *status = MPI_UNDEFINED;
05860
05861 return 0;
05862 }
05863
05864 CDECL
05865 int AMPI_Cartdim_get(MPI_Comm comm, int *ndims) {
05866 AMPIAPI("AMPI_Cartdim_get");
05867
05868 *ndims = getAmpiParent()->getCart(comm).getndims();
05869
05870 return 0;
05871 }
05872
05873 CDECL
05874 int AMPI_Cart_get(MPI_Comm comm, int maxdims, int *dims, int *periods,
05875 int *coords){
05876 int i, ndims;
05877
05878 AMPIAPI("AMPI_Cart_get");
05879
05880 ampiCommStruct &c = getAmpiParent()->getCart(comm);
05881 ndims = c.getndims();
05882 int rank;
05883
05884 AMPI_Comm_rank(comm, &rank);
05885
05886 const CkVec<int> &dims_ = c.getdims();
05887 const CkVec<int> &periods_ = c.getperiods();
05888
05889 for (i = 0; i < maxdims; i++) {
05890 dims[i] = dims_[i];
05891 periods[i] = periods_[i];
05892 }
05893
05894 for (i = ndims - 1; i >= 0; i--) {
05895 if (i < maxdims)
05896 coords[i] = rank % dims_[i];
05897 rank = (int) (rank / dims_[i]);
05898 }
05899
05900 return 0;
05901 }
05902
05903 CDECL
05904 int AMPI_Cart_rank(MPI_Comm comm, int *coords, int *rank) {
05905 AMPIAPI("AMPI_Cart_rank");
05906
05907 ampiCommStruct &c = getAmpiParent()->getCart(comm);
05908 int ndims = c.getndims();
05909 const CkVec<int> &dims = c.getdims();
05910 const CkVec<int> &periods = c.getperiods();
05911
05912 int prod = 1;
05913 int r = 0;
05914
05915 for (int i = ndims - 1; i >= 0; i--) {
05916 if ((coords[i] < 0) || (coords[i] >= dims[i]))
05917 if (periods[i] == 1)
05918 if (coords[i] > 0)
05919 coords[i] %= dims[i];
05920 else {
05921
05922 while (coords[i] < 0) coords[i]+=dims[i];
05923 }
05924 r += prod * coords[i];
05925 prod *= dims[i];
05926 }
05927
05928 *rank = r;
05929
05930 return 0;
05931 }
05932
05933 CDECL
05934 int AMPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int *coords) {
05935 AMPIAPI("AMPI_Cart_coords");
05936
05937 ampiCommStruct &c = getAmpiParent()->getCart(comm);
05938 int ndims = c.getndims();
05939 const CkVec<int> &dims = c.getdims();
05940
05941 for (int i = ndims - 1; i >= 0; i--) {
05942 if (i < maxdims)
05943 coords[i] = rank % dims[i];
05944 rank = (int) (rank / dims[i]);
05945 }
05946
05947 return 0;
05948 }
05949
05950
05951
05952 static void cart_clamp_coord(MPI_Comm comm, const CkVec<int> &dims,
05953 const CkVec<int> &periodicity, int *coords,
05954 int direction, int displacement, int *rank_out)
05955 {
05956 int base_coord = coords[direction];
05957 coords[direction] += displacement;
05958
05959 if (periodicity[direction] == 1) {
05960 while (coords[direction] < 0)
05961 coords[direction] += dims[direction];
05962 while (coords[direction] >= dims[direction])
05963 coords[direction] -= dims[direction];
05964 }
05965
05966 if (coords[direction]<0 || coords[direction]>= dims[direction])
05967 *rank_out = MPI_PROC_NULL;
05968 else
05969 AMPI_Cart_rank(comm, coords, rank_out);
05970
05971 coords[direction] = base_coord;
05972 }
05973
05974 CDECL
05975 int AMPI_Cart_shift(MPI_Comm comm, int direction, int disp,
05976 int *rank_source, int *rank_dest) {
05977 AMPIAPI("AMPI_Cart_shift");
05978
05979 ampiCommStruct &c = getAmpiParent()->getCart(comm);
05980 int ndims = c.getndims();
05981 if ((direction < 0) || (direction >= ndims))
05982 CkAbort("MPI_Cart_shift: direction not within dimensions range");
05983
05984 const CkVec<int> &dims = c.getdims();
05985 const CkVec<int> &periods = c.getperiods();
05986 int *coords = new int[ndims];
05987
05988 int mype;
05989 AMPI_Comm_rank(comm, &mype);
05990 AMPI_Cart_coords(comm, mype, ndims, coords);
05991
05992 cart_clamp_coord(comm, dims, periods, coords, direction, disp, rank_dest);
05993 cart_clamp_coord(comm, dims, periods, coords, direction, -disp, rank_source);
05994
05995 delete [] coords;
05996 return 0;
05997 }
05998
05999 CDECL
06000 int AMPI_Graphdims_get(MPI_Comm comm, int *nnodes, int *nedges) {
06001 AMPIAPI("AMPI_Graphdim_get");
06002
06003 ampiCommStruct &c = getAmpiParent()->getGraph(comm);
06004 *nnodes = c.getnvertices();
06005 const CkVec<int> &index = c.getindex();
06006 *nedges = index[(*nnodes) - 1];
06007
06008 return 0;
06009 }
06010
06011 CDECL
06012 int AMPI_Graph_get(MPI_Comm comm, int maxindex, int maxedges, int *index,
06013 int *edges) {
06014 AMPIAPI("AMPI_Graph_get");
06015
06016 ampiCommStruct &c = getAmpiParent()->getGraph(comm);
06017
06018 const CkVec<int> &index_ = c.getindex();
06019 const CkVec<int> &edges_ = c.getedges();
06020
06021 if (maxindex > index_.size())
06022 maxindex = index_.size();
06023
06024 int i;
06025 for (i = 0; i < maxindex; i++)
06026 index[i] = index_[i];
06027
06028 for (i = 0; i < maxedges; i++)
06029 edges[i] = edges_[i];
06030
06031 return 0;
06032 }
06033
06034 CDECL
06035 int AMPI_Graph_neighbors_count(MPI_Comm comm, int rank, int *nneighbors) {
06036 AMPIAPI("AMPI_Graph_neighbors_count");
06037
06038 ampiCommStruct &c = getAmpiParent()->getGraph(comm);
06039
06040 const CkVec<int> &index = c.getindex();
06041
06042 if ((rank >= index.size()) || (rank < 0))
06043 CkAbort("MPI_Graph_neighbors_count: rank not within range");
06044
06045 if (rank == 0)
06046 *nneighbors = index[rank];
06047 else
06048 *nneighbors = index[rank] - index[rank - 1];
06049
06050 return 0;
06051 }
06052
06053 CDECL
06054 int AMPI_Graph_neighbors(MPI_Comm comm, int rank, int maxneighbors,
06055 int *neighbors) {
06056 AMPIAPI("AMPI_Graph_neighbors");
06057
06058 ampiCommStruct &c = getAmpiParent()->getGraph(comm);
06059 const CkVec<int> &index = c.getindex();
06060 const CkVec<int> &edges = c.getedges();
06061
06062 int numneighbors = (rank == 0) ? index[rank] : index[rank] - index[rank - 1];
06063 if (maxneighbors > numneighbors)
06064 maxneighbors = numneighbors;
06065
06066 if (maxneighbors < 0)
06067 CkAbort("MPI_Graph_neighbors: maxneighbors < 0");
06068
06069 if ((rank >= index.size()) || (rank < 0))
06070 CkAbort("MPI_Graph_neighbors: rank not within range");
06071
06072 if (rank == 0)
06073 for (int i = 0; i < maxneighbors; i++)
06074 neighbors[i] = edges[i];
06075 else
06076 for (int i = 0; i < maxneighbors; i++)
06077 neighbors[i] = edges[index[rank - 1] + i];
06078
06079 return 0;
06080 }
06081
06082
06083
// Return floor(n^(1/d)). A small nudge (0.001) is added to n before taking
// the root so that exact powers such as 27^(1/3) do not round down to k-1.
int integerRoot(int n, int d) {
  const double nudged = n + 0.001;
  const double root = pow(nudged, 1.0 / d);
  return static_cast<int>(floor(root));
}
06093
06102 bool factors(int n, int d, int *dims, int m) {
06103 if (d==1)
06104 {
06105 if (n>=m) {
06106 dims[0]=n;
06107 return true;
06108 }
06109 }
06110 else {
06111 int k_up=integerRoot(n,d);
06112 for (int k=k_up;k>=m;k--)
06113 if (n%k==0) {
06114 dims[0]=k;
06115 if (factors(n/k,d-1,&dims[1],k))
06116 return true;
06117 }
06118 }
06119
06120 return false;
06121 }
06122
06123 CDECL
06124 int AMPI_Dims_create(int nnodes, int ndims, int *dims) {
06125 AMPIAPI("AMPI_Dims_create");
06126
06127 int i, n, d, *pdims;
06128
06129 n = nnodes;
06130 d = ndims;
06131
06132 for (i = 0; i < ndims; i++)
06133 if (dims[i] != 0)
06134 if (n % dims[i] != 0)
06135 CkAbort("MPI_Dims_Create: Value in dimensions array infeasible!");
06136 else {
06137 n = n / dims[i];
06138 d--;
06139 }
06140
06141 pdims = new int[d];
06142
06143 if (!factors(n, d, pdims, 1))
06144 CkAbort("MPI_Dims_Create: Factorization failed. Wonder why?");
06145
06146 int j = 0;
06147 for (i = 0; i < ndims; i++)
06148 if (dims[i] == 0) {
06149 dims[i] = pdims[j];
06150 j++;
06151 }
06152
06153 delete [] pdims;
06154
06155 return 0;
06156 }
06157
06158
06159
06160
06161
06162
06163 CDECL
06164 int AMPI_Cart_sub(MPI_Comm comm, int *remain_dims, MPI_Comm *newcomm) {
06165 AMPIAPI("AMPI_Cart_sub");
06166
06167 int i, *coords, ndims, rank;
06168 int color = 1, key = 1;
06169
06170 AMPI_Comm_rank(comm, &rank);
06171 ampiCommStruct &c = getAmpiParent()->getCart(comm);
06172 ndims = c.getndims();
06173 const CkVec<int> &dims = c.getdims();
06174 int num_remain_dims = 0;
06175
06176 coords = new int [ndims];
06177 AMPI_Cart_coords(comm, rank, ndims, coords);
06178
06179 for (i = 0; i < ndims; i++)
06180 if (remain_dims[i]) {
06181
06182 key = key * dims[i] + coords[i];
06183 num_remain_dims++;
06184 }
06185 else
06186
06187 color = color * dims[i] + coords[i];
06188
06189 getAmpiInstance(comm)->split(color, key, newcomm, CART_TOPOL);
06190
06191 ampiCommStruct &newc = getAmpiParent()->getCart(*newcomm);
06192 newc.setndims(num_remain_dims);
06193 CkVec<int> dimsv;
06194 const CkVec<int> &periods = c.getperiods();
06195 CkVec<int> periodsv;
06196
06197 for (i = 0; i < ndims; i++)
06198 if (remain_dims[i]) {
06199 dimsv.push_back(dims[i]);
06200 periodsv.push_back(periods[i]);
06201 }
06202
06203 newc.setdims(dimsv);
06204 newc.setperiods(periodsv);
06205
06206 delete [] coords;
06207 return 0;
06208 }
06209
06210 void _registerampif(void)
06211 {
06212 _registerampi();
06213 }
06214
06215 void AMPI_Datatype_iscontig(MPI_Datatype datatype, int *flag){
06216 *flag = getDDT()->isContig(datatype);
06217 }
06218
06219 CDECL
06220 int AMPI_Type_get_envelope(MPI_Datatype datatype, int *ni, int *na, int *nd, int *combiner){
06221 AMPIAPI("AMPI_Type_get_envelope");
06222 return getDDT()->getEnvelope(datatype,ni,na,nd,combiner);
06223 }
06224
06225 CDECL
06226 int AMPI_Type_get_contents(MPI_Datatype datatype, int ni, int na, int nd, int i[], MPI_Aint a[], MPI_Datatype d[]){
06227 AMPIAPI("AMPI_Type_get_contents");
06228 return getDDT()->getContents(datatype,ni,na,nd,i,a,d);
06229 }
06230
06231
06232
06233
06234 CDECL
06235 int AMPI_Suspend(int comm) {
06236 AMPIAPI("AMPI_Suspend");
06237 getAmpiInstance(comm)->block();
06238 return 0;
06239 }
06240
06241 CDECL
06242 int AMPI_Yield(int comm) {
06243 AMPIAPI("AMPI_Yield");
06244 getAmpiInstance(comm)->yield();
06245 return 0;
06246 }
06247
06248 CDECL
06249 int AMPI_Resume(int dest, int comm) {
06250 AMPIAPI("AMPI_Resume");
06251 getAmpiInstance(comm)->getProxy()[dest].unblock();
06252 return 0;
06253 }
06254
06255 CDECL
06256 int AMPI_System(const char *cmd) {
06257 return TCHARM_System(cmd);
06258 }
06259
06260 #if CMK_BIGSIM_CHARM
06261
06262 extern "C" void startCFnCall(void *param,void *msg)
06263 {
06264 BgSetStartEvent();
06265 ampi *ptr = (ampi*)param;
06266 ampi::bcastraw(NULL, 0, ptr->getProxy());
06267 delete (CkReductionMsg*)msg;
06268 }
06269
06270 CDECL
06271 int AMPI_Set_startevent(MPI_Comm comm)
06272 {
06273 AMPIAPI("AMPI_BgSetStartEvent");
06274 CmiAssert(comm == MPI_COMM_WORLD);
06275
06276 ampi *ptr = getAmpiInstance(comm);
06277
06278 CkDDT_DataType *ddt_type = ptr->getDDT()->getType(MPI_INT);
06279
06280 CkReductionMsg *msg=makeRednMsg(ddt_type, NULL, 0, MPI_INT, MPI_SUM);
06281 if (CkMyPe() == 0) {
06282 CkCallback allreduceCB(startCFnCall, ptr);
06283 msg->setCallback(allreduceCB);
06284 }
06285 ptr->contribute(msg);
06286
06287
06288 if(-1==ptr->recv(MPI_BCAST_TAG, -1, NULL, 0, MPI_INT, MPI_COMM_WORLD))
06289 CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
06290
06291 printf("AMPI_Set_startevent done %d\n", CkMyPe());
06292 return 0;
06293 }
06294 #endif
06295
06296 #if CMK_CUDA
06297 GPUReq::GPUReq()
06298 {
06299 comm = MPI_COMM_SELF;
06300 isComplete = false;
06301 isvalid = true;
06302 MPI_Comm_rank(comm, &src);
06303 buf = getAmpiInstance(comm);
06304 }
06305
06306 bool GPUReq::test(MPI_Status *sts)
06307 {
06308 return isComplete;
06309 }
06310
06311 void GPUReq::complete(MPI_Status *sts)
06312 {
06313 wait(sts);
06314 }
06315
06316 int GPUReq::wait(MPI_Status *sts)
06317 {
06318 (void)sts;
06319 while (!isComplete) {
06320 getAmpiInstance(comm)->block();
06321 }
06322 return 0;
06323 }
06324
06325 void GPUReq::receive(ampi *ptr, AmpiMsg *msg)
06326 {
06327 CkAbort("GPUReq::receive should never be called");
06328 }
06329
06330 void GPUReq::setComplete()
06331 {
06332 isComplete = true;
06333 }
06334
06335 class workRequestQueue;
06336 extern workRequestQueue *wrQueue;
06337 void enqueue(workRequestQueue *q, workRequest *wr);
06338 extern "C++" void setWRCallback(workRequest *wr, void *cb);
06339
06340 void AMPI_GPU_complete(void *request, void* dummy)
06341 {
06342 GPUReq *req = static_cast<GPUReq *>(request);
06343 req->setComplete();
06344 ampi *ptr = static_cast<ampi *>(req->buf);
06345 ptr->unblock();
06346 }
06347
06348 CDECL
06349 int AMPI_GPU_Iinvoke(workRequest *to_call, MPI_Request *request)
06350 {
06351 AMPIAPI(__func__);
06352
06353 AmpiRequestList* reqs = getReqs();
06354 GPUReq *newreq = new GPUReq();
06355 *request = reqs->insert(newreq);
06356
06357
06358 CkCallback *cb = new CkCallback(&I_GPU_complete, newreq);
06359 setWRCallback(to_call, cb);
06360
06361 enqueue(wrQueue, to_call);
06362 }
06363
06364 CDECL
06365 int AMPI_GPU_Invoke(workRequest *to_call)
06366 {
06367 AMPIAPI(__func__);
06368
06369 MPI_Request req;
06370 AMPI_GPU_Iinvoke(to_call, &req);
06371 MPI_Wait(&req, MPI_STATUS_IGNORE);
06372
06373 return 0;
06374 }
06375
06376 #endif
06377
06378 #include "ampi.def.h"
06379