00001
00002
00003
00004
00005
00006
00007 #include "ad_ntfs.h"
00008
00009 #include "../../mpi-io/mpioimpl.h"
00010 #include "../../mpi-io/mpioprof.h"
00011 #include "mpiu_greq.h"
00012
00013 static MPIX_Grequest_class ADIOI_NTFS_greq_class = 0;
00014
00015
00016
00017 void ADIOI_NTFS_Strerror(int error, char *errMsg, int errMsgLen)
00018 {
00019 LPTSTR str;
00020 int num_bytes;
00021 num_bytes = FormatMessage(
00022 FORMAT_MESSAGE_FROM_SYSTEM |
00023 FORMAT_MESSAGE_ALLOCATE_BUFFER,
00024 NULL,
00025 error,
00026 0,
00027 &str,
00028 FORMAT_MESSAGE_MIN_SIZE,
00029 0);
00030 if (num_bytes == 0)
00031 {
00032 strncpy(errMsg, "\0", errMsgLen);
00033 }
00034 else
00035 {
00036 strncpy(errMsg, str, errMsgLen);
00037 LocalFree(str);
00038 }
00039 }
00040
00041
00042 int ADIOI_NTFS_aio_poll_fn(void *extra_state, MPI_Status *status)
00043 {
00044 ADIOI_AIO_Request *aio_req;
00045 int mpi_errno = MPI_SUCCESS;
00046
00047
00048
00049
00050 aio_req = (ADIOI_AIO_Request *)extra_state;
00051
00052
00053 if(!GetOverlappedResult( aio_req->fd, aio_req->lpOvl,
00054 &(aio_req->nbytes), FALSE)){
00055 if(GetLastError() == ERROR_IO_INCOMPLETE){
00056
00057
00058 }else{
00059
00060
00061 }
00062 }else{
00063 mpi_errno = MPI_Grequest_complete(aio_req->req);
00064 if (mpi_errno != MPI_SUCCESS) {
00065 mpi_errno = MPIO_Err_create_code(MPI_SUCCESS,
00066 MPIR_ERR_RECOVERABLE,
00067 "ADIOI_NTFS_aio_poll_fn", __LINE__,
00068 MPI_ERR_IO, "**mpi_grequest_complete",
00069 0);
00070 }
00071 }
00072 return mpi_errno;
00073 }
00074
00075
00076
00077 int ADIOI_NTFS_aio_wait_fn(int count, void **array_of_states,
00078 double timeout, MPI_Status *status)
00079 {
00080 int i, mpi_errno = MPI_SUCCESS;
00081 ADIOI_AIO_Request **aio_reqlist;
00082 LPHANDLE lpHandles;
00083 DWORD retObject=0;
00084
00085
00086
00087 aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
00088 lpHandles = (LPHANDLE) ADIOI_Calloc(count, sizeof(HANDLE));
00089 if (lpHandles == NULL)
00090 {
00091 mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00092 "ADIOI_NTFS_aio_wait_fn", __LINE__, MPI_ERR_IO,
00093 "**nomem", "**nomem %s", "Event handles");
00094 return mpi_errno;
00095 }
00096
00097 for(i=0; i<count; i++){
00098 lpHandles[i] = (aio_reqlist[i])->lpOvl->hEvent;
00099 }
00100
00101
00102
00103 timeout = (timeout <= 0) ? INFINITE : (timeout * 1000);
00104
00105 if((retObject = WaitForMultipleObjects(count, lpHandles,
00106 FALSE, timeout)) != WAIT_FAILED){
00107 retObject = retObject - WAIT_OBJECT_0;
00108 if(GetOverlappedResult( aio_reqlist[retObject]->fd,
00109 aio_reqlist[retObject]->lpOvl, &(aio_reqlist[retObject]->nbytes),
00110 FALSE)){
00111
00112 mpi_errno = MPI_Grequest_complete(aio_reqlist[retObject]->req);
00113 if (mpi_errno != MPI_SUCCESS) {
00114 mpi_errno = MPIO_Err_create_code(MPI_SUCCESS,
00115 MPIR_ERR_RECOVERABLE,
00116 "ADIOI_NTFS_aio_wait_fn", __LINE__,
00117 MPI_ERR_IO, "**mpi_grequest_complete",
00118 0);
00119 }
00120 }else{
00121 if(GetLastError() == ERROR_IO_INCOMPLETE){
00122
00123
00124 }else{
00125
00126
00127 }
00128 }
00129 }else{
00130
00131 }
00132 ADIOI_Free(lpHandles);
00133 return mpi_errno;
00134 }
00135
00136 int ADIOI_NTFS_aio_query_fn(void *extra_state, MPI_Status *status)
00137 {
00138 ADIOI_AIO_Request *aio_req;
00139
00140 aio_req = (ADIOI_AIO_Request *)extra_state;
00141
00142
00143 MPI_Status_set_elements(status, MPI_BYTE, aio_req->nbytes);
00144
00145
00146 MPI_Status_set_cancelled(status, 0);
00147
00148
00149 status->MPI_SOURCE = MPI_UNDEFINED;
00150
00151 status->MPI_TAG = MPI_UNDEFINED;
00152
00153 return MPI_SUCCESS;
00154 }
00155
00156
00157 int ADIOI_NTFS_aio_free_fn(void *extra_state)
00158 {
00159 ADIOI_AIO_Request *aio_req;
00160
00161
00162 aio_req = (ADIOI_AIO_Request*)extra_state;
00163 CloseHandle(aio_req->lpOvl->hEvent);
00164 ADIOI_Free(aio_req->lpOvl);
00165 ADIOI_Free(aio_req);
00166 return MPI_SUCCESS;
00167 }
00168
00169 void ADIOI_NTFS_IwriteContig(ADIO_File fd, void *buf, int count,
00170 MPI_Datatype datatype, int file_ptr_type,
00171 ADIO_Offset offset, ADIO_Request *request,
00172 int *error_code)
00173 {
00174 int len, typesize;
00175 int err;
00176 static char myname[] = "ADIOI_NTFS_IwriteContig";
00177
00178 MPI_Type_size(datatype, &typesize);
00179 len = count * typesize;
00180
00181 if (file_ptr_type == ADIO_INDIVIDUAL)
00182 {
00183 offset = fd->fp_ind;
00184 }
00185 err = ADIOI_NTFS_aio(fd, buf, len, offset, 1, request);
00186 if (file_ptr_type == ADIO_INDIVIDUAL)
00187 {
00188 fd->fp_ind += len;
00189 }
00190
00191
00192 if (err != MPI_SUCCESS)
00193 {
00194 *error_code = MPIO_Err_create_code(err, MPIR_ERR_RECOVERABLE,
00195 myname, __LINE__, MPI_ERR_IO,
00196 "**io", 0);
00197 return;
00198 }
00199
00200 *error_code = MPI_SUCCESS;
00201
00202 fd->fp_sys_posn = -1;
00203 }
00204
00205
00206
00207
00208
00209
00210
00211 int ADIOI_NTFS_aio(ADIO_File fd, void *buf, int len, ADIO_Offset offset,
00212 int wr, MPI_Request *request)
00213 {
00214 static char myname[] = "ADIOI_NTFS_aio";
00215
00216 ADIOI_AIO_Request *aio_req;
00217 static DWORD dwNumWritten, dwNumRead;
00218 BOOL ret_val = FALSE;
00219 FDTYPE fd_sys;
00220 int mpi_errno = MPI_SUCCESS;
00221 DWORD err;
00222
00223 fd_sys = fd->fd_sys;
00224
00225 aio_req = (ADIOI_AIO_Request *)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
00226 if (aio_req == NULL)
00227 {
00228 mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00229 myname, __LINE__, MPI_ERR_IO,
00230 "**nomem", "**nomem %s", "AIO_REQ");
00231 return mpi_errno;
00232 }
00233 aio_req->lpOvl = (LPOVERLAPPED ) ADIOI_Calloc(sizeof(OVERLAPPED), 1);
00234 if (aio_req->lpOvl == NULL)
00235 {
00236 mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00237 myname, __LINE__, MPI_ERR_IO,
00238 "**nomem", "**nomem %s", "OVERLAPPED");
00239 ADIOI_Free(aio_req);
00240 return mpi_errno;
00241 }
00242 aio_req->lpOvl->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
00243 if (aio_req->lpOvl->hEvent == NULL)
00244 {
00245 char errMsg[ADIOI_NTFS_ERR_MSG_MAX];
00246 err = GetLastError();
00247 ADIOI_NTFS_Strerror(err, errMsg, ADIOI_NTFS_ERR_MSG_MAX);
00248 mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00249 myname, __LINE__, MPI_ERR_IO,
00250 "**io", "**io %s", errMsg);
00251 ADIOI_Free(aio_req->lpOvl);
00252 ADIOI_Free(aio_req);
00253 return mpi_errno;
00254 }
00255 aio_req->lpOvl->Offset = DWORDLOW(offset);
00256 aio_req->lpOvl->OffsetHigh = DWORDHIGH(offset);
00257 aio_req->fd = fd_sys;
00258
00259
00260 if (wr)
00261 {
00262 ret_val = WriteFile(fd_sys, buf, len, &dwNumWritten, aio_req->lpOvl);
00263 }
00264 else
00265 {
00266 ret_val = ReadFile(fd_sys, buf, len, &dwNumRead, aio_req->lpOvl);
00267 }
00268
00269
00270 if (ret_val == FALSE)
00271 {
00272 mpi_errno = GetLastError();
00273 if (mpi_errno != ERROR_IO_PENDING)
00274 {
00275 char errMsg[ADIOI_NTFS_ERR_MSG_MAX];
00276 ADIOI_NTFS_Strerror(mpi_errno, errMsg, ADIOI_NTFS_ERR_MSG_MAX);
00277 mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00278 myname, __LINE__, MPI_ERR_IO,
00279 "**io",
00280 "**io %s", errMsg);
00281 return mpi_errno;
00282 }
00283 mpi_errno = MPI_SUCCESS;
00284 }
00285
00286
00287
00288 if (ADIOI_NTFS_greq_class == 0) {
00289 mpi_errno = MPIX_Grequest_class_create(ADIOI_NTFS_aio_query_fn,
00290 ADIOI_NTFS_aio_free_fn, MPIU_Greq_cancel_fn,
00291 ADIOI_NTFS_aio_poll_fn, ADIOI_NTFS_aio_wait_fn,
00292 &ADIOI_NTFS_greq_class);
00293 if(mpi_errno != MPI_SUCCESS){
00294
00295 }
00296 }
00297 mpi_errno = MPIX_Grequest_class_allocate(ADIOI_NTFS_greq_class, aio_req, request);
00298 if(mpi_errno != MPI_SUCCESS){
00299
00300 }
00301 memcpy(&(aio_req->req), request, sizeof(MPI_Request));
00302 return mpi_errno;
00303 }