00001 #include <ckio.h>
00002 #include <errno.h>
00003 #include <algorithm>
00004
00005
00006 namespace Ck { namespace IO {
00007 Manager::Manager() : nextToken(0) {
00008 __sdag_init();
00009 run();
00010 }
00011
00012 void Manager::prepareOutput(const char *name, size_t bytes,
00013 CkCallback ready, CkCallback complete,
00014 Options opts) {
00015 thisProxy[0].prepareOutput_central(name, bytes, ready, complete, opts);
00016 }
00017
00018 void Manager::write(Token token, const char *data, size_t bytes, size_t offset) {
00019 Options &opts = files[token].opts;
00020 do {
00021 size_t stripe = offset / opts.peStripe;
00022 int pe = opts.basePE + stripe * opts.skipPEs;
00023 size_t bytesToSend = std::min(bytes, opts.peStripe - offset % opts.peStripe);
00024 thisProxy[pe].write_forwardData(token, data, bytesToSend, offset);
00025 data += bytesToSend;
00026 offset += bytesToSend;
00027 bytes -= bytesToSend;
00028 } while (bytes > 0);
00029 }
00030
00031 void Manager::write_forwardData(Token token, const char *data, size_t bytes,
00032 size_t offset) {
00033
00034 CkAssert(offset + bytes <= files[token].bytes);
00035
00036
00037 size_t stripeSize = files[token].opts.peStripe;
00038 while(bytes > 0)
00039 {
00040 size_t stripeOffset = (offset/stripeSize)*stripeSize;
00041 size_t expectedBufferSize = std::min(files[token].bytes - stripeOffset, stripeSize);
00042 struct buffer & currentBuffer = files[token].bufferMap[stripeOffset];
00043 size_t bytesInCurrentStripe = std::min(expectedBufferSize - offset%stripeSize, bytes);
00044
00045
00046 currentBuffer.expect(expectedBufferSize);
00047
00048 currentBuffer.insertData(data, bytesInCurrentStripe, offset % stripeSize);
00049
00050
00051 if(currentBuffer.isFull()) {
00052
00053 int l = currentBuffer.bytes_filled_so_far;
00054 char *d = &(currentBuffer.array[0]);
00055 size_t bufferOffset = stripeOffset;
00056
00057 while (l > 0) {
00058 ssize_t ret = pwrite(files[token].fd, d, l, bufferOffset);
00059 if (ret < 0)
00060 if (errno == EINTR)
00061 continue;
00062 else {
00063 CkPrintf("Output failed on PE %d: %s", CkMyPe(), strerror(errno));
00064 CkAbort("Giving up");
00065 }
00066 l -= ret;
00067 d += ret;
00068 bufferOffset += ret;
00069 }
00070
00071 thisProxy[0].write_dataWritten(token, currentBuffer.bytes_filled_so_far);
00072 files[token].bufferMap.erase(stripeOffset);
00073 }
00074
00075 bytes -= bytesInCurrentStripe;
00076 data += bytesInCurrentStripe;
00077 offset += bytesInCurrentStripe;
00078 }
00079 }
00080
00081 void Manager::write_dataWritten(Token token, size_t bytes) {
00082 CkAssert(CkMyPe() == 0);
00083
00084 files[token].total_written += bytes;
00085
00086 if (files[token].total_written == files[token].bytes)
00087 files[token].complete.send();
00088 }
00089
00090 void Manager::prepareInput(const char *name, CkCallback ready, Options opts) {
00091 CkAbort("not yet implemented");
00092 }
00093
00094 void Manager::read(Token token, void *data, size_t bytes, size_t offset,
00095 CkCallback complete) {
00096 CkAbort("not yet implemented");
00097 }
00098 }
00099 }
00100
00101 #include "CkIO.def.h"