00001
00002
00003
00004
00005
00006
00007
00008 #include "adio.h"
00009 #include "adio_extern.h"
00010
00011 #ifdef AGGREGATION_PROFILE
00012 #include "mpe.h"
00013 #endif
00014
00015
00016 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00017 datatype, int nprocs, int myrank,
00018 ADIOI_Access
00019 *others_req, ADIO_Offset *offset_list,
00020 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
00021 min_st_offset, ADIO_Offset fd_size,
00022 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00023 int *buf_idx, int *error_code);
00024 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00025 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
00026 *offset_list, ADIO_Offset *len_list, int *send_size,
00027 int *recv_size, ADIO_Offset off, int size,
00028 int *count, int *start_pos, int *partial_recv,
00029 int *sent_to_proc, int nprocs,
00030 int myrank, int
00031 buftype_is_contig, int contig_access_count,
00032 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00033 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00034 ADIOI_Access *others_req,
00035 int *send_buf_idx, int *curr_to_proc,
00036 int *done_to_proc, int *hole, int iter,
00037 MPI_Aint buftype_extent, int *buf_idx, int *error_code);
00038 static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00039 *flat_buf, char **send_buf, ADIO_Offset
00040 *offset_list, ADIO_Offset *len_list, int *send_size,
00041 MPI_Request *requests, int *sent_to_proc,
00042 int nprocs, int myrank,
00043 int contig_access_count, ADIO_Offset
00044 min_st_offset, ADIO_Offset fd_size,
00045 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00046 int *send_buf_idx, int *curr_to_proc,
00047 int *done_to_proc, int iter,
00048 MPI_Aint buftype_extent);
00049 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
00050 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00051 int nprocs, int nprocs_recv, int total_elements);
00052
00053
00054 void ADIOI_GEN_WriteStridedColl(ADIO_File fd, void *buf, int count,
00055 MPI_Datatype datatype, int file_ptr_type,
00056 ADIO_Offset offset, ADIO_Status *status, int
00057 *error_code)
00058 {
00059
00060
00061
00062
00063
00064
00065 ADIOI_Access *my_req;
00066
00067
00068
00069 ADIOI_Access *others_req;
00070
00071
00072
00073 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00074 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
00075 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00076 ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
00077 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
00078 *fd_end = NULL, *end_offsets = NULL;
00079 int *buf_idx = NULL;
00080 ADIO_Offset *len_list = NULL;
00081 int old_error, tmp_error;
00082
00083 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
00084 ADIOI_IOStridedColl (fd, buf, count, ADIOI_WRITE, datatype,
00085 file_ptr_type, offset, status, error_code);
00086 return;
00087 }
00088
00089 MPI_Comm_size(fd->comm, &nprocs);
00090 MPI_Comm_rank(fd->comm, &myrank);
00091
00092
00093
00094
00095 nprocs_for_coll = fd->hints->cb_nodes;
00096 orig_fp = fd->fp_ind;
00097
00098
00099 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00100
00101
00102
00103
00104
00105
00106 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00107 &offset_list, &len_list, &start_offset,
00108 &end_offset, &contig_access_count);
00109
00110
00111
00112
00113
00114 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00115 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00116
00117 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00118 ADIO_OFFSET, fd->comm);
00119 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00120 ADIO_OFFSET, fd->comm);
00121
00122
00123 for (i=1; i<nprocs; i++)
00124 if ((st_offsets[i] < end_offsets[i-1]) &&
00125 (st_offsets[i] <= end_offsets[i]))
00126 interleave_count++;
00127
00128
00129 }
00130
00131 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00132
00133 if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
00134 (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
00135 {
00136
00137 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00138 ADIOI_Free(offset_list);
00139 ADIOI_Free(len_list);
00140 ADIOI_Free(st_offsets);
00141 ADIOI_Free(end_offsets);
00142 }
00143
00144 fd->fp_ind = orig_fp;
00145 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00146
00147 if (buftype_is_contig && filetype_is_contig) {
00148 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00149 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
00150 ADIO_WriteContig(fd, buf, count, datatype,
00151 ADIO_EXPLICIT_OFFSET,
00152 off, status, error_code);
00153 }
00154 else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00155 0, status, error_code);
00156 }
00157 else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00158 offset, status, error_code);
00159
00160 return;
00161 }
00162
00163
00164
00165
00166
00167 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00168 nprocs_for_coll, &min_st_offset,
00169 &fd_start, &fd_end,
00170 fd->hints->min_fdomain_size, &fd_size,
00171 fd->hints->striping_unit);
00172
00173
00174
00175
00176
00177 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00178 min_st_offset, fd_start, fd_end, fd_size,
00179 nprocs, &count_my_req_procs,
00180 &count_my_req_per_proc, &my_req,
00181 &buf_idx);
00182
00183
00184
00185
00186
00187
00188
00189
00190 ADIOI_Calc_others_req(fd, count_my_req_procs,
00191 count_my_req_per_proc, my_req,
00192 nprocs, myrank,
00193 &count_others_req_procs, &others_req);
00194
00195 ADIOI_Free(count_my_req_per_proc);
00196 for (i=0; i < nprocs; i++) {
00197 if (my_req[i].count) {
00198 ADIOI_Free(my_req[i].offsets);
00199 ADIOI_Free(my_req[i].lens);
00200 }
00201 }
00202 ADIOI_Free(my_req);
00203
00204
00205 ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank,
00206 others_req, offset_list,
00207 len_list, contig_access_count, min_st_offset,
00208 fd_size, fd_start, fd_end, buf_idx, error_code);
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221 old_error = *error_code;
00222 if (*error_code != MPI_SUCCESS) *error_code = MPI_ERR_IO;
00223
00224
00225
00226 #ifdef ADIOI_MPE_LOGGING
00227 MPE_Log_event( ADIOI_MPE_postwrite_a, 0, NULL );
00228 #endif
00229 if (fd->hints->cb_nodes == 1)
00230 MPI_Bcast(error_code, 1, MPI_INT,
00231 fd->hints->ranklist[0], fd->comm);
00232 else {
00233 tmp_error = *error_code;
00234 MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
00235 MPI_MAX, fd->comm);
00236 }
00237 #ifdef ADIOI_MPE_LOGGING
00238 MPE_Log_event( ADIOI_MPE_postwrite_b, 0, NULL );
00239 #endif
00240 #ifdef AGGREGATION_PROFILE
00241 MPE_Log_event (5012, 0, NULL);
00242 #endif
00243
00244 if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) )
00245 *error_code = old_error;
00246
00247
00248 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00249
00250
00251
00252 for (i=0; i<nprocs; i++) {
00253 if (others_req[i].count) {
00254 ADIOI_Free(others_req[i].offsets);
00255 ADIOI_Free(others_req[i].lens);
00256 ADIOI_Free(others_req[i].mem_ptrs);
00257 }
00258 }
00259 ADIOI_Free(others_req);
00260
00261 ADIOI_Free(buf_idx);
00262 ADIOI_Free(offset_list);
00263 ADIOI_Free(len_list);
00264 ADIOI_Free(st_offsets);
00265 ADIOI_Free(end_offsets);
00266 ADIOI_Free(fd_start);
00267 ADIOI_Free(fd_end);
00268
00269 #ifdef HAVE_STATUS_SET_BYTES
00270 if (status) {
00271 int bufsize, size;
00272
00273 MPI_Type_size(datatype, &size);
00274 bufsize = size * count;
00275 MPIR_Status_set_bytes(status, datatype, bufsize);
00276 }
00277
00278
00279 #endif
00280
00281 fd->fp_sys_posn = -1;
00282 #ifdef AGGREGATION_PROFILE
00283 MPE_Log_event (5013, 0, NULL);
00284 #endif
00285 }
00286
00287
00288
00289
00290
00291
00292 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00293 datatype, int nprocs,
00294 int myrank,
00295 ADIOI_Access
00296 *others_req, ADIO_Offset *offset_list,
00297 ADIO_Offset *len_list, int contig_access_count,
00298 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00299 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00300 int *buf_idx, int *error_code)
00301 {
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312 ADIO_Offset size=0;
00313 int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
00314 ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
00315 char *write_buf=NULL;
00316 int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
00317 int *partial_recv, *sent_to_proc, *start_pos, flag;
00318 int *send_buf_idx, *curr_to_proc, *done_to_proc;
00319 MPI_Status status;
00320 ADIOI_Flatlist_node *flat_buf=NULL;
00321 MPI_Aint buftype_extent;
00322 int info_flag, coll_bufsize;
00323 char *value;
00324 static char myname[] = "ADIOI_EXCH_AND_WRITE";
00325
00326 *error_code = MPI_SUCCESS;
00327
00328
00329
00330
00331
00332
00333 value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
00334 ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,
00335 &info_flag);
00336 coll_bufsize = atoi(value);
00337 ADIOI_Free(value);
00338
00339
00340 for (i=0; i < nprocs; i++) {
00341 if (others_req[i].count) {
00342 st_loc = others_req[i].offsets[0];
00343 end_loc = others_req[i].offsets[0];
00344 break;
00345 }
00346 }
00347
00348 for (i=0; i < nprocs; i++)
00349 for (j=0; j < others_req[i].count; j++) {
00350 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00351 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00352 + others_req[i].lens[j] - 1));
00353 }
00354
00355
00356
00357 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00358
00359 if ((st_loc==-1) && (end_loc==-1)) {
00360 ntimes = 0;
00361 }
00362
00363 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
00364 fd->comm);
00365
00366 if (ntimes) write_buf = (char *) ADIOI_Malloc(coll_bufsize);
00367
00368 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00369
00370
00371 count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00372
00373
00374
00375 partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00376
00377
00378
00379
00380 send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00381
00382
00383
00384 recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00385
00386
00387 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00388
00389
00390
00391 send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00392 curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00393 done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00394
00395
00396 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00397
00398
00399
00400 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00401 if (!buftype_is_contig) {
00402 ADIOI_Flatten_datatype(datatype);
00403 flat_buf = CtvAccess(ADIOI_Flatlist);
00404 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00405 }
00406 MPI_Type_extent(datatype, &buftype_extent);
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420 done = 0;
00421 off = st_loc;
00422
00423 for (m=0; m < ntimes; m++) {
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441 for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
00442
00443 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
00444
00445 for (i=0; i < nprocs; i++) {
00446 if (others_req[i].count) {
00447 start_pos[i] = curr_offlen_ptr[i];
00448 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
00449 if (partial_recv[i]) {
00450
00451
00452 req_off = others_req[i].offsets[j] +
00453 partial_recv[i];
00454 req_len = others_req[i].lens[j] -
00455 partial_recv[i];
00456 partial_recv[i] = 0;
00457
00458 others_req[i].offsets[j] = req_off;
00459 others_req[i].lens[j] = req_len;
00460 }
00461 else {
00462 req_off = others_req[i].offsets[j];
00463 req_len = others_req[i].lens[j];
00464 }
00465 if (req_off < off + size) {
00466 count[i]++;
00467 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIR_Upint)(write_buf+req_off-off));
00468 MPI_Address(write_buf+req_off-off,
00469 &(others_req[i].mem_ptrs[j]));
00470 ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
00471 recv_size[i] += (int)(ADIOI_MIN(off + size - req_off,
00472 (unsigned)req_len));
00473
00474 if (off+size-req_off < (unsigned)req_len)
00475 {
00476 partial_recv[i] = (int) (off + size - req_off);
00477
00478
00479 if ((j+1 < others_req[i].count) &&
00480 (others_req[i].offsets[j+1] < off+size))
00481 {
00482 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00483 MPIR_ERR_RECOVERABLE,
00484 myname,
00485 __LINE__,
00486 MPI_ERR_ARG,
00487 "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
00488
00489
00490
00491 }
00492
00493 break;
00494 }
00495 }
00496 else break;
00497 }
00498 curr_offlen_ptr[i] = j;
00499 }
00500 }
00501
00502 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00503 len_list, send_size, recv_size, off, size, count,
00504 start_pos, partial_recv,
00505 sent_to_proc, nprocs, myrank,
00506 buftype_is_contig, contig_access_count,
00507 min_st_offset, fd_size, fd_start, fd_end,
00508 others_req, send_buf_idx, curr_to_proc,
00509 done_to_proc, &hole, m, buftype_extent, buf_idx,
00510 error_code);
00511 if (*error_code != MPI_SUCCESS) return;
00512
00513 flag = 0;
00514 for (i=0; i<nprocs; i++)
00515 if (count[i]) flag = 1;
00516
00517 if (flag) {
00518 ADIOI_Assert(size == (int)size);
00519 ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE, ADIO_EXPLICIT_OFFSET,
00520 off, &status, error_code);
00521 if (*error_code != MPI_SUCCESS) return;
00522 }
00523
00524 off += size;
00525 done += size;
00526 }
00527
00528 for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
00529 for (m=ntimes; m<max_ntimes; m++) {
00530
00531 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00532 len_list, send_size, recv_size, off, size, count,
00533 start_pos, partial_recv,
00534 sent_to_proc, nprocs, myrank,
00535 buftype_is_contig, contig_access_count,
00536 min_st_offset, fd_size, fd_start, fd_end,
00537 others_req, send_buf_idx,
00538 curr_to_proc, done_to_proc, &hole, m,
00539 buftype_extent, buf_idx, error_code);
00540 if (*error_code != MPI_SUCCESS) return;
00541 }
00542
00543 if (ntimes) ADIOI_Free(write_buf);
00544 ADIOI_Free(curr_offlen_ptr);
00545 ADIOI_Free(count);
00546 ADIOI_Free(partial_recv);
00547 ADIOI_Free(send_size);
00548 ADIOI_Free(recv_size);
00549 ADIOI_Free(sent_to_proc);
00550 ADIOI_Free(start_pos);
00551 ADIOI_Free(send_buf_idx);
00552 ADIOI_Free(curr_to_proc);
00553 ADIOI_Free(done_to_proc);
00554 }
00555
00556
00557
00558
00559
00560 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00561 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
00562 *offset_list, ADIO_Offset *len_list, int *send_size,
00563 int *recv_size, ADIO_Offset off, int size,
00564 int *count, int *start_pos,
00565 int *partial_recv,
00566 int *sent_to_proc, int nprocs,
00567 int myrank, int
00568 buftype_is_contig, int contig_access_count,
00569 ADIO_Offset min_st_offset,
00570 ADIO_Offset fd_size,
00571 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00572 ADIOI_Access *others_req,
00573 int *send_buf_idx, int *curr_to_proc,
00574 int *done_to_proc, int *hole, int iter,
00575 MPI_Aint buftype_extent, int *buf_idx,
00576 int *error_code)
00577 {
00578 int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
00579 char **send_buf = NULL;
00580 MPI_Request *requests, *send_req;
00581 MPI_Datatype *recv_types;
00582 MPI_Status *statuses, status;
00583 int *srt_len, sum;
00584 ADIO_Offset *srt_off;
00585 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
00586
00587
00588
00589
00590 MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
00591
00592
00593
00594 nprocs_recv = 0;
00595 for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
00596
00597 recv_types = (MPI_Datatype *)
00598 ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype));
00599
00600
00601 tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00602 j = 0;
00603 for (i=0; i<nprocs; i++) {
00604 if (recv_size[i]) {
00605
00606 if (partial_recv[i]) {
00607 k = start_pos[i] + count[i] - 1;
00608 tmp_len[i] = others_req[i].lens[k];
00609 others_req[i].lens[k] = partial_recv[i];
00610 }
00611 MPI_Type_hindexed(count[i],
00612 &(others_req[i].lens[start_pos[i]]),
00613 &(others_req[i].mem_ptrs[start_pos[i]]),
00614 MPI_BYTE, recv_types+j);
00615
00616 MPI_Type_commit(recv_types+j);
00617 j++;
00618 }
00619 }
00620
00621
00622
00623
00624
00625 sum = 0;
00626 for (i=0; i<nprocs; i++) sum += count[i];
00627
00628
00629 if (sum) {
00630 srt_off = (ADIO_Offset *) ADIOI_Malloc(sum*sizeof(ADIO_Offset));
00631 srt_len = (int *) ADIOI_Malloc(sum*sizeof(int));
00632
00633 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
00634 nprocs, nprocs_recv, sum);
00635 }
00636
00637
00638 for (i=0; i<nprocs; i++)
00639 if (partial_recv[i]) {
00640 k = start_pos[i] + count[i] - 1;
00641 others_req[i].lens[k] = tmp_len[i];
00642 }
00643 ADIOI_Free(tmp_len);
00644
00645
00646
00647
00648
00649
00650
00651
00652 *hole = 0;
00653 if (sum) {
00654 if (off != srt_off[0])
00655 *hole = 1;
00656 else {
00657 for (i=1; i<sum; i++) {
00658 if (srt_off[i] <= srt_off[0] + srt_len[0]) {
00659 int new_len = srt_off[i] + srt_len[i] - srt_off[0];
00660 if (new_len > srt_len[0]) srt_len[0] = new_len;
00661 }
00662 else
00663 break;
00664 }
00665 if (i < sum || size != srt_len[0])
00666 *hole = 1;
00667 }
00668
00669 ADIOI_Free(srt_off);
00670 ADIOI_Free(srt_len);
00671 }
00672
00673 if (nprocs_recv) {
00674 if (*hole) {
00675 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
00676 ADIO_EXPLICIT_OFFSET, off, &status, &err);
00677
00678 if (err != MPI_SUCCESS) {
00679 *error_code = MPIO_Err_create_code(err,
00680 MPIR_ERR_RECOVERABLE, myname,
00681 __LINE__, MPI_ERR_IO,
00682 "**ioRMWrdwr", 0);
00683 return;
00684 }
00685
00686 }
00687 }
00688
00689 nprocs_send = 0;
00690 for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
00691
00692 if (fd->atomicity) {
00693
00694 requests = (MPI_Request *)
00695 ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request));
00696 send_req = requests;
00697 }
00698 else {
00699 requests = (MPI_Request *)
00700 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
00701
00702
00703
00704 j = 0;
00705 for (i=0; i<nprocs; i++) {
00706 if (recv_size[i]) {
00707 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
00708 fd->comm, requests+j);
00709 j++;
00710 }
00711 }
00712 send_req = requests + nprocs_recv;
00713 }
00714
00715
00716
00717
00718 #ifdef AGGREGATION_PROFILE
00719 MPE_Log_event (5032, 0, NULL);
00720 #endif
00721 if (buftype_is_contig) {
00722 j = 0;
00723 for (i=0; i < nprocs; i++)
00724 if (send_size[i]) {
00725 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
00726 MPI_BYTE, i, myrank+i+100*iter, fd->comm,
00727 send_req+j);
00728 j++;
00729 buf_idx[i] += send_size[i];
00730 }
00731 }
00732 else if (nprocs_send) {
00733
00734 send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
00735 for (i=0; i < nprocs; i++)
00736 if (send_size[i])
00737 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
00738
00739 ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
00740 offset_list, len_list, send_size,
00741 send_req,
00742 sent_to_proc, nprocs, myrank,
00743 contig_access_count,
00744 min_st_offset, fd_size, fd_start, fd_end,
00745 send_buf_idx, curr_to_proc, done_to_proc, iter,
00746 buftype_extent);
00747
00748 }
00749
00750 if (fd->atomicity) {
00751
00752 j = 0;
00753 for (i=0; i<nprocs; i++) {
00754 MPI_Status wkl_status;
00755 if (recv_size[i]) {
00756 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
00757 fd->comm, &wkl_status);
00758 j++;
00759 }
00760 }
00761 }
00762
00763 for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
00764 ADIOI_Free(recv_types);
00765
00766 if (fd->atomicity) {
00767
00768 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \
00769 sizeof(MPI_Status));
00770
00771 }
00772 else {
00773 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00774 sizeof(MPI_Status));
00775
00776 }
00777
00778 #ifdef NEEDS_MPI_TEST
00779 i = 0;
00780 if (fd->atomicity) {
00781
00782 while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses);
00783 }
00784 else {
00785 while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
00786 }
00787 #else
00788 if (fd->atomicity)
00789
00790 MPI_Waitall(nprocs_send, send_req, statuses);
00791 else
00792 MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
00793 #endif
00794
00795 #ifdef AGGREGATION_PROFILE
00796 MPE_Log_event (5033, 0, NULL);
00797 #endif
00798 ADIOI_Free(statuses);
00799 ADIOI_Free(requests);
00800 if (!buftype_is_contig && nprocs_send) {
00801 for (i=0; i < nprocs; i++)
00802 if (send_size[i]) ADIOI_Free(send_buf[i]);
00803 ADIOI_Free(send_buf);
00804 }
00805 }
00806
/*
 * ADIOI_BUF_INCR - advance the cursor into the user buffer by buf_incr
 * bytes without copying anything.
 *
 * The cursor follows the flattened memory type flat_buf:
 *   user_buf_idx  current byte offset into the user buffer;
 *   flat_buf_sz   bytes left in block flat_buf_idx;
 *   n_buftypes    number of whole copies of the type already passed.
 * When a block is exhausted the cursor moves to the next block.  After the
 * last block it wraps to block 0 of the next copy, located at
 * n_buftypes * buftype_extent.  The loop ends with buf_incr == 0.
 *
 * Reads/updates the caller's locals buf_incr, size_in_buf, user_buf_idx,
 * flat_buf_sz, flat_buf_idx and n_buftypes, and reads flat_buf and
 * buftype_extent.  Expands to a brace block and is used as a statement
 * without a trailing semicolon.
 */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        buf_incr -= size_in_buf; \
    } \
}


