00001
00002
00003
00004
00005
00006
00007
00008 #include "ad_gridftp.h"
00009 #include "adioi.h"
00010 #include "adio_extern.h"
00011
00012 static globus_mutex_t readcontig_ctl_lock;
00013 static globus_cond_t readcontig_ctl_cond;
00014 static globus_bool_t readcontig_ctl_done;
00015 static void readcontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
00016 {
00017 if (error)
00018 {
00019 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00020 }
00021 globus_mutex_lock(&readcontig_ctl_lock);
00022 if ( readcontig_ctl_done!=GLOBUS_TRUE )
00023 readcontig_ctl_done=GLOBUS_TRUE;
00024 globus_cond_signal(&readcontig_ctl_cond);
00025 globus_mutex_unlock(&readcontig_ctl_lock);
00026 return;
00027 }
00028
00029 static void readcontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
00030 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
00031 globus_bool_t eof)
00032 {
00033 globus_size_t *bytes_read;
00034
00035 bytes_read=(globus_size_t *)myargs;
00036 if (error)
00037 {
00038 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00039 }
00040 *bytes_read+=length;
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053 if ( !eof )
00054 globus_ftp_client_register_read(handle,
00055 buffer+length,
00056 length,
00057 readcontig_data_cb,
00058 (void *)(bytes_read));
00059 return;
00060 }
00061
00062 static globus_mutex_t readdiscontig_ctl_lock;
00063 static globus_cond_t readdiscontig_ctl_cond;
00064 static globus_bool_t readdiscontig_ctl_done;
00065 static void readdiscontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
00066 {
00067 if (error)
00068 {
00069 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00070 }
00071 globus_mutex_lock(&readdiscontig_ctl_lock);
00072 if ( readdiscontig_ctl_done!=GLOBUS_TRUE )
00073 readdiscontig_ctl_done=GLOBUS_TRUE;
00074 globus_cond_signal(&readdiscontig_ctl_cond);
00075 globus_mutex_unlock(&readdiscontig_ctl_lock);
00076 return;
00077 }
00078
00079 static void readdiscontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
00080 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
00081 globus_bool_t eof)
00082 {
00083 globus_size_t *bytes_read;
00084
00085 bytes_read=(globus_size_t *)myargs;
00086 if (error)
00087 {
00088 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00089 }
00090 *bytes_read+=length;
00091
00092
00093
00094 if ( !eof )
00095 globus_ftp_client_register_read(handle,
00096 buffer,
00097 length,
00098 readdiscontig_data_cb,
00099 (void *)(bytes_read));
00100 return;
00101 }
00102
00103 void ADIOI_GRIDFTP_ReadContig(ADIO_File fd, void *buf, int count,
00104 MPI_Datatype datatype, int file_ptr_type,
00105 ADIO_Offset offset, ADIO_Status *status, int
00106 *error_code)
00107 {
00108 static char myname[]="ADIOI_GRIDFTP_ReadContig";
00109 int myrank, nprocs, datatype_size;
00110 globus_size_t len,bytes_read=0;
00111 globus_off_t goff;
00112 globus_result_t result;
00113
00114 if ( fd->access_mode&ADIO_WRONLY )
00115 {
00116 *error_code=MPIR_ERR_MODE_WRONLY;
00117 return;
00118 }
00119
00120 *error_code = MPI_SUCCESS;
00121
00122 MPI_Comm_size(fd->comm, &nprocs);
00123 MPI_Comm_rank(fd->comm, &myrank);
00124 MPI_Type_size(datatype, &datatype_size);
00125
00126 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
00127 {
00128 offset = fd->fp_ind;
00129 }
00130
00131
00132 goff = (globus_off_t)offset;
00133 len = ((globus_size_t)datatype_size)*((globus_size_t)count);
00134
00135 globus_mutex_init(&readcontig_ctl_lock, GLOBUS_NULL);
00136 globus_cond_init(&readcontig_ctl_cond, GLOBUS_NULL);
00137 readcontig_ctl_done=GLOBUS_FALSE;
00138 if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
00139 fd->filename,
00140 &(oattr[fd->fd_sys]),
00141 GLOBUS_NULL,
00142 goff,
00143 goff+(globus_off_t)len,
00144 readcontig_ctl_cb,
00145 GLOBUS_NULL))!=GLOBUS_SUCCESS )
00146 {
00147 globus_err_handler("globus_ftp_client_partial_get",myname,result);
00148 *error_code=MPI_ERR_IO;
00149 ADIOI_Error(fd,*error_code,myname);
00150 return;
00151 }
00152 result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
00153 (globus_byte_t *)buf, len, readcontig_data_cb,
00154 (void *)(&bytes_read));
00155 if ( result != GLOBUS_SUCCESS )
00156 {
00157 globus_err_handler("globus_ftp_client_register_read",myname,result);
00158 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00159 MPIR_ERR_RECOVERABLE, myname, __LINE__,
00160 MPI_ERR_IO, "**io", "**io %s",
00161 globus_object_printable_to_string(globus_error_get(result)));
00162 return;
00163 }
00164
00165
00166
00167
00168 globus_mutex_lock(&readcontig_ctl_lock);
00169 while ( readcontig_ctl_done!=GLOBUS_TRUE )
00170 globus_cond_wait(&readcontig_ctl_cond,&readcontig_ctl_lock);
00171 globus_mutex_unlock(&readcontig_ctl_lock);
00172
00173 globus_mutex_destroy(&readcontig_ctl_lock);
00174 globus_cond_destroy(&readcontig_ctl_cond);
00175
00176 #ifdef HAVE_STATUS_SET_BYTES
00177 MPIR_Status_set_bytes(status, datatype, bytes_read);
00178 #endif
00179 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
00180 {
00181 fd->fp_ind += bytes_read;
00182 fd->fp_sys_posn = fd->fp_ind;
00183 }
00184 else {
00185 fd->fp_sys_posn = offset + bytes_read;
00186 }
00187 }
00188
00189 void ADIOI_GRIDFTP_ReadDiscontig(ADIO_File fd, void *buf, int count,
00190 MPI_Datatype datatype, int file_ptr_type,
00191 ADIO_Offset offset, ADIO_Status *status, int
00192 *error_code)
00193 {
00194 char myname[]="ADIOI_GRIDFTP_ReadDiscontig";
00195 int myrank,nprocs;
00196
00197 MPI_Aint btype_size,btype_extent;
00198
00199 MPI_Aint ftype_size,ftype_extent;
00200
00201 MPI_Aint etype_size;
00202 MPI_Aint extent;
00203 ADIOI_Flatlist_node *flat_file;
00204 int i,buf_contig,boff,nblks;
00205 globus_off_t start,end,goff;
00206 globus_size_t bytes_read;
00207 globus_result_t result;
00208 globus_byte_t *tmp;
00209
00210 if ( fd->access_mode&ADIO_WRONLY )
00211 {
00212 *error_code=MPIR_ERR_MODE_WRONLY;
00213 return;
00214 }
00215
00216 *error_code=MPI_SUCCESS;
00217
00218 MPI_Comm_rank(fd->comm,&myrank);
00219 MPI_Comm_size(fd->comm,&nprocs);
00220
00221 etype_size=fd->etype_size;
00222 MPI_Type_size(fd->filetype,&ftype_size);
00223 MPI_Type_extent(fd->filetype,&ftype_extent);
00224
00225
00226 MPI_Type_size(datatype,&btype_size);
00227 MPI_Type_extent(datatype,&btype_extent);
00228 ADIOI_Datatype_iscontig(datatype,&buf_contig);
00229
00230 if ( ( btype_extent!=btype_size ) || ( ! buf_contig ) )
00231 {
00232 FPRINTF(stderr,"[%d/%d] %s called with discontigous memory buffer\n",
00233 myrank,nprocs,myname);
00234 fflush(stderr);
00235 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00236 MPIR_ERR_RECOVERABLE, myname, __LINE__,
00237 MPI_ERR_IO, "**io", 0 );
00238 return;
00239 }
00240
00241
00242
00243 ADIOI_Flatten_datatype(fd->filetype);
00244 flat_file = ADIOI_Flatlist;
00245 while (flat_file->type != fd->filetype && flat_file->next!=NULL)
00246 flat_file = flat_file->next;
00247
00248
00249 start=(globus_off_t)(offset*etype_size);
00250 goff=start;
00251 boff=0;
00252 extent=0;
00253 nblks=0;
00254 while ( boff < (count*btype_size) )
00255 {
00256 int blklen=0;
00257
00258 for (i=0;i<flat_file->count;i++)
00259 {
00260
00261 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
00262 blklen=flat_file->blocklens[i];
00263 else
00264 blklen=(count*btype_size)-boff;
00265
00266 boff+=blklen;
00267
00268
00269
00270 extent=MAX(extent,nblks*ftype_extent+flat_file->indices[i]+blklen);
00271 if ( boff>=(count*btype_size) )
00272 break;
00273 }
00274 nblks++;
00275 }
00276 if ( extent < count*btype_size )
00277 {
00278 FPRINTF(stderr,"[%d/%d] %s error in computing extent -- extent %d is smaller than total bytes requested %d!\n",
00279 myrank,nprocs,myname,extent,count*btype_size);
00280 fflush(stderr);
00281 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00282 MPIR_ERR_RECOVERABLE, myname, __LINE__,
00283 MPI_ERR_IO, "**io", 0);
00284 return;
00285 }
00286 end=start+(globus_off_t)extent;
00287 tmp=(globus_byte_t *)ADIOI_Malloc((size_t)extent*sizeof(globus_byte_t));
00288
00289
00290 globus_mutex_init(&readdiscontig_ctl_lock, GLOBUS_NULL);
00291 globus_cond_init(&readdiscontig_ctl_cond, GLOBUS_NULL);
00292 readdiscontig_ctl_done=GLOBUS_FALSE;
00293 if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
00294 fd->filename,
00295 &(oattr[fd->fd_sys]),
00296 GLOBUS_NULL,
00297 start,
00298 end,
00299 readdiscontig_ctl_cb,
00300 GLOBUS_NULL))!=GLOBUS_SUCCESS )
00301 {
00302 globus_err_handler("globus_ftp_client_partial_get",myname,result);
00303 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00304 MPIR_ERR_RECOVERABLE, myname, __LINE__,
00305 MPI_ERR_IO, "**io", "**io %s",
00306 globus_object_printable_to_string(globus_error_get(result)));
00307 return;
00308 }
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318 if ( (result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
00319 tmp,
00320 (globus_size_t)extent,
00321 readdiscontig_data_cb,
00322 (void *)(&bytes_read)))!=GLOBUS_SUCCESS )
00323 {
00324 globus_err_handler("globus_ftp_client_register_read",myname,result);
00325 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00326 myname, __LINE__, MPI_ERR_IO,
00327 "**io",
00328 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00329 return;
00330 }
00331
00332
00333 globus_mutex_lock(&readdiscontig_ctl_lock);
00334 while ( readdiscontig_ctl_done!=GLOBUS_TRUE )
00335 globus_cond_wait(&readdiscontig_ctl_cond,&readdiscontig_ctl_lock);
00336 globus_mutex_unlock(&readdiscontig_ctl_lock);
00337
00338 globus_mutex_destroy(&readdiscontig_ctl_lock);
00339 globus_cond_destroy(&readdiscontig_ctl_cond);
00340
00341 boff=0;
00342 nblks=0;
00343 goff=0;
00344 while ( boff < (count*btype_size) )
00345 {
00346 int i,blklen;
00347
00348 for (i=0;i<flat_file->count;i++)
00349 {
00350 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
00351 blklen=flat_file->blocklens[i];
00352 else
00353 blklen=(count*btype_size)-boff;
00354 if ( blklen > 0 )
00355 {
00356 goff=nblks*ftype_extent+flat_file->indices[i];
00357 memcpy((globus_byte_t *)buf+boff,tmp+goff,(size_t)blklen);
00358 boff+=blklen;
00359 if ( boff>=(count*btype_size) )
00360 break;
00361 }
00362 }
00363 nblks++;
00364 }
00365 ADIOI_Free(tmp);
00366
00367 #ifdef HAVE_STATUS_SET_BYTES
00368 MPIR_Status_set_bytes(status, datatype, bytes_read);
00369 #endif
00370 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
00371 {
00372 fd->fp_ind += extent;
00373 fd->fp_sys_posn = fd->fp_ind;
00374 }
00375 else {
00376 fd->fp_sys_posn = offset + extent;
00377 }
00378 }
00379
00380 void ADIOI_GRIDFTP_ReadStrided(ADIO_File fd, void *buf, int count,
00381 MPI_Datatype datatype, int file_ptr_type,
00382 ADIO_Offset offset, ADIO_Status *status, int
00383 *error_code)
00384 {
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404 char myname[]="ADIOI_GRIDFTP_ReadStrided";
00405 int myrank, nprocs;
00406 int i,j;
00407 int buf_contig,file_contig;
00408 MPI_Aint btype_size,bufsize;
00409 globus_off_t start,disp;
00410 globus_size_t bytes_read;
00411 globus_byte_t *intermediate;
00412
00413 *error_code = MPI_SUCCESS;
00414
00415 MPI_Comm_size(fd->comm, &nprocs);
00416 MPI_Comm_rank(fd->comm, &myrank);
00417
00418 MPI_Type_size(datatype,&btype_size);
00419 bufsize=count*btype_size;
00420 ADIOI_Datatype_iscontig(fd->filetype,&file_contig);
00421 ADIOI_Datatype_iscontig(datatype,&buf_contig);
00422 if ( buf_contig && !file_contig )
00423 {
00424
00425 ADIOI_GRIDFTP_ReadDiscontig(fd, buf, count, datatype,
00426 file_ptr_type, offset, status, error_code);
00427 }
00428 else if ( !buf_contig && file_contig )
00429 {
00430
00431 int posn=0;
00432
00433
00434 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
00435 ADIOI_GRIDFTP_ReadContig(fd, intermediate, bufsize, MPI_BYTE,
00436 file_ptr_type, offset, status, error_code);
00437
00438
00439 MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
00440
00441 ADIOI_Free(intermediate);
00442 }
00443 else if ( !buf_contig && !file_contig )
00444 {
00445
00446 int posn=0;
00447
00448
00449 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
00450 ADIOI_GRIDFTP_ReadDiscontig(fd, intermediate, bufsize, MPI_BYTE,
00451 file_ptr_type, offset, status, error_code);
00452
00453
00454 posn=0;
00455 MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
00456
00457 ADIOI_Free(intermediate);
00458 }
00459 else
00460 {
00461
00462 ADIOI_GRIDFTP_ReadContig(fd, buf, count, datatype,
00463 file_ptr_type, offset, status, error_code);
00464 }
00465
00466 }
00467