00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "ad_lustre.h"
00012 #include "adio_extern.h"
00013
00014 #undef AGG_DEBUG
00015
00016 void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
00017 int mode)
00018 {
00019 int *striping_info = NULL;
00020
00021
00022
00023
00024
00025 int stripe_size, stripe_count, CO = 1;
00026 int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
00027
00028
00029
00030 stripe_size = fd->hints->striping_unit;
00031
00032
00033 stripe_count = fd->hints->striping_factor;
00034
00035
00036 if (!mode) {
00037
00038
00039
00040
00041
00042
00043 CO = 1;
00044
00045 } else {
00046
00047 CO = fd->hints->fs_hints.lustre.co_ratio;
00048 }
00049
00050
00051
00052
00053
00054
00055
00056 if (nprocs_for_coll >= stripe_count)
00057
00058
00059
00060
00061
00062
00063 avail_cb_nodes =
00064 stripe_count * ADIOI_MIN(nprocs_for_coll/stripe_count, CO);
00065 else {
00066
00067
00068
00069
00070
00071
00072
00073
00074 divisor = 2;
00075 avail_cb_nodes = 1;
00076
00077 while (stripe_count >= divisor*divisor) {
00078 if ((stripe_count % divisor) == 0) {
00079 if (stripe_count/divisor <= nprocs_for_coll) {
00080
00081 avail_cb_nodes = stripe_count/divisor;
00082 break;
00083 }
00084
00085
00086 else if (divisor <= nprocs_for_coll)
00087 avail_cb_nodes = divisor;
00088 }
00089 divisor++;
00090 }
00091 }
00092
00093 *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
00094 striping_info = *striping_info_ptr;
00095 striping_info[0] = stripe_size;
00096 striping_info[1] = stripe_count;
00097 striping_info[2] = avail_cb_nodes;
00098 }
00099
00100 int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
00101 ADIO_Offset *len, int *striping_info)
00102 {
00103 int rank_index, rank;
00104 ADIO_Offset avail_bytes;
00105 int stripe_size = striping_info[0];
00106 int avail_cb_nodes = striping_info[2];
00107
00108
00109 rank_index = (int)((off / stripe_size) % avail_cb_nodes);
00110
00111
00112
00113
00114
00115 if (rank_index >= fd->hints->cb_nodes)
00116 MPI_Abort(MPI_COMM_WORLD, 1);
00117
00118 avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
00119 (ADIO_Offset)stripe_size - off;
00120 if (avail_bytes < *len) {
00121
00122 *len = avail_bytes;
00123 }
00124
00125
00126 rank = fd->hints->ranklist[rank_index];
00127
00128 return rank;
00129 }
00130
00131
00132
00133
00134
00135
00136
00137 void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
00138 ADIO_Offset *len_list, int contig_access_count,
00139 int *striping_info, int nprocs,
00140 int *count_my_req_procs_ptr,
00141 int **count_my_req_per_proc_ptr,
00142 ADIOI_Access **my_req_ptr,
00143 int ***buf_idx_ptr)
00144 {
00145
00146
00147 int *count_my_req_per_proc, count_my_req_procs, **buf_idx;
00148 int i, l, proc;
00149 ADIO_Offset avail_len, rem_len, curr_idx, off;
00150 ADIOI_Access *my_req;
00151
00152 *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00153 count_my_req_per_proc = *count_my_req_per_proc_ptr;
00154
00155
00156
00157
00158
00159
00160 buf_idx = (int **) ADIOI_Malloc(nprocs * sizeof(int*));
00161
00162
00163
00164
00165 for (i = 0; i < contig_access_count; i++) {
00166
00167
00168
00169 if (len_list[i] == 0)
00170 continue;
00171 off = offset_list[i];
00172 avail_len = len_list[i];
00173
00174
00175
00176
00177 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
00178 count_my_req_per_proc[proc]++;
00179
00180
00181
00182
00183
00184 rem_len = len_list[i] - avail_len;
00185
00186 while (rem_len != 0) {
00187 off += avail_len;
00188 avail_len = rem_len;
00189 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
00190 count_my_req_per_proc[proc]++;
00191 rem_len -= avail_len;
00192 }
00193 }
00194
00195
00196
00197
00198
00199
00200
00201
00202 for (i = 0; i < nprocs; i++) {
00203
00204 buf_idx[i] = (int *) ADIOI_Malloc((count_my_req_per_proc[i] + 1)
00205 * sizeof(int));
00206 }
00207
00208
00209 *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
00210 my_req = *my_req_ptr;
00211
00212 count_my_req_procs = 0;
00213 for (i = 0; i < nprocs; i++) {
00214 if (count_my_req_per_proc[i]) {
00215 my_req[i].offsets = (ADIO_Offset *)
00216 ADIOI_Malloc(count_my_req_per_proc[i] *
00217 sizeof(ADIO_Offset));
00218 my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] *
00219 sizeof(int));
00220 count_my_req_procs++;
00221 }
00222 my_req[i].count = 0;
00223 }
00224
00225
00226 curr_idx = 0;
00227 for (i = 0; i < contig_access_count; i++) {
00228
00229
00230 if (len_list[i] == 0)
00231 continue;
00232 off = offset_list[i];
00233 avail_len = len_list[i];
00234 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
00235
00236 l = my_req[proc].count;
00237
00238 ADIOI_Assert(curr_idx == (int) curr_idx);
00239 ADIOI_Assert(l < count_my_req_per_proc[proc]);
00240 buf_idx[proc][l] = (int) curr_idx;
00241 curr_idx += avail_len;
00242
00243 rem_len = len_list[i] - avail_len;
00244
00245
00246
00247
00248
00249
00250 my_req[proc].offsets[l] = off;
00251 ADIOI_Assert(avail_len == (int) avail_len);
00252 my_req[proc].lens[l] = (int) avail_len;
00253 my_req[proc].count++;
00254
00255 while (rem_len != 0) {
00256 off += avail_len;
00257 avail_len = rem_len;
00258 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
00259 striping_info);
00260
00261 l = my_req[proc].count;
00262 ADIOI_Assert(curr_idx == (int) curr_idx);
00263 ADIOI_Assert(l < count_my_req_per_proc[proc]);
00264 buf_idx[proc][l] = (int) curr_idx;
00265
00266 curr_idx += avail_len;
00267 rem_len -= avail_len;
00268
00269 my_req[proc].offsets[l] = off;
00270 ADIOI_Assert(avail_len == (int) avail_len);
00271 my_req[proc].lens[l] = (int) avail_len;
00272 my_req[proc].count++;
00273 }
00274 }
00275
00276 #ifdef AGG_DEBUG
00277 for (i = 0; i < nprocs; i++) {
00278 if (count_my_req_per_proc[i] > 0) {
00279 FPRINTF(stdout, "data needed from %d (count = %d):\n",
00280 i, my_req[i].count);
00281 for (l = 0; l < my_req[i].count; l++) {
00282 FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n",
00283 l, my_req[i].offsets[l], l, my_req[i].lens[l]);
00284 }
00285 }
00286 }
00287 #endif
00288
00289 *count_my_req_procs_ptr = count_my_req_procs;
00290 *buf_idx_ptr = buf_idx;
00291 }
00292
00293 int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
00294 ADIO_Offset *len_list, int nprocs)
00295 {
00296
00297
00298
00299
00300
00301
00302 int i, docollect = 1, big_req_size = 0;
00303 ADIO_Offset req_size = 0, total_req_size;
00304 int avg_req_size, total_access_count;
00305
00306
00307 for (i = 0; i < contig_access_count; i++)
00308 req_size += len_list[i];
00309 MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
00310 fd->comm);
00311 MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
00312 fd->comm);
00313
00314 avg_req_size = (int)(total_req_size / total_access_count);
00315
00316 big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
00317
00318 if ((big_req_size > 0) && (avg_req_size > big_req_size))
00319 docollect = 0;
00320
00321 return docollect;
00322 }