00001 #include <stdlib.h>
00002
00003 #include "converse.h"
00004 #include "cldb.workstealing.h"
00005 #include "queueing.h"
00006 #include "cldb.h"
00007
/* Bracket the work-stealing handlers with trace user events; only takes
   effect when CMK_TRACE_ENABLED is also set. Must stay a macro: it is tested
   in #if directives. */
#define TRACE_USEREVENTS 1
/* A victim hands out seeds only when it holds MORE than this many tokens
   (see CldAskLoadHandler); otherwise it refuses the steal request. */
#define LOADTHRESH 3
00010
/* Trace user-event ids registered in CldGraphModuleInit and used to bracket
   the time spent in the work-stealing code paths. */
struct CldProcInfo_s {
  int askEvt;   /* "CldAskLoad": victim handling a steal request   */
  int askNoEvt; /* "CldAckNoTask": thief retrying after a refusal  */
  int idleEvt;  /* "StealLoad": issuing a new steal request         */
};
typedef struct CldProcInfo_s *CldProcInfo;
00016
/* Proactive stealing: LoadNotifyFn steals once the local token count is
   <= this value. Set by +WSThreshold. The default of -1 disables it, assuming
   token counts are never negative. Idle-time stealing (CldBeginIdle) is
   independent of this setting. */
static int WS_Threshold = -1;
/* +WSPriority: use CldPutTokenPrio / CldMultipleSendPrio instead of the
   plain token queue / send routines. */
static int _steal_prio = 0;
/* +stealonly1: a victim hands over exactly one seed instead of half its load. */
static int _stealonly1 = 0;
/* +WSImmediate: mark steal requests as immediate messages (only honoured
   when built with CMK_IMMEDIATE_MSG). */
static int _steal_immediate = 0;
/* NOTE(review): not referenced anywhere in this file. */
static int workstealingproactive = 0;

/* Converse Cpv state, presumably one copy per PE. */
CpvStaticDeclare(CldProcInfo, CldData);          /* trace event ids                */
CpvStaticDeclare(int, CldAskLoadHandlerIndex);   /* handler run on the victim      */
CpvStaticDeclare(int, CldAckNoTaskHandlerIndex); /* handler run on a refused thief */
/* Nonzero while a steal request from this PE is outstanding: set by
   StealLoad and CldAckNoTaskHandler, cleared by CldBalanceHandler. */
CpvStaticDeclare(int, isStealing);
00027
00028
/* Human-readable name of the seed balancing strategy in this file. */
char *CldGetStrategy(void)
{
  static char *const strategy_name = "work stealing";
  return strategy_name;
}
00033
00034
00035 static void StealLoad()
00036 {
00037 int i;
00038 double now;
00039 requestmsg msg;
00040 int victim;
00041 int mype;
00042 int numpes;
00043
00044 if (CpvAccess(isStealing)) return;
00045
00046 CpvAccess(isStealing) = 1;
00047
00048 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00049 now = CmiWallTimer();
00050 #endif
00051
00052 mype = CmiMyPe();
00053 msg.from_pe = mype;
00054 numpes = CmiNumPes();
00055 do{
00056 victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
00057 }while(victim == mype);
00058
00059 CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
00060 #if CMK_IMMEDIATE_MSG
00061
00062 if (_steal_immediate) CmiBecomeImmediate(&msg);
00063 #endif
00064
00065 msg.to_pe = victim;
00066 CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
00067
00068 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00069 traceUserBracketEvent(CpvAccess(CldData)->idleEvt, now, CmiWallTimer());
00070 #endif
00071 }
00072
00073 void LoadNotifyFn(int l)
00074 {
00075 if(CldCountTokens() <= WS_Threshold)
00076 StealLoad();
00077 }
00078
00079
/*
 * CcdPROCESSOR_BEGIN_IDLE callback (registered in CldGraphModuleInit):
 * when this PE runs out of work, try to steal some. The callback argument
 * is unused.
 */