/*
 * ADIOI_BUF_COPY - copy `size` bytes from the user buffer `buf`
 * (following the flattened memory type, as in ADIOI_BUF_INCR) into
 * send_buf[p] at offset send_buf_idx[p].
 *
 * Advances send_buf_idx[p] and the user-buffer cursor.  Every copied byte
 * is also deducted from buf_incr; whatever remains of buf_incr after the
 * copy (size == 0) is then skipped via ADIOI_BUF_INCR.
 *
 * Additionally reads/updates the caller's locals size, p, send_buf,
 * send_buf_idx and buf.  Used as a statement without a trailing semicolon.
 */
#define ADIOI_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
        ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
        memcpy(&(send_buf[p][send_buf_idx[p]]), \
               ((char *) buf) + user_buf_idx, size_in_buf); \
        send_buf_idx[p] += size_in_buf; \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    ADIOI_BUF_INCR \
}
00854
00855
00856
00857
00858
/*
 * ADIOI_Fill_send_buffer - pack data from a noncontiguous user buffer into
 * per-aggregator send buffers and post the sends for round `iter`.
 *
 * Walks this rank's file access (offset_list/len_list, contig_access_count
 * pieces) in order and splits each piece by the aggregator p returned by
 * ADIOI_Calc_aggregator.  In step with the file offsets, a cursor
 * (user_buf_idx / flat_buf_idx / n_buftypes, advanced by ADIOI_BUF_INCR and
 * ADIOI_BUF_COPY) moves through the flattened memory type flat_buf.
 *
 * Per aggregator p:
 *   done_to_proc[p]  bytes already sent to p in earlier rounds
 *                    (initialised from sent_to_proc[p]);
 *   curr_to_proc[p]  bytes destined for p seen so far in this call;
 *   send_buf_idx[p]  bytes packed into send_buf[p] in this call.
 * Bytes up to done_to_proc[p] are skipped; after that, at most send_size[p]
 * bytes are copied.  Once send_buf[p] is full, an MPI_Isend with tag
 * myrank+p+100*iter is posted into requests[]; the caller
 * (ADIOI_W_Exchange_data) waits on those requests.
 *
 * On exit sent_to_proc[p] = curr_to_proc[p] for every p with
 * send_size[p] != 0.  flat_buf must be non-NULL: its first block is read
 * unconditionally.
 */
