00001
00002
00003
00004
00005
00006
00007
00008 #include "adio.h"
00009 #include "adio_extern.h"
00010
00011 #ifdef USE_DBG_LOGGING
00012 #define RDCOLL_DEBUG 1
00013 #endif
00014 #ifdef AGGREGATION_PROFILE
00015 #include "mpe.h"
00016 #endif
00017
00018
00019 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00020 datatype, int nprocs,
00021 int myrank, ADIOI_Access
00022 *others_req, ADIO_Offset *offset_list,
00023 ADIO_Offset *len_list, int contig_access_count,
00024 ADIO_Offset
00025 min_st_offset, ADIO_Offset fd_size,
00026 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00027 int *buf_idx, int *error_code);
00028 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00029 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
00030 *len_list, int *send_size, int *recv_size,
00031 int *count, int *start_pos,
00032 int *partial_send,
00033 int *recd_from_proc, int nprocs,
00034 int myrank, int
00035 buftype_is_contig, int contig_access_count,
00036 ADIO_Offset min_st_offset,
00037 ADIO_Offset fd_size,
00038 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00039 ADIOI_Access *others_req,
00040 int iter,
00041 MPI_Aint buftype_extent, int *buf_idx);
00042 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00043 *flat_buf, char **recv_buf, ADIO_Offset
00044 *offset_list, ADIO_Offset *len_list,
00045 unsigned *recv_size,
00046 MPI_Request *requests, MPI_Status *statuses,
00047 int *recd_from_proc, int nprocs,
00048 int contig_access_count,
00049 ADIO_Offset min_st_offset,
00050 ADIO_Offset fd_size, ADIO_Offset *fd_start,
00051 ADIO_Offset *fd_end,
00052 MPI_Aint buftype_extent);
00053
00054
00055 void ADIOI_GEN_ReadStridedColl(ADIO_File fd, void *buf, int count,
00056 MPI_Datatype datatype, int file_ptr_type,
00057 ADIO_Offset offset, ADIO_Status *status, int
00058 *error_code)
00059 {
00060
00061
00062
00063
00064
00065
00066 ADIOI_Access *my_req;
00067
00068
00069
00070 ADIOI_Access *others_req;
00071
00072
00073
00074 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00075 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
00076 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00077 ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
00078 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
00079 *fd_end = NULL, *end_offsets = NULL;
00080 ADIO_Offset *len_list = NULL;
00081 int *buf_idx = NULL;
00082
00083 #ifdef HAVE_STATUS_SET_BYTES
00084 int bufsize, size;
00085 #endif
00086
00087 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
00088 ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype,
00089 file_ptr_type, offset, status, error_code);
00090 return;
00091 }
00092
00093
00094 MPI_Comm_size(fd->comm, &nprocs);
00095 MPI_Comm_rank(fd->comm, &myrank);
00096
00097
00098 nprocs_for_coll = fd->hints->cb_nodes;
00099 orig_fp = fd->fp_ind;
00100
00101
00102 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
00103
00104
00105
00106
00107
00108
00109 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00110 &offset_list, &len_list, &start_offset,
00111 &end_offset, &contig_access_count);
00112
00113 #ifdef RDCOLL_DEBUG
00114 for (i=0; i<contig_access_count; i++) {
00115 DBG_FPRINTF(stderr, "rank %d off %lld len %lld\n",
00116 myrank, offset_list[i], len_list[i]);
00117 }
00118 #endif
00119
00120
00121
00122
00123
00124 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00125 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00126
00127 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00128 ADIO_OFFSET, fd->comm);
00129 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00130 ADIO_OFFSET, fd->comm);
00131
00132
00133 for (i=1; i<nprocs; i++)
00134 if ((st_offsets[i] < end_offsets[i-1]) &&
00135 (st_offsets[i] <= end_offsets[i]))
00136 interleave_count++;
00137
00138
00139 }
00140
00141 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00142
00143 if (fd->hints->cb_read == ADIOI_HINT_DISABLE
00144 || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
00145 {
00146
00147 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
00148 ADIOI_Free(offset_list);
00149 ADIOI_Free(len_list);
00150 ADIOI_Free(st_offsets);
00151 ADIOI_Free(end_offsets);
00152 }
00153
00154 fd->fp_ind = orig_fp;
00155 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00156
00157 if (buftype_is_contig && filetype_is_contig) {
00158 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00159 off = fd->disp + (fd->etype_size) * offset;
00160 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
00161 off, status, error_code);
00162 }
00163 else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00164 0, status, error_code);
00165 }
00166 else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
00167 offset, status, error_code);
00168
00169 return;
00170 }
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00189 nprocs_for_coll, &min_st_offset,
00190 &fd_start, &fd_end,
00191 fd->hints->min_fdomain_size, &fd_size,
00192 fd->hints->striping_unit);
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00207 min_st_offset, fd_start, fd_end, fd_size,
00208 nprocs, &count_my_req_procs,
00209 &count_my_req_per_proc, &my_req,
00210 &buf_idx);
00211
00212
00213
00214
00215
00216
00217
00218
00219 ADIOI_Calc_others_req(fd, count_my_req_procs,
00220 count_my_req_per_proc, my_req,
00221 nprocs, myrank, &count_others_req_procs,
00222 &others_req);
00223
00224
00225
00226
00227 ADIOI_Free(count_my_req_per_proc);
00228 for (i=0; i<nprocs; i++) {
00229 if (my_req[i].count) {
00230 ADIOI_Free(my_req[i].offsets);
00231 ADIOI_Free(my_req[i].lens);
00232 }
00233 }
00234 ADIOI_Free(my_req);
00235
00236
00237
00238
00239
00240 ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
00241 others_req, offset_list,
00242 len_list, contig_access_count, min_st_offset,
00243 fd_size, fd_start, fd_end, buf_idx, error_code);
00244
00245 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00246
00247
00248 for (i=0; i<nprocs; i++) {
00249 if (others_req[i].count) {
00250 ADIOI_Free(others_req[i].offsets);
00251 ADIOI_Free(others_req[i].lens);
00252 ADIOI_Free(others_req[i].mem_ptrs);
00253 }
00254 }
00255 ADIOI_Free(others_req);
00256
00257 ADIOI_Free(buf_idx);
00258 ADIOI_Free(offset_list);
00259 ADIOI_Free(len_list);
00260 ADIOI_Free(st_offsets);
00261 ADIOI_Free(end_offsets);
00262 ADIOI_Free(fd_start);
00263 ADIOI_Free(fd_end);
00264
00265 #ifdef HAVE_STATUS_SET_BYTES
00266 MPI_Type_size(datatype, &size);
00267 bufsize = size * count;
00268 MPIR_Status_set_bytes(status, datatype, bufsize);
00269
00270
00271
00272 #endif
00273
00274 fd->fp_sys_posn = -1;
00275 }
00276
00277 void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
00278 datatype, int file_ptr_type, ADIO_Offset
00279 offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
00280 **len_list_ptr, ADIO_Offset *start_offset_ptr,
00281 ADIO_Offset *end_offset_ptr, int
00282 *contig_access_count_ptr)
00283 {
00284 int filetype_size, etype_size;
00285 unsigned buftype_size;
00286 int i, j, k;
00287 ADIO_Offset i_offset;
00288 ADIO_Offset frd_size=0, old_frd_size=0;
00289 int st_index=0;
00290 ADIO_Offset n_filetypes, etype_in_filetype;
00291 ADIO_Offset abs_off_in_filetype=0;
00292 ADIO_Offset bufsize;
00293 ADIO_Offset sum, n_etypes_in_filetype, size_in_filetype;
00294 int contig_access_count, filetype_is_contig;
00295 ADIO_Offset *len_list;
00296 MPI_Aint filetype_extent, filetype_lb;
00297 ADIOI_Flatlist_node *flat_file;
00298 ADIO_Offset *offset_list, off, end_offset=0, disp;
00299
00300 #ifdef AGGREGATION_PROFILE
00301 MPE_Log_event (5028, 0, NULL);
00302 #endif
00303
00304
00305
00306
00307 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00308
00309 MPI_Type_size(fd->filetype, &filetype_size);
00310 MPI_Type_extent(fd->filetype, &filetype_extent);
00311 MPI_Type_lb(fd->filetype, &filetype_lb);
00312 MPI_Type_size(datatype, (int*)&buftype_size);
00313 etype_size = fd->etype_size;
00314
00315 if ( ! filetype_size ) {
00316 *contig_access_count_ptr = 0;
00317 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00318 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00319
00320
00321 offset_list = *offset_list_ptr;
00322 len_list = *len_list_ptr;
00323 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
00324 fd->disp + (ADIO_Offset)etype_size * offset;
00325 len_list[0] = 0;
00326 *start_offset_ptr = offset_list[0];
00327 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
00328
00329 return;
00330 }
00331
00332 if (filetype_is_contig) {
00333 *contig_access_count_ptr = 1;
00334 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00335 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00336
00337
00338 offset_list = *offset_list_ptr;
00339 len_list = *len_list_ptr;
00340 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
00341 fd->disp + (ADIO_Offset)etype_size * offset;
00342 len_list[0] = (ADIO_Offset)bufcount * (ADIO_Offset)buftype_size;
00343 *start_offset_ptr = offset_list[0];
00344 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
00345
00346
00347 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = *end_offset_ptr + 1;
00348 }
00349
00350 else {
00351
00352
00353
00354
00355 flat_file = CtvAccess(ADIOI_Flatlist);
00356 while (flat_file->type != fd->filetype) flat_file = flat_file->next;
00357 disp = fd->disp;
00358
00359 #ifdef RDCOLL_DEBUG
00360 {
00361 int ii;
00362 DBG_FPRINTF(stderr, "flattened %3d : ", flat_file->count );
00363 for (ii=0; ii<flat_file->count; ii++) {
00364 DBG_FPRINTF(stderr, "%16qd:%-16qd", flat_file->indices[ii], flat_file->blocklens[ii] );
00365 }
00366 DBG_FPRINTF(stderr, "\n" );
00367 }
00368 #endif
00369 if (file_ptr_type == ADIO_INDIVIDUAL) {
00370
00371 offset = fd->fp_ind - disp;
00372 n_filetypes = (offset - flat_file->indices[0]) / filetype_extent;
00373 offset -= (ADIO_Offset)n_filetypes * filetype_extent;
00374
00375
00376
00377 for (i=0; i<flat_file->count; i++) {
00378 ADIO_Offset dist;
00379 if (flat_file->blocklens[i] == 0) continue;
00380 dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
00381
00382 if (dist == 0) {
00383 i++;
00384 offset = flat_file->indices[i];
00385 frd_size = flat_file->blocklens[i];
00386 break;
00387 }
00388 if (dist > 0) {
00389 frd_size = dist;
00390 break;
00391 }
00392 }
00393 st_index = i;
00394 offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
00395 }
00396 else {
00397 n_etypes_in_filetype = filetype_size/etype_size;
00398 n_filetypes = offset / n_etypes_in_filetype;
00399 etype_in_filetype = offset % n_etypes_in_filetype;
00400 size_in_filetype = etype_in_filetype * etype_size;
00401
00402 sum = 0;
00403 for (i=0; i<flat_file->count; i++) {
00404 sum += flat_file->blocklens[i];
00405 if (sum > size_in_filetype) {
00406 st_index = i;
00407 frd_size = sum - size_in_filetype;
00408 abs_off_in_filetype = flat_file->indices[i] +
00409 size_in_filetype - (sum - flat_file->blocklens[i]);
00410 break;
00411 }
00412 }
00413
00414
00415 offset = disp + n_filetypes* (ADIO_Offset)filetype_extent +
00416 abs_off_in_filetype;
00417 }
00418
00419
00420
00421 old_frd_size = frd_size;
00422 contig_access_count = i_offset = 0;
00423 j = st_index;
00424 bufsize = (ADIO_Offset)buftype_size * (ADIO_Offset)bufcount;
00425 frd_size = ADIOI_MIN(frd_size, bufsize);
00426 while (i_offset < bufsize) {
00427 if (frd_size) contig_access_count++;
00428 i_offset += frd_size;
00429 j = (j + 1) % flat_file->count;
00430 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
00431 }
00432
00433
00434
00435 *offset_list_ptr = (ADIO_Offset *)
00436 ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
00437 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
00438
00439
00440 offset_list = *offset_list_ptr;
00441 len_list = *len_list_ptr;
00442
00443
00444
00445 *start_offset_ptr = offset;
00446
00447 i_offset = k = 0;
00448 j = st_index;
00449 off = offset;
00450 frd_size = ADIOI_MIN(old_frd_size, bufsize);
00451 while (i_offset < bufsize) {
00452 if (frd_size) {
00453 offset_list[k] = off;
00454 len_list[k] = frd_size;
00455 k++;
00456 }
00457 i_offset += frd_size;
00458 end_offset = off + frd_size - 1;
00459
00460
00461
00462
00463 if (off + frd_size < disp + flat_file->indices[j] +
00464 flat_file->blocklens[j] +
00465 n_filetypes* (ADIO_Offset)filetype_extent)
00466 {
00467 off += frd_size;
00468
00469
00470
00471 }
00472 else {
00473 j = (j+1) % flat_file->count;
00474 n_filetypes += (j == 0) ? 1 : 0;
00475 while (flat_file->blocklens[j]==0) {
00476 j = (j+1) % flat_file->count;
00477 n_filetypes += (j == 0) ? 1 : 0;
00478
00479
00480 }
00481 off = disp + flat_file->indices[j] +
00482 n_filetypes* (ADIO_Offset)filetype_extent;
00483 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
00484 }
00485 }
00486
00487
00488 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
00489
00490 *contig_access_count_ptr = contig_access_count;
00491 *end_offset_ptr = end_offset;
00492 }
00493 #ifdef AGGREGATION_PROFILE
00494 MPE_Log_event (5029, 0, NULL);
00495 #endif
00496 }
00497
00498 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00499 datatype, int nprocs,
00500 int myrank, ADIOI_Access
00501 *others_req, ADIO_Offset *offset_list,
00502 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
00503 min_st_offset, ADIO_Offset fd_size,
00504 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00505 int *buf_idx, int *error_code)
00506 {
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517 int i, j, m, ntimes, max_ntimes, buftype_is_contig;
00518 ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
00519 char *read_buf = NULL, *tmp_buf;
00520 int *curr_offlen_ptr, *count, *send_size, *recv_size;
00521 int *partial_send, *recd_from_proc, *start_pos;
00522
00523 ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
00524 int req_len, flag, rank;
00525 MPI_Status status;
00526 ADIOI_Flatlist_node *flat_buf=NULL;
00527 MPI_Aint buftype_extent;
00528 int coll_bufsize;
00529
00530 *error_code = MPI_SUCCESS;
00531
00532
00533
00534
00535
00536
00537
00538 coll_bufsize = fd->hints->cb_buffer_size;
00539
00540
00541 for (i=0; i < nprocs; i++) {
00542 if (others_req[i].count) {
00543 st_loc = others_req[i].offsets[0];
00544 end_loc = others_req[i].offsets[0];
00545 break;
00546 }
00547 }
00548
00549
00550 for (i=0; i < nprocs; i++)
00551 for (j=0; j<others_req[i].count; j++) {
00552 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00553 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00554 + others_req[i].lens[j] - 1));
00555 }
00556
00557
00558
00559
00560
00561
00562 if ((st_loc==-1) && (end_loc==-1)) {
00563
00564 ntimes = 0;
00565 }
00566 else {
00567
00568 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00569 }
00570
00571 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
00572
00573 if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);
00574
00575 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00576
00577
00578 count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00579
00580
00581
00582 partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00583
00584
00585
00586
00587 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00588
00589
00590 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00591
00592
00593
00594 recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00595
00596
00597
00598 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00599
00600
00601
00602 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00603 if (!buftype_is_contig) {
00604 ADIOI_Flatten_datatype(datatype);
00605 flat_buf = CtvAccess(ADIOI_Flatlist);
00606 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00607 }
00608 MPI_Type_extent(datatype, &buftype_extent);
00609
00610 done = 0;
00611 off = st_loc;
00612 for_curr_iter = for_next_iter = 0;
00613
00614 MPI_Comm_rank(fd->comm, &rank);
00615
00616 for (m=0; m<ntimes; m++) {
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
00655 real_off = off - for_curr_iter;
00656 real_size = size + for_curr_iter;
00657
00658 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00659 for_next_iter = 0;
00660
00661 for (i=0; i<nprocs; i++) {
00662 #ifdef RDCOLL_DEBUG
00663 DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count);
00664 #endif
00665 if (others_req[i].count) {
00666 start_pos[i] = curr_offlen_ptr[i];
00667 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
00668 j++) {
00669 if (partial_send[i]) {
00670
00671
00672 req_off = others_req[i].offsets[j] +
00673 partial_send[i];
00674 req_len = others_req[i].lens[j] -
00675 partial_send[i];
00676 partial_send[i] = 0;
00677
00678 others_req[i].offsets[j] = req_off;
00679 others_req[i].lens[j] = req_len;
00680 }
00681 else {
00682 req_off = others_req[i].offsets[j];
00683 req_len = others_req[i].lens[j];
00684 }
00685 if (req_off < real_off + real_size) {
00686 count[i]++;
00687 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIR_Upint)(read_buf+req_off-real_off));
00688 MPI_Address(read_buf+req_off-real_off,
00689 &(others_req[i].mem_ptrs[j]));
00690 ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
00691 send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
00692 (ADIO_Offset)(unsigned)req_len));
00693
00694 if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
00695 partial_send[i] = (int) (real_off + real_size - req_off);
00696 if ((j+1 < others_req[i].count) &&
00697 (others_req[i].offsets[j+1] <
00698 real_off+real_size)) {
00699
00700
00701 for_next_iter = ADIOI_MAX(for_next_iter,
00702 real_off + real_size - others_req[i].offsets[j+1]);
00703
00704
00705 }
00706 break;
00707 }
00708 }
00709 else break;
00710 }
00711 curr_offlen_ptr[i] = j;
00712 }
00713 }
00714
00715 flag = 0;
00716 for (i=0; i<nprocs; i++)
00717 if (count[i]) flag = 1;
00718
00719 if (flag) {
00720 ADIOI_Assert(size == (int)size);
00721 ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
00722 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
00723 if (*error_code != MPI_SUCCESS) return;
00724 }
00725
00726 for_curr_iter = for_next_iter;
00727
00728 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00729 send_size, recv_size, count,
00730 start_pos, partial_send, recd_from_proc, nprocs,
00731 myrank,
00732 buftype_is_contig, contig_access_count,
00733 min_st_offset, fd_size, fd_start, fd_end,
00734 others_req,
00735 m, buftype_extent, buf_idx);
00736
00737
00738 if (for_next_iter) {
00739 tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
00740 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIR_Upint)(read_buf+real_size-for_next_iter));
00741 ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
00742 memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
00743 ADIOI_Free(read_buf);
00744 read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
00745 memcpy(read_buf, tmp_buf, for_next_iter);
00746 ADIOI_Free(tmp_buf);
00747 }
00748
00749 off += size;
00750 done += size;
00751 }
00752
00753 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00754 for (m=ntimes; m<max_ntimes; m++)
00755
00756 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00757 send_size, recv_size, count,
00758 start_pos, partial_send, recd_from_proc, nprocs,
00759 myrank,
00760 buftype_is_contig, contig_access_count,
00761 min_st_offset, fd_size, fd_start, fd_end,
00762 others_req, m,
00763 buftype_extent, buf_idx);
00764
00765 if (ntimes) ADIOI_Free(read_buf);
00766 ADIOI_Free(curr_offlen_ptr);
00767 ADIOI_Free(count);
00768 ADIOI_Free(partial_send);
00769 ADIOI_Free(send_size);
00770 ADIOI_Free(recv_size);
00771 ADIOI_Free(recd_from_proc);
00772 ADIOI_Free(start_pos);
00773 }
00774
00775 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00776 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
00777 *len_list, int *send_size, int *recv_size,
00778 int *count, int *start_pos, int *partial_send,
00779 int *recd_from_proc, int nprocs,
00780 int myrank, int
00781 buftype_is_contig, int contig_access_count,
00782 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00783 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00784 ADIOI_Access *others_req,
00785 int iter, MPI_Aint buftype_extent, int *buf_idx)
00786 {
00787 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
00788 char **recv_buf = NULL;
00789 MPI_Request *requests;
00790 MPI_Datatype send_type;
00791 MPI_Status *statuses;
00792
00793
00794
00795
00796 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
00797
00798 nprocs_recv = 0;
00799 for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
00800
00801 nprocs_send = 0;
00802 for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
00803
00804 requests = (MPI_Request *)
00805 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
00806
00807
00808
00809
00810
00811 #ifdef AGGREGATION_PROFILE
00812 MPE_Log_event (5032, 0, NULL);
00813 #endif
00814
00815 if (buftype_is_contig) {
00816 j = 0;
00817 for (i=0; i < nprocs; i++)
00818 if (recv_size[i]) {
00819 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
00820 MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
00821 j++;
00822 buf_idx[i] += recv_size[i];
00823 }
00824 }
00825 else {
00826
00827 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
00828 for (i=0; i < nprocs; i++)
00829 if (recv_size[i]) recv_buf[i] =
00830 (char *) ADIOI_Malloc(recv_size[i]);
00831
00832 j = 0;
00833 for (i=0; i < nprocs; i++)
00834 if (recv_size[i]) {
00835 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
00836 myrank+i+100*iter, fd->comm, requests+j);
00837 j++;
00838 #ifdef RDCOLL_DEBUG
00839 DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
00840 myrank, recv_size[i], myrank+i+100*iter);
00841 #endif
00842 }
00843 }
00844
00845
00846
00847 j = 0;
00848 for (i=0; i<nprocs; i++) {
00849 if (send_size[i]) {
00850
00851 if (partial_send[i]) {
00852 k = start_pos[i] + count[i] - 1;
00853 tmp = others_req[i].lens[k];
00854 others_req[i].lens[k] = partial_send[i];
00855 }
00856 MPI_Type_hindexed(count[i],
00857 &(others_req[i].lens[start_pos[i]]),
00858 &(others_req[i].mem_ptrs[start_pos[i]]),
00859 MPI_BYTE, &send_type);
00860
00861 MPI_Type_commit(&send_type);
00862 MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
00863 fd->comm, requests+nprocs_recv+j);
00864 MPI_Type_free(&send_type);
00865 if (partial_send[i]) others_req[i].lens[k] = tmp;
00866 j++;
00867 }
00868 }
00869
00870 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00871 sizeof(MPI_Status));
00872
00873
00874
00875 if (nprocs_recv) {
00876 #ifdef NEEDS_MPI_TEST
00877 j = 0;
00878 while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
00879 #else
00880 MPI_Waitall(nprocs_recv, requests, statuses);
00881 #endif
00882
00883
00884 if (!buftype_is_contig)
00885 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
00886 offset_list, len_list, (unsigned*)recv_size,
00887 requests, statuses, recd_from_proc,
00888 nprocs, contig_access_count,
00889 min_st_offset, fd_size, fd_start, fd_end,
00890 buftype_extent);
00891 }
00892
00893
00894 MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
00895
00896 ADIOI_Free(statuses);
00897 ADIOI_Free(requests);
00898
00899 if (!buftype_is_contig) {
00900 for (i=0; i < nprocs; i++)
00901 if (recv_size[i]) ADIOI_Free(recv_buf[i]);
00902 ADIOI_Free(recv_buf);
00903 }
00904 #ifdef AGGREGATION_PROFILE
00905 MPE_Log_event (5033, 0, NULL);
00906 #endif
00907 }
00908
00909 #define ADIOI_BUF_INCR \
00910 { \
00911 while (buf_incr) { \
00912 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
00913 user_buf_idx += size_in_buf; \
00914 flat_buf_sz -= size_in_buf; \
00915 if (!flat_buf_sz) { \
00916 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00917 else { \
00918 flat_buf_idx = 0; \
00919 n_buftypes++; \
00920 } \
00921 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00922 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00923 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00924 } \
00925 buf_incr -= size_in_buf; \
00926 } \
00927 }
00928
00929
00930 #define ADIOI_BUF_COPY \
00931 { \
00932 while (size) { \
00933 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
00934 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
00935 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
00936 memcpy(((char *) buf) + user_buf_idx, \
00937 &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
00938 recv_buf_idx[p] += size_in_buf; \
00939 user_buf_idx += size_in_buf; \
00940 flat_buf_sz -= size_in_buf; \
00941 if (!flat_buf_sz) { \
00942 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00943 else { \
00944 flat_buf_idx = 0; \
00945 n_buftypes++; \
00946 } \
00947 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00948 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00949 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00950 } \
00951 size -= size_in_buf; \
00952 buf_incr -= size_in_buf; \
00953 } \
00954 ADIOI_BUF_INCR \
00955 }
00956
00957 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00958 *flat_buf, char **recv_buf, ADIO_Offset
00959 *offset_list, ADIO_Offset *len_list,
00960 unsigned *recv_size,
00961 MPI_Request *requests, MPI_Status *statuses,
00962 int *recd_from_proc, int nprocs,
00963 int contig_access_count,
00964 ADIO_Offset min_st_offset,
00965 ADIO_Offset fd_size, ADIO_Offset *fd_start,
00966 ADIO_Offset *fd_end,
00967 MPI_Aint buftype_extent)
00968 {
00969
00970
00971
00972 int i, p, flat_buf_idx;
00973 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
00974 int n_buftypes;
00975 ADIO_Offset off, len, rem_len, user_buf_idx;
00976
00977 unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
00978
00979 ADIOI_UNREFERENCED_ARG(requests);
00980 ADIOI_UNREFERENCED_ARG(statuses);
00981
00982
00983
00984
00985
00986
00987
00988 curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00989 done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00990 recv_buf_idx = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00991
00992 for (i=0; i < nprocs; i++) {
00993 recv_buf_idx[i] = curr_from_proc[i] = 0;
00994 done_from_proc[i] = recd_from_proc[i];
00995 }
00996
00997 user_buf_idx = flat_buf->indices[0];
00998 flat_buf_idx = 0;
00999 n_buftypes = 0;
01000 flat_buf_sz = flat_buf->blocklens[0];
01001
01002
01003
01004
01005
01006 for (i=0; i<contig_access_count; i++) {
01007 off = offset_list[i];
01008 rem_len = len_list[i];
01009
01010
01011 while (rem_len != 0) {
01012 len = rem_len;
01013
01014
01015
01016
01017 p = ADIOI_Calc_aggregator(fd,
01018 off,
01019 min_st_offset,
01020 &len,
01021 fd_size,
01022 fd_start,
01023 fd_end);
01024
01025 if (recv_buf_idx[p] < recv_size[p]) {
01026 if (curr_from_proc[p]+len > done_from_proc[p]) {
01027 if (done_from_proc[p] > curr_from_proc[p]) {
01028 size = ADIOI_MIN(curr_from_proc[p] + len -
01029 done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
01030 buf_incr = done_from_proc[p] - curr_from_proc[p];
01031 ADIOI_BUF_INCR
01032 buf_incr = curr_from_proc[p]+len-done_from_proc[p];
01033 ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
01034 curr_from_proc[p] = done_from_proc[p] + size;
01035 ADIOI_BUF_COPY
01036 }
01037 else {
01038 size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
01039 buf_incr = len;
01040 ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
01041 curr_from_proc[p] += (unsigned) size;
01042 ADIOI_BUF_COPY
01043 }
01044 }
01045 else {
01046 ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
01047 curr_from_proc[p] += (unsigned) len;
01048 buf_incr = len;
01049 ADIOI_BUF_INCR
01050 }
01051 }
01052 else {
01053 buf_incr = len;
01054 ADIOI_BUF_INCR
01055 }
01056 off += len;
01057 rem_len -= len;
01058 }
01059 }
01060 for (i=0; i < nprocs; i++)
01061 if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
01062
01063 ADIOI_Free(curr_from_proc);
01064 ADIOI_Free(done_from_proc);
01065 ADIOI_Free(recv_buf_idx);
01066 }