00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "ad_lustre.h"
00012 #include "adio_extern.h"
00013
/*
 * ADIOI_BUFFERED_WRITE - stage one contiguous file request into the write
 * buffer using read-modify-write.
 *
 * This is a statement macro, invoked WITHOUT a trailing semicolon.  It reads
 * and updates the following locals of the enclosing function:
 *   fd, buf, writebuf, writebuf_off, writebuf_len, write_sz,
 *   req_off, req_len, userbuf_off, end_offset, stripe_size,
 *   status1, error_code, myname.
 *
 * Effect: copies req_len bytes from (char *)buf + userbuf_off so that they
 * land at file offset req_off.  Data is staged in writebuf, which mirrors the
 * file window [writebuf_off, writebuf_off + writebuf_len).  A window never
 * extends past the next multiple of stripe_size nor past end_offset.  Every
 * time a new window is opened its current file contents are first read into
 * writebuf, so bytes that are not overwritten are preserved when the window
 * is later written back as a whole.
 *
 * Locking: when fd->atomicity is off, each window is write-locked when it is
 * opened and unlocked after it has been written (or if its pre-read fails).
 *
 * On return the last window is still staged (and, in non-atomic mode, still
 * locked); the enclosing function has to write it out.
 *
 * Errors: if ADIO_WriteContig or ADIO_ReadContig reports a failure, the code
 * is wrapped with MPIO_Err_create_code, writebuf is freed and the ENCLOSING
 * function returns immediately.
 */
