00001
00002
00003
00009
00010
00011
00012
00013
00014
00015
00016 #include "adio.h"
00017 #include "adio_extern.h"
00018 #include "ad_bgl.h"
00019 #include "ad_bgl_pset.h"
00020 #include "ad_bgl_aggrs.h"
00021
00022 #ifdef HAVE_SPI_KERNEL_INTERFACE_H
00023 #include <spi/kernel_interface.h>
00024 #endif
00025
00026 #ifdef PROFILE
00027 #include "mpe.h"
00028 #endif
00029
00030 #ifdef USE_DBG_LOGGING
00031 #define RDCOLL_DEBUG 1
00032 #endif
00033 #ifdef AGGREGATION_PROFILE
00034 #include "mpe.h"
00035 #endif
00036
00037
00038 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00039 datatype, int nprocs,
00040 int myrank, ADIOI_Access
00041 *others_req, ADIO_Offset *offset_list,
00042 ADIO_Offset *len_list, int contig_access_count,
00043 ADIO_Offset
00044 min_st_offset, ADIO_Offset fd_size,
00045 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00046 int *buf_idx, int *error_code);
00047 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00048 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
00049 *len_list, int *send_size, int *recv_size,
00050 int *count, int *start_pos,
00051 int *partial_send,
00052 int *recd_from_proc, int nprocs,
00053 int myrank, int
00054 buftype_is_contig, int contig_access_count,
00055 ADIO_Offset min_st_offset,
00056 ADIO_Offset fd_size,
00057 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00058 ADIOI_Access *others_req,
00059 int iter,
00060 MPI_Aint buftype_extent, int *buf_idx);
00061 static void ADIOI_R_Exchange_data_alltoallv(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00062 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
00063 *len_list, int *send_size, int *recv_size,
00064 int *count, int *start_pos,
00065 int *partial_send,
00066 int *recd_from_proc, int nprocs,
00067 int myrank, int
00068 buftype_is_contig, int contig_access_count,
00069 ADIO_Offset min_st_offset,
00070 ADIO_Offset fd_size,
00071 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00072 ADIOI_Access *others_req,
00073 int iter,
00074 MPI_Aint buftype_extent, int *buf_idx);
00075 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00076 *flat_buf, char **recv_buf, ADIO_Offset
00077 *offset_list, ADIO_Offset *len_list,
00078 unsigned *recv_size,
00079 MPI_Request *requests, MPI_Status *statuses,
00080 int *recd_from_proc, int nprocs,
00081 int contig_access_count,
00082 ADIO_Offset min_st_offset,
00083 ADIO_Offset fd_size, ADIO_Offset *fd_start,
00084 ADIO_Offset *fd_end,
00085 MPI_Aint buftype_extent);
00086
00087 extern void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
00088 datatype, int file_ptr_type, ADIO_Offset
00089 offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
00090 **len_list_ptr, ADIO_Offset *start_offset_ptr,
00091 ADIO_Offset *end_offset_ptr, int
00092 *contig_access_count_ptr);
00093
00094 static int ADIOI_too_much_memory_for_alltoallv(int nprocs,
00095 int * send_size, int *recv_size);
00096
00097 void ADIOI_BGL_ReadStridedColl(ADIO_File fd, void *buf, int count,
00098 MPI_Datatype datatype, int file_ptr_type,
00099 ADIO_Offset offset, ADIO_Status *status, int
00100 *error_code)
00101 {
00102
00103
00104
00105
00106
00107
00108 ADIOI_Access *my_req;
00109
00110
00111
00112 ADIOI_Access *others_req;
00113
00114
00115
00116 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00117 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
00118 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00119 ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
00120 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
00121 *fd_end = NULL, *end_offsets = NULL;
00122 ADIO_Offset *bgl_offsets0 = NULL, *bgl_offsets = NULL;
00123 int ii;
00124 ADIO_Offset *len_list = NULL;
00125 int *buf_idx = NULL;
00126 #if BGL_PROFILE
00127 BGLMPIO_T_CIO_RESET( 0, r )
00128 #endif
00129
00130 #ifdef HAVE_STATUS_SET_BYTES
00131 int bufsize, size;
00132 #endif
00133
00134 #if 0
00135
00136 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
00137 ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype,
00138 file_ptr_type, offset, status, error_code);
00139 return;
00140 } */
00141 #endif
00142 #ifdef PROFILE
00143 MPE_Log_event(13, 0, "start computation");
00144 #endif
00145
00146 MPI_Comm_size(fd->comm, &nprocs);
00147 MPI_Comm_rank(fd->comm, &myrank);
00148
00149
00150 nprocs_for_coll = fd->hints->cb_nodes;
00151 orig_fp = fd->fp_ind;
00152
00153 #if BGL_PROFILE
00154 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 0, BGLMPIO_CIO_LCOMP, BGLMPIO_CIO_LAST )
00155 #endif
00156
00157
00158 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
00159
00160
00161
00162
00163
00164
00165 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00166 &offset_list, &len_list, &start_offset,
00167 &end_offset, &contig_access_count);
00168
00169 #if BGL_PROFILE
00170 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_GATHER, BGLMPIO_CIO_LCOMP )
00171 #endif
00172
00173 #ifdef RDCOLL_DEBUG
00174 for (i=0; i<contig_access_count; i++) {
00175 DBG_FPRINTF(stderr, "rank %d off %lld len %lld\n",
00176 myrank, offset_list[i], len_list[i]);
00177 }
00178 #endif
00179
00180
00181
00182
00183
00184 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00185 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00186
00187 if (bglmpio_tunegather) {
00188 bgl_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
00189 bgl_offsets = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
00190 for (ii=0; ii<nprocs; ii++) {
00191 bgl_offsets0[ii*2] = 0;
00192 bgl_offsets0[ii*2+1] = 0;
00193 }
00194 bgl_offsets0[myrank*2] = start_offset;
00195 bgl_offsets0[myrank*2+1] = end_offset;
00196
00197 MPI_Allreduce( bgl_offsets0, bgl_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
00198
00199 for (ii=0; ii<nprocs; ii++) {
00200 st_offsets [ii] = bgl_offsets[ii*2] ;
00201 end_offsets[ii] = bgl_offsets[ii*2+1];
00202 }
00203 ADIOI_Free( bgl_offsets0 );
00204 ADIOI_Free( bgl_offsets );
00205 } else {
00206 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00207 ADIO_OFFSET, fd->comm);
00208 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00209 ADIO_OFFSET, fd->comm);
00210 }
00211
00212 #if BGL_PROFILE
00213 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 1, BGLMPIO_CIO_PATANA, BGLMPIO_CIO_GATHER )
00214 #endif
00215
00216
00217 for (i=1; i<nprocs; i++)
00218 if ((st_offsets[i] < end_offsets[i-1]) &&
00219 (st_offsets[i] <= end_offsets[i]))
00220 interleave_count++;
00221
00222
00223 }
00224
00225 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00226
00227 if (fd->hints->cb_read == ADIOI_HINT_DISABLE
00228 || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
00229 {
00230
00231 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
00232 ADIOI_Free(offset_list);
00233 ADIOI_Free(len_list);
00234 ADIOI_Free(st_offsets);
00235 ADIOI_Free(end_offsets);
00236 }
00237
00238 fd->fp_ind = orig_fp;
00239 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00240
00241 if (buftype_is_contig && filetype_is_contig) {
00242 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00243 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
00244 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
00245 off, status, error_code);
00246 }
00247 else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00248 0, status, error_code);
00249 }
00250 else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
00251 offset, status, error_code);
00252
00253 return;
00254 }
00255
00256 #if BGL_PROFILE
00257 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_FD_PART, BGLMPIO_CIO_PATANA )
00258 #endif
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276 if (bglmpio_tuneblocking)
00277 ADIOI_BGL_GPFS_Calc_file_domains(st_offsets, end_offsets, nprocs,
00278 nprocs_for_coll, &min_st_offset,
00279 &fd_start, &fd_end, &fd_size, fd->fs_ptr);
00280 else
00281 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00282 nprocs_for_coll, &min_st_offset,
00283 &fd_start, &fd_end,
00284 fd->hints->min_fdomain_size, &fd_size,
00285 fd->hints->striping_unit);
00286
00287 #if BGL_PROFILE
00288 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 1, BGLMPIO_CIO_MYREQ, BGLMPIO_CIO_FD_PART )
00289 #endif
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303 if (bglmpio_tuneblocking)
00304 ADIOI_BGL_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00305 min_st_offset, fd_start, fd_end, fd_size,
00306 nprocs, &count_my_req_procs,
00307 &count_my_req_per_proc, &my_req,
00308 &buf_idx);
00309 else
00310 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00311 min_st_offset, fd_start, fd_end, fd_size,
00312 nprocs, &count_my_req_procs,
00313 &count_my_req_per_proc, &my_req,
00314 &buf_idx);
00315
00316 #if BGL_PROFILE
00317 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_OTHREQ, BGLMPIO_CIO_MYREQ )
00318 #endif
00319
00320
00321
00322
00323
00324
00325
00326
00327 if (bglmpio_tuneblocking)
00328 ADIOI_BGL_Calc_others_req(fd, count_my_req_procs,
00329 count_my_req_per_proc, my_req,
00330 nprocs, myrank, &count_others_req_procs,
00331 &others_req);
00332
00333 else
00334 ADIOI_Calc_others_req(fd, count_my_req_procs,
00335 count_my_req_per_proc, my_req,
00336 nprocs, myrank, &count_others_req_procs,
00337 &others_req);
00338
00339 #if BGL_PROFILE
00340 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_DEXCH, BGLMPIO_CIO_OTHREQ )
00341 #endif
00342
00343
00344
00345
00346 ADIOI_Free(count_my_req_per_proc);
00347 for (i=0; i<nprocs; i++) {
00348 if (my_req[i].count) {
00349 ADIOI_Free(my_req[i].offsets);
00350 ADIOI_Free(my_req[i].lens);
00351 }
00352 }
00353 ADIOI_Free(my_req);
00354
00355
00356
00357
00358
00359 ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
00360 others_req, offset_list,
00361 len_list, contig_access_count, min_st_offset,
00362 fd_size, fd_start, fd_end, buf_idx, error_code);
00363
00364 #if BGL_PROFILE
00365 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_DEXCH )
00366 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_MPIO_CRW )
00367
00368 BGLMPIO_T_CIO_REPORT( 0, r, fd, myrank )
00369 #endif
00370
00371 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00372
00373
00374 for (i=0; i<nprocs; i++) {
00375 if (others_req[i].count) {
00376 ADIOI_Free(others_req[i].offsets);
00377 ADIOI_Free(others_req[i].lens);
00378 ADIOI_Free(others_req[i].mem_ptrs);
00379 }
00380 }
00381 ADIOI_Free(others_req);
00382
00383 ADIOI_Free(buf_idx);
00384 ADIOI_Free(offset_list);
00385 ADIOI_Free(len_list);
00386 ADIOI_Free(st_offsets);
00387 ADIOI_Free(end_offsets);
00388 ADIOI_Free(fd_start);
00389 ADIOI_Free(fd_end);
00390
00391 #ifdef HAVE_STATUS_SET_BYTES
00392 MPI_Type_size(datatype, &size);
00393 bufsize = size * count;
00394 MPIR_Status_set_bytes(status, datatype, bufsize);
00395
00396
00397
00398 #endif
00399
00400 fd->fp_sys_posn = -1;
00401 }
00402
00403 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00404 datatype, int nprocs,
00405 int myrank, ADIOI_Access
00406 *others_req, ADIO_Offset *offset_list,
00407 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
00408 min_st_offset, ADIO_Offset fd_size,
00409 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00410 int *buf_idx, int *error_code)
00411 {
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422 int i, j, m, ntimes, max_ntimes, buftype_is_contig;
00423 ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
00424 char *read_buf = NULL, *tmp_buf;
00425 int *curr_offlen_ptr, *count, *send_size, *recv_size;
00426 int *partial_send, *recd_from_proc, *start_pos;
00427
00428 ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
00429 int req_len, flag, rank;
00430 MPI_Status status;
00431 ADIOI_Flatlist_node *flat_buf=NULL;
00432 MPI_Aint buftype_extent;
00433 int coll_bufsize;
00434 #ifdef RDCOLL_DEBUG
00435 int iii;
00436 #endif
00437 *error_code = MPI_SUCCESS;
00438
00439
00440
00441
00442
00443
00444
00445 coll_bufsize = fd->hints->cb_buffer_size;
00446
00447
00448 for (i=0; i < nprocs; i++) {
00449 if (others_req[i].count) {
00450 st_loc = others_req[i].offsets[0];
00451 end_loc = others_req[i].offsets[0];
00452 break;
00453 }
00454 }
00455
00456
00457 for (i=0; i < nprocs; i++)
00458 for (j=0; j<others_req[i].count; j++) {
00459 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00460 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00461 + others_req[i].lens[j] - 1));
00462 }
00463
00464
00465
00466
00467
00468
00469 if ((st_loc==-1) && (end_loc==-1)) {
00470
00471 ntimes = 0;
00472 }
00473 else {
00474
00475 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00476 }
00477
00478 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
00479
00480 if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);
00481
00482 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00483
00484
00485 count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00486
00487
00488
00489 partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00490
00491
00492
00493
00494 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00495
00496
00497 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00498
00499
00500
00501 recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00502
00503
00504
00505 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00506
00507
00508
00509 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00510 if (!buftype_is_contig) {
00511 ADIOI_Flatten_datatype(datatype);
00512 flat_buf = ADIOI_Flatlist;
00513 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00514 }
00515 MPI_Type_extent(datatype, &buftype_extent);
00516
00517 done = 0;
00518 off = st_loc;
00519 for_curr_iter = for_next_iter = 0;
00520
00521 MPI_Comm_rank(fd->comm, &rank);
00522
00523 #ifdef PROFILE
00524 MPE_Log_event(14, 0, "end computation");
00525 #endif
00526
00527 for (m=0; m<ntimes; m++) {
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565 #ifdef PROFILE
00566 MPE_Log_event(13, 0, "start computation");
00567 #endif
00568 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
00569 real_off = off - for_curr_iter;
00570 real_size = size + for_curr_iter;
00571
00572 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00573 for_next_iter = 0;
00574
00575 for (i=0; i<nprocs; i++) {
00576 #ifdef RDCOLL_DEBUG
00577 DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count);
00578 #endif
00579 if (others_req[i].count) {
00580 start_pos[i] = curr_offlen_ptr[i];
00581 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
00582 j++) {
00583 if (partial_send[i]) {
00584
00585
00586 req_off = others_req[i].offsets[j] +
00587 partial_send[i];
00588 req_len = others_req[i].lens[j] -
00589 partial_send[i];
00590 partial_send[i] = 0;
00591
00592 others_req[i].offsets[j] = req_off;
00593 others_req[i].lens[j] = req_len;
00594 }
00595 else {
00596 req_off = others_req[i].offsets[j];
00597 req_len = others_req[i].lens[j];
00598 }
00599 if (req_off < real_off + real_size) {
00600 count[i]++;
00601 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIR_Upint)(read_buf+req_off-real_off));
00602 MPI_Address(read_buf+req_off-real_off,
00603 &(others_req[i].mem_ptrs[j]));
00604 ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
00605 send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
00606 (ADIO_Offset)(unsigned)req_len));
00607
00608 if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
00609 partial_send[i] = (int) (real_off + real_size - req_off);
00610 if ((j+1 < others_req[i].count) &&
00611 (others_req[i].offsets[j+1] <
00612 real_off+real_size)) {
00613
00614
00615 for_next_iter = ADIOI_MAX(for_next_iter,
00616 real_off + real_size - others_req[i].offsets[j+1]);
00617
00618
00619 }
00620 break;
00621 }
00622 }
00623 else break;
00624 }
00625 curr_offlen_ptr[i] = j;
00626 }
00627 }
00628
00629 flag = 0;
00630 for (i=0; i<nprocs; i++)
00631 if (count[i]) flag = 1;
00632
00633 #ifdef PROFILE
00634 MPE_Log_event(14, 0, "end computation");
00635 #endif
00636 if (flag) {
00637 ADIOI_Assert(size == (int)size);
00638 ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
00639 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
00640 #ifdef RDCOLL_DEBUG
00641 DBG_FPRINTF(stderr, "\tread_coll: 700, data read [%lld] = ", size );
00642 for (iii=0; iii<size && iii<80; iii++) { DBGV_FPRINTF(stderr, "%3d,", *((unsigned char *)read_buf + for_curr_iter + iii) ); }
00643 DBG_FPRINTF(stderr, "\n" );
00644 #endif
00645
00646 if (*error_code != MPI_SUCCESS) return;
00647 }
00648
00649 for_curr_iter = for_next_iter;
00650
00651 #ifdef PROFILE
00652 MPE_Log_event(7, 0, "start communication");
00653 #endif
00654 if (bglmpio_comm == 1)
00655 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00656 send_size, recv_size, count,
00657 start_pos, partial_send, recd_from_proc, nprocs,
00658 myrank,
00659 buftype_is_contig, contig_access_count,
00660 min_st_offset, fd_size, fd_start, fd_end,
00661 others_req,
00662 m, buftype_extent, buf_idx);
00663 else
00664 if (bglmpio_comm == 0) {
00665 ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
00666 send_size, recv_size, count,
00667 start_pos, partial_send, recd_from_proc, nprocs,
00668 myrank,
00669 buftype_is_contig, contig_access_count,
00670 min_st_offset, fd_size, fd_start, fd_end,
00671 others_req,
00672 m, buftype_extent, buf_idx);
00673 }
00674
00675
00676 #ifdef PROFILE
00677 MPE_Log_event(8, 0, "end communication");
00678 #endif
00679
00680 if (for_next_iter) {
00681 tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
00682 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIR_Upint)(read_buf+real_size-for_next_iter));
00683 ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
00684 memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
00685 ADIOI_Free(read_buf);
00686 read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
00687 memcpy(read_buf, tmp_buf, for_next_iter);
00688 ADIOI_Free(tmp_buf);
00689 }
00690
00691 off += size;
00692 done += size;
00693 }
00694
00695 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00696 #ifdef PROFILE
00697 MPE_Log_event(7, 0, "start communication");
00698 #endif
00699 for (m=ntimes; m<max_ntimes; m++)
00700
00701
00702 if (bglmpio_comm == 1)
00703 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00704 send_size, recv_size, count,
00705 start_pos, partial_send, recd_from_proc, nprocs,
00706 myrank,
00707 buftype_is_contig, contig_access_count,
00708 min_st_offset, fd_size, fd_start, fd_end,
00709 others_req, m,
00710 buftype_extent, buf_idx);
00711 else
00712 if (bglmpio_comm == 0)
00713 ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
00714 send_size, recv_size, count,
00715 start_pos, partial_send, recd_from_proc, nprocs,
00716 myrank,
00717 buftype_is_contig, contig_access_count,
00718 min_st_offset, fd_size, fd_start, fd_end,
00719 others_req,
00720 m, buftype_extent, buf_idx);
00721
00722 #ifdef PROFILE
00723 MPE_Log_event(8, 0, "end communication");
00724 #endif
00725
00726 if (ntimes) ADIOI_Free(read_buf);
00727 ADIOI_Free(curr_offlen_ptr);
00728 ADIOI_Free(count);
00729 ADIOI_Free(partial_send);
00730 ADIOI_Free(send_size);
00731 ADIOI_Free(recv_size);
00732 ADIOI_Free(recd_from_proc);
00733 ADIOI_Free(start_pos);
00734 }
00735
00736 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00737 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
00738 *len_list, int *send_size, int *recv_size,
00739 int *count, int *start_pos, int *partial_send,
00740 int *recd_from_proc, int nprocs,
00741 int myrank, int
00742 buftype_is_contig, int contig_access_count,
00743 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00744 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00745 ADIOI_Access *others_req,
00746 int iter, MPI_Aint buftype_extent, int *buf_idx)
00747 {
00748 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
00749 char **recv_buf = NULL;
00750 MPI_Request *requests;
00751 MPI_Datatype send_type;
00752 MPI_Status *statuses;
00753
00754
00755
00756
00757 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
00758
00759 nprocs_recv = 0;
00760 for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
00761
00762 nprocs_send = 0;
00763 for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
00764
00765 requests = (MPI_Request *)
00766 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
00767
00768
00769
00770
00771
00772 #ifdef AGGREGATION_PROFILE
00773 MPE_Log_event (5032, 0, NULL);
00774 #endif
00775
00776 if (buftype_is_contig) {
00777 j = 0;
00778 for (i=0; i < nprocs; i++)
00779 if (recv_size[i]) {
00780 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
00781 MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
00782 j++;
00783 buf_idx[i] += recv_size[i];
00784 }
00785 }
00786 else {
00787
00788 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
00789 for (i=0; i < nprocs; i++)
00790 if (recv_size[i]) recv_buf[i] =
00791 (char *) ADIOI_Malloc(recv_size[i]);
00792
00793 j = 0;
00794 for (i=0; i < nprocs; i++)
00795 if (recv_size[i]) {
00796 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
00797 myrank+i+100*iter, fd->comm, requests+j);
00798 j++;
00799 #ifdef RDCOLL_DEBUG
00800 DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
00801 myrank, recv_size[i], myrank+i+100*iter);
00802 #endif
00803 }
00804 }
00805
00806
00807
00808 j = 0;
00809 for (i=0; i<nprocs; i++) {
00810 if (send_size[i]) {
00811
00812 if (partial_send[i]) {
00813 k = start_pos[i] + count[i] - 1;
00814 tmp = others_req[i].lens[k];
00815 others_req[i].lens[k] = partial_send[i];
00816 }
00817 MPI_Type_hindexed(count[i],
00818 &(others_req[i].lens[start_pos[i]]),
00819 &(others_req[i].mem_ptrs[start_pos[i]]),
00820 MPI_BYTE, &send_type);
00821
00822 MPI_Type_commit(&send_type);
00823 MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
00824 fd->comm, requests+nprocs_recv+j);
00825 MPI_Type_free(&send_type);
00826 if (partial_send[i]) others_req[i].lens[k] = tmp;
00827 j++;
00828 }
00829 }
00830
00831 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00832 sizeof(MPI_Status));
00833
00834
00835
00836 if (nprocs_recv) {
00837 #ifdef NEEDS_MPI_TEST
00838 j = 0;
00839 while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
00840 #else
00841 MPI_Waitall(nprocs_recv, requests, statuses);
00842 #endif
00843
00844
00845 if (!buftype_is_contig)
00846 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
00847 offset_list, len_list, (unsigned*)recv_size,
00848 requests, statuses, recd_from_proc,
00849 nprocs, contig_access_count,
00850 min_st_offset, fd_size, fd_start, fd_end,
00851 buftype_extent);
00852 }
00853
00854
00855 MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
00856
00857 ADIOI_Free(statuses);
00858 ADIOI_Free(requests);
00859
00860 if (!buftype_is_contig) {
00861 for (i=0; i < nprocs; i++)
00862 if (recv_size[i]) ADIOI_Free(recv_buf[i]);
00863 ADIOI_Free(recv_buf);
00864 }
00865 #ifdef AGGREGATION_PROFILE
00866 MPE_Log_event (5033, 0, NULL);
00867 #endif
00868 }
00869
/*
 * ADIOI_BUF_INCR - advance the position in the flattened user buffer by
 * buf_incr bytes without copying any data.
 *
 * Operates on variables of the enclosing scope: buf_incr, size_in_buf,
 * flat_buf_sz, user_buf_idx, flat_buf_idx, n_buftypes, flat_buf and
 * buftype_extent.  Whenever the current block is used up (flat_buf_sz
 * reaches 0) it moves to the next block of flat_buf, wrapping to block 0
 * and incrementing n_buftypes after the last one, and recomputes
 * user_buf_idx as indices[flat_buf_idx] + n_buftypes * buftype_extent.
 * buf_incr is 0 when the macro finishes.  The expansion is a braced block;
 * the uses in this file do not put a semicolon after it.
 */
