00001 #include <stdlib.h>
00002
00003 #include "converse.h"
00004 #include "cldb.workstealing.h"
00005 #include "queueing.h"
00006 #include "cldb.h"
00007
/* Enables the traceUserBracketEvent() brackets in the steal handlers below;
 * they are compiled in only when CMK_TRACE_ENABLED is also set. */
#define TRACE_USEREVENTS 1
/* A victim gives work away only when its token count is strictly greater
 * than this value (see CldAskLoadHandler); otherwise it replies "no task". */
#define LOADTHRESH 3

/* Prioritized counterpart of CldMultipleSend, used by CldAskLoadHandler when
 * +WSPriority is given.  NOTE(review): defined outside this file — confirm. */
void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed);
00012
/* Per-PE trace bookkeeping: user-event IDs registered in CldGraphModuleInit
 * and used to bracket the steal-related handlers in trace output. */
struct CldProcInfo_s {
  int askEvt;    /* "CldAskLoad" event, bracketed in CldAskLoadHandler */
  int askNoEvt;  /* "CldAckNoTask" event, bracketed in CldAckNoTaskHandler */
  int idleEvt;   /* "StealLoad" event, bracketed in StealLoad */
};
typedef struct CldProcInfo_s *CldProcInfo;
00018
00019 static int WS_Threshold = -1;
00020 static int _steal_prio = 0;
00021 static int _stealonly1 = 0;
00022 static int _steal_immediate = 0;
00023 static int workstealingproactive = 0;
00024
00025 CpvStaticDeclare(CldProcInfo, CldData);
00026 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
00027 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
00028 CpvStaticDeclare(int, isStealing);
00029
00030
/* Report the human-readable name of this load-balancing strategy. */
const char *CldGetStrategy(void)
{
  static const char strategy_name[] = "work stealing";
  return strategy_name;
}
00035
00036
00037 static void StealLoad(void)
00038 {
00039 int i;
00040 double now;
00041 requestmsg msg;
00042 int victim;
00043 int mype;
00044 int numpes;
00045
00046 if (CpvAccess(isStealing)) return;
00047
00048 CpvAccess(isStealing) = 1;
00049
00050 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00051 now = CmiWallTimer();
00052 #endif
00053
00054 mype = CmiMyPe();
00055 msg.from_pe = mype;
00056 numpes = CmiNumPes();
00057 do{
00058 victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
00059 }while(victim == mype);
00060
00061 CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
00062 #if CMK_IMMEDIATE_MSG
00063
00064 if (_steal_immediate) CmiBecomeImmediate(&msg);
00065 #endif
00066
00067 msg.to_pe = victim;
00068 CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
00069
00070 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00071 traceUserBracketEvent(CpvAccess(CldData)->idleEvt, now, CmiWallTimer());
00072 #endif
00073 }
00074
00075 void LoadNotifyFn(int l)
00076 {
00077 if(CldCountTokens() <= WS_Threshold)
00078 StealLoad();
00079 }
00080
00081
/*
 * CcdPROCESSOR_BEGIN_IDLE callback (registered in CldGraphModuleInit when
 * there is more than one PE).  An idle PE is expected to hold no tokens;
 * it tries to steal some.
 */
