00001
00015 #include "conv-config.h"
00016
00017 #if CMI_QD
00018
00019 #include "converse.h"
00020 #include "quiescence.h"
00021 #include <stdio.h>
00022 #include <stdlib.h>
00023 #ifndef DEBUGF
00024 #define DEBUGF(x)
00025 #endif
00026
00027 CpvDeclare(CQdState, cQdState);
00028 unsigned int _CQdHandlerIdx;
00029 unsigned int _CQdAnnounceHandlerIdx;
00030
00031
00032 int CQdMsgGetPhase(CQdMsg msg)
00033 { return msg->phase; }
00034
00035 void CQdMsgSetPhase(CQdMsg msg, int p)
00036 { msg->phase = p; }
00037
00038 CmiInt8 CQdMsgGetCreated(CQdMsg msg)
00039 { CmiAssert(msg->phase==1); return msg->u.p1.created; }
00040
00041 void CQdMsgSetCreated(CQdMsg msg, CmiInt8 c)
00042 { CmiAssert(msg->phase==1); msg->u.p1.created = c; }
00043
00044 CmiInt8 CQdMsgGetProcessed(CQdMsg msg)
00045 { CmiAssert(msg->phase==1); return msg->u.p1.processed; }
00046
00047 void CQdMsgSetProcessed(CQdMsg msg, CmiInt8 p)
00048 { CmiAssert(msg->phase==1); msg->u.p1.processed = p; }
00049
00050 char CQdMsgGetDirty(CQdMsg msg)
00051 { CmiAssert(msg->phase==2); return msg->u.p2.dirty; }
00052
00053 void CQdMsgSetDirty(CQdMsg msg, char d)
00054 { CmiAssert(msg->phase==2); msg->u.p2.dirty = d; }
00055
00056
00057 CmiInt8 CQdGetCreated(CQdState state)
00058 { return state->mCreated; }
00059
00060 void CQdCreate(CQdState state, CmiInt8 n)
00061 { state->mCreated += n; }
00062
00063 CmiInt8 CQdGetProcessed(CQdState state)
00064 { return state->mProcessed; }
00065
00066 void CQdProcess(CQdState state, CmiInt8 n)
00067 { state->mProcessed += n; }
00068
00069 void CQdPropagate(CQdState state, CQdMsg msg)
00070 {
00071 int i;
00072 CmiSetHandler(msg, _CQdHandlerIdx);
00073 for(i=0; i<state->nChildren; i++) {
00074 CQdCreate(state, -1);
00075 CmiSyncSend(state->children[i], sizeof(struct ConvQdMsg), (char *)msg);
00076 }
00077 }
00078
00079 int CQdGetParent(CQdState state)
00080 { return state->parent; }
00081
00082 CmiInt8 CQdGetCCreated(CQdState state)
00083 { return state->cCreated; }
00084
00085 CmiInt8 CQdGetCProcessed(CQdState state)
00086 { return state->cProcessed; }
00087
00088 void CQdSubtreeCreate(CQdState state, CmiInt8 c)
00089 { state->cCreated += c; }
00090
00091 void CQdSubtreeProcess(CQdState state, CmiInt8 p)
00092 { state->cProcessed += p; }
00093
00094 int CQdGetStage(CQdState state)
00095 { return state->stage; }
00096
00097 void CQdSetStage(CQdState state, int p)
00098 { state->stage = p; }
00099
00100 void CQdReported(CQdState state)
00101 { state->nReported++; }
00102
00103 int CQdAllReported(CQdState state)
00104 { return state->nReported==(state->nChildren+1);}
00105
00106 void CQdReset(CQdState state)
00107 {
00108 state->nReported=0; state->cCreated=0;
00109 state->cProcessed=0; state->cDirty=0;
00110 }
00111
00112 void CQdMarkProcessed(CQdState state)
00113 { state->oProcessed = state->mProcessed; }
00114
00115 char CQdIsDirty(CQdState state)
00116 { return ((state->mProcessed > state->oProcessed) || state->cDirty); }
00117
00118 void CQdSubtreeSetDirty(CQdState state, char d)
00119 { state->cDirty = state->cDirty || d; }
00120
00121 CQdState CQdStateCreate(void)
00122 {
00123 CQdState state = (CQdState) malloc(sizeof(struct ConvQdState));
00124 _MEMCHECK(state);
00125 state->mCreated = 0;
00126 state->mProcessed = 0;
00127 state->stage = 0;
00128 state->nReported = 0;
00129 state->oProcessed = 0;
00130 state->cCreated = 0;
00131 state->cProcessed = 0;
00132 state->cDirty = 0;
00133 state->nChildren = CmiNumSpanTreeChildren(CmiMyPe());
00134 state->parent = CmiSpanTreeParent(CmiMyPe());
00135
00136 if (state->nChildren) {
00137 state->children = (int *) malloc(state->nChildren*sizeof(int));
00138 _MEMCHECK(state->children);
00139 }
00140 else
00141 state->children = NULL;
00142 CmiSpanTreeChildren(CmiMyPe(), state->children);
00143
00144 return state;
00145 }
00146
00147
00148 static void CQdBcastQD1(CQdState state, CQdMsg msg)
00149 {
00150 CQdMsgSetPhase(msg, 0);
00151 CQdPropagate(state, msg);
00152 CQdMsgSetPhase(msg, 1);
00153 CQdMsgSetCreated(msg, CQdGetCreated(state));
00154 CQdMsgSetProcessed(msg, CQdGetProcessed(state));
00155 CQdCreate(state, -1);
00156 CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
00157 CQdMarkProcessed(state);
00158 CQdReset(state);
00159 CQdSetStage(state, 1);
00160 }
00161
00162
00163 static void CQdBcastQD2(CQdState state, CQdMsg msg)
00164 {
00165 CQdMsgSetPhase(msg, 1);
00166 CQdPropagate(state, msg);
00167 CQdMsgSetPhase(msg, 2);
00168 CQdMsgSetDirty(msg, CQdIsDirty(state));
00169 CQdCreate(state, -1);
00170 CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
00171 CQdReset(state);
00172 CQdSetStage(state, 2);
00173 }
00174
00175
00176 static void CQdHandlePhase0(CQdState state, CQdMsg msg)
00177 {
00178 CmiAssert(CmiMyPe()==0 || CQdGetStage(state)==0);
00179 if(CQdGetStage(state)==0)
00180 CQdBcastQD1(state, msg);
00181 else
00182 CmiFree(msg);
00183 }
00184
00185
00186 static void CQdHandlePhase1(CQdState state, CQdMsg msg)
00187 {
00188 switch(CQdGetStage(state)) {
00189 case 0 :
00190 CmiAssert(CmiMyPe()!=0);
00191 CQdBcastQD2(state, msg);
00192 break;
00193 case 1 :
00194 CQdSubtreeCreate(state, CQdMsgGetCreated(msg));
00195 CQdSubtreeProcess(state, CQdMsgGetProcessed(msg));
00196 CQdReported(state);
00197
00198 if(CQdAllReported(state)) {
00199 if(CmiMyPe()==0) {
00200 if(CQdGetCCreated(state) == CQdGetCProcessed(state))
00201 CQdBcastQD2(state, msg);
00202 else
00203 CQdBcastQD1(state, msg);
00204 }
00205 else {
00206 CQdMsgSetCreated(msg, CQdGetCCreated(state));
00207 CQdMsgSetProcessed(msg, CQdGetCProcessed(state));
00208 CQdCreate(state, -1);
00209 CmiSyncSendAndFree(CQdGetParent(state),
00210 sizeof(struct ConvQdMsg), (char *) msg);
00211 DEBUGF(("PE = %d, My parent = %d\n", CmiMyPe(), CQdGetParent(state)));
00212 CQdReset(state);
00213 CQdSetStage(state, 0);
00214 }
00215 }
00216 else
00217 CmiFree(msg);
00218 break;
00219 default:
00220 CmiAbort("Internal QD Error. Contact Developers.!\n");
00221 }
00222 }
00223
00224
00225 static void CQdHandlePhase2(CQdState state, CQdMsg msg)
00226 {
00227 CmiAssert(CQdGetStage(state)==2);
00228 CQdSubtreeSetDirty(state, CQdMsgGetDirty(msg));
00229 CQdReported(state);
00230 if(CQdAllReported(state)) {
00231 if(CmiMyPe()==0) {
00232 if(CQdIsDirty(state))
00233 CQdBcastQD1(state, msg);
00234 else {
00235 CmiSetHandler(msg, _CQdAnnounceHandlerIdx);
00236 CQdCreate(state, 0-CmiNumPes());
00237 CmiSyncBroadcastAllAndFree(sizeof(struct ConvQdMsg), (char *) msg);
00238 CQdReset(state);
00239 CQdSetStage(state, 0);
00240 }
00241 }
00242 else {
00243 CQdMsgSetDirty(msg, CQdIsDirty(state));
00244 CQdCreate(state, -1);
00245 CmiSyncSendAndFree(CQdGetParent(state),
00246 sizeof(struct ConvQdMsg), (char *) msg);
00247 CQdReset(state);
00248 CQdSetStage(state, 0);
00249 }
00250 }
00251 else
00252 CmiFree(msg);
00253 }
00254
00255
00256 static void CQdCallWhenIdle(CQdMsg msg)
00257 {
00258 CQdState state = CpvAccess(cQdState);
00259
00260 switch(CQdMsgGetPhase(msg)) {
00261 case 0 : CQdHandlePhase0(state, msg); break;
00262 case 1 : CQdHandlePhase1(state, msg); break;
00263 case 2 : CQdHandlePhase2(state, msg); break;
00264 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
00265 }
00266 }
00267
00268
00269 void CQdHandler(CQdMsg msg)
00270 {
00271 CQdProcess(CpvAccess(cQdState), -1);
00272 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE,
00273 (CcdVoidFn)CQdCallWhenIdle, (void*) msg);
00274 }
00275
00276
00277 void CQdRegisterCallback(CQdVoidFn fn, void *arg)
00278 {
00279 CcdCallOnCondition(CcdQUIESCENCE, fn, arg);
00280 }
00281
00282 void CQdAnnounceHandler(CQdMsg msg)
00283 {
00284 CQdProcess(CpvAccess(cQdState), -1);
00285 CcdRaiseCondition(CcdQUIESCENCE);
00286 }
00287
00288 void CQdCpvInit(void) {
00289 CpvInitialize(CQdState, cQdState);
00290 CpvAccess(cQdState) = CQdStateCreate();
00291 }
00292
00293 void CQdInit(void)
00294 {
00295 CQdCpvInit();
00296 _CQdHandlerIdx = CmiRegisterHandler((CmiHandler)CQdHandler);
00297 _CQdAnnounceHandlerIdx =
00298 CmiRegisterHandler((CmiHandler)CQdAnnounceHandler);
00299 }
00300
00301 void CmiStartQD(CQdVoidFn fn, void *arg)
00302 {
00303 CQdMsg msg = (CQdMsg) CmiAlloc(sizeof(struct ConvQdMsg));
00304 CQdRegisterCallback(fn, arg);
00305 CQdMsgSetPhase(msg, 0);
00306 CmiSetHandler(msg, _CQdHandlerIdx);
00307 CQdCreate(CpvAccess(cQdState), -1);
00308 CmiSyncSendAndFree(0, sizeof(struct ConvQdMsg), (char *)msg);
00309 }
00310
00311 #endif
00312