// Debug trace hook. With this definition the CmiPrintf call sits inside a
// comment, so every DEBUGP((...)) statement in this file expands to nothing.
#define DEBUGP(x) // CmiPrintf x;

#include "ck.h"

// Handler index installed on the QdCommMsg envelopes built in
// QdState::sendCount(); defined outside this file.
extern int _qdCommHandlerIdx;


// When greater than zero, _qdHandler() schedules _invokeQD() after
// _dummy_dq*1000 (passed to CcdCallFnAfter) instead of waiting for an idle
// processor, and QdState::sendCount() sends nothing.
int _dummy_dq = 0;

#if CMK_BIGSIM_CHARM
// In BigSim builds every CmiSyncSendAndFree call below is redirected to
// CmiFreeSendFn.

#undef CmiSyncSendAndFree
#define CmiSyncSendAndFree CmiFreeSendFn
#endif

// Declares the Cpv variable that holds the QdState pointer read through
// CpvAccess(_qd) by the handlers in this file.
CpvDeclare(QdState*, _qd);
00018
00019
00020
00021
00022 static inline void _bcastQD1(QdState* state, QdMsg *msg)
00023 {
00024 msg->setPhase(0);
00025 state->propagate(msg);
00026 msg->setPhase(1);
00027 DEBUGP(("[%d] _bcastQD1: State: getCreated:%d getProcessed:%d\n", CmiMyPe(), state->getCreated(), state->getProcessed()));
00028 #if ! CMK_SHARED_VARS_UNIPROCESSOR && !CMK_MULTICORE
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046 #endif
00047 msg->setCreated(state->getCreated());
00048 msg->setProcessed(state->getProcessed());
00049 envelope *env = UsrToEnv((void*)msg);
00050 CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00051 state->markProcessed();
00052 state->reset();
00053 state->setStage(1);
00054 DEBUGP(("[%d] _bcastQD1 stage changed to: %d\n", CmiMyPe(), state->getStage()));
00055 }
00056
00057
00058
00059 static inline void _bcastQD2(QdState* state, QdMsg *msg)
00060 {
00061 DEBUGP(("[%d] _bcastQD2: \n", CmiMyPe()));
00062 msg->setPhase(1);
00063 state->propagate(msg);
00064 msg->setPhase(2);
00065 msg->setDirty(state->isDirty());
00066 envelope *env = UsrToEnv((void*)msg);
00067 CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00068 state->reset();
00069 state->setStage(2);
00070 DEBUGP(("[%d] _bcastQD2: stage changed to: %d\n", CmiMyPe(), state->getStage()));
00071 }
00072
00073 static inline void _handlePhase0(QdState *state, QdMsg *msg)
00074 {
00075 DEBUGP(("[%d] _handlePhase0: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00076 CkAssert(CmiMyPe()==0 || state->getStage()==0);
00077 if(CmiMyPe()==0) {
00078 QdCallback *qdcb = new QdCallback(msg->getCb());
00079 _MEMCHECK(qdcb);
00080 state->enq(qdcb);
00081 }
00082 if(state->getStage()==0)
00083 _bcastQD1(state, msg);
00084 else
00085 CkFreeMsg(msg);
00086 }
00087
00088
00089 static inline void _handlePhase1(QdState *state, QdMsg *msg)
00090 {
00091 DEBUGP(("[%d] _handlePhase1: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00092 switch(state->getStage()) {
00093 case 0 :
00094 CkAssert(CmiMyPe()!=0);
00095 _bcastQD2(state, msg);
00096 break;
00097 case 1 :
00098 DEBUGP(("[%d] msg: getCreated:%d getProcessed:%d\n", CmiMyPe(), msg->getCreated(), msg->getProcessed()));
00099
00100 state->subtreeCreate(msg->getCreated());
00101 state->subtreeProcess(msg->getProcessed());
00102 state->reported();
00103 if(state->allReported()) {
00104 if(CmiMyPe()==0) {
00105 DEBUGP(("ALL: %p getCCreated:%d getCProcessed:%d\n", state, state->getCCreated(), state->getCProcessed()));
00106 if(state->getCCreated()==state->getCProcessed()) {
00107 _bcastQD2(state, msg);
00108 } else {
00109 _bcastQD1(state, msg);
00110 }
00111 } else {
00112
00113 msg->setCreated(state->getCCreated());
00114 msg->setProcessed(state->getCProcessed());
00115 envelope *env = UsrToEnv((void*)msg);
00116 CmiSyncSendAndFree(state->getParent(),
00117 env->getTotalsize(), (char *)env);
00118 state->reset();
00119 state->setStage(0);
00120 }
00121 } else
00122 CkFreeMsg(msg);
00123 break;
00124 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00125 }
00126 }
00127
00128
00129 static inline void _handlePhase2(QdState *state, QdMsg *msg)
00130 {
00131
00132 DEBUGP(("[%d] _handlePhase2: stage: %d, msg phase: %d \n", CmiMyPe(), state->getStage(), msg->getPhase()));
00133 CkAssert(state->getStage()==2);
00134 state->subtreeSetDirty(msg->getDirty());
00135 state->reported();
00136 if(state->allReported()) {
00137 if(CmiMyPe()==0) {
00138 if(state->isDirty()) {
00139 _bcastQD1(state, msg);
00140 } else {
00141
00142 DEBUGP(("[%d] quiescence detected at %f.\n", CmiMyPe(), CmiWallTimer()));
00143 QdCallback* cb;
00144 while(NULL!=(cb=state->deq())) {
00145 cb->send();
00146 delete cb;
00147 }
00148 state->reset();
00149 state->setStage(0);
00150 CkFreeMsg(msg);
00151 }
00152 } else {
00153
00154 DEBUGP(("[%d] _handlePhase2 dirty:%d\n", CmiMyPe(), state->isDirty()));
00155 msg->setDirty(state->isDirty());
00156 envelope *env = UsrToEnv((void*)msg);
00157 CmiSyncSendAndFree(state->getParent(), env->getTotalsize(), (char *)env);
00158 state->reset();
00159 state->setStage(0);
00160 }
00161 } else
00162 CkFreeMsg(msg);
00163 }
00164
00165 static void _callWhenIdle(QdMsg *msg)
00166 {
00167 DEBUGP(("[%d] callWhenIdle msg:%p \n", CmiMyPe(), msg));
00168 QdState *state = CpvAccess(_qd);
00169 switch(msg->getPhase()) {
00170 case 0 : _handlePhase0(state, msg); break;
00171 case 1 : _handlePhase1(state, msg); break;
00172 case 2 : _handlePhase2(state, msg); break;
00173 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00174 }
00175 }
00176
00177 static void _invokeQD(QdMsg *msg)
00178 {
00179 QdCallback *cb = new QdCallback(msg->getCb());
00180 cb->send();
00181 delete cb;
00182 }
00183
00184 void _qdHandler(envelope *env)
00185 {
00186 register QdMsg *msg = (QdMsg*) EnvToUsr(env);
00187 DEBUGP(("[%d] _qdHandler msg:%p \n", CmiMyPe(), msg));
00188 if (_dummy_dq > 0)
00189 CcdCallFnAfter((CcdVoidFn)_invokeQD,(void *)msg, _dummy_dq*1000);
00190 else
00191 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)_callWhenIdle, (void*) msg);
00192 }
00193
00194
00195
00196 void _qdCommHandler(envelope *env)
00197 {
00198 register QdCommMsg *msg = (QdCommMsg*) EnvToUsr(env);
00199 DEBUGP(("[%d] _qdCommHandler msg:%p \n", CmiMyPe(), msg));
00200 if (msg->flag == 0)
00201 CpvAccess(_qd)->create(msg->count);
00202 else
00203 CpvAccess(_qd)->process(msg->count);
00204 CmiFree(env);
00205 }
00206
00207 void QdState::sendCount(int flag, int count)
00208 {
00209 if (_dummy_dq == 0) {
00210 #if CMK_NET_VERSION && ! CMK_SMP && ! defined(CMK_CPV_IS_SMP)
00211 if (CmiImmIsRunning())
00212 #else
00213 if (CmiMyRank() == CmiMyNodeSize())
00214 #endif
00215 {
00216 register QdCommMsg *msg = (QdCommMsg*) CkAllocMsg(0,sizeof(QdCommMsg),0);
00217 msg->flag = flag;
00218 msg->count = count;
00219 register envelope *env = UsrToEnv((void *)msg);
00220 CmiSetHandler(env, _qdCommHandlerIdx);
00221 CmiFreeSendFn(CmiNodeFirst(CmiMyNode()), env->getTotalsize(), (char *)env);
00222 }
00223 }
00224 }
00225
00226 void CkStartQD(const CkCallback& cb)
00227 {
00228 register QdMsg *msg = (QdMsg*) CkAllocMsg(0,sizeof(QdMsg),0);
00229 msg->setPhase(0);
00230 msg->setCb(cb);
00231 register envelope *env = UsrToEnv((void *)msg);
00232 CmiSetHandler(env, _qdHandlerIdx);
00233 #if CMK_MEM_CHECKPOINT
00234 CmiGetRestartPhase(env) = 9999;
00235 #endif
00236 #if CMK_BIGSIM_CHARM
00237 CmiFreeSendFn(0, env->getTotalsize(), (char *)env);
00238 #else
00239 _CldEnqueue(0, env, _infoIdx);
00240 #endif
00241 }
00242
00243 extern "C"
00244 void CkStartQD(int eIdx, const CkChareID *cid)
00245 {
00246 CkStartQD(CkCallback(eIdx, *cid));
00247 }