00001 #ifndef _CK_H_
00002 #define _CK_H_
00003
00004 #include <string.h>
00005 #include <stdlib.h>
00006 #include <math.h>
00007 #include "qd.h"
00008 #include "charm++.h"
00009 #include "envelope.h"
00010 #include "register.h"
00011 #include "stats.h"
00012 #include "ckfutures.h"
00013 #include "TopoManager.h"
00014
00015 #if CMK_ERROR_CHECKING
00016 #define _CHECK_VALID(p, msg) do {if((p)==0){CkAbort(msg);}} while(0)
00017 #else
00018 #define _CHECK_VALID(p, msg) do { } while(0)
00019 #endif
00020
00021
00022 extern int _replaySystem;
00023
00024 #if CMK_CHARMDEBUG
00025 int ConverseDeliver(int pe);
00026 inline void _CldEnqueue(int pe, void *msg, int infofn) {
00027 if (!ConverseDeliver(pe)) {
00028 CmiFree(msg);
00029 return;
00030 }
00031 #if CMK_ONESIDED_IMPL
00032 envelope *env = (envelope *)msg;
00033
00034 if(CMI_IS_ZC_BCAST(msg))
00035 CkRdmaPrepareBcastMsg(env);
00036 #endif
00037 CldEnqueue(pe, msg, infofn);
00038 }
00039 inline void _CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn) {
00040 if (!ConverseDeliver(-1)) {
00041 CmiFree(msg);
00042 return;
00043 }
00044 CldEnqueueMulti(npes, pes, msg, infofn);
00045 }
00046 inline void _CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) {
00047 if (!ConverseDeliver(-1)) {
00048 CmiFree(msg);
00049 return;
00050 }
00051 CldEnqueueGroup(grp, msg, infofn);
00052 }
00053 inline void _CldNodeEnqueue(int node, void *msg, int infofn) {
00054 if (!ConverseDeliver(node)) {
00055 CmiFree(msg);
00056 return;
00057 }
00058 #if CMK_ONESIDED_IMPL
00059 envelope *env = (envelope *)msg;
00060
00061 if(CMI_IS_ZC_BCAST(msg))
00062 CkRdmaPrepareBcastMsg(env);
00063 #endif
00064 CldNodeEnqueue(node, msg, infofn);
00065 }
00066 #else
00067
00068 inline void _CldEnqueue(int pe, void *msg, int infofn) {
00069 #if CMK_ONESIDED_IMPL
00070 envelope *env = (envelope *)msg;
00071
00072 if(CMI_IS_ZC_BCAST(msg))
00073 CkRdmaPrepareBcastMsg(env);
00074 #endif
00075 CldEnqueue(pe, msg, infofn);
00076 }
00077
00078 inline void _CldNodeEnqueue(int node, void *msg, int infofn) {
00079 #if CMK_ONESIDED_IMPL
00080 envelope *env = (envelope *)msg;
00081
00082 if(CMI_IS_ZC_BCAST(msg))
00083 CkRdmaPrepareBcastMsg(env);
00084 #endif
00085 CldNodeEnqueue(node, msg, infofn);
00086 }
00087 #define _CldEnqueueMulti CldEnqueueMulti
00088 #define _CldEnqueueGroup CldEnqueueGroup
00089 #endif
00090
00091 #ifndef CMK_CHARE_USE_PTR
00092 CkpvExtern(std::vector<void *>, chare_objs);
00093 #endif
00094
00095 #include <unordered_map>
00096 typedef std::unordered_map<CmiUInt8, ArrayElement*> ArrayObjMap;
00097 CkpvExtern(ArrayObjMap, array_objs);
00098
00100 class VidBlock {
00101 enum VidState : uint8_t {FILLED, UNFILLED};
00102 VidState state;
00103 PtrQ *msgQ;
00104 CkChareID actualID;
00105 void msgDeliver(envelope *env) {
00106
00107
00108 env->setMsgtype(ForChareMsg);
00109 env->setObjPtr(actualID.objPtr);
00110 _CldEnqueue(actualID.onPE, env, _infoIdx);
00111 CpvAccess(_qd)->create();
00112 }
00113 public:
00114 VidBlock() ;
00115 void send(envelope *env) {
00116 if(state==UNFILLED) {
00117 msgQ->enq((void *)env);
00118 } else {
00119 msgDeliver(env);
00120 }
00121 }
00122 void fill(int onPE, void *oPtr) {
00123 state = FILLED;
00124 actualID.onPE = onPE;
00125 actualID.objPtr = oPtr;
00126 envelope *env;
00127 while(NULL!=(env=(envelope*)msgQ->deq())) {
00128 msgDeliver(env);
00129 }
00130 delete msgQ; msgQ=0;
00131 }
00132 void *getLocalChare(void) {
00133 if (state==FILLED && actualID.onPE==CkMyPe())
00134 return actualID.objPtr;
00135 return NULL;
00136 }
00137 void *getLocalChareObj(void) {
00138
00139 if (state==FILLED && actualID.onPE==CkMyPe())
00140 #ifdef CMK_CHARE_USE_PTR
00141 return actualID.objPtr;
00142 #else
00143 return CkpvAccess(chare_objs)[(CmiIntPtr)actualID.objPtr];
00144 #endif
00145 return NULL;
00146 }
00147 void pup(PUP::er &p) {
00148 #ifndef CMK_CHARE_USE_PTR
00149 int s = 0;
00150 if (!p.isUnpacking()) s = state-FILLED;
00151 p|s;
00152 if (p.isUnpacking()) state = (VidState)(FILLED+s);
00153 if (p.isUnpacking()) msgQ = NULL;
00154 p|actualID;
00155 #endif
00156 }
00157 };
00158
00159 class CkCoreState;
00160
00162 class CkMessageWatcher {
00163 protected:
00164 FILE *f;
00165 CkMessageWatcher *next;
00166 public:
00167 CkMessageWatcher() : f(NULL), next(NULL) { }
00168 virtual ~CkMessageWatcher();
00175 #define PROCESS_MACRO(name,type) inline bool process##name(type *input,CkCoreState *ck) { \
00176 bool result = true; \
00177 if (next != NULL) result &= next->process##name(input, ck); \
00178 result &= process(input, ck); \
00179 return result; \
00180 }
00181
00182 PROCESS_MACRO(Message,envelope*);
00183 PROCESS_MACRO(Thread,CthThreadToken);
00184 PROCESS_MACRO(LBMessage,LBMigrateMsg*);
00185
00186 #undef PROCESS_MACRO
00187 protected:
00189 virtual bool process(envelope **env,CkCoreState *ck) =0;
00190 virtual bool process(CthThreadToken *token, CkCoreState *ck) {return true;}
00191 virtual bool process(LBMigrateMsg **msg, CkCoreState *ck) {return true;}
00192 public:
00193 inline void setNext(CkMessageWatcher *w) { next = w; }
00194 };
00195
00197 class CkCoreState {
00198 GroupTable *groupTable;
00199 QdState *qd;
00200 public:
00201 CkMessageWatcher *watcher;
00203 inline void addWatcher(CkMessageWatcher *w) {
00204 w->setNext(watcher);
00205 watcher = w;
00206 }
00207
00208 CkCoreState()
00209 :groupTable(CkpvAccess(_groupTable)),
00210 qd(CpvAccess(_qd)) { watcher=NULL; }
00211 ~CkCoreState() { delete watcher;}
00212
00213 inline GroupTable *getGroupTable() const {
00214 return groupTable;
00215 }
00216 inline IrrGroup *localBranch(CkGroupID gID) const {
00217 return groupTable->find(gID).getObj();
00218 }
00219
00220 inline QdState *getQD() {return qd;}
00221
00222
00223 inline void process(int n=1) {
00224 if (CmiImmIsRunning())
00225 CpvAccessOther(_qd, 1)->process(n);
00226 else
00227 qd->process(n);
00228 }
00229 inline void create(int n=1) {
00230 if (CmiImmIsRunning())
00231 CpvAccessOther(_qd, 1)->create(n);
00232 else
00233 qd->create(n);
00234 }
00235 };
00236
00237 CkpvExtern(CkCoreState *, _coreState);
00238
00239 void CpdHandleLBMessage(LBMigrateMsg **msg);
00240 void CkMessageWatcherInit(char **argv,CkCoreState *ck);
00241
00242 extern void _processHandler(void *converseMsg,CkCoreState *ck);
00243 extern void _processBocInitMsg(CkCoreState *ck,envelope *msg);
00244 extern void _processNodeBocInitMsg(CkCoreState *ck,envelope *msg);
00245 extern void _infoFn(void *msg, CldPackFn *pfn, int *len,
00246 int *queueing, int *priobits, UInt **prioptr);
00247 extern void CkCreateLocalGroup(CkGroupID groupID, int eIdx, envelope *env);
00248 extern void CkCreateLocalNodeGroup(CkGroupID groupID, int eIdx, envelope *env);
00249 extern void _createGroup(CkGroupID groupID, envelope *env);
00250 extern void _createNodeGroup(CkGroupID groupID, envelope *env);
00251 extern int _getGroupIdx(int,int,int);
00252 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(const CkCoreState *ck, const envelope *env,const CkGroupID &groupID);
00253 extern IrrGroup* _getCkLocalBranchFromGroupID(CkGroupID &gID);
00254
00255 void QdCreate(int n);
00256 void QdProcess(int n);
00257 #endif