00001 
00015 #include "conv-config.h"
00016 
00017 #if CMI_QD
00018 
00019 #include "converse.h"
00020 #include "quiescence.h"
00021 #include <stdio.h>
00022 #include <stdlib.h>
00023 #ifndef  DEBUGF
00024 #define  DEBUGF(x)  
00025 #endif
00026 
00027 CpvDeclare(CQdState, cQdState);
00028 unsigned int _CQdHandlerIdx;
00029 unsigned int _CQdAnnounceHandlerIdx;
00030 
00031 
00032 int  CQdMsgGetPhase(CQdMsg msg) 
00033 { return msg->phase; }
00034 
00035 void CQdMsgSetPhase(CQdMsg msg, int p) 
00036 { msg->phase = p; }
00037 
00038 CmiInt8 CQdMsgGetCreated(CQdMsg msg)
00039 { CmiAssert(msg->phase==1); return msg->u.p1.created; }
00040 
00041 void CQdMsgSetCreated(CQdMsg msg, CmiInt8 c)
00042 { CmiAssert(msg->phase==1); msg->u.p1.created = c; }
00043 
00044 CmiInt8 CQdMsgGetProcessed(CQdMsg msg)
00045 { CmiAssert(msg->phase==1); return msg->u.p1.processed; }
00046 
00047 void CQdMsgSetProcessed(CQdMsg msg, CmiInt8 p)
00048 { CmiAssert(msg->phase==1); msg->u.p1.processed = p; }
00049 
00050 char CQdMsgGetDirty(CQdMsg msg) 
00051 { CmiAssert(msg->phase==2); return msg->u.p2.dirty; }
00052 
00053 void CQdMsgSetDirty(CQdMsg msg, char d) 
00054 { CmiAssert(msg->phase==2); msg->u.p2.dirty = d; }
00055 
00056 
00057 CmiInt8 CQdGetCreated(CQdState state)
00058 { return state->mCreated; }
00059 
00060 void CQdCreate(CQdState state, CmiInt8 n)
00061 { state->mCreated += n; }
00062 
00063 CmiInt8 CQdGetProcessed(CQdState state)
00064 { return state->mProcessed; }
00065 
00066 void CQdProcess(CQdState state, CmiInt8 n)
00067 { state->mProcessed += n; }
00068 
00069 void CQdPropagate(CQdState state, CQdMsg msg) 
00070 {   
00071   int i;
00072   CmiSetHandler(msg, _CQdHandlerIdx);
00073   for(i=0; i<state->nChildren; i++) {
00074     CQdCreate(state, -1);
00075     CmiSyncSend(state->children[i], sizeof(struct ConvQdMsg), (char *)msg);
00076   }
00077 }
00078 
00079 int  CQdGetParent(CQdState state) 
00080 { return state->parent; }
00081     
00082 CmiInt8 CQdGetCCreated(CQdState state)
00083 { return state->cCreated; }
00084 
00085 CmiInt8 CQdGetCProcessed(CQdState state)
00086 { return state->cProcessed; }
00087 
00088 void CQdSubtreeCreate(CQdState state, CmiInt8 c)
00089 { state->cCreated += c; }
00090 
00091 void CQdSubtreeProcess(CQdState state, CmiInt8 p)
00092 { state->cProcessed += p; }
00093 
00094 int  CQdGetStage(CQdState state) 
00095 { return state->stage; }
00096 
00097 void CQdSetStage(CQdState state, int p) 
00098 { state->stage = p; }
00099 
00100 void CQdReported(CQdState state) 
00101 { state->nReported++; }
00102 
00103 int  CQdAllReported(CQdState state) 
00104 { return state->nReported==(state->nChildren+1);}
00105 
00106 void CQdReset(CQdState state) 
00107 { 
00108   state->nReported=0; state->cCreated=0; 
00109   state->cProcessed=0; state->cDirty=0; 
00110 }
00111 
00112 void CQdMarkProcessed(CQdState state) 
00113 { state->oProcessed = state->mProcessed; }
00114 
00115 char CQdIsDirty(CQdState state) 
00116 { return ((state->mProcessed > state->oProcessed) || state->cDirty); }
00117 
00118 void CQdSubtreeSetDirty(CQdState state, char d) 
00119 { state->cDirty = state->cDirty || d; }
00120 
00121 CQdState CQdStateCreate(void)
00122 {
00123   CQdState state = (CQdState) malloc(sizeof(struct ConvQdState));
00124   _MEMCHECK(state);
00125   state->mCreated = 0;
00126   state->mProcessed = 0;
00127   state->stage = 0;
00128   state->nReported = 0;
00129   state->oProcessed = 0;
00130   state->cCreated = 0;
00131   state->cProcessed = 0;
00132   state->cDirty = 0;
00133   state->nChildren = CmiNumSpanTreeChildren(CmiMyPe());
00134   state->parent = CmiSpanTreeParent(CmiMyPe());
00135   
00136   if (state->nChildren) {
00137     state->children = (int *) malloc(state->nChildren*sizeof(int));
00138     _MEMCHECK(state->children);
00139   }
00140   else 
00141     state->children = NULL;
00142   CmiSpanTreeChildren(CmiMyPe(), state->children);
00143 
00144   return state;
00145 }
00146 
00147 
00148 static void CQdBcastQD1(CQdState state, CQdMsg msg)
00149 {  
00150   CQdMsgSetPhase(msg, 0); 
00151   CQdPropagate(state, msg); 
00152   CQdMsgSetPhase(msg, 1); 
00153   CQdMsgSetCreated(msg, CQdGetCreated(state)); 
00154   CQdMsgSetProcessed(msg, CQdGetProcessed(state)); 
00155   CQdCreate(state, -1);
00156   CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
00157   CQdMarkProcessed(state); 
00158   CQdReset(state); 
00159   CQdSetStage(state, 1); 
00160 }
00161 
00162 
00163 static void CQdBcastQD2(CQdState state, CQdMsg msg)
00164 {
00165   CQdMsgSetPhase(msg, 1); 
00166   CQdPropagate(state, msg); 
00167   CQdMsgSetPhase(msg, 2); 
00168   CQdMsgSetDirty(msg, CQdIsDirty(state)); 
00169   CQdCreate(state, -1);
00170   CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
00171   CQdReset(state); 
00172   CQdSetStage(state, 2); 
00173 }
00174 
00175 
00176 static void CQdHandlePhase0(CQdState state, CQdMsg msg)
00177 {
00178   CmiAssert(CmiMyPe()==0 || CQdGetStage(state)==0);
00179   if(CQdGetStage(state)==0)
00180     CQdBcastQD1(state, msg);
00181   else
00182     CmiFree(msg);
00183 }
00184 
00185 
00186 static void CQdHandlePhase1(CQdState state, CQdMsg msg)
00187 {
00188   switch(CQdGetStage(state)) {      
00189   case 0 :
00190     CmiAssert(CmiMyPe()!=0);
00191     CQdBcastQD2(state, msg);
00192     break;
00193   case 1 :
00194     CQdSubtreeCreate(state, CQdMsgGetCreated(msg)); 
00195     CQdSubtreeProcess(state, CQdMsgGetProcessed(msg)); 
00196     CQdReported(state); 
00197     
00198     if(CQdAllReported(state)) {
00199       if(CmiMyPe()==0) {
00200     if(CQdGetCCreated(state) == CQdGetCProcessed(state)) 
00201       CQdBcastQD2(state, msg); 
00202     else 
00203       CQdBcastQD1(state, msg);
00204       } 
00205       else {
00206     CQdMsgSetCreated(msg, CQdGetCCreated(state)); 
00207     CQdMsgSetProcessed(msg, CQdGetCProcessed(state)); 
00208     CQdCreate(state, -1);
00209     CmiSyncSendAndFree(CQdGetParent(state), 
00210                sizeof(struct ConvQdMsg), (char *) msg);
00211     DEBUGF(("PE = %d, My parent = %d\n", CmiMyPe(), CQdGetParent(state)));
00212     CQdReset(state); 
00213     CQdSetStage(state, 0); 
00214       }
00215     } 
00216     else
00217       CmiFree(msg);
00218     break;
00219   default: 
00220     CmiAbort("Internal QD Error. Contact Developers.!\n");
00221   }
00222 }
00223 
00224 
00225 static void CQdHandlePhase2(CQdState state, CQdMsg msg)
00226 {
00227   CmiAssert(CQdGetStage(state)==2);
00228   CQdSubtreeSetDirty(state, CQdMsgGetDirty(msg));    
00229   CQdReported(state);
00230   if(CQdAllReported(state)) { 
00231     if(CmiMyPe()==0) {
00232       if(CQdIsDirty(state)) 
00233     CQdBcastQD1(state, msg);
00234       else {
00235     CmiSetHandler(msg, _CQdAnnounceHandlerIdx);
00236     CQdCreate(state, 0-CmiNumPes());
00237     CmiSyncBroadcastAllAndFree(sizeof(struct ConvQdMsg), (char *) msg);
00238     CQdReset(state); 
00239     CQdSetStage(state, 0); 
00240       }
00241     } 
00242     else {
00243       CQdMsgSetDirty(msg, CQdIsDirty(state)); 
00244       CQdCreate(state, -1);
00245       CmiSyncSendAndFree(CQdGetParent(state), 
00246              sizeof(struct ConvQdMsg), (char *) msg);
00247       CQdReset(state); 
00248       CQdSetStage(state, 0); 
00249     }
00250   } 
00251   else
00252     CmiFree(msg);
00253 }
00254 
00255 
00256 static void CQdCallWhenIdle(CQdMsg msg)
00257 {
00258   CQdState state = CpvAccess(cQdState);
00259   
00260   switch(CQdMsgGetPhase(msg)) {
00261   case 0 : CQdHandlePhase0(state, msg); break;
00262   case 1 : CQdHandlePhase1(state, msg); break;
00263   case 2 : CQdHandlePhase2(state, msg); break;
00264   default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00265   }
00266 }
00267 
00268 
00269 void CQdHandler(CQdMsg msg)
00270 {
00271   CQdProcess(CpvAccess(cQdState), -1);
00272   CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, 
00273              (CcdVoidFn)CQdCallWhenIdle, (void*) msg);  
00274 }
00275 
00276 
00277 void CQdRegisterCallback(CQdVoidFn fn, void *arg)
00278 {
00279   CcdCallOnCondition(CcdQUIESCENCE, fn, arg);
00280 }
00281 
00282 void CQdAnnounceHandler(CQdMsg msg)
00283 {
00284   CQdProcess(CpvAccess(cQdState), -1);
00285   CcdRaiseCondition(CcdQUIESCENCE);
00286 }
00287 
00288 void CQdCpvInit(void) {
00289   CpvInitialize(CQdState, cQdState);
00290   CpvAccess(cQdState) = CQdStateCreate();
00291 }
00292 
00293 void CQdInit(void)
00294 {
00295   CQdCpvInit();
00296   _CQdHandlerIdx = CmiRegisterHandler((CmiHandler)CQdHandler);
00297   _CQdAnnounceHandlerIdx = 
00298     CmiRegisterHandler((CmiHandler)CQdAnnounceHandler);
00299 }
00300 
00301 void CmiStartQD(CQdVoidFn fn, void *arg)
00302 {
00303   CQdMsg msg = (CQdMsg) CmiAlloc(sizeof(struct ConvQdMsg));
00304   CQdRegisterCallback(fn, arg);
00305   CQdMsgSetPhase(msg, 0);  
00306   CmiSetHandler(msg, _CQdHandlerIdx);
00307   CQdCreate(CpvAccess(cQdState), -1);
00308   CmiSyncSendAndFree(0, sizeof(struct ConvQdMsg), (char *)msg);
00309 }
00310 
00311 #endif
00312