00870 #define ADIOI_BUF_INCR \
00871 { \
00872 while (buf_incr) { \
00873 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
00874 user_buf_idx += size_in_buf; \
00875 flat_buf_sz -= size_in_buf; \
00876 if (!flat_buf_sz) { \
00877 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00878 else { \
00879 flat_buf_idx = 0; \
00880 n_buftypes++; \
00881 } \
00882 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00883 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00884 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00885 } \
00886 buf_incr -= size_in_buf; \
00887 } \
00888 }
00889
00890
/*
 * ADIOI_BUF_COPY - copy `size` bytes from recv_buf[p], starting at
 * recv_buf_idx[p], into the user buffer at user_buf_idx, following the
 * block layout of flat_buf, then skip whatever is left of buf_incr.
 *
 * Uses the same enclosing-scope variables as ADIOI_BUF_INCR plus buf, p,
 * size, recv_buf and recv_buf_idx.  Each copied chunk advances
 * recv_buf_idx[p] and user_buf_idx and is subtracted from both size and
 * buf_incr; the trailing ADIOI_BUF_INCR then consumes the remaining
 * buf_incr without copying.
 */
00891 #define ADIOI_BUF_COPY \
00892 { \
00893 while (size) { \
00894 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
00895 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)(buf + user_buf_idx)); \
00896 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
00897 memcpy(((char *) buf) + user_buf_idx, \
00898 &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
00899 recv_buf_idx[p] += size_in_buf; \
00900 user_buf_idx += size_in_buf; \
00901 flat_buf_sz -= size_in_buf; \
00902 if (!flat_buf_sz) { \
00903 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00904 else { \
00905 flat_buf_idx = 0; \
00906 n_buftypes++; \
00907 } \
00908 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00909 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00910 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00911 } \
00912 size -= size_in_buf; \
00913 buf_incr -= size_in_buf; \
00914 } \
00915 ADIOI_BUF_INCR \
00916 }
00917
/*
 * ADIOI_Fill_user_buffer - scatter the bytes received in recv_buf[] into a
 * noncontiguous user buffer described by flat_buf.
 *
 * Walks this process's contiguous file accesses (offset_list/len_list,
 * contig_access_count entries).  Each piece is attributed to a process p
 * by ADIOI_BGL_Calc_aggregator(); bytes from p that were already delivered
 * in earlier calls (recd_from_proc[p]) are skipped with ADIOI_BUF_INCR and
 * bytes available in recv_buf[p] are copied with ADIOI_BUF_COPY.  On return
 * recd_from_proc[i] is updated for every process with recv_size[i] != 0.
 *
 * requests and statuses are not used.
 */