static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
                                   *flat_buf, char **send_buf, ADIO_Offset
                                   *offset_list, ADIO_Offset *len_list, int *send_size,
                                   MPI_Request *requests, int *sent_to_proc,
                                   int nprocs, int myrank,
                                   int contig_access_count,
                                   ADIO_Offset min_st_offset, ADIO_Offset fd_size,
                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                                   int *send_buf_idx, int *curr_to_proc,
                                   int *done_to_proc, int iter,
                                   MPI_Aint buftype_extent)
{
    /* Most of these locals are referenced by name from the ADIOI_BUF_INCR
     * and ADIOI_BUF_COPY macros. */
    int i, p, flat_buf_idx;
    ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
    int jj, n_buftypes;
    ADIO_Offset off, len, rem_len, user_buf_idx;

    for (i=0; i < nprocs; i++) {
        send_buf_idx[i] = curr_to_proc[i] = 0;
        done_to_proc[i] = sent_to_proc[i];
    }
    /* Next free slot in requests[]. */
    jj = 0;

    /* Start the memory cursor at the first block of the flattened type. */
    user_buf_idx = flat_buf->indices[0];
    flat_buf_idx = 0;
    n_buftypes = 0;
    flat_buf_sz = flat_buf->blocklens[0];

    for (i=0; i<contig_access_count; i++) {
        off = offset_list[i];
        rem_len = len_list[i];

        while (rem_len != 0) {
            len = rem_len;
            /* p owns `off`.  len is passed by address; presumably it is
             * clipped to the part inside p's file domain (confirm in
             * ADIOI_Calc_aggregator). */
            p = ADIOI_Calc_aggregator(fd,
                                      off,
                                      min_st_offset,
                                      &len,
                                      fd_size,
                                      fd_start,
                                      fd_end);

            if (send_buf_idx[p] < send_size[p]) {
                if (curr_to_proc[p]+len > done_to_proc[p]) {
                    if (done_to_proc[p] > curr_to_proc[p]) {
                        /* Piece straddles the already-sent boundary: skip
                         * the sent prefix, then copy what still fits. */
                        size = ADIOI_MIN(curr_to_proc[p] + len -
                                         done_to_proc[p], send_size[p]-send_buf_idx[p]);
                        buf_incr = done_to_proc[p] - curr_to_proc[p];
                        ADIOI_BUF_INCR
                        ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
                        buf_incr = curr_to_proc[p] + len - done_to_proc[p];
                        ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
                        curr_to_proc[p] = done_to_proc[p] + size;
                        ADIOI_BUF_COPY
                    }
                    else {
                        /* Whole piece is unsent: copy as much as fits. */
                        size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
                        buf_incr = len;
                        ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
                        curr_to_proc[p] += size;
                        ADIOI_BUF_COPY
                    }
                    if (send_buf_idx[p] == send_size[p]) {
                        /* Buffer for p is complete: send it now. */
                        MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
                                  myrank+p+100*iter, fd->comm, requests+jj);
                        jj++;
                    }
                }
                else {
                    /* Entire piece was sent in an earlier round: only move
                     * the cursors past it. */
                    ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
                    curr_to_proc[p] += len;
                    buf_incr = len;
                    ADIOI_BUF_INCR
                }
            }
            else {
                /* Nothing more for p this round: skip the bytes in memory. */
                buf_incr = len;
                ADIOI_BUF_INCR
            }
            off += len;
            rem_len -= len;
        }
    }
    /* Remember how far each destination got, for the next round. */
    for (i=0; i < nprocs; i++)
        if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
}
00963
00964
00965
00966 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
00967 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00968 int nprocs, int nprocs_recv, int total_elements)
00969 {
00970 typedef struct {
00971 ADIO_Offset *off_list;
00972 int *len_list;
00973 int nelem;
00974 } heap_struct;
00975
00976 heap_struct *a, tmp;
00977 int i, j, heapsize, l, r, k, smallest;
00978
00979 a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
00980
00981 j = 0;
00982 for (i=0; i<nprocs; i++)
00983 if (count[i]) {
00984 a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
00985 a[j].len_list = &(others_req[i].lens[start_pos[i]]);
00986 a[j].nelem = count[i];
00987 j++;
00988 }
00989
00990
00991
00992
00993 heapsize = nprocs_recv;
00994 for (i=heapsize/2 - 1; i>=0; i--) {
00995
00996
00997
00998
00999 k = i;
01000 for(;;) {
01001 l = 2*(k+1) - 1;
01002 r = 2*(k+1);
01003
01004 if ((l < heapsize) &&
01005 (*(a[l].off_list) < *(a[k].off_list)))
01006 smallest = l;
01007 else smallest = k;
01008
01009 if ((r < heapsize) &&
01010 (*(a[r].off_list) < *(a[smallest].off_list)))
01011 smallest = r;
01012
01013 if (smallest != k) {
01014 tmp.off_list = a[k].off_list;
01015 tmp.len_list = a[k].len_list;
01016 tmp.nelem = a[k].nelem;
01017
01018 a[k].off_list = a[smallest].off_list;
01019 a[k].len_list = a[smallest].len_list;
01020 a[k].nelem = a[smallest].nelem;
01021
01022 a[smallest].off_list = tmp.off_list;
01023 a[smallest].len_list = tmp.len_list;
01024 a[smallest].nelem = tmp.nelem;
01025
01026 k = smallest;
01027 }
01028 else break;
01029 }
01030 }
01031
01032 for (i=0; i<total_elements; i++) {
01033
01034 srt_off[i] = *(a[0].off_list);
01035 srt_len[i] = *(a[0].len_list);
01036 (a[0].nelem)--;
01037
01038 if (!a[0].nelem) {
01039 a[0].off_list = a[heapsize-1].off_list;
01040 a[0].len_list = a[heapsize-1].len_list;
01041 a[0].nelem = a[heapsize-1].nelem;
01042 heapsize--;
01043 }
01044 else {
01045 (a[0].off_list)++;
01046 (a[0].len_list)++;
01047 }
01048
01049
01050 k = 0;
01051 for (;;) {
01052 l = 2*(k+1) - 1;
01053 r = 2*(k+1);
01054
01055 if ((l < heapsize) &&
01056 (*(a[l].off_list) < *(a[k].off_list)))
01057 smallest = l;
01058 else smallest = k;
01059
01060 if ((r < heapsize) &&
01061 (*(a[r].off_list) < *(a[smallest].off_list)))
01062 smallest = r;
01063
01064 if (smallest != k) {
01065 tmp.off_list = a[k].off_list;
01066 tmp.len_list = a[k].len_list;
01067 tmp.nelem = a[k].nelem;
01068
01069 a[k].off_list = a[smallest].off_list;
01070 a[k].len_list = a[smallest].len_list;
01071 a[k].nelem = a[smallest].nelem;
01072
01073 a[smallest].off_list = tmp.off_list;
01074 a[smallest].len_list = tmp.len_list;
01075 a[smallest].nelem = tmp.nelem;
01076
01077 k = smallest;
01078 }
01079 else break;
01080 }
01081 }
01082 ADIOI_Free(a);
01083 }