00001
00002
00003
00004
00005
00006
00007
00008 #include "ad_nfs.h"
00009 #include "adio_extern.h"
00010
00011 void ADIOI_NFS_ReadContig(ADIO_File fd, void *buf, int count,
00012 MPI_Datatype datatype, int file_ptr_type,
00013 ADIO_Offset offset, ADIO_Status *status, int *error_code)
00014 {
00015 int err=-1, datatype_size, len;
00016 static char myname[] = "ADIOI_NFS_READCONTIG";
00017
00018 MPI_Type_size(datatype, &datatype_size);
00019 len = datatype_size * count;
00020
00021 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00022 if (fd->fp_sys_posn != offset) {
00023 #ifdef ADIOI_MPE_LOGGING
00024 MPE_Log_event( ADIOI_MPE_lseek_a, 0, NULL );
00025 #endif
00026 lseek(fd->fd_sys, offset, SEEK_SET);
00027 #ifdef ADIOI_MPE_LOGGING
00028 MPE_Log_event( ADIOI_MPE_lseek_b, 0, NULL );
00029 #endif
00030 }
00031 if (fd->atomicity)
00032 ADIOI_WRITE_LOCK(fd, offset, SEEK_SET, len);
00033 else ADIOI_READ_LOCK(fd, offset, SEEK_SET, len);
00034 #ifdef ADIOI_MPE_LOGGING
00035 MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
00036 #endif
00037 err = read(fd->fd_sys, buf, len);
00038 #ifdef ADIOI_MPE_LOGGING
00039 MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
00040 #endif
00041 ADIOI_UNLOCK(fd, offset, SEEK_SET, len);
00042 fd->fp_sys_posn = offset + err;
00043
00044 }
00045 else {
00046 offset = fd->fp_ind;
00047 if (fd->fp_sys_posn != fd->fp_ind) {
00048 #ifdef ADIOI_MPE_LOGGING
00049 MPE_Log_event( ADIOI_MPE_lseek_a, 0, NULL );
00050 #endif
00051 lseek(fd->fd_sys, fd->fp_ind, SEEK_SET);
00052 #ifdef ADIOI_MPE_LOGGING
00053 MPE_Log_event( ADIOI_MPE_lseek_b, 0, NULL );
00054 #endif
00055 }
00056 if (fd->atomicity)
00057 ADIOI_WRITE_LOCK(fd, offset, SEEK_SET, len);
00058 else ADIOI_READ_LOCK(fd, offset, SEEK_SET, len);
00059 #ifdef ADIOI_MPE_LOGGING
00060 MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
00061 #endif
00062 err = read(fd->fd_sys, buf, len);
00063 #ifdef ADIOI_MPE_LOGGING
00064 MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
00065 #endif
00066 ADIOI_UNLOCK(fd, offset, SEEK_SET, len);
00067 fd->fp_ind += err;
00068 fd->fp_sys_posn = fd->fp_ind;
00069 }
00070
00071
00072 if (err == -1) {
00073 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00074 myname, __LINE__, MPI_ERR_IO,
00075 "**io", "**io %s", strerror(errno));
00076 return;
00077 }
00078
00079
00080 #ifdef HAVE_STATUS_SET_BYTES
00081 MPIR_Status_set_bytes(status, datatype, err);
00082 #endif
00083
00084 *error_code = MPI_SUCCESS;
00085 }
00086
00087
00088
/*
 * Emits one MPE trace event around a system call issued by
 * ADIOI_BUFFERED_READ; expands to nothing when ADIOI_MPE_LOGGING is off.
 */
#ifdef ADIOI_MPE_LOGGING
#define ADIOI_NFS_READ_LOG(event) MPE_Log_event((event), 0, NULL);
#else
#define ADIOI_NFS_READ_LOG(event)
#endif

