#include "converse.h"
#include "cldb.prioritycentralized.h"
#include "queueing.h"
#include "cldb.h"

#include <stdlib.h>

#include "priorityqueue.C"
00007
00008 #define IDLE_IMMEDIATE 1
00009 #define TRACE_USEREVENTS 0
00010
00011 #define PERIOD 50
00012 #define MAXOVERLOAD 1
00013
00014 #define YH_DEBUG 0
00015 #define THRESHOLD_LOAD 5
00016 #define _U_INT_MAX 2000000000
00017
00018 #define LOAD_WEIGHT 0.1
00019 #define PRIOR_WEIGHT 0.1
00020
00021 CpvDeclare(CldProcInfo, CldData);
00022 extern char *_lbtopo;
00023 int _lbsteal = 0;
00024
00025 CpvDeclare(MsgHeap, CldManagerLoadQueue);
00026 CpvDeclare(CldSlavePriorInfo*, CldSlavesPriorityQueue);
00027
00028 CpvDeclare(int, CldAskLoadHandlerIndex);
00029 CpvDeclare(int, CldstorecharemsgHandlerIndex);
00030 CpvDeclare(int, CldHigherPriorityComesHandlerIndex);
00031 CpvDeclare(int, CldReadytoExecHandlerIndex);
00032 CpvDeclare(void*, CldRequestQueue);
00033
00034 void LoadNotifyFn(int l)
00035 {
00036 CldProcInfo cldData = CpvAccess(CldData);
00037 cldData->sent = 0;
00038 }
00039
/*
 * Returns the name of this load-balancing strategy as a NUL-terminated
 * string with static storage duration; the caller must not free it.
 */
