00001
00002
00003
00004
00005
00006
00007
00008 #define DEBUGP(x) // CmiPrintf x;
00009
00010 #include "ck.h"
00011
00012
00013
00014 #define CMK_DUMMY_QD 0
00015
00016 #if CMK_BLUEGENE_CHARM
00017
00018
00019 #undef CmiSyncSendAndFree
00020 #define CmiSyncSendAndFree CmiFreeSendFn
00021 #endif
00022
00023 CpvDeclare(QdState*, _qd);
00024
00025
00026
00027
00028 static inline void _bcastQD1(QdState* state, QdMsg *msg)
00029 {
00030 msg->setPhase(0);
00031 state->propagate(msg);
00032 msg->setPhase(1);
00033 DEBUGP(("[%d] _bcastQD1: State: getCreated:%d getProcessed:%d\n", CmiMyPe(), state->getCreated(), state->getProcessed()));
00034 #if ! CMK_SHARED_VARS_UNIPROCESSOR && !CMK_MULTICORE
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052 #endif
00053 msg->setCreated(state->getCreated());
00054 msg->setProcessed(state->getProcessed());
00055 envelope *env = UsrToEnv((void*)msg);
00056 CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00057 state->markProcessed();
00058 state->reset();
00059 state->setStage(1);
00060 DEBUGP(("[%d] _bcastQD1 stage changed to: %d\n", CmiMyPe(), state->getStage()));
00061 }
00062
00063
00064
00065 static inline void _bcastQD2(QdState* state, QdMsg *msg)
00066 {
00067 DEBUGP(("[%d] _bcastQD2: \n", CmiMyPe()));
00068 msg->setPhase(1);
00069 state->propagate(msg);
00070 msg->setPhase(2);
00071 msg->setDirty(state->isDirty());
00072 envelope *env = UsrToEnv((void*)msg);
00073 CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00074 state->reset();
00075 state->setStage(2);
00076 DEBUGP(("[%d] _bcastQD2: stage changed to: %d\n", CmiMyPe(), state->getStage()));
00077 }
00078
00079 static inline void _handlePhase0(QdState *state, QdMsg *msg)
00080 {
00081 DEBUGP(("[%d] _handlePhase0: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00082 CkAssert(CmiMyPe()==0 || state->getStage()==0);
00083 if(CmiMyPe()==0) {
00084 QdCallback *qdcb = new QdCallback(msg->getCb());
00085 _MEMCHECK(qdcb);
00086 state->enq(qdcb);
00087 }
00088 if(state->getStage()==0)
00089 _bcastQD1(state, msg);
00090 else
00091 CkFreeMsg(msg);
00092 }
00093
00094
00095 static inline void _handlePhase1(QdState *state, QdMsg *msg)
00096 {
00097 DEBUGP(("[%d] _handlePhase1: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00098 switch(state->getStage()) {
00099 case 0 :
00100 CkAssert(CmiMyPe()!=0);
00101 _bcastQD2(state, msg);
00102 break;
00103 case 1 :
00104 DEBUGP(("[%d] msg: getCreated:%d getProcessed:%d\n", CmiMyPe(), msg->getCreated(), msg->getProcessed()));
00105
00106 state->subtreeCreate(msg->getCreated());
00107 state->subtreeProcess(msg->getProcessed());
00108 state->reported();
00109 if(state->allReported()) {
00110 if(CmiMyPe()==0) {
00111 DEBUGP(("ALL: %p getCCreated:%d getCProcessed:%d\n", state, state->getCCreated(), state->getCProcessed()));
00112 if(state->getCCreated()==state->getCProcessed()) {
00113 _bcastQD2(state, msg);
00114 } else {
00115 _bcastQD1(state, msg);
00116 }
00117 } else {
00118
00119 msg->setCreated(state->getCCreated());
00120 msg->setProcessed(state->getCProcessed());
00121 envelope *env = UsrToEnv((void*)msg);
00122 CmiSyncSendAndFree(state->getParent(),
00123 env->getTotalsize(), (char *)env);
00124 state->reset();
00125 state->setStage(0);
00126 }
00127 } else
00128 CkFreeMsg(msg);
00129 break;
00130 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00131 }
00132 }
00133
00134
00135 static inline void _handlePhase2(QdState *state, QdMsg *msg)
00136 {
00137
00138 DEBUGP(("[%d] _handlePhase2: stage: %d, msg phase: %d \n", CmiMyPe(), state->getStage(), msg->getPhase()));
00139 CkAssert(state->getStage()==2);
00140 state->subtreeSetDirty(msg->getDirty());
00141 state->reported();
00142 if(state->allReported()) {
00143 if(CmiMyPe()==0) {
00144 if(state->isDirty()) {
00145 _bcastQD1(state, msg);
00146 } else {
00147
00148 DEBUGP(("[%d] quiescence detected,\n", CmiMyPe()));
00149 QdCallback* cb;
00150 while(NULL!=(cb=state->deq())) {
00151 cb->send();
00152 delete cb;
00153 }
00154 state->reset();
00155 state->setStage(0);
00156 CkFreeMsg(msg);
00157 }
00158 } else {
00159
00160 DEBUGP(("[%d] _handlePhase2 dirty:%d\n", CmiMyPe(), state->isDirty()));
00161 msg->setDirty(state->isDirty());
00162 envelope *env = UsrToEnv((void*)msg);
00163 CmiSyncSendAndFree(state->getParent(), env->getTotalsize(), (char *)env);
00164 state->reset();
00165 state->setStage(0);
00166 }
00167 } else
00168 CkFreeMsg(msg);
00169 }
00170
00171 static void _callWhenIdle(QdMsg *msg)
00172 {
00173 DEBUGP(("[%d] callWhenIdle msg:%p \n", CmiMyPe(), msg));
00174 QdState *state = CpvAccess(_qd);
00175 switch(msg->getPhase()) {
00176 case 0 : _handlePhase0(state, msg); break;
00177 case 1 : _handlePhase1(state, msg); break;
00178 case 2 : _handlePhase2(state, msg); break;
00179 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00180 }
00181 }
00182
00183 #if CMK_DUMMY_QD
00184 static void _invokeQD(QdMsg *msg)
00185 {
00186 QdCallback *cb = new QdCallback(msg->getCb());
00187 cb->send();
00188 delete cb;
00189 }
00190 #endif
00191
00192 void _qdHandler(envelope *env)
00193 {
00194 register QdMsg *msg = (QdMsg*) EnvToUsr(env);
00195 DEBUGP(("[%d] _qdHandler msg:%p \n", CmiMyPe(), msg));
00196 #if CMK_DUMMY_QD
00197 CcdCallFnAfter((CcdVoidFn)_invokeQD,(void *)msg, CMK_DUMMY_QD*1000);
00198 #else
00199 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)_callWhenIdle, (void*) msg);
00200 #endif
00201 }
00202
00203
00204 void CkStartQD(const CkCallback& cb)
00205 {
00206 register QdMsg *msg = (QdMsg*) CkAllocMsg(0,sizeof(QdMsg),0);
00207 msg->setPhase(0);
00208 msg->setCb(cb);
00209 register envelope *env = UsrToEnv((void *)msg);
00210 CmiSetHandler(env, _qdHandlerIdx);
00211 #if CMK_BLUEGENE_CHARM
00212 CmiFreeSendFn(0, env->getTotalsize(), (char *)env);
00213 #else
00214 CldEnqueue(0, env, _infoIdx);
00215 #endif
00216 }
00217
00218 extern "C"
00219 void CkStartQD(int eIdx, const CkChareID *cid)
00220 {
00221 CkStartQD(CkCallback(eIdx, *cid));
00222 }