00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include <unistd.h>
00011 #include <stdlib.h>
00012 #include <mpi.h>
00013 #include <stdio.h>
00014 #include <string.h>
00015
00016 #define NUM_OBJS 4
00017 #define OBJ_SIZE 1048576
00018
00019 extern char *optarg;
00020 extern int optind, opterr, optopt;
00021
00022
00023 char *prog = NULL;
00024 int debug = 0;
00025
00026 static void
00027 Usage( int line ) {
00028 int rank;
00029 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
00030 if ( rank == 0 ) {
00031 fprintf( stderr,
00032 "Usage (line %d): %s [-d] [-h] -f filename\n"
00033 "\t-d for debugging\n"
00034 "\t-h to turn on the hints to force collective aggregation\n",
00035 line, prog );
00036 }
00037 exit( 0 );
00038 }
00039
00040 static void
00041 fatal_error( int mpi_ret, MPI_Status *mpi_stat, char *msg ) {
00042 fprintf( stderr, "Fatal error %s: %d\n", msg, mpi_ret );
00043 MPI_Abort( MPI_COMM_WORLD, -1 );
00044 }
00045
00046 static void
00047 print_hints( int rank, MPI_File *mfh ) {
00048 MPI_Info info;
00049 int nkeys;
00050 int i, dummy_int;
00051 char key[1024];
00052 char value[1024];
00053
00054 MPI_Barrier( MPI_COMM_WORLD );
00055 if ( rank == 0 ) {
00056 MPI_File_get_info( *mfh, &info );
00057 MPI_Info_get_nkeys( info, &nkeys );
00058
00059 printf( "HINTS:\n" );
00060 for( i = 0; i < nkeys; i++ ) {
00061 MPI_Info_get_nthkey( info, i, key );
00062 printf( "%35s -> ", key );
00063 MPI_Info_get( info, key, 1024, value, &dummy_int );
00064 printf( "%s\n", value );
00065 }
00066 MPI_Info_free(&info);
00067 }
00068 MPI_Barrier( MPI_COMM_WORLD );
00069 }
00070
00071 static void
00072 fill_buffer( char *buffer, int bufsize, int rank, MPI_Offset offset ) {
00073 memset( (void*)buffer, 0, bufsize );
00074 snprintf( buffer, bufsize, "Hello from %d at %lld\n", rank, offset );
00075 }
00076
00077 static MPI_Offset
00078 get_offset( int rank, int num_objs, int obj_size, int which_obj ) {
00079 MPI_Offset offset;
00080 offset = (MPI_Offset)rank * num_objs * obj_size + which_obj * obj_size;
00081 return offset;
00082 }
00083
00084 static void
00085 write_file( char *target, int rank, MPI_Info *info ) {
00086 MPI_File wfh;
00087 MPI_Status mpi_stat;
00088 int mpi_ret;
00089 int i;
00090 char buffer[OBJ_SIZE];
00091
00092 if ( debug ) printf( "%d writing file %s\n", rank, target );
00093
00094 if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
00095 MPI_MODE_WRONLY | MPI_MODE_CREATE, *info, &wfh ) )
00096 != MPI_SUCCESS )
00097 {
00098 fatal_error( mpi_ret, NULL, "open for write" );
00099 }
00100
00101 for( i = 0; i < NUM_OBJS; i++ ) {
00102 MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
00103 fill_buffer( buffer, OBJ_SIZE, rank, offset );
00104 if ( debug ) printf( "%s", buffer );
00105 if ( (mpi_ret = MPI_File_write_at_all( wfh, offset, buffer, OBJ_SIZE,
00106 MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS )
00107 {
00108 fatal_error( mpi_ret, &mpi_stat, "write" );
00109 }
00110 }
00111
00112 if ( debug ) print_hints( rank, &wfh );
00113
00114 if( (mpi_ret = MPI_File_close( &wfh ) ) != MPI_SUCCESS ) {
00115 fatal_error( mpi_ret, NULL, "close for write" );
00116 }
00117 if ( debug ) printf( "%d wrote file %s\n", rank, target );
00118 }
00119
00120 static int
00121 reduce_corruptions( int corrupt_blocks ) {
00122 int mpi_ret;
00123 int sum;
00124 if ( ( mpi_ret = MPI_Reduce( &corrupt_blocks, &sum, 1,
00125 MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD ) ) != MPI_SUCCESS )
00126 {
00127 fatal_error( mpi_ret, NULL, "MPI_Reduce" );
00128 }
00129 return sum;
00130 }
00131
00132 static void
00133 read_file( char *target, int rank, MPI_Info *info, int *corrupt_blocks ) {
00134 MPI_File rfh;
00135 MPI_Status mpi_stat;
00136 int mpi_ret;
00137 int i;
00138 char buffer[OBJ_SIZE];
00139 char *verify_buf = NULL;
00140 verify_buf = (char *)malloc(OBJ_SIZE);
00141
00142 if ( debug ) printf( "%d reading file %s\n", rank, target );
00143
00144 if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
00145 MPI_MODE_RDONLY, *info, &rfh ) ) != MPI_SUCCESS )
00146 {
00147 fatal_error( mpi_ret, NULL, "open for read" );
00148 }
00149
00150 for( i = 0; i < NUM_OBJS; i++ ) {
00151 MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
00152 fill_buffer( verify_buf, OBJ_SIZE, rank, offset );
00153 if ( debug ) printf( "Expecting %s", buffer );
00154 if ( (mpi_ret = MPI_File_read_at_all( rfh, offset, buffer, OBJ_SIZE,
00155 MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS )
00156 {
00157 fatal_error( mpi_ret, &mpi_stat, "read" );
00158 }
00159 if ( memcmp( verify_buf, buffer, OBJ_SIZE ) != 0 ) {
00160 (*corrupt_blocks)++;
00161 printf( "Corruption at %lld\n", offset );
00162 if ( debug ) {
00163 printf( "\tExpecting %s\n"
00164 "\tRecieved %s\n",
00165 verify_buf, buffer );
00166 }
00167 }
00168 }
00169
00170 if( (mpi_ret = MPI_File_close( &rfh ) ) != MPI_SUCCESS ) {
00171 fatal_error( mpi_ret, NULL, "close for read" );
00172 }
00173 free(verify_buf);
00174
00175 }
00176
00177 static void
00178 set_hints( MPI_Info *info ) {
00179 MPI_Info_set( *info, "romio_cb_write", "enable" );
00180 MPI_Info_set( *info, "romio_no_indep_rw", "1" );
00181 MPI_Info_set( *info, "cb_nodes", "1" );
00182 MPI_Info_set( *info, "cb_buffer_size", "4194304" );
00183 }
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205 int
00206 main( int argc, char *argv[] ) {
00207 int nproc = 1, rank = 0;
00208 char *target = NULL;
00209 int c;
00210 MPI_Info info;
00211 int mpi_ret;
00212 int corrupt_blocks = 0;
00213
00214 MPI_Init( &argc, &argv );
00215 MPI_Comm_size(MPI_COMM_WORLD, &nproc);
00216 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
00217
00218 if( (mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) {
00219 if(rank == 0) fatal_error( mpi_ret, NULL, "MPI_info_create.\n");
00220 }
00221
00222 prog = strdup( argv[0] );
00223
00224 while( ( c = getopt( argc, argv, "df:h" ) ) != EOF ) {
00225 switch( c ) {
00226 case 'd':
00227 debug = 1;
00228 break;
00229 case 'f':
00230 target = strdup( optarg );
00231 break;
00232 case 'h':
00233 set_hints( &info );
00234 break;
00235 default:
00236 Usage( __LINE__ );
00237 }
00238 }
00239 if ( ! target ) {
00240 Usage( __LINE__ );
00241 }
00242
00243 write_file( target, rank, &info );
00244 read_file( target, rank, &info, &corrupt_blocks );
00245
00246 corrupt_blocks = reduce_corruptions( corrupt_blocks );
00247 if ( rank == 0 ) {
00248 if (corrupt_blocks == 0) {
00249 fprintf(stdout, " No Errors\n");
00250 } else {
00251 fprintf(stdout, "%d/%d blocks corrupt\n",
00252 corrupt_blocks, nproc * NUM_OBJS );
00253 }
00254 }
00255 MPI_Info_free(&info);
00256
00257 MPI_Finalize();
00258 free(prog);
00259 exit( 0 );
00260 }