00001
00002
00003
00004
00005
00006
00007
00008 #include "ad_gridftp.h"
00009 #include "adioi.h"
00010 #include "adio_extern.h"
00011
00012 static globus_mutex_t writecontig_ctl_lock;
00013 static globus_cond_t writecontig_ctl_cond;
00014 static globus_bool_t writecontig_ctl_done;
00015 static void writecontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
00016 {
00017 if (error)
00018 {
00019 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00020 }
00021 globus_mutex_lock(&writecontig_ctl_lock);
00022 if ( writecontig_ctl_done!=GLOBUS_TRUE )
00023 writecontig_ctl_done=GLOBUS_TRUE;
00024 globus_cond_signal(&writecontig_ctl_cond);
00025 globus_mutex_unlock(&writecontig_ctl_lock);
00026 #ifdef PRINT_ERR_MSG
00027 FPRINTF(stderr,"finished with contig write transaction\n");
00028 #endif
00029 return;
00030 }
00031
00032 static void writecontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
00033 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
00034 globus_bool_t eof)
00035 {
00036 globus_size_t *bytes_written;
00037
00038 bytes_written=(globus_size_t *)myargs;
00039 if (error)
00040 {
00041 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00042 }
00043 *bytes_written+=length;
00044
00045
00046
00047 if ( !eof )
00048 {
00049 globus_ftp_client_register_write(handle,
00050 buffer,
00051 length,
00052 offset,
00053 GLOBUS_TRUE,
00054 writecontig_data_cb,
00055 (void *)(bytes_written));
00056 }
00057 #ifdef PRINT_ERR_MSG
00058 FPRINTF(stderr,"wrote %Ld bytes...",(long long)length);
00059 #endif
00060 return;
00061 }
00062
00063
00064 static globus_mutex_t writediscontig_ctl_lock;
00065 static globus_cond_t writediscontig_ctl_cond;
00066 static globus_bool_t writediscontig_ctl_done;
00067 static void writediscontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
00068 {
00069 if (error)
00070 {
00071 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00072 }
00073 globus_mutex_lock(&writediscontig_ctl_lock);
00074 if ( writediscontig_ctl_done!=GLOBUS_TRUE )
00075 writediscontig_ctl_done=GLOBUS_TRUE;
00076 globus_cond_signal(&writediscontig_ctl_cond);
00077 globus_mutex_unlock(&writediscontig_ctl_lock);
00078 return;
00079 }
00080
00081 static void writediscontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
00082 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
00083 globus_bool_t eof)
00084 {
00085 globus_size_t *bytes_written;
00086
00087 bytes_written=(globus_size_t *)myargs;
00088 if (error)
00089 {
00090 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00091 }
00092 *bytes_written+=length;
00093
00094
00095
00096 if ( !eof )
00097 globus_ftp_client_register_write(handle,
00098 buffer,
00099 length,
00100 offset,
00101 eof,
00102 writediscontig_data_cb,
00103 (void *)(bytes_written));
00104 FPRINTF(stderr,"wrote %Ld bytes...",(long long)length);
00105 return;
00106 }
00107
00108
00109 void ADIOI_GRIDFTP_WriteContig(ADIO_File fd, void *buf, int count,
00110 MPI_Datatype datatype, int file_ptr_type,
00111 ADIO_Offset offset, ADIO_Status *status, int
00112 *error_code)
00113 {
00114 char myname[]="ADIOI_GRIDFTP_WriteContig";
00115 int myrank, nprocs, datatype_size;
00116 globus_size_t len,bytes_written=0;
00117 globus_off_t goff;
00118 globus_result_t result;
00119
00120 if ( fd->access_mode&ADIO_RDONLY )
00121 {
00122 *error_code=MPI_ERR_AMODE;
00123 return;
00124 }
00125
00126 *error_code = MPI_SUCCESS;
00127
00128 MPI_Comm_size(fd->comm, &nprocs);
00129 MPI_Comm_rank(fd->comm, &myrank);
00130 MPI_Type_size(datatype, &datatype_size);
00131
00132 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
00133 {
00134 offset = fd->fp_ind;
00135 }
00136
00137
00138 goff = (globus_off_t)offset;
00139 len = ((globus_size_t)datatype_size)*((globus_size_t)count);
00140
00141 globus_mutex_init(&writecontig_ctl_lock, GLOBUS_NULL);
00142 globus_cond_init(&writecontig_ctl_cond, GLOBUS_NULL);
00143 writecontig_ctl_done=GLOBUS_FALSE;
00144 if ( (result=globus_ftp_client_partial_put(&(gridftp_fh[fd->fd_sys]),
00145 fd->filename,
00146 &(oattr[fd->fd_sys]),
00147 GLOBUS_NULL,
00148 goff,
00149 goff+(globus_off_t)len,
00150 writecontig_ctl_cb,
00151 GLOBUS_NULL))!=GLOBUS_SUCCESS )
00152 {
00153 globus_err_handler("globus_ftp_client_partial_put",myname,result);
00154 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00155 myname, __LINE__, MPI_ERR_IO,
00156 "**io",
00157 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00158 return;
00159 }
00160 if ( (result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
00161 (globus_byte_t *)buf,
00162 len,
00163 goff,
00164 GLOBUS_TRUE,
00165 writecontig_data_cb,
00166 (void *)(&bytes_written)))!=GLOBUS_SUCCESS )
00167 {
00168 globus_err_handler("globus_ftp_client_register_write",myname,result);
00169 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00170 myname, __LINE__, MPI_ERR_IO,
00171 "**io",
00172 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00173 return;
00174 }
00175
00176
00177
00178
00179 globus_mutex_lock(&writecontig_ctl_lock);
00180 while ( writecontig_ctl_done!=GLOBUS_TRUE )
00181 globus_cond_wait(&writecontig_ctl_cond,&writecontig_ctl_lock);
00182 globus_mutex_unlock(&writecontig_ctl_lock);
00183
00184 globus_mutex_destroy(&writecontig_ctl_lock);
00185 globus_cond_destroy(&writecontig_ctl_cond);
00186
00187 #ifdef HAVE_STATUS_SET_BYTES
00188 MPIR_Status_set_bytes(status, datatype, bytes_written);
00189 #endif
00190 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
00191 {
00192 offset = fd->fp_ind;
00193 fd->fp_ind += bytes_written;
00194 fd->fp_sys_posn = fd->fp_ind;
00195 }
00196 else {
00197 fd->fp_sys_posn = offset + bytes_written;
00198 }
00199 }
00200
00201
00202 void ADIOI_GRIDFTP_WriteDiscontig(ADIO_File fd, void *buf, int count,
00203 MPI_Datatype datatype, int file_ptr_type,
00204 ADIO_Offset offset, ADIO_Status *status, int
00205 *error_code)
00206 {
00207 char myname[]="ADIOI_GRIDFTP_WriteDiscontig";
00208 int myrank,nprocs;
00209 MPI_Aint btype_size,btype_extent;
00210 MPI_Aint ftype_size,ftype_extent;
00211 MPI_Aint etype_size;
00212 MPI_Aint extent;
00213 ADIOI_Flatlist_node *flat_file;
00214 int buf_contig,boff,i,nblks;
00215 globus_off_t start,end,goff;
00216 globus_size_t bytes_written;
00217 globus_result_t result;
00218
00219 MPI_Comm_rank(fd->comm,&myrank);
00220 MPI_Comm_size(fd->comm,&nprocs);
00221 etype_size=fd->etype_size;
00222 MPI_Type_size(fd->filetype,&ftype_size);
00223 MPI_Type_extent(fd->filetype,&ftype_extent);
00224
00225
00226 MPI_Type_size(datatype,&btype_size);
00227 MPI_Type_extent(datatype,&btype_extent);
00228 ADIOI_Datatype_iscontig(datatype,&buf_contig);
00229
00230 if ( ( btype_extent!=btype_size ) || ( ! buf_contig ) )
00231 {
00232 FPRINTF(stderr,"[%d/%d] %s called with discontigous memory buffer\n",
00233 myrank,nprocs,myname);
00234 fflush(stderr);
00235 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00236 myname, __LINE__, MPI_ERR_IO,
00237 "**io",
00238 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00239 return;
00240 }
00241
00242
00243
00244 ADIOI_Flatten_datatype(fd->filetype);
00245 flat_file = ADIOI_Flatlist;
00246 while (flat_file->type != fd->filetype && flat_file->next!=NULL)
00247 flat_file = flat_file->next;
00248
00249
00250
00251 start=(globus_off_t)(offset*etype_size);
00252 goff=start;
00253 boff=0;
00254 extent=0;
00255 nblks=0;
00256 while ( boff < (count*btype_size) )
00257 {
00258 int blklen;
00259
00260 for (i=0;i<flat_file->count;i++)
00261 {
00262 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
00263 blklen=flat_file->blocklens[i];
00264 else
00265 blklen=(count*btype_size)-boff;
00266 boff+=blklen;
00267 extent=MAX(extent,nblks*ftype_extent+flat_file->indices[i]+blklen);
00268 if ( boff>=(count*btype_size) )
00269 break;
00270 }
00271 nblks++;
00272 }
00273 if ( extent < count*btype_size )
00274 {
00275 FPRINTF(stderr,"[%d/%d] %s error in computing extent -- extent %d is smaller than total bytes requested %d!\n",
00276 myrank,nprocs,myname,extent,count*btype_size);
00277 fflush(stderr);
00278 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00279 myname, __LINE__, MPI_ERR_IO,
00280 "**io",
00281 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00282 return;
00283 }
00284 end=start+(globus_off_t)extent;
00285 FPRINTF(stderr,"[%d/%d] %s writing %d bytes into extent of %d bytes starting at offset %Ld\n",
00286 myrank,nprocs,myname,count*btype_size,extent,(long long)start);
00287 fflush(stderr);
00288
00289
00290 globus_mutex_init(&writediscontig_ctl_lock, GLOBUS_NULL);
00291 globus_cond_init(&writediscontig_ctl_cond, GLOBUS_NULL);
00292 writediscontig_ctl_done=GLOBUS_FALSE;
00293 if ( (result=globus_ftp_client_partial_put(&(gridftp_fh[fd->fd_sys]),
00294 fd->filename,
00295 &(oattr[fd->fd_sys]),
00296 GLOBUS_NULL,
00297 start,
00298 end,
00299 writediscontig_ctl_cb,
00300 GLOBUS_NULL))!=GLOBUS_SUCCESS )
00301 {
00302 globus_err_handler("globus_ftp_client_partial_get",myname,result);
00303 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00304 myname, __LINE__, MPI_ERR_IO,
00305 "**io",
00306 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00307 return;
00308 }
00309
00310
00311 boff=0;
00312 nblks=0;
00313 while ( boff < (count*btype_size) )
00314 {
00315 int i,blklen;
00316
00317 for (i=0;i<flat_file->count;i++)
00318 {
00319 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
00320 blklen=flat_file->blocklens[i];
00321 else
00322 blklen=(count*btype_size)-boff;
00323 if ( blklen > 0 )
00324 {
00325 goff=start+nblks*ftype_extent+((globus_off_t)flat_file->indices[i]);
00326
00327
00328
00329 if ( (result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
00330 ((globus_byte_t *)buf)+boff,
00331 (globus_size_t)blklen,
00332 goff,
00333 GLOBUS_TRUE,
00334 writediscontig_data_cb,
00335 (void *)(&bytes_written)))!=GLOBUS_SUCCESS )
00336 {
00337 globus_err_handler("globus_ftp_client_register_write",myname,result);
00338 *error_code=MPI_ERR_IO;
00339 ADIOI_Error(fd,*error_code,myname);
00340 return;
00341 }
00342 boff+=blklen;
00343 if ( boff>=(count*btype_size) )
00344 break;
00345 }
00346 }
00347 nblks++;
00348 }
00349
00350
00351
00352
00353 globus_mutex_lock(&writediscontig_ctl_lock);
00354 while ( writediscontig_ctl_done!=GLOBUS_TRUE )
00355 globus_cond_wait(&writediscontig_ctl_cond,&writediscontig_ctl_lock);
00356 globus_mutex_unlock(&writediscontig_ctl_lock);
00357 globus_mutex_destroy(&writediscontig_ctl_lock);
00358 globus_cond_destroy(&writediscontig_ctl_cond);
00359
00360 #ifdef HAVE_STATUS_SET_BYTES
00361 MPIR_Status_set_bytes(status, datatype, bytes_written);
00362 #endif
00363 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
00364 {
00365 fd->fp_ind += extent;
00366 fd->fp_sys_posn = fd->fp_ind;
00367 }
00368 else {
00369 fd->fp_sys_posn = offset + extent;
00370 }
00371 }
00372
00373
00374 #define GRIDFTP_USE_GENERIC_STRIDED
00375 void ADIOI_GRIDFTP_WriteStrided(ADIO_File fd, void *buf, int count,
00376 MPI_Datatype datatype, int file_ptr_type,
00377 ADIO_Offset offset, ADIO_Status *status,
00378 int *error_code)
00379 {
00380 #ifdef GRIDFTP_USE_GENERIC_STRIDED
00381 int myrank, nprocs;
00382
00383 if ( fd->access_mode&ADIO_RDONLY )
00384 {
00385 *error_code=MPI_ERR_AMODE;
00386 return;
00387 }
00388
00389 *error_code = MPI_SUCCESS;
00390
00391 MPI_Comm_size(fd->comm, &nprocs);
00392 MPI_Comm_rank(fd->comm, &myrank);
00393
00394 ADIOI_GEN_WriteStrided(fd, buf, count, datatype, file_ptr_type, offset,
00395 status, error_code);
00396 return;
00397 #else
00398 char myname[]="ADIOI_GRIDFTP_WriteStrided";
00399 int myrank, nprocs;
00400 int buf_contig,file_contig;
00401 MPI_Aint btype_size,bufsize;
00402 globus_byte_t *intermediate;
00403
00404 *error_code = MPI_SUCCESS;
00405
00406 MPI_Comm_size(fd->comm, &nprocs);
00407 MPI_Comm_rank(fd->comm, &myrank);
00408
00409 MPI_Type_size(datatype,&btype_size);
00410 bufsize=count*btype_size;
00411 ADIOI_Datatype_iscontig(fd->filetype,&file_contig);
00412 ADIOI_Datatype_iscontig(datatype,&buf_contig);
00413 if ( buf_contig && !file_contig )
00414 {
00415
00416 FPRINTF(stderr,"[%d/%d] %s called w/ contig mem, discontig file\n",
00417 myrank,nprocs,myname);
00418 fflush(stderr);
00419
00420 ADIOI_GRIDFTP_WriteDiscontig(fd, buf, count, datatype,
00421 file_ptr_type, offset, status, error_code);
00422 }
00423 else if ( !buf_contig && file_contig )
00424 {
00425
00426 int posn=0;
00427
00428 FPRINTF(stderr,"[%d/%d] %s called w/ discontig mem, contig file\n",
00429 myrank,nprocs,myname);
00430 fflush(stderr);
00431
00432
00433
00434 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
00435 MPI_Pack(buf,count,datatype,intermediate,bufsize,&posn,fd->comm);
00436
00437
00438 ADIOI_GRIDFTP_WriteContig(fd, intermediate, bufsize, MPI_BYTE,
00439 file_ptr_type, offset, status, error_code);
00440
00441 ADIOI_Free(intermediate);
00442 }
00443 else if ( !buf_contig && !file_contig )
00444 {
00445
00446 int posn=0;
00447
00448 FPRINTF(stderr,"[%d/%d] %s called w/ discontig mem, discontig file\n",
00449 myrank,nprocs,myname);
00450 fflush(stderr);
00451
00452
00453 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
00454 MPI_Pack(buf,count,datatype,intermediate,bufsize,&posn,fd->comm);
00455
00456
00457 ADIOI_GRIDFTP_WriteDiscontig(fd, intermediate, bufsize, MPI_BYTE,
00458 file_ptr_type, offset, status, error_code);
00459
00460 ADIOI_Free(intermediate);
00461 }
00462 else
00463 {
00464
00465 FPRINTF(stderr,"[%d/%d] Why the heck did you call %s with contiguous buffer *and* file types?\n",
00466 myrank,nprocs,myname);
00467 ADIOI_GRIDFTP_WriteContig(fd, buf, count, datatype,
00468 file_ptr_type, offset, status, error_code);
00469 }
00470 #endif
00471 }
00472