00001
00002
00003
00004
00005
00006
00007 #include "adio.h"
00008 #include "adio_extern.h"
00009
00010 #ifdef AGGREGATION_PROFILE
00011 #include "mpe.h"
00012 #endif
00013
00014 #undef AGG_DEBUG
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073 int ADIOI_Calc_aggregator(ADIO_File fd,
00074 ADIO_Offset off,
00075 ADIO_Offset min_off,
00076 ADIO_Offset *len,
00077 ADIO_Offset fd_size,
00078 ADIO_Offset *fd_start,
00079 ADIO_Offset *fd_end)
00080 {
00081 int rank_index, rank;
00082 ADIO_Offset avail_bytes;
00083
00084 ADIOI_UNREFERENCED_ARG(fd_start);
00085
00086
00087 rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
00088
00089 if (fd->hints->striping_unit > 0) {
00090
00091
00092
00093
00094 rank_index = 0;
00095 while (off > fd_end[rank_index]) rank_index++;
00096 }
00097
00098
00099
00100
00101 if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
00102 FPRINTF(stderr, "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
00103 rank_index,fd->hints->cb_nodes,fd_size,off);
00104 MPI_Abort(MPI_COMM_WORLD, 1);
00105 }
00106
00107
00108
00109
00110
00111
00112
00113
00114 avail_bytes = fd_end[rank_index] + 1 - off;
00115 if (avail_bytes < *len) {
00116
00117 *len = avail_bytes;
00118 }
00119
00120
00121
00122 rank = fd->hints->ranklist[rank_index];
00123
00124 return rank;
00125 }
00126
00127 void ADIOI_Calc_file_domains(ADIO_Offset *st_offsets, ADIO_Offset
00128 *end_offsets, int nprocs, int nprocs_for_coll,
00129 ADIO_Offset *min_st_offset_ptr,
00130 ADIO_Offset **fd_start_ptr, ADIO_Offset
00131 **fd_end_ptr, int min_fd_size,
00132 ADIO_Offset *fd_size_ptr,
00133 int striping_unit)
00134 {
00135
00136
00137
00138
00139 ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
00140 int i;
00141
00142 #ifdef AGGREGATION_PROFILE
00143 MPE_Log_event (5004, 0, NULL);
00144 #endif
00145
00146 #ifdef AGG_DEBUG
00147 FPRINTF(stderr, "ADIOI_Calc_file_domains: %d aggregator(s)\n",
00148 nprocs_for_coll);
00149 #endif
00150
00151
00152
00153 min_st_offset = st_offsets[0];
00154 max_end_offset = end_offsets[0];
00155
00156 for (i=1; i<nprocs; i++) {
00157 min_st_offset = ADIOI_MIN(min_st_offset, st_offsets[i]);
00158 max_end_offset = ADIOI_MAX(max_end_offset, end_offsets[i]);
00159 }
00160
00161
00162
00163
00164
00165
00166 fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll -
00167 1)/nprocs_for_coll;
00168
00169
00170
00171
00172
00173
00174
00175 if (fd_size < min_fd_size)
00176 fd_size = min_fd_size;
00177
00178 *fd_start_ptr = (ADIO_Offset *)
00179 ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset));
00180 *fd_end_ptr = (ADIO_Offset *)
00181 ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset));
00182
00183 fd_start = *fd_start_ptr;
00184 fd_end = *fd_end_ptr;
00185
00186
00187
00188
00189 if (striping_unit > 0) {
00190 ADIO_Offset end_off;
00191 int rem_front, rem_back;
00192
00193
00194 fd_start[0] = min_st_offset;
00195 end_off = fd_start[0] + fd_size;
00196 rem_front = end_off % striping_unit;
00197 rem_back = striping_unit - rem_front;
00198 if (rem_front < rem_back)
00199 end_off -= rem_front;
00200 else
00201 end_off += rem_back;
00202 fd_end[0] = end_off - 1;
00203
00204
00205 for (i=1; i<nprocs_for_coll; i++) {
00206 fd_start[i] = fd_end[i-1] + 1;
00207 end_off = min_st_offset + fd_size * (i+1);
00208 rem_front = end_off % striping_unit;
00209 rem_back = striping_unit - rem_front;
00210 if (rem_front < rem_back)
00211 end_off -= rem_front;
00212 else
00213 end_off += rem_back;
00214 fd_end[i] = end_off - 1;
00215 }
00216 fd_end[nprocs_for_coll-1] = max_end_offset;
00217 }
00218 else {
00219 fd_start[0] = min_st_offset;
00220 fd_end[0] = min_st_offset + fd_size - 1;
00221
00222 for (i=1; i<nprocs_for_coll; i++) {
00223 fd_start[i] = fd_end[i-1] + 1;
00224 fd_end[i] = fd_start[i] + fd_size - 1;
00225 }
00226 }
00227
00228
00229
00230
00231
00232
00233
00234 for (i=0; i<nprocs_for_coll; i++) {
00235 if (fd_start[i] > max_end_offset)
00236 fd_start[i] = fd_end[i] = -1;
00237 if (fd_end[i] > max_end_offset)
00238 fd_end[i] = max_end_offset;
00239 }
00240
00241 *fd_size_ptr = fd_size;
00242 *min_st_offset_ptr = min_st_offset;
00243
00244 #ifdef AGGREGATION_PROFILE
00245 MPE_Log_event (5005, 0, NULL);
00246 #endif
00247 }
00248
00249
00250
00251
00252
00253
00254 void ADIOI_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, ADIO_Offset *len_list,
00255 int contig_access_count, ADIO_Offset
00256 min_st_offset, ADIO_Offset *fd_start,
00257 ADIO_Offset *fd_end, ADIO_Offset fd_size,
00258 int nprocs,
00259 int *count_my_req_procs_ptr,
00260 int **count_my_req_per_proc_ptr,
00261 ADIOI_Access **my_req_ptr,
00262 int **buf_idx_ptr)
00263
00264
00265 {
00266 int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
00267 int i, l, proc;
00268 ADIO_Offset fd_len, rem_len, curr_idx, off;
00269 ADIOI_Access *my_req;
00270
00271 #ifdef AGGREGATION_PROFILE
00272 MPE_Log_event (5024, 0, NULL);
00273 #endif
00274
00275 *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs,sizeof(int));
00276 count_my_req_per_proc = *count_my_req_per_proc_ptr;
00277
00278
00279
00280
00281
00282 buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00283
00284
00285
00286
00287
00288
00289 for (i=0; i < nprocs; i++) buf_idx[i] = -1;
00290
00291
00292
00293
00294 for (i=0; i < contig_access_count; i++) {
00295
00296
00297 if (len_list[i] == 0)
00298 continue;
00299 off = offset_list[i];
00300 fd_len = len_list[i];
00301
00302
00303
00304
00305
00306 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
00307 fd_start, fd_end);
00308 count_my_req_per_proc[proc]++;
00309
00310
00311
00312
00313
00314 rem_len = len_list[i] - fd_len;
00315
00316 while (rem_len != 0) {
00317 off += fd_len;
00318 fd_len = rem_len;
00319 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
00320 fd_size, fd_start, fd_end);
00321
00322 count_my_req_per_proc[proc]++;
00323 rem_len -= fd_len;
00324 }
00325 }
00326
00327
00328
00329 *my_req_ptr = (ADIOI_Access *)
00330 ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
00331 my_req = *my_req_ptr;
00332
00333 count_my_req_procs = 0;
00334 for (i=0; i < nprocs; i++) {
00335 if (count_my_req_per_proc[i]) {
00336 my_req[i].offsets = (ADIO_Offset *)
00337 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
00338 my_req[i].lens = (int *)
00339 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(int));
00340 count_my_req_procs++;
00341 }
00342 my_req[i].count = 0;
00343
00344 }
00345
00346
00347 curr_idx = 0;
00348 for (i=0; i<contig_access_count; i++) {
00349
00350
00351 if (len_list[i] == 0)
00352 continue;
00353 off = offset_list[i];
00354 fd_len = len_list[i];
00355 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
00356 fd_start, fd_end);
00357
00358
00359 if (buf_idx[proc] == -1)
00360 {
00361 ADIOI_Assert(curr_idx == (int) curr_idx);
00362 buf_idx[proc] = (int) curr_idx;
00363 }
00364
00365 l = my_req[proc].count;
00366 curr_idx += fd_len;
00367
00368 rem_len = len_list[i] - fd_len;
00369
00370
00371
00372
00373
00374
00375 my_req[proc].offsets[l] = off;
00376 ADIOI_Assert(fd_len == (int) fd_len);
00377 my_req[proc].lens[l] = (int) fd_len;
00378 my_req[proc].count++;
00379
00380 while (rem_len != 0) {
00381 off += fd_len;
00382 fd_len = rem_len;
00383 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
00384 fd_size, fd_start, fd_end);
00385
00386 if (buf_idx[proc] == -1)
00387 {
00388 ADIOI_Assert(curr_idx == (int) curr_idx);
00389 buf_idx[proc] = (int) curr_idx;
00390 }
00391
00392 l = my_req[proc].count;
00393 curr_idx += fd_len;
00394 rem_len -= fd_len;
00395
00396 my_req[proc].offsets[l] = off;
00397 ADIOI_Assert(fd_len == (int) fd_len);
00398 my_req[proc].lens[l] = (int) fd_len;
00399 my_req[proc].count++;
00400 }
00401 }
00402
00403 #ifdef AGG_DEBUG
00404 for (i=0; i<nprocs; i++) {
00405 if (count_my_req_per_proc[i] > 0) {
00406 FPRINTF(stdout, "data needed from %d (count = %d):\n", i,
00407 my_req[i].count);
00408 for (l=0; l < my_req[i].count; l++) {
00409 FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n", l,
00410 my_req[i].offsets[l], l, my_req[i].lens[l]);
00411 }
00412 FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
00413 }
00414 }
00415 #endif
00416
00417 *count_my_req_procs_ptr = count_my_req_procs;
00418 *buf_idx_ptr = buf_idx;
00419 #ifdef AGGREGATION_PROFILE
00420 MPE_Log_event (5025, 0, NULL);
00421 #endif
00422 }
00423
00424
00425
00426 void ADIOI_Calc_others_req(ADIO_File fd, int count_my_req_procs,
00427 int *count_my_req_per_proc,
00428 ADIOI_Access *my_req,
00429 int nprocs, int myrank,
00430 int *count_others_req_procs_ptr,
00431 ADIOI_Access **others_req_ptr)
00432 {
00433
00434
00435
00436
00437
00438
00439
00440
00441 int *count_others_req_per_proc, count_others_req_procs;
00442 int i, j;
00443 MPI_Request *requests;
00444 MPI_Status *statuses;
00445 ADIOI_Access *others_req;
00446
00447
00448 #ifdef AGGREGATION_PROFILE
00449 MPE_Log_event (5026, 0, NULL);
00450 #endif
00451 count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00452
00453 MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
00454 count_others_req_per_proc, 1, MPI_INT, fd->comm);
00455
00456 *others_req_ptr = (ADIOI_Access *)
00457 ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
00458 others_req = *others_req_ptr;
00459
00460 count_others_req_procs = 0;
00461 for (i=0; i<nprocs; i++) {
00462 if (count_others_req_per_proc[i]) {
00463 others_req[i].count = count_others_req_per_proc[i];
00464 others_req[i].offsets = (ADIO_Offset *)
00465 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
00466 others_req[i].lens = (int *)
00467 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(int));
00468 others_req[i].mem_ptrs = (MPI_Aint *)
00469 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
00470 count_others_req_procs++;
00471 }
00472 else others_req[i].count = 0;
00473 }
00474
00475
00476
00477 requests = (MPI_Request *)
00478 ADIOI_Malloc(1+2*(count_my_req_procs+count_others_req_procs)*sizeof(MPI_Request));
00479
00480
00481 j = 0;
00482 for (i=0; i<nprocs; i++) {
00483 if (others_req[i].count) {
00484 MPI_Irecv(others_req[i].offsets, others_req[i].count,
00485 ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
00486 j++;
00487 MPI_Irecv(others_req[i].lens, others_req[i].count,
00488 MPI_INT, i, i+myrank+1, fd->comm, &requests[j]);
00489 j++;
00490 }
00491 }
00492
00493 for (i=0; i < nprocs; i++) {
00494 if (my_req[i].count) {
00495 MPI_Isend(my_req[i].offsets, my_req[i].count,
00496 ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
00497 j++;
00498 MPI_Isend(my_req[i].lens, my_req[i].count,
00499 MPI_INT, i, i+myrank+1, fd->comm, &requests[j]);
00500 j++;
00501 }
00502 }
00503
00504 if (j) {
00505 statuses = (MPI_Status *) ADIOI_Malloc(j * sizeof(MPI_Status));
00506 MPI_Waitall(j, requests, statuses);
00507 ADIOI_Free(statuses);
00508 }
00509
00510 ADIOI_Free(requests);
00511 ADIOI_Free(count_others_req_per_proc);
00512
00513 *count_others_req_procs_ptr = count_others_req_procs;
00514 #ifdef AGGREGATION_PROFILE
00515 MPE_Log_event (5027, 0, NULL);
00516 #endif
00517 }