static void CldBeginIdle(void *dummy)
{
  (void)dummy;  /* callback argument is unused */
  CmiAssert(CldCountTokens() == 0);
  StealLoad();
}
00088
00089
00090
00091 static void CldAskLoadHandler(requestmsg *msg)
00092 {
00093 int receiver, rank, recvIdx, i;
00094 int myload, sendLoad;
00095 double now;
00096
00097
00098 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00099 now = CmiWallTimer();
00100 #endif
00101
00102
00103 CmiAssert(msg->to_pe!=-1);
00104 rank = CmiRankOf(msg->to_pe);
00105 CmiAssert(rank!=-1);
00106 myload = CldCountTokensRank(rank);
00107
00108 receiver = msg->from_pe;
00109
00110 if (myload>LOADTHRESH) {
00111 if(_stealonly1) sendLoad = 1;
00112 else sendLoad = myload/2;
00113 if(sendLoad > 0) {
00114 #if ! CMK_USE_IBVERBS
00115 if (_steal_prio)
00116 CldMultipleSendPrio(receiver, sendLoad, rank, 0);
00117 else
00118 CldMultipleSend(receiver, sendLoad, rank, 0);
00119 #else
00120 CldSimpleMultipleSend(receiver, sendLoad, rank);
00121 #endif
00122 }
00123 CmiFree(msg);
00124 }else
00125 {
00126 int pe = msg->to_pe;
00127 msg->to_pe = msg->from_pe;
00128 msg->from_pe = pe;
00129
00130
00131 CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
00132 CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
00133 }
00134
00135 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00136 traceUserBracketEvent(CpvAccess(CldData)->askEvt, now, CmiWallTimer());
00137 #endif
00138 }
00139
00140 void CldAckNoTaskHandler(requestmsg *msg)
00141 {
00142 double now;
00143 int victim;
00144
00145 int mype = CmiMyPe();
00146 int numpes = CmiNumPes();
00147
00148 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00149 now = CmiWallTimer();
00150 #endif
00151
00152 do{
00153
00154 victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
00155 } while(victim == mype);
00156
00157
00158 #if CMK_IMMEDIATE_MSG
00159
00160 if (_steal_immediate) CmiBecomeImmediate(msg);
00161 #endif
00162
00163 msg->to_pe = victim;
00164 msg->from_pe = mype;
00165 CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
00166 CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
00167
00168 CpvAccess(isStealing) = 1;
00169
00170 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00171 traceUserBracketEvent(CpvAccess(CldData)->askNoEvt, now, CmiWallTimer());
00172 #endif
00173 }
00174
00175 void CldHandler(void *msg)
00176 {
00177 CldInfoFn ifn; CldPackFn pfn;
00178 int len, queueing, priobits; unsigned int *prioptr;
00179
00180 CldRestoreHandler((char *)msg);
00181 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00182 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00183 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00184
00185 }
00186
/*
 * Add a message to this PE's token queue.  The prioritized queue is used
 * when +WSPriority (_steal_prio) is set.
 *
 * The body is wrapped in do { } while (0) so that "CldPUTTOKEN(m);" forms
 * exactly one statement.  This makes it safe inside an unbraced if/else;
 * the old bare if/else plus trailing ';' broke there.  Exactly one branch
 * runs, so the argument is evaluated once.
 */
#define CldPUTTOKEN(msg)        \
  do {                          \
    if (_steal_prio)            \
      CldPutTokenPrio(msg);     \
    else                        \
      CldPutToken(msg);         \
  } while (0)
