ck-core/qd.C

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * $Source: /cvsroot/charm/src/ck-core/qd.C,v $
00003  * $Author: gzheng $
00004  * $Date: 2008-05-08 06:32:57 $
00005  * $Revision: 2.25 $
00006  *****************************************************************************/
00007 
00008 #define  DEBUGP(x)    // CmiPrintf x;
00009 
00010 #include "ck.h"
00011 
00012 
00013 // a fake QD which just waits for several seconds to trigger the QD callback
00014 #define CMK_DUMMY_QD             0      /* seconds to wait for */
00015 
00016 #if CMK_BLUEGENE_CHARM
00017 // this is a hack for bgcharm++, I need to figure out a better
00018 // way to do this
00019 #undef CmiSyncSendAndFree
00020 #define CmiSyncSendAndFree    CmiFreeSendFn
00021 #endif
00022 
00023 CpvDeclare(QdState*, _qd);
00024 
00025 // called when a node asks children for their counters
00026 // send broadcast msg (phase 0) to children, and report to itself (phase 1)
00027 // stage 1 means the node is waiting for reports from children
00028 static inline void _bcastQD1(QdState* state, QdMsg *msg)
00029 {
00030   msg->setPhase(0);
00031   state->propagate(msg);
00032   msg->setPhase(1);
00033   DEBUGP(("[%d] _bcastQD1: State: getCreated:%d getProcessed:%d\n", CmiMyPe(), state->getCreated(), state->getProcessed()));
00034 #if ! CMK_SHARED_VARS_UNIPROCESSOR && !CMK_MULTICORE
00035 /* immediate message does not count in QD now */
00036 /*
00037   QdState *comm_state;
00038   static int comm_create=0, comm_process=0;
00039   if (CmiMyRank()==0) {
00040     comm_state = CpvAccessOther(_qd, CmiMyNodeSize());
00041     int new_create = comm_state->getCreated();
00042     int new_process = comm_state->getProcessed();
00043     // combine counters with comm thread
00044     CmiAssert(new_create==0);
00045     CmiAssert(new_create == 0&& new_process==0);
00046     state->create(new_create-comm_create);
00047     state->process(new_process-comm_process);
00048     comm_create = new_create;
00049     comm_process = new_process;
00050   }
00051 */
00052 #endif
00053   msg->setCreated(state->getCreated());
00054   msg->setProcessed(state->getProcessed());
00055   envelope *env = UsrToEnv((void*)msg);
00056   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00057   state->markProcessed();
00058   state->reset();
00059   state->setStage(1);
00060   DEBUGP(("[%d] _bcastQD1 stage changed to: %d\n", CmiMyPe(), state->getStage()));
00061 }
00062 
00063 // final phase to check if the counters become dirty or not
00064 // stage 2 means the node is waiting for children to report their dirty state
00065 static inline void _bcastQD2(QdState* state, QdMsg *msg)
00066 {
00067   DEBUGP(("[%d] _bcastQD2: \n", CmiMyPe()));
00068   msg->setPhase(1);
00069   state->propagate(msg);
00070   msg->setPhase(2);
00071   msg->setDirty(state->isDirty());
00072   envelope *env = UsrToEnv((void*)msg);
00073   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
00074   state->reset();
00075   state->setStage(2);
00076   DEBUGP(("[%d] _bcastQD2: stage changed to: %d\n", CmiMyPe(), state->getStage()));
00077 }
00078 
00079 static inline void _handlePhase0(QdState *state, QdMsg *msg)
00080 {
00081   DEBUGP(("[%d] _handlePhase0: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00082   CkAssert(CmiMyPe()==0 || state->getStage()==0);
00083   if(CmiMyPe()==0) {
00084     QdCallback *qdcb = new QdCallback(msg->getCb());
00085     _MEMCHECK(qdcb);
00086     state->enq(qdcb);           // stores qd callback
00087   }
00088   if(state->getStage()==0)
00089     _bcastQD1(state, msg);        // start asking children for the counters
00090   else
00091     CkFreeMsg(msg);               // already in the middle of processing
00092 }
00093 
00094 // collecting counters from children
00095 static inline void _handlePhase1(QdState *state, QdMsg *msg)
00096 {
00097   DEBUGP(("[%d] _handlePhase1: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
00098   switch(state->getStage()) {
00099     case 0 :
00100       CkAssert(CmiMyPe()!=0);
00101       _bcastQD2(state, msg);
00102       break;
00103     case 1 :
00104       DEBUGP(("[%d] msg: getCreated:%d getProcessed:%d\n", CmiMyPe(), msg->getCreated(), msg->getProcessed()));
00105         // add children's counters
00106       state->subtreeCreate(msg->getCreated());
00107       state->subtreeProcess(msg->getProcessed());
00108       state->reported();
00109       if(state->allReported()) {
00110         if(CmiMyPe()==0) {
00111           DEBUGP(("ALL: %p getCCreated:%d getCProcessed:%d\n", state, state->getCCreated(), state->getCProcessed()));
00112           if(state->getCCreated()==state->getCProcessed()) {
00113             _bcastQD2(state, msg);    // almost reached, one pass to make sure
00114           } else {
00115             _bcastQD1(state, msg);    // not reached, go over again
00116           }
00117         } else {
00118             // report counters to parent
00119           msg->setCreated(state->getCCreated());
00120           msg->setProcessed(state->getCProcessed());
00121           envelope *env = UsrToEnv((void*)msg);
00122           CmiSyncSendAndFree(state->getParent(), 
00123                              env->getTotalsize(), (char *)env);
00124           state->reset();
00125           state->setStage(0);
00126         }
00127       } else
00128           CkFreeMsg(msg);
00129       break;
00130     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00131   }
00132 }
00133 
00134 // check if counters became dirty and notify parents
00135 static inline void _handlePhase2(QdState *state, QdMsg *msg)
00136 {
00137 //  This assertion seems too strong for smp and uth version.
00138   DEBUGP(("[%d] _handlePhase2: stage: %d, msg phase: %d \n", CmiMyPe(), state->getStage(), msg->getPhase()));
00139   CkAssert(state->getStage()==2);
00140   state->subtreeSetDirty(msg->getDirty());
00141   state->reported();
00142   if(state->allReported()) {
00143     if(CmiMyPe()==0) {
00144       if(state->isDirty()) {
00145         _bcastQD1(state, msg);   // dirty, restart again
00146       } else {             
00147           // quiescence detected, send callbacks
00148         DEBUGP(("[%d] quiescence detected,\n", CmiMyPe()));
00149         QdCallback* cb;
00150         while(NULL!=(cb=state->deq())) {
00151           cb->send();
00152           delete cb;
00153         }
00154         state->reset();
00155         state->setStage(0);
00156         CkFreeMsg(msg);
00157       }
00158     } else {
00159         // tell parent if the counters on the node is dirty or not
00160       DEBUGP(("[%d] _handlePhase2 dirty:%d\n", CmiMyPe(), state->isDirty()));
00161       msg->setDirty(state->isDirty());
00162       envelope *env = UsrToEnv((void*)msg);
00163       CmiSyncSendAndFree(state->getParent(), env->getTotalsize(), (char *)env);
00164       state->reset();
00165       state->setStage(0);
00166     }
00167   } else
00168     CkFreeMsg(msg);
00169 }
00170 
00171 static void _callWhenIdle(QdMsg *msg)
00172 {
00173   DEBUGP(("[%d] callWhenIdle msg:%p \n", CmiMyPe(), msg));
00174   QdState *state = CpvAccess(_qd);
00175   switch(msg->getPhase()) {
00176     case 0 : _handlePhase0(state, msg); break;
00177     case 1 : _handlePhase1(state, msg); break;
00178     case 2 : _handlePhase2(state, msg); break;
00179     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00180   }
00181 }
00182 
#if CMK_DUMMY_QD
// Dummy-QD path: fire the callback carried by the message without running
// the real detection protocol. Scheduled by _qdHandler() after the
// CMK_DUMMY_QD delay.
//
// Takes ownership of msg. The message is released here because nothing else
// on the dummy path frees or forwards it; the real protocol handlers always
// free or resend the message they are given.
static void _invokeQD(QdMsg *msg)
{
  // A stack object is enough: the callback is only needed for this one send.
  QdCallback cb(msg->getCb());
  cb.send();
  CkFreeMsg(msg);
}
#endif
00191 
00192 void _qdHandler(envelope *env)
00193 {
00194   register QdMsg *msg = (QdMsg*) EnvToUsr(env);
00195   DEBUGP(("[%d] _qdHandler msg:%p \n", CmiMyPe(), msg));
00196 #if CMK_DUMMY_QD
00197   CcdCallFnAfter((CcdVoidFn)_invokeQD,(void *)msg, CMK_DUMMY_QD*1000); // in ms
00198 #else
00199   CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)_callWhenIdle, (void*) msg);
00200 #endif
00201 }
00202 
00203 
00204 void CkStartQD(const CkCallback& cb)
00205 {
00206   register QdMsg *msg = (QdMsg*) CkAllocMsg(0,sizeof(QdMsg),0);
00207   msg->setPhase(0);
00208   msg->setCb(cb);
00209   register envelope *env = UsrToEnv((void *)msg);
00210   CmiSetHandler(env, _qdHandlerIdx);
00211 #if CMK_BLUEGENE_CHARM
00212   CmiFreeSendFn(0, env->getTotalsize(), (char *)env);
00213 #else
00214   CldEnqueue(0, env, _infoIdx);
00215 #endif
00216 }
00217 
00218 extern "C"
00219 void CkStartQD(int eIdx, const CkChareID *cid)
00220 {
00221   CkStartQD(CkCallback(eIdx, *cid));
00222 }

Generated on Sun Jun 29 13:29:09 2008 for Charm++ by  doxygen 1.5.1