00001 #define DEBUGP(x) // CmiPrintf x;
00002
00003 #include "ck.h"
00004
00005 class QdMsg {
00006 private:
00007 int phase;
00008 union {
00009 struct { } p1;
00010 struct { CmiInt8 created; CmiInt8 processed; } p2;
00011 struct { } p3;
00012 struct { char dirty; } p4;
00013 } u;
00014 CkCallback cb;
00015 public:
00016 int getPhase(void) { return phase; }
00017 void setPhase(int p) { phase = p; }
00018 CkCallback getCb(void) { CkAssert(phase==0); return cb; }
00019 void setCb(CkCallback cb_) { CkAssert(phase==0); cb = cb_; }
00020 CmiInt8 getCreated(void) { CkAssert(phase==1); return u.p2.created; }
00021 void setCreated(CmiInt8 c) { CkAssert(phase==1); u.p2.created = c; }
00022 CmiInt8 getProcessed(void) { CkAssert(phase==1); return u.p2.processed; }
00023 void setProcessed(CmiInt8 p) { CkAssert(phase==1); u.p2.processed = p; }
00024 char getDirty(void) { CkAssert(phase==2); return u.p4.dirty; }
00025 void setDirty(char d) { CkAssert(phase==2); u.p4.dirty = d; }
00026 };
00027
00028 class QdCommMsg {
00029 public:
00030 bool isCreated;
00031 int count;
00032 };
00033
00034 class QdCallback {
00035 public:
00036 CkCallback cb;
00037 public:
00038 QdCallback(int e, CkChareID c) : cb(e, c) {}
00039 QdCallback(CkCallback cb_) : cb(cb_) {}
00040
00041 void send(void) {
00042
00043 #if CMK_CONDS_USE_SPECIAL_CODE
00044 int old = CmiSwitchToPE(0);
00045 #endif
00046 cb.send(NULL);
00047 #if CMK_CONDS_USE_SPECIAL_CODE
00048 CmiSwitchToPE(old);
00049 #endif
00050 }
00051 };
00052
00053 extern int _qdCommHandlerIdx;
00054
00055
00056 int _dummy_dq = 0;
00057
00058 #if CMK_BIGSIM_CHARM
00059
00060
00061 #undef CmiSyncSendAndFree
00062 #define CmiSyncSendAndFree CmiFreeSendFn
00063 #endif
00064
00065 CpvDeclare(QdState*, _qd);
00066
00067
00068
00069
00070 static inline void _bcastQD1(QdState* state, QdMsg *msg)
00071 {
00072 msg->setPhase(0);
00073 state->propagate(msg);
00074 msg->setPhase(1);
00075 DEBUGP(("[%d] _bcastQD1: State: getCreated:%d getProcessed:%d\n", CmiMyPe(), state->getCreated(), state->getProcessed()));
00076 #if ! CMK_SHARED_VARS_UNIPROCESSOR && !CMK_MULTICORE
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 #endif
00094 msg->setCreated(state->getCreated());
00095 msg->setProcessed(state->getProcessed());
00096 envelope *env = UsrToEnv((void*)msg);
00097 CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00098 state->markProcessed();
00099 state->reset();
00100 state->setStage(1);
00101 DEBUGP(("[%d] _bcastQD1 stage changed to: %d\n", CmiMyPe(), state->getStage()));
00102 }
00103
00104
00105
00106 static inline void _bcastQD2(QdState* state, QdMsg *msg)
00107 {
00108 DEBUGP(("[%d] _bcastQD2: \n", CmiMyPe()));
00109 msg->setPhase(1);
00110 state->propagate(msg);
00111 msg->setPhase(2);
00112 msg->setDirty(state->isDirty());
00113 envelope *env = UsrToEnv((void*)msg);
00114 CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00115 state->reset();
00116 state->setStage(2);
00117 DEBUGP(("[%d] _bcastQD2: stage changed to: %d\n", CmiMyPe(), state->getStage()));
00118 }
00119
00120 static inline void _handlePhase0(QdState *state, QdMsg *msg)
00121 {
00122 DEBUGP(("[%d] _handlePhase0: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00123 CkAssert(CmiMyPe()==0 || state->getStage()==0);
00124 if(CmiMyPe()==0) {
00125 QdCallback *qdcb = new QdCallback(msg->getCb());
00126 _MEMCHECK(qdcb);
00127 state->enq(qdcb);
00128 }
00129 if(state->getStage()==0)
00130 _bcastQD1(state, msg);
00131 else
00132 CkFreeMsg(msg);
00133 }
00134
00135
00136 static inline void _handlePhase1(QdState *state, QdMsg *msg)
00137 {
00138 DEBUGP(("[%d] _handlePhase1: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00139 switch(state->getStage()) {
00140 case 0 :
00141 CkAssert(CmiMyPe()!=0);
00142 _bcastQD2(state, msg);
00143 break;
00144 case 1 :
00145 DEBUGP(("[%d] msg: getCreated:%d getProcessed:%d\n", CmiMyPe(), msg->getCreated(), msg->getProcessed()));
00146
00147 state->subtreeCreate(msg->getCreated());
00148 state->subtreeProcess(msg->getProcessed());
00149 state->reported();
00150 if(state->allReported()) {
00151 if(CmiMyPe()==0) {
00152 DEBUGP(("ALL: %p getCCreated:%d getCProcessed:%d\n", state, state->getCCreated(), state->getCProcessed()));
00153 if(state->getCCreated()==state->getCProcessed()) {
00154 if(state->oldCount == state->getCProcessed()) {
00155 _bcastQD2(state, msg);
00156 } else {
00157 state->oldCount = state->getCProcessed();
00158 _bcastQD1(state, msg);
00159 }
00160 } else {
00161 _bcastQD1(state, msg);
00162 }
00163 } else {
00164
00165 msg->setCreated(state->getCCreated());
00166 msg->setProcessed(state->getCProcessed());
00167 envelope *env = UsrToEnv((void*)msg);
00168 CmiSyncSendAndFree(state->getParent(),
00169 env->getTotalsize(), (char *)env);
00170 state->reset();
00171 state->setStage(0);
00172 }
00173 } else
00174 CkFreeMsg(msg);
00175 break;
00176 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00177 }
00178 }
00179
00180
00181 static inline void _handlePhase2(QdState *state, QdMsg *msg)
00182 {
00183
00184 DEBUGP(("[%d] _handlePhase2: stage: %d, msg phase: %d \n", CmiMyPe(), state->getStage(), msg->getPhase()));
00185 CkAssert(state->getStage()==2);
00186 state->subtreeSetDirty(msg->getDirty());
00187 state->reported();
00188 if(state->allReported()) {
00189 if(CmiMyPe()==0) {
00190 if(state->isDirty()) {
00191 _bcastQD1(state, msg);
00192 } else {
00193
00194 DEBUGP(("[%d] quiescence detected at %f.\n", CmiMyPe(), CmiWallTimer()));
00195 QdCallback* cb;
00196 while(NULL!=(cb=state->deq())) {
00197 cb->send();
00198 delete cb;
00199 }
00200 state->reset();
00201 state->setStage(0);
00202 CkFreeMsg(msg);
00203 }
00204 } else {
00205
00206 DEBUGP(("[%d] _handlePhase2 dirty:%d\n", CmiMyPe(), (int)state->isDirty()));
00207 msg->setDirty(state->isDirty());
00208 envelope *env = UsrToEnv((void*)msg);
00209 CmiSyncSendAndFree(state->getParent(), env->getTotalsize(), (char *)env);
00210 state->reset();
00211 state->setStage(0);
00212 }
00213 } else
00214 CkFreeMsg(msg);
00215 }
00216
00217 static void _callWhenIdle(QdMsg *msg)
00218 {
00219 DEBUGP(("[%d] callWhenIdle msg:%p \n", CmiMyPe(), msg));
00220 QdState *state = CpvAccess(_qd);
00221 switch(msg->getPhase()) {
00222 case 0 : _handlePhase0(state, msg); break;
00223 case 1 : _handlePhase1(state, msg); break;
00224 case 2 : _handlePhase2(state, msg); break;
00225 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00226 }
00227 }
00228
00229 static void _invokeQD(QdMsg *msg)
00230 {
00231 QdCallback *cb = new QdCallback(msg->getCb());
00232 cb->send();
00233 delete cb;
00234 }
00235
00236 void _qdHandler(envelope *env)
00237 {
00238 QdMsg *msg = (QdMsg*) EnvToUsr(env);
00239 DEBUGP(("[%d] _qdHandler msg:%p \n", CmiMyPe(), msg));
00240 if (_dummy_dq > 0)
00241 CcdCallFnAfter((CcdVoidFn)_invokeQD,(void *)msg, _dummy_dq*1000);
00242 else
00243 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)_callWhenIdle, (void*) msg);
00244 }
00245
00246
00247
00248 void _qdCommHandler(envelope *env)
00249 {
00250 QdCommMsg *msg = (QdCommMsg*) EnvToUsr(env);
00251 DEBUGP(("[%d] _qdCommHandler msg:%p \n", CmiMyPe(), msg));
00252 if (!msg->isCreated)
00253 CpvAccess(_qd)->create(msg->count);
00254 else
00255 CpvAccess(_qd)->process(msg->count);
00256 CmiFree(env);
00257 }
00258
00259 void QdState::sendCount(bool isCreated, int count)
00260 {
00261 if (_dummy_dq == 0) {
00262 #if CMK_NET_VERSION && ! CMK_SMP && ! defined(CMK_CPV_IS_SMP)
00263 if (CmiImmIsRunning())
00264 #else
00265 if (CmiMyRank() == CmiMyNodeSize())
00266 #endif
00267 {
00268 QdCommMsg *msg = (QdCommMsg*) CkAllocMsg(0,sizeof(QdCommMsg),0);
00269 msg->isCreated = isCreated;
00270 msg->count = count;
00271 envelope *env = UsrToEnv((void *)msg);
00272 CmiSetHandler(env, _qdCommHandlerIdx);
00273 CmiFreeSendFn(CmiNodeFirst(CmiMyNode()), env->getTotalsize(), (char *)env);
00274 }
00275 }
00276 }
00277
00278 void CkStartQD(const CkCallback& cb)
00279 {
00280 QdMsg *msg = (QdMsg*) CkAllocMsg(0,sizeof(QdMsg),0);
00281 msg->setPhase(0);
00282 msg->setCb(cb);
00283 envelope *env = UsrToEnv((void *)msg);
00284 CmiSetHandler(env, _qdHandlerIdx);
00285 #if CMK_MEM_CHECKPOINT
00286 CmiGetRestartPhase(env) = 9999;
00287 #endif
00288 #if CMK_BIGSIM_CHARM
00289 CmiFreeSendFn(0, env->getTotalsize(), (char *)env);
00290 #else
00291 _CldEnqueue(0, env, _infoIdx);
00292 #endif
00293 }
00294
00295 void CkStartQD(int eIdx, const CkChareID *cid)
00296 {
00297 CkStartQD(CkCallback(eIdx, *cid));
00298 }