00001 #include "converse.h"
00002 #include "queueing.h"
00003 #include "cldb.h"
00004 #include <time.h>
00005 #include <stdlib.h>
00006 #include <math.h>
00007
00008 void LoadNotifyFn(int l)
00009 {
00010 }
00011
00012 const char *CldGetStrategy(void)
00013 {
00014 return "spray";
00015 }
00016
00017 #define CYCLE_MILLISECONDS 500
00018 #define DEBUGGING_OUTPUT 0
00019
00020 typedef struct
00021 {
00022 int mype;
00023 int EnqueueHandler;
00024 int ReduceHandler;
00025 int AverageHandler;
00026 int HopHandler;
00027 double load_reported;
00028 double load_total;
00029 int load_count;
00030 int spantree_parent;
00031 int spantree_children;
00032 int spantree_root;
00033 int rebalance;
00034 }
00035 peinfo;
00036
00037 CpvStaticDeclare(peinfo, peinf);
00038
00039 struct loadmsg {
00040 char core[CmiMsgHeaderSizeBytes];
00041 double load_total;
00042 };
00043
00044 struct reqmsg {
00045 char core[CmiMsgHeaderSizeBytes];
00046 };
00047
00048 void CldPropagateLoad(double load);
00049
00050 int CldEstimate(void)
00051 {
00052 return CldLoad();
00053 }
00054
00055 void CldInitiateReduction(void)
00056 {
00057 double load = CldEstimate();
00058 peinfo *pinf = &(CpvAccess(peinf));
00059 pinf->load_reported = load;
00060 CldPropagateLoad(load);
00061 }
00062
00063 void CldPropagateLoad(double load)
00064 {
00065 struct loadmsg msg;
00066 peinfo *pinf = &(CpvAccess(peinf));
00067 pinf->load_total += load;
00068 pinf->load_count ++;
00069 if (pinf->load_count == pinf->spantree_children + 1) {
00070 msg.load_total = pinf->load_total;
00071 if (pinf->mype == pinf->spantree_root) {
00072 if (DEBUGGING_OUTPUT) CmiPrintf("---\n");
00073 CmiSetHandler(&msg, pinf->AverageHandler);
00074 CmiSyncBroadcastAll(sizeof(msg), &msg);
00075 } else {
00076 CmiSetHandler(&msg, pinf->ReduceHandler);
00077 CmiSyncSend(pinf->spantree_parent, sizeof(msg), &msg);
00078 }
00079 pinf->load_total = 0;
00080 pinf->load_count = 0;
00081 }
00082 }
00083
00084 void CldReduceHandler(struct loadmsg *msg)
00085 {
00086 CldPropagateLoad(msg->load_total);
00087 CmiFree(msg);
00088 }
00089
00090 void CldAverageHandler(struct loadmsg *msg)
00091 {
00092 peinfo *pinf = &(CpvAccess(peinf));
00093 double load = CldEstimate();
00094 double average = (msg->load_total / CmiNumPes());
00095 int rebalance;
00096 if (load < (average+10) * 1.2) rebalance=0;
00097 else rebalance = (int)(load - average);
00098 if (DEBUGGING_OUTPUT)
00099 CmiPrintf("PE %d load=%6d average=%6d rebalance=%d\n",
00100 CmiMyPe(), CldEstimate(), (int)average, rebalance);
00101 pinf->rebalance = rebalance;
00102 CmiFree(msg);
00103 CcdCallFnAfter((CcdVoidFn)CldInitiateReduction, 0, CYCLE_MILLISECONDS);
00104 }
00105
00106 void CldEnqueueHandler(char *msg)
00107 {
00108 int len, queueing, priobits; unsigned int *prioptr;
00109 CldInfoFn ifn; CldPackFn pfn;
00110 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00111 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00112 CmiSetHandler(msg, CmiGetXHandler(msg));
00113 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00114 }
00115
00116 void CldHopHandler(char *msg)
00117 {
00118 peinfo *pinf = &(CpvAccess(peinf));
00119 int len, queueing, priobits; unsigned int *prioptr;
00120 CldInfoFn ifn; CldPackFn pfn; int pe;
00121
00122 if (pinf->rebalance) {
00123
00124 do pe = ((CrnRand()&0x7FFFFFFF)%CmiNumPes());
00125 while (pe == pinf->mype);
00126 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00127 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00128 if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
00129 pfn(&msg);
00130 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00131 }
00132 CmiSyncSendAndFree(pe, len, msg);
00133 pinf->rebalance--;
00134 } else {
00135 CmiSetHandler(msg, CmiGetXHandler(msg));
00136 CmiHandleMessage(msg);
00137 }
00138 }
00139
00140 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
00141 {
00142 int npes, *pes;
00143 int len, queueing, priobits,i; unsigned int *prioptr;
00144 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00145 peinfo *pinf = &(CpvAccess(peinf));
00146 CldPackFn pfn;
00147 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00148 if (pfn) {
00149 pfn(&msg);
00150 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00151 }
00152 CmiSetInfo(msg,infofn);
00153 CmiSetXHandler(msg, CmiGetHandler(msg));
00154 CmiSetHandler(msg, pinf->EnqueueHandler);
00155 CmiLookupGroup(grp, &npes, &pes);
00156 for(i=0;i<npes;i++) {
00157 CmiSyncSend(pes[i], len, msg);
00158 }
00159 CmiFree(msg);
00160 }
00161
00162 void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn)
00163 {
00164 int len, queueing, priobits,i; unsigned int *prioptr;
00165 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00166 peinfo *pinf = &(CpvAccess(peinf));
00167 CldPackFn pfn;
00168 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00169 if (pfn) {
00170 pfn(&msg);
00171 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00172 }
00173 CmiSetInfo(msg,infofn);
00174 CmiSetXHandler(msg, CmiGetHandler(msg));
00175 CmiSetHandler(msg, pinf->EnqueueHandler);
00176 for(i=0;i<npes;i++) {
00177 CmiSyncSend(pes[i], len, msg);
00178 }
00179 CmiFree(msg);
00180 }
00181
00182 void CldEnqueue(int pe, void *msg, int infofn)
00183 {
00184 int len, queueing, priobits; unsigned int *prioptr;
00185 CldInfoFn ifn; CldPackFn pfn;
00186 peinfo *pinf = &(CpvAccess(peinf));
00187 ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00188 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00189 if (pe != CLD_ANYWHERE) {
00190 if (pfn && (CmiNodeOf(pe) != CmiMyNode())) {
00191 pfn(&msg);
00192 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00193 }
00194 CmiSetInfo(msg, infofn);
00195 CmiSetXHandler(msg, CmiGetHandler(msg));
00196 CmiSetHandler(msg, pinf->EnqueueHandler);
00197 if (pe==CLD_BROADCAST) CmiSyncBroadcastAndFree(len, msg);
00198 else if (pe==CLD_BROADCAST_ALL) CmiSyncBroadcastAllAndFree(len, msg);
00199 else CmiSyncSendAndFree(pe, len, msg);
00200 } else {
00201 CmiSetInfo(msg, infofn);
00202 CmiSetXHandler(msg, CmiGetHandler(msg));
00203 CmiSetHandler(msg, pinf->HopHandler);
00204 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00205 }
00206 }
00207
00208 void CldNodeEnqueue(int node, void *msg, int infofn)
00209 {
00210 int len, queueing, priobits; unsigned int *prioptr;
00211 CldInfoFn ifn; CldPackFn pfn;
00212 peinfo *pinf = &(CpvAccess(peinf));
00213 ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00214 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00215 if (node != CLD_ANYWHERE) {
00216 if (pfn && (node != CmiMyNode())) {
00217 pfn(&msg);
00218 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00219 }
00220 CmiSetInfo(msg, infofn);
00221 CmiSetXHandler(msg, CmiGetHandler(msg));
00222 CmiSetHandler(msg, pinf->EnqueueHandler);
00223 if (node==CLD_BROADCAST) CmiSyncNodeBroadcastAndFree(len, msg);
00224 else if (node==CLD_BROADCAST_ALL) CmiSyncNodeBroadcastAllAndFree(len, msg);
00225 else CmiSyncNodeSendAndFree(node, len, msg);
00226 } else {
00227 CmiSetInfo(msg, infofn);
00228 CmiSetXHandler(msg, CmiGetHandler(msg));
00229 CmiSetHandler(msg, pinf->HopHandler);
00230 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00231 }
00232 }
00233
00234 void CldModuleInit(char **argv)
00235 {
00236 peinfo *pinf;
00237 CpvInitialize(peinfo, peinf);
00238
00239 CrnSrand((int) (time(0)+CmiMyPe()));
00240 pinf = &CpvAccess(peinf);
00241 pinf->mype = CmiMyPe();
00242 pinf->EnqueueHandler = CmiRegisterHandler((CmiHandler)CldEnqueueHandler);
00243 pinf->ReduceHandler = CmiRegisterHandler((CmiHandler)CldReduceHandler);
00244 pinf->AverageHandler = CmiRegisterHandler((CmiHandler)CldAverageHandler);
00245 pinf->HopHandler = CmiRegisterHandler((CmiHandler)CldHopHandler);
00246 pinf->load_total = 0.0;
00247 pinf->load_count = 0;
00248 pinf->spantree_children = CmiNumSpanTreeChildren(CmiMyPe());
00249 pinf->spantree_parent = CmiSpanTreeParent(CmiMyPe());
00250 pinf->spantree_root = 0;
00251 pinf->rebalance = 0;
00252 CldModuleGeneralInit(argv);
00253 CldInitiateReduction();
00254 }
00255 void CldCallback(void)
00256 {}