static void CldBeginIdle(void *dummy)
{
  (void)dummy;
  /* Becoming idle is expected to mean the local token queue is empty. */
  CmiAssert(CldCountTokens() == 0);
  StealLoad();
}
00086
00087
00088
00089 static void CldAskLoadHandler(requestmsg *msg)
00090 {
00091 int receiver, rank, recvIdx, i;
00092 int myload, sendLoad;
00093 double now;
00094
00095
00096 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00097 now = CmiWallTimer();
00098 #endif
00099
00100
00101 CmiAssert(msg->to_pe!=-1);
00102 rank = CmiRankOf(msg->to_pe);
00103 CmiAssert(rank!=-1);
00104 myload = CldCountTokensRank(rank);
00105
00106 receiver = msg->from_pe;
00107
00108 if (myload>LOADTHRESH) {
00109 if(_stealonly1) sendLoad = 1;
00110 else sendLoad = myload/2;
00111 if(sendLoad > 0) {
00112 #if ! CMK_USE_IBVERBS
00113 if (_steal_prio)
00114 CldMultipleSendPrio(receiver, sendLoad, rank, 0);
00115 else
00116 CldMultipleSend(receiver, sendLoad, rank, 0);
00117 #else
00118 CldSimpleMultipleSend(receiver, sendLoad, rank);
00119 #endif
00120 }
00121 CmiFree(msg);
00122 }else
00123 {
00124 int pe = msg->to_pe;
00125 msg->to_pe = msg->from_pe;
00126 msg->from_pe = pe;
00127
00128
00129 CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
00130 CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
00131 }
00132
00133 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00134 traceUserBracketEvent(CpvAccess(CldData)->askEvt, now, CmiWallTimer());
00135 #endif
00136 }
00137
00138 void CldAckNoTaskHandler(requestmsg *msg)
00139 {
00140 double now;
00141 int victim;
00142
00143 int mype = CmiMyPe();
00144 int numpes = CmiNumPes();
00145
00146 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00147 now = CmiWallTimer();
00148 #endif
00149
00150 do{
00151
00152 victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
00153 } while(victim == mype);
00154
00155
00156 #if CMK_IMMEDIATE_MSG
00157
00158 if (_steal_immediate) CmiBecomeImmediate(msg);
00159 #endif
00160
00161 msg->to_pe = victim;
00162 msg->from_pe = mype;
00163 CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
00164 CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
00165
00166 CpvAccess(isStealing) = 1;
00167
00168 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
00169 traceUserBracketEvent(CpvAccess(CldData)->askNoEvt, now, CmiWallTimer());
00170 #endif
00171 }
00172
00173 void CldHandler(void *msg)
00174 {
00175 CldInfoFn ifn; CldPackFn pfn;
00176 int len, queueing, priobits; unsigned int *prioptr;
00177
00178 CldRestoreHandler(msg);
00179 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00180 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00181 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00182
00183 }
00184
/*
 * Put a seed into the local token queue, using the priority-aware variant
 * when +WSPriority was given. Wrapped in do { } while (0) so the macro
 * expands to exactly one statement. Without it, a bare if/else breaks
 * `if (c) CldPUTTOKEN(m); else ...` (the `;` ends the inner if and the
 * outer else no longer parses).
 */
#define CldPUTTOKEN(msg)        \
  do {                          \
    if (_steal_prio)            \
      CldPutTokenPrio(msg);     \
    else                        \
      CldPutToken(msg);         \
  } while (0)
