00001 #ifndef _CK_H_
00002 #define _CK_H_
00003
00004 #include <string.h>
00005 #include <stdlib.h>
00006 #include <math.h>
00007 #include "charm++.h"
00008 #include "envelope.h"
00009 #include "qd.h"
00010 #include "register.h"
00011 #include "stats.h"
00012 #include "ckfutures.h"
00013 #include "charisma.h"
00014 #include "TopoManager.h"
00015
/*
 * _CHECK_VALID(p, msg)
 *
 * When CMK_ERROR_CHECKING is non-zero: calls CkAbort(msg) if `p` compares
 * equal to 0.  Otherwise the macro expands to an empty statement and neither
 * argument is evaluated.  Both forms are wrapped in do { } while(0) so the
 * macro behaves as a single statement.
 */
#if CMK_ERROR_CHECKING
#define _CHECK_VALID(p, msg) \
  do {                       \
    if ((p) == 0) {          \
      CkAbort(msg);          \
    }                        \
  } while (0)
#else
#define _CHECK_VALID(p, msg) \
  do {                       \
  } while (0)
#endif
00021
00022
00023 extern "C" int _replaySystem;
00024
00025 #if CMK_CHARMDEBUG
00026 extern "C" int ConverseDeliver(int pe);
00027 inline void _CldEnqueue(int pe, void *msg, int infofn) {
00028 if (!ConverseDeliver(pe)) {
00029 CmiFree(msg);
00030 return;
00031 }
00032 CldEnqueue(pe, msg, infofn);
00033 }
00034 inline void _CldEnqueueMulti(int npes, int *pes, void *msg, int infofn) {
00035 if (!ConverseDeliver(-1)) {
00036 CmiFree(msg);
00037 return;
00038 }
00039 CldEnqueueMulti(npes, pes, msg, infofn);
00040 }
00041 inline void _CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) {
00042 if (!ConverseDeliver(-1)) {
00043 CmiFree(msg);
00044 return;
00045 }
00046 CldEnqueueGroup(grp, msg, infofn);
00047 }
00048 inline void _CldNodeEnqueue(int node, void *msg, int infofn) {
00049 if (!ConverseDeliver(node)) {
00050 CmiFree(msg);
00051 return;
00052 }
00053 CldNodeEnqueue(node, msg, infofn);
00054 }
00055 #else
00056 #define _CldEnqueue CldEnqueue
00057 #define _CldEnqueueMulti CldEnqueueMulti
00058 #define _CldEnqueueGroup CldEnqueueGroup
00059 #define _CldNodeEnqueue CldNodeEnqueue
00060 #endif
00061
00062 #ifndef CMK_CHARE_USE_PTR
00063 CkpvExtern(CkVec<void *>, chare_objs);
00064 #endif
00065
00067 class VidBlock {
00068 enum VidState {FILLED, UNFILLED};
00069 VidState state;
00070 PtrQ *msgQ;
00071 CkChareID actualID;
00072 void msgDeliver(envelope *env) {
00073
00074
00075 env->setMsgtype(ForChareMsg);
00076 env->setObjPtr(actualID.objPtr);
00077 _CldEnqueue(actualID.onPE, env, _infoIdx);
00078 CpvAccess(_qd)->create();
00079 }
00080 public:
00081 VidBlock() ;
00082 void send(envelope *env) {
00083 if(state==UNFILLED) {
00084 msgQ->enq((void *)env);
00085 } else {
00086 msgDeliver(env);
00087 }
00088 }
00089 void fill(int onPE, void *oPtr) {
00090 state = FILLED;
00091 actualID.onPE = onPE;
00092 actualID.objPtr = oPtr;
00093 envelope *env;
00094 while(NULL!=(env=(envelope*)msgQ->deq())) {
00095 msgDeliver(env);
00096 }
00097 delete msgQ; msgQ=0;
00098 }
00099 void *getLocalChare(void) {
00100 if (state==FILLED && actualID.onPE==CkMyPe())
00101 return actualID.objPtr;
00102 return NULL;
00103 }
00104 void *getLocalChareObj(void) {
00105
00106 if (state==FILLED && actualID.onPE==CkMyPe())
00107 #ifdef CMK_CHARE_USE_PTR
00108 return actualID.objPtr;
00109 #else
00110 return CkpvAccess(chare_objs)[(CmiIntPtr)actualID.objPtr];
00111 #endif
00112 return NULL;
00113 }
00114 void pup(PUP::er &p) {
00115 #ifndef CMK_CHARE_USE_PTR
00116 int s;
00117 if (!p.isUnpacking()) s = state-FILLED;
00118 p|s;
00119 if (p.isUnpacking()) state = (VidState)(FILLED+s);
00120 if (p.isUnpacking()) msgQ = NULL;
00121 p|actualID;
00122 #endif
00123 }
00124 };
00125
00126 class CkCoreState;
00127
00129 class CkMessageWatcher {
00130 protected:
00131 FILE *f;
00132 CkMessageWatcher *next;
00133 public:
00134 CkMessageWatcher() : f(NULL), next(NULL) { }
00135 virtual ~CkMessageWatcher();
00142 #define PROCESS_MACRO(name,type) inline CmiBool process##name(type *input,CkCoreState *ck) { \
00143 CmiBool result = CmiTrue; \
00144 if (next != NULL) result &= next->process##name(input, ck); \
00145 result &= process(input, ck); \
00146 return result; \
00147 }
00148
00149 PROCESS_MACRO(Message,envelope*);
00150 PROCESS_MACRO(Thread,CthThreadToken);
00151 PROCESS_MACRO(LBMessage,LBMigrateMsg*);
00152
00153 #undef PROCESS_MACRO
00154 protected:
00156 virtual CmiBool process(envelope **env,CkCoreState *ck) =0;
00157 virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {return CmiTrue;}
00158 virtual CmiBool process(LBMigrateMsg **msg, CkCoreState *ck) {return CmiTrue;}
00159 public:
00160 inline void setNext(CkMessageWatcher *w) { next = w; }
00161 };
00162
00164 class CkCoreState {
00165 GroupTable *groupTable;
00166 QdState *qd;
00167 public:
00168 CkMessageWatcher *watcher;
00170 inline void addWatcher(CkMessageWatcher *w) {
00171 w->setNext(watcher);
00172 watcher = w;
00173 }
00174
00175 CkCoreState()
00176 :groupTable(CkpvAccess(_groupTable)),
00177 qd(CpvAccess(_qd)) { watcher=NULL; }
00178 ~CkCoreState() { delete watcher;}
00179
00180 inline GroupTable *getGroupTable() {
00181 return groupTable;
00182 }
00183 inline IrrGroup *localBranch(CkGroupID gID) {
00184 return groupTable->find(gID).getObj();
00185 }
00186
00187 inline QdState *getQD() {return qd;}
00188
00189
00190 inline void process(int n=1) {
00191 if (CmiImmIsRunning())
00192 CpvAccessOther(_qd, 1)->process(n);
00193 else
00194 qd->process(n);
00195 }
00196 inline void create(int n=1) {
00197 if (CmiImmIsRunning())
00198 CpvAccessOther(_qd, 1)->create(n);
00199 else
00200 qd->create(n);
00201 }
00202 };
00203
00204 CkpvExtern(CkCoreState *, _coreState);
00205
00206 void CpdHandleLBMessage(LBMigrateMsg **msg);
00207 void CkMessageWatcherInit(char **argv,CkCoreState *ck);
00208
00209 extern void _processHandler(void *converseMsg,CkCoreState *ck);
00210 extern void _processBocInitMsg(CkCoreState *ck,envelope *msg);
00211 extern void _processNodeBocInitMsg(CkCoreState *ck,envelope *msg);
00212 extern void _infoFn(void *msg, CldPackFn *pfn, int *len,
00213 int *queueing, int *priobits, UInt **prioptr);
00214 extern void CkCreateLocalGroup(CkGroupID groupID, int eIdx, envelope *env);
00215 extern void CkCreateLocalNodeGroup(CkGroupID groupID, int eIdx, envelope *env);
00216 extern void _createGroup(CkGroupID groupID, envelope *env);
00217 extern void _createNodeGroup(CkGroupID groupID, envelope *env);
00218 extern int _getGroupIdx(int,int,int);
00219
00220 #endif