00001
00002
00003
00004
00005
00006
00007 #include "ad_gridftp.h"
00008 #include "adioi.h"
00009
00010 static globus_mutex_t lock;
00011 static globus_cond_t cond;
00012
00013 static globus_bool_t file_exists,exists_done;
00014 static void exists_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
00015 {
00016 if (error)
00017 {
00018 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00019 }
00020 else
00021 {
00022 file_exists=GLOBUS_TRUE;
00023 }
00024 exists_done=GLOBUS_TRUE;
00025 }
00026
00027 static globus_bool_t touch_ctl_done;
00028 static void touch_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
00029 {
00030 if (error)
00031 {
00032 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00033 }
00034 globus_mutex_lock(&lock);
00035 touch_ctl_done=GLOBUS_TRUE;
00036 globus_cond_signal(&cond);
00037 globus_mutex_unlock(&lock);
00038 }
00039
00040 static void touch_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
00041 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
00042 globus_bool_t eof)
00043 {
00044 if (error)
00045 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
00046 globus_ftp_client_register_read(handle,buffer,length,touch_data_cb,myargs);
00047 return;
00048 }
00049
00050 void ADIOI_GRIDFTP_Open(ADIO_File fd, int *error_code)
00051 {
00052 static char myname[]="ADIOI_GRIDFTP_Open";
00053 int myrank, nprocs, keyfound;
00054 char hintval[MPI_MAX_INFO_VAL+1];
00055 globus_ftp_client_handleattr_t hattr;
00056 globus_result_t result;
00057
00058 MPI_Comm_size(fd->comm, &nprocs);
00059 MPI_Comm_rank(fd->comm, &myrank);
00060
00061
00062
00063 globus_module_activate(GLOBUS_FTP_CLIENT_MODULE);
00064 fd->fd_sys = num_gridftp_handles;
00065
00066 fd->shared_fp_fname = NULL;
00067 *error_code = MPI_SUCCESS;
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 result=globus_ftp_client_handleattr_init(&hattr);
00078 if ( result != GLOBUS_SUCCESS )
00079 {
00080
00081
00082 globus_err_handler("globus_ftp_client_handleattr_init",
00083 myname,result);
00084 fd->fd_sys = -1;
00085 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00086 myname, __LINE__, MPI_ERR_IO,
00087 "**io",
00088 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00089 return;
00090 }
00091 result = globus_ftp_client_operationattr_init(&(oattr[fd->fd_sys]));
00092 if ( result != GLOBUS_SUCCESS )
00093 {
00094 globus_err_handler("globus_ftp_client_operationattr_init",
00095 myname,result);
00096 fd->fd_sys = -1;
00097 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00098 myname, __LINE__, MPI_ERR_IO,
00099 "**io",
00100 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00101 return;
00102 }
00103
00104
00105
00106 result=globus_ftp_client_handleattr_set_cache_all(&hattr,GLOBUS_TRUE);
00107 if ( result !=GLOBUS_SUCCESS )
00108 globus_err_handler("globus_ftp_client_handleattr_set_cache_all",myname,result);
00109
00110
00111 if ( (fd->access_mode&ADIO_RDONLY) &&
00112 (result=globus_ftp_client_handleattr_add_cached_url(&hattr,fd->filename))!=GLOBUS_SUCCESS )
00113 globus_err_handler("globus_ftp_client_handleattr_add_cached_url",myname,result);
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131 if ( (fd->access_mode&ADIO_APPEND) &&
00132 ((result=globus_ftp_client_operationattr_set_append(&(oattr[fd->fd_sys]),GLOBUS_TRUE))!=GLOBUS_SUCCESS) )
00133 globus_err_handler("globus_ftp_client_operationattr_set_append",myname,result);
00134
00135
00136
00137 if ( fd->info!=MPI_INFO_NULL )
00138 {
00139 ADIOI_Info_get(fd->info,"ftp_control_mode",MPI_MAX_INFO_VAL,hintval,&keyfound);
00140 if ( keyfound )
00141 {
00142 if ( ( !strcmp(hintval,"extended") || !strcmp(hintval,"extended_block") ) &&
00143 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_EXTENDED_BLOCK))!=GLOBUS_SUCCESS )
00144 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
00145 else if ( !strcmp(hintval,"block") &&
00146 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_BLOCK))!=GLOBUS_SUCCESS )
00147 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
00148 else if ( !strcmp(hintval,"compressed") &&
00149 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_COMPRESSED))!=GLOBUS_SUCCESS )
00150 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
00151 else if ( !strcmp(hintval,"stream") &&
00152 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_STREAM))!=GLOBUS_SUCCESS )
00153 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
00154 }
00155
00156 ADIOI_Info_get(fd->info,"parallelism",MPI_MAX_INFO_VAL,hintval,&keyfound);
00157 if ( keyfound )
00158 {
00159 int nftpthreads;
00160
00161 if ( sscanf(hintval,"%d",&nftpthreads)==1 )
00162 {
00163 globus_ftp_control_parallelism_t parallelism;
00164
00165 parallelism.mode = GLOBUS_FTP_CONTROL_PARALLELISM_FIXED;
00166 parallelism.fixed.size = nftpthreads;
00167 if ( (result=globus_ftp_client_operationattr_set_parallelism(&(oattr[fd->fd_sys]),
00168 ¶llelism))!=GLOBUS_SUCCESS )
00169 globus_err_handler("globus_ftp_client_operationattr_set_parallelism",myname,result);
00170 }
00171 }
00172
00173 ADIOI_Info_get(fd->info,"striped_ftp",MPI_MAX_INFO_VAL,hintval,&keyfound);
00174 if ( keyfound )
00175 {
00176
00177 if ( !strncmp("true",hintval,4) || !strncmp("TRUE",hintval,4) ||
00178 !strncmp("enable",hintval,4) || !strncmp("ENABLE",hintval,4) )
00179 {
00180 ADIOI_Info_get(fd->info,"striping_factor",MPI_MAX_INFO_VAL,hintval,&keyfound);
00181 if ( keyfound )
00182 {
00183 int striping_factor;
00184
00185 if ( sscanf(hintval,"%d",&striping_factor)==1 )
00186 {
00187 globus_ftp_control_layout_t layout;
00188
00189 layout.mode = GLOBUS_FTP_CONTROL_STRIPING_BLOCKED_ROUND_ROBIN;
00190 layout.round_robin.block_size = striping_factor;
00191 if ( (result=globus_ftp_client_operationattr_set_layout(&(oattr[fd->fd_sys]),
00192 &layout))!=GLOBUS_SUCCESS )
00193 globus_err_handler("globus_ftp_client_operationattr_set_layout",
00194 myname,result);
00195 }
00196 }
00197 }
00198 }
00199
00200 ADIOI_Info_get(fd->info,"tcp_buffer",MPI_MAX_INFO_VAL,hintval,&keyfound);
00201 if ( keyfound )
00202 {
00203
00204 int buffer_size;
00205 if ( sscanf(hintval,"%d",&buffer_size)==1 )
00206 {
00207 globus_ftp_control_tcpbuffer_t tcpbuf;
00208
00209 tcpbuf.mode = GLOBUS_FTP_CONTROL_TCPBUFFER_FIXED;
00210 tcpbuf.fixed.size = buffer_size;
00211 if ( (result=globus_ftp_client_operationattr_set_tcp_buffer(&(oattr[fd->fd_sys]),
00212 &tcpbuf))!=GLOBUS_SUCCESS )
00213 globus_err_handler("globus_ftp_client_operationattr_set_tcp_buffer",myname,result);
00214 }
00215 }
00216
00217 ADIOI_Info_get(fd->info,"transfer_type",MPI_MAX_INFO_VAL,hintval,&keyfound);
00218 if ( keyfound )
00219 {
00220 globus_ftp_control_type_t filetype;
00221
00222 if ( !strcmp("ascii",hintval) || !strcmp("ASCII",hintval) )
00223 {
00224 filetype=GLOBUS_FTP_CONTROL_TYPE_ASCII;
00225 }
00226 else
00227 {
00228 filetype=GLOBUS_FTP_CONTROL_TYPE_IMAGE;
00229 }
00230 if ( (result=globus_ftp_client_operationattr_set_type(&(oattr[fd->fd_sys]),filetype))!=GLOBUS_SUCCESS )
00231 globus_err_handler("globus_ftp_client_operationattr_set_type",myname,result);
00232 }
00233 }
00234 else
00235 FPRINTF(stderr,"no MPI_Info object associated with %s\n",fd->filename);
00236
00237
00238 result=globus_ftp_client_handle_init(&(gridftp_fh[fd->fd_sys]),&hattr);
00239 if ( result != GLOBUS_SUCCESS )
00240 {
00241 globus_err_handler("globus_ftp_client_handle_init",myname,result);
00242 fd->fd_sys = -1;
00243 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00244 myname, __LINE__, MPI_ERR_IO,
00245 "**io",
00246 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
00247 return;
00248 }
00249
00250
00251 globus_mutex_init(&lock, GLOBUS_NULL);
00252 globus_cond_init(&cond, GLOBUS_NULL);
00253 file_exists=GLOBUS_FALSE;
00254 exists_done=GLOBUS_FALSE;
00255 if ( myrank==0 )
00256 {
00257 if ( (result=globus_ftp_client_exists(&(gridftp_fh[fd->fd_sys]),
00258 fd->filename,
00259 &(oattr[fd->fd_sys]),
00260 exists_cb,
00261 GLOBUS_NULL))!=GLOBUS_SUCCESS )
00262 {
00263 globus_err_handler("globus_ftp_client_exists",myname,result);
00264 fd->fd_sys = -1;
00265 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00266 myname, __LINE__, MPI_ERR_IO,
00267 "**io", "**io %s",
00268 globus_object_printable_to_string(globus_error_get(result)));
00269 return;
00270 }
00271
00272 globus_mutex_lock(&lock);
00273 while ( exists_done!=GLOBUS_TRUE )
00274 globus_cond_wait(&cond,&lock);
00275 globus_mutex_unlock(&lock);
00276 }
00277 MPI_Barrier(fd->comm);
00278 MPI_Bcast(&file_exists,1,MPI_INT,0,fd->comm);
00279
00280
00281 if ( (file_exists!=GLOBUS_TRUE) && (fd->access_mode&ADIO_CREATE) &&
00282 !(fd->access_mode&ADIO_EXCL) && !(fd->access_mode&ADIO_RDONLY) )
00283 {
00284 if ( myrank==0 )
00285 {
00286
00287 globus_byte_t touchbuf=(globus_byte_t)'\0';
00288 touch_ctl_done=GLOBUS_FALSE;
00289 if ( (result=globus_ftp_client_put(&(gridftp_fh[fd->fd_sys]),
00290 fd->filename,
00291 &(oattr[fd->fd_sys]),
00292 GLOBUS_NULL,
00293 touch_ctl_cb,
00294 GLOBUS_NULL))!=GLOBUS_SUCCESS )
00295 {
00296 globus_err_handler("globus_ftp_client_put",myname,result);
00297 fd->fd_sys = -1;
00298 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00299 MPIR_ERR_RECOVERABLE,
00300 myname, __LINE__, MPI_ERR_IO,
00301 "**io", "**io %s",
00302 globus_object_printable_to_string(globus_error_get(result)));
00303 return;
00304 }
00305 result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
00306 (globus_byte_t *)&touchbuf, 0,
00307 (globus_off_t)0, GLOBUS_TRUE,
00308 touch_data_cb, GLOBUS_NULL);
00309
00310 if ( result != GLOBUS_SUCCESS )
00311 {
00312 globus_err_handler("globus_ftp_client_register_write",myname,result);
00313 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00314 MPIR_ERR_RECOVERABLE,
00315 myname, __LINE__, MPI_ERR_IO,
00316 "**io", "**io %s",
00317 globus_object_printable_to_string(globus_error_get(result)));
00318 return;
00319 }
00320 globus_mutex_lock(&lock);
00321 while ( touch_ctl_done!=GLOBUS_TRUE )
00322 globus_cond_wait(&cond,&lock);
00323 globus_mutex_unlock(&lock);
00324 }
00325 MPI_Barrier(fd->comm);
00326 }
00327 else if ( (fd->access_mode&ADIO_EXCL) && (file_exists==GLOBUS_TRUE) )
00328 {
00329 fd->fd_sys = -1;
00330 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
00331 myname, __LINE__, MPI_ERR_IO,
00332 "**io", 0);
00333 return;
00334 }
00335 else if ( (fd->access_mode&ADIO_RDONLY) && (file_exists!=GLOBUS_TRUE) )
00336 {
00337 if ( myrank==0 )
00338 {
00339 FPRINTF(stderr,"WARNING: read-only file %s does not exist!\n",fd->filename);
00340 }
00341 }
00342 num_gridftp_handles++;
00343 }