00001 #include "converse.h"
00002 #include "cldb.prioritycentralized.h"
00003 #include "queueing.h"
00004 #include "cldb.h"
00005
00006 #include "priorityqueue.c"
00007
00008 #define IDLE_IMMEDIATE 1
00009 #define TRACE_USEREVENTS 0
00010
00011 #define PERIOD 50
00012 #define MAXOVERLOAD 1
00013
00014 #define YH_DEBUG 0
00015 #define THRESHOLD_LOAD 5
00016 #define _U_INT_MAX 2000000000
00017
00018 #define LOAD_WEIGHT 0.1
00019 #define PRIOR_WEIGHT 0.1
00020
00021 CpvDeclare(CldProcInfo, CldData);
00022 extern char *_lbtopo;
00023 int _lbsteal = 0;
00024
00025 CpvDeclare(MsgHeap, CldManagerLoadQueue);
00026 CpvDeclare(CldSlavePriorInfo*, CldSlavesPriorityQueue);
00027
00028 CpvDeclare(int, CldAskLoadHandlerIndex);
00029 CpvDeclare(int, CldstorecharemsgHandlerIndex);
00030 CpvDeclare(int, CldHigherPriorityComesHandlerIndex);
00031 CpvDeclare(int, CldReadytoExecHandlerIndex);
00032 CpvDeclare(void*, CldRequestQueue);
00033
00034 void LoadNotifyFn(int l)
00035 {
00036 CldProcInfo cldData = CpvAccess(CldData);
00037 cldData->sent = 0;
00038 }
00039
00040 char *CldGetStrategy(void)
00041 {
00042 return "prioritycentralized";
00043 }
00044
00045 void SendTasktoPe(int receiver, void *msg)
00046 {
00047 CldInfoFn ifn;
00048 CldPackFn pfn;
00049 int len, queueing, priobits, avg;
00050 unsigned int *prioptr;
00051 int old_load;
00052 int new_load;
00053
00054 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00055 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00056 CldRestoreHandler(msg);
00057 CldSwitchHandler(msg, CpvAccess(CldHigherPriorityComesHandlerIndex));
00058 CmiSyncSendAndFree(receiver, len, msg);
00059
00060 old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
00061 new_load = old_load + 1;
00062 if(old_load == 0)
00063 {
00064 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = *prioptr;
00065 }else
00066 {
00067 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority/(new_load)*old_load + *prioptr/(new_load);
00068 }
00069 CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
00070
00071 #if YH_DEBUG
00072 CmiPrintf(" P%d====>P%d sending this msg with prior %u to processor %d len=%d \n", CmiMyPe(), receiver, *prioptr, receiver, len);
00073 #endif
00074 }
00075
00076 static void CldStoreCharemsg(void *msg)
00077 {
00078
00079 CldInfoFn ifn;
00080 CldPackFn pfn;
00081 int len, queueing, priobits, avg;
00082 unsigned int *prioptr;
00083 priormsg *p_msg ;
00084
00085 requestmsg *request_msg;
00086 int request_pe;
00087 void* loadmsg;
00088
00089
00090
00091 int i=0;
00092 unsigned int _max_ = *prioptr;
00093 int index = 1;
00094 double max_evaluation = 0;
00095 int old_load;
00096
00097 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00098 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00099 #if YH_DEBUG
00100 CmiPrintf(" Step 2: on processor 0, Get new created msg and store it , PRIOR=%u Timer=%f\n", *prioptr, CmiTimer());
00101 #endif
00102
00103 for(i=1; i<CmiNumPes();i++)
00104 {
00105 #if YH_DEBUG
00106 CmiPrintf(" processor %d has load num:%d\n", i, CpvAccess(CldSlavesPriorityQueue)[i].load);
00107 #endif
00108 old_load = CpvAccess(CldSlavesPriorityQueue)[i].load;
00109 if(old_load == 0)
00110 {
00111 index = i;
00112 break;
00113 }
00114 double evaluation = (CpvAccess(CldSlavesPriorityQueue)[i].average_priority)* PRIOR_WEIGHT * (THRESHOLD_LOAD - CpvAccess(CldSlavesPriorityQueue)[i].load);
00115 if(evaluation > max_evaluation)
00116 {
00117 max_evaluation = evaluation;
00118 index = i;
00119 }
00120 }
00121 if(old_load == 0 || CpvAccess(CldSlavesPriorityQueue)[index].average_priority > *prioptr)
00122 {
00123
00124 SendTasktoPe(index, msg);
00125 #if YH_DEBUG
00126 CmiPrintf(" Step 2-1: processor 0 send task to idle processor %d, msg prior=%u Timer=%f\n", index, *prioptr, CmiTimer());
00127 #endif
00128 return;
00129 }
00130
00131 p_msg = (priormsg*)malloc(sizeof(priormsg));
00132 p_msg->priority = *prioptr;
00133 p_msg->msg = msg;
00134
00135 if(heap_isFull(&CpvAccess(CldManagerLoadQueue)))
00136 {
00137 CmiPrintf("Queue is already full, message will be lost\n");
00138 }
00139 else
00140 heap_addItem(&CpvAccess(CldManagerLoadQueue), p_msg);
00141 #if YH_DEBUG
00142 CmiPrintf(" Step 2-3: processor 0 , all processors are busy , store this msg msg prior=%u Queuesize=%d Timer=%f\n", *prioptr, heap_size(&CpvAccess(CldManagerLoadQueue)), CmiTimer());
00143 #endif
00144
00145 }
00146
00147
00148 static void CldAskLoadHandler(requestmsg *msg)
00149 {
00150
00151
00152 int receiver, rank, recvIdx, i;
00153 void* loadmsg;
00154 CldInfoFn ifn;
00155 CldPackFn pfn;
00156 int len, queueing, priobits, avg;
00157 unsigned int *prioptr;
00158
00159 double old_load;
00160 double new_load;
00161 double old_average_prior;
00162
00163 receiver = msg->from_pe;
00164 old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
00165 old_average_prior = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority;
00166 #if YH_DEBUG
00167 CmiPrintf(" Step 6 :%f %d<======= getrequest from processor queue current size=%d, notidle=%d, load=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)), msg->notidle, CpvAccess(CldSlavesPriorityQueue)[receiver].load);
00168 #endif
00169 if(!msg->notidle || old_load == 0 || old_load == 1)
00170 {
00171 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = _U_INT_MAX;
00172 CpvAccess(CldSlavesPriorityQueue)[receiver].load = 0;
00173 }else
00174 {
00175 new_load = old_load - 1;
00176 CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
00177 CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = old_average_prior/new_load * old_load - msg->priority/new_load;
00178 }
00179
00180 old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
00181 if(old_load < THRESHOLD_LOAD)
00182 {
00183 priormsg *p_msg = heap_extractMin(&CpvAccess(CldManagerLoadQueue));
00184 if(p_msg == 0)
00185 {
00186 #if YH_DEBUG
00187 CmiPrintf(" Step 6-1 :%f Queue is empty no task %d<======= getrequest from processor queue current size=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)));
00188 #endif
00189 return;
00190 }
00191
00192 loadmsg = p_msg->msg;
00193 SendTasktoPe(receiver, loadmsg);
00194 }
00195 }
00196
00197
00198
00199 static void CldBeginIdle(void *dummy)
00200 {
00201 CpvAccess(CldData)->lastCheck = CmiWallTimer();
00202 }
00203
00204 static void CldEndIdle(void *dummy)
00205 {
00206 CpvAccess(CldData)->lastCheck = -1;
00207 }
00208
00209 static void CldStillIdle(void *dummy, double curT)
00210 {
00211 if(CmiMyPe() == 0)
00212 {
00213 #if YH_DEBUG
00214 CmiPrintf(" Processor %d is idle, queue size=%d \n", CmiMyPe(), heap_size(&CpvAccess(CldManagerLoadQueue)) );
00215 #endif
00216 return;
00217 }else
00218 {
00219 #if YH_DEBUG
00220 CmiPrintf("Processor %d, has task number of %d\n", CmiMyPe(), CpvAccess(CldData)->load);
00221 #endif
00222 }
00223
00224 int i;
00225 double startT;
00226 requestmsg msg;
00227 CldProcInfo cldData = CpvAccess(CldData);
00228 double now = curT;
00229 double lt = cldData->lastCheck;
00230
00231 cldData->load = 0;
00232 msg.notidle = 0;
00233 if ((lt!=-1 && now-lt< PERIOD*0.001) ) return;
00234 #if YH_DEBUG
00235 CmiPrintf("Step 1000: processor %d task is already zero ", CmiMyPe());
00236 #endif
00237
00238 cldData->lastCheck = now;
00239 msg.from_pe = CmiMyPe();
00240 CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
00241
00242 cldData->sent = 1;
00243 #if YH_DEBUG
00244 CmiPrintf("Step 1000: processor %d task is already zero sentidle=%d", CmiMyPe(), (&msg)->notidle);
00245 #endif
00246 CmiSyncSend(0, sizeof(requestmsg), &msg);
00247 }
00248 void CldReadytoExec(void *msg)
00249 {
00250
00251 CldProcInfo cldData = CpvAccess(CldData);
00252 CldRestoreHandler(msg);
00253 CmiHandleMessage(msg);
00254 cldData->load = cldData->load - 1;
00255
00256 requestmsg r_msg;
00257
00258 r_msg.notidle = 1;
00259 r_msg.from_pe = CmiMyPe();
00260 CmiSetHandler(&r_msg, CpvAccess(CldAskLoadHandlerIndex));
00261 CmiSyncSend(0, sizeof(requestmsg), &r_msg);
00262
00263 #if YH_DEBUG
00264 CmiPrintf(" Step final: message is handled on processor %d, task left=%d", CmiMyPe(), cldData->load);
00265 #endif
00266 }
00267 void HigherPriorityWork(void *msg)
00268 {
00269
00270
00271 CldInfoFn ifn;
00272 CldPackFn pfn;
00273 int len, queueing, priobits;
00274 unsigned int *prioptr;
00275 CldProcInfo cldData = CpvAccess(CldData);
00276 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00277 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00278 CldRestoreHandler(msg);
00279 CldSwitchHandler(msg, CpvAccess(CldReadytoExecHandlerIndex));
00280 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00281 cldData->load = cldData->load + 1;
00282
00283 #if YH_DEBUG
00284 CmiPrintf(" Step 3: processor %d, Task arrives and put it into charm++ queue, prior=%u Timer=%f\n", CmiMyPe(), *prioptr, CmiTimer());
00285 #endif
00286 }
00287
00288
00289 void CldEnqueue(int pe, void *msg, int infofn)
00290 {
00291 int len, queueing, priobits, avg;
00292 unsigned int *prioptr;
00293 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00294 CldPackFn pfn;
00295
00296 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00297 CmiSetInfo(msg,infofn);
00298 #if YH_DEBUG
00299 CmiPrintf(" Step 1: Creation New msg on pe %d priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(), *prioptr, CmiTimer(), len);
00300 #endif
00301
00302 if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
00303 pe = CmiMyPe();
00304
00305
00306 CldSwitchHandler(msg, CpvAccess(CldstorecharemsgHandlerIndex));
00307 if(pe == 0)
00308 {
00309 CldStoreCharemsg(msg);
00310 }else{
00311 if (pfn && CmiNumNodes()>1) {
00312 pfn(&msg);
00313 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00314 }
00315 #if YH_DEBUG
00316 CmiPrintf(" Step 1-1: Creation New msg on pe%d ==> p0 priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(), *prioptr, CmiTimer(), len);
00317 #endif
00318
00319 CmiSyncSendAndFree(0, len, msg);
00320 }
00321 }else if((pe == CmiMyPe()) || (CmiNumPes() == 1) ) {
00322
00323 CsdEnqueueGeneral(msg, CQS_QUEUEING_IFIFO, priobits, prioptr);
00324 }else {
00325 if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
00326 pfn(&msg);
00327 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00328 }
00329 if (pe==CLD_BROADCAST)
00330 CmiSyncBroadcastAndFree(len, msg);
00331 else if (pe==CLD_BROADCAST_ALL)
00332 CmiSyncBroadcastAllAndFree(len, msg);
00333 else CmiSyncSendAndFree(pe, len, msg);
00334
00335 }
00336 }
00337
00338 void CldHandler(char *msg)
00339 {
00340 int len, queueing, priobits;
00341 unsigned int *prioptr;
00342 CldInfoFn ifn; CldPackFn pfn;
00343 CldRestoreHandler(msg);
00344 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00345 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00346 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00347 }
00348
00349 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
00350 {
00351 int len, queueing, priobits,i;
00352 unsigned int *prioptr;
00353 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00354 CldPackFn pfn;
00355 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00356 if (pfn) {
00357 pfn(&msg);
00358 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00359 }
00360 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
00361 CmiSetInfo(msg,infofn);
00362
00363 CmiSyncMulticastAndFree(grp, len, msg);
00364 }
00365
00366 void CldOtherInit()
00367 {
00368
00369 CpvInitialize(CldProcInfo, CldData);
00370 CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
00371 CpvAccess(CldData)->lastCheck = -1;
00372 CpvAccess(CldData)->sent = 0;
00373 CpvAccess(CldData)->load = 0;
00374 #if 1
00375 _lbsteal = 1;
00376 if (_lbsteal) {
00377
00378 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
00379 (CcdVoidFn) CldBeginIdle, NULL);
00380 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
00381 (CcdVoidFn) CldStillIdle, NULL);
00382 CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,
00383 (CcdVoidFn) CldEndIdle, NULL);
00384 if (CmiMyPe() == 0)
00385 CmiPrintf("Charm++> Work stealing is enabled. \n");
00386 }
00387 #endif
00388
00389
00390 if (CmiMyPe() == 0){
00391 int numpes = CmiNumPes();
00392 CpvAccess(CldSlavesPriorityQueue) = (CldSlavePriorInfo*)CmiAlloc(sizeof(CldSlavePriorInfo) * numpes);
00393 int i=0;
00394 for(i=0; i<numpes; i++){
00395 CpvAccess(CldSlavesPriorityQueue)[i].average_priority = _U_INT_MAX;
00396 CpvAccess(CldSlavesPriorityQueue)[i].load = 0;
00397 }
00398 }
00399 }
00400
00401 void CldModuleInit(char **argv)
00402 {
00403
00404 CpvInitialize(int, CldHandlerIndex);
00405 CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler);
00406
00407 CpvInitialize(int, CldAskLoadHandlerIndex);
00408 CpvInitialize(int, CldstorecharemsgHandlerIndex);
00409 CpvInitialize(int, CldHigherPriorityComesHandlerIndex);
00410 CpvInitialize(int, CldReadytoExecHandlerIndex);
00411 CpvInitialize(MsgHeap, CldManagerLoadQueue);
00412 CpvInitialize(CldSlavePriorInfo*, CldSlavesPriorityQueue);
00413 CpvInitialize(void*, CldRequestQueue);
00414
00415 CpvAccess(CldstorecharemsgHandlerIndex) = CmiRegisterHandler(CldStoreCharemsg);
00416 CpvAccess(CldHigherPriorityComesHandlerIndex) = CmiRegisterHandler(HigherPriorityWork);
00417 CpvAccess(CldAskLoadHandlerIndex) = CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
00418 CpvAccess(CldReadytoExecHandlerIndex) = CmiRegisterHandler((CmiHandler)CldReadytoExec);
00419 CpvAccess(CldRequestQueue) = (void *)CqsCreate();
00420 CldModuleGeneralInit(argv);
00421
00422 CldOtherInit();
00423
00424 CpvAccess(CldLoadNotify) = 1;
00425
00426
00427
00428 }
00429
00430
00431
00432 void CldNodeEnqueue(int node, void *msg, int infofn)
00433 {
00434 int len, queueing, priobits;
00435 unsigned int *prioptr;
00436 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00437 CldPackFn pfn;
00438 if (node == CLD_ANYWHERE) {
00439 node = (((CrnRand()+CmiMyNode())&0x7FFFFFFF)%CmiNumNodes());
00440 if (node != CmiMyNode())
00441 CpvAccess(CldRelocatedMessages)++;
00442 }
00443 if (node == CmiMyNode() && !CmiImmIsRunning()) {
00444 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00445 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00446 } else {
00447 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00448 if (pfn) {
00449 pfn(&msg);
00450 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00451 }
00452 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
00453 CmiSetInfo(msg,infofn);
00454 if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
00455 else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
00456 else CmiSyncNodeSendAndFree(node, len, msg);
00457 }
00458 }
00459
00460 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
00461 {
00462 int len, queueing, priobits,i;
00463 unsigned int *prioptr;
00464 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00465 CldPackFn pfn;
00466 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00467 if (pfn) {
00468 pfn(&msg);
00469 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00470 }
00471 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
00472 CmiSetInfo(msg,infofn);
00473
00474 CmiSyncListSendAndFree(npes, pes, len, msg);
00475 }
00476
00477
00478 void CldCallback()
00479 {}