00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "ad_lustre.h"
00012 #include "adio_extern.h"
00013
00014
00015 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
00016 MPI_Datatype datatype, int nprocs,
00017 int myrank,
00018 ADIOI_Access *others_req,
00019 ADIOI_Access *my_req,
00020 ADIO_Offset *offset_list,
00021 ADIO_Offset *len_list,
00022 int contig_access_count,
00023 int *striping_info,
00024 int **buf_idx, int *error_code);
00025 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
00026 ADIOI_Flatlist_node *flat_buf,
00027 char **send_buf,
00028 ADIO_Offset *offset_list,
00029 ADIO_Offset *len_list, int *send_size,
00030 MPI_Request *requests,
00031 int *sent_to_proc, int nprocs,
00032 int myrank, int contig_access_count,
00033 int *striping_info,
00034 int *send_buf_idx,
00035 int *curr_to_proc,
00036 int *done_to_proc, int iter,
00037 MPI_Aint buftype_extent);
00038 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
00039 char *write_buf,
00040 ADIOI_Flatlist_node *flat_buf,
00041 ADIO_Offset *offset_list,
00042 ADIO_Offset *len_list, int *send_size,
00043 int *recv_size, ADIO_Offset off,
00044 int size, int *count,
00045 int *start_pos, int *partial_recv,
00046 int *sent_to_proc, int nprocs,
00047 int myrank, int buftype_is_contig,
00048 int contig_access_count,
00049 int *striping_info,
00050 ADIOI_Access *others_req,
00051 int *send_buf_idx,
00052 int *curr_to_proc,
00053 int *done_to_proc, int *hole,
00054 int iter, MPI_Aint buftype_extent,
00055 int *buf_idx, int *error_code);
00056 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
00057 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00058 int nprocs, int nprocs_recv, int total_elements);
00059
00060 void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
00061 MPI_Datatype datatype,
00062 int file_ptr_type, ADIO_Offset offset,
00063 ADIO_Status *status, int *error_code)
00064 {
00065
00066
00067
00068
00069
00070
00071
00072 ADIOI_Access *my_req;
00073
00074
00075
00076 ADIOI_Access *others_req;
00077
00078
00079
00080 int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
00081 int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
00082 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00083 ADIO_Offset orig_fp, start_offset, end_offset, off;
00084 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
00085 ADIO_Offset *len_list = NULL;
00086 int **buf_idx = NULL, *striping_info = NULL;
00087 int old_error, tmp_error;
00088
00089 MPI_Comm_size(fd->comm, &nprocs);
00090 MPI_Comm_rank(fd->comm, &myrank);
00091
00092 orig_fp = fd->fp_ind;
00093
00094
00095 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00096
00097
00098
00099
00100
00101
00102
00103 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00104 &offset_list, &len_list, &start_offset,
00105 &end_offset, &contig_access_count);
00106
00107
00108
00109
00110
00111 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
00112 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
00113 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00114 ADIO_OFFSET, fd->comm);
00115 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00116 ADIO_OFFSET, fd->comm);
00117
00118 for (i = 1; i < nprocs; i++)
00119 if ((st_offsets[i] < end_offsets[i-1]) &&
00120 (st_offsets[i] <= end_offsets[i]))
00121 interleave_count++;
00122
00123
00124
00125
00126
00127
00128
00129 if (interleave_count > 0) {
00130 do_collect = 1;
00131 } else {
00132 do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
00133 len_list, nprocs);
00134 }
00135 }
00136 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00137
00138
00139 if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
00140 fd->hints->cb_write == ADIOI_HINT_DISABLE) {
00141
00142
00143 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00144 ADIOI_Free(offset_list);
00145 ADIOI_Free(len_list);
00146 ADIOI_Free(st_offsets);
00147 ADIOI_Free(end_offsets);
00148 }
00149
00150 fd->fp_ind = orig_fp;
00151 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00152 if (buftype_is_contig && filetype_is_contig) {
00153 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00154 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
00155 ADIO_WriteContig(fd, buf, count, datatype,
00156 ADIO_EXPLICIT_OFFSET,
00157 off, status, error_code);
00158 } else
00159 ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00160 0, status, error_code);
00161 } else {
00162 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00163 offset, status, error_code);
00164 }
00165 return;
00166 }
00167
00168
00169 ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
00170
00171
00172
00173
00174 ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00175 striping_info, nprocs, &count_my_req_procs,
00176 &count_my_req_per_proc, &my_req,
00177 &buf_idx);
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187 ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
00188 my_req, nprocs, myrank, &count_others_req_procs,
00189 &others_req);
00190 ADIOI_Free(count_my_req_per_proc);
00191
00192
00193 ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
00194 others_req, my_req, offset_list, len_list,
00195 contig_access_count, striping_info,
00196 buf_idx, error_code);
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209 old_error = *error_code;
00210 if (*error_code != MPI_SUCCESS)
00211 *error_code = MPI_ERR_IO;
00212
00213
00214
00215 #ifdef ADIOI_MPE_LOGGING
00216 MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
00217 #endif
00218 if (fd->hints->cb_nodes == 1)
00219 MPI_Bcast(error_code, 1, MPI_INT,
00220 fd->hints->ranklist[0], fd->comm);
00221 else {
00222 tmp_error = *error_code;
00223 MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
00224 MPI_MAX, fd->comm);
00225 }
00226 #ifdef ADIOI_MPE_LOGGING
00227 MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
00228 #endif
00229
00230 if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
00231 *error_code = old_error;
00232
00233
00234 if (!buftype_is_contig)
00235 ADIOI_Delete_flattened(datatype);
00236
00237
00238
00239 for (i = 0; i < nprocs; i++) {
00240 if (others_req[i].count) {
00241 ADIOI_Free(others_req[i].offsets);
00242 ADIOI_Free(others_req[i].lens);
00243 ADIOI_Free(others_req[i].mem_ptrs);
00244 }
00245 }
00246 ADIOI_Free(others_req);
00247
00248 for (i = 0; i < nprocs; i++) {
00249 if (my_req[i].count) {
00250 ADIOI_Free(my_req[i].offsets);
00251 ADIOI_Free(my_req[i].lens);
00252 }
00253 }
00254 ADIOI_Free(my_req);
00255 for (i = 0; i < nprocs; i++) {
00256 ADIOI_Free(buf_idx[i]);
00257 }
00258 ADIOI_Free(buf_idx);
00259 ADIOI_Free(offset_list);
00260 ADIOI_Free(len_list);
00261 ADIOI_Free(st_offsets);
00262 ADIOI_Free(end_offsets);
00263 ADIOI_Free(striping_info);
00264
00265 #ifdef HAVE_STATUS_SET_BYTES
00266 if (status) {
00267 int bufsize, size;
00268
00269 MPI_Type_size(datatype, &size);
00270 bufsize = size * count;
00271 MPIR_Status_set_bytes(status, datatype, bufsize);
00272 }
00273
00274
00275
00276 #endif
00277
00278 fd->fp_sys_posn = -1;
00279 }
00280
00281
00282
00283
00284 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
00285 MPI_Datatype datatype, int nprocs,
00286 int myrank, ADIOI_Access *others_req,
00287 ADIOI_Access *my_req,
00288 ADIO_Offset *offset_list,
00289 ADIO_Offset *len_list,
00290 int contig_access_count,
00291 int *striping_info, int **buf_idx,
00292 int *error_code)
00293 {
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304 int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
00305 ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
00306 ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
00307 ADIO_Offset max_size, step_size = 0;
00308 int real_size, req_len, send_len;
00309 int *recv_curr_offlen_ptr, *recv_count, *recv_size;
00310 int *send_curr_offlen_ptr, *send_size;
00311 int *partial_recv, *sent_to_proc, *recv_start_pos;
00312 int *send_buf_idx, *curr_to_proc, *done_to_proc;
00313 int *this_buf_idx;
00314 char *write_buf = NULL;
00315 MPI_Status status;
00316 ADIOI_Flatlist_node *flat_buf = NULL;
00317 MPI_Aint buftype_extent;
00318 int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
00319 int data_sieving = 0;
00320
00321 *error_code = MPI_SUCCESS;
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331 for (i = 0; i < nprocs; i++) {
00332 if (others_req[i].count) {
00333 st_loc = others_req[i].offsets[0];
00334 end_loc = others_req[i].offsets[0];
00335 break;
00336 }
00337 }
00338 for (i = 0; i < nprocs; i++) {
00339 for (j = 0; j < others_req[i].count; j++) {
00340 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00341 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
00342 others_req[i].lens[j] - 1));
00343 }
00344 }
00345
00346 if ((st_loc == -1) && (end_loc == -1))
00347 ntimes = 0;
00348 MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
00349
00350 if (st_loc == -1)
00351 st_loc = max_end_loc;
00352 MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
00353
00354 min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
00355
00356
00357
00358
00359
00360 step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
00361 max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
00362 + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
00363
00364 if (ntimes)
00365 write_buf = (char *) ADIOI_Malloc(stripe_size);
00366
00367
00368 off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
00369 for (m = 0; m < max_ntimes; m ++)
00370 off_list[m] = max_end_loc;
00371 for (i = 0; i < nprocs; i++) {
00372 for (j = 0; j < others_req[i].count; j ++) {
00373 req_off = others_req[i].offsets[j];
00374 m = (int)((req_off - min_st_loc) / step_size);
00375 off_list[m] = ADIOI_MIN(off_list[m], req_off);
00376 }
00377 }
00378
00379 recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00380 send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00381
00382
00383 recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00384
00385
00386
00387 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00388
00389
00390
00391 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00392
00393
00394 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00395
00396
00397
00398 send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00399 curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00400 done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00401
00402
00403 this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00404
00405 recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00406
00407
00408
00409 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00410 if (!buftype_is_contig) {
00411 ADIOI_Flatten_datatype(datatype);
00412 flat_buf = ADIOI_Flatlist;
00413 while (flat_buf->type != datatype)
00414 flat_buf = flat_buf->next;
00415 }
00416 MPI_Type_extent(datatype, &buftype_extent);
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428 iter_st_off = min_st_loc;
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440 data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
00441
00442 for (m = 0; m < max_ntimes; m++) {
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466 for (i = 0; i < nprocs; i++)
00467 recv_count[i] = recv_size[i] = send_size[i] = 0;
00468
00469 off = off_list[m];
00470 max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
00471 real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
00472 off,
00473 end_loc - off + 1);
00474
00475 for (i = 0; i < nprocs; i++) {
00476 if (my_req[i].count) {
00477 this_buf_idx[i] = buf_idx[i][send_curr_offlen_ptr[i]];
00478 for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
00479 send_off = my_req[i].offsets[j];
00480 send_len = my_req[i].lens[j];
00481 if (send_off < iter_st_off + max_size) {
00482 send_size[i] += send_len;
00483 } else {
00484 break;
00485 }
00486 }
00487 send_curr_offlen_ptr[i] = j;
00488 }
00489 if (others_req[i].count) {
00490 recv_start_pos[i] = recv_curr_offlen_ptr[i];
00491 for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
00492 req_off = others_req[i].offsets[j];
00493 req_len = others_req[i].lens[j];
00494 if (req_off < iter_st_off + max_size) {
00495 recv_count[i]++;
00496 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIR_Upint)(write_buf+req_off-off));
00497 MPI_Address(write_buf + req_off - off,
00498 &(others_req[i].mem_ptrs[j]));
00499 recv_size[i] += req_len;
00500 } else {
00501 break;
00502 }
00503 }
00504 recv_curr_offlen_ptr[i] = j;
00505 }
00506 }
00507
00508 hole = data_sieving;
00509 ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00510 len_list, send_size, recv_size, off, real_size,
00511 recv_count, recv_start_pos, partial_recv,
00512 sent_to_proc, nprocs, myrank,
00513 buftype_is_contig, contig_access_count,
00514 striping_info, others_req, send_buf_idx,
00515 curr_to_proc, done_to_proc, &hole, m,
00516 buftype_extent, this_buf_idx, error_code);
00517 if (*error_code != MPI_SUCCESS)
00518 goto over;
00519
00520 flag = 0;
00521 for (i = 0; i < nprocs; i++)
00522 if (recv_count[i]) {
00523 flag = 1;
00524 break;
00525 }
00526 if (flag) {
00527
00528 if(data_sieving == ADIOI_HINT_ENABLE) {
00529 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
00530 ADIO_EXPLICIT_OFFSET, off, &status,
00531 error_code);
00532 } else {
00533
00534
00535 if (!hole) {
00536 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
00537 ADIO_EXPLICIT_OFFSET, off, &status,
00538 error_code);
00539 } else {
00540 for (i = 0; i < nprocs; i++) {
00541 if (others_req[i].count) {
00542 for (j = 0; j < others_req[i].count; j++) {
00543 if (others_req[i].offsets[j] < off + real_size &&
00544 others_req[i].offsets[j] >= off) {
00545 ADIO_WriteContig(fd,
00546 write_buf + others_req[i].offsets[j] - off,
00547 others_req[i].lens[j],
00548 MPI_BYTE, ADIO_EXPLICIT_OFFSET,
00549 others_req[i].offsets[j], &status,
00550 error_code);
00551 if (*error_code != MPI_SUCCESS)
00552 goto over;
00553 }
00554 }
00555 }
00556 }
00557 }
00558 }
00559 if (*error_code != MPI_SUCCESS)
00560 goto over;
00561 }
00562 iter_st_off += max_size;
00563 }
00564 over:
00565 if (ntimes)
00566 ADIOI_Free(write_buf);
00567 ADIOI_Free(recv_curr_offlen_ptr);
00568 ADIOI_Free(send_curr_offlen_ptr);
00569 ADIOI_Free(recv_count);
00570 ADIOI_Free(send_size);
00571 ADIOI_Free(recv_size);
00572 ADIOI_Free(sent_to_proc);
00573 ADIOI_Free(recv_start_pos);
00574 ADIOI_Free(send_buf_idx);
00575 ADIOI_Free(curr_to_proc);
00576 ADIOI_Free(done_to_proc);
00577 ADIOI_Free(this_buf_idx);
00578 ADIOI_Free(off_list);
00579 }
00580
00581
00582
00583
00584 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
00585 char *write_buf,
00586 ADIOI_Flatlist_node *flat_buf,
00587 ADIO_Offset *offset_list,
00588 ADIO_Offset *len_list, int *send_size,
00589 int *recv_size, ADIO_Offset off,
00590 int size, int *count,
00591 int *start_pos, int *partial_recv,
00592 int *sent_to_proc, int nprocs,
00593 int myrank, int buftype_is_contig,
00594 int contig_access_count,
00595 int *striping_info,
00596 ADIOI_Access *others_req,
00597 int *send_buf_idx,
00598 int *curr_to_proc, int *done_to_proc,
00599 int *hole, int iter,
00600 MPI_Aint buftype_extent,
00601 int *buf_idx, int *error_code)
00602 {
00603 int i, j, nprocs_recv, nprocs_send, err;
00604 char **send_buf = NULL;
00605 MPI_Request *requests, *send_req;
00606 MPI_Datatype *recv_types;
00607 MPI_Status *statuses, status;
00608 int *srt_len, sum, sum_recv;
00609 ADIO_Offset *srt_off;
00610 int data_sieving = *hole;
00611 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
00612
00613
00614 nprocs_recv = 0;
00615 for (i = 0; i < nprocs; i++)
00616 if (recv_size[i])
00617 nprocs_recv++;
00618
00619 recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
00620 sizeof(MPI_Datatype));
00621
00622
00623 j = 0;
00624 for (i = 0; i < nprocs; i++) {
00625 if (recv_size[i]) {
00626 MPI_Type_hindexed(count[i],
00627 &(others_req[i].lens[start_pos[i]]),
00628 &(others_req[i].mem_ptrs[start_pos[i]]),
00629 MPI_BYTE, recv_types + j);
00630
00631 MPI_Type_commit(recv_types + j);
00632 j++;
00633 }
00634 }
00635
00636
00637
00638
00639
00640
00641 sum = 0;
00642 for (i = 0; i < nprocs; i++)
00643 sum += count[i];
00644 srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset));
00645 srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int));
00646
00647
00648 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
00649 nprocs, nprocs_recv, sum);
00650
00651
00652 *hole = 0;
00653 for (i = 0; i < sum - 1; i++) {
00654 if (srt_off[i] + srt_len[i] < srt_off[i + 1]) {
00655 *hole = 1;
00656 break;
00657 }
00658 }
00659
00660
00661
00662
00663
00664
00665 if (*hole == 0) {
00666 sum_recv = 0;
00667 for (i = 0; i < nprocs; i++)
00668 sum_recv += recv_size[i];
00669 if (size > sum_recv)
00670 *hole = 1;
00671 }
00672
00673 if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
00674 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
00675 ADIO_EXPLICIT_OFFSET, off, &status, &err);
00676
00677 if (err != MPI_SUCCESS) {
00678 *error_code = MPIO_Err_create_code(err,
00679 MPIR_ERR_RECOVERABLE,
00680 myname, __LINE__,
00681 MPI_ERR_IO,
00682 "**ioRMWrdwr", 0);
00683 ADIOI_Free(recv_types);
00684 ADIOI_Free(srt_off);
00685 ADIOI_Free(srt_len);
00686 return;
00687 }
00688
00689 }
00690 ADIOI_Free(srt_off);
00691 ADIOI_Free(srt_len);
00692
00693 nprocs_send = 0;
00694 for (i = 0; i < nprocs; i++)
00695 if (send_size[i])
00696 nprocs_send++;
00697
00698 if (fd->atomicity) {
00699
00700 requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
00701 sizeof(MPI_Request));
00702 send_req = requests;
00703 } else {
00704 requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
00705 sizeof(MPI_Request));
00706
00707
00708
00709 j = 0;
00710 for (i = 0; i < nprocs; i++) {
00711 if (recv_size[i]) {
00712 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
00713 myrank + i + 100 * iter, fd->comm, requests + j);
00714 j++;
00715 }
00716 }
00717 send_req = requests + nprocs_recv;
00718 }
00719
00720
00721
00722
00723
00724 if (buftype_is_contig) {
00725 j = 0;
00726 for (i = 0; i < nprocs; i++)
00727 if (send_size[i]) {
00728 ADIOI_Assert(buf_idx[i] != -1);
00729 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
00730 MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
00731 send_req + j);
00732 j++;
00733 }
00734 } else
00735 if (nprocs_send) {
00736
00737 send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
00738 for (i = 0; i < nprocs; i++)
00739 if (send_size[i])
00740 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
00741
00742 ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
00743 len_list, send_size, send_req,
00744 sent_to_proc, nprocs, myrank,
00745 contig_access_count, striping_info,
00746 send_buf_idx, curr_to_proc, done_to_proc,
00747 iter, buftype_extent);
00748
00749 }
00750
00751
00752 if (fd->atomicity) {
00753 j = 0;
00754 for (i = 0; i < nprocs; i++) {
00755 MPI_Status wkl_status;
00756 if (recv_size[i]) {
00757 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
00758 myrank + i + 100 * iter, fd->comm, &wkl_status);
00759 j++;
00760 }
00761 }
00762 }
00763
00764 for (i = 0; i < nprocs_recv; i++)
00765 MPI_Type_free(recv_types + i);
00766 ADIOI_Free(recv_types);
00767
00768
00769
00770 if (fd->atomicity) {
00771 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
00772 sizeof(MPI_Status));
00773 } else {
00774 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
00775 sizeof(MPI_Status));
00776 }
00777
00778 #ifdef NEEDS_MPI_TEST
00779 i = 0;
00780 if (fd->atomicity) {
00781
00782 while (!i)
00783 MPI_Testall(nprocs_send, send_req, &i, statuses);
00784 } else {
00785 while (!i)
00786 MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
00787 }
00788 #else
00789
00790 if (fd->atomicity)
00791 MPI_Waitall(nprocs_send, send_req, statuses);
00792 else
00793 MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
00794 #endif
00795 ADIOI_Free(statuses);
00796 ADIOI_Free(requests);
00797 if (!buftype_is_contig && nprocs_send) {
00798 for (i = 0; i < nprocs; i++)
00799 if (send_size[i])
00800 ADIOI_Free(send_buf[i]);
00801 ADIOI_Free(send_buf);
00802 }
00803 }
00804
00805 #define ADIOI_BUF_INCR \
00806 { \
00807 while (buf_incr) { \
00808 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
00809 user_buf_idx += size_in_buf; \
00810 flat_buf_sz -= size_in_buf; \
00811 if (!flat_buf_sz) { \
00812 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00813 else { \
00814 flat_buf_idx = 0; \
00815 n_buftypes++; \
00816 } \
00817 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00818 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00819 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00820 } \
00821 buf_incr -= size_in_buf; \
00822 } \
00823 }
00824
00825
00826 #define ADIOI_BUF_COPY \
00827 { \
00828 while (size) { \
00829 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
00830 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
00831 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
00832 memcpy(&(send_buf[p][send_buf_idx[p]]), \
00833 ((char *) buf) + user_buf_idx, size_in_buf); \
00834 send_buf_idx[p] += size_in_buf; \
00835 user_buf_idx += size_in_buf; \
00836 flat_buf_sz -= size_in_buf; \
00837 if (!flat_buf_sz) { \
00838 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00839 else { \
00840 flat_buf_idx = 0; \
00841 n_buftypes++; \
00842 } \
00843 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00844 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00845 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00846 } \
00847 size -= size_in_buf; \
00848 buf_incr -= size_in_buf; \
00849 } \
00850 ADIOI_BUF_INCR \
00851 }
00852
00853 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
00854 ADIOI_Flatlist_node *flat_buf,
00855 char **send_buf,
00856 ADIO_Offset *offset_list,
00857 ADIO_Offset *len_list, int *send_size,
00858 MPI_Request *requests,
00859 int *sent_to_proc, int nprocs,
00860 int myrank,
00861 int contig_access_count,
00862 int *striping_info,
00863 int *send_buf_idx,
00864 int *curr_to_proc,
00865 int *done_to_proc, int iter,
00866 MPI_Aint buftype_extent)
00867 {
00868
00869 int i, p, flat_buf_idx, size;
00870 int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
00871 ADIO_Offset off, len, rem_len, user_buf_idx;
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881 for (i = 0; i < nprocs; i++) {
00882 send_buf_idx[i] = curr_to_proc[i] = 0;
00883 done_to_proc[i] = sent_to_proc[i];
00884 }
00885 jj = 0;
00886
00887 user_buf_idx = flat_buf->indices[0];
00888 flat_buf_idx = 0;
00889 n_buftypes = 0;
00890 flat_buf_sz = flat_buf->blocklens[0];
00891
00892
00893
00894
00895 for (i = 0; i < contig_access_count; i++) {
00896 off = offset_list[i];
00897 rem_len = (ADIO_Offset) len_list[i];
00898
00899
00900 while (rem_len != 0) {
00901 len = rem_len;
00902
00903
00904
00905
00906 p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
00907
00908 if (send_buf_idx[p] < send_size[p]) {
00909 if (curr_to_proc[p] + len > done_to_proc[p]) {
00910 if (done_to_proc[p] > curr_to_proc[p]) {
00911 size = (int) ADIOI_MIN(curr_to_proc[p] + len -
00912 done_to_proc[p],
00913 send_size[p] -
00914 send_buf_idx[p]);
00915 buf_incr = done_to_proc[p] - curr_to_proc[p];
00916 ADIOI_BUF_INCR
00917 ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
00918 buf_incr = (int) (curr_to_proc[p] + len -
00919 done_to_proc[p]);
00920 ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
00921 curr_to_proc[p] = done_to_proc[p] + size;
00922 ADIOI_BUF_COPY
00923 } else {
00924 size = (int) ADIOI_MIN(len, send_size[p] -
00925 send_buf_idx[p]);
00926 buf_incr = (int) len;
00927 ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
00928 curr_to_proc[p] += size;
00929 ADIOI_BUF_COPY
00930 }
00931 if (send_buf_idx[p] == send_size[p]) {
00932 MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
00933 myrank + p + 100 * iter, fd->comm,
00934 requests + jj);
00935 jj++;
00936 }
00937 } else {
00938 ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
00939 curr_to_proc[p] += (int) len;
00940 buf_incr = (int) len;
00941 ADIOI_BUF_INCR
00942 }
00943 } else {
00944 buf_incr = (int) len;
00945 ADIOI_BUF_INCR
00946 }
00947 off += len;
00948 rem_len -= len;
00949 }
00950 }
00951 for (i = 0; i < nprocs; i++)
00952 if (send_size[i])
00953 sent_to_proc[i] = curr_to_proc[i];
00954 }