00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "adio.h"
00010 #include "adio_extern.h"
00011 #ifdef PROFILE
00012 #include "mpe.h"
00013 #endif
00014
00015
00016 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00017 datatype, int nprocs,
00018 int myrank, ADIOI_Access
00019 *others_req, ADIO_Offset *offset_list,
00020 int *len_list, int contig_access_count,
00021 ADIO_Offset
00022 min_st_offset, ADIO_Offset fd_size,
00023 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00024 int *buf_idx, int *error_code);
00025 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00026 *flat_buf, ADIO_Offset *offset_list, int
00027 *len_list, int *send_size, int *recv_size,
00028 int *count, int *start_pos,
00029 int *partial_send,
00030 int *recd_from_proc, int nprocs,
00031 int myrank, int
00032 buftype_is_contig, int contig_access_count,
00033 ADIO_Offset min_st_offset,
00034 ADIO_Offset fd_size,
00035 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00036 ADIOI_Access *others_req,
00037 int iter,
00038 MPI_Aint buftype_extent, int *buf_idx);
00039 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00040 *flat_buf, char **recv_buf, ADIO_Offset
00041 *offset_list, int *len_list,
00042 int *recv_size,
00043 MPI_Request *requests, MPI_Status *statuses,
00044 int *recd_from_proc, int nprocs,
00045 int contig_access_count,
00046 ADIO_Offset min_st_offset,
00047 ADIO_Offset fd_size, ADIO_Offset *fd_start,
00048 ADIO_Offset *fd_end,
00049 MPI_Aint buftype_extent);
00050
00051
00052 void ADIOI_GEN_ReadStridedColl(ADIO_File fd, void *buf, int count,
00053 MPI_Datatype datatype, int file_ptr_type,
00054 ADIO_Offset offset, ADIO_Status *status, int
00055 *error_code)
00056 {
00057
00058
00059
00060
00061
00062
00063 ADIOI_Access *my_req;
00064
00065
00066
00067 ADIOI_Access *others_req;
00068
00069
00070
00071 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00072 int *len_list, contig_access_count, interleave_count;
00073 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00074 int buftype_is_contig, *buf_idx;
00075 ADIO_Offset *offset_list, start_offset, end_offset, *st_offsets, orig_fp;
00076 ADIO_Offset *fd_start, *fd_end, fd_size, min_st_offset, *end_offsets;
00077 ADIO_Offset off;
00078 #ifdef HAVE_STATUS_SET_BYTES
00079 int bufsize, size;
00080 #endif
00081
00082 #ifdef PROFILE
00083 MPE_Log_event(13, 0, "start computation");
00084 #endif
00085
00086 MPI_Comm_size(fd->comm, &nprocs);
00087 MPI_Comm_rank(fd->comm, &myrank);
00088
00089
00090 nprocs_for_coll = fd->hints->cb_nodes;
00091
00092
00093
00094
00095
00096
00097
00098 orig_fp = fd->fp_ind;
00099 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00100 &offset_list, &len_list, &start_offset,
00101 &end_offset, &contig_access_count);
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00113 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00114
00115 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1, ADIO_OFFSET,
00116 fd->comm);
00117 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1, ADIO_OFFSET,
00118 fd->comm);
00119
00120
00121 interleave_count = 0;
00122 for (i=1; i<nprocs; i++)
00123 if (st_offsets[i] < end_offsets[i-1]) interleave_count++;
00124
00125
00126
00127 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00128
00129 if (fd->hints->cb_read == ADIOI_HINT_DISABLE
00130 || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
00131 {
00132
00133 ADIOI_Free(offset_list);
00134 ADIOI_Free(len_list);
00135 ADIOI_Free(st_offsets);
00136 ADIOI_Free(end_offsets);
00137
00138 fd->fp_ind = orig_fp;
00139 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00140
00141 if (buftype_is_contig && filetype_is_contig) {
00142 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00143 off = fd->disp + (fd->etype_size) * offset;
00144 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
00145 off, status, error_code);
00146 }
00147 else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00148 0, status, error_code);
00149 }
00150 else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
00151 offset, status, error_code);
00152
00153 return;
00154 }
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00173 nprocs_for_coll, &min_st_offset,
00174 &fd_start, &fd_end, &fd_size);
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00189 min_st_offset, fd_start, fd_end, fd_size,
00190 nprocs, &count_my_req_procs,
00191 &count_my_req_per_proc, &my_req,
00192 &buf_idx);
00193
00194
00195
00196
00197
00198
00199
00200
00201 ADIOI_Calc_others_req(fd, count_my_req_procs,
00202 count_my_req_per_proc, my_req,
00203 nprocs, myrank, &count_others_req_procs,
00204 &others_req);
00205
00206
00207
00208
00209 ADIOI_Free(count_my_req_per_proc);
00210 for (i=0; i<nprocs; i++) {
00211 if (my_req[i].count) {
00212 ADIOI_Free(my_req[i].offsets);
00213 ADIOI_Free(my_req[i].lens);
00214 }
00215 }
00216 ADIOI_Free(my_req);
00217
00218
00219
00220
00221
00222 ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
00223 others_req, offset_list,
00224 len_list, contig_access_count, min_st_offset,
00225 fd_size, fd_start, fd_end, buf_idx, error_code);
00226
00227 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00228
00229
00230 for (i=0; i<nprocs; i++) {
00231 if (others_req[i].count) {
00232 ADIOI_Free(others_req[i].offsets);
00233 ADIOI_Free(others_req[i].lens);
00234 ADIOI_Free(others_req[i].mem_ptrs);
00235 }
00236 }
00237 ADIOI_Free(others_req);
00238
00239 ADIOI_Free(buf_idx);
00240 ADIOI_Free(offset_list);
00241 ADIOI_Free(len_list);
00242 ADIOI_Free(st_offsets);
00243 ADIOI_Free(end_offsets);
00244 ADIOI_Free(fd_start);
00245 ADIOI_Free(fd_end);
00246
00247 #ifdef HAVE_STATUS_SET_BYTES
00248 MPI_Type_size(datatype, &size);
00249 bufsize = size * count;
00250 MPIR_Status_set_bytes(status, datatype, bufsize);
00251
00252
00253
00254 #endif
00255
00256 fd->fp_sys_posn = -1;
00257 }
00258
00259 void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
00260 datatype, int file_ptr_type, ADIO_Offset
00261 offset, ADIO_Offset **offset_list_ptr, int
00262 **len_list_ptr, ADIO_Offset *start_offset_ptr,
00263 ADIO_Offset *end_offset_ptr, int
00264 *contig_access_count_ptr)
00265 {
00266 int filetype_size, buftype_size, etype_size;
00267 int i, j, k, frd_size=0, old_frd_size=0, st_index=0;
00268 int n_filetypes, etype_in_filetype;
00269 ADIO_Offset abs_off_in_filetype=0;
00270 int bufsize, sum, n_etypes_in_filetype, size_in_filetype;
00271 int contig_access_count, *len_list, flag, filetype_is_contig;
00272 MPI_Aint filetype_extent, filetype_lb;
00273 ADIOI_Flatlist_node *flat_file;
00274 ADIO_Offset *offset_list, off, end_offset=0, disp;
00275
00276
00277
00278
00279 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00280
00281 MPI_Type_size(fd->filetype, &filetype_size);
00282 MPI_Type_extent(fd->filetype, &filetype_extent);
00283 MPI_Type_lb(fd->filetype, &filetype_lb);
00284 MPI_Type_size(datatype, &buftype_size);
00285 etype_size = fd->etype_size;
00286
00287 if ( ! filetype_size ) {
00288 *contig_access_count_ptr = 0;
00289 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00290 *len_list_ptr = (int *) ADIOI_Malloc(2*sizeof(int));
00291
00292
00293 offset_list = *offset_list_ptr;
00294 len_list = *len_list_ptr;
00295 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
00296 fd->disp + etype_size * offset;
00297 len_list[0] = 0;
00298 *start_offset_ptr = offset_list[0];
00299 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
00300
00301 return;
00302 }
00303
00304 if (filetype_is_contig) {
00305 *contig_access_count_ptr = 1;
00306 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00307 *len_list_ptr = (int *) ADIOI_Malloc(2*sizeof(int));
00308
00309
00310 offset_list = *offset_list_ptr;
00311 len_list = *len_list_ptr;
00312 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
00313 fd->disp + etype_size * offset;
00314 len_list[0] = bufcount * buftype_size;
00315 *start_offset_ptr = offset_list[0];
00316 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
00317
00318
00319 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = *end_offset_ptr + 1;
00320 }
00321
00322 else {
00323
00324
00325
00326
00327 flat_file = ADIOI_Flatlist;
00328 while (flat_file->type != fd->filetype) flat_file = flat_file->next;
00329 disp = fd->disp;
00330
00331 if (file_ptr_type == ADIO_INDIVIDUAL) {
00332 offset = fd->fp_ind;
00333 n_filetypes = -1;
00334 flag = 0;
00335 while (!flag) {
00336 n_filetypes++;
00337 for (i=0; i<flat_file->count; i++) {
00338 if (disp + flat_file->indices[i] +
00339 (ADIO_Offset) n_filetypes*filetype_extent +
00340 flat_file->blocklens[i] >= offset)
00341 {
00342 st_index = i;
00343 frd_size = (int) (disp + flat_file->indices[i] +
00344 (ADIO_Offset) n_filetypes*filetype_extent
00345 + flat_file->blocklens[i] - offset);
00346 flag = 1;
00347 break;
00348 }
00349 }
00350 }
00351 }
00352 else {
00353 n_etypes_in_filetype = filetype_size/etype_size;
00354 n_filetypes = (int) (offset / n_etypes_in_filetype);
00355 etype_in_filetype = (int) (offset % n_etypes_in_filetype);
00356 size_in_filetype = etype_in_filetype * etype_size;
00357
00358 sum = 0;
00359 for (i=0; i<flat_file->count; i++) {
00360 sum += flat_file->blocklens[i];
00361 if (sum > size_in_filetype) {
00362 st_index = i;
00363 frd_size = sum - size_in_filetype;
00364 abs_off_in_filetype = flat_file->indices[i] +
00365 size_in_filetype - (sum - flat_file->blocklens[i]);
00366 break;
00367 }
00368 }
00369
00370
00371 offset = disp + (ADIO_Offset) n_filetypes*filetype_extent +
00372 abs_off_in_filetype;
00373 }
00374
00375
00376
00377 old_frd_size = frd_size;
00378 contig_access_count = i = 0;
00379 j = st_index;
00380 bufsize = buftype_size * bufcount;
00381 frd_size = ADIOI_MIN(frd_size, bufsize);
00382 while (i < bufsize) {
00383 if (frd_size) contig_access_count++;
00384 i += frd_size;
00385 j = (j + 1) % flat_file->count;
00386 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i);
00387 }
00388
00389
00390
00391 *offset_list_ptr = (ADIO_Offset *)
00392 ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
00393 *len_list_ptr = (int *) ADIOI_Malloc((contig_access_count+1)*sizeof(int));
00394
00395
00396 offset_list = *offset_list_ptr;
00397 len_list = *len_list_ptr;
00398
00399
00400
00401 *start_offset_ptr = offset;
00402
00403 i = k = 0;
00404 j = st_index;
00405 off = offset;
00406 frd_size = ADIOI_MIN(old_frd_size, bufsize);
00407 while (i < bufsize) {
00408 if (frd_size) {
00409 offset_list[k] = off;
00410 len_list[k] = frd_size;
00411 k++;
00412 }
00413 i += frd_size;
00414 end_offset = off + frd_size - 1;
00415
00416
00417
00418
00419 if (off + frd_size < disp + flat_file->indices[j] +
00420 flat_file->blocklens[j] +
00421 (ADIO_Offset) n_filetypes*filetype_extent)
00422 {
00423 off += frd_size;
00424
00425
00426
00427 }
00428 else {
00429 if (j < (flat_file->count - 1)) j++;
00430 else {
00431
00432
00433
00434 j = 0;
00435 n_filetypes++;
00436 }
00437 off = disp + flat_file->indices[j] +
00438 (ADIO_Offset) n_filetypes*filetype_extent;
00439 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i);
00440 }
00441 }
00442
00443
00444 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
00445
00446 *contig_access_count_ptr = contig_access_count;
00447 *end_offset_ptr = end_offset;
00448 }
00449 }
00450
00451 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00452 datatype, int nprocs,
00453 int myrank, ADIOI_Access
00454 *others_req, ADIO_Offset *offset_list,
00455 int *len_list, int contig_access_count, ADIO_Offset
00456 min_st_offset, ADIO_Offset fd_size,
00457 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00458 int *buf_idx, int *error_code)
00459 {
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470 int i, j, m, size, ntimes, max_ntimes, buftype_is_contig;
00471 ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
00472 char *read_buf = NULL, *tmp_buf;
00473 int *curr_offlen_ptr, *count, *send_size, *recv_size;
00474 int *partial_send, *recd_from_proc, *start_pos, for_next_iter;
00475 int real_size, req_len, flag, for_curr_iter, rank;
00476 MPI_Status status;
00477 ADIOI_Flatlist_node *flat_buf=NULL;
00478 MPI_Aint buftype_extent;
00479 int coll_bufsize;
00480
00481 *error_code = MPI_SUCCESS;
00482
00483
00484
00485
00486
00487
00488
00489 coll_bufsize = fd->hints->cb_buffer_size;
00490
00491
00492 for (i=0; i < nprocs; i++) {
00493 if (others_req[i].count) {
00494 st_loc = others_req[i].offsets[0];
00495 end_loc = others_req[i].offsets[0];
00496 break;
00497 }
00498 }
00499
00500
00501 for (i=0; i < nprocs; i++)
00502 for (j=0; j<others_req[i].count; j++) {
00503 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00504 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00505 + others_req[i].lens[j] - 1));
00506 }
00507
00508
00509
00510
00511
00512
00513 if ((st_loc==-1) && (end_loc==-1)) {
00514
00515 ntimes = 0;
00516 }
00517 else {
00518
00519 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00520 }
00521
00522 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
00523
00524 if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);
00525
00526 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00527
00528
00529 count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00530
00531
00532
00533 partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00534
00535
00536
00537
00538 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00539
00540
00541 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00542
00543
00544
00545 recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00546
00547
00548
00549 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00550
00551
00552
00553 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00554 if (!buftype_is_contig) {
00555 ADIOI_Flatten_datatype(datatype);
00556 flat_buf = ADIOI_Flatlist;
00557 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00558 }
00559 MPI_Type_extent(datatype, &buftype_extent);
00560
00561 done = 0;
00562 off = st_loc;
00563 for_curr_iter = for_next_iter = 0;
00564
00565 MPI_Comm_rank(fd->comm, &rank);
00566
00567 #ifdef PROFILE
00568 MPE_Log_event(14, 0, "end computation");
00569 #endif
00570
00571 for (m=0; m<ntimes; m++) {
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609 #ifdef PROFILE
00610 MPE_Log_event(13, 0, "start computation");
00611 #endif
00612 size = (int) (ADIOI_MIN(coll_bufsize, end_loc-st_loc+1-done));
00613 real_off = off - for_curr_iter;
00614 real_size = size + for_curr_iter;
00615
00616 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00617 for_next_iter = 0;
00618
00619 for (i=0; i<nprocs; i++) {
00620
00621 if (others_req[i].count) {
00622 start_pos[i] = curr_offlen_ptr[i];
00623 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
00624 j++) {
00625 if (partial_send[i]) {
00626
00627
00628 req_off = others_req[i].offsets[j] +
00629 partial_send[i];
00630 req_len = others_req[i].lens[j] -
00631 partial_send[i];
00632 partial_send[i] = 0;
00633
00634 others_req[i].offsets[j] = req_off;
00635 others_req[i].lens[j] = req_len;
00636 }
00637 else {
00638 req_off = others_req[i].offsets[j];
00639 req_len = others_req[i].lens[j];
00640 }
00641 if (req_off < real_off + real_size) {
00642 count[i]++;
00643 MPI_Address(read_buf+req_off-real_off,
00644 &(others_req[i].mem_ptrs[j]));
00645 send_size[i] += (int)(ADIOI_MIN(real_off + (ADIO_Offset)real_size -
00646 req_off, req_len));
00647
00648 if (real_off+real_size-req_off < req_len) {
00649 partial_send[i] = (int) (real_off+real_size-
00650 req_off);
00651 if ((j+1 < others_req[i].count) &&
00652 (others_req[i].offsets[j+1] <
00653 real_off+real_size)) {
00654
00655
00656 for_next_iter = (int) (ADIOI_MAX(for_next_iter,
00657 real_off + real_size -
00658 others_req[i].offsets[j+1]));
00659
00660
00661 }
00662 break;
00663 }
00664 }
00665 else break;
00666 }
00667 curr_offlen_ptr[i] = j;
00668 }
00669 }
00670
00671 flag = 0;
00672 for (i=0; i<nprocs; i++)
00673 if (count[i]) flag = 1;
00674
00675 #ifdef PROFILE
00676 MPE_Log_event(14, 0, "end computation");
00677 #endif
00678 if (flag) {
00679 ADIO_ReadContig(fd, read_buf+for_curr_iter, size, MPI_BYTE,
00680 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
00681 if (*error_code != MPI_SUCCESS) return;
00682 }
00683
00684 for_curr_iter = for_next_iter;
00685
00686 #ifdef PROFILE
00687 MPE_Log_event(7, 0, "start communication");
00688 #endif
00689 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00690 send_size, recv_size, count,
00691 start_pos, partial_send, recd_from_proc, nprocs,
00692 myrank,
00693 buftype_is_contig, contig_access_count,
00694 min_st_offset, fd_size, fd_start, fd_end,
00695 others_req,
00696 m, buftype_extent, buf_idx);
00697
00698 #ifdef PROFILE
00699 MPE_Log_event(8, 0, "end communication");
00700 #endif
00701
00702 if (for_next_iter) {
00703 tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
00704 memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
00705 ADIOI_Free(read_buf);
00706 read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
00707 memcpy(read_buf, tmp_buf, for_next_iter);
00708 ADIOI_Free(tmp_buf);
00709 }
00710
00711 off += size;
00712 done += size;
00713 }
00714
00715 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00716 #ifdef PROFILE
00717 MPE_Log_event(7, 0, "start communication");
00718 #endif
00719 for (m=ntimes; m<max_ntimes; m++)
00720
00721 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00722 send_size, recv_size, count,
00723 start_pos, partial_send, recd_from_proc, nprocs,
00724 myrank,
00725 buftype_is_contig, contig_access_count,
00726 min_st_offset, fd_size, fd_start, fd_end,
00727 others_req, m,
00728 buftype_extent, buf_idx);
00729 #ifdef PROFILE
00730 MPE_Log_event(8, 0, "end communication");
00731 #endif
00732
00733 if (ntimes) ADIOI_Free(read_buf);
00734 ADIOI_Free(curr_offlen_ptr);
00735 ADIOI_Free(count);
00736 ADIOI_Free(partial_send);
00737 ADIOI_Free(send_size);
00738 ADIOI_Free(recv_size);
00739 ADIOI_Free(recd_from_proc);
00740 ADIOI_Free(start_pos);
00741 }
00742
00743 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00744 *flat_buf, ADIO_Offset *offset_list, int
00745 *len_list, int *send_size, int *recv_size,
00746 int *count, int *start_pos, int *partial_send,
00747 int *recd_from_proc, int nprocs,
00748 int myrank, int
00749 buftype_is_contig, int contig_access_count,
00750 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00751 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00752 ADIOI_Access *others_req,
00753 int iter, MPI_Aint buftype_extent, int *buf_idx)
00754 {
00755 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
00756 char **recv_buf = NULL;
00757 MPI_Request *requests;
00758 MPI_Datatype send_type;
00759 MPI_Status *statuses;
00760
00761
00762
00763
00764 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
00765
00766 nprocs_recv = 0;
00767 for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
00768
00769 nprocs_send = 0;
00770 for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
00771
00772 requests = (MPI_Request *)
00773 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
00774
00775
00776
00777
00778
00779 if (buftype_is_contig) {
00780 j = 0;
00781 for (i=0; i < nprocs; i++)
00782 if (recv_size[i]) {
00783 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
00784 MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
00785 j++;
00786 buf_idx[i] += recv_size[i];
00787 }
00788 }
00789 else {
00790
00791 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
00792 for (i=0; i < nprocs; i++)
00793 if (recv_size[i]) recv_buf[i] =
00794 (char *) ADIOI_Malloc(recv_size[i]);
00795
00796 j = 0;
00797 for (i=0; i < nprocs; i++)
00798 if (recv_size[i]) {
00799 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
00800 myrank+i+100*iter, fd->comm, requests+j);
00801 j++;
00802
00803
00804 }
00805 }
00806
00807
00808
00809 j = 0;
00810 for (i=0; i<nprocs; i++) {
00811 if (send_size[i]) {
00812
00813 if (partial_send[i]) {
00814 k = start_pos[i] + count[i] - 1;
00815 tmp = others_req[i].lens[k];
00816 others_req[i].lens[k] = partial_send[i];
00817 }
00818 MPI_Type_hindexed(count[i],
00819 &(others_req[i].lens[start_pos[i]]),
00820 &(others_req[i].mem_ptrs[start_pos[i]]),
00821 MPI_BYTE, &send_type);
00822
00823 MPI_Type_commit(&send_type);
00824 MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
00825 fd->comm, requests+nprocs_recv+j);
00826 MPI_Type_free(&send_type);
00827 if (partial_send[i]) others_req[i].lens[k] = tmp;
00828 j++;
00829 }
00830 }
00831
00832 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00833 sizeof(MPI_Status));
00834
00835
00836
00837 if (nprocs_recv) {
00838 #ifdef NEEDS_MPI_TEST
00839 j = 0;
00840 while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
00841 #else
00842 MPI_Waitall(nprocs_recv, requests, statuses);
00843 #endif
00844
00845
00846 if (!buftype_is_contig)
00847 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
00848 offset_list, len_list, recv_size,
00849 requests, statuses, recd_from_proc,
00850 nprocs, contig_access_count,
00851 min_st_offset, fd_size, fd_start, fd_end,
00852 buftype_extent);
00853 }
00854
00855
00856 MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
00857
00858 ADIOI_Free(statuses);
00859 ADIOI_Free(requests);
00860
00861 if (!buftype_is_contig) {
00862 for (i=0; i < nprocs; i++)
00863 if (recv_size[i]) ADIOI_Free(recv_buf[i]);
00864 ADIOI_Free(recv_buf);
00865 }
00866 }
00867
00868
/*
 * Helper macros for ADIOI_Fill_user_buffer.  They are statement blocks, not
 * expressions, are invoked WITHOUT a trailing semicolon, and operate on
 * the following variables of the invoking scope:
 *   buf, flat_buf, buftype_extent, recv_buf, recv_buf_idx, p,
 *   size, buf_incr, size_in_buf, flat_buf_sz, flat_buf_idx,
 *   n_buftypes, user_buf_idx.
 */

