00001 #ifndef _sdag_H_
00002 #define _sdag_H_
00003
00004 #include "pup.h"
00005
00006 namespace SDAG {
00007 struct Closure : public PUP::able {
00008 virtual void pup(PUP::er& p) = 0;
00009 PUPable_abstract(Closure);
00010 int continuations;
00011 bool hasRefnum;
00012 CMK_REFNUM_TYPE refnum;
00013
00014 void ref() { continuations++; }
00015 void deref() { if (--continuations <= 0) delete this; }
00016
00017
00018 void packClosure(PUP::er& p) { p | continuations; p | hasRefnum; p | refnum; }
00019 void init() { continuations = 1; hasRefnum = false; refnum = 0; }
00020 virtual ~Closure() { }
00021
00022
00023
00024 template <typename T>
00025 inline void setRefnum(T t) {}
00026 inline void setRefnum(char c) { hasRefnum = true; refnum = c; }
00027 inline void setRefnum(int i) { hasRefnum = true; refnum = i; }
00028 inline void setRefnum(short s) { hasRefnum = true; refnum = s; }
00029 inline void setRefnum(long l) { hasRefnum = true; refnum = l; }
00030 inline void setRefnum(unsigned char c) { hasRefnum = true; refnum = c; }
00031 inline void setRefnum(unsigned int i) { hasRefnum = true; refnum = i; }
00032 inline void setRefnum(unsigned short s) { hasRefnum = true; refnum = s; }
00033 inline void setRefnum(unsigned long l) { hasRefnum = true; refnum = l; }
00034 inline void setRefnum(float f) { hasRefnum = true; refnum = f; }
00035 inline void setRefnum(double d) { hasRefnum = true; refnum = d; }
00036 void unsetRefnum() { hasRefnum = false; refnum = 0; }
00037 };
00038 }
00039
00040 #include <vector>
00041 #include <list>
00042 #include <unordered_set>
00043 #include <memory>
00044
00045 #include <pup_stl.h>
00046 #include <envelope.h>
00047 #include <debug-charm.h>
00048
00049 class CkMessage;
00050 #if USE_CRITICAL_PATH_HEADER_ARRAY
00051 class MergeablePathHistory;
00052 #endif
00053 namespace SDAG {
00054 struct TransportableBigSimLog : public Closure {
00055 void* log;
00056 TransportableBigSimLog() : log(0) { init(); }
00057 TransportableBigSimLog(CkMigrateMessage*) : log(0) { init(); }
00058
00059 TransportableBigSimLog(void* log)
00060 : log(log) { init(); }
00061
00062 void pup(PUP::er& p) {
00063 if (p.isUnpacking()) log = 0;
00064 else if (log != 0)
00065 CkAbort("BigSim logs stored by SDAG are not migratable\n");
00066 packClosure(p);
00067 }
00068 PUPable_decl(TransportableBigSimLog);
00069 };
00070
00071 struct ForallClosure : public Closure {
00072 int val;
00073 ForallClosure() : val(0) { init(); }
00074 ForallClosure(CkMigrateMessage*) : val(0) { init(); }
00075 ForallClosure(int val) : val(val) { init(); }
00076
00077 void pup(PUP::er& p) {
00078 p | val;
00079 packClosure(p);
00080 }
00081 PUPable_decl(ForallClosure);
00082 int& getP0() { return val; }
00083 };
00084
00085 struct MsgClosure : public Closure {
00086 void* msg;
00087
00088 MsgClosure() : msg(0) { init(); continuations = 0; }
00089 MsgClosure(CkMigrateMessage*) : msg(0) { init(); continuations = 0; }
00090
00091 MsgClosure(void* msg)
00092 : msg(msg) {
00093 init();
00094 setRefnum(CkGetRefNum(msg));
00095 continuations = 0;
00096 CmiReference(UsrToEnv(msg));
00097 }
00098
00099 void pup(PUP::er& p) {
00100 bool hasMsg = msg;
00101 p | hasMsg;
00102 if (hasMsg) CkPupMessage(p, (void**)&msg);
00103 if (hasMsg && p.isUnpacking())
00104 CmiReference(UsrToEnv(msg));
00105 packClosure(p);
00106 }
00107
00108 virtual ~MsgClosure() {
00109 if (msg) CmiFree(UsrToEnv(msg));
00110 }
00111
00112 PUPable_decl(MsgClosure);
00113 };
00114
00115 class CCounter : public Closure {
00116 private:
00117 unsigned int count;
00118 public:
00119 CCounter() { init(); }
00120 CCounter(CkMigrateMessage*) { init(); }
00121 CCounter(int c) : count(c) { init(); }
00122 CCounter(int first, int last, int stride) {
00123 init();
00124 count = ((last - first) / stride) + 1;
00125 }
00126 void decrement(void) { count--; }
00127 int isDone(void) { return (count == 0); }
00128
00129 void pup(PUP::er& p) {
00130 p | count;
00131 packClosure(p);
00132 }
00133 PUPable_decl(CCounter);
00134 };
00135
00136 struct CSpeculator : public Closure {
00137 int speculationIndex;
00138
00139 CSpeculator() : speculationIndex(0) { init(); }
00140 CSpeculator(CkMigrateMessage*) : speculationIndex(0) { init(); }
00141
00142 CSpeculator(int speculationIndex_)
00143 : speculationIndex(speculationIndex_) { init(); }
00144
00145 void pup(PUP::er& p) {
00146 p | speculationIndex;
00147 packClosure(p);
00148 }
00149 PUPable_decl(CSpeculator);
00150 };
00151
00152 struct Continuation : public PUP::able {
00153 int whenID;
00154 std::vector<Closure*> closure;
00155 std::vector<CMK_REFNUM_TYPE> entries, refnums;
00156 std::vector<int> anyEntries;
00157 int speculationIndex;
00158 Continuation() : speculationIndex(-1) { }
00159 Continuation(CkMigrateMessage*) : speculationIndex(-1) { }
00160
00161 Continuation(int whenID)
00162 : whenID(whenID)
00163 , speculationIndex(-1) { }
00164
00165 void pup(PUP::er& p) {
00166 p | whenID;
00167 p | closure;
00168 p | entries;
00169 p | refnums;
00170 p | anyEntries;
00171 p | speculationIndex;
00172 }
00173
00174 void addClosure(Closure* cl) {
00175 if (cl) cl->ref();
00176 closure.push_back(cl);
00177 }
00178
00179 #if USE_CRITICAL_PATH_HEADER_ARRAY
00180 MergeablePathHistory *saved;
00181 void setPath(MergeablePathHistory *tmp) {saved = tmp;}
00182
00183 MergeablePathHistory *getPath() { return saved;}
00184 #endif
00185
00186 virtual ~Continuation() {
00187 for (size_t i = 0; i < closure.size(); i++)
00188 if (closure[i])
00189 closure[i]->deref();
00190 }
00191
00192 PUPable_decl(Continuation);
00193 };
00194
00195 struct Buffer : public PUP::able {
00196 int entry;
00197 Closure* cl;
00198 #if USE_CRITICAL_PATH_HEADER_ARRAY
00199 MergeablePathHistory *savedPath;
00200 #endif
00201 #if CMK_BIGSIM_CHARM
00202 void *bgLog1, *bgLog2;
00203 #endif
00204
00205 Buffer(CkMigrateMessage*) { }
00206
00207 Buffer(int entry, Closure* cl)
00208 : entry(entry)
00209 , cl(cl)
00210 #if CMK_BIGSIM_CHARM
00211 , bgLog1(0)
00212 , bgLog2(0)
00213 #endif
00214 #if USE_CRITICAL_PATH_HEADER_ARRAY
00215 ,savedPath(NULL)
00216 #endif
00217 {
00218 cl->ref();
00219 }
00220
00221 #if USE_CRITICAL_PATH_HEADER_ARRAY
00222 void setPath(MergeablePathHistory* p)
00223 {
00224 savedPath = p;
00225 }
00226
00227 MergeablePathHistory* getPath()
00228 {
00229 return savedPath;
00230 }
00231 #endif
00232 void pup(PUP::er& p) {
00233 p | entry;
00234 p | cl;
00235 #if CMK_BIGSIM_CHARM
00236 if (p.isUnpacking())
00237 bgLog1 = bgLog2 = 0;
00238 else if (bgLog1 != 0 && bgLog2 != 0)
00239 CkAbort("BigSim logs stored by SDAG are not migratable\n");
00240 #endif
00241 }
00242
00243 virtual ~Buffer() {
00244 cl->deref();
00245 }
00246
00247 PUPable_decl(Buffer);
00248 };
00249
00250 struct Dependency {
00251 std::vector<std::list<int> > entryToWhen;
00252 std::vector<std::list<Continuation*> > whenToContinuation;
00253
00254
00255
00256
00257 std::vector<std::list<Buffer*> > buffer;
00258
00259 int curSpeculationIndex;
00260
00261 void pup(PUP::er& p) {
00262 p | curSpeculationIndex;
00263 p | entryToWhen;
00264 p | buffer;
00265 p | whenToContinuation;
00266 }
00267
00268 Dependency(int numEntries, int numWhens)
00269 : entryToWhen(numEntries)
00270 , whenToContinuation(numWhens)
00271 , buffer(numEntries)
00272 , curSpeculationIndex(0)
00273 { }
00274
00275
00276 Dependency() { }
00277
00278
00279 ~Dependency() {
00280 for (std::vector<std::list<Buffer*> >::iterator iter = buffer.begin();
00281 iter != buffer.end(); ++iter) {
00282 std::list<Buffer*> lst = *iter;
00283 for (std::list<Buffer*>::iterator iter2 = lst.begin();
00284 iter2 != lst.end(); ++iter2) {
00285 delete *iter2;
00286 }
00287 }
00288
00289 for (size_t i = 0; i < whenToContinuation.size(); i++) {
00290 for (std::list<Continuation*>::iterator iter2 = whenToContinuation[i].begin();
00291 iter2 != whenToContinuation[i].end(); ++iter2) {
00292 delete *iter2;
00293 }
00294 }
00295 }
00296
00297 void addDepends(int whenID, int entry) {
00298 entryToWhen[entry].push_back(whenID);
00299 }
00300
00301 void reg(Continuation *c) {
00302
00303 whenToContinuation[c->whenID].push_back(c);
00304 }
00305
00306 void dereg(Continuation *c) {
00307 CkAssert(c->whenID < (int)whenToContinuation.size());
00308 std::list<Continuation*>& lst = whenToContinuation[c->whenID];
00309 lst.remove(c);
00310 }
00311
00312 Buffer* pushBuffer(int entry, Closure *cl) {
00313 Buffer* buf = new Buffer(entry, cl);
00314 buffer[entry].push_back(buf);
00315 return buf;
00316 }
00317
00318 Continuation *tryFindContinuation(int entry) {
00319 for (std::list<int>::iterator iter = entryToWhen[entry].begin();
00320 iter != entryToWhen[entry].end();
00321 ++iter) {
00322 int whenID = *iter;
00323
00324 for (std::list<Continuation*>::iterator iter2 = whenToContinuation[whenID].begin();
00325 iter2 != whenToContinuation[whenID].end();
00326 iter2++) {
00327 Continuation* c = *iter2;
00328 if (searchBufferedMatching(c)) {
00329 dereg(c);
00330 return c;
00331 }
00332 }
00333 }
00334
00335 return 0;
00336 }
00337
00338 bool searchBufferedMatching(Continuation* t) {
00339 CkAssert(t->entries.size() == t->refnums.size());
00340 for (size_t i = 0; i < t->entries.size(); i++) {
00341 if (!tryFindMessage(t->entries[i], true, t->refnums[i], 0)) {
00342 return false;
00343 }
00344 }
00345 for (size_t i = 0; i < t->anyEntries.size(); i++) {
00346 if (!tryFindMessage(t->anyEntries[i], false, 0, 0)) {
00347 return false;
00348 }
00349 }
00350 return true;
00351 }
00352
00353 Buffer* tryFindMessage(int entry, bool hasRef, CMK_REFNUM_TYPE refnum, std::unordered_set<Buffer*>* ignore) {
00354
00355 for (std::list<Buffer*>::iterator iter = buffer[entry].begin();
00356 iter != buffer[entry].end();
00357 ++iter) {
00358 if ((!hasRef || ((*iter)->cl->hasRefnum && (*iter)->cl->refnum == refnum)) &&
00359 (!ignore || ignore->find(*iter) == ignore->end()))
00360 return *iter;
00361 }
00362 return 0;
00363 }
00364
00365 Buffer* tryFindMessage(int entry) {
00366 if (buffer[entry].size() == 0)
00367 return 0;
00368 else
00369 return buffer[entry].front();
00370 }
00371
00372 void removeMessage(Buffer *buf) {
00373 buffer[buf->entry].remove(buf);
00374 }
00375
00376 int getAndIncrementSpeculationIndex() {
00377 return curSpeculationIndex++;
00378 }
00379
00380 void removeAllSpeculationIndex(int speculationIndex) {
00381 for (std::vector<std::list<Continuation*> >::iterator iter = whenToContinuation.begin();
00382 iter != whenToContinuation.end();
00383 ++iter) {
00384 std::list<Continuation*>& lst = *iter;
00385
00386 for (std::list<Continuation*>::iterator iter2 = lst.begin();
00387 iter2 != lst.end();
00388
00389 ) {
00390 if ((*iter2)->speculationIndex == speculationIndex) {
00391 Continuation *cancelled = *iter2;
00392
00393 iter2 = lst.erase(iter2);
00394 delete cancelled;
00395 } else {
00396 iter2++;
00397 }
00398 }
00399 }
00400 }
00401 };
00402
00403 typedef std::unique_ptr<Dependency> dep_ptr;
00404
00405 void registerPUPables();
00406 }
00407
00408 #endif