00001 #include "queueing.h"
00002 #include <converse.h>
00003 #include <string.h>
00004
00005 #if CMK_USE_STL_MSGQ
00006 #include "msgq.h"
00007 typedef CMK_MSG_PRIO_TYPE prio_t;
00008 #endif
00009
00010
00039 #if 1
00040
00041
00042 #define CmiAlloc malloc
00043 #define CmiFree free
00044 #endif
00045
00047 int schedAdaptMemThresholdMB;
00048
00049
00051 static void CqsDeqInit(_deq d)
00052 {
00053 d->bgn = d->space;
00054 d->end = d->space+4;
00055 d->head = d->space;
00056 d->tail = d->space;
00057 }
00058
00060 static void CqsDeqExpand(_deq d)
00061 {
00062 int rsize = (d->end - d->head);
00063 int lsize = (d->head - d->bgn);
00064 int oldsize = (d->end - d->bgn);
00065 int newsize = (oldsize << 1);
00066 void **ovec = d->bgn;
00067 void **nvec = (void **)CmiAlloc(newsize * sizeof(void *));
00068 memcpy(nvec, d->head, rsize * sizeof(void *));
00069 memcpy(nvec+rsize, d->bgn, lsize * sizeof(void *));
00070 d->bgn = nvec;
00071 d->end = nvec + newsize;
00072 d->head = nvec;
00073 d->tail = nvec + oldsize;
00074 if (ovec != d->space) CmiFree(ovec);
00075 }
00076
00078 void CqsDeqEnqueueFifo(_deq d, void *data)
00079 {
00080 void **tail = d->tail;
00081 *tail = data;
00082 tail++;
00083 if (tail == d->end) tail = d->bgn;
00084 d->tail = tail;
00085 if (tail == d->head) CqsDeqExpand(d);
00086 }
00087
00089 void CqsDeqEnqueueLifo(_deq d, void *data)
00090 {
00091 void **head = d->head;
00092 if (head == d->bgn) head = d->end;
00093 head--;
00094 *head = data;
00095 d->head = head;
00096 if (head == d->tail) CqsDeqExpand(d);
00097 }
00098
00100 void *CqsDeqDequeue(_deq d)
00101 {
00102 void **head;
00103 void **tail;
00104 void *data;
00105 head = d->head;
00106 tail = d->tail;
00107 if (head == tail) return 0;
00108 data = *head;
00109 head++;
00110 if (head == d->end) head = d->bgn;
00111 d->head = head;
00112 return data;
00113 }
00114
00116 static void CqsPrioqInit(_prioq pq)
00117 {
00118 int i;
00119 pq->heapsize = 100;
00120 pq->heapnext = 1;
00121 pq->hash_key_size = PRIOQ_TABSIZE;
00122 pq->hash_entry_size = 0;
00123 pq->heap = (_prioqelt *)CmiAlloc(100 * sizeof(_prioqelt));
00124 pq->hashtab = (_prioqelt *)CmiAlloc(pq->hash_key_size * sizeof(_prioqelt));
00125 for (i=0; i<pq->hash_key_size; i++) pq->hashtab[i]=0;
00126 }
00127
00128 #if CMK_C_INLINE
00129 inline
00130 #endif
00131
00132 static void CqsPrioqExpand(_prioq pq)
00133 {
00134 int oldsize = pq->heapsize;
00135 int newsize = oldsize * 2;
00136 _prioqelt *oheap = pq->heap;
00137 _prioqelt *nheap = (_prioqelt *)CmiAlloc(newsize*sizeof(_prioqelt));
00138 memcpy(nheap, oheap, oldsize * sizeof(_prioqelt));
00139 pq->heap = nheap;
00140 pq->heapsize = newsize;
00141 CmiFree(oheap);
00142 }
00143 #ifndef FASTQ
00144
00145 void CqsPrioqRehash(_prioq pq)
00146 {
00147 int oldHsize = pq->hash_key_size;
00148 int newHsize = oldHsize * 2;
00149 unsigned int hashval;
00150 _prioqelt pe, pe1, pe2;
00151 int i,j;
00152
00153 _prioqelt *ohashtab = pq->hashtab;
00154 _prioqelt *nhashtab = (_prioqelt *)CmiAlloc(newHsize*sizeof(_prioqelt));
00155
00156 pq->hash_key_size = newHsize;
00157
00158 for(i=0; i<newHsize; i++)
00159 nhashtab[i] = 0;
00160
00161 for(i=0; i<oldHsize; i++) {
00162 for(pe=ohashtab[i]; pe; ) {
00163 pe2 = pe->ht_next;
00164 hashval = pe->pri.bits;
00165 for (j=0; j<pe->pri.ints; j++) hashval ^= pe->pri.data[j];
00166 hashval = (hashval&0x7FFFFFFF)%newHsize;
00167
00168 pe1=nhashtab[hashval];
00169 pe->ht_next = pe1;
00170 pe->ht_handle = (nhashtab+hashval);
00171 if (pe1) pe1->ht_handle = &(pe->ht_next);
00172 nhashtab[hashval]=pe;
00173 pe = pe2;
00174 }
00175 }
00176 pq->hashtab = nhashtab;
00177 pq->hash_key_size = newHsize;
00178 CmiFree(ohashtab);
00179 }
00180 #endif
00181
00182 int CqsPrioGT_(unsigned int ints1, unsigned int *data1, unsigned int ints2, unsigned int *data2)
00183 {
00184 unsigned int val1;
00185 unsigned int val2;
00186 while (1) {
00187 if (ints1==0) return 0;
00188 if (ints2==0) return 1;
00189 val1 = *data1++;
00190 val2 = *data2++;
00191 if (val1 < val2) return 0;
00192 if (val1 > val2) return 1;
00193 ints1--;
00194 ints2--;
00195 }
00196 }
00197
00204 int CqsPrioGT(_prio prio1, _prio prio2)
00205 {
00206 #ifndef FASTQ
00207 unsigned int ints1 = prio1->ints;
00208 unsigned int ints2 = prio2->ints;
00209 #endif
00210 unsigned int *data1 = prio1->data;
00211 unsigned int *data2 = prio2->data;
00212 #ifndef FASTQ
00213 unsigned int val1;
00214 unsigned int val2;
00215 #endif
00216 while (1) {
00217 #ifndef FASTQ
00218 if (ints1==0) return 0;
00219 if (ints2==0) return 1;
00220 #else
00221 if (prio1->ints==0) return 0;
00222 if (prio2->ints==0) return 1;
00223 #endif
00224 #ifndef FASTQ
00225 val1 = *data1++;
00226 val2 = *data2++;
00227 if (val1 < val2) return 0;
00228 if (val1 > val2) return 1;
00229 ints1--;
00230 ints2--;
00231 #else
00232 if(*data1++ < *data2++) return 0;
00233 if(*data1++ > *data2++) return 1;
00234 (prio1->ints)--;
00235 (prio2->ints)--;
00236 #endif
00237 }
00238 }
00239
00241 _deq CqsPrioqGetDeq(_prioq pq, unsigned int priobits, unsigned int *priodata)
00242 {
00243 unsigned int prioints = (priobits+CINTBITS-1)/CINTBITS;
00244 unsigned int hashval, i;
00245 int heappos;
00246 _prioqelt *heap, pe, next, parent;
00247 _prio pri;
00248 int mem_cmp_res;
00249 unsigned int pri_bits_cmp;
00250 static int cnt_nilesh=0;
00251
00252 #ifdef FASTQ
00253
00254 #endif
00255
00256 hashval = priobits;
00257 for (i=0; i<prioints; i++) hashval ^= priodata[i];
00258 hashval = (hashval&0x7FFFFFFF)%PRIOQ_TABSIZE;
00259 #ifndef FASTQ
00260 for (pe=pq->hashtab[hashval]; pe; pe=pe->ht_next)
00261 if (priobits == pe->pri.bits)
00262 if (memcmp(priodata, pe->pri.data, sizeof(int)*prioints)==0)
00263 return &(pe->data);
00264 #else
00265 parent=NULL;
00266 for(pe=pq->hashtab[hashval]; pe; )
00267 {
00268 parent=pe;
00269 pri_bits_cmp=pe->pri.bits;
00270 mem_cmp_res=memcmp(priodata,pe->pri.data,sizeof(int)*prioints);
00271 if(priobits == pri_bits_cmp && mem_cmp_res==0)
00272 return &(pe->data);
00273 else if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
00274 {
00275 pe=pe->ht_right;
00276 }
00277 else
00278 {
00279 pe=pe->ht_left;
00280 }
00281 }
00282 #endif
00283
00284
00285 pe = (_prioqelt)CmiAlloc(sizeof(struct prioqelt_struct)+((prioints-1)*sizeof(int)));
00286 pe->pri.bits = priobits;
00287 pe->pri.ints = prioints;
00288 memcpy(pe->pri.data, priodata, (prioints*sizeof(int)));
00289 CqsDeqInit(&(pe->data));
00290 pri=&(pe->pri);
00291
00292
00293 next = pq->hashtab[hashval];
00294 #ifndef FASTQ
00295 pe->ht_next = next;
00296 pe->ht_handle = (pq->hashtab+hashval);
00297 if (next) next->ht_handle = &(pe->ht_next);
00298 pq->hashtab[hashval] = pe;
00299 #else
00300 pe->ht_parent = parent;
00301 pe->ht_left = NULL;
00302 pe->ht_right = NULL;
00303 if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
00304 {
00305 if(parent) {
00306 parent->ht_right = pe;
00307 pe->ht_handle = &(parent->ht_right);
00308 }
00309 else {
00310 pe->ht_handle = (pq->hashtab+hashval);
00311 pq->hashtab[hashval] = pe;
00312 }
00313
00314 }
00315 else
00316 {
00317 if(parent) {
00318 parent->ht_left = pe;
00319 pe->ht_handle = &(parent->ht_left);
00320 }
00321 else {
00322 pe->ht_handle = (pq->hashtab+hashval);
00323 pq->hashtab[hashval] = pe;
00324 }
00325
00326 }
00327 if(!next)
00328 pq->hashtab[hashval] = pe;
00329 #endif
00330 pq->hash_entry_size++;
00331 #ifndef FASTQ
00332 if(pq->hash_entry_size > 2*pq->hash_key_size)
00333 CqsPrioqRehash(pq);
00334 #endif
00335
00336 heappos = pq->heapnext++;
00337 if (heappos == pq->heapsize) CqsPrioqExpand(pq);
00338 heap = pq->heap;
00339 while (heappos > 1) {
00340 int parentpos = (heappos >> 1);
00341 _prioqelt parent = heap[parentpos];
00342 if (CqsPrioGT(pri, &(parent->pri))) break;
00343 heap[heappos] = parent; heappos=parentpos;
00344 }
00345 heap[heappos] = pe;
00346
00347 #ifdef FASTQ
00348
00349 #endif
00350
00351 return &(pe->data);
00352 }
00353
00355 void *CqsPrioqDequeue(_prioq pq)
00356 {
00357 _prio pri;
00358 _prioqelt pe, old; void *data;
00359 int heappos, heapnext;
00360 _prioqelt *heap = pq->heap;
00361 int left_child;
00362 _prioqelt temp1_ht_right, temp1_ht_left, temp1_ht_parent;
00363 _prioqelt *temp1_ht_handle;
00364 static int cnt_nilesh1=0;
00365
00366 #ifdef FASTQ
00367
00368 #endif
00369 if (pq->heapnext==1) return 0;
00370 pe = heap[1];
00371 data = CqsDeqDequeue(&(pe->data));
00372 if (pe->data.head == pe->data.tail) {
00373
00374 #ifndef FASTQ
00375 _prioqelt next = pe->ht_next;
00376 _prioqelt *handle = pe->ht_handle;
00377 if (next) next->ht_handle = handle;
00378 *handle = next;
00379 old=pe;
00380 #else
00381 old=pe;
00382 prioqelt *handle;
00383 if(pe->ht_parent)
00384 {
00385 if(pe->ht_parent->ht_left==pe) left_child=1;
00386 else left_child=0;
00387 }
00388 else
00389 {
00390 handle = pe->ht_handle;
00391 }
00392
00393 if(!pe->ht_left && !pe->ht_right)
00394 {
00395 if(pe->ht_parent) {
00396 if(left_child) pe->ht_parent->ht_left=NULL;
00397 else pe->ht_parent->ht_right=NULL;
00398 }
00399 else {
00400 *handle = NULL;
00401 }
00402 }
00403 else if(!pe->ht_right)
00404 {
00405
00406 pe->ht_left->ht_parent=pe->ht_parent;
00407 if(pe->ht_parent)
00408 {
00409 if(left_child) {
00410 pe->ht_parent->ht_left = pe->ht_left;
00411 pe->ht_left->ht_handle = &(pe->ht_parent->ht_left);
00412 }
00413 else {
00414 pe->ht_parent->ht_right = pe->ht_left;
00415 pe->ht_left->ht_handle = &(pe->ht_parent->ht_right);
00416 }
00417 }
00418 else {
00419 pe->ht_left->ht_handle = handle;
00420 *handle = pe->ht_left;
00421 }
00422 }
00423 else if(!pe->ht_left)
00424 {
00425
00426 pe->ht_right->ht_parent=pe->ht_parent;
00427
00428 if(pe->ht_parent)
00429 {
00430 if(left_child) {
00431 pe->ht_parent->ht_left = pe->ht_right;
00432 pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
00433 }
00434 else {
00435 pe->ht_parent->ht_right = pe->ht_right;
00436 pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
00437 }
00438 }
00439 else {
00440 pe->ht_right->ht_handle = handle;
00441 *handle = pe->ht_right;
00442 }
00443 }
00444 else if(!pe->ht_right->ht_left)
00445 {
00446 pe->ht_right->ht_parent=pe->ht_parent;
00447 if(pe->ht_parent)
00448 {
00449 if(left_child) {
00450 pe->ht_parent->ht_left = pe->ht_right;
00451 pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
00452 }
00453 else {
00454 pe->ht_parent->ht_right = pe->ht_right;
00455 pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
00456 }
00457 }
00458 else {
00459 pe->ht_right->ht_handle = handle;
00460 *handle = pe->ht_right;
00461 }
00462 if(pe->ht_left) {
00463 pe->ht_right->ht_left = pe->ht_left;
00464 pe->ht_left->ht_parent = pe->ht_right;
00465 pe->ht_left->ht_handle = &(pe->ht_right->ht_left);
00466 }
00467 }
00468 else
00469 {
00470
00471 for(pe=pe->ht_right; pe; )
00472 {
00473 if(pe->ht_left) pe=pe->ht_left;
00474 else
00475 {
00476 if(old->ht_parent)
00477 {
00478 if(left_child) {
00479 old->ht_parent->ht_left = pe;
00480 pe->ht_handle = &(old->ht_parent->ht_left);
00481 }
00482 else {
00483 old->ht_parent->ht_right = pe;
00484 pe->ht_handle = &(old->ht_parent->ht_right);
00485 }
00486 }
00487 else {
00488 pe->ht_handle = handle;
00489 *handle = pe;
00490 }
00491 temp1_ht_right = pe->ht_right;
00492 temp1_ht_left = pe->ht_left;
00493 temp1_ht_parent = pe->ht_parent;
00494 temp1_ht_handle = pe->ht_handle;
00495
00496 pe->ht_parent = old->ht_parent;
00497 pe->ht_left = old->ht_left;
00498 pe->ht_right = old->ht_right;
00499 if(pe->ht_left) {
00500 pe->ht_left->ht_parent = pe;
00501 pe->ht_right->ht_handle = &(pe->ht_right);
00502 }
00503 if(pe->ht_right) {
00504 pe->ht_right->ht_parent = pe;
00505 pe->ht_right->ht_handle = &(pe->ht_right);
00506 }
00507 temp1_ht_parent->ht_left = temp1_ht_right;
00508 if(temp1_ht_right) {
00509 temp1_ht_right->ht_handle = &(temp1_ht_parent->ht_left);
00510 temp1_ht_right->ht_parent = temp1_ht_parent;
00511 }
00512 break;
00513 }
00514 }
00515 }
00516 #endif
00517 pq->hash_entry_size--;
00518
00519
00520 heapnext = (--pq->heapnext);
00521 pe = heap[heapnext];
00522 pri = &(pe->pri);
00523 heappos = 1;
00524 while (1) {
00525 int childpos1, childpos2, childpos;
00526 _prioqelt ch1, ch2, child;
00527 childpos1 = heappos<<1;
00528 if (childpos1>=heapnext) break;
00529 childpos2 = childpos1+1;
00530 if (childpos2>=heapnext)
00531 { childpos=childpos1; child=heap[childpos1]; }
00532 else {
00533 ch1 = heap[childpos1];
00534 ch2 = heap[childpos2];
00535 if (CqsPrioGT(&(ch1->pri), &(ch2->pri)))
00536 {childpos=childpos2; child=ch2;}
00537 else {childpos=childpos1; child=ch1;}
00538 }
00539 if (CqsPrioGT(&(child->pri), pri)) break;
00540 heap[heappos]=child; heappos=childpos;
00541 }
00542 heap[heappos]=pe;
00543
00544
00545 if (old->data.bgn != old->data.space) CmiFree(old->data.bgn);
00546 CmiFree(old);
00547 }
00548 return data;
00549 }
00550
00551 Queue CqsCreate(void)
00552 {
00553 Queue q = (Queue)CmiAlloc(sizeof(struct Queue_struct));
00554 #if CMK_USE_STL_MSGQ
00555 q->stlQ = (void*) new conv::msgQ<prio_t>;
00556 #else
00557 q->length = 0;
00558 q->maxlen = 0;
00559 CqsDeqInit(&(q->zeroprio));
00560 CqsPrioqInit(&(q->negprioq));
00561 CqsPrioqInit(&(q->posprioq));
00562 #endif
00563 return q;
00564 }
00565
00566 void CqsDelete(Queue q)
00567 {
00568 #if CMK_USE_STL_MSGQ
00569 if (q->stlQ != NULL) delete (conv::msgQ<prio_t>*)(q->stlQ);
00570 #else
00571 CmiFree(q->negprioq.heap);
00572 CmiFree(q->posprioq.heap);
00573 #endif
00574 CmiFree(q);
00575 }
00576
00577 unsigned int CqsMaxLength(Queue q)
00578 {
00579 #if CMK_USE_STL_MSGQ
00580 return (unsigned int)((conv::msgQ<prio_t> *)q->stlQ)->max_size();
00581 #else
00582 return q->maxlen;
00583 #endif
00584 }
00585
00586 #if CMK_USE_STL_MSGQ
00587
00588 unsigned int CqsLength(Queue q)
00589 { return ( (conv::msgQ<prio_t>*)(q->stlQ) )->size(); }
00590
00591 int CqsEmpty(Queue q)
00592 { return ( (conv::msgQ<prio_t>*)(q->stlQ) )->empty(); }
00593
00594 void CqsEnqueueGeneral(Queue q, void *data, int strategy, int priobits,unsigned int *prioptr)
00595 {
00596 bool isFifo = (strategy == CQS_QUEUEING_FIFO ||
00597 strategy == CQS_QUEUEING_IFIFO ||
00598 strategy == CQS_QUEUEING_BFIFO ||
00599 strategy == CQS_QUEUEING_LFIFO);
00600 if (priobits >= sizeof(int)*8 && strategy != CQS_QUEUEING_FIFO && strategy != CQS_QUEUEING_LIFO)
00601 ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq( data, prioptr[0], isFifo);
00602 else
00603 ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq( data, 0, isFifo);
00604 }
00605
00606 void CqsEnqueueFifo(Queue q, void *data)
00607 { ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq(data); }
00608
00609 void CqsEnqueueLifo(Queue q, void *data)
00610 { ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq(data, 0, false); }
00611
00612 void CqsEnqueue(Queue q, void *data)
00613 { ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq(data); }
00614
00615 void CqsDequeue(Queue q, void **resp)
00616 { *resp = (void*) ( (conv::msgQ<prio_t>*)(q->stlQ) )->deq(); }
00617
00618 #else
00619
00620 unsigned int CqsLength(Queue q)
00621 {
00622 return q->length;
00623 }
00624
00625 int CqsEmpty(Queue q)
00626 {
00627 return (q->length == 0);
00628 }
00629
00630 void CqsEnqueueGeneral(Queue q, void *data, int strategy,
00631 int priobits,unsigned int *prioptr)
00632 {
00633 _deq d; int iprio;
00634 CmiInt8 lprio0, lprio;
00635 switch (strategy) {
00636 case CQS_QUEUEING_FIFO:
00637 CqsDeqEnqueueFifo(&(q->zeroprio), data);
00638 break;
00639 case CQS_QUEUEING_LIFO:
00640 CqsDeqEnqueueLifo(&(q->zeroprio), data);
00641 break;
00642 case CQS_QUEUEING_IFIFO:
00643 iprio=prioptr[0]+(1U<<(CINTBITS-1));
00644 if ((int)iprio<0)
00645 d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, (unsigned int*)&iprio);
00646 else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, (unsigned int*)&iprio);
00647 CqsDeqEnqueueFifo(d, data);
00648 break;
00649 case CQS_QUEUEING_ILIFO:
00650 iprio=prioptr[0]+(1U<<(CINTBITS-1));
00651 if ((int)iprio<0)
00652 d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, (unsigned int*)&iprio);
00653 else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, (unsigned int*)&iprio);
00654 CqsDeqEnqueueLifo(d, data);
00655 break;
00656 case CQS_QUEUEING_BFIFO:
00657 if (priobits&&(((int)(prioptr[0]))<0))
00658 d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
00659 else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
00660 CqsDeqEnqueueFifo(d, data);
00661 break;
00662 case CQS_QUEUEING_BLIFO:
00663 if (priobits&&(((int)(prioptr[0]))<0))
00664 d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
00665 else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
00666 CqsDeqEnqueueLifo(d, data);
00667 break;
00668
00669
00670
00671
00672
00673 case CQS_QUEUEING_LFIFO:
00674 CmiAssert(priobits == CLONGBITS);
00675 lprio0 =((CmiInt8 *)prioptr)[0];
00676 lprio0 += (1ULL<<(CLONGBITS-1));
00677 if (CmiEndianness() == 0) {
00678 lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
00679 }
00680 else {
00681 lprio = lprio0;
00682 }
00683 if (lprio0<0)
00684 d=CqsPrioqGetDeq(&(q->posprioq), priobits, (unsigned int *)&lprio);
00685 else
00686 d=CqsPrioqGetDeq(&(q->negprioq), priobits, (unsigned int *)&lprio);
00687 CqsDeqEnqueueFifo(d, data);
00688 break;
00689 case CQS_QUEUEING_LLIFO:
00690 lprio0 =((CmiInt8 *)prioptr)[0];
00691 lprio0 += (1ULL<<(CLONGBITS-1));
00692 if (CmiEndianness() == 0) {
00693 lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
00694 }
00695 else {
00696 lprio = lprio0;
00697 }
00698 if (lprio0<0)
00699 d=CqsPrioqGetDeq(&(q->posprioq), priobits, (unsigned int *)&lprio);
00700 else
00701 d=CqsPrioqGetDeq(&(q->negprioq), priobits, (unsigned int *)&lprio);
00702 CqsDeqEnqueueLifo(d, data);
00703 break;
00704 default:
00705 CmiAbort("CqsEnqueueGeneral: invalid queueing strategy.\n");
00706 }
00707 q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
00708 }
00709
00710 void CqsEnqueueFifo(Queue q, void *data)
00711 {
00712 CqsDeqEnqueueFifo(&(q->zeroprio), data);
00713 q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
00714 }
00715
00716 void CqsEnqueueLifo(Queue q, void *data)
00717 {
00718 CqsDeqEnqueueLifo(&(q->zeroprio), data);
00719 q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
00720 }
00721
00722 void CqsEnqueue(Queue q, void *data)
00723 {
00724 CqsDeqEnqueueFifo(&(q->zeroprio), data);
00725 q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
00726 }
00727
00728 void CqsDequeue(Queue q, void **resp)
00729 {
00730 #ifdef ADAPT_SCHED_MEM
00731
00732 if((q->length > 1) && (CmiMemoryUsage() > schedAdaptMemThresholdMB*1024*1024) ){
00733
00734 CqsIncreasePriorityForMemCriticalEntries(q);
00735 }
00736 #endif
00737
00738 if (q->length==0)
00739 { *resp = 0; return; }
00740 if (q->negprioq.heapnext>1)
00741 { *resp = CqsPrioqDequeue(&(q->negprioq)); q->length--; return; }
00742 if (q->zeroprio.head != q->zeroprio.tail)
00743 { *resp = CqsDeqDequeue(&(q->zeroprio)); q->length--; return; }
00744 if (q->posprioq.heapnext>1)
00745 { *resp = CqsPrioqDequeue(&(q->posprioq)); q->length--; return; }
00746 *resp = 0; return;
00747 }
00748
00749 #endif // CMK_USE_STL_MSGQ
00750 static struct prio_struct kprio_zero = { 0, 0, {0} };
00751 static struct prio_struct kprio_max = { 32, 1, {((unsigned int)(-1))} };
00752
00753 _prio CqsGetPriority(Queue q)
00754 {
00755 #if !CMK_USE_STL_MSGQ
00756 if (q->negprioq.heapnext>1) return &(q->negprioq.heap[1]->pri);
00757 if (q->zeroprio.head != q->zeroprio.tail) { return &kprio_zero; }
00758 if (q->posprioq.heapnext>1) return &(q->posprioq.heap[1]->pri);
00759 #endif
00760 return &kprio_max;
00761 }
00762
00763
00764
00765
00766
00767
00768
00769
00770
00776 void** CqsEnumerateDeq(_deq q, int *num){
00777 void **head, **tail;
00778 void **result;
00779 int count = 0;
00780 int i;
00781
00782 head = q->head;
00783 tail = q->tail;
00784
00785 while(head != tail){
00786 count++;
00787 head++;
00788 if(head == q->end)
00789 head = q->bgn;
00790 }
00791
00792 result = (void **)CmiAlloc(count * sizeof(void *));
00793 i = 0;
00794 head = q->head;
00795 tail = q->tail;
00796 while(head != tail){
00797 result[i] = *head;
00798 i++;
00799 head++;
00800 if(head == q->end)
00801 head = q->bgn;
00802 }
00803 *num = count;
00804 return(result);
00805 }
00806
00812 void** CqsEnumeratePrioq(_prioq q, int *num){
00813 void **head, **tail;
00814 void **result;
00815 int i,j;
00816 int count = 0;
00817 _prioqelt pe;
00818
00819 for(i = 1; i < q->heapnext; i++){
00820 pe = (q->heap)[i];
00821 head = pe->data.head;
00822 tail = pe->data.tail;
00823 while(head != tail){
00824 count++;
00825 head++;
00826 if(head == (pe->data).end)
00827 head = (pe->data).bgn;
00828 }
00829 }
00830
00831 result = (void **)CmiAlloc((count) * sizeof(void *));
00832 *num = count;
00833
00834 j = 0;
00835 for(i = 1; i < q->heapnext; i++){
00836 pe = (q->heap)[i];
00837 head = pe->data.head;
00838 tail = pe->data.tail;
00839 while(head != tail){
00840 result[j] = *head;
00841 j++;
00842 head++;
00843 if(head ==(pe->data).end)
00844 head = (pe->data).bgn;
00845 }
00846 }
00847
00848 return result;
00849 }
00850
00851 #if CMK_USE_STL_MSGQ
00852 void CqsEnumerateQueue(Queue q, void ***resp){
00853 conv::msgQ<prio_t> *stlQ = (conv::msgQ<prio_t>*) q->stlQ;
00854 *resp = (void **)CmiAlloc(stlQ->size() * sizeof(conv::msg_t*));
00855 stlQ->enumerate(*resp, *resp + stlQ->size());
00856 }
00857
00858 #else
00859 void CqsEnumerateQueue(Queue q, void ***resp){
00860 void **result;
00861 int num;
00862 int i,j;
00863
00864 *resp = (void **)CmiAlloc(q->length * sizeof(void *));
00865 j = 0;
00866
00867 result = CqsEnumeratePrioq(&(q->negprioq), &num);
00868 for(i = 0; i < num; i++){
00869 (*resp)[j] = result[i];
00870 j++;
00871 }
00872 CmiFree(result);
00873
00874 result = CqsEnumerateDeq(&(q->zeroprio), &num);
00875 for(i = 0; i < num; i++){
00876 (*resp)[j] = result[i];
00877 j++;
00878 }
00879 CmiFree(result);
00880
00881 result = CqsEnumeratePrioq(&(q->posprioq), &num);
00882 for(i = 0; i < num; i++){
00883 (*resp)[j] = result[i];
00884 j++;
00885 }
00886 CmiFree(result);
00887 }
00888 #endif
00889
00899 int CqsRemoveSpecificDeq(_deq q, const void *msgPtr){
00900 void **head, **tail;
00901
00902 head = q->head;
00903 tail = q->tail;
00904
00905 while(head != tail){
00906 if(*head == msgPtr){
00907
00908
00909 return 1;
00910 }
00911 head++;
00912 if(head == q->end)
00913 head = q->bgn;
00914 }
00915 return 0;
00916 }
00917
00927 int CqsRemoveSpecificPrioq(_prioq q, const void *msgPtr){
00928 void **head, **tail;
00929 void **result;
00930 int i;
00931 _prioqelt pe;
00932
00933 for(i = 1; i < q->heapnext; i++){
00934 pe = (q->heap)[i];
00935 head = pe->data.head;
00936 tail = pe->data.tail;
00937 while(head != tail){
00938 if(*head == msgPtr){
00939
00940 *head = NULL;
00941 return 1;
00942 }
00943 head++;
00944 if(head == (pe->data).end)
00945 head = (pe->data).bgn;
00946 }
00947 }
00948 return 0;
00949 }
00950
00951 void CqsRemoveSpecific(Queue q, const void *msgPtr){
00952 #if !CMK_USE_STL_MSGQ
00953 if( CqsRemoveSpecificPrioq(&(q->negprioq), msgPtr) == 0 )
00954 if( CqsRemoveSpecificDeq(&(q->zeroprio), msgPtr) == 0 )
00955 if(CqsRemoveSpecificPrioq(&(q->posprioq), msgPtr) == 0){
00956 CmiPrintf("Didn't remove the specified entry because it was not found\n");
00957 }
00958 #endif
00959 }
00960
00961 #ifdef ADAPT_SCHED_MEM
00962 int numMemCriticalEntries=0;
00963 int *memCriticalEntries=NULL;
00964 #endif
00965