00190
00191 void CldBalanceHandler(void *msg)
00192 {
00193 CldRestoreHandler(msg);
00194 CldPUTTOKEN(msg);
00195 CpvAccess(isStealing) = 0;
00196 }
00197
00198 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
00199 {
00200 int len, queueing, priobits,i; unsigned int *prioptr;
00201 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00202 CldPackFn pfn;
00203 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00204 if (pfn) {
00205 pfn(&msg);
00206 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00207 }
00208 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
00209 CmiSetInfo(msg,infofn);
00210
00211 CmiSyncMulticastAndFree(grp, len, msg);
00212 }
00213
00214 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
00215 {
00216 int len, queueing, priobits,i; unsigned int *prioptr;
00217 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00218 CldPackFn pfn;
00219 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00220 if (pfn) {
00221 pfn(&msg);
00222 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00223 }
00224 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
00225 CmiSetInfo(msg,infofn);
00226 CmiSyncListSendAndFree(npes, pes, len, msg);
00227 }
00228
00229 void CldEnqueue(int pe, void *msg, int infofn)
00230 {
00231 int len, queueing, priobits, avg; unsigned int *prioptr;
00232 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00233 CldPackFn pfn;
00234
00235 if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00236 pe = CmiMyPe();
00237
00238
00239 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00240 if (pfn && CmiNumNodes()>1) {
00241 pfn(&msg);
00242 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00243 }
00244 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00245 CmiSetInfo(msg,infofn);
00246 CldPUTTOKEN(msg);
00247 }
00248 else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
00249 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00250 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00251 }
00252 else {
00253 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00254 if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
00255 pfn(&msg);
00256 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00257 }
00258 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
00259 CmiSetInfo(msg,infofn);
00260 if (pe==CLD_BROADCAST)
00261 CmiSyncBroadcastAndFree(len, msg);
00262 else if (pe==CLD_BROADCAST_ALL)
00263 CmiSyncBroadcastAllAndFree(len, msg);
00264 else CmiSyncSendAndFree(pe, len, msg);
00265 }
00266 }
00267
00268 void CldNodeEnqueue(int node, void *msg, int infofn)
00269 {
00270 int len, queueing, priobits, pe, avg; unsigned int *prioptr;
00271 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00272 CldPackFn pfn;
00273 if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00274 pe = CmiMyPe();
00275 node = CmiNodeOf(pe);
00276 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00277 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00278 }
00279 else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
00280 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00281 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00282 }
00283 else {
00284 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00285 if (pfn) {
00286 pfn(&msg);
00287 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00288 }
00289 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
00290 CmiSetInfo(msg,infofn);
00291 if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
00292 else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
00293 else CmiSyncNodeSendAndFree(node, len, msg);
00294 }
00295 }
00296
00297
00298 void CldGraphModuleInit(char **argv)
00299 {
00300 CpvInitialize(CldProcInfo, CldData);
00301 CpvInitialize(int, CldAskLoadHandlerIndex);
00302 CpvInitialize(int, CldAckNoTaskHandlerIndex);
00303 CpvInitialize(int, CldBalanceHandlerIndex);
00304
00305 CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
00306 #if CMK_TRACE_ENABLED
00307 CpvAccess(CldData)->askEvt = traceRegisterUserEvent("CldAskLoad", -1);
00308 CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("StealLoad", -1);
00309 CpvAccess(CldData)->askNoEvt = traceRegisterUserEvent("CldAckNoTask", -1);
00310 #endif
00311
00312 CpvAccess(CldBalanceHandlerIndex) =
00313 CmiRegisterHandler(CldBalanceHandler);
00314 CpvAccess(CldAskLoadHandlerIndex) =
00315 CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
00316
00317 CpvAccess(CldAckNoTaskHandlerIndex) =
00318 CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
00319
00320
00321 if (CmiMyRank() == CmiMyNodeSize()) return;
00322
00323 _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
00324
00325 if(CmiGetArgIntDesc(argv, "+WSThreshold", &WS_Threshold, "The number of minimum load before stealing"))
00326 {
00327 CmiAssert(WS_Threshold>=0);
00328 }
00329
00330 _steal_immediate = CmiGetArgFlagDesc(argv, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
00331
00332 _steal_prio = CmiGetArgFlagDesc(argv, "+WSPriority", "Charm++> Work Stealing, using priority");
00333
00334
00335 if(CmiNumPes() > 1)
00336 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
00337 (CcdVoidFn) CldBeginIdle, NULL);
00338 if(WS_Threshold >= 0 && CmiMyPe() == 0)
00339 CmiPrintf("Charm++> Steal work when load is fewer than %d. \n", WS_Threshold);
00340 #if CMK_IMMEDIATE_MSG
00341 if(_steal_immediate && CmiMyPe() == 0)
00342 CmiPrintf("Charm++> Steal work using immediate messages. \n", WS_Threshold);
00343 #endif
00344 }
00345
00346
00347 void CldModuleInit(char **argv)
00348 {
00349 CpvInitialize(int, CldHandlerIndex);
00350 CpvInitialize(int, CldRelocatedMessages);
00351 CpvInitialize(int, CldLoadBalanceMessages);
00352 CpvInitialize(int, CldMessageChunks);
00353 CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
00354 CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) =
00355 CpvAccess(CldMessageChunks) = 0;
00356
00357 CldModuleGeneralInit(argv);
00358 CldGraphModuleInit(argv);
00359
00360 CpvAccess(CldLoadNotify) = 1;
00361
00362 CpvInitialize(int, isStealing);
00363 CpvAccess(isStealing) = 0;
00364 }
00365
/* Callback hook; the work-stealing strategy has nothing to do here. */
void CldCallback()
{
  /* intentionally empty */
}