00001
00002
00003
00009
00010
00011
00012
00013
00014
00015 #include "adio.h"
00016 #include "adio_extern.h"
00017 #include "ad_bgl.h"
00018 #include "ad_bgl_pset.h"
00019 #include "ad_bgl_aggrs.h"
00020
00021 #ifdef AGGREGATION_PROFILE
00022 #include "mpe.h"
00023 #endif
00024 #ifdef PROFILE
00025 #include "mpe.h"
00026 #endif
00027
00028
00029 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00030 datatype, int nprocs, int myrank, ADIOI_Access
00031 *others_req, ADIO_Offset *offset_list,
00032 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
00033 min_st_offset, ADIO_Offset fd_size,
00034 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00035 int *buf_idx, int *error_code);
00036 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00037 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
00038 *offset_list, ADIO_Offset *len_list, int *send_size,
00039 int *recv_size, ADIO_Offset off, int size,
00040 int *count, int *start_pos, int *partial_recv,
00041 int *sent_to_proc, int nprocs,
00042 int myrank, int
00043 buftype_is_contig, int contig_access_count,
00044 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00045 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00046 ADIOI_Access *others_req,
00047 int *send_buf_idx, int *curr_to_proc,
00048 int *done_to_proc, int *hole, int iter,
00049 MPI_Aint buftype_extent, int *buf_idx, int *error_code);
00050 static void ADIOI_W_Exchange_data_alltoallv(
00051 ADIO_File fd, void *buf,
00052 char *write_buf,
00053 ADIOI_Flatlist_node *flat_buf,
00054 ADIO_Offset *offset_list,
00055 ADIO_Offset *len_list, int *send_size, int *recv_size,
00056 ADIO_Offset off, int size,
00057 int *count, int *start_pos, int *partial_recv,
00058 int *sent_to_proc, int nprocs, int myrank,
00059 int buftype_is_contig, int contig_access_count,
00060 ADIO_Offset min_st_offset,
00061 ADIO_Offset fd_size,
00062 ADIO_Offset *fd_start,
00063 ADIO_Offset *fd_end,
00064 ADIOI_Access *others_req,
00065 int *send_buf_idx, int *curr_to_proc,
00066 int *done_to_proc, int *hole,
00067 int iter, MPI_Aint buftype_extent, int *buf_idx,
00068 int *error_code);
00069 static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00070 *flat_buf, char **send_buf, ADIO_Offset
00071 *offset_list, ADIO_Offset *len_list, int *send_size,
00072 MPI_Request *requests, int *sent_to_proc,
00073 int nprocs, int myrank,
00074 int contig_access_count, ADIO_Offset
00075 min_st_offset, ADIO_Offset fd_size,
00076 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00077 int *send_buf_idx, int *curr_to_proc,
00078 int *done_to_proc, int iter,
00079 MPI_Aint buftype_extent);
00080 static void ADIOI_Fill_send_buffer_nosend(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00081 *flat_buf, char **send_buf, ADIO_Offset
00082 *offset_list, ADIO_Offset *len_list, int *send_size,
00083 MPI_Request *requests, int *sent_to_proc,
00084 int nprocs, int myrank,
00085 int contig_access_count, ADIO_Offset
00086 min_st_offset, ADIO_Offset fd_size,
00087 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00088 int *send_buf_idx, int *curr_to_proc,
00089 int *done_to_proc, int iter,
00090 MPI_Aint buftype_extent);
00091 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
00092 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00093 int nprocs, int nprocs_recv, int total_elements);
00094
00095
00096 void ADIOI_BGL_WriteStridedColl(ADIO_File fd, void *buf, int count,
00097 MPI_Datatype datatype, int file_ptr_type,
00098 ADIO_Offset offset, ADIO_Status *status, int
00099 *error_code)
00100 {
00101
00102
00103
00104
00105
00106
00107 ADIOI_Access *my_req;
00108
00109
00110
00111 ADIOI_Access *others_req;
00112
00113
00114
00115 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00116 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
00117 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00118 ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
00119 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
00120 *fd_end = NULL, *end_offsets = NULL;
00121 ADIO_Offset *bgl_offsets0 = NULL, *bgl_offsets = NULL;
00122 int ii;
00123
00124 int *buf_idx = NULL;
00125 ADIO_Offset *len_list = NULL;
00126 #if BGL_PROFILE
00127 BGLMPIO_T_CIO_RESET( 0, w )
00128 #endif
00129 #if 0
00130
00131 int old_error, tmp_error;
00132 #endif
00133 #ifdef PROFILE
00134 MPE_Log_event(13, 0, "start computation");
00135 #endif
00136
00137 #if 0
00138
00139 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
00140 ADIOI_IOStridedColl (fd, buf, count, ADIOI_WRITE, datatype,
00141 file_ptr_type, offset, status, error_code);
00142 return;
00143 }
00144 #endif
00145 MPI_Comm_size(fd->comm, &nprocs);
00146 MPI_Comm_rank(fd->comm, &myrank);
00147
00148
00149
00150
00151 nprocs_for_coll = fd->hints->cb_nodes;
00152 orig_fp = fd->fp_ind;
00153
00154 #if BGL_PROFILE
00155 BGLMPIO_T_CIO_SET_GET( 0, w, 0, 1, 0, BGLMPIO_CIO_LCOMP, BGLMPIO_CIO_LAST )
00156 #endif
00157
00158
00159
00160 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00161
00162
00163
00164
00165
00166
00167 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00168 &offset_list, &len_list, &start_offset,
00169 &end_offset, &contig_access_count);
00170
00171 #if BGL_PROFILE
00172 BGLMPIO_T_CIO_SET_GET( 0, w, 1, 1, 1, BGLMPIO_CIO_GATHER, BGLMPIO_CIO_LCOMP )
00173 #endif
00174
00175
00176
00177
00178
00179 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00180 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00181
00182 if (bglmpio_tunegather) {
00183 bgl_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
00184 bgl_offsets = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
00185 for (ii=0; ii<nprocs; ii++) {
00186 bgl_offsets0[ii*2] = 0;
00187 bgl_offsets0[ii*2+1] = 0;
00188 }
00189 bgl_offsets0[myrank*2] = start_offset;
00190 bgl_offsets0[myrank*2+1] = end_offset;
00191
00192 MPI_Allreduce( bgl_offsets0, bgl_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
00193
00194 for (ii=0; ii<nprocs; ii++) {
00195 st_offsets [ii] = bgl_offsets[ii*2] ;
00196 end_offsets[ii] = bgl_offsets[ii*2+1];
00197 }
00198 ADIOI_Free( bgl_offsets0 );
00199 ADIOI_Free( bgl_offsets );
00200 } else {
00201 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00202 ADIO_OFFSET, fd->comm);
00203 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00204 ADIO_OFFSET, fd->comm);
00205 }
00206
00207 #if BGL_PROFILE
00208 BGLMPIO_T_CIO_SET_GET( 0, w, 0, 1, 1, BGLMPIO_CIO_PATANA, BGLMPIO_CIO_GATHER )
00209 #endif
00210
00211
00212 for (i=1; i<nprocs; i++)
00213 if ((st_offsets[i] < end_offsets[i-1]) &&
00214 (st_offsets[i] <= end_offsets[i]))
00215 interleave_count++;
00216
00217
00218 }
00219
00220 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00221
00222 if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
00223 (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
00224 {
00225
00226 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00227 ADIOI_Free(offset_list);
00228 ADIOI_Free(len_list);
00229 ADIOI_Free(st_offsets);
00230 ADIOI_Free(end_offsets);
00231 }
00232
00233 fd->fp_ind = orig_fp;
00234 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00235
00236 if (buftype_is_contig && filetype_is_contig) {
00237
00238 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00239 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
00240 ADIO_WriteContig(fd, buf, count, datatype,
00241 ADIO_EXPLICIT_OFFSET,
00242 off, status, error_code);
00243 }
00244 else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00245 0, status, error_code);
00246 }
00247 else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00248 offset, status, error_code);
00249
00250 return;
00251 }
00252
00253 #if BGL_PROFILE
00254 BGLMPIO_T_CIO_SET_GET( 0, w, 1, 1, 1, BGLMPIO_CIO_FD_PART, BGLMPIO_CIO_PATANA )
00255 #endif
00256
00257
00258
00259
00260
00261 if (bglmpio_tuneblocking)
00262 ADIOI_BGL_GPFS_Calc_file_domains(st_offsets, end_offsets, nprocs,
00263 nprocs_for_coll, &min_st_offset,
00264 &fd_start, &fd_end, &fd_size, fd->fs_ptr);
00265 else
00266 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00267 nprocs_for_coll, &min_st_offset,
00268 &fd_start, &fd_end,
00269 fd->hints->min_fdomain_size, &fd_size,
00270 fd->hints->striping_unit);
00271
00272 #if BGL_PROFILE
00273 BGLMPIO_T_CIO_SET_GET( 0, w, 0, 1, 1, BGLMPIO_CIO_MYREQ, BGLMPIO_CIO_FD_PART )
00274 #endif
00275
00276
00277
00278
00279 if (bglmpio_tuneblocking)
00280 ADIOI_BGL_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00281 min_st_offset, fd_start, fd_end, fd_size,
00282 nprocs, &count_my_req_procs,
00283 &count_my_req_per_proc, &my_req,
00284 &buf_idx);
00285 else
00286 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00287 min_st_offset, fd_start, fd_end, fd_size,
00288 nprocs, &count_my_req_procs,
00289 &count_my_req_per_proc, &my_req,
00290 &buf_idx);
00291
00292 #if BGL_PROFILE
00293 BGLMPIO_T_CIO_SET_GET( 0, w, 1, 1, 1, BGLMPIO_CIO_OTHREQ, BGLMPIO_CIO_MYREQ )
00294 #endif
00295
00296
00297
00298
00299
00300
00301
00302
00303 if (bglmpio_tuneblocking)
00304 ADIOI_BGL_Calc_others_req(fd, count_my_req_procs,
00305 count_my_req_per_proc, my_req,
00306 nprocs, myrank,
00307 &count_others_req_procs, &others_req);
00308 else
00309 ADIOI_Calc_others_req(fd, count_my_req_procs,
00310 count_my_req_per_proc, my_req,
00311 nprocs, myrank,
00312 &count_others_req_procs, &others_req);
00313
00314 #if BGL_PROFILE
00315 BGLMPIO_T_CIO_SET_GET( 0, w, 1, 1, 1, BGLMPIO_CIO_DEXCH, BGLMPIO_CIO_OTHREQ )
00316 #endif
00317
00318 ADIOI_Free(count_my_req_per_proc);
00319 for (i=0; i < nprocs; i++) {
00320 if (my_req[i].count) {
00321 ADIOI_Free(my_req[i].offsets);
00322 ADIOI_Free(my_req[i].lens);
00323 }
00324 }
00325 ADIOI_Free(my_req);
00326
00327
00328 ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank,
00329 others_req, offset_list,
00330 len_list, contig_access_count, min_st_offset,
00331 fd_size, fd_start, fd_end, buf_idx, error_code);
00332
00333 #if BGL_PROFILE
00334 BGLMPIO_T_CIO_SET_GET( 0, w, 1, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_DEXCH )
00335 BGLMPIO_T_CIO_SET_GET( 0, w, 0, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_MPIO_CRW )
00336
00337 BGLMPIO_T_CIO_REPORT( 0, w, fd, myrank )
00338 #endif
00339 #if 0
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353 old_error = *error_code;
00354 if (*error_code != MPI_SUCCESS) *error_code = MPI_ERR_IO;
00355
00356
00357
00358 #ifdef ADIOI_MPE_LOGGING
00359 MPE_Log_event( ADIOI_MPE_postwrite_a, 0, NULL );
00360 #endif
00361 if (fd->hints->cb_nodes == 1)
00362 MPI_Bcast(error_code, 1, MPI_INT,
00363 fd->hints->ranklist[0], fd->comm);
00364 else {
00365 tmp_error = *error_code;
00366 MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
00367 MPI_MAX, fd->comm);
00368 }
00369 #ifdef ADIOI_MPE_LOGGING
00370 MPE_Log_event( ADIOI_MPE_postwrite_b, 0, NULL );
00371 #endif
00372 #ifdef AGGREGATION_PROFILE
00373 MPE_Log_event (5012, 0, NULL);
00374 #endif
00375
00376 if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) )
00377 *error_code = old_error;
00378
00379
00380 #endif
00381
00382 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00383
00384 for (i=0; i<nprocs; i++) {
00385 if (others_req[i].count) {
00386 ADIOI_Free(others_req[i].offsets);
00387 ADIOI_Free(others_req[i].lens);
00388 ADIOI_Free(others_req[i].mem_ptrs);
00389 }
00390 }
00391 ADIOI_Free(others_req);
00392
00393 ADIOI_Free(buf_idx);
00394 ADIOI_Free(offset_list);
00395 ADIOI_Free(len_list);
00396 ADIOI_Free(st_offsets);
00397 ADIOI_Free(end_offsets);
00398 ADIOI_Free(fd_start);
00399 ADIOI_Free(fd_end);
00400
00401 #ifdef HAVE_STATUS_SET_BYTES
00402 if (status) {
00403 int bufsize, size;
00404
00405 MPI_Type_size(datatype, &size);
00406 bufsize = size * count;
00407 MPIR_Status_set_bytes(status, datatype, bufsize);
00408 }
00409
00410
00411 #endif
00412
00413 fd->fp_sys_posn = -1;
00414 #ifdef AGGREGATION_PROFILE
00415 MPE_Log_event (5013, 0, NULL);
00416 #endif
00417 }
00418
00419
00420
00421
00422
00423
00424 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00425 datatype, int nprocs,
00426 int myrank,
00427 ADIOI_Access
00428 *others_req, ADIO_Offset *offset_list,
00429 ADIO_Offset *len_list, int contig_access_count,
00430 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00431 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00432 int *buf_idx, int *error_code)
00433 {
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444 ADIO_Offset size=0;
00445 int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
00446 ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
00447 char *write_buf=NULL;
00448 int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
00449 int *partial_recv, *sent_to_proc, *start_pos, flag;
00450 int *send_buf_idx, *curr_to_proc, *done_to_proc;
00451 MPI_Status status;
00452 ADIOI_Flatlist_node *flat_buf=NULL;
00453 MPI_Aint buftype_extent;
00454 int info_flag, coll_bufsize;
00455 char *value;
00456 static char myname[] = "ADIOI_EXCH_AND_WRITE";
00457
00458 *error_code = MPI_SUCCESS;
00459
00460
00461
00462
00463
00464
00465 value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
00466 ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,
00467 &info_flag);
00468 coll_bufsize = atoi(value);
00469 ADIOI_Free(value);
00470
00471
00472 for (i=0; i < nprocs; i++) {
00473 if (others_req[i].count) {
00474 st_loc = others_req[i].offsets[0];
00475 end_loc = others_req[i].offsets[0];
00476 break;
00477 }
00478 }
00479
00480 for (i=0; i < nprocs; i++)
00481 for (j=0; j < others_req[i].count; j++) {
00482 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00483 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00484 + others_req[i].lens[j] - 1));
00485 }
00486
00487
00488
00489 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00490
00491 if ((st_loc==-1) && (end_loc==-1)) {
00492 ntimes = 0;
00493 }
00494
00495 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
00496 fd->comm);
00497
00498 if (ntimes) write_buf = (char *) ADIOI_Malloc(coll_bufsize);
00499
00500 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00501
00502
00503 count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00504
00505
00506
00507 partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00508
00509
00510
00511
00512 send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00513
00514
00515
00516 recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00517
00518
00519 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00520
00521
00522
00523 send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00524 curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00525 done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00526
00527
00528 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00529
00530
00531
00532 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00533 if (!buftype_is_contig) {
00534 ADIOI_Flatten_datatype(datatype);
00535 flat_buf = ADIOI_Flatlist;
00536 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00537 }
00538 MPI_Type_extent(datatype, &buftype_extent);
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552 done = 0;
00553 off = st_loc;
00554
00555 #ifdef PROFILE
00556 MPE_Log_event(14, 0, "end computation");
00557 #endif
00558
00559 for (m=0; m < ntimes; m++) {
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577 #ifdef PROFILE
00578 MPE_Log_event(13, 0, "start computation");
00579 #endif
00580 for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
00581
00582 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
00583
00584 for (i=0; i < nprocs; i++) {
00585 if (others_req[i].count) {
00586 start_pos[i] = curr_offlen_ptr[i];
00587 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
00588 if (partial_recv[i]) {
00589
00590
00591 req_off = others_req[i].offsets[j] +
00592 partial_recv[i];
00593 req_len = others_req[i].lens[j] -
00594 partial_recv[i];
00595 partial_recv[i] = 0;
00596
00597 others_req[i].offsets[j] = req_off;
00598 others_req[i].lens[j] = req_len;
00599 }
00600 else {
00601 req_off = others_req[i].offsets[j];
00602 req_len = others_req[i].lens[j];
00603 }
00604 if (req_off < off + size) {
00605 count[i]++;
00606 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIR_Upint)(write_buf+req_off-off));
00607 MPI_Address(write_buf+req_off-off,
00608 &(others_req[i].mem_ptrs[j]));
00609 ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
00610 recv_size[i] += (int)(ADIOI_MIN(off + size - req_off,
00611 (unsigned)req_len));
00612
00613 if (off+size-req_off < (unsigned)req_len)
00614 {
00615 partial_recv[i] = (int) (off + size - req_off);
00616
00617
00618 if ((j+1 < others_req[i].count) &&
00619 (others_req[i].offsets[j+1] < off+size))
00620 {
00621 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00622 MPIR_ERR_RECOVERABLE,
00623 myname,
00624 __LINE__,
00625 MPI_ERR_ARG,
00626 "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
00627
00628
00629
00630 }
00631
00632 break;
00633 }
00634 }
00635 else break;
00636 }
00637 curr_offlen_ptr[i] = j;
00638 }
00639 }
00640
00641 #ifdef PROFILE
00642 MPE_Log_event(14, 0, "end computation");
00643 MPE_Log_event(7, 0, "start communication");
00644 #endif
00645 if (bglmpio_comm == 1)
00646 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00647 len_list, send_size, recv_size, off, size, count,
00648 start_pos, partial_recv,
00649 sent_to_proc, nprocs, myrank,
00650 buftype_is_contig, contig_access_count,
00651 min_st_offset, fd_size, fd_start, fd_end,
00652 others_req, send_buf_idx, curr_to_proc,
00653 done_to_proc, &hole, m, buftype_extent, buf_idx,
00654 error_code);
00655 else
00656 if (bglmpio_comm == 0)
00657 ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf, offset_list,
00658 len_list, send_size, recv_size, off, size, count,
00659 start_pos, partial_recv,
00660 sent_to_proc, nprocs, myrank,
00661 buftype_is_contig, contig_access_count,
00662 min_st_offset, fd_size, fd_start, fd_end,
00663 others_req, send_buf_idx, curr_to_proc,
00664 done_to_proc, &hole, m, buftype_extent, buf_idx,
00665 error_code);
00666 if (*error_code != MPI_SUCCESS) return;
00667 #ifdef PROFILE
00668 MPE_Log_event(8, 0, "end communication");
00669 #endif
00670
00671 flag = 0;
00672 for (i=0; i<nprocs; i++)
00673 if (count[i]) flag = 1;
00674
00675 if (flag) {
00676 ADIOI_Assert(size == (int)size);
00677 ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE, ADIO_EXPLICIT_OFFSET,
00678 off, &status, error_code);
00679 if (*error_code != MPI_SUCCESS) return;
00680 }
00681
00682 off += size;
00683 done += size;
00684 }
00685
00686 for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
00687 #ifdef PROFILE
00688 MPE_Log_event(7, 0, "start communication");
00689 #endif
00690 for (m=ntimes; m<max_ntimes; m++)
00691
00692 if (bglmpio_comm == 1)
00693 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00694 len_list, send_size, recv_size, off, size, count,
00695 start_pos, partial_recv,
00696 sent_to_proc, nprocs, myrank,
00697 buftype_is_contig, contig_access_count,
00698 min_st_offset, fd_size, fd_start, fd_end,
00699 others_req, send_buf_idx,
00700 curr_to_proc, done_to_proc, &hole, m,
00701 buftype_extent, buf_idx, error_code);
00702 else
00703 if (bglmpio_comm == 0)
00704 ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf, offset_list,
00705 len_list, send_size, recv_size, off, size, count,
00706 start_pos, partial_recv,
00707 sent_to_proc, nprocs, myrank,
00708 buftype_is_contig, contig_access_count,
00709 min_st_offset, fd_size, fd_start, fd_end,
00710 others_req, send_buf_idx,
00711 curr_to_proc, done_to_proc, &hole, m,
00712 buftype_extent, buf_idx, error_code);
00713 if (*error_code != MPI_SUCCESS) return;
00714 #ifdef PROFILE
00715 MPE_Log_event(8, 0, "end communication");
00716 #endif
00717
00718 if (ntimes) ADIOI_Free(write_buf);
00719 ADIOI_Free(curr_offlen_ptr);
00720 ADIOI_Free(count);
00721 ADIOI_Free(partial_recv);
00722 ADIOI_Free(send_size);
00723 ADIOI_Free(recv_size);
00724 ADIOI_Free(sent_to_proc);
00725 ADIOI_Free(start_pos);
00726 ADIOI_Free(send_buf_idx);
00727 ADIOI_Free(curr_to_proc);
00728 ADIOI_Free(done_to_proc);
00729 }
00730
00731
00732
00733
00734
00735 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00736 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
00737 *offset_list, ADIO_Offset *len_list, int *send_size,
00738 int *recv_size, ADIO_Offset off, int size,
00739 int *count, int *start_pos,
00740 int *partial_recv,
00741 int *sent_to_proc, int nprocs,
00742 int myrank, int
00743 buftype_is_contig, int contig_access_count,
00744 ADIO_Offset min_st_offset,
00745 ADIO_Offset fd_size,
00746 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00747 ADIOI_Access *others_req,
00748 int *send_buf_idx, int *curr_to_proc,
00749 int *done_to_proc, int *hole, int iter,
00750 MPI_Aint buftype_extent, int *buf_idx,
00751 int *error_code)
00752 {
00753 int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
00754 char **send_buf = NULL;
00755 MPI_Request *requests, *send_req;
00756 MPI_Datatype *recv_types;
00757 MPI_Status *statuses, status;
00758 int *srt_len, sum;
00759 ADIO_Offset *srt_off;
00760 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
00761
00762
00763
00764
00765 MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
00766
00767
00768
00769 nprocs_recv = 0;
00770 for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
00771
00772 recv_types = (MPI_Datatype *)
00773 ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype));
00774
00775
00776 tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00777 j = 0;
00778 for (i=0; i<nprocs; i++) {
00779 if (recv_size[i]) {
00780
00781 if (partial_recv[i]) {
00782 k = start_pos[i] + count[i] - 1;
00783 tmp_len[i] = others_req[i].lens[k];
00784 others_req[i].lens[k] = partial_recv[i];
00785 }
00786 MPI_Type_hindexed(count[i],
00787 &(others_req[i].lens[start_pos[i]]),
00788 &(others_req[i].mem_ptrs[start_pos[i]]),
00789 MPI_BYTE, recv_types+j);
00790
00791 MPI_Type_commit(recv_types+j);
00792 j++;
00793 }
00794 }
00795
00796
00797
00798
00799
00800 sum = 0;
00801 for (i=0; i<nprocs; i++) sum += count[i];
00802
00803
00804 if (sum) {
00805 srt_off = (ADIO_Offset *) ADIOI_Malloc((sum)*sizeof(ADIO_Offset));
00806 srt_len = (int *) ADIOI_Malloc((sum)*sizeof(int));
00807
00808 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
00809 nprocs, nprocs_recv, sum);
00810 }
00811
00812
00813 for (i=0; i<nprocs; i++)
00814 if (partial_recv[i]) {
00815 k = start_pos[i] + count[i] - 1;
00816 others_req[i].lens[k] = tmp_len[i];
00817 }
00818 ADIOI_Free(tmp_len);
00819
00820
00821
00822
00823
00824
00825
00826 *hole = 0;
00827 if (sum) {
00828 if (off != srt_off[0])
00829 *hole = 1;
00830 else {
00831 for (i=1; i<sum; i++) {
00832 if (srt_off[i] <= srt_off[0] + srt_len[0]) {
00833 int new_len = srt_off[i] + srt_len[i] - srt_off[0];
00834 if (new_len > srt_len[0]) srt_len[0] = new_len;
00835 }
00836 else
00837 break;
00838 }
00839 if (i < sum || size != srt_len[0])
00840 *hole = 1;
00841 }
00842
00843 ADIOI_Free(srt_off);
00844 ADIOI_Free(srt_len);
00845 }
00846
00847 if (nprocs_recv) {
00848 if (*hole) {
00849 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
00850 ADIO_EXPLICIT_OFFSET, off, &status, &err);
00851
00852 if (err != MPI_SUCCESS) {
00853 *error_code = MPIO_Err_create_code(err,
00854 MPIR_ERR_RECOVERABLE, myname,
00855 __LINE__, MPI_ERR_IO,
00856 "**ioRMWrdwr", 0);
00857 return;
00858 }
00859
00860 }
00861 }
00862
00863 nprocs_send = 0;
00864 for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
00865
00866 if (fd->atomicity) {
00867
00868 requests = (MPI_Request *)
00869 ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request));
00870 send_req = requests;
00871 }
00872 else {
00873 requests = (MPI_Request *)
00874 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
00875
00876
00877
00878 j = 0;
00879 for (i=0; i<nprocs; i++) {
00880 if (recv_size[i]) {
00881 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
00882 fd->comm, requests+j);
00883 j++;
00884 }
00885 }
00886 send_req = requests + nprocs_recv;
00887 }
00888
00889
00890
00891
00892 #ifdef AGGREGATION_PROFILE
00893 MPE_Log_event (5032, 0, NULL);
00894 #endif
00895 if (buftype_is_contig) {
00896 j = 0;
00897 for (i=0; i < nprocs; i++)
00898 if (send_size[i]) {
00899 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
00900 MPI_BYTE, i, myrank+i+100*iter, fd->comm,
00901 send_req+j);
00902 j++;
00903 buf_idx[i] += send_size[i];
00904 }
00905 }
00906 else if (nprocs_send) {
00907
00908 send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
00909 for (i=0; i < nprocs; i++)
00910 if (send_size[i])
00911 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
00912
00913 ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
00914 offset_list, len_list, send_size,
00915 send_req,
00916 sent_to_proc, nprocs, myrank,
00917 contig_access_count,
00918 min_st_offset, fd_size, fd_start, fd_end,
00919 send_buf_idx, curr_to_proc, done_to_proc, iter,
00920 buftype_extent);
00921
00922 }
00923
00924 if (fd->atomicity) {
00925
00926 j = 0;
00927 for (i=0; i<nprocs; i++) {
00928 MPI_Status wkl_status;
00929 if (recv_size[i]) {
00930 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
00931 fd->comm, &wkl_status);
00932 j++;
00933 }
00934 }
00935 }
00936
00937 for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
00938 ADIOI_Free(recv_types);
00939
00940 if (fd->atomicity) {
00941
00942 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \
00943 sizeof(MPI_Status));
00944
00945 }
00946 else {
00947 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00948 sizeof(MPI_Status));
00949
00950 }
00951
00952 #ifdef NEEDS_MPI_TEST
00953 i = 0;
00954 if (fd->atomicity) {
00955
00956 while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses);
00957 }
00958 else {
00959 while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
00960 }
00961 #else
00962 if (fd->atomicity)
00963
00964 MPI_Waitall(nprocs_send, send_req, statuses);
00965 else
00966 MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
00967 #endif
00968
00969 #ifdef AGGREGATION_PROFILE
00970 MPE_Log_event (5033, 0, NULL);
00971 #endif
00972 ADIOI_Free(statuses);
00973 ADIOI_Free(requests);
00974 if (!buftype_is_contig && nprocs_send) {
00975 for (i=0; i < nprocs; i++)
00976 if (send_size[i]) ADIOI_Free(send_buf[i]);
00977 ADIOI_Free(send_buf);
00978 }
00979 }
00980
00981
/*
 * Helper macros for walking a flattened (noncontiguous) user buffer.
 * They are statement blocks, used without a trailing semicolon, and rely
 * on these names being in scope at the point of use:
 *   flat_buf, flat_buf_idx, flat_buf_sz, n_buftypes, buftype_extent,
 *   user_buf_idx, size_in_buf, buf_incr
 * and additionally, for ADIOI_BUF_COPY:
 *   size, buf, send_buf, send_buf_idx, p
 */

