00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "adio.h"
00010 #include "adio_extern.h"
00011 #ifdef PROFILE
00012 #include "mpe.h"
00013 #endif
00014
00015
00016 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00017 datatype, int nprocs, int myrank, ADIOI_Access
00018 *others_req, ADIO_Offset *offset_list,
00019 int *len_list, int contig_access_count, ADIO_Offset
00020 min_st_offset, ADIO_Offset fd_size,
00021 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00022 int *buf_idx, int *error_code);
00023 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00024 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
00025 *offset_list, int *len_list, int *send_size,
00026 int *recv_size, ADIO_Offset off, int size,
00027 int *count, int *start_pos, int *partial_recv,
00028 int *sent_to_proc, int nprocs,
00029 int myrank, int
00030 buftype_is_contig, int contig_access_count,
00031 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00032 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00033 ADIOI_Access *others_req,
00034 int *send_buf_idx, int *curr_to_proc,
00035 int *done_to_proc, int *hole, int iter,
00036 MPI_Aint buftype_extent, int *buf_idx, int *error_code);
00037 static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node *flat_buf, char **send_buf, ADIO_Offset
00038 *offset_list, int *len_list, int *send_size,
00039 MPI_Request *requests, int *sent_to_proc,
00040 int nprocs, int myrank,
00041 int contig_access_count, ADIO_Offset
00042 min_st_offset, ADIO_Offset fd_size,
00043 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00044 int *send_buf_idx, int *curr_to_proc,
00045 int *done_to_proc, int iter,
00046 MPI_Aint buftype_extent);
00047 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
00048 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00049 int nprocs, int nprocs_recv, int total_elements);
00050
00051
00052 void ADIOI_GEN_WriteStridedColl(ADIO_File fd, void *buf, int count,
00053 MPI_Datatype datatype, int file_ptr_type,
00054 ADIO_Offset offset, ADIO_Status *status, int
00055 *error_code)
00056 {
00057
00058
00059
00060
00061
00062
00063 ADIOI_Access *my_req;
00064
00065
00066
00067 ADIOI_Access *others_req;
00068
00069
00070
00071 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00072 int *len_list, contig_access_count, interleave_count;
00073 int buftype_is_contig, *buf_idx;
00074 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00075 ADIO_Offset *offset_list, start_offset, end_offset, *st_offsets, orig_fp;
00076 ADIO_Offset *fd_start, *fd_end, fd_size, min_st_offset, *end_offsets;
00077 ADIO_Offset off;
00078
00079 #ifdef PROFILE
00080 MPE_Log_event(13, 0, "start computation");
00081 #endif
00082
00083 MPI_Comm_size(fd->comm, &nprocs);
00084 MPI_Comm_rank(fd->comm, &myrank);
00085
00086
00087
00088
00089 nprocs_for_coll = fd->hints->cb_nodes;
00090
00091
00092
00093
00094
00095
00096
00097
00098 orig_fp = fd->fp_ind;
00099 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00100 &offset_list, &len_list, &start_offset,
00101 &end_offset, &contig_access_count);
00102
00103
00104
00105
00106
00107 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00108 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00109
00110 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1, ADIO_OFFSET,
00111 fd->comm);
00112 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1, ADIO_OFFSET,
00113 fd->comm);
00114
00115
00116 interleave_count = 0;
00117 for (i=1; i<nprocs; i++)
00118 if (st_offsets[i] < end_offsets[i-1]) interleave_count++;
00119
00120
00121
00122 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00123
00124 if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
00125 (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
00126 {
00127
00128 ADIOI_Free(offset_list);
00129 ADIOI_Free(len_list);
00130 ADIOI_Free(st_offsets);
00131 ADIOI_Free(end_offsets);
00132
00133 fd->fp_ind = orig_fp;
00134 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00135
00136 if (buftype_is_contig && filetype_is_contig) {
00137 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00138 off = fd->disp + (fd->etype_size) * offset;
00139 ADIO_WriteContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
00140 off, status, error_code);
00141 }
00142 else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00143 0, status, error_code);
00144 }
00145 else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00146 offset, status, error_code);
00147
00148 return;
00149 }
00150
00151
00152
00153
00154
00155 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00156 nprocs_for_coll, &min_st_offset,
00157 &fd_start, &fd_end, &fd_size);
00158
00159
00160
00161
00162
00163 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00164 min_st_offset, fd_start, fd_end, fd_size,
00165 nprocs, &count_my_req_procs,
00166 &count_my_req_per_proc, &my_req,
00167 &buf_idx);
00168
00169
00170
00171
00172
00173
00174
00175
00176 ADIOI_Calc_others_req(fd, count_my_req_procs,
00177 count_my_req_per_proc, my_req,
00178 nprocs, myrank,
00179 &count_others_req_procs, &others_req);
00180
00181 ADIOI_Free(count_my_req_per_proc);
00182 for (i=0; i < nprocs; i++) {
00183 if (my_req[i].count) {
00184 ADIOI_Free(my_req[i].offsets);
00185 ADIOI_Free(my_req[i].lens);
00186 }
00187 }
00188 ADIOI_Free(my_req);
00189
00190
00191 ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank,
00192 others_req, offset_list,
00193 len_list, contig_access_count, min_st_offset,
00194 fd_size, fd_start, fd_end, buf_idx, error_code);
00195
00196 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00197
00198
00199
00200 for (i=0; i<nprocs; i++) {
00201 if (others_req[i].count) {
00202 ADIOI_Free(others_req[i].offsets);
00203 ADIOI_Free(others_req[i].lens);
00204 ADIOI_Free(others_req[i].mem_ptrs);
00205 }
00206 }
00207 ADIOI_Free(others_req);
00208
00209 ADIOI_Free(buf_idx);
00210 ADIOI_Free(offset_list);
00211 ADIOI_Free(len_list);
00212 ADIOI_Free(st_offsets);
00213 ADIOI_Free(end_offsets);
00214 ADIOI_Free(fd_start);
00215 ADIOI_Free(fd_end);
00216
00217 #ifdef HAVE_STATUS_SET_BYTES
00218 if (status) {
00219 int bufsize, size;
00220
00221 MPI_Type_size(datatype, &size);
00222 bufsize = size * count;
00223 MPIR_Status_set_bytes(status, datatype, bufsize);
00224 }
00225
00226
00227 #endif
00228
00229 fd->fp_sys_posn = -1;
00230 }
00231
00232
00233
00234 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00235 datatype, int nprocs, int myrank, ADIOI_Access
00236 *others_req, ADIO_Offset *offset_list,
00237 int *len_list, int contig_access_count, ADIO_Offset
00238 min_st_offset, ADIO_Offset fd_size,
00239 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00240 int *buf_idx, int *error_code)
00241 {
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251 int hole, i, j, m, size=0, ntimes, max_ntimes, buftype_is_contig;
00252 ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
00253 char *write_buf=NULL;
00254 int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
00255 int *partial_recv, *sent_to_proc, *start_pos, flag;
00256 int *send_buf_idx, *curr_to_proc, *done_to_proc;
00257 MPI_Status status;
00258 ADIOI_Flatlist_node *flat_buf=NULL;
00259 MPI_Aint buftype_extent;
00260 int info_flag, coll_bufsize;
00261 char *value;
00262
00263 *error_code = MPI_SUCCESS;
00264
00265
00266
00267
00268
00269
00270 value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
00271 MPI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,
00272 &info_flag);
00273 coll_bufsize = atoi(value);
00274 ADIOI_Free(value);
00275
00276
00277 for (i=0; i < nprocs; i++) {
00278 if (others_req[i].count) {
00279 st_loc = others_req[i].offsets[0];
00280 end_loc = others_req[i].offsets[0];
00281 break;
00282 }
00283 }
00284
00285 for (i=0; i < nprocs; i++)
00286 for (j=0; j < others_req[i].count; j++) {
00287 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00288 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00289 + others_req[i].lens[j] - 1));
00290 }
00291
00292
00293
00294 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00295
00296 if ((st_loc==-1) && (end_loc==-1)) ntimes = 0;
00297
00298 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
00299 fd->comm);
00300
00301 if (ntimes) write_buf = (char *) ADIOI_Malloc(coll_bufsize);
00302
00303 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00304
00305
00306 count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00307
00308
00309
00310 partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00311
00312
00313
00314
00315 send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00316
00317
00318
00319 recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00320
00321
00322 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00323
00324
00325
00326 send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00327 curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00328 done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00329
00330
00331 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00332
00333
00334
00335 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00336 if (!buftype_is_contig) {
00337 ADIOI_Flatten_datatype(datatype);
00338 flat_buf = ADIOI_Flatlist;
00339 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00340 }
00341 MPI_Type_extent(datatype, &buftype_extent);
00342
00343
00344
00345
00346
00347
00348
00349
00350 ADIOI_Complete_async(error_code);
00351 if (*error_code != MPI_SUCCESS) return;
00352 MPI_Barrier(fd->comm);
00353
00354 done = 0;
00355 off = st_loc;
00356
00357 #ifdef PROFILE
00358 MPE_Log_event(14, 0, "end computation");
00359 #endif
00360
00361 for (m=0; m < ntimes; m++) {
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379 #ifdef PROFILE
00380 MPE_Log_event(13, 0, "start computation");
00381 #endif
00382 for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
00383
00384 size = (int) (ADIOI_MIN(coll_bufsize, end_loc-st_loc+1-done));
00385
00386 for (i=0; i < nprocs; i++) {
00387 if (others_req[i].count) {
00388 start_pos[i] = curr_offlen_ptr[i];
00389 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
00390 if (partial_recv[i]) {
00391
00392
00393 req_off = others_req[i].offsets[j] +
00394 partial_recv[i];
00395 req_len = others_req[i].lens[j] -
00396 partial_recv[i];
00397 partial_recv[i] = 0;
00398
00399 others_req[i].offsets[j] = req_off;
00400 others_req[i].lens[j] = req_len;
00401 }
00402 else {
00403 req_off = others_req[i].offsets[j];
00404 req_len = others_req[i].lens[j];
00405 }
00406 if (req_off < off + size) {
00407 count[i]++;
00408 MPI_Address(write_buf+req_off-off,
00409 &(others_req[i].mem_ptrs[j]));
00410 recv_size[i] += (int)(ADIOI_MIN(off + (ADIO_Offset)size -
00411 req_off, req_len));
00412
00413 if (off+size-req_off < req_len) {
00414 partial_recv[i] = (int) (off + size - req_off);
00415 if ((j+1 < others_req[i].count) &&
00416 (others_req[i].offsets[j+1] <
00417 off+size)) {
00418 FPRINTF(stderr, "Error: the filetype specifies an overlapping write\n");
00419 MPI_Abort(MPI_COMM_WORLD, 1);
00420 }
00421 break;
00422 }
00423 }
00424 else break;
00425 }
00426 curr_offlen_ptr[i] = j;
00427 }
00428 }
00429
00430 #ifdef PROFILE
00431 MPE_Log_event(14, 0, "end computation");
00432 MPE_Log_event(7, 0, "start communication");
00433 #endif
00434 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00435 len_list, send_size, recv_size, off, size, count,
00436 start_pos, partial_recv,
00437 sent_to_proc, nprocs, myrank,
00438 buftype_is_contig, contig_access_count,
00439 min_st_offset, fd_size, fd_start, fd_end,
00440 others_req, send_buf_idx, curr_to_proc,
00441 done_to_proc, &hole, m, buftype_extent, buf_idx,
00442 error_code);
00443 if (*error_code != MPI_SUCCESS) return;
00444 #ifdef PROFILE
00445 MPE_Log_event(8, 0, "end communication");
00446 #endif
00447
00448 flag = 0;
00449 for (i=0; i<nprocs; i++)
00450 if (count[i]) flag = 1;
00451
00452 if (flag) {
00453 ADIO_WriteContig(fd, write_buf, size, MPI_BYTE, ADIO_EXPLICIT_OFFSET,
00454 off, &status, error_code);
00455 if (*error_code != MPI_SUCCESS) return;
00456 }
00457
00458 off += size;
00459 done += size;
00460 }
00461
00462 for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
00463 #ifdef PROFILE
00464 MPE_Log_event(7, 0, "start communication");
00465 #endif
00466 for (m=ntimes; m<max_ntimes; m++)
00467
00468 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00469 len_list, send_size, recv_size, off, size, count,
00470 start_pos, partial_recv,
00471 sent_to_proc, nprocs, myrank,
00472 buftype_is_contig, contig_access_count,
00473 min_st_offset, fd_size, fd_start, fd_end,
00474 others_req, send_buf_idx,
00475 curr_to_proc, done_to_proc, &hole, m,
00476 buftype_extent, buf_idx, error_code);
00477 if (*error_code != MPI_SUCCESS) return;
00478 #ifdef PROFILE
00479 MPE_Log_event(8, 0, "end communication");
00480 #endif
00481
00482 if (ntimes) ADIOI_Free(write_buf);
00483 ADIOI_Free(curr_offlen_ptr);
00484 ADIOI_Free(count);
00485 ADIOI_Free(partial_recv);
00486 ADIOI_Free(send_size);
00487 ADIOI_Free(recv_size);
00488 ADIOI_Free(sent_to_proc);
00489 ADIOI_Free(start_pos);
00490 ADIOI_Free(send_buf_idx);
00491 ADIOI_Free(curr_to_proc);
00492 ADIOI_Free(done_to_proc);
00493 }
00494
00495
00496
00497 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00498 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
00499 *offset_list, int *len_list, int *send_size,
00500 int *recv_size, ADIO_Offset off, int size,
00501 int *count, int *start_pos, int *partial_recv,
00502 int *sent_to_proc, int nprocs,
00503 int myrank, int
00504 buftype_is_contig, int contig_access_count,
00505 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00506 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00507 ADIOI_Access *others_req,
00508 int *send_buf_idx, int *curr_to_proc,
00509 int *done_to_proc, int *hole, int iter,
00510 MPI_Aint buftype_extent, int *buf_idx,
00511 int *error_code)
00512 {
00513 int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
00514 char **send_buf = NULL;
00515 MPI_Request *requests;
00516 MPI_Datatype *recv_types;
00517 MPI_Status *statuses, status;
00518 int *srt_len, sum;
00519 ADIO_Offset *srt_off;
00520 #ifndef PRINT_ERR_MSG
00521 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
00522 #endif
00523
00524
00525
00526
00527 MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
00528
00529
00530
00531 nprocs_recv = 0;
00532 for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
00533
00534 recv_types = (MPI_Datatype *)
00535 ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype));
00536
00537
00538 tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00539 j = 0;
00540 for (i=0; i<nprocs; i++) {
00541 if (recv_size[i]) {
00542
00543 if (partial_recv[i]) {
00544 k = start_pos[i] + count[i] - 1;
00545 tmp_len[i] = others_req[i].lens[k];
00546 others_req[i].lens[k] = partial_recv[i];
00547 }
00548 MPI_Type_hindexed(count[i],
00549 &(others_req[i].lens[start_pos[i]]),
00550 &(others_req[i].mem_ptrs[start_pos[i]]),
00551 MPI_BYTE, recv_types+j);
00552
00553 MPI_Type_commit(recv_types+j);
00554 j++;
00555 }
00556 }
00557
00558
00559
00560
00561
00562 sum = 0;
00563 for (i=0; i<nprocs; i++) sum += count[i];
00564 srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset));
00565 srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int));
00566
00567
00568 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
00569 nprocs, nprocs_recv, sum);
00570
00571
00572 for (i=0; i<nprocs; i++)
00573 if (partial_recv[i]) {
00574 k = start_pos[i] + count[i] - 1;
00575 others_req[i].lens[k] = tmp_len[i];
00576 }
00577 ADIOI_Free(tmp_len);
00578
00579
00580 *hole = 0;
00581 for (i=0; i<sum-1; i++)
00582 if (srt_off[i]+srt_len[i] < srt_off[i+1]) {
00583 *hole = 1;
00584 break;
00585 }
00586
00587 ADIOI_Free(srt_off);
00588 ADIOI_Free(srt_len);
00589
00590 if (nprocs_recv) {
00591 if (*hole) {
00592 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
00593 ADIO_EXPLICIT_OFFSET, off, &status, &err);
00594 if (err != MPI_SUCCESS) {
00595 #ifdef PRINT_ERR_MSG
00596 FPRINTF(stderr, "ADIOI_GEN_WriteStridedColl: ROMIO tries to optimize this access by doing a read-modify-write, but is unable to read the file. Please give the file read permission and open it with MPI_MODE_RDWR.\n");
00597 MPI_Abort(MPI_COMM_WORLD, 1);
00598 #else
00599 *error_code = MPIR_Err_setmsg(MPI_ERR_IO, MPIR_READ_PERM,
00600 myname, (char *) 0, (char *) 0);
00601 ADIOI_Error(fd, *error_code, myname);
00602 return;
00603 #endif
00604 }
00605 }
00606 }
00607
00608 nprocs_send = 0;
00609 for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
00610
00611 requests = (MPI_Request *)
00612 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
00613
00614
00615
00616 j = 0;
00617 for (i=0; i<nprocs; i++) {
00618 if (recv_size[i]) {
00619 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
00620 fd->comm, requests+j);
00621 j++;
00622 }
00623 }
00624
00625
00626
00627
00628 if (buftype_is_contig) {
00629 j = 0;
00630 for (i=0; i < nprocs; i++)
00631 if (send_size[i]) {
00632 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
00633 MPI_BYTE, i, myrank+i+100*iter, fd->comm,
00634 requests+nprocs_recv+j);
00635 j++;
00636 buf_idx[i] += send_size[i];
00637 }
00638 }
00639 else if (nprocs_send) {
00640
00641 send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
00642 for (i=0; i < nprocs; i++)
00643 if (send_size[i])
00644 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
00645
00646 ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
00647 offset_list, len_list, send_size,
00648 requests+nprocs_recv,
00649 sent_to_proc, nprocs, myrank,
00650 contig_access_count,
00651 min_st_offset, fd_size, fd_start, fd_end,
00652 send_buf_idx, curr_to_proc, done_to_proc, iter,
00653 buftype_extent);
00654
00655 }
00656
00657 for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
00658 ADIOI_Free(recv_types);
00659
00660 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00661 sizeof(MPI_Status));
00662
00663
00664 #ifdef NEEDS_MPI_TEST
00665 i = 0;
00666 while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
00667 #else
00668 MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
00669 #endif
00670
00671 ADIOI_Free(statuses);
00672 ADIOI_Free(requests);
00673 if (!buftype_is_contig && nprocs_send) {
00674 for (i=0; i < nprocs; i++)
00675 if (send_size[i]) ADIOI_Free(send_buf[i]);
00676 ADIOI_Free(send_buf);
00677 }
00678 }
00679
00680
/*
 * ADIOI_BUF_INCR -- advance the cursor into the noncontiguous user buffer
 * by buf_incr bytes without copying anything.
 *
 * The cursor walks the flattened buftype block by block.  After the last
 * block it wraps to block 0 and increments n_buftypes; each block's start
 * is indices[idx] + n_buftypes * buftype_extent.
 *
 * Expects these names in the expanding scope: buf_incr, size_in_buf,
 * user_buf_idx, flat_buf, flat_buf_idx, flat_buf_sz, n_buftypes,
 * buftype_extent.  Leaves buf_incr == 0.  Expands to a brace block and is
 * used without a trailing semicolon.
 */