#define ADIOI_BUFFERED_WRITE \
{ \
    if (req_off >= writebuf_off + writebuf_len) { \
        /* The request starts beyond the staged window: flush the window. */ \
        if (writebuf_len) { \
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                             ADIO_EXPLICIT_OFFSET, writebuf_off, \
                             &status1, error_code); \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            if (*error_code != MPI_SUCCESS) { \
                *error_code = MPIO_Err_create_code(*error_code, \
                                                   MPIR_ERR_RECOVERABLE, \
                                                   myname, \
                                                   __LINE__, MPI_ERR_IO, \
                                                   "**iowswc", 0); \
                ADIOI_Free(writebuf); \
                return; \
            } \
        } \
        writebuf_off = req_off; \
        /* New window: up to the next stripe boundary or end_offset. */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        if (!(fd->atomicity)) \
            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        /* Pre-read the window so untouched bytes survive the write-back. */ \
        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                        ADIO_EXPLICIT_OFFSET, \
                        writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            /* Do not leave the window locked when bailing out. */ \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, \
                                               myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowsrc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
    } \
    /* Copy the part of the request that fits into the current window. */ \
    write_sz = (unsigned) (ADIOI_MIN(req_len, \
                                     writebuf_off + writebuf_len - req_off)); \
    ADIOI_Assert((ADIO_Offset)write_sz == \
                 ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
    memcpy(writebuf + req_off - writebuf_off, (char *)buf +userbuf_off, write_sz); \
    /* The request spills over: write the window and open the next one. */ \
    while (write_sz != req_len) { \
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
        if (!(fd->atomicity)) \
            ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        if (*error_code != MPI_SUCCESS) { \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowswc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        req_len -= write_sz; \
        userbuf_off += write_sz; \
        writebuf_off += writebuf_len; \
        \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        if (!(fd->atomicity)) \
            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                        ADIO_EXPLICIT_OFFSET, \
                        writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            /* Do not leave the window locked when bailing out. */ \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowsrc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        write_sz = ADIOI_MIN(req_len, writebuf_len); \
        memcpy(writebuf, (char *)buf + userbuf_off, write_sz); \
    } \
}
00095
00096
00097
00098
/*
 * ADIOI_BUFFERED_WRITE_WITHOUT_READ - stage one contiguous file request into
 * the write buffer without reading the file first.
 *
 * This is a statement macro, invoked WITHOUT a trailing semicolon.  It reads
 * and updates the following locals of the enclosing function:
 *   fd, buf, writebuf, writebuf_off, writebuf_len, write_sz,
 *   req_off, req_len, userbuf_off, end_offset, stripe_size,
 *   status1, error_code, myname.
 *
 * Effect: copies req_len bytes from (char *)buf + userbuf_off so that they
 * land at file offset req_off.  Data is staged in writebuf, which mirrors the
 * file window [writebuf_off, writebuf_off + writebuf_len).  A window never
 * extends past the next multiple of stripe_size nor past end_offset.  Unlike
 * ADIOI_BUFFERED_WRITE the window is neither pre-read nor locked here.
 *
 * On return the last window is still staged; the enclosing function has to
 * write it out.
 *
 * Errors: if ADIO_WriteContig reports a failure, the code is wrapped with
 * MPIO_Err_create_code, writebuf is freed and the ENCLOSING function returns
 * immediately.
 */
#define ADIOI_BUFFERED_WRITE_WITHOUT_READ \
{ \
    if (req_off >= writebuf_off + writebuf_len) { \
        /* Flush the staged window; nothing to flush while it is empty */ \
        /* (writebuf_len == 0, e.g. on the very first request).        */ \
        if (writebuf_len) { \
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                             ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, \
                             error_code); \
            if (*error_code != MPI_SUCCESS) { \
                *error_code = MPIO_Err_create_code(*error_code, \
                                                   MPIR_ERR_RECOVERABLE, \
                                                   myname, \
                                                   __LINE__, MPI_ERR_IO, \
                                                   "**iowswc", 0); \
                ADIOI_Free(writebuf); \
                return; \
            } \
        } \
        writebuf_off = req_off; \
        /* New window: up to the next stripe boundary or end_offset. */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
    } \
    /* Copy the part of the request that fits into the current window. */ \
    write_sz = (unsigned) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
    ADIOI_Assert((ADIO_Offset)write_sz == ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
    memcpy(writebuf + req_off - writebuf_off, \
           (char *)buf + userbuf_off, write_sz); \
    /* The request spills over: write the window and open the next one. */ \
    while (write_sz != req_len) { \
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowswc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        req_len -= write_sz; \
        userbuf_off += write_sz; \
        writebuf_off += writebuf_len; \
        \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        write_sz = ADIOI_MIN(req_len, writebuf_len); \
        memcpy(writebuf, (char *)buf + userbuf_off, write_sz); \
    } \
}
00146
00147 void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count,
00148 MPI_Datatype datatype, int file_ptr_type,
00149 ADIO_Offset offset, ADIO_Status * status,
00150 int *error_code)
00151 {
00152
00153 ADIOI_Flatlist_node *flat_buf, *flat_file;
00154 ADIO_Offset i_offset, sum, size_in_filetype;
00155 int i, j, k, st_index=0;
00156 int n_etypes_in_filetype;
00157 ADIO_Offset num, size, n_filetypes, etype_in_filetype, st_n_filetypes;
00158 ADIO_Offset abs_off_in_filetype=0;
00159 int filetype_size, etype_size, buftype_size;
00160 MPI_Aint filetype_extent, buftype_extent;
00161 int buf_count, buftype_is_contig, filetype_is_contig;
00162 ADIO_Offset userbuf_off;
00163 ADIO_Offset off, req_off, disp, end_offset=0, writebuf_off, start_off;
00164 char *writebuf;
00165 unsigned bufsize, writebuf_len, write_sz;
00166 ADIO_Status status1;
00167 ADIO_Offset new_bwr_size, new_fwr_size, st_fwr_size, fwr_size=0, bwr_size, req_len;
00168 int stripe_size;
00169 static char myname[] = "ADIOI_LUSTRE_WriteStrided";
00170
00171 if (fd->hints->ds_write == ADIOI_HINT_DISABLE) {
00172
00173
00174
00175 ADIOI_GEN_WriteStrided_naive(fd,
00176 buf,
00177 count,
00178 datatype,
00179 file_ptr_type,
00180 offset, status, error_code);
00181 return;
00182 }
00183
00184 *error_code = MPI_SUCCESS;
00185
00186 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00187 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00188
00189 MPI_Type_size(fd->filetype, &filetype_size);
00190 if (!filetype_size) {
00191 *error_code = MPI_SUCCESS;
00192 return;
00193 }
00194
00195 MPI_Type_extent(fd->filetype, &filetype_extent);
00196 MPI_Type_size(datatype, &buftype_size);
00197 MPI_Type_extent(datatype, &buftype_extent);
00198 etype_size = fd->etype_size;
00199
00200 ADIOI_Assert((buftype_size * count) == ((ADIO_Offset)(unsigned)buftype_size * (ADIO_Offset)count));
00201 bufsize = buftype_size * count;
00202
00203
00204 stripe_size = fd->hints->striping_unit;
00205
00206
00207 if (!buftype_is_contig && filetype_is_contig) {
00208
00209 ADIOI_Flatten_datatype(datatype);
00210 flat_buf = ADIOI_Flatlist;
00211 while (flat_buf->type != datatype)
00212 flat_buf = flat_buf->next;
00213
00214 off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
00215 fd->disp + (ADIO_Offset)etype_size * offset;
00216
00217 start_off = off;
00218 end_offset = start_off + bufsize - 1;
00219
00220 writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
00221 writebuf_off = 0;
00222 writebuf_len = 0;
00223
00224
00225 if (fd->atomicity)
00226 ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);
00227
00228 for (j = 0; j < count; j++) {
00229 for (i = 0; i < flat_buf->count; i++) {
00230 userbuf_off = (ADIO_Offset)j * (ADIO_Offset)buftype_extent +
00231 flat_buf->indices[i];
00232 req_off = off;
00233 req_len = flat_buf->blocklens[i];
00234 ADIOI_BUFFERED_WRITE_WITHOUT_READ
00235 off += flat_buf->blocklens[i];
00236 }
00237 }
00238
00239
00240 ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
00241 ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
00242 error_code);
00243
00244 if (fd->atomicity)
00245 ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
00246 if (*error_code != MPI_SUCCESS) {
00247 ADIOI_Free(writebuf);
00248 return;
00249 }
00250 ADIOI_Free(writebuf);
00251 if (file_ptr_type == ADIO_INDIVIDUAL)
00252 fd->fp_ind = off;
00253 } else {
00254
00255
00256 flat_file = ADIOI_Flatlist;
00257 while (flat_file->type != fd->filetype)
00258 flat_file = flat_file->next;
00259 disp = fd->disp;
00260
00261 if (file_ptr_type == ADIO_INDIVIDUAL) {
00262
00263 offset = fd->fp_ind - disp;
00264 n_filetypes = (offset - flat_file->indices[0]) / filetype_extent;
00265 offset -= (ADIO_Offset)n_filetypes * filetype_extent;
00266
00267
00268
00269 for (i=0; i<flat_file->count; i++) {
00270 ADIO_Offset dist;
00271 if (flat_file->blocklens[i] == 0) continue;
00272 dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
00273
00274 if (dist == 0) {
00275 i++;
00276 offset = flat_file->indices[i];
00277 fwr_size = flat_file->blocklens[i];
00278 break;
00279 }
00280 if (dist > 0) {
00281 fwr_size = dist;
00282 break;
00283 }
00284 }
00285 st_index = i;
00286 offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
00287 }
00288 else {
00289 n_etypes_in_filetype = filetype_size/etype_size;
00290 n_filetypes = offset / n_etypes_in_filetype;
00291 etype_in_filetype = offset % n_etypes_in_filetype;
00292 size_in_filetype = etype_in_filetype * etype_size;
00293
00294 sum = 0;
00295 for (i = 0; i < flat_file->count; i++) {
00296 sum += flat_file->blocklens[i];
00297 if (sum > size_in_filetype) {
00298 st_index = i;
00299 fwr_size = sum - size_in_filetype;
00300 abs_off_in_filetype = flat_file->indices[i] +
00301 size_in_filetype - (sum - flat_file->blocklens[i]);
00302 break;
00303 }
00304 }
00305
00306
00307 offset = disp + (ADIO_Offset) n_filetypes *filetype_extent +
00308 abs_off_in_filetype;
00309 }
00310
00311 start_off = offset;
00312
00313
00314
00315
00316
00317 if (buftype_is_contig && bufsize <= fwr_size) {
00318 req_off = start_off;
00319 req_len = bufsize;
00320 end_offset = start_off + bufsize - 1;
00321 writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
00322 memset(writebuf, -1, ADIOI_MIN(bufsize, stripe_size));
00323 writebuf_off = 0;
00324 writebuf_len = 0;
00325 userbuf_off = 0;
00326 ADIOI_BUFFERED_WRITE_WITHOUT_READ
00327
00328 ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
00329 ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
00330 error_code);
00331
00332 if (file_ptr_type == ADIO_INDIVIDUAL) {
00333
00334
00335 fd->fp_ind = offset + bufsize;
00336 if (bufsize == fwr_size) {
00337 do {
00338 st_index++;
00339 if (st_index == flat_file->count) {
00340 st_index = 0;
00341 n_filetypes++;
00342 }
00343 } while (flat_file->blocklens[st_index] == 0);
00344 fd->fp_ind = disp + flat_file->indices[st_index]
00345 + (ADIO_Offset)n_filetypes*filetype_extent;
00346 }
00347 }
00348 fd->fp_sys_posn = -1;
00349 #ifdef HAVE_STATUS_SET_BYTES
00350 MPIR_Status_set_bytes(status, datatype, bufsize);
00351 #endif
00352 ADIOI_Free(writebuf);
00353 return;
00354 }
00355
00356
00357
00358
00359 st_fwr_size = fwr_size;
00360 st_n_filetypes = n_filetypes;
00361 i_offset = 0;
00362 j = st_index;
00363 off = offset;
00364 fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
00365 while (i_offset < bufsize) {
00366 i_offset += fwr_size;
00367 end_offset = off + fwr_size - 1;
00368
00369 j = (j+1) % flat_file->count;
00370 n_filetypes += (j == 0) ? 1 : 0;
00371 while (flat_file->blocklens[j]==0) {
00372 j = (j+1) % flat_file->count;
00373 n_filetypes += (j == 0) ? 1 : 0;
00374 }
00375
00376 off = disp + flat_file->indices[j] +
00377 n_filetypes*(ADIO_Offset)filetype_extent;
00378 fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
00379 }
00380
00381
00382 if (fd->atomicity)
00383 ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
00384
00385 writebuf_off = 0;
00386 writebuf_len = 0;
00387 writebuf = (char *) ADIOI_Malloc(stripe_size);
00388 memset(writebuf, -1, stripe_size);
00389
00390 if (buftype_is_contig && !filetype_is_contig) {
00391
00392
00393
00394
00395 i_offset = 0;
00396 j = st_index;
00397 off = offset;
00398 n_filetypes = st_n_filetypes;
00399 fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
00400 while (i_offset < bufsize) {
00401 if (fwr_size) {
00402
00403
00404
00405
00406
00407 req_off = off;
00408 req_len = fwr_size;
00409 userbuf_off = i_offset;
00410 ADIOI_BUFFERED_WRITE
00411 }
00412 i_offset += fwr_size;
00413
00414 if (off + fwr_size < disp + flat_file->indices[j] +
00415 flat_file->blocklens[j] +
00416 n_filetypes*(ADIO_Offset)filetype_extent)
00417 off += fwr_size;
00418
00419
00420 else {
00421 j = (j+1) % flat_file->count;
00422 n_filetypes += (j == 0) ? 1 : 0;
00423 while (flat_file->blocklens[j]==0) {
00424 j = (j+1) % flat_file->count;
00425 n_filetypes += (j == 0) ? 1 : 0;
00426 }
00427 off = disp + flat_file->indices[j] +
00428 n_filetypes*(ADIO_Offset)filetype_extent;
00429 fwr_size = ADIOI_MIN(flat_file->blocklens[j],
00430 bufsize-i_offset);
00431 }
00432 }
00433 }
00434 else {
00435
00436
00437 ADIOI_Flatten_datatype(datatype);
00438 flat_buf = ADIOI_Flatlist;
00439 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00440
00441 k = num = buf_count = 0;
00442 i_offset = flat_buf->indices[0];
00443 j = st_index;
00444 off = offset;
00445 n_filetypes = st_n_filetypes;
00446 fwr_size = st_fwr_size;
00447 bwr_size = flat_buf->blocklens[0];
00448
00449 while (num < bufsize) {
00450 size = ADIOI_MIN(fwr_size, bwr_size);
00451 if (size) {
00452
00453
00454
00455 req_off = off;
00456 req_len = size;
00457 userbuf_off = i_offset;
00458 ADIOI_BUFFERED_WRITE
00459 }
00460
00461 new_fwr_size = fwr_size;
00462 new_bwr_size = bwr_size;
00463
00464 if (size == fwr_size) {
00465
00466 j = (j+1) % flat_file->count;
00467 n_filetypes += (j == 0) ? 1 : 0;
00468 while (flat_file->blocklens[j]==0) {
00469 j = (j+1) % flat_file->count;
00470 n_filetypes += (j == 0) ? 1 : 0;
00471 }
00472
00473 off = disp + flat_file->indices[j] +
00474 n_filetypes*(ADIO_Offset)filetype_extent;
00475
00476 new_fwr_size = flat_file->blocklens[j];
00477 if (size != bwr_size) {
00478 i_offset += size;
00479 new_bwr_size -= size;
00480 }
00481 }
00482
00483 if (size == bwr_size) {
00484
00485
00486 k = (k + 1)%flat_buf->count;
00487 buf_count++;
00488 i_offset = (ADIO_Offset)buftype_extent *
00489 (ADIO_Offset)(buf_count/flat_buf->count) +
00490 flat_buf->indices[k];
00491 new_bwr_size = flat_buf->blocklens[k];
00492 if (size != fwr_size) {
00493 off += size;
00494 new_fwr_size -= size;
00495 }
00496 }
00497 num += size;
00498 fwr_size = new_fwr_size;
00499 bwr_size = new_bwr_size;
00500 }
00501 }
00502
00503
00504 if (writebuf_len) {
00505 ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
00506 ADIO_EXPLICIT_OFFSET,
00507 writebuf_off, &status1, error_code);
00508 if (!(fd->atomicity))
00509 ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len);
00510 if (*error_code != MPI_SUCCESS) return;
00511 }
00512 if (fd->atomicity)
00513 ADIOI_UNLOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
00514
00515 ADIOI_Free(writebuf);
00516
00517 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
00518 }
00519
00520 fd->fp_sys_posn = -1;
00521
00522 #ifdef HAVE_STATUS_SET_BYTES
00523 MPIR_Status_set_bytes(status, datatype, bufsize);
00524
00525
00526 #endif
00527
00528 if (!buftype_is_contig)
00529 ADIOI_Delete_flattened(datatype);
00530 }