const char *CldGetStrategy(void)
{
  static const char strategy_name[] = "prioritycentralized";

  return strategy_name;
}
00044
00045 void SendTasktoPe(int receiver, void *msg)
00046 {
00047 CldInfoFn ifn;
00048 CldPackFn pfn;
00049 int len, queueing, priobits, avg;
00050 unsigned int *prioptr;
00051 int old_load;
00052 int new_load;
00053
00054 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00055 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00056 CldRestoreHandler((char *)msg);
00057 CldSwitchHandler((char *)msg, CpvAccess(CldHigherPriorityComesHandlerIndex));
00058 CmiSyncSendAndFree(receiver, len, msg);
00059
00060 old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
00061 new_load = old_load + 1;
00062 if(old_load == 0)
00063 {
00064 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = *prioptr;
00065 }else
00066 {
00067 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority/(new_load)*old_load + *prioptr/(new_load);
00068 }
00069 CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
00070
00071 #if YH_DEBUG
00072 CmiPrintf(" P%d====>P%d sending this msg with prior %u to processor %d len=%d \n", CmiMyPe(), receiver, *prioptr, receiver, len);
00073 #endif
00074 }
00075
00076 static void CldStoreCharemsg(void *msg)
00077 {
00078
00079 CldInfoFn ifn;
00080 CldPackFn pfn;
00081 int len, queueing, priobits, avg;
00082 unsigned int *prioptr;
00083 priormsg *p_msg ;
00084
00085 requestmsg *request_msg;
00086 int request_pe;
00087 void* loadmsg;
00088
00089
00090
00091 int i=0;
00092 int index = 1;
00093 double max_evaluation = 0;
00094 int old_load;
00095
00096 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00097 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00098 #if YH_DEBUG
00099 CmiPrintf(" Step 2: on processor 0, Get new created msg and store it , PRIOR=%u Timer=%f\n", *prioptr, CmiTimer());
00100 #endif
00101
00102 for(i=1; i<CmiNumPes();i++)
00103 {
00104 #if YH_DEBUG
00105 CmiPrintf(" processor %d has load num:%d\n", i, CpvAccess(CldSlavesPriorityQueue)[i].load);
00106 #endif
00107 old_load = CpvAccess(CldSlavesPriorityQueue)[i].load;
00108 if(old_load == 0)
00109 {
00110 index = i;
00111 break;
00112 }
00113 double evaluation = (CpvAccess(CldSlavesPriorityQueue)[i].average_priority)* PRIOR_WEIGHT * (THRESHOLD_LOAD - CpvAccess(CldSlavesPriorityQueue)[i].load);
00114 if(evaluation > max_evaluation)
00115 {
00116 max_evaluation = evaluation;
00117 index = i;
00118 }
00119 }
00120 if(old_load == 0 || CpvAccess(CldSlavesPriorityQueue)[index].average_priority > *prioptr)
00121 {
00122
00123 SendTasktoPe(index, msg);
00124 #if YH_DEBUG
00125 CmiPrintf(" Step 2-1: processor 0 send task to idle processor %d, msg prior=%u Timer=%f\n", index, *prioptr, CmiTimer());
00126 #endif
00127 return;
00128 }
00129
00130 p_msg = (priormsg*)malloc(sizeof(priormsg));
00131 p_msg->priority = *prioptr;
00132 p_msg->msg = msg;
00133
00134 if(heap_isFull(&CpvAccess(CldManagerLoadQueue)))
00135 {
00136 CmiPrintf("Queue is already full, message will be lost\n");
00137 }
00138 else
00139 heap_addItem(&CpvAccess(CldManagerLoadQueue), p_msg);
00140 #if YH_DEBUG
00141 CmiPrintf(" Step 2-3: processor 0 , all processors are busy , store this msg msg prior=%u Queuesize=%d Timer=%f\n", *prioptr, heap_size(&CpvAccess(CldManagerLoadQueue)), CmiTimer());
00142 #endif
00143
00144 }
00145
00146
00147 static void CldAskLoadHandler(requestmsg *msg)
00148 {
00149
00150
00151 int receiver, rank, recvIdx, i;
00152 void* loadmsg;
00153 CldInfoFn ifn;
00154 CldPackFn pfn;
00155 int len, queueing, priobits, avg;
00156 unsigned int *prioptr;
00157
00158 double old_load;
00159 double new_load;
00160 double old_average_prior;
00161
00162 receiver = msg->from_pe;
00163 old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
00164 old_average_prior = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority;
00165 #if YH_DEBUG
00166 CmiPrintf(" Step 6 :%f %d<======= getrequest from processor queue current size=%d, notidle=%d, load=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)), msg->notidle, CpvAccess(CldSlavesPriorityQueue)[receiver].load);
00167 #endif
00168 if(!msg->notidle || old_load == 0 || old_load == 1)
00169 {
00170 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = _U_INT_MAX;
00171 CpvAccess(CldSlavesPriorityQueue)[receiver].load = 0;
00172 }else
00173 {
00174 new_load = old_load - 1;
00175 CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
00176 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = old_average_prior/new_load * old_load - msg->priority/new_load;
00177 }
00178
00179 old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
00180 if(old_load < THRESHOLD_LOAD)
00181 {
00182 priormsg *p_msg = heap_extractMin(&CpvAccess(CldManagerLoadQueue));
00183 if(p_msg == 0)
00184 {
00185 #if YH_DEBUG
00186 CmiPrintf(" Step 6-1 :%f Queue is empty no task %d<======= getrequest from processor queue current size=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)));
00187 #endif
00188 return;
00189 }
00190
00191 loadmsg = p_msg->msg;
00192 SendTasktoPe(receiver, loadmsg);
00193 }
00194 }
00195
00196
00197
00198 static void CldBeginIdle(void *dummy)
00199 {
00200 CpvAccess(CldData)->lastCheck = CmiWallTimer();
00201 }
00202
00203 static void CldEndIdle(void *dummy)
00204 {
00205 CpvAccess(CldData)->lastCheck = -1;
00206 }
00207
00208 static void CldStillIdle(void *dummy, double curT)
00209 {
00210 if(CmiMyPe() == 0)
00211 {
00212 #if YH_DEBUG
00213 CmiPrintf(" Processor %d is idle, queue size=%d \n", CmiMyPe(), heap_size(&CpvAccess(CldManagerLoadQueue)) );
00214 #endif
00215 return;
00216 }else
00217 {
00218 #if YH_DEBUG
00219 CmiPrintf("Processor %d, has task number of %d\n", CmiMyPe(), CpvAccess(CldData)->load);
00220 #endif
00221 }
00222
00223 int i;
00224 double startT;
00225 requestmsg msg;
00226 CldProcInfo cldData = CpvAccess(CldData);
00227 double now = curT;
00228 double lt = cldData->lastCheck;
00229
00230 cldData->load = 0;
00231 msg.notidle = 0;
00232 if ((lt!=-1 && now-lt< PERIOD*0.001) ) return;
00233 #if YH_DEBUG
00234 CmiPrintf("Step 1000: processor %d task is already zero ", CmiMyPe());
00235 #endif
00236
00237 cldData->lastCheck = now;
00238 msg.from_pe = CmiMyPe();
00239 CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
00240
00241 cldData->sent = 1;
00242 #if YH_DEBUG
00243 CmiPrintf("Step 1000: processor %d task is already zero sentidle=%d", CmiMyPe(), (&msg)->notidle);
00244 #endif
00245 CmiSyncSend(0, sizeof(requestmsg), &msg);
00246 }
00247 void CldReadytoExec(void *msg)
00248 {
00249
00250 CldProcInfo cldData = CpvAccess(CldData);
00251 CldRestoreHandler((char *)msg);
00252 CmiHandleMessage(msg);
00253 cldData->load = cldData->load - 1;
00254
00255 requestmsg r_msg;
00256
00257 r_msg.notidle = 1;
00258 r_msg.from_pe = CmiMyPe();
00259 CmiSetHandler(&r_msg, CpvAccess(CldAskLoadHandlerIndex));
00260 CmiSyncSend(0, sizeof(requestmsg), &r_msg);
00261
00262 #if YH_DEBUG
00263 CmiPrintf(" Step final: message is handled on processor %d, task left=%d", CmiMyPe(), cldData->load);
00264 #endif
00265 }
00266 void HigherPriorityWork(void *msg)
00267 {
00268
00269
00270 CldInfoFn ifn;
00271 CldPackFn pfn;
00272 int len, queueing, priobits;
00273 unsigned int *prioptr;
00274 CldProcInfo cldData = CpvAccess(CldData);
00275 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00276 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00277 CldRestoreHandler((char *)msg);
00278 CldSwitchHandler((char *)msg, CpvAccess(CldReadytoExecHandlerIndex));
00279 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00280 cldData->load = cldData->load + 1;
00281
00282 #if YH_DEBUG
00283 CmiPrintf(" Step 3: processor %d, Task arrives and put it into charm++ queue, prior=%u Timer=%f\n", CmiMyPe(), *prioptr, CmiTimer());
00284 #endif
00285 }
00286
00287
00288 void CldEnqueue(int pe, void *msg, int infofn)
00289 {
00290 int len, queueing, priobits, avg;
00291 unsigned int *prioptr;
00292 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00293 CldPackFn pfn;
00294
00295 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00296 CmiSetInfo(msg,infofn);
00297 #if YH_DEBUG
00298 CmiPrintf(" Step 1: Creation New msg on pe %d priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(), *prioptr, CmiTimer(), len);
00299 #endif
00300
00301 if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00302 pe = CmiMyPe();
00303
00304
00305 CldSwitchHandler((char *)msg, CpvAccess(CldstorecharemsgHandlerIndex));
00306 if(pe == 0)
00307 {
00308 CldStoreCharemsg(msg);
00309 }else{
00310 if (pfn && CmiNumNodes()>1) {
00311 pfn(&msg);
00312 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00313 }
00314 #if YH_DEBUG
00315 CmiPrintf(" Step 1-1: Creation New msg on pe%d ==> p0 priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(), *prioptr, CmiTimer(), len);
00316 #endif
00317
00318 CmiSyncSendAndFree(0, len, msg);
00319 }
00320 }else if((pe == CmiMyPe()) || (CmiNumPes() == 1) ) {
00321
00322 CsdEnqueueGeneral(msg, CQS_QUEUEING_IFIFO, priobits, prioptr);
00323 }else {
00324 if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
00325 pfn(&msg);
00326 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00327 }
00328 if (pe==CLD_BROADCAST)
00329 CmiSyncBroadcastAndFree(len, msg);
00330 else if (pe==CLD_BROADCAST_ALL)
00331 CmiSyncBroadcastAllAndFree(len, msg);
00332 else CmiSyncSendAndFree(pe, len, msg);
00333
00334 }
00335 }
00336
00337 void CldHandler(char *msg)
00338 {
00339 int len, queueing, priobits;
00340 unsigned int *prioptr;
00341 CldInfoFn ifn; CldPackFn pfn;
00342 CldRestoreHandler(msg);
00343 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00344 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00345 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00346 }
00347
00348 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
00349 {
00350 int len, queueing, priobits,i;
00351 unsigned int *prioptr;
00352 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00353 CldPackFn pfn;
00354 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00355 if (pfn) {
00356 pfn(&msg);
00357 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00358 }
00359 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00360 CmiSetInfo(msg,infofn);
00361
00362 CmiSyncMulticastAndFree(grp, len, msg);
00363 }
00364
00365 void CldOtherInit()
00366 {
00367
00368 CpvInitialize(CldProcInfo, CldData);
00369 CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
00370 CpvAccess(CldData)->lastCheck = -1;
00371 CpvAccess(CldData)->sent = 0;
00372 CpvAccess(CldData)->load = 0;
00373 #if 1
00374 _lbsteal = 1;
00375 if (_lbsteal) {
00376
00377 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
00378 (CcdVoidFn) CldBeginIdle, NULL);
00379 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
00380 (CcdVoidFn) CldStillIdle, NULL);
00381 CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,
00382 (CcdVoidFn) CldEndIdle, NULL);
00383 if (CmiMyPe() == 0)
00384 CmiPrintf("Charm++> Work stealing is enabled. \n");
00385 }
00386 #endif
00387
00388
00389 if (CmiMyPe() == 0){
00390 int numpes = CmiNumPes();
00391 CpvAccess(CldSlavesPriorityQueue) = (CldSlavePriorInfo*)CmiAlloc(sizeof(CldSlavePriorInfo) * numpes);
00392 int i=0;
00393 for(i=0; i<numpes; i++){
00394 CpvAccess(CldSlavesPriorityQueue)[i].average_priority = _U_INT_MAX;
00395 CpvAccess(CldSlavesPriorityQueue)[i].load = 0;
00396 }
00397 }
00398 }
00399
00400 void CldModuleInit(char **argv)
00401 {
00402
00403 CpvInitialize(int, CldHandlerIndex);
00404 CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler);
00405
00406 CpvInitialize(int, CldAskLoadHandlerIndex);
00407 CpvInitialize(int, CldstorecharemsgHandlerIndex);
00408 CpvInitialize(int, CldHigherPriorityComesHandlerIndex);
00409 CpvInitialize(int, CldReadytoExecHandlerIndex);
00410 CpvInitialize(MsgHeap, CldManagerLoadQueue);
00411 CpvInitialize(CldSlavePriorInfo*, CldSlavesPriorityQueue);
00412 CpvInitialize(void*, CldRequestQueue);
00413
00414 CpvAccess(CldstorecharemsgHandlerIndex) = CmiRegisterHandler(CldStoreCharemsg);
00415 CpvAccess(CldHigherPriorityComesHandlerIndex) = CmiRegisterHandler(HigherPriorityWork);
00416 CpvAccess(CldAskLoadHandlerIndex) = CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
00417 CpvAccess(CldReadytoExecHandlerIndex) = CmiRegisterHandler((CmiHandler)CldReadytoExec);
00418 CpvAccess(CldRequestQueue) = (void *)CqsCreate();
00419 CldModuleGeneralInit(argv);
00420
00421 CldOtherInit();
00422
00423 CpvAccess(CldLoadNotify) = 1;
00424
00425
00426
00427 }
00428
00429
00430
00431 void CldNodeEnqueue(int node, void *msg, int infofn)
00432 {
00433 int len, queueing, priobits;
00434 unsigned int *prioptr;
00435 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00436 CldPackFn pfn;
00437 if (node == CLD_ANYWHERE) {
00438 node = (((CrnRand()+CmiMyNode())&0x7FFFFFFF)%CmiNumNodes());
00439 if (node != CmiMyNode())
00440 CpvAccess(CldRelocatedMessages)++;
00441 }
00442 if (node == CmiMyNode() && !CmiImmIsRunning()) {
00443 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00444 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00445 } else {
00446 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00447 if (pfn) {
00448 pfn(&msg);
00449 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00450 }
00451 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00452 CmiSetInfo(msg,infofn);
00453 if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
00454 else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
00455 else CmiSyncNodeSendAndFree(node, len, msg);
00456 }
00457 }
00458
00459 void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn)
00460 {
00461 int len, queueing, priobits,i;
00462 unsigned int *prioptr;
00463 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00464 CldPackFn pfn;
00465 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00466 if (pfn) {
00467 pfn(&msg);
00468 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00469 }
00470 CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
00471 CmiSetInfo(msg,infofn);
00472
00473 CmiSyncListSendAndFree(npes, pes, len, msg);
00474 }
00475
00476
/* No-op in this strategy: the body is empty. */
void CldCallback()
{
}