00001
00002
00003
00004
00005
00006
00007 #include "assert.h"
00008 #include "adio.h"
00009 #include "adio_extern.h"
00010 #ifdef AGGREGATION_PROFILE
00011 #include "mpe.h"
00012 #endif
00013
00014
00015
00016
00017
00018
00019 #define USE_PRE_REQ
00020
00021 static void Exch_data_amounts (ADIO_File fd, int nprocs,
00022 ADIO_Offset *client_comm_sz_arr,
00023 ADIO_Offset *agg_comm_sz_arr,
00024 int *client_alltoallw_counts,
00025 int *agg_alltoallw_counts,
00026 int *aggregators_done);
00027 static void post_aggregator_comm (MPI_Comm comm, int rw_type, int nproc,
00028 void *cb_buf,
00029 MPI_Datatype *client_comm_dtype_arr,
00030 ADIO_Offset *client_comm_sz_arr,
00031 MPI_Request **requests,
00032 int *aggregators_client_count_p);
00033
00034 static void post_client_comm (ADIO_File fd, int rw_type,
00035 int agg_rank, void *buf,
00036 MPI_Datatype agg_comm_dtype,
00037 int agg_alltoallw_count,
00038 MPI_Request *request);
00039
00040
00041
00042
00043
00044 void ADIOI_IOStridedColl (ADIO_File fd, void *buf, int count, int rdwr,
00045 MPI_Datatype datatype, int file_ptr_type,
00046 ADIO_Offset offset, ADIO_Status *status,
00047 int *error_code)
00048 {
00049 ADIO_Offset min_st_offset=0, max_end_offset=0;
00050 ADIO_Offset st_end_offset[2];
00051 ADIO_Offset *all_st_end_offsets = NULL;
00052 int filetype_is_contig, buftype_is_contig, is_contig;
00053 ADIO_Offset orig_fp, off;
00054 int interleave_count = 0, i, nprocs, myrank, nprocs_for_coll;
00055 int cb_enable;
00056 ADIO_Offset bufsize;
00057 MPI_Aint extent, bufextent;
00058 int size;
00059 int agg_rank;
00060
00061 ADIO_Offset agg_disp;
00062 MPI_Datatype agg_dtype;
00063
00064 int aggregators_done = 0;
00065 ADIO_Offset buffered_io_size = 0;
00066
00067 int *alltoallw_disps;
00068
00069 int *alltoallw_counts;
00070 int *client_alltoallw_counts;
00071 int *agg_alltoallw_counts;
00072
00073 char *cb_buf = NULL;
00074
00075 MPI_Datatype *client_comm_dtype_arr;
00076 MPI_Datatype *agg_comm_dtype_arr;
00077 ADIO_Offset *client_comm_sz_arr;
00078 ADIO_Offset *agg_comm_sz_arr;
00079
00080
00081 view_state *client_file_view_state_arr = NULL;
00082 view_state *agg_file_view_state_arr = NULL;
00083
00084 view_state *my_mem_view_state_arr = NULL;
00085
00086 MPI_Status *agg_comm_statuses = NULL;
00087 MPI_Request *agg_comm_requests = NULL;
00088 MPI_Status *client_comm_statuses = NULL;
00089 MPI_Request *client_comm_requests = NULL;
00090 int aggs_client_count = 0;
00091 int clients_agg_count = 0;
00092
00093 MPI_Comm_size (fd->comm, &nprocs);
00094 MPI_Comm_rank (fd->comm, &myrank);
00095 #ifdef DEBUG
00096 fprintf (stderr, "p%d: entering ADIOI_IOStridedColl\n", myrank);
00097 #endif
00098 #ifdef AGGREGATION_PROFILE
00099 if (rdwr == ADIOI_READ)
00100 MPE_Log_event (5010, 0, NULL);
00101 else
00102 MPE_Log_event (5012, 0, NULL);
00103 #endif
00104
00105
00106
00107
00108
00109
00110
00111
00112 nprocs_for_coll = fd->hints->cb_nodes;
00113 orig_fp = fd->fp_ind;
00114
00115 if (rdwr == ADIOI_READ)
00116 cb_enable = fd->hints->cb_read;
00117 else
00118 cb_enable = fd->hints->cb_write;
00119
00120
00121 if (cb_enable != ADIOI_HINT_DISABLE) {
00122
00123 ADIOI_Calc_bounds (fd, count, datatype, file_ptr_type, offset,
00124 &st_end_offset[0], &st_end_offset[1]);
00125
00126
00127 all_st_end_offsets = (ADIO_Offset *)
00128 ADIOI_Malloc (2*nprocs*sizeof(ADIO_Offset));
00129 MPI_Allgather (st_end_offset, 2, ADIO_OFFSET, all_st_end_offsets, 2,
00130 ADIO_OFFSET, fd->comm);
00131
00132 min_st_offset = all_st_end_offsets[0];
00133 max_end_offset = all_st_end_offsets[1];
00134
00135 for (i=1; i<nprocs; i++) {
00136
00137 if ((all_st_end_offsets[i*2] < all_st_end_offsets[i*2-1]) &&
00138 (all_st_end_offsets[i*2] <= all_st_end_offsets[i*2+1]))
00139 interleave_count++;
00140
00141
00142
00143 min_st_offset = ADIOI_MIN(all_st_end_offsets[i*2],
00144 min_st_offset);
00145 max_end_offset = ADIOI_MAX(all_st_end_offsets[i*2+1],
00146 max_end_offset);
00147 }
00148 }
00149
00150 ADIOI_Datatype_iscontig (datatype, &buftype_is_contig);
00151 ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
00152
00153 if ((cb_enable == ADIOI_HINT_DISABLE
00154 || (!interleave_count && (cb_enable == ADIOI_HINT_AUTO)))
00155 && (fd->hints->cb_pfr != ADIOI_HINT_ENABLE)){
00156 if (cb_enable != ADIOI_HINT_DISABLE) {
00157 ADIOI_Free (all_st_end_offsets);
00158 }
00159
00160 if (buftype_is_contig && filetype_is_contig) {
00161 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00162 off = fd->disp + (fd->etype_size) * offset;
00163 if (rdwr == ADIOI_READ)
00164 ADIO_ReadContig(fd, buf, count, datatype,
00165 ADIO_EXPLICIT_OFFSET, off, status,
00166 error_code);
00167 else
00168 ADIO_WriteContig(fd, buf, count, datatype,
00169 ADIO_EXPLICIT_OFFSET, off, status,
00170 error_code);
00171 }
00172 else {
00173 if (rdwr == ADIOI_READ)
00174 ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00175 0, status, error_code);
00176 else
00177 ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00178 0, status, error_code);
00179 }
00180 }
00181 else {
00182 if (rdwr == ADIOI_READ)
00183 ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
00184 offset, status, error_code);
00185 else
00186 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00187 offset, status, error_code);
00188 }
00189 return;
00190 }
00191
00192 MPI_Type_extent(datatype, &extent);
00193 bufextent = extent * count;
00194 MPI_Type_size(datatype, &size);
00195 bufsize = size * count;
00196
00197
00198 if ((fd->hints->cb_pfr != ADIOI_HINT_ENABLE) ||
00199 (fd->file_realm_types == NULL))
00200 ADIOI_Calc_file_realms (fd, min_st_offset, max_end_offset);
00201
00202 my_mem_view_state_arr = (view_state *)
00203 ADIOI_Calloc (1, nprocs * sizeof(view_state));
00204 agg_file_view_state_arr = (view_state *)
00205 ADIOI_Calloc (1, nprocs * sizeof(view_state));
00206 client_comm_sz_arr = (ADIO_Offset *)
00207 ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
00208
00209 if (fd->is_agg) {
00210 client_file_view_state_arr = (view_state *)
00211 ADIOI_Calloc (1, nprocs * sizeof(view_state));
00212 }
00213 else {
00214 client_file_view_state_arr = NULL;
00215 }
00216
00217
00218
00219 client_comm_dtype_arr = (MPI_Datatype *)
00220 ADIOI_Calloc (1, nprocs * sizeof(MPI_Datatype));
00221 if (!fd->is_agg)
00222 for (i = 0; i < nprocs; i++)
00223 client_comm_dtype_arr[i] = MPI_BYTE;
00224
00225 ADIOI_Exch_file_views (myrank, nprocs, file_ptr_type, fd, count,
00226 datatype, offset, my_mem_view_state_arr,
00227 agg_file_view_state_arr,
00228 client_file_view_state_arr);
00229
00230 agg_comm_sz_arr = (ADIO_Offset *)
00231 ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
00232 agg_comm_dtype_arr = (MPI_Datatype *)
00233 ADIOI_Malloc (nprocs * sizeof(MPI_Datatype));
00234 if (fd->is_agg) {
00235 ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
00236 client_file_view_state_arr,
00237 client_comm_dtype_arr,
00238 client_comm_sz_arr,
00239 &agg_disp,
00240 &agg_dtype);
00241 buffered_io_size = 0;
00242 for (i=0; i <nprocs; i++) {
00243 if (client_comm_sz_arr[i] > 0)
00244 buffered_io_size += client_comm_sz_arr[i];
00245 }
00246 }
00247 #ifdef USE_PRE_REQ
00248 else
00249 {
00250
00251
00252
00253 for (i = 0; i < fd->hints->cb_nodes; i++)
00254 {
00255 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
00256 #ifdef AGGREGATION_PROFILE
00257 MPE_Log_event (5040, 0, NULL);
00258 #endif
00259 ADIOI_Build_client_pre_req(
00260 fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
00261 &(my_mem_view_state_arr[agg_rank]),
00262 &(agg_file_view_state_arr[agg_rank]),
00263 2*1024*1024,
00264 64*1024);
00265 #ifdef AGGREGATION_PROFILE
00266 MPE_Log_event (5041, 0, NULL);
00267 #endif
00268 }
00269 }
00270 #endif
00271
00272
00273 if (fd->is_agg)
00274 cb_buf = (char *) ADIOI_Malloc (fd->hints->cb_buffer_size);
00275 alltoallw_disps = (int *) ADIOI_Calloc (nprocs, sizeof(int));
00276 alltoallw_counts = client_alltoallw_counts = (int *)
00277 ADIOI_Calloc (2*nprocs, sizeof(int));
00278 agg_alltoallw_counts = &alltoallw_counts[nprocs];
00279
00280 if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
00281
00282 if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
00283 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
00284 client_comm_dtype_arr,
00285 client_comm_sz_arr,
00286 &agg_comm_requests,
00287 &aggs_client_count);
00288 }
00289
00290 Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
00291 client_alltoallw_counts, agg_alltoallw_counts,
00292 &aggregators_done);
00293
00294 #ifdef DEBUG
00295 fprintf (stderr, "client_alltoallw_counts[ ");
00296 for (i=0; i<nprocs; i++) {
00297 fprintf (stderr, "%d ", client_alltoallw_counts[i]);
00298 }
00299 fprintf (stderr, "]\n");
00300 fprintf (stderr, "agg_alltoallw_counts[ ");
00301 for (i=0; i<nprocs; i++) {
00302 fprintf (stderr,"%d ", agg_alltoallw_counts[i]);
00303 }
00304 fprintf (stderr, "]\n");
00305 #endif
00306
00307
00308 while (aggregators_done != nprocs_for_coll) {
00309 if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
00310
00311
00312
00313
00314 client_comm_requests = (MPI_Request *)
00315 ADIOI_Calloc (fd->hints->cb_nodes, sizeof(MPI_Request));
00316
00317 for (i = 0; i < fd->hints->cb_nodes; i++)
00318 {
00319 clients_agg_count = 0;
00320 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
00321 if (agg_comm_sz_arr[agg_rank] > 0) {
00322 ADIOI_Build_client_req(fd, agg_rank,
00323 (i+myrank)%fd->hints->cb_nodes,
00324 &(my_mem_view_state_arr[agg_rank]),
00325 &(agg_file_view_state_arr[agg_rank]),
00326 agg_comm_sz_arr[agg_rank],
00327 &(agg_comm_dtype_arr[agg_rank]));
00328
00329 #ifdef AGGREGATION_PROFILE
00330 if (i == 0)
00331 MPE_Log_event (5038, 0, NULL);
00332 #endif
00333 post_client_comm (fd, rdwr, agg_rank, buf,
00334 agg_comm_dtype_arr[agg_rank],
00335 agg_alltoallw_counts[agg_rank],
00336 &client_comm_requests[clients_agg_count]);
00337 clients_agg_count++;
00338 }
00339 }
00340 #ifdef AGGREGATION_PROFILE
00341 if (!clients_agg_count)
00342 MPE_Log_event(5039, 0, NULL);
00343 #endif
00344
00345 if (rdwr == ADIOI_READ) {
00346 if (fd->is_agg && buffered_io_size) {
00347 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
00348 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
00349 ADIOI_READ, status, error_code);
00350 if (*error_code != MPI_SUCCESS) return;
00351 MPI_Type_free (&agg_dtype);
00352 }
00353
00354 #ifdef DEBUG
00355 fprintf (stderr, "expecting from [agg](disp,size,cnt)=");
00356 for (i=0; i < nprocs; i++) {
00357 MPI_Type_size (agg_comm_dtype_arr[i], &size);
00358 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
00359 size, agg_alltoallw_counts[i]);
00360 if (i != nprocs - 1)
00361 fprintf(stderr, ",");
00362 }
00363 fprintf (stderr, "]\n");
00364 if (fd->is_agg) {
00365 fprintf (stderr, "sending to [client](disp,size,cnt)=");
00366 for (i=0; i < nprocs; i++) {
00367 if (fd->is_agg)
00368 MPI_Type_size (client_comm_dtype_arr[i], &size);
00369 else
00370 size = -1;
00371
00372 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
00373 size, client_alltoallw_counts[i]);
00374 if (i != nprocs - 1)
00375 fprintf(stderr, ",");
00376 }
00377 fprintf (stderr,"\n");
00378 }
00379 fflush (NULL);
00380 #endif
00381
00382 if (fd->is_agg)
00383 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
00384 client_comm_dtype_arr,
00385 client_comm_sz_arr,
00386 &agg_comm_requests,
00387 &aggs_client_count);
00388
00389 if (fd->is_agg && aggs_client_count) {
00390 agg_comm_statuses = ADIOI_Malloc(aggs_client_count *
00391 sizeof(MPI_Status));
00392 MPI_Waitall(aggs_client_count, agg_comm_requests,
00393 agg_comm_statuses);
00394 #ifdef AGGREGATION_PROFILE
00395 MPE_Log_event (5033, 0, NULL);
00396 #endif
00397 ADIOI_Free (agg_comm_requests);
00398 ADIOI_Free (agg_comm_statuses);
00399 }
00400
00401 if (clients_agg_count) {
00402 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
00403 sizeof(MPI_Status));
00404 MPI_Waitall(clients_agg_count, client_comm_requests,
00405 client_comm_statuses);
00406 #ifdef AGGREGATION_PROFILE
00407 MPE_Log_event (5039, 0, NULL);
00408 #endif
00409 ADIOI_Free (client_comm_requests);
00410 ADIOI_Free (client_comm_statuses);
00411 }
00412
00413 #ifdef DEBUG2
00414 fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
00415 if (fd->is_agg && buffered_io_size) {
00416 fprintf (stderr, "buf = [");
00417 for (i=0; i<bufextent; i++)
00418 fprintf (stderr, "%c", ((char *) buf)[i]);
00419 fprintf (stderr, "]\n");
00420 fprintf (stderr, "cb_buf = [");
00421 for (i=0; i<buffered_io_size; i++)
00422 fprintf (stderr, "%c", cb_buf[i]);
00423 fprintf (stderr, "]\n");
00424 fflush (NULL);
00425 }
00426 #endif
00427 }
00428 else {
00429 #ifdef DEBUG
00430 fprintf (stderr, "sending to [agg](disp,size,cnt)=");
00431 for (i=0; i < nprocs; i++) {
00432 MPI_Type_size (agg_comm_dtype_arr[i], &size);
00433 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
00434 size, agg_alltoallw_counts[i]);
00435 if (i != nprocs - 1)
00436 fprintf(stderr, ",");
00437 }
00438 fprintf (stderr, "]\n");
00439 fprintf (stderr, "expecting from [client](disp,size,cnt)=");
00440 for (i=0; i < nprocs; i++) {
00441 if (fd->is_agg)
00442 MPI_Type_size (client_comm_dtype_arr[i], &size);
00443 else
00444 size = -1;
00445
00446 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
00447 size, client_alltoallw_counts[i]);
00448 if (i != nprocs - 1)
00449 fprintf(stderr, ",");
00450 }
00451 fprintf (stderr,"\n");
00452 fflush (NULL);
00453 #endif
00454 #ifdef DEBUG
00455 fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
00456 #endif
00457
00458 if (clients_agg_count) {
00459 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
00460 sizeof(MPI_Status));
00461 MPI_Waitall(clients_agg_count, client_comm_requests,
00462 client_comm_statuses);
00463 #ifdef AGGREGATION_PROFILE
00464 MPE_Log_event (5039, 0, NULL);
00465 #endif
00466 ADIOI_Free(client_comm_requests);
00467 ADIOI_Free(client_comm_statuses);
00468 }
00469 #ifdef DEBUG2
00470 if (bufextent) {
00471 fprintf (stderr, "buf = [");
00472 for (i=0; i<bufextent; i++)
00473 fprintf (stderr, "%c", ((char *) buf)[i]);
00474 fprintf (stderr, "]\n");
00475 }
00476 #endif
00477
00478 if (fd->is_agg && buffered_io_size) {
00479 assert (aggs_client_count != 0);
00480
00481 agg_comm_statuses = (MPI_Status *)
00482 ADIOI_Malloc (aggs_client_count*sizeof(MPI_Status));
00483
00484 MPI_Waitall (aggs_client_count, agg_comm_requests,
00485 agg_comm_statuses);
00486 #ifdef AGGREGATION_PROFILE
00487 MPE_Log_event (5033, 0, NULL);
00488 #endif
00489 ADIOI_Free (agg_comm_requests);
00490 ADIOI_Free (agg_comm_statuses);
00491 #ifdef DEBUG2
00492 fprintf (stderr, "cb_buf = [");
00493 for (i=0; i<buffered_io_size; i++)
00494 fprintf (stderr, "%c", cb_buf[i]);
00495 fprintf (stderr, "]\n");
00496 fflush (NULL);
00497 #endif
00498 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
00499 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
00500 ADIOI_WRITE, status, error_code);
00501 if (*error_code != MPI_SUCCESS) return;
00502 MPI_Type_free (&agg_dtype);
00503 }
00504
00505 }
00506 } else {
00507
00508 ADIOI_Build_client_reqs(fd, nprocs, my_mem_view_state_arr,
00509 agg_file_view_state_arr,
00510 agg_comm_sz_arr, agg_comm_dtype_arr);
00511
00512 if (rdwr == ADIOI_READ) {
00513 if (fd->is_agg && buffered_io_size) {
00514 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
00515 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
00516 ADIOI_READ, status, error_code);
00517 if (*error_code != MPI_SUCCESS) return;
00518 MPI_Type_free (&agg_dtype);
00519 }
00520
00521 #ifdef AGGREGATION_PROFILE
00522 MPE_Log_event (5032, 0, NULL);
00523 #endif
00524 MPI_Alltoallw (cb_buf, client_alltoallw_counts, alltoallw_disps,
00525 client_comm_dtype_arr,
00526 buf, agg_alltoallw_counts , alltoallw_disps,
00527 agg_comm_dtype_arr,
00528 fd->comm);
00529 #ifdef AGGREGATION_PROFILE
00530 MPE_Log_event (5033, 0, NULL);
00531 #endif
00532 }
00533 else {
00534 #ifdef AGGREGATION_PROFILE
00535 MPE_Log_event (5032, 0, NULL);
00536 #endif
00537 MPI_Alltoallw (buf, agg_alltoallw_counts, alltoallw_disps,
00538 agg_comm_dtype_arr,
00539 cb_buf, client_alltoallw_counts, alltoallw_disps,
00540 client_comm_dtype_arr,
00541 fd->comm);
00542 #ifdef AGGREGATION_PROFILE
00543 MPE_Log_event (5033, 0, NULL);
00544 #endif
00545 if (fd->is_agg && buffered_io_size) {
00546 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
00547 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
00548 ADIOI_WRITE, status, error_code);
00549 if (*error_code != MPI_SUCCESS) return;
00550 MPI_Type_free (&agg_dtype);
00551 }
00552 }
00553 }
00554
00555
00556 if (fd->is_agg) {
00557 if (buffered_io_size > 0) {
00558 for (i=0; i<nprocs; i++) {
00559 if (client_comm_sz_arr[i] > 0)
00560 MPI_Type_free (&client_comm_dtype_arr[i]);
00561 }
00562 }
00563 }
00564 for (i=0; i<nprocs; i++) {
00565 if (agg_comm_sz_arr[i] > 0)
00566 MPI_Type_free (&agg_comm_dtype_arr[i]);
00567 }
00568
00569
00570 if (fd->is_agg) {
00571 ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
00572 client_file_view_state_arr,
00573 client_comm_dtype_arr,
00574 client_comm_sz_arr,
00575 &agg_disp,
00576 &agg_dtype);
00577 buffered_io_size = 0;
00578 for (i=0; i <nprocs; i++) {
00579 if (client_comm_sz_arr[i] > 0)
00580 buffered_io_size += client_comm_sz_arr[i];
00581 }
00582 }
00583 #ifdef USE_PRE_REQ
00584 else {
00585
00586
00587 for (i = 0; i < fd->hints->cb_nodes; i++)
00588 {
00589 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
00590 #ifdef AGGREGATION_PROFILE
00591 MPE_Log_event (5040, 0, NULL);
00592 #endif
00593 ADIOI_Build_client_pre_req(
00594 fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
00595 &(my_mem_view_state_arr[agg_rank]),
00596 &(agg_file_view_state_arr[agg_rank]),
00597 2*1024*1024,
00598 64*1024);
00599 #ifdef AGGREGATION_PROFILE
00600 MPE_Log_event (5041, 0, NULL);
00601 #endif
00602 }
00603 }
00604 #endif
00605
00606
00607
00608
00609 if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
00610 if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
00611 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
00612 client_comm_dtype_arr,
00613 client_comm_sz_arr,
00614 &agg_comm_requests,
00615 &aggs_client_count);
00616 }
00617
00618
00619 Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
00620 client_alltoallw_counts, agg_alltoallw_counts,
00621 &aggregators_done);
00622
00623 }
00624
00625
00626
00627 if (fd->hints->cb_pfr != ADIOI_HINT_ENABLE) {
00628
00629 if (1) {
00630 ADIOI_Delete_flattened (fd->file_realm_types[0]);
00631 MPI_Type_free (&fd->file_realm_types[0]);
00632 }
00633 else {
00634 for (i=0; i<fd->hints->cb_nodes; i++) {
00635 ADIOI_Datatype_iscontig(fd->file_realm_types[i], &is_contig);
00636 if (!is_contig)
00637 ADIOI_Delete_flattened(fd->file_realm_types[i]);
00638 MPI_Type_free (&fd->file_realm_types[i]);
00639 }
00640 }
00641 ADIOI_Free (fd->file_realm_types);
00642 ADIOI_Free (fd->file_realm_st_offs);
00643 }
00644
00645
00646
00647
00648 ADIOI_Delete_flattened(datatype);
00649 ADIOI_Delete_flattened(fd->filetype);
00650
00651 if (fd->is_agg) {
00652 if (buffered_io_size > 0)
00653 MPI_Type_free (&agg_dtype);
00654 for (i=0; i<nprocs; i++) {
00655 MPI_Type_free (&client_comm_dtype_arr[i]);
00656 ADIOI_Free (client_file_view_state_arr[i].flat_type_p->indices);
00657 ADIOI_Free (client_file_view_state_arr[i].flat_type_p->blocklens);
00658 ADIOI_Free (client_file_view_state_arr[i].flat_type_p);
00659 }
00660 ADIOI_Free (client_file_view_state_arr);
00661 ADIOI_Free (cb_buf);
00662 }
00663 for (i = 0; i<nprocs; i++)
00664 if (agg_comm_sz_arr[i] > 0)
00665 MPI_Type_free (&agg_comm_dtype_arr[i]);
00666
00667 ADIOI_Free (client_comm_sz_arr);
00668 ADIOI_Free (client_comm_dtype_arr);
00669 ADIOI_Free (my_mem_view_state_arr);
00670 ADIOI_Free (agg_file_view_state_arr);
00671 ADIOI_Free (agg_comm_sz_arr);
00672 ADIOI_Free (agg_comm_dtype_arr);
00673 ADIOI_Free (alltoallw_disps);
00674 ADIOI_Free (alltoallw_counts);
00675 ADIOI_Free (all_st_end_offsets);
00676
00677 #ifdef HAVE_STATUS_SET_BYTES
00678 MPIR_Status_set_bytes(status, datatype, bufsize);
00679
00680
00681
00682 #endif
00683 fd->fp_sys_posn = -1;
00684 #ifdef AGGREGATION_PROFILE
00685 if (rdwr == ADIOI_READ)
00686 MPE_Log_event (5011, 0, NULL);
00687 else
00688 MPE_Log_event (5013, 0, NULL);
00689 #endif
00690 }
00691
00692
00693
00694
00695 void ADIOI_Calc_bounds (ADIO_File fd, int count, MPI_Datatype buftype,
00696 int file_ptr_type, ADIO_Offset offset,
00697 ADIO_Offset *st_offset, ADIO_Offset *end_offset)
00698 {
00699 int filetype_size, buftype_size, etype_size;
00700 int i, sum;
00701 MPI_Aint filetype_extent;
00702 ADIO_Offset total_io;
00703 int filetype_is_contig;
00704 int remainder;
00705 ADIOI_Flatlist_node *flat_file;
00706
00707 ADIO_Offset st_byte_off, end_byte_off;
00708
00709 #ifdef AGGREGATION_PROFILE
00710 MPE_Log_event (5000, 0, NULL);
00711 #endif
00712
00713 if (!count) {
00714
00715
00716 memset (st_offset, 8, sizeof(ADIO_Offset));
00717 *st_offset = *st_offset / 2;
00718 *end_offset = -1;
00719 return;
00720 }
00721
00722 ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
00723
00724 MPI_Type_size (fd->filetype, &filetype_size);
00725 MPI_Type_extent (fd->filetype, &filetype_extent);
00726 MPI_Type_size (fd->etype, &etype_size);
00727 MPI_Type_size (buftype, &buftype_size);
00728
00729 total_io = buftype_size * count;
00730
00731 if (filetype_is_contig) {
00732 if (file_ptr_type == ADIO_INDIVIDUAL)
00733 st_byte_off = fd->fp_ind;
00734 else
00735 st_byte_off = fd->disp + etype_size * offset;
00736
00737 end_byte_off = st_byte_off + total_io - 1;
00738 }
00739 else {
00740 flat_file = CtvAccess(ADIOI_Flatlist);
00741 while (flat_file->type != fd->filetype) flat_file = flat_file->next;
00742
00743
00744
00745
00746
00747 if (file_ptr_type == ADIO_INDIVIDUAL) {
00748 st_byte_off = fd->fp_ind;
00749
00750
00751
00752 end_byte_off = (ADIO_Offset)
00753 ((fd->fp_ind - fd->disp - flat_file->indices[0]) /
00754 filetype_extent) * filetype_extent + fd->disp +
00755 flat_file->indices[0];
00756
00757 remainder = (fd->fp_ind - fd->disp - flat_file->indices[0]) %
00758 filetype_extent;
00759 if (remainder) {
00760
00761 sum = 0;
00762 for (i=0; i<flat_file->count; i++) {
00763 sum += flat_file->blocklens[i];
00764 if ((flat_file->indices[i] - flat_file->indices[0] +
00765 flat_file->blocklens[i]) >= remainder) {
00766 sum -= (flat_file->blocklens[i] - (sum - remainder));
00767 break;
00768 }
00769 }
00770 total_io += sum;
00771 }
00772
00773 end_byte_off += (total_io - 1) / filetype_size * filetype_extent;
00774
00775 remainder = total_io % filetype_size;
00776 if (!remainder) {
00777 for (i=flat_file->count - 1; i>=0; i--) {
00778 if (flat_file->blocklens[i]) break;
00779 }
00780 assert (i > -1);
00781 end_byte_off += flat_file->indices[i] +
00782 flat_file->blocklens[i] - 1;
00783 end_byte_off -= flat_file->indices[0];
00784 }
00785 else {
00786 sum = 0;
00787 for (i=0; i<flat_file->count; i++) {
00788 sum += flat_file->blocklens[i];
00789 if (sum >= remainder) {
00790 end_byte_off += flat_file->indices[i] +
00791 flat_file->blocklens[i] - sum + remainder - 1;
00792 break;
00793 }
00794 }
00795 end_byte_off -= flat_file->indices[0];
00796 }
00797 }
00798 else {
00799
00800
00801 st_byte_off = fd->disp + ((offset * etype_size) / filetype_size) *
00802 filetype_extent;
00803
00804 remainder = (etype_size * offset) % filetype_size;
00805
00806 sum = 0;
00807 for (i=0; i<flat_file->count; i++) {
00808 sum += flat_file->blocklens[i];
00809 if (sum >= remainder) {
00810 if (sum == remainder)
00811 st_byte_off += flat_file->indices[i+1];
00812 else
00813 st_byte_off += flat_file->indices[i] +
00814 flat_file->blocklens[i] - sum + remainder;
00815 break;
00816 }
00817 }
00818
00819
00820
00821 end_byte_off = fd->disp + (offset * etype_size + total_io) /
00822 filetype_size * filetype_extent;
00823
00824 remainder = (offset * etype_size + total_io) % filetype_size;
00825
00826 if (!remainder) {
00827
00828 for (i=flat_file->count-1; i>=0; i--) {
00829 if (flat_file->blocklens[i]) break;
00830 }
00831 assert (i >= 0);
00832
00833
00834
00835
00836
00837
00838 end_byte_off -= filetype_extent - flat_file->indices[i] -
00839 flat_file->blocklens[i] + 1;
00840 }
00841 else {
00842 sum = 0;
00843 for (i=0; i<flat_file->count; i++) {
00844 sum += flat_file->blocklens[i];
00845 if (sum >= remainder) {
00846 end_byte_off += flat_file->indices[i] +
00847 flat_file->blocklens[i] - sum + remainder - 1;
00848 break;
00849 }
00850 }
00851 }
00852 }
00853 }
00854
00855 *st_offset = st_byte_off;
00856 *end_offset = end_byte_off;
00857 #ifdef DEBUG
00858 printf ("st_offset = %lld\nend_offset = %lld\n",
00859 st_byte_off, end_byte_off);
00860 #endif
00861 #ifdef AGGREGATION_PROFILE
00862 MPE_Log_event (5001, 0, NULL);
00863 #endif
00864 }
00865
00866
00867
00868
00869
00870
00871 void ADIOI_IOFiletype(ADIO_File fd, void *buf, int count,
00872 MPI_Datatype datatype, int file_ptr_type,
00873 ADIO_Offset offset, MPI_Datatype custom_ftype,
00874 int rdwr, ADIO_Status *status, int *error_code)
00875 {
00876 MPI_Datatype user_filetype;
00877 MPI_Datatype user_etype;
00878 ADIO_Offset user_disp;
00879 int user_ind_wr_buffer_size;
00880 int user_ind_rd_buffer_size;
00881 int f_is_contig, m_is_contig;
00882 int user_ds_read, user_ds_write;
00883 MPI_Aint f_extent;
00884 int f_size;
00885 int f_ds_percent;
00886
00887 #ifdef AGGREGATION_PROFILE
00888 if (rdwr == ADIOI_READ)
00889 MPE_Log_event(5006, 0, NULL);
00890 else
00891 MPE_Log_event(5008, 0, NULL);
00892 #endif
00893 MPI_Type_extent(custom_ftype, &f_extent);
00894 MPI_Type_size(custom_ftype, &f_size);
00895 f_ds_percent = 100 * f_size / f_extent;
00896
00897
00898 user_filetype = fd->filetype;
00899 user_etype = fd->etype;
00900 user_disp = fd->disp;
00901 user_ds_read = fd->hints->ds_read;
00902 user_ds_write = fd->hints->ds_write;
00903
00904 user_ind_wr_buffer_size = fd->hints->ind_wr_buffer_size;
00905 user_ind_rd_buffer_size = fd->hints->ind_rd_buffer_size;
00906
00907
00908 fd->filetype = custom_ftype;
00909 fd->etype = MPI_BYTE;
00910
00911 fd->hints->ind_wr_buffer_size = fd->hints->cb_buffer_size;
00912 fd->hints->ind_rd_buffer_size = fd->hints->cb_buffer_size;
00913
00914 #ifdef DEBUG
00915 printf ("f_ds_percent = %d cb_ds_threshold = %d\n", f_ds_percent,
00916 fd->hints->cb_ds_threshold);
00917 #endif
00918 if (f_ds_percent >= fd->hints->cb_ds_threshold) {
00919 fd->hints->ds_read = ADIOI_HINT_ENABLE;
00920 fd->hints->ds_write = ADIOI_HINT_ENABLE;
00921 }
00922 else {
00923 fd->hints->ds_read = ADIOI_HINT_DISABLE;
00924 fd->hints->ds_write = ADIOI_HINT_DISABLE;
00925 }
00926
00927
00928
00929
00930
00931
00932 ADIOI_Datatype_iscontig(custom_ftype, &f_is_contig);
00933 ADIOI_Datatype_iscontig(datatype, &m_is_contig);
00934 if (!f_is_contig)
00935 ADIOI_Flatten_datatype (custom_ftype);
00936
00937
00938
00939 if (f_is_contig && m_is_contig) {
00940 fd->disp = 0;
00941 if (rdwr == ADIOI_READ)
00942 ADIO_ReadContig(fd, buf, count, datatype, file_ptr_type, offset,
00943 status, error_code);
00944 else
00945 ADIO_WriteContig(fd, buf, count, datatype, file_ptr_type, offset,
00946 status, error_code);
00947 }
00948 else {
00949 fd->disp = offset;
00950 if (rdwr == ADIOI_READ)
00951 ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type, 0,
00952 status, error_code);
00953 else
00954 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, 0,
00955 status, error_code);
00956 }
00957
00958
00959 if (!f_is_contig)
00960 ADIOI_Delete_flattened (custom_ftype);
00961
00962
00963 fd->filetype = user_filetype;
00964 fd->etype = user_etype;
00965 fd->disp = user_disp;
00966 fd->hints->ds_read = user_ds_read;
00967 fd->hints->ds_write = user_ds_write;
00968 fd->hints->ind_wr_buffer_size = user_ind_wr_buffer_size;
00969 fd->hints->ind_rd_buffer_size = user_ind_rd_buffer_size;
00970 #ifdef AGGREGATION_PROFILE
00971 if (rdwr == ADIOI_READ)
00972 MPE_Log_event (5007, 0, NULL);
00973 else
00974 MPE_Log_event (5009, 0, NULL);
00975 #endif
00976 }
00977
00978 static void Exch_data_amounts (ADIO_File fd, int nprocs,
00979 ADIO_Offset *client_comm_sz_arr,
00980 ADIO_Offset *agg_comm_sz_arr,
00981 int *client_alltoallw_counts,
00982 int *agg_alltoallw_counts,
00983 int *aggregators_done)
00984 {
00985 int i;
00986 int recv_idx;
00987 MPI_Request *recv_requests;
00988 MPI_Request *send_requests;
00989 MPI_Status status;
00990 MPI_Status *send_statuses;
00991
00992 if (fd->hints->cb_alltoall != ADIOI_HINT_DISABLE) {
00993 MPI_Alltoall (client_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
00994 agg_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
00995 fd->comm);
00996
00997 if (fd->is_agg) {
00998 for (i=0; i<nprocs; i++)
00999 if (client_comm_sz_arr[i] > 0)
01000 client_alltoallw_counts[i] = 1;
01001 else
01002 client_alltoallw_counts[i] = 0;
01003 }
01004 *aggregators_done = 0;
01005 for (i=0; i<nprocs; i++) {
01006 if (agg_comm_sz_arr[i] == -1)
01007 *aggregators_done = *aggregators_done + 1;
01008 else if (agg_comm_sz_arr[i] > 0)
01009 agg_alltoallw_counts[i] = 1;
01010 else
01011 agg_alltoallw_counts[i] = 0;
01012 }
01013 } else {
01014
01015
01016
01017 recv_requests = ADIOI_Malloc (fd->hints->cb_nodes * sizeof(MPI_Request));
01018
01019 for (i = 0; i < fd->hints->cb_nodes; i++)
01020 MPI_Irecv (&agg_comm_sz_arr[fd->hints->ranklist[i]],
01021 sizeof(ADIO_Offset), MPI_BYTE, fd->hints->ranklist[i],
01022 AMT_TAG, fd->comm, &recv_requests[i]);
01023
01024
01025
01026
01027 send_requests = NULL;
01028 if (fd->is_agg) {
01029
01030 send_requests = ADIOI_Malloc (nprocs * sizeof(MPI_Request));
01031
01032
01033 for (i = 0; i < nprocs; i++) {
01034 MPI_Isend (&client_comm_sz_arr[i], sizeof(ADIO_Offset),
01035 MPI_BYTE, i, AMT_TAG, fd->comm, &send_requests[i]);
01036
01037 if (client_comm_sz_arr[i] > 0)
01038 client_alltoallw_counts[i] = 1;
01039 else
01040 client_alltoallw_counts[i] = 0;
01041 }
01042 }
01043
01044 *aggregators_done = 0;
01045 for (i=0; i < fd->hints->cb_nodes; i++) {
01046 MPI_Waitany (fd->hints->cb_nodes, recv_requests, &recv_idx, &status);
01047 if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] == -1)
01048 *aggregators_done = *aggregators_done + 1;
01049 else if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] > 0)
01050 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 1;
01051 else
01052 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 0;
01053 }
01054
01055 ADIOI_Free (recv_requests);
01056 if (fd->is_agg) {
01057
01058 send_statuses = ADIOI_Malloc (nprocs * sizeof (MPI_Status));
01059 MPI_Waitall (nprocs, send_requests, send_statuses);
01060 ADIOI_Free (send_requests);
01061 ADIOI_Free (send_statuses);
01062 }
01063 }
01064 }
01065
01066 static void post_aggregator_comm (MPI_Comm comm, int rw_type,
01067 int nproc, void *cb_buf,
01068 MPI_Datatype *client_comm_dtype_arr,
01069 ADIO_Offset *client_comm_sz_arr,
01070 MPI_Request **requests_p,
01071 int *aggs_client_count_p)
01072 {
01073 int aggs_client_count = 0;
01074 MPI_Request *requests;
01075 int i;
01076
01077 #ifdef DEBUG
01078 printf ("posting aggregator communication\n");
01079 #endif
01080
01081 for (i=0; i < nproc; i++)
01082 if (client_comm_sz_arr[i] > 0)
01083 aggs_client_count++;
01084 #ifdef DEBUG
01085 printf ("aggregator needs to talk to %d clients\n",
01086 aggs_client_count);
01087 #endif
01088 *aggs_client_count_p = aggs_client_count;
01089 if (aggs_client_count) {
01090 requests = (MPI_Request *)
01091 ADIOI_Malloc (aggs_client_count * sizeof(MPI_Request));
01092 aggs_client_count = 0;
01093 #ifdef AGGREGATION_PROFILE
01094 MPE_Log_event (5032, 0, NULL);
01095 #endif
01096 for (i=0; i < nproc; i++) {
01097 if (client_comm_sz_arr[i] > 0) {
01098 if (rw_type == ADIOI_WRITE)
01099 MPI_Irecv (cb_buf, 1, client_comm_dtype_arr[i], i,
01100 DATA_TAG, comm,
01101 &requests[aggs_client_count]);
01102 else
01103 MPI_Isend (cb_buf, 1, client_comm_dtype_arr[i], i,
01104 DATA_TAG, comm,
01105 &requests[aggs_client_count]);
01106
01107 aggs_client_count++;
01108 }
01109 }
01110 *requests_p = requests;
01111 }
01112 }
01113
01114 static void post_client_comm (ADIO_File fd, int rw_type,
01115 int agg_rank, void *buf,
01116 MPI_Datatype agg_comm_dtype,
01117 int agg_alltoallw_count,
01118 MPI_Request *request)
01119 {
01120 if (agg_alltoallw_count) {
01121 if (rw_type == ADIOI_READ)
01122 MPI_Irecv (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
01123 request);
01124 else
01125 MPI_Isend (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
01126 request);
01127 }
01128 }
01129
01130
01131