00001
00002
00003
00004
00005
00006
00007
00008 #include "adio.h"
00009 #include "adio_extern.h"
00010 #include "ad_pvfs2.h"
00011 #include <string.h>
00012
00013 #include "ad_pvfs2_common.h"
00014 #include "mpiu_greq.h"
00015 #include "../../mpi-io/mpioimpl.h"
00016
00017 #define READ 0
00018 #define WRITE 1
00019
00020 static int ADIOI_PVFS2_greq_class = 0;
00021 int ADIOI_PVFS2_aio_free_fn(void *extra_state);
00022 int ADIOI_PVFS2_aio_poll_fn(void *extra_state, MPI_Status *status);
00023 int ADIOI_PVFS2_aio_wait_fn(int count, void ** array_of_states,
00024 double timeout, MPI_Status *status);
00025
00026 void ADIOI_PVFS2_IReadContig(ADIO_File fd, void *buf, int count,
00027 MPI_Datatype datatype, int file_ptr_type,
00028 ADIO_Offset offset, MPI_Request *request,
00029 int *error_code)
00030 {
00031 ADIOI_PVFS2_AIO_contig(fd, buf, count, datatype, file_ptr_type,
00032 offset, request, READ, error_code);
00033 }
00034
00035 void ADIOI_PVFS2_IWriteContig(ADIO_File fd, void *buf, int count,
00036 MPI_Datatype datatype, int file_ptr_type,
00037 ADIO_Offset offset, MPI_Request *request,
00038 int *error_code)
00039 {
00040 ADIOI_PVFS2_AIO_contig(fd, buf, count, datatype, file_ptr_type,
00041 offset, request, WRITE, error_code);
00042 }
00043
00044 void ADIOI_PVFS2_AIO_contig(ADIO_File fd, void *buf, int count,
00045 MPI_Datatype datatype, int file_ptr_type,
00046 ADIO_Offset offset, MPI_Request *request,
00047 int flag, int *error_code)
00048 {
00049
00050 int ret, datatype_size, len;
00051 ADIOI_PVFS2_fs *pvfs_fs;
00052 ADIOI_AIO_Request *aio_req;
00053 static char myname[] = "ADIOI_PVFS2_AIO_contig";
00054
00055 pvfs_fs = (ADIOI_PVFS2_fs*)fd->fs_ptr;
00056
00057 aio_req = (ADIOI_AIO_Request*)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
00058
00059 MPI_Type_size(datatype, &datatype_size);
00060 len = datatype_size * count;
00061
00062 ret = PVFS_Request_contiguous(len, PVFS_BYTE, &(aio_req->mem_req));
00063
00064 if (ret != 0) {
00065 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00066 MPIR_ERR_RECOVERABLE,
00067 myname, __LINE__,
00068 ADIOI_PVFS2_error_convert(ret),
00069 "Error in pvfs_request_contig (memory)", 0);
00070 return;
00071 }
00072
00073
00074 ret = PVFS_Request_contiguous(len, PVFS_BYTE, &(aio_req->file_req));
00075
00076 if (ret != 0) {
00077 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00078 MPIR_ERR_RECOVERABLE,
00079 myname, __LINE__,
00080 ADIOI_PVFS2_error_convert(ret),
00081 "Error in pvfs_request_contig (file)", 0);
00082 return;
00083 }
00084
00085
00086 if (file_ptr_type == ADIO_INDIVIDUAL) {
00087
00088 offset = fd->fp_ind;
00089 }
00090 if (flag == READ) {
00091 #ifdef ADIOI_MPE_LOGGING
00092 MPE_Log_event( ADIOI_MPE_iread_a, 0, NULL );
00093 #endif
00094 ret = PVFS_isys_read(pvfs_fs->object_ref, aio_req->file_req, offset,
00095 buf, aio_req->mem_req, &(pvfs_fs->credentials),
00096 &(aio_req->resp_io), &(aio_req->op_id), NULL);
00097 #ifdef ADIOI_MPE_LOGGING
00098 MPE_Log_event( ADIOI_MPE_iread_b, 0, NULL );
00099 #endif
00100 } else if (flag == WRITE) {
00101 #ifdef ADIOI_MPE_LOGGING
00102 MPE_Log_event( ADIOI_MPE_iwrite_a, 0, NULL );
00103 #endif
00104 ret = PVFS_isys_write(pvfs_fs->object_ref, aio_req->file_req, offset,
00105 buf, aio_req->mem_req, &(pvfs_fs->credentials),
00106 &(aio_req->resp_io), &(aio_req->op_id), NULL);
00107 #ifdef ADIOI_MPE_LOGGING
00108 MPE_Log_event( ADIOI_MPE_iwrite_b, 0, NULL );
00109 #endif
00110 }
00111
00112
00113 if (ret < 0 ) {
00114 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00115 MPIR_ERR_RECOVERABLE,
00116 myname, __LINE__,
00117 ADIOI_PVFS2_error_convert(ret),
00118 "Error in PVFS_isys_io", 0);
00119 goto fn_exit;
00120 }
00121
00122
00123
00124 if (ret == 0) {
00125 if (ADIOI_PVFS2_greq_class == 0) {
00126 MPIX_Grequest_class_create(ADIOI_GEN_aio_query_fn,
00127 ADIOI_PVFS2_aio_free_fn, MPIU_Greq_cancel_fn,
00128 ADIOI_PVFS2_aio_poll_fn, ADIOI_PVFS2_aio_wait_fn,
00129 &ADIOI_PVFS2_greq_class);
00130 }
00131 MPIX_Grequest_class_allocate(ADIOI_PVFS2_greq_class, aio_req, request);
00132 memcpy(&(aio_req->req), request, sizeof(request));
00133 }
00134
00135
00136 if (ret == 1) {
00137 MPIO_Completed_request_create(&fd, len, error_code, request);
00138 }
00139
00140 if (file_ptr_type == ADIO_INDIVIDUAL) {
00141 fd->fp_ind += len;
00142 }
00143 fd->fp_sys_posn = offset + len;
00144
00145 *error_code = MPI_SUCCESS;
00146 fn_exit:
00147 return;
00148 }
00149
00150 int ADIOI_PVFS2_aio_free_fn(void *extra_state)
00151 {
00152 ADIOI_AIO_Request *aio_req;
00153 aio_req = (ADIOI_AIO_Request*)extra_state;
00154
00155 PVFS_Request_free(&(aio_req->mem_req));
00156 PVFS_Request_free(&(aio_req->file_req));
00157 ADIOI_Free(aio_req);
00158
00159 return MPI_SUCCESS;
00160 }
00161
00162 int ADIOI_PVFS2_aio_poll_fn(void *extra_state, MPI_Status *status)
00163 {
00164 ADIOI_AIO_Request *aio_req;
00165 int ret, error;
00166
00167 aio_req = (ADIOI_AIO_Request *)extra_state;
00168
00169
00170 ret = PVFS_sys_wait(aio_req->op_id, "ADIOI_PVFS2_aio_poll_fn", &error);
00171 if (ret == 0) {
00172 aio_req->nbytes = aio_req->resp_io.total_completed;
00173 MPI_Grequest_complete(aio_req->req);
00174 return MPI_SUCCESS;
00175 } else
00176 return MPI_UNDEFINED;
00177 }
00178
00179
00180 int ADIOI_PVFS2_aio_wait_fn(int count, void ** array_of_states,
00181 double timeout, MPI_Status *status)
00182 {
00183
00184 ADIOI_AIO_Request **aio_reqlist;
00185 PVFS_sys_op_id *op_id_array;
00186 int i,j, greq_count, completed_count=0;
00187 int *error_array;
00188
00189 aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
00190
00191 op_id_array = (PVFS_sys_op_id*)ADIOI_Calloc(count, sizeof(PVFS_sys_op_id));
00192 error_array = (int *)ADIOI_Calloc(count, sizeof(int));
00193 greq_count = count;
00194
00195
00196
00197
00198
00199 while (completed_count < greq_count ) {
00200 count = greq_count;
00201 PVFS_sys_testsome(op_id_array, &count, NULL, error_array, INT_MAX);
00202 completed_count += count;
00203 for (i=0; i< count; i++) {
00204 for (j=0; j<greq_count; j++) {
00205 if (op_id_array[i] == aio_reqlist[j]->op_id) {
00206 aio_reqlist[j]->nbytes =
00207 aio_reqlist[j]->resp_io.total_completed;
00208 MPI_Grequest_complete(aio_reqlist[j]->req);
00209 }
00210 }
00211 }
00212 }
00213 return MPI_SUCCESS;
00214 }
00215
00216
00217
00218
00219