/*
 * ADIOI_BUF_INCR: advance the position in the flattened user buffer by
 * buf_incr bytes without copying anything.  Whenever the current block is
 * exhausted the position jumps to the start of the next block, wrapping to
 * the next instance of the buffer datatype after the last block.
 * buf_incr is zero afterwards.
 */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (flat_buf_sz == 0) { \
            /* current block used up: select the next one */ \
            if (flat_buf_idx >= (flat_buf->count - 1)) { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            else { \
                flat_buf_idx++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                n_buftypes*buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        buf_incr -= size_in_buf; \
    } \
}

/*
 * ADIOI_BUF_COPY: copy 'size' bytes from recv_buf[p] (starting at
 * recv_buf_idx[p]) into the user buffer at the current position, following
 * the block structure of the flattened datatype.  buf_incr is reduced by
 * the number of bytes copied; whatever remains of it is then skipped with
 * ADIOI_BUF_INCR.  size and buf_incr are zero afterwards.
 */
#define ADIOI_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        memcpy(((char *) buf) + user_buf_idx, \
               &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
        recv_buf_idx[p] += size_in_buf; \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (flat_buf_sz == 0) { \
            /* current block filled: select the next one */ \
            if (flat_buf_idx >= (flat_buf->count - 1)) { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            else { \
                flat_buf_idx++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                n_buftypes*buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    ADIOI_BUF_INCR \
}
00914
00915
00916 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00917 *flat_buf, char **recv_buf, ADIO_Offset
00918 *offset_list, int *len_list,
00919 int *recv_size,
00920 MPI_Request *requests, MPI_Status *statuses,
00921 int *recd_from_proc, int nprocs,
00922 int contig_access_count,
00923 ADIO_Offset min_st_offset,
00924 ADIO_Offset fd_size, ADIO_Offset *fd_start,
00925 ADIO_Offset *fd_end,
00926 MPI_Aint buftype_extent)
00927 {
00928
00929
00930 int i, p, flat_buf_idx, size, buf_incr;
00931 int flat_buf_sz, size_in_buf, n_buftypes;
00932 ADIO_Offset off, len, rem_len, user_buf_idx;
00933
00934 int *curr_from_proc, *done_from_proc, *recv_buf_idx;
00935
00936
00937
00938
00939
00940
00941
00942 curr_from_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00943 done_from_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00944 recv_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00945
00946 for (i=0; i < nprocs; i++) {
00947 recv_buf_idx[i] = curr_from_proc[i] = 0;
00948 done_from_proc[i] = recd_from_proc[i];
00949 }
00950
00951 user_buf_idx = flat_buf->indices[0];
00952 flat_buf_idx = 0;
00953 n_buftypes = 0;
00954 flat_buf_sz = flat_buf->blocklens[0];
00955
00956
00957
00958
00959
00960 for (i=0; i<contig_access_count; i++) {
00961 off = offset_list[i];
00962 rem_len = (ADIO_Offset) len_list[i];
00963
00964
00965 while (rem_len != 0) {
00966 len = rem_len;
00967
00968
00969
00970
00971 p = ADIOI_Calc_aggregator(fd,
00972 off,
00973 min_st_offset,
00974 &len,
00975 fd_size,
00976 fd_start,
00977 fd_end);
00978
00979 if (recv_buf_idx[p] < recv_size[p]) {
00980 if (curr_from_proc[p]+len > done_from_proc[p]) {
00981 if (done_from_proc[p] > curr_from_proc[p]) {
00982 size = (int)ADIOI_MIN(curr_from_proc[p] + len -
00983 done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
00984 buf_incr = done_from_proc[p] - curr_from_proc[p];
00985 ADIOI_BUF_INCR
00986 buf_incr = (int)(curr_from_proc[p]+len-done_from_proc[p]);
00987 curr_from_proc[p] = done_from_proc[p] + size;
00988 ADIOI_BUF_COPY
00989 }
00990 else {
00991 size = (int)ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
00992 buf_incr = (int)len;
00993 curr_from_proc[p] += size;
00994 ADIOI_BUF_COPY
00995 }
00996 }
00997 else {
00998 curr_from_proc[p] += (int)len;
00999 buf_incr = (int)len;
01000 ADIOI_BUF_INCR
01001 }
01002 }
01003 else {
01004 buf_incr = (int)len;
01005 ADIOI_BUF_INCR
01006 }
01007 off += len;
01008 rem_len -= len;
01009 }
01010 }
01011 for (i=0; i < nprocs; i++)
01012 if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
01013
01014 ADIOI_Free(curr_from_proc);
01015 ADIOI_Free(done_from_proc);
01016 ADIOI_Free(recv_buf_idx);
01017 }