00192
00193 void CldBalanceHandler(void *msg)
00194 {
00195 CldRestoreHandler((char *)msg);
00196 CldPUTTOKEN((char *)msg);
00197 CpvAccess(isStealing) = 0;
00198 }
00199
00200 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
00201 {
00202 int len, queueing, priobits,i; unsigned int *prioptr;
00203 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00204 CldPackFn pfn;
00205 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00206 if (pfn) {
00207 pfn(&msg);
00208 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00209 }
00210 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00211 CmiSetInfo(msg,infofn);
00212
00213 CmiSyncMulticastAndFree(grp, len, msg);
00214 }
00215
00216 void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn)
00217 {
00218 int len, queueing, priobits,i; unsigned int *prioptr;
00219 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00220 CldPackFn pfn;
00221 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00222 if (pfn) {
00223 pfn(&msg);
00224 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00225 }
00226 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00227 CmiSetInfo(msg,infofn);
00228 CmiSyncListSendAndFree(npes, pes, len, msg);
00229 }
00230
00231 void CldEnqueue(int pe, void *msg, int infofn)
00232 {
00233 int len, queueing, priobits, avg; unsigned int *prioptr;
00234 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00235 CldPackFn pfn;
00236
00237 if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00238 pe = CmiMyPe();
00239
00240
00241 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00242 if (pfn && CmiNumNodes()>1) {
00243 pfn(&msg);
00244 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00245 }
00246 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00247 CmiSetInfo(msg,infofn);
00248 CldPUTTOKEN((char *)msg);
00249 }
00250 else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
00251 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00252 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00253 }
00254 else {
00255 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00256 if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
00257 pfn(&msg);
00258 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00259 }
00260 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00261 CmiSetInfo(msg,infofn);
00262 if (pe==CLD_BROADCAST)
00263 CmiSyncBroadcastAndFree(len, msg);
00264 else if (pe==CLD_BROADCAST_ALL)
00265 CmiSyncBroadcastAllAndFree(len, msg);
00266 else CmiSyncSendAndFree(pe, len, msg);
00267 }
00268 }
00269
00270 void CldNodeEnqueue(int node, void *msg, int infofn)
00271 {
00272 int len, queueing, priobits, pe, avg; unsigned int *prioptr;
00273 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00274 CldPackFn pfn;
00275 if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00276 pe = CmiMyPe();
00277 node = CmiNodeOf(pe);
00278 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00279 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00280 }
00281 else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
00282 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00283 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00284 }
00285 else {
00286 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00287 if (pfn) {
00288 pfn(&msg);
00289 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00290 }
00291 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00292 CmiSetInfo(msg,infofn);
00293 if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
00294 else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
00295 else CmiSyncNodeSendAndFree(node, len, msg);
00296 }
00297 }
00298
00299
00300 void CldGraphModuleInit(char **argv)
00301 {
00302 CpvInitialize(CldProcInfo, CldData);
00303 CpvInitialize(int, CldAskLoadHandlerIndex);
00304 CpvInitialize(int, CldAckNoTaskHandlerIndex);
00305 CpvInitialize(int, CldBalanceHandlerIndex);
00306
00307 CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
00308 #if CMK_TRACE_ENABLED
00309 CpvAccess(CldData)->askEvt = traceRegisterUserEvent("CldAskLoad", -1);
00310 CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("StealLoad", -1);
00311 CpvAccess(CldData)->askNoEvt = traceRegisterUserEvent("CldAckNoTask", -1);
00312 #endif
00313
00314 CpvAccess(CldBalanceHandlerIndex) =
00315 CmiRegisterHandler(CldBalanceHandler);
00316 CpvAccess(CldAskLoadHandlerIndex) =
00317 CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
00318
00319 CpvAccess(CldAckNoTaskHandlerIndex) =
00320 CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
00321
00322
00323 if (CmiMyRank() == CmiMyNodeSize()) return;
00324
00325 _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
00326
00327 if(CmiGetArgIntDesc(argv, "+WSThreshold", &WS_Threshold, "The number of minimum load before stealing"))
00328 {
00329 CmiAssert(WS_Threshold>=0);
00330 }
00331
00332 _steal_immediate = CmiGetArgFlagDesc(argv, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
00333
00334 _steal_prio = CmiGetArgFlagDesc(argv, "+WSPriority", "Charm++> Work Stealing, using priority");
00335
00336
00337 if(CmiNumPes() > 1)
00338 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
00339 (CcdVoidFn) CldBeginIdle, NULL);
00340 if(WS_Threshold >= 0 && CmiMyPe() == 0)
00341 CmiPrintf("Charm++> Steal work when load is fewer than %d. \n", WS_Threshold);
00342 #if CMK_IMMEDIATE_MSG
00343 if(_steal_immediate && CmiMyPe() == 0)
00344 CmiPrintf("Charm++> Steal work using immediate messages. \n", WS_Threshold);
00345 #endif
00346 }
00347
00348
00349 void CldModuleInit(char **argv)
00350 {
00351 CpvInitialize(int, CldHandlerIndex);
00352 CpvInitialize(int, CldRelocatedMessages);
00353 CpvInitialize(int, CldLoadBalanceMessages);
00354 CpvInitialize(int, CldMessageChunks);
00355 CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
00356 CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) =
00357 CpvAccess(CldMessageChunks) = 0;
00358
00359 CldModuleGeneralInit(argv);
00360 CldGraphModuleInit(argv);
00361
00362 CpvAccess(CldLoadNotify) = 1;
00363
00364 CpvInitialize(int, isStealing);
00365 CpvAccess(isStealing) = 0;
00366 }
00367
/* Strategy callback hook; the work-stealing strategy does nothing here. */
void CldCallback(void)
{
  /* intentionally empty */
}