#define ADIOI_BUF_INCR \
{ \
    for (; buf_incr != 0; buf_incr -= size_in_buf) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        flat_buf_sz -= size_in_buf; \
        user_buf_idx += size_in_buf; \
        if (flat_buf_sz == 0) { \
            flat_buf_idx++; \
            if (flat_buf_idx >= flat_buf->count) { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                           n_buftypes * buftype_extent; \
        } \
    } \
}
00700
00701
/*
 * ADIOI_BUF_COPY -- copy `size` bytes from the noncontiguous user buffer
 * into send_buf[p], appending at send_buf_idx[p].
 *
 * The flattened buftype is walked exactly as in ADIOI_BUF_INCR.  Every
 * copied byte is also subtracted from buf_incr; whatever remains of
 * buf_incr afterwards is skipped with ADIOI_BUF_INCR.
 *
 * In addition to the names ADIOI_BUF_INCR uses, expects size, buf,
 * send_buf, send_buf_idx and p in scope.  Leaves size == 0.  Expands to a
 * brace block and is used without a trailing semicolon.
 */
#define ADIOI_BUF_COPY \
{ \
    for (; size != 0; size -= size_in_buf) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        memcpy(send_buf[p] + send_buf_idx[p], \
               (char *) buf + user_buf_idx, size_in_buf); \
        send_buf_idx[p] += size_in_buf; \
        buf_incr -= size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        user_buf_idx += size_in_buf; \
        if (flat_buf_sz == 0) { \
            flat_buf_idx++; \
            if (flat_buf_idx >= flat_buf->count) { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                           n_buftypes * buftype_extent; \
        } \
    } \
    ADIOI_BUF_INCR \
}
00726
00727
00728
00729 static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00730 *flat_buf, char **send_buf, ADIO_Offset
00731 *offset_list, int *len_list, int *send_size,
00732 MPI_Request *requests, int *sent_to_proc,
00733 int nprocs, int myrank,
00734 int contig_access_count,
00735 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00736 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00737 int *send_buf_idx, int *curr_to_proc,
00738 int *done_to_proc, int iter,
00739 MPI_Aint buftype_extent)
00740 {
00741
00742
00743 int i, p, flat_buf_idx, size;
00744 int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
00745 ADIO_Offset off, len, rem_len, user_buf_idx;
00746
00747
00748
00749
00750
00751
00752
00753
00754 for (i=0; i < nprocs; i++) {
00755 send_buf_idx[i] = curr_to_proc[i] = 0;
00756 done_to_proc[i] = sent_to_proc[i];
00757 }
00758 jj = 0;
00759
00760 user_buf_idx = flat_buf->indices[0];
00761 flat_buf_idx = 0;
00762 n_buftypes = 0;
00763 flat_buf_sz = flat_buf->blocklens[0];
00764
00765
00766
00767
00768
00769 for (i=0; i<contig_access_count; i++) {
00770 off = offset_list[i];
00771 rem_len = (ADIO_Offset) len_list[i];
00772
00773
00774 while (rem_len != 0) {
00775 len = rem_len;
00776
00777
00778
00779
00780 p = ADIOI_Calc_aggregator(fd,
00781 off,
00782 min_st_offset,
00783 &len,
00784 fd_size,
00785 fd_start,
00786 fd_end);
00787
00788 if (send_buf_idx[p] < send_size[p]) {
00789 if (curr_to_proc[p]+len > done_to_proc[p]) {
00790 if (done_to_proc[p] > curr_to_proc[p]) {
00791 size = (int)ADIOI_MIN(curr_to_proc[p] + len -
00792 done_to_proc[p], send_size[p]-send_buf_idx[p]);
00793 buf_incr = done_to_proc[p] - curr_to_proc[p];
00794 ADIOI_BUF_INCR
00795 buf_incr = (int)(curr_to_proc[p] + len - done_to_proc[p]);
00796 curr_to_proc[p] = done_to_proc[p] + size;
00797 ADIOI_BUF_COPY
00798 }
00799 else {
00800 size = (int)ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
00801 buf_incr = (int)len;
00802 curr_to_proc[p] += size;
00803 ADIOI_BUF_COPY
00804 }
00805 if (send_buf_idx[p] == send_size[p]) {
00806 MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
00807 myrank+p+100*iter, fd->comm, requests+jj);
00808 jj++;
00809 }
00810 }
00811 else {
00812 curr_to_proc[p] += (int)len;
00813 buf_incr = (int)len;
00814 ADIOI_BUF_INCR
00815 }
00816 }
00817 else {
00818 buf_incr = (int)len;
00819 ADIOI_BUF_INCR
00820 }
00821 off += len;
00822 rem_len -= len;
00823 }
00824 }
00825 for (i=0; i < nprocs; i++)
00826 if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
00827 }
00828
00829
00830
00831 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
00832 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00833 int nprocs, int nprocs_recv, int total_elements)
00834 {
00835 typedef struct {
00836 ADIO_Offset *off_list;
00837 int *len_list;
00838 int nelem;
00839 } heap_struct;
00840
00841 heap_struct *a, tmp;
00842 int i, j, heapsize, l, r, k, smallest;
00843
00844 a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
00845
00846 j = 0;
00847 for (i=0; i<nprocs; i++)
00848 if (count[i]) {
00849 a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
00850 a[j].len_list = &(others_req[i].lens[start_pos[i]]);
00851 a[j].nelem = count[i];
00852 j++;
00853 }
00854
00855
00856
00857
00858 heapsize = nprocs_recv;
00859 for (i=heapsize/2 - 1; i>=0; i--) {
00860
00861
00862
00863
00864 k = i;
00865 while (1) {
00866 l = 2*(k+1) - 1;
00867 r = 2*(k+1);
00868
00869 if ((l < heapsize) &&
00870 (*(a[l].off_list) < *(a[k].off_list)))
00871 smallest = l;
00872 else smallest = k;
00873
00874 if ((r < heapsize) &&
00875 (*(a[r].off_list) < *(a[smallest].off_list)))
00876 smallest = r;
00877
00878 if (smallest != k) {
00879 tmp.off_list = a[k].off_list;
00880 tmp.len_list = a[k].len_list;
00881 tmp.nelem = a[k].nelem;
00882
00883 a[k].off_list = a[smallest].off_list;
00884 a[k].len_list = a[smallest].len_list;
00885 a[k].nelem = a[smallest].nelem;
00886
00887 a[smallest].off_list = tmp.off_list;
00888 a[smallest].len_list = tmp.len_list;
00889 a[smallest].nelem = tmp.nelem;
00890
00891 k = smallest;
00892 }
00893 else break;
00894 }
00895 }
00896
00897 for (i=0; i<total_elements; i++) {
00898
00899 srt_off[i] = *(a[0].off_list);
00900 srt_len[i] = *(a[0].len_list);
00901 (a[0].nelem)--;
00902
00903 if (!a[0].nelem) {
00904 a[0].off_list = a[heapsize-1].off_list;
00905 a[0].len_list = a[heapsize-1].len_list;
00906 a[0].nelem = a[heapsize-1].nelem;
00907 heapsize--;
00908 }
00909 else {
00910 (a[0].off_list)++;
00911 (a[0].len_list)++;
00912 }
00913
00914
00915 k = 0;
00916 while (1) {
00917 l = 2*(k+1) - 1;
00918 r = 2*(k+1);
00919
00920 if ((l < heapsize) &&
00921 (*(a[l].off_list) < *(a[k].off_list)))
00922 smallest = l;
00923 else smallest = k;
00924
00925 if ((r < heapsize) &&
00926 (*(a[r].off_list) < *(a[smallest].off_list)))
00927 smallest = r;
00928
00929 if (smallest != k) {
00930 tmp.off_list = a[k].off_list;
00931 tmp.len_list = a[k].len_list;
00932 tmp.nelem = a[k].nelem;
00933
00934 a[k].off_list = a[smallest].off_list;
00935 a[k].len_list = a[smallest].len_list;
00936 a[k].nelem = a[smallest].nelem;
00937
00938 a[smallest].off_list = tmp.off_list;
00939 a[smallest].len_list = tmp.len_list;
00940 a[smallest].nelem = tmp.nelem;
00941
00942 k = smallest;
00943 }
00944 else break;
00945 }
00946 }
00947 ADIOI_Free(a);
00948 }