/* Moves on to the next block of the flattened type, wrapping around to
 * the first block of the next datatype instance after the last one, and
 * reloads user_buf_idx / flat_buf_sz for that block. */
#define ADIOI_BUF_NEXT_BLOCK \
{ \
    if (flat_buf_idx >= (flat_buf->count - 1)) { \
        flat_buf_idx = 0; \
        n_buftypes++; \
    } \
    else flat_buf_idx++; \
    user_buf_idx = flat_buf->indices[flat_buf_idx] + \
        (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
    flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
}

/* Skips buf_incr bytes of user data without copying anything. */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        buf_incr -= size_in_buf; \
        if (!flat_buf_sz) ADIOI_BUF_NEXT_BLOCK \
    } \
}

/* Copies size bytes of user data to the end of send_buf[p], then skips
 * whatever is left of buf_incr. */
#define ADIOI_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
        ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
        memcpy(&(send_buf[p][send_buf_idx[p]]), \
               ((char *) buf) + user_buf_idx, size_in_buf); \
        send_buf_idx[p] += size_in_buf; \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
        if (!flat_buf_sz) ADIOI_BUF_NEXT_BLOCK \
    } \
    ADIOI_BUF_INCR \
}
01029
01030 static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
01031 *flat_buf, char **send_buf, ADIO_Offset
01032 *offset_list, ADIO_Offset *len_list, int *send_size,
01033 MPI_Request *requests, int *sent_to_proc,
01034 int nprocs, int myrank,
01035 int contig_access_count,
01036 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
01037 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
01038 int *send_buf_idx, int *curr_to_proc,
01039 int *done_to_proc, int iter,
01040 MPI_Aint buftype_extent)
01041 {
01042
01043
01044 int i, p, flat_buf_idx;
01045 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
01046 int jj, n_buftypes;
01047 ADIO_Offset off, len, rem_len, user_buf_idx;
01048
01049
01050
01051
01052
01053
01054
01055
01056 for (i=0; i < nprocs; i++) {
01057 send_buf_idx[i] = curr_to_proc[i] = 0;
01058 done_to_proc[i] = sent_to_proc[i];
01059 }
01060 jj = 0;
01061
01062 user_buf_idx = flat_buf->indices[0];
01063 flat_buf_idx = 0;
01064 n_buftypes = 0;
01065 flat_buf_sz = flat_buf->blocklens[0];
01066
01067
01068
01069
01070
01071 for (i=0; i<contig_access_count; i++) {
01072 off = offset_list[i];
01073 rem_len = len_list[i];
01074
01075
01076 while (rem_len != 0) {
01077 len = rem_len;
01078
01079
01080
01081
01082 p = ADIOI_BGL_Calc_aggregator(fd,
01083 off,
01084 min_st_offset,
01085 &len,
01086 fd_size,
01087 fd_start,
01088 fd_end);
01089
01090 if (send_buf_idx[p] < send_size[p]) {
01091 if (curr_to_proc[p]+len > done_to_proc[p]) {
01092 if (done_to_proc[p] > curr_to_proc[p]) {
01093 size = ADIOI_MIN(curr_to_proc[p] + len -
01094 done_to_proc[p], send_size[p]-send_buf_idx[p]);
01095 buf_incr = done_to_proc[p] - curr_to_proc[p];
01096 ADIOI_BUF_INCR
01097 ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
01098 buf_incr = curr_to_proc[p] + len - done_to_proc[p];
01099 ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
01100 curr_to_proc[p] = done_to_proc[p] + size;
01101 ADIOI_BUF_COPY
01102 }
01103 else {
01104 size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
01105 buf_incr = len;
01106 ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
01107 curr_to_proc[p] += size;
01108 ADIOI_BUF_COPY
01109 }
01110 if (send_buf_idx[p] == send_size[p]) {
01111 MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
01112 myrank+p+100*iter, fd->comm, requests+jj);
01113 jj++;
01114 }
01115 }
01116 else {
01117 ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
01118 curr_to_proc[p] += len;
01119 buf_incr = len;
01120 ADIOI_BUF_INCR
01121 }
01122 }
01123 else {
01124 buf_incr = len;
01125 ADIOI_BUF_INCR
01126 }
01127 off += len;
01128 rem_len -= len;
01129 }
01130 }
01131 for (i=0; i < nprocs; i++)
01132 if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
01133 }
01134
01135
01136
01137 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
01138 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
01139 int nprocs, int nprocs_recv, int total_elements)
01140 {
01141 typedef struct {
01142 ADIO_Offset *off_list;
01143 int *len_list;
01144 int nelem;
01145 } heap_struct;
01146
01147 heap_struct *a, tmp;
01148 int i, j, heapsize, l, r, k, smallest;
01149
01150 a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
01151
01152 j = 0;
01153 for (i=0; i<nprocs; i++)
01154 if (count[i]) {
01155 a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
01156 a[j].len_list = &(others_req[i].lens[start_pos[i]]);
01157 a[j].nelem = count[i];
01158 j++;
01159 }
01160
01161
01162
01163
01164 heapsize = nprocs_recv;
01165 for (i=heapsize/2 - 1; i>=0; i--) {
01166
01167
01168
01169
01170 k = i;
01171 while (1) {
01172 l = 2*(k+1) - 1;
01173 r = 2*(k+1);
01174
01175 if ((l < heapsize) &&
01176 (*(a[l].off_list) < *(a[k].off_list)))
01177 smallest = l;
01178 else smallest = k;
01179
01180 if ((r < heapsize) &&
01181 (*(a[r].off_list) < *(a[smallest].off_list)))
01182 smallest = r;
01183
01184 if (smallest != k) {
01185 tmp.off_list = a[k].off_list;
01186 tmp.len_list = a[k].len_list;
01187 tmp.nelem = a[k].nelem;
01188
01189 a[k].off_list = a[smallest].off_list;
01190 a[k].len_list = a[smallest].len_list;
01191 a[k].nelem = a[smallest].nelem;
01192
01193 a[smallest].off_list = tmp.off_list;
01194 a[smallest].len_list = tmp.len_list;
01195 a[smallest].nelem = tmp.nelem;
01196
01197 k = smallest;
01198 }
01199 else break;
01200 }
01201 }
01202
01203 for (i=0; i<total_elements; i++) {
01204
01205 srt_off[i] = *(a[0].off_list);
01206 srt_len[i] = *(a[0].len_list);
01207 (a[0].nelem)--;
01208
01209 if (!a[0].nelem) {
01210 a[0].off_list = a[heapsize-1].off_list;
01211 a[0].len_list = a[heapsize-1].len_list;
01212 a[0].nelem = a[heapsize-1].nelem;
01213 heapsize--;
01214 }
01215 else {
01216 (a[0].off_list)++;
01217 (a[0].len_list)++;
01218 }
01219
01220
01221 k = 0;
01222 while (1) {
01223 l = 2*(k+1) - 1;
01224 r = 2*(k+1);
01225
01226 if ((l < heapsize) &&
01227 (*(a[l].off_list) < *(a[k].off_list)))
01228 smallest = l;
01229 else smallest = k;
01230
01231 if ((r < heapsize) &&
01232 (*(a[r].off_list) < *(a[smallest].off_list)))
01233 smallest = r;
01234
01235 if (smallest != k) {
01236 tmp.off_list = a[k].off_list;
01237 tmp.len_list = a[k].len_list;
01238 tmp.nelem = a[k].nelem;
01239
01240 a[k].off_list = a[smallest].off_list;
01241 a[k].len_list = a[smallest].len_list;
01242 a[k].nelem = a[smallest].nelem;
01243
01244 a[smallest].off_list = tmp.off_list;
01245 a[smallest].len_list = tmp.len_list;
01246 a[smallest].nelem = tmp.nelem;
01247
01248 k = smallest;
01249 }
01250 else break;
01251 }
01252 }
01253
01254 ADIOI_Free(a);
01255 }
01256
01257
01258 static void ADIOI_W_Exchange_data_alltoallv(
01259 ADIO_File fd, void *buf,
01260 char *write_buf,
01261 ADIOI_Flatlist_node *flat_buf,
01262 ADIO_Offset *offset_list,
01263 ADIO_Offset *len_list, int *send_size, int *recv_size,
01264 ADIO_Offset off, int size,
01265 int *count, int *start_pos, int *partial_recv,
01266 int *sent_to_proc, int nprocs, int myrank,
01267 int buftype_is_contig, int contig_access_count,
01268 ADIO_Offset min_st_offset,
01269 ADIO_Offset fd_size,
01270 ADIO_Offset *fd_start,
01271 ADIO_Offset *fd_end,
01272 ADIOI_Access *others_req,
01273 int *send_buf_idx, int *curr_to_proc,
01274 int *done_to_proc, int *hole,
01275 int iter, MPI_Aint buftype_extent, int *buf_idx,
01276 int *error_code)
01277 {
01278 int i, j, k=0, nprocs_recv, nprocs_send, *tmp_len, err;
01279 char **send_buf = NULL;
01280 MPI_Request *send_req=NULL;
01281 MPI_Status status;
01282 int rtail, stail;
01283 char *sbuf_ptr, *to_ptr;
01284 int len;
01285 int *sdispls, *rdispls;
01286 char *all_recv_buf, *all_send_buf;
01287 int *srt_len, sum;
01288 ADIO_Offset *srt_off;
01289 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
01290
01291
01292
01293
01294 MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
01295
01296 nprocs_recv = 0;
01297 for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; }
01298 nprocs_send = 0;
01299 for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; }
01300
01301
01302 rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
01303 rtail = 0;
01304 for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }
01305
01306
01307 all_recv_buf = (char *) ADIOI_Malloc( rtail );
01308
01309
01310 sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
01311 stail = 0;
01312 for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }
01313
01314
01315 all_send_buf = (char *) ADIOI_Malloc( stail );
01316 if (buftype_is_contig) {
01317 for (i=0; i<nprocs; i++)
01318 {
01319 if (send_size[i]) {
01320 sbuf_ptr = all_send_buf + sdispls[i];
01321 memcpy( sbuf_ptr, buf + buf_idx[i], send_size[i] );
01322 buf_idx[i] += send_size[i];
01323 }
01324 }
01325 } else {
01326 send_buf = (char **) ADIOI_Malloc( nprocs * sizeof(char *) );
01327 for (i=0; i<nprocs; i++)
01328 send_buf[i] = all_send_buf + sdispls[i];
01329 ADIOI_Fill_send_buffer_nosend(fd, buf, flat_buf, send_buf,
01330 offset_list, len_list, send_size,
01331 send_req,
01332 sent_to_proc, nprocs, myrank,
01333 contig_access_count,
01334 min_st_offset, fd_size, fd_start, fd_end,
01335 send_buf_idx, curr_to_proc, done_to_proc, iter,
01336 buftype_extent);
01337 }
01338
01339
01340 MPI_Alltoallv(
01341 all_send_buf, send_size, sdispls, MPI_BYTE,
01342 all_recv_buf, recv_size, rdispls, MPI_BYTE,
01343 fd->comm );
01344
01345
01346
01347
01348
01349
01350 sum = 0;
01351 for (i=0; i<nprocs; i++) sum += count[i];
01352 srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset));
01353 srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int));
01354
01355 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
01356 nprocs, nprocs_recv, sum);
01357
01358
01359 *hole = 0;
01360
01361 if((srt_off[0] > off) ||
01362 ((srt_off[sum-1] + srt_len[sum-1]) < (off + size)))
01363 {
01364 *hole = 1;
01365 }
01366 else
01367 for (i=0; i<sum-1; i++)
01368 if (srt_off[i]+srt_len[i] < srt_off[i+1]) {
01369 *hole = 1;
01370 break;
01371 }
01372
01373 ADIOI_Free(srt_off);
01374 ADIOI_Free(srt_len);
01375
01376 if (nprocs_recv) {
01377 if (*hole) {
01378 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
01379 ADIO_EXPLICIT_OFFSET, off, &status, &err);
01380
01381 if (err != MPI_SUCCESS) {
01382 *error_code = MPIO_Err_create_code(err,
01383 MPIR_ERR_RECOVERABLE, myname,
01384 __LINE__, MPI_ERR_IO,
01385 "**ioRMWrdwr", 0);
01386 return;
01387 }
01388
01389 }
01390 }
01391
01392
01393 tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
01394 for (i=0; i<nprocs; i++)
01395 {
01396 if (recv_size[i]) {
01397 if (partial_recv[i]) {
01398 k = start_pos[i] + count[i] - 1;
01399 tmp_len[i] = others_req[i].lens[k];
01400 others_req[i].lens[k] = partial_recv[i];
01401 }
01402
01403 sbuf_ptr = all_recv_buf + rdispls[i];
01404 for (j=0; j<count[i]; j++) {
01405 ADIOI_ENSURE_AINT_FITS_IN_PTR(others_req[i].mem_ptrs[ start_pos[i]+j ]);
01406 to_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
01407 len = others_req[i].lens[ start_pos[i]+j ] ;
01408 memcpy( to_ptr, sbuf_ptr, len );
01409 sbuf_ptr += len;
01410 }
01411
01412
01413 if (partial_recv[i]) {
01414 k = start_pos[i] + count[i] - 1;
01415 others_req[i].lens[k] = tmp_len[i];
01416 }
01417
01418 }
01419 }
01420
01421 ADIOI_Free( tmp_len );
01422 ADIOI_Free( all_send_buf );
01423 ADIOI_Free( all_recv_buf );
01424 ADIOI_Free(sdispls);
01425 ADIOI_Free(rdispls);
01426 return;
01427 }
01428
01429 static void ADIOI_Fill_send_buffer_nosend(ADIO_File fd, void *buf, ADIOI_Flatlist_node
01430 *flat_buf, char **send_buf, ADIO_Offset
01431 *offset_list, ADIO_Offset *len_list, int *send_size,
01432 MPI_Request *requests, int *sent_to_proc,
01433 int nprocs, int myrank,
01434 int contig_access_count,
01435 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
01436 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
01437 int *send_buf_idx, int *curr_to_proc,
01438 int *done_to_proc, int iter,
01439 MPI_Aint buftype_extent)
01440 {
01441
01442
01443 int i, p, flat_buf_idx;
01444 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
01445 int jj, n_buftypes;
01446 ADIO_Offset off, len, rem_len, user_buf_idx;
01447
01448
01449
01450
01451
01452
01453
01454
01455 for (i=0; i < nprocs; i++) {
01456 send_buf_idx[i] = curr_to_proc[i] = 0;
01457 done_to_proc[i] = sent_to_proc[i];
01458 }
01459 jj = 0;
01460
01461 user_buf_idx = flat_buf->indices[0];
01462 flat_buf_idx = 0;
01463 n_buftypes = 0;
01464 flat_buf_sz = flat_buf->blocklens[0];
01465
01466
01467
01468
01469
01470 for (i=0; i<contig_access_count; i++) {
01471 off = offset_list[i];
01472 rem_len = len_list[i];
01473
01474
01475 while (rem_len != 0) {
01476 len = rem_len;
01477
01478
01479
01480
01481 p = ADIOI_BGL_Calc_aggregator(fd,
01482 off,
01483 min_st_offset,
01484 &len,
01485 fd_size,
01486 fd_start,
01487 fd_end);
01488
01489 if (send_buf_idx[p] < send_size[p]) {
01490 if (curr_to_proc[p]+len > done_to_proc[p]) {
01491 if (done_to_proc[p] > curr_to_proc[p]) {
01492 size = ADIOI_MIN(curr_to_proc[p] + len -
01493 done_to_proc[p], send_size[p]-send_buf_idx[p]);
01494 buf_incr = done_to_proc[p] - curr_to_proc[p];
01495 ADIOI_BUF_INCR
01496 ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
01497 buf_incr = curr_to_proc[p] + len - done_to_proc[p];
01498 ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
01499 curr_to_proc[p] = done_to_proc[p] + size;
01500 ADIOI_BUF_COPY
01501 }
01502 else {
01503 size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
01504 buf_incr = len;
01505 ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
01506 curr_to_proc[p] += size;
01507 ADIOI_BUF_COPY
01508 }
01509
01510
01511
01512
01513
01514
01515
01516
01517 }
01518 else {
01519 ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
01520 curr_to_proc[p] += (int)len;
01521 buf_incr = len;
01522 ADIOI_BUF_INCR
01523 }
01524 }
01525 else {
01526 buf_incr = len;
01527 ADIOI_BUF_INCR
01528 }
01529 off += len;
01530 rem_len -= len;
01531 }
01532 }
01533 for (i=0; i < nprocs; i++)
01534 if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
01535 }