00918 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00919 *flat_buf, char **recv_buf, ADIO_Offset
00920 *offset_list, ADIO_Offset *len_list,
00921 unsigned *recv_size,
00922 MPI_Request *requests, MPI_Status *statuses,
00923 int *recd_from_proc, int nprocs,
00924 int contig_access_count,
00925 ADIO_Offset min_st_offset,
00926 ADIO_Offset fd_size, ADIO_Offset *fd_start,
00927 ADIO_Offset *fd_end,
00928 MPI_Aint buftype_extent)
00929 {
00930 /* i: index into offset_list/len_list (also used as a rank index) */
00931 /* p: rank returned by ADIOI_BGL_Calc_aggregator for the current piece */
00932 /* flat_buf_idx / flat_buf_sz: current block of flat_buf and bytes left in it */
00933 int i, p, flat_buf_idx;
00934 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
00935 int n_buftypes;
00936 ADIO_Offset off, len, rem_len, user_buf_idx;
00937 /* Per-rank counters, allocated below. */
00938 unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
00939
00940 ADIOI_UNREFERENCED_ARG(requests);
00941 ADIOI_UNREFERENCED_ARG(statuses);
00942
00943 /* curr_from_proc[p]: bytes attributed to p so far during this call */
00944 /* done_from_proc[p]: value of recd_from_proc[p] on entry */
00945 /* recv_buf_idx[p]: read position inside recv_buf[p] */
00946
00947
00948
00949 curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00950 done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00951 recv_buf_idx = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00952
00953 for (i=0; i < nprocs; i++) {
00954 recv_buf_idx[i] = curr_from_proc[i] = 0;
00955 done_from_proc[i] = recd_from_proc[i];
00956 }
00957 /* Start at the first block of the first instance of the buffer type. */
00958 user_buf_idx = flat_buf->indices[0];
00959 flat_buf_idx = 0;
00960 n_buftypes = 0;
00961 flat_buf_sz = flat_buf->blocklens[0];
00962
00963
00964
00965
00966 /* Replay every contiguous file access of this process. */
00967 for (i=0; i<contig_access_count; i++) {
00968 off = offset_list[i];
00969 rem_len = len_list[i];
00970
00971 /* An access is handled in pieces of `len` bytes until nothing remains. */
00972 while (rem_len > 0) {
00973 len = rem_len;
00974
00975 /* len is passed by address; presumably it is reduced to the part */
00976 /* of the access served by p - confirm in ADIOI_BGL_Calc_aggregator. */
00977
00978 p = ADIOI_BGL_Calc_aggregator(fd,
00979 off,
00980 min_st_offset,
00981 &len,
00982 fd_size,
00983 fd_start,
00984 fd_end);
00985
00986 if (recv_buf_idx[p] < recv_size[p]) {
00987 if (curr_from_proc[p]+len > done_from_proc[p]) {
00988 if (done_from_proc[p] > curr_from_proc[p]) {
00989 size = ADIOI_MIN(curr_from_proc[p] + len -
00990 done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
00991 buf_incr = done_from_proc[p] - curr_from_proc[p];
00992 ADIOI_BUF_INCR
00993 buf_incr = curr_from_proc[p]+len-done_from_proc[p];
00994 ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
00995 curr_from_proc[p] = done_from_proc[p] + size;
00996 ADIOI_BUF_COPY
00997 }
00998 else {
00999 size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
01000 buf_incr = len;
01001 ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
01002 curr_from_proc[p] += (unsigned) size;
01003 ADIOI_BUF_COPY
01004 }
01005 }
01006 else {
01007 ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
01008 curr_from_proc[p] += (unsigned) len;
01009 buf_incr = len;
01010 ADIOI_BUF_INCR
01011 }
01012 }
01013 else {
01014 buf_incr = len;
01015 ADIOI_BUF_INCR
01016 }
01017 off += len;
01018 rem_len -= len;
01019 }
01020 }
01021 for (i=0; i < nprocs; i++)
01022 if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
01023
01024 ADIOI_Free(curr_from_proc);
01025 ADIOI_Free(done_from_proc);
01026 ADIOI_Free(recv_buf_idx);
01027 }
01028
01029 static void ADIOI_R_Exchange_data_alltoallv(
01030 ADIO_File fd, void *buf, ADIOI_Flatlist_node
01031 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
01032 *len_list, int *send_size, int *recv_size,
01033 int *count, int *start_pos, int *partial_send,
01034 int *recd_from_proc, int nprocs,
01035 int myrank, int
01036 buftype_is_contig, int contig_access_count,
01037 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
01038 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
01039 ADIOI_Access *others_req,
01040 int iter, MPI_Aint buftype_extent, int *buf_idx)
01041 {
01042 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
01043 char **recv_buf = NULL;
01044 MPI_Request *requests=NULL;
01045 MPI_Status *statuses=NULL;
01046 int rtail, stail;
01047 char *sbuf_ptr, *from_ptr;
01048 int len;
01049 int *sdispls, *rdispls;
01050 char *all_recv_buf, *all_send_buf;
01051 int my_too_big, too_big;
01052
01053
01054
01055 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
01056
01057
01058
01059 my_too_big = ADIOI_too_much_memory_for_alltoallv(nprocs,
01060 send_size, recv_size);
01061 MPI_Allreduce(&my_too_big, &too_big, 1, MPI_INT, MPI_MAX, fd->comm);
01062 if (too_big) {
01063
01064 ADIOI_R_Exchange_data(fd, buf, flat_buf,
01065 offset_list, len_list, send_size, recv_size,
01066 count, start_pos, partial_send, recd_from_proc,
01067 nprocs, myrank, buftype_is_contig,
01068 contig_access_count, min_st_offset,
01069 fd_size, fd_start, fd_end,
01070 others_req, iter, buftype_extent, buf_idx);
01071 return;
01072 }
01073
01074
01075 nprocs_recv = 0;
01076 for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; break; }
01077
01078 nprocs_send = 0;
01079 for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; break; }
01080
01081
01082 rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
01083 rtail = 0;
01084 for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }
01085
01086
01087 all_recv_buf = (char *) ADIOI_Malloc( rtail );
01088 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
01089 for (i=0; i<nprocs; i++) { recv_buf[i] = all_recv_buf + rdispls[i]; }
01090
01091
01092 sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
01093 stail = 0;
01094 for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }
01095
01096
01097 all_send_buf = (char *) ADIOI_Malloc( stail );
01098 for (i=0; i<nprocs; i++)
01099 {
01100 if (send_size[i]) {
01101 if (partial_send[i]) {
01102 k = start_pos[i] + count[i] - 1;
01103 tmp = others_req[i].lens[k];
01104 others_req[i].lens[k] = partial_send[i];
01105 }
01106 sbuf_ptr = all_send_buf + sdispls[i];
01107 for (j=0; j<count[i]; j++) {
01108 ADIOI_ENSURE_AINT_FITS_IN_PTR( others_req[i].mem_ptrs[ start_pos[i]+j ]);
01109 from_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
01110 len = others_req[i].lens[ start_pos[i]+j ] ;
01111 memcpy( sbuf_ptr, from_ptr, len );
01112 sbuf_ptr += len;
01113 }
01114 if (partial_send[i]) others_req[i].lens[k] = tmp;
01115 }
01116 }
01117
01118 #if RDCOLL_DEBUG
01119 DBG_FPRINTF(stderr, "\tsend_size = [%d]%2d,",0,send_size[0]);
01120 for (i=1; i<nprocs; i++) if(send_size[i-1]!=send_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,send_size[i] ); }
01121 DBG_FPRINTF(stderr, "\trecv_size = [%d]%2d,",0,recv_size[0]);
01122 for (i=1; i<nprocs; i++) if(recv_size[i-1]!=recv_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,recv_size[i] ); }
01123 DBG_FPRINTF(stderr, "\tsdispls = [%d]%2d,",0,sdispls[0]);
01124 for (i=1; i<nprocs; i++) if(sdispls[i-1]!=sdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,sdispls [i] ); }
01125 DBG_FPRINTF(stderr, "\trdispls = [%d]%2d,",0,rdispls[0]);
01126 for (i=1; i<nprocs; i++) if(rdispls[i-1]!=rdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,rdispls [i] ); }
01127 DBG_FPRINTF(stderr, "\ttails = %4d, %4d\n", stail, rtail );
01128 if (nprocs_send) {
01129 DBG_FPRINTF(stderr, "\tall_send_buf = [%d]%2d,",0,all_send_buf[0]);
01130 for (i=1; i<nprocs; i++) if(all_send_buf[(i-1)*131072]!=all_send_buf[i*131072]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i, all_send_buf [i*131072] ); }
01131 }
01132 #endif
01133
01134
01135 MPI_Alltoallv(
01136 all_send_buf, send_size, sdispls, MPI_BYTE,
01137 all_recv_buf, recv_size, rdispls, MPI_BYTE,
01138 fd->comm );
01139
01140 #if 0
01141 DBG_FPRINTF(stderr, "\tall_recv_buf = " );
01142 for (i=131072; i<131073; i++) { DBG_FPRINTF(stderr, "%2d,", all_recv_buf [i] ); }
01143 DBG_FPRINTF(stderr, "\n" );
01144 #endif
01145
01146
01147 if (nprocs_recv) {
01148 if (!buftype_is_contig)
01149 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
01150 offset_list, len_list, (unsigned*)recv_size,
01151 requests, statuses,
01152 recd_from_proc,
01153 nprocs, contig_access_count,
01154 min_st_offset, fd_size, fd_start, fd_end,
01155 buftype_extent);
01156 else {
01157 rtail = 0;
01158 for (i=0; i < nprocs; i++)
01159 if (recv_size[i]) {
01160 memcpy( (char *)buf + buf_idx[i], all_recv_buf + rtail, recv_size[i] );
01161 buf_idx[i] += recv_size[i];
01162 rtail += recv_size[i];
01163 }
01164 }
01165 }
01166
01167 ADIOI_Free( all_send_buf );
01168 ADIOI_Free( all_recv_buf );
01169 ADIOI_Free( recv_buf );
01170 ADIOI_Free( sdispls );
01171 ADIOI_Free( rdispls );
01172 return;
01173 }
01174
/*
 * ADIOI_too_much_memory_for_alltoallv - decide whether the alltoallv-based
 * exchange would need more memory than is available.
 *
 * nprocs     number of entries in send_size and recv_size
 * send_size  bytes to be sent to each process (non-negative)
 * recv_size  bytes to be received from each process (non-negative)
 *
 * The requirement is the sum of all send and receive sizes plus three int
 * arrays of nprocs elements.  The limit is the value reported by
 * Kernel_GetMemorySize(KERNEL_MEMSIZE_ESTHEAPAVAIL) when
 * HAVE_KERNEL_GETMEMORYSIZE is defined, and 128 MiB otherwise.
 *
 * Returns 1 if the requirement exceeds the limit, 0 otherwise.
 *
 * The total is accumulated in 64 bits: summing into an int overflowed for
 * large exchanges, which could make an oversized request look small.
 */
static int ADIOI_too_much_memory_for_alltoallv(int nprocs,
                                               int *send_size, int *recv_size)
{
    unsigned int threshold;
    unsigned long long mem_required = 0;
    int i;

#ifdef HAVE_KERNEL_GETMEMORYSIZE
    Kernel_GetMemorySize(KERNEL_MEMSIZE_ESTHEAPAVAIL, &threshold);
#else
    threshold = 1024*1024*128;
#endif

    /* Payload buffers: everything sent plus everything received. */
    for (i = 0; i < nprocs; i++) {
        mem_required += (unsigned long long) recv_size[i];
        mem_required += (unsigned long long) send_size[i];
    }

    /* Bookkeeping arrays: three ints per process. */
    mem_required += (unsigned long long) nprocs * sizeof(int) * 3;

    return mem_required > threshold;
}