00001
00002
00003
00004
00005
00006
00007
00008 #include "adio.h"
00009
00010 #ifdef HAVE_UNISTD_H
00011 #include <unistd.h>
00012 #endif
00013 #ifdef HAVE_SIGNAL_H
00014 #include <signal.h>
00015 #endif
00016 #ifdef HAVE_SYS_TYPES_H
00017 #include <sys/types.h>
00018 #endif
00019 #ifdef HAVE_AIO_H
00020 #include <aio.h>
00021 #endif
00022 #ifdef HAVE_SYS_AIO_H
00023 #include <sys/aio.h>
00024 #endif
00025 #include <time.h>
00026
00027 #include "../../mpi-io/mpioimpl.h"
00028 #include "../../mpi-io/mpioprof.h"
00029 #include "mpiu_greq.h"
00030
00031
00032 #if !defined(__REDIRECT) && defined(__USE_FILE_OFFSET64)
00033 #define aiocb aiocb64
00034 #endif
00035
00036 #ifdef ROMIO_HAVE_WORKING_AIO
00037
00038 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
00039
00040
00041
00042
00043
00044
00045
00046
00047 void ADIOI_GEN_IwriteContig(ADIO_File fd, void *buf, int count,
00048 MPI_Datatype datatype, int file_ptr_type,
00049 ADIO_Offset offset, ADIO_Request *request,
00050 int *error_code)
00051 {
00052 int len, typesize;
00053 int aio_errno = 0;
00054 static char myname[] = "ADIOI_GEN_IWRITECONTIG";
00055
00056 MPI_Type_size(datatype, &typesize);
00057 len = count * typesize;
00058 ADIOI_Assert(len == (int)((ADIO_Offset)count * (ADIO_Offset)typesize));
00059
00060 if (file_ptr_type == ADIO_INDIVIDUAL) offset = fd->fp_ind;
00061 aio_errno = ADIOI_GEN_aio(fd, buf, len, offset, 1, request);
00062 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind += len;
00063
00064 fd->fp_sys_posn = -1;
00065
00066
00067 if (aio_errno != 0) {
00068 MPIO_ERR_CREATE_CODE_ERRNO(myname, aio_errno, error_code);
00069 return;
00070 }
00071
00072
00073 *error_code = MPI_SUCCESS;
00074 }
00075
00076
00077
00078
00079
00080
00081 int ADIOI_GEN_aio(ADIO_File fd, void *buf, int len, ADIO_Offset offset,
00082 int wr, MPI_Request *request)
00083 {
00084 int err=-1, fd_sys;
00085
00086 int error_code;
00087 struct aiocb *aiocbp;
00088 ADIOI_AIO_Request *aio_req;
00089 MPI_Status status;
00090 #if defined(ROMIO_XFS)
00091 unsigned maxiosz = wr ? fd->hints->fs_hints.xfs.write_chunk_sz :
00092 fd->hints->fs_hints.xfs.read_chunk_sz;
00093 #endif
00094
00095 fd_sys = fd->fd_sys;
00096
00097 #if defined(ROMIO_XFS)
00098
00099 if (fd->fns == &ADIO_XFS_operations &&
00100 ((wr && fd->direct_write) || (!wr && fd->direct_read)) &&
00101 !(((long) buf) % fd->d_mem) && !(offset % fd->d_miniosz) &&
00102 !(len % fd->d_miniosz) && (len >= fd->d_miniosz) &&
00103 (len <= maxiosz)) {
00104 fd_sys = fd->fd_direct;
00105 }
00106 #endif
00107
00108 aio_req = (ADIOI_AIO_Request*)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
00109 aiocbp = (struct aiocb *) ADIOI_Calloc(sizeof(struct aiocb), 1);
00110 aiocbp->aio_offset = offset;
00111 aiocbp->aio_buf = buf;
00112 aiocbp->aio_nbytes = len;
00113
00114 #ifdef ROMIO_HAVE_STRUCT_AIOCB_WITH_AIO_WHENCE
00115 aiocbp->aio_whence = SEEK_SET;
00116 #endif
00117 #ifdef ROMIO_HAVE_STRUCT_AIOCB_WITH_AIO_FILDES
00118 aiocbp->aio_fildes = fd_sys;
00119 #endif
00120 #ifdef ROMIO_HAVE_STRUCT_AIOCB_WITH_AIO_SIGEVENT
00121 # ifdef AIO_SIGNOTIFY_NONE
00122 aiocbp->aio_sigevent.sigev_notify = SIGEV_NONE;
00123 # endif
00124 aiocbp->aio_sigevent.sigev_signo = 0;
00125 #endif
00126 #ifdef ROMIO_HAVE_STRUCT_AIOCB_WITH_AIO_REQPRIO
00127 # ifdef AIO_PRIO_DFL
00128 aiocbp->aio_reqprio = AIO_PRIO_DFL;
00129 # else
00130 aiocbp->aio_reqprio = 0;
00131 # endif
00132 #endif
00133
00134 #ifndef ROMIO_HAVE_AIO_CALLS_NEED_FILEDES
00135 #ifndef ROMIO_HAVE_STRUCT_AIOCB_WITH_AIO_FILDES
00136 #error 'No fildes set for aio structure'
00137 #endif
00138 if (wr) err = aio_write(aiocbp);
00139 else err = aio_read(aiocbp);
00140 #else
00141
00142 if (wr) err = aio_write(fd_sys, aiocbp);
00143 else err = aio_read(fd_sys, aiocbp);
00144 #endif
00145
00146 if (err == -1) {
00147 if (errno == EAGAIN) {
00148
00149
00150 if (wr)
00151 ADIO_WriteContig(fd, buf, len, MPI_BYTE,
00152 ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);
00153 else
00154 ADIO_ReadContig(fd, buf, len, MPI_BYTE,
00155 ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);
00156
00157 MPIO_Completed_request_create(&fd, len, &error_code, request);
00158 return 0;
00159 } else {
00160 return -errno;
00161 }
00162 }
00163 aio_req->aiocbp = aiocbp;
00164 if (ADIOI_GEN_greq_class == 0) {
00165 MPIX_Grequest_class_create(ADIOI_GEN_aio_query_fn,
00166 ADIOI_GEN_aio_free_fn, MPIU_Greq_cancel_fn,
00167 ADIOI_GEN_aio_poll_fn, ADIOI_GEN_aio_wait_fn,
00168 &ADIOI_GEN_greq_class);
00169 }
00170 MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, aio_req, request);
00171 memcpy(&(aio_req->req), request, sizeof(MPI_Request));
00172 return 0;
00173 }
00174 #endif
00175
00176
00177
00178
00179
00180 void ADIOI_GEN_IwriteStrided(ADIO_File fd, void *buf, int count,
00181 MPI_Datatype datatype, int file_ptr_type,
00182 ADIO_Offset offset, MPI_Request *request,
00183 int *error_code)
00184 {
00185 ADIO_Status status;
00186 int typesize;
00187 MPI_Offset nbytes=0;
00188
00189
00190
00191
00192 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00193 offset, &status, error_code);
00194
00195 if (*error_code == MPI_SUCCESS) {
00196 MPI_Type_size(datatype, &typesize);
00197 nbytes = (MPI_Offset)count * (MPI_Offset)typesize;
00198 }
00199 MPIO_Completed_request_create(&fd, nbytes, error_code, request);
00200 }
00201
00202 #ifdef ROMIO_HAVE_WORKING_AIO
00203
00204 int ADIOI_GEN_aio_poll_fn(void *extra_state, MPI_Status *status)
00205 {
00206 ADIOI_AIO_Request *aio_req;
00207 int errcode=MPI_SUCCESS;
00208
00209 aio_req = (ADIOI_AIO_Request *)extra_state;
00210
00211
00212 errno = aio_error(aio_req->aiocbp);
00213 if (errno == EINPROGRESS) {
00214
00215 }
00216 else if (errno == ECANCELED) {
00217
00218 } else if (errno == 0) {
00219 int n = aio_return(aio_req->aiocbp);
00220 aio_req->nbytes = n;
00221 errcode = MPI_Grequest_complete(aio_req->req);
00222
00223 if (errcode != MPI_SUCCESS) {
00224 errcode = MPIO_Err_create_code(MPI_SUCCESS,
00225 MPIR_ERR_RECOVERABLE,
00226 "ADIOI_GEN_aio_poll_fn", __LINE__,
00227 MPI_ERR_IO, "**mpi_grequest_complete",
00228 0);
00229 }
00230
00231 }
00232 return errcode;
00233 }
00234
00235
00236 int ADIOI_GEN_aio_wait_fn(int count, void ** array_of_states,
00237 double timeout, MPI_Status *status)
00238 {
00239 const struct aiocb **cblist;
00240 int err, errcode=MPI_SUCCESS;
00241 int nr_complete=0;
00242 double starttime;
00243 struct timespec aio_timer;
00244 struct timespec *aio_timer_p = NULL;
00245
00246 ADIOI_AIO_Request **aio_reqlist;
00247 int i;
00248
00249 aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
00250
00251 cblist = (const struct aiocb**) ADIOI_Calloc(count, sizeof(struct aiocb*));
00252
00253 starttime = MPI_Wtime();
00254 if (timeout >0) {
00255 aio_timer.tv_sec = (time_t)timeout;
00256 aio_timer.tv_nsec = timeout - aio_timer.tv_sec;
00257 aio_timer_p = &aio_timer;
00258 }
00259 for (i=0; i< count; i++)
00260 {
00261 cblist[i] = aio_reqlist[i]->aiocbp;
00262 }
00263
00264 while(nr_complete < count) {
00265 do {
00266 err = aio_suspend(cblist, count, aio_timer_p);
00267 } while (err < 0 && errno == EINTR);
00268 if (err == 0)
00269 {
00270
00271 for (i=0; i< count; i++)
00272 {
00273
00274 if (aio_reqlist[i]->aiocbp == NULL)
00275 continue;
00276 errno = aio_error(aio_reqlist[i]->aiocbp);
00277 if (errno == 0) {
00278 int n = aio_return(aio_reqlist[i]->aiocbp);
00279 aio_reqlist[i]->nbytes = n;
00280 errcode = MPI_Grequest_complete(aio_reqlist[i]->req);
00281 if (errcode != MPI_SUCCESS) {
00282 errcode = MPIO_Err_create_code(MPI_SUCCESS,
00283 MPIR_ERR_RECOVERABLE,
00284 "ADIOI_GEN_aio_wait_fn",
00285 __LINE__, MPI_ERR_IO,
00286 "**mpi_grequest_complete", 0);
00287 }
00288 ADIOI_Free(aio_reqlist[i]->aiocbp);
00289 aio_reqlist[i]->aiocbp = NULL;
00290 cblist[i] = NULL;
00291 nr_complete++;
00292 }
00293
00294 }
00295 }
00296 if ( (timeout > 0) && (timeout < (MPI_Wtime() - starttime) ))
00297 break;
00298 }
00299
00300 if (cblist != NULL) ADIOI_Free(cblist);
00301 return errcode;
00302 }
00303
00304 int ADIOI_GEN_aio_free_fn(void *extra_state)
00305 {
00306 ADIOI_AIO_Request *aio_req;
00307 aio_req = (ADIOI_AIO_Request*)extra_state;
00308
00309 if (aio_req->aiocbp != NULL)
00310 ADIOI_Free(aio_req->aiocbp);
00311 ADIOI_Free(aio_req);
00312
00313 return MPI_SUCCESS;
00314 }
00315 #endif
00316
00317 int ADIOI_GEN_aio_query_fn(void *extra_state, MPI_Status *status)
00318 {
00319 ADIOI_AIO_Request *aio_req;
00320
00321 aio_req = (ADIOI_AIO_Request *)extra_state;
00322
00323
00324 MPI_Status_set_elements(status, MPI_BYTE, aio_req->nbytes);
00325
00326
00327 MPI_Status_set_cancelled(status, 0);
00328
00329
00330 status->MPI_SOURCE = MPI_UNDEFINED;
00331
00332 status->MPI_TAG = MPI_UNDEFINED;
00333
00334 return MPI_SUCCESS;
00335 }
00336
00337
00338