/*
 * ADIOI_BUFFERED_READ - copy req_len bytes located at file offset req_off
 * into the user buffer at (char *)buf + userbuf_off, going through readbuf.
 *
 * The macro is a bare { ... } block and is invoked WITHOUT a trailing
 * semicolon. It reads and updates these variables of the enclosing scope:
 *   fd, buf, req_off, req_len, userbuf_off, end_offset, max_bufsize,
 *   readbuf, readbuf_off, readbuf_len, partial_read, tmp_buf, err, err_flag.
 *
 * Step 1: if the request starts at or beyond the end of the buffered
 *         window, the window is restarted at req_off and refilled.
 * Step 2: while the request extends past the window, the still-needed tail
 *         (partial_read bytes) is carried into a new buffer of
 *         partial_read + max_bufsize bytes and the remainder is read in
 *         behind it. tmp_buf holds the new buffer while the tail is copied,
 *         so each extension costs one allocation and one copy.
 * Step 3: the requested bytes are copied out of the window.
 *
 * Reads are bracketed by a read lock unless fd->atomicity is set (the
 * caller is expected to hold its own lock in that case). A read() result
 * of -1 sets err_flag; the copy in step 3 still runs.
 */
#define ADIOI_BUFFERED_READ \
{ \
    if (req_off >= readbuf_off + readbuf_len) { \
        readbuf_off = req_off; \
        readbuf_len = (int) (ADIOI_MIN(max_bufsize, end_offset-readbuf_off+1)); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_lseek_a) \
        lseek(fd->fd_sys, readbuf_off, SEEK_SET); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_lseek_b) \
        if (!(fd->atomicity)) ADIOI_READ_LOCK(fd, readbuf_off, SEEK_SET, readbuf_len); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_read_a) \
        err = read(fd->fd_sys, readbuf, readbuf_len); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_read_b) \
        if (!(fd->atomicity)) ADIOI_UNLOCK(fd, readbuf_off, SEEK_SET, readbuf_len); \
        if (err == -1) err_flag = 1; \
    } \
    while (req_len > readbuf_off + readbuf_len - req_off) { \
        partial_read = (int) (readbuf_off + readbuf_len - req_off); \
        /* carry the unread tail straight into the enlarged buffer */ \
        tmp_buf = (char *) ADIOI_Malloc(partial_read + max_bufsize); \
        memcpy(tmp_buf, readbuf+readbuf_len-partial_read, partial_read); \
        ADIOI_Free(readbuf); \
        readbuf = tmp_buf; \
        readbuf_off += readbuf_len-partial_read; \
        readbuf_len = (int) (partial_read + ADIOI_MIN(max_bufsize, \
                                       end_offset-readbuf_off+1)); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_lseek_a) \
        lseek(fd->fd_sys, readbuf_off+partial_read, SEEK_SET); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_lseek_b) \
        if (!(fd->atomicity)) ADIOI_READ_LOCK(fd, readbuf_off+partial_read, SEEK_SET, readbuf_len-partial_read); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_read_a) \
        err = read(fd->fd_sys, readbuf+partial_read, readbuf_len-partial_read); \
        ADIOI_NFS_READ_LOG(ADIOI_MPE_read_b) \
        if (!(fd->atomicity)) ADIOI_UNLOCK(fd, readbuf_off+partial_read, SEEK_SET, readbuf_len-partial_read); \
        if (err == -1) err_flag = 1; \
    } \
    memcpy((char *)buf + userbuf_off, readbuf+req_off-readbuf_off, req_len); \
}
00160
00161
00162 void ADIOI_NFS_ReadStrided(ADIO_File fd, void *buf, int count,
00163 MPI_Datatype datatype, int file_ptr_type,
00164 ADIO_Offset offset, ADIO_Status *status, int
00165 *error_code)
00166 {
00167
00168
00169 ADIOI_Flatlist_node *flat_buf, *flat_file;
00170 int i, j, k, err=-1, brd_size, frd_size=0, st_index=0;
00171 int bufsize, num, size, sum, n_etypes_in_filetype, size_in_filetype;
00172 int n_filetypes, etype_in_filetype;
00173 ADIO_Offset abs_off_in_filetype=0;
00174 int filetype_size, etype_size, buftype_size, req_len, partial_read;
00175 MPI_Aint filetype_extent, buftype_extent;
00176 int buf_count, buftype_is_contig, filetype_is_contig;
00177 ADIO_Offset userbuf_off;
00178 ADIO_Offset off, req_off, disp, end_offset=0, readbuf_off, start_off;
00179 char *readbuf, *tmp_buf, *value;
00180 int st_frd_size, st_n_filetypes, readbuf_len;
00181 int new_brd_size, new_frd_size, err_flag=0, info_flag, max_bufsize;
00182
00183 static char myname[] = "ADIOI_NFS_READSTRIDED";
00184
00185 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00186 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00187
00188 MPI_Type_size(fd->filetype, &filetype_size);
00189 if ( ! filetype_size ) {
00190 *error_code = MPI_SUCCESS;
00191 return;
00192 }
00193
00194 MPI_Type_extent(fd->filetype, &filetype_extent);
00195 MPI_Type_size(datatype, &buftype_size);
00196 MPI_Type_extent(datatype, &buftype_extent);
00197 etype_size = fd->etype_size;
00198
00199 bufsize = buftype_size * count;
00200
00201
00202
00203 value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
00204 ADIOI_Info_get(fd->info, "ind_rd_buffer_size", MPI_MAX_INFO_VAL, value,
00205 &info_flag);
00206 max_bufsize = atoi(value);
00207 ADIOI_Free(value);
00208
00209 if (!buftype_is_contig && filetype_is_contig) {
00210
00211
00212
00213 ADIOI_Flatten_datatype(datatype);
00214 flat_buf = CtvAccess(ADIOI_Flatlist);
00215 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00216
00217 off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
00218 fd->disp + etype_size * offset;
00219
00220 start_off = off;
00221 end_offset = off + bufsize - 1;
00222 readbuf_off = off;
00223 readbuf = (char *) ADIOI_Malloc(max_bufsize);
00224 readbuf_len = (int) (ADIOI_MIN(max_bufsize, end_offset-readbuf_off+1));
00225
00226
00227 if (fd->atomicity)
00228 ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
00229
00230 #ifdef ADIOI_MPE_LOGGING
00231 MPE_Log_event( ADIOI_MPE_lseek_a, 0, NULL );
00232 #endif
00233 lseek(fd->fd_sys, readbuf_off, SEEK_SET);
00234 #ifdef ADIOI_MPE_LOGGING
00235 MPE_Log_event( ADIOI_MPE_lseek_b, 0, NULL );
00236 #endif
00237 if (!(fd->atomicity)) ADIOI_READ_LOCK(fd, readbuf_off, SEEK_SET, readbuf_len);
00238 #ifdef ADIOI_MPE_LOGGING
00239 MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
00240 #endif
00241 err = read(fd->fd_sys, readbuf, readbuf_len);
00242 #ifdef ADIOI_MPE_LOGGING
00243 MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
00244 #endif
00245 if (!(fd->atomicity)) ADIOI_UNLOCK(fd, readbuf_off, SEEK_SET, readbuf_len);
00246 if (err == -1) err_flag = 1;
00247
00248 for (j=0; j<count; j++)
00249 for (i=0; i<flat_buf->count; i++) {
00250 userbuf_off = j*buftype_extent + flat_buf->indices[i];
00251 req_off = off;
00252 req_len = flat_buf->blocklens[i];
00253 ADIOI_BUFFERED_READ
00254 off += flat_buf->blocklens[i];
00255 }
00256
00257 if (fd->atomicity)
00258 ADIOI_UNLOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
00259
00260 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
00261
00262 ADIOI_Free(readbuf);
00263
00264 if (err_flag) {
00265 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00266 MPIR_ERR_RECOVERABLE, myname,
00267 __LINE__, MPI_ERR_IO, "**io",
00268 "**io %s", strerror(errno));
00269 }
00270 else *error_code = MPI_SUCCESS;
00271 }
00272
00273 else {
00274
00275
00276 flat_file = CtvAccess(ADIOI_Flatlist);
00277 while (flat_file->type != fd->filetype) flat_file = flat_file->next;
00278 disp = fd->disp;
00279
00280 if (file_ptr_type == ADIO_INDIVIDUAL) {
00281
00282 offset = fd->fp_ind - disp;
00283 n_filetypes = (offset - flat_file->indices[0]) / filetype_extent;
00284 offset -= (ADIO_Offset)n_filetypes * filetype_extent;
00285
00286
00287
00288 for (i=0; i<flat_file->count; i++) {
00289 ADIO_Offset dist;
00290 if (flat_file->blocklens[i] == 0) continue;
00291 dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
00292
00293 if (dist == 0) {
00294 i++;
00295 offset = flat_file->indices[i];
00296 frd_size = flat_file->blocklens[i];
00297 break;
00298 }
00299 if (dist > 0 ) {
00300 frd_size = dist;
00301 break;
00302 }
00303 }
00304 st_index = i;
00305 offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
00306 }
00307 else {
00308 n_etypes_in_filetype = filetype_size/etype_size;
00309 n_filetypes = (int) (offset / n_etypes_in_filetype);
00310 etype_in_filetype = (int) (offset % n_etypes_in_filetype);
00311 size_in_filetype = etype_in_filetype * etype_size;
00312
00313 sum = 0;
00314 for (i=0; i<flat_file->count; i++) {
00315 sum += flat_file->blocklens[i];
00316 if (sum > size_in_filetype) {
00317 st_index = i;
00318 frd_size = sum - size_in_filetype;
00319 abs_off_in_filetype = flat_file->indices[i] +
00320 size_in_filetype - (sum - flat_file->blocklens[i]);
00321 break;
00322 }
00323 }
00324
00325
00326 offset = disp + (ADIO_Offset) n_filetypes*filetype_extent +
00327 abs_off_in_filetype;
00328 }
00329
00330 start_off = offset;
00331
00332
00333
00334
00335 if (buftype_is_contig && bufsize <= frd_size) {
00336 ADIO_ReadContig(fd, buf, bufsize, MPI_BYTE, ADIO_EXPLICIT_OFFSET,
00337 offset, status, error_code);
00338
00339 if (file_ptr_type == ADIO_INDIVIDUAL) {
00340
00341
00342 fd->fp_ind = offset + bufsize;
00343 if (bufsize == frd_size) {
00344 do {
00345 st_index++;
00346 if (st_index == flat_file->count) {
00347 st_index = 0;
00348 n_filetypes++;
00349 }
00350 } while (flat_file->blocklens[st_index] == 0);
00351 fd->fp_ind = disp + flat_file->indices[st_index]
00352 + n_filetypes*filetype_extent;
00353 }
00354 }
00355 fd->fp_sys_posn = -1;
00356 #ifdef HAVE_STATUS_SET_BYTES
00357 MPIR_Status_set_bytes(status, datatype, bufsize);
00358 #endif
00359 return;
00360 }
00361
00362
00363
00364
00365 st_frd_size = frd_size;
00366 st_n_filetypes = n_filetypes;
00367 i = 0;
00368 j = st_index;
00369 off = offset;
00370 frd_size = ADIOI_MIN(st_frd_size, bufsize);
00371 while (i < bufsize) {
00372 i += frd_size;
00373 end_offset = off + frd_size - 1;
00374 j = (j+1) % flat_file->count;
00375 n_filetypes += (j == 0) ? 1 : 0;
00376 while (flat_file->blocklens[j]==0) {
00377 j = (j+1) % flat_file->count;
00378 n_filetypes += (j == 0) ? 1 : 0;
00379 }
00380
00381 off = disp + flat_file->indices[j] + (ADIO_Offset) n_filetypes*filetype_extent;
00382 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i);
00383 }
00384
00385
00386 if (fd->atomicity)
00387 ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
00388
00389
00390 readbuf_off = offset;
00391 readbuf = (char *) ADIOI_Malloc(max_bufsize);
00392 readbuf_len = (int) (ADIOI_MIN(max_bufsize, end_offset-readbuf_off+1));
00393
00394 #ifdef ADIOI_MPE_LOGGING
00395 MPE_Log_event( ADIOI_MPE_lseek_a, 0, NULL );
00396 #endif
00397 lseek(fd->fd_sys, offset, SEEK_SET);
00398 #ifdef ADIOI_MPE_LOGGING
00399 MPE_Log_event( ADIOI_MPE_lseek_b, 0, NULL );
00400 #endif
00401 if (!(fd->atomicity)) ADIOI_READ_LOCK(fd, offset, SEEK_SET, readbuf_len);
00402 #ifdef ADIOI_MPE_LOGGING
00403 MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
00404 #endif
00405 err = read(fd->fd_sys, readbuf, readbuf_len);
00406 #ifdef ADIOI_MPE_LOGGING
00407 MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
00408 #endif
00409 if (!(fd->atomicity)) ADIOI_UNLOCK(fd, offset, SEEK_SET, readbuf_len);
00410
00411 if (err == -1) err_flag = 1;
00412
00413 if (buftype_is_contig && !filetype_is_contig) {
00414
00415
00416
00417
00418 i = 0;
00419 j = st_index;
00420 off = offset;
00421 n_filetypes = st_n_filetypes;
00422 frd_size = ADIOI_MIN(st_frd_size, bufsize);
00423 while (i < bufsize) {
00424 if (frd_size) {
00425
00426
00427
00428
00429
00430 req_off = off;
00431 req_len = frd_size;
00432 userbuf_off = i;
00433 ADIOI_BUFFERED_READ
00434 }
00435 i += frd_size;
00436
00437 if (off + frd_size < disp + flat_file->indices[j] +
00438 flat_file->blocklens[j] + (ADIO_Offset) n_filetypes*filetype_extent)
00439 off += frd_size;
00440
00441
00442 else {
00443 j = (j+1) % flat_file->count;
00444 n_filetypes += (j == 0) ? 1 : 0;
00445 while (flat_file->blocklens[j]==0) {
00446 j = (j+1) % flat_file->count;
00447 n_filetypes += (j == 0) ? 1 : 0;
00448 }
00449 off = disp + flat_file->indices[j] +
00450 (ADIO_Offset) n_filetypes*filetype_extent;
00451 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i);
00452 }
00453 }
00454 }
00455 else {
00456
00457
00458 ADIOI_Flatten_datatype(datatype);
00459 flat_buf = CtvAccess(ADIOI_Flatlist);
00460 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00461
00462 k = num = buf_count = 0;
00463 i = (int) (flat_buf->indices[0]);
00464 j = st_index;
00465 off = offset;
00466 n_filetypes = st_n_filetypes;
00467 frd_size = st_frd_size;
00468 brd_size = flat_buf->blocklens[0];
00469
00470 while (num < bufsize) {
00471 size = ADIOI_MIN(frd_size, brd_size);
00472 if (size) {
00473
00474
00475
00476 req_off = off;
00477 req_len = size;
00478 userbuf_off = i;
00479 ADIOI_BUFFERED_READ
00480 }
00481
00482 new_frd_size = frd_size;
00483 new_brd_size = brd_size;
00484
00485 if (size == frd_size) {
00486
00487 j = (j+1) % flat_file->count;
00488 n_filetypes += (j == 0) ? 1 : 0;
00489 while (flat_file->blocklens[j]==0) {
00490 j = (j+1) % flat_file->count;
00491 n_filetypes += (j == 0) ? 1 : 0;
00492 }
00493 off = disp + flat_file->indices[j] +
00494 (ADIO_Offset) n_filetypes*filetype_extent;
00495
00496 new_frd_size = flat_file->blocklens[j];
00497 if (size != brd_size) {
00498 i += size;
00499 new_brd_size -= size;
00500 }
00501 }
00502
00503 if (size == brd_size) {
00504
00505
00506 k = (k + 1)%flat_buf->count;
00507 buf_count++;
00508 i = (int) (buftype_extent*(buf_count/flat_buf->count) +
00509 flat_buf->indices[k]);
00510 new_brd_size = flat_buf->blocklens[k];
00511 if (size != frd_size) {
00512 off += size;
00513 new_frd_size -= size;
00514 }
00515 }
00516 num += size;
00517 frd_size = new_frd_size;
00518 brd_size = new_brd_size;
00519 }
00520 }
00521
00522 if (fd->atomicity)
00523 ADIOI_UNLOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
00524
00525 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
00526
00527 ADIOI_Free(readbuf);
00528
00529 if (err_flag) {
00530 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00531 MPIR_ERR_RECOVERABLE, myname,
00532 __LINE__, MPI_ERR_IO, "**io",
00533 "**io %s", strerror(errno));
00534 }
00535 else *error_code = MPI_SUCCESS;
00536 }
00537
00538 fd->fp_sys_posn = -1;
00539
00540 #ifdef HAVE_STATUS_SET_BYTES
00541 MPIR_Status_set_bytes(status, datatype, bufsize);
00542
00543
00544
00545 #endif
00546
00547 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00548 }