#include <algorithm>
#include <cstring>
#include <map>
#include <sstream>
#include <string>
#include <vector>
00005
00006 typedef int FileToken;
00007 #include "CkIO.decl.h"
00008 #include "CkIO_impl.decl.h"
00009
00010 #include <sys/stat.h>
00011 #include <fcntl.h>
00012 #include <errno.h>
00013 #include <pup_stl.h>
00014
00015 #if defined(_WIN32)
00016 #include <io.h>
00017 #else
00018 #include <unistd.h>
00019 #endif
00020
00021 #include "fs_parameters.h"
00022
00023 using std::min;
00024 using std::max;
00025 using std::map;
00026 using std::string;
00027
00028 namespace Ck { namespace IO {
00029 namespace impl {
// Proxy to the Director chare; assigned from thisProxy in Director's
// CkArgMsg constructor and used by the public Ck::IO entry points.
00030 CProxy_Director director;
// Ckpv variable holding a pointer to the local Manager; every Manager
// constructor initializes it and stores `this` in it.
00031 CkpvDeclare(Manager *, manager);
00032 }
00033
00034
00035 namespace impl {
00036 struct FileInfo {
00037 string name;
00038 CkCallback opened;
00039 Options opts;
00040 int fd;
00041 int sessionID;
00042 CProxy_WriteSession session;
00043 CkCallback complete;
00044
00045 FileInfo(string name_, CkCallback opened_, Options opts_)
00046 : name(name_), opened(opened_), opts(opts_), fd(-1)
00047 { }
00048 FileInfo(string name_, Options opts_)
00049 : name(name_), opened(), opts(opts_), fd(-1)
00050 { }
00051 FileInfo()
00052 : fd(-1)
00053 { }
00054 };
00055
00056 void fatalError(string desc, string file) {
00057 std::stringstream out;
00058 out << "FATAL ERROR on PE " << CkMyPe()
00059 << " working on file '" << file << "': "
00060 << desc << "; system reported " << strerror(errno) << std::endl;
00061 CkAbort(out.str().c_str());
00062 }
00063
00064 class Director : public CBase_Director {
00065 int filesOpened;
00066 map<FileToken, impl::FileInfo> files;
00067 CProxy_Manager managers;
00068 int opnum, sessionID;
00069 Director_SDAG_CODE
00070
00071 public:
00072 Director(CkArgMsg *m)
00073 : filesOpened(0), opnum(0), sessionID(0)
00074 {
00075 delete m;
00076 director = thisProxy;
00077 managers = CProxy_Manager::ckNew();
00078 }
00079
00080 Director(CkMigrateMessage *m) : CBase_Director(m) { }
00081
00082 void pup(PUP::er &p) {
00083
00084 if (files.size() != 0)
00085 CkAbort("CkIO: All files must be closed across checkpoint/restart");
00086
00087 p | filesOpened;
00088 p | managers;
00089 p | opnum;
00090 p | sessionID;
00091 }
00092
00093 void openFile(string name, CkCallback opened, Options opts) {
00094 if (0 == opts.writeStripe)
00095 opts.writeStripe = CkGetFileStripeSize(name.c_str());
00096 if (0 == opts.peStripe)
00097 opts.peStripe = 4 * opts.writeStripe;
00098 if (-1 == opts.activePEs)
00099 opts.activePEs = min(CkNumPes(), 32);
00100 if (-1 == opts.basePE)
00101 opts.basePE = 0;
00102 if (-1 == opts.skipPEs)
00103 opts.skipPEs = CkMyNodeSize();
00104
00105 files[filesOpened] = FileInfo(name, opened, opts);
00106 managers.openFile(opnum++, filesOpened++, name, opts);
00107 }
00108
00109 void fileOpened(FileToken file) {
00110 files[file].opened.send(new FileReadyMsg(file));
00111 }
00112
00113 void prepareWriteSession_helper(FileToken file, size_t bytes, size_t offset,
00114 CkCallback ready, CkCallback complete) {
00115 Options &opts = files[file].opts;
00116 files[file].sessionID = sessionID;
00117
00118 int numStripes = 0;
00119 size_t bytesLeft = bytes, delta = opts.peStripe - offset % opts.peStripe;
00120
00121 if (offset % opts.peStripe != 0 && delta < bytesLeft) {
00122 bytesLeft -= delta;
00123 numStripes++;
00124 }
00125 numStripes += bytesLeft / opts.peStripe;
00126 if (bytesLeft % opts.peStripe != 0)
00127 numStripes++;
00128
00129 CkArrayOptions sessionOpts(numStripes);
00130 sessionOpts.setStaticInsertion(true);
00131
00132 CkCallback sessionInitDone(CkIndex_Director::sessionReady(NULL), thisProxy);
00133 sessionInitDone.setRefnum(sessionID);
00134 sessionOpts.setInitCallback(sessionInitDone);
00135
00136
00137 files[file].session =
00138 CProxy_WriteSession::ckNew(file, offset, bytes, sessionOpts);
00139 CkAssert(files[file].complete.isInvalid());
00140 files[file].complete = complete;
00141 }
00142
00143 void sessionComplete(FileToken token) {
00144 CProxy_CkArray(files[token].session.ckGetArrayID()).ckDestroy();
00145 files[token].complete.send(CkReductionMsg::buildNew(0, NULL, CkReduction::nop));
00146 files[token].complete = CkCallback(CkCallback::invalid);
00147 }
00148
00149 void close(FileToken token, CkCallback closed) {
00150 managers.close(opnum++, token, closed);
00151 files.erase(token);
00152 }
00153 };
00154
00155 class Manager : public CBase_Manager {
00156 Manager_SDAG_CODE
00157 int opnum;
00158
00159 public:
00160 Manager()
00161 : opnum(0)
00162 {
00163 CkpvInitialize(Manager*, manager);
00164 CkpvAccess(manager) = this;
00165 thisProxy[CkMyPe()].run();
00166 }
00167
00168 Manager(CkMigrateMessage *m)
00169 : CBase_Manager(m)
00170 {
00171 CkpvInitialize(Manager*, manager);
00172 CkpvAccess(manager) = this;
00173 }
00174
00175 void pup(PUP::er &p) {
00176 p | opnum;
00177
00178
00179 if (files.size() != 0)
00180 CkAbort("CkIO: All files must be closed across checkpoint/restart");
00181 }
00182
00183 void prepareFile(FileToken token, string name, Options opts) {
00184 CkAssert(files.end() == files.find(token));
00185
00186 CkAssert(opts.writeStripe <= opts.peStripe);
00187 files[token] = impl::FileInfo(name, opts);
00188
00189 contribute(sizeof(FileToken), &token, CkReduction::max_int,
00190 CkCallback(CkReductionTarget(Director, fileOpened), director));
00191 }
00192
00193 impl::FileInfo* get(FileToken token) {
00194 CkAssert(files.find(token) != files.end());
00195
00196
00197
00198 if (files[token].fd == -1) {
00199 string& name = files[token].name;
00200 #if defined(_WIN32)
00201 int fd = CmiOpen(name.c_str(), _O_WRONLY | _O_CREAT, _S_IREAD | _S_IWRITE);
00202 #else
00203 int fd = CmiOpen(name.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
00204 #endif
00205 if (-1 == fd)
00206 fatalError("Failed to open a file for parallel output", name);
00207
00208 files[token].fd = fd;
00209 }
00210
00211 return &(files[token]);
00212 }
00213
00214 void write(Session session, const char *data, size_t bytes, size_t offset) {
00215 Options &opts = files[session.file].opts;
00216 size_t stripe = opts.peStripe;
00217
00218 CkAssert(offset >= session.offset);
00219 CkAssert(offset + bytes <= session.offset + session.bytes);
00220
00221 size_t sessionStripeBase = (session.offset / stripe) * stripe;
00222
00223 while (bytes > 0) {
00224 size_t stripeIndex = (offset - sessionStripeBase) / stripe;
00225 size_t bytesToSend = min(bytes, stripe - offset % stripe);
00226
00227 CProxy_WriteSession(session.sessionID)[stripeIndex]
00228 .forwardData(data, bytesToSend, offset);
00229
00230 data += bytesToSend;
00231 offset += bytesToSend;
00232 bytes -= bytesToSend;
00233 }
00234 }
00235
00236 void doClose(FileToken token, CkCallback closed) {
00237 int fd = files[token].fd;
00238 if (fd != -1) {
00239 int ret;
00240 do {
00241 #if defined(_WIN32)
00242 ret = _close(fd);
00243 #else
00244 ret = ::close(fd);
00245 #endif
00246 } while (ret < 0 && errno == EINTR);
00247 if (ret < 0)
00248 fatalError("close failed", files[token].name);
00249 }
00250 files.erase(token);
00251 contribute(closed);
00252 }
00253
00254 int procNum(int arrayHdl,const CkArrayIndex &element)
00255 {
00256 #if 0
00257 int peIndex = stripeIndex % opts.activePEs;
00258 int pe = opts.basePE + peIndex * opts.skipPEs;
00259 #endif
00260 return 0;
00261 }
00262
00263 private:
00264 map<FileToken, impl::FileInfo> files;
00265
00266 int lastActivePE(const Options &opts) {
00267 return opts.basePE + (opts.activePEs-1)*opts.skipPEs;
00268 }
00269 };
00270
00271 class WriteSession : public CBase_WriteSession {
00272 const FileInfo *file;
00273 size_t sessionOffset, myOffset;
00274 size_t sessionBytes, myBytes, myBytesWritten;
00275 FileToken token;
00276
00277 struct buffer {
00278 std::vector<char> array;
00279 int bytes_filled_so_far;
00280
00281 buffer() {
00282 bytes_filled_so_far = 0;
00283 }
00284
00285 void expect(size_t bytes) {
00286 array.resize(bytes);
00287 }
00288
00289 void insertData(const char *data, size_t length, size_t offset) {
00290 char *dest = &array[offset];
00291 memcpy(dest, data, length);
00292
00293 bytes_filled_so_far += length;
00294 }
00295
00296 bool isFull() {
00297 return bytes_filled_so_far == array.size();
00298 }
00299 };
00300 map<size_t, struct buffer> bufferMap;
00301
00302 public:
00303 WriteSession(FileToken file_, size_t offset_, size_t bytes_)
00304 : file(CkpvAccess(manager)->get(file_))
00305 , sessionOffset(offset_)
00306 , myOffset((sessionOffset / file->opts.peStripe + thisIndex)
00307 * file->opts.peStripe)
00308 , sessionBytes(bytes_)
00309 , myBytes(min(file->opts.peStripe, sessionOffset + sessionBytes - myOffset))
00310 , myBytesWritten(0)
00311 , token(file_)
00312 {
00313 CkAssert(file->fd != -1);
00314 CkAssert(myOffset >= sessionOffset);
00315 CkAssert(myOffset + myBytes <= sessionOffset + sessionBytes);
00316 }
00317
00318 WriteSession(CkMigrateMessage *m) { }
00319
00320 void forwardData(const char *data, size_t bytes, size_t offset) {
00321 CkAssert(offset >= myOffset);
00322 CkAssert(offset + bytes <= myOffset + myBytes);
00323
00324 size_t stripeSize = file->opts.writeStripe;
00325
00326 while (bytes > 0) {
00327 size_t stripeBase = (offset/stripeSize)*stripeSize;
00328 size_t stripeOffset = max(stripeBase, myOffset);
00329 size_t nextStripe = stripeBase + stripeSize;
00330 size_t expectedBufferSize = min(nextStripe, myOffset + myBytes) - stripeOffset;
00331 size_t bytesInCurrentStripe = min(nextStripe - offset, bytes);
00332
00333 buffer& currentBuffer = bufferMap[stripeOffset];
00334 currentBuffer.expect(expectedBufferSize);
00335
00336 currentBuffer.insertData(data, bytesInCurrentStripe, offset - stripeOffset);
00337
00338 if (currentBuffer.isFull()) {
00339 flushBuffer(currentBuffer, stripeOffset);
00340 bufferMap.erase(stripeOffset);
00341 }
00342
00343 bytes -= bytesInCurrentStripe;
00344 data += bytesInCurrentStripe;
00345 offset += bytesInCurrentStripe;
00346 }
00347
00348 if (myBytesWritten == myBytes)
00349 contribute(CkCallback(CkIndex_WriteSession::syncData(), thisProxy));
00350 }
00351
00352 void syncData() {
00353 int status;
00354 CkAssert(bufferMap.size() == 0);
00355 #if CMK_HAS_FDATASYNC_FUNC
00356 while ((status = fdatasync(file->fd)) < 0) {
00357 if (errno != EINTR) {
00358 fatalError("fdatasync failed", file->name);
00359 }
00360 }
00361 #elif CMK_HAS_FSYNC_FUNC
00362 while ((status = fsync(file->fd)) < 0) {
00363 if (errno != EINTR) {
00364 fatalError("fsync failed", file->name);
00365 }
00366 }
00367 #elif defined(_WIN32)
00368 intptr_t hFile = _get_osfhandle(file->fd);
00369 if (FlushFileBuffers((HANDLE)hFile) == 0)
00370 fatalError("FlushFileBuffers failed", file->name);
00371 #elif CMK_HAS_SYNC_FUNC
00372 #warning "Will call sync() for every completed write"
00373 sync();
00374 #else
00375 #warning "No file synchronization function available!"
00376 #endif
00377
00378 contribute(sizeof(FileToken), &token, CkReduction::max_int,
00379 CkCallback(CkReductionTarget(Director, sessionComplete), director));
00380 }
00381
00382 void flushBuffer(buffer& buf, size_t bufferOffset) {
00383 int l = buf.bytes_filled_so_far;
00384 char *d = &(buf.array[0]);
00385
00386 CmiInt8 ret = CmiPwrite(file->fd, d, l, bufferOffset);
00387 if (ret < 0)
00388 fatalError("Call to pwrite failed", file->name);
00389
00390 CkAssert(ret == l);
00391 myBytesWritten += l;
00392 }
00393 };
00394
00395 class Map : public CBase_Map {
00396 public:
00397 Map()
00398 { }
00399
00400 int procNum(int arrayHdl, const CkArrayIndex &element) {
00401 return 0;
00402 }
00403 };
00404 }
00405
00406 void open(string name, CkCallback opened, Options opts) {
00407 impl::director.openFile(name, opened, opts);
00408 }
00409
00410 void startSession(File file, size_t bytes, size_t offset,
00411 CkCallback ready, CkCallback complete) {
00412 impl::director.prepareWriteSession(file.token, bytes, offset, ready, complete);
00413 }
00414 void startSession(File file, size_t bytes, size_t offset, CkCallback ready,
00415 const char *commitData, size_t commitBytes, size_t commitOffset,
00416 CkCallback complete) {
00417 impl::director.prepareWriteSession(file.token, bytes, offset, ready,
00418 commitData, commitBytes, commitOffset,
00419 complete);
00420 }
00421
00422 void write(Session session, const char *data, size_t bytes, size_t offset) {
00423 using namespace impl;
00424 CkpvAccess(manager)->write(session, data, bytes, offset);
00425 }
00426
00427 void close(File file, CkCallback closed) {
00428 impl::director.close(file.token, closed);
00429 }
00430
00431 class SessionCommitMsg : public CMessage_SessionCommitMsg {
00432
00433 };
00434 }
00435 }
00436
00437 #include "CkIO.def.h"
00438 #include "CkIO_impl.def.h"