00001 #ifndef CK_IO_H
00002 #define CK_IO_H
00003
00004 #include <cstring>
00005 #include <string>
00006 #include <algorithm>
00007 #include <utility>
00008 #include <fcntl.h>
00009 #include <pup_stl.h>
00010
00011 namespace Ck { namespace IO {
00013 typedef int Token;
00014
00015 struct Options {
00016 Options()
00017 : peStripe(-1), writeStripe(-1), activePEs(-1), basePE(-1), skipPEs(-1)
00018 { }
00019
00021 size_t peStripe;
00023 size_t writeStripe;
00025 int activePEs;
00027 int basePE;
00029 int skipPEs;
00030
00031 void pup(PUP::er &p) {
00032 p|peStripe;
00033 p|writeStripe;
00034 p|activePEs;
00035 p|basePE;
00036 p|skipPEs;
00037 }
00038 };
00039
00040 struct FileReadyMsg;
00041 }
00042 }
00043
00044 #include "CkIO.decl.h"
00045 #include <map>
00046 #include <vector>
00047
00048 namespace Ck { namespace IO {
00049 struct FileReadyMsg : public CMessage_FileReadyMsg {
00050 Token token;
00051 FileReadyMsg(const Token &tok) : token(tok) {}
00052 };
00053
/// Byte buffer of a pre-announced size that is filled piecewise.
///
/// Usage: call expect() once with the total size, then insertData() for each
/// piece; isFull() reports whether the number of bytes inserted so far equals
/// the expected size. Overlapping or repeated inserts are counted every time,
/// so callers must insert each byte range exactly once.
struct buffer
{
  /// Backing storage; sized by expect().
  std::vector<char> array;
  /// Running total of bytes passed to insertData(). Kept as size_t so it can
  /// be compared with array.size() without a signed/unsigned conversion and
  /// does not truncate for buffers of 2 GiB or more.
  size_t bytes_filled_so_far;

  /// Start with no storage and nothing filled.
  buffer()
    : bytes_filled_so_far(0)
  { }

  /// Resize the backing storage to hold exactly @p bytes bytes.
  void expect(size_t bytes)
  {
    array.resize(bytes);
  }

  /// Copy @p length bytes from @p data into the buffer starting at @p offset.
  ///
  /// Precondition: offset + length <= array.size(); this is not checked.
  /// A zero-length insert is a no-op, so it is safe at offset == array.size()
  /// and on an empty buffer, and @p data may then be null.
  void insertData(const char *data, size_t length, size_t offset)
  {
    // Indexing array[offset] is only valid when there is at least one byte
    // to write there, so bail out before touching the vector.
    if (length == 0)
      return;

    std::memcpy(&array[offset], data, length);
    bytes_filled_so_far += length;
  }

  /// True when the bytes inserted so far add up to the expected size.
  bool isFull() const
  {
    return bytes_filled_so_far == array.size();
  }
};
00082
00083
00084 struct FileInfo {
00085 std::string name;
00086 Options opts;
00087 size_t bytes, total_written;
00088 int fd;
00089 CkCallback complete;
00090 std::map<size_t, struct buffer> bufferMap;
00091
00092 FileInfo(std::string name_, size_t bytes_, Options opts_)
00093 : name(name_), opts(opts_), bytes(bytes_), total_written(0), fd(-1)
00094 { }
00095 FileInfo()
00096 : bytes(-1), total_written(-1), fd(-1)
00097 { }
00098 };
00099
00105 class Manager : public CBase_Manager {
00106 public:
00107 Manager();
00108
00109 Manager_SDAG_CODE;
00110
00112 void prepareOutput(const char *name, size_t bytes,
00113 CkCallback ready, CkCallback complete,
00114 Options opts = Options());
00115 void write(Token token, const char *data, size_t bytes, size_t offset);
00116
00117 void prepareInput(const char *name, CkCallback ready,
00118 Options opts = Options());
00119 void read(Token token, void *data, size_t bytes, size_t offset,
00120 CkCallback complete);
00121
00122
00124 void write_forwardData(Token token, const char *data, size_t bytes, size_t offset);
00125 void write_dataWritten(Token token, size_t bytes);
00126
00127 private:
00128 int filesOpened;
00129 Token nextToken;
00130 std::map<Token, FileInfo> files;
00131 CkCallback nextReady;
00132
00133 int lastActivePE(const Options &opts) {
00134 return opts.basePE + (opts.activePEs-1)*opts.skipPEs;
00135 }
00136
00137 };
00138
00139 }}
00140 #endif