00001
00018 #ifndef __PCQUEUE__
00019 #define __PCQUEUE__
00020
00021
00022
00023
00024
00025
00026
00027
/*
 * Enable the push-side lock for SMP builds that do not already guard the
 * queue with CMK_PCQUEUE_LOCK, unless the build opts out by setting
 * CMK_SMP_NO_PCQUEUE_PUSH_LOCK.
 */
#if CMK_SMP && !CMK_PCQUEUE_LOCK && !CMK_SMP_NO_PCQUEUE_PUSH_LOCK
#define CMK_PCQUEUE_PUSH_LOCK 1
#endif
00036
00037
00038
/*
 * Memory-ordering helpers used by the queue operations.
 *
 * With CMK_PCQUEUE_LOCK set, push, pop and top run under Q->lock, so the
 * fences expand to nothing and the counter updates are plain expressions.
 * Otherwise each helper forwards to the Cmi memory primitive of the same name.
 */
#if CMK_PCQUEUE_LOCK
#define PCQueue_CmiMemoryReadFence()
#define PCQueue_CmiMemoryWriteFence()
/* Argument and expansion are parenthesized so any lvalue expression is safe. */
#define PCQueue_CmiMemoryAtomicIncrement(someInt) ((someInt) = (someInt) + 1)
#define PCQueue_CmiMemoryAtomicDecrement(someInt) ((someInt) = (someInt) - 1)
#else
#define PCQueue_CmiMemoryReadFence CmiMemoryReadFence
#define PCQueue_CmiMemoryWriteFence CmiMemoryWriteFence
#define PCQueue_CmiMemoryAtomicIncrement CmiMemoryAtomicIncrement
#define PCQueue_CmiMemoryAtomicDecrement CmiMemoryAtomicDecrement
#endif
00050
/*
 * Qualifier applied to queue pointers: `volatile` in SMP builds,
 * empty otherwise.
 */
#if !CMK_SMP
#define CMK_SMP_volatile
#else
#define CMK_SMP_volatile volatile
#endif
00056
/* Number of message slots per queue block (and in the simple ring buffer). */
#define PCQueueSize 256
00058
/*
 * 128 bytes of filler inserted between queue fields in SMP builds.
 * NOTE(review): presumably keeps producer- and consumer-side fields on
 * separate cache lines -- confirm against the target architectures.
 */
typedef struct CmiMemorySMPSeparation_t CmiMemorySMPSeparation_t;

struct CmiMemorySMPSeparation_t {
  unsigned char padding[128];
};
00066
00073 #if !USE_SIMPLE_PCQUEUE
00074
00075 typedef struct CircQueueStruct
00076 {
00077 struct CircQueueStruct * CMK_SMP_volatile next;
00078 int push;
00079 #if CMK_SMP
00080 CmiMemorySMPSeparation_t pad1;
00081 #endif
00082 int pull;
00083 #if CMK_SMP
00084 CmiMemorySMPSeparation_t pad2;
00085 #endif
00086 char *data[PCQueueSize];
00087 }
00088 *CircQueue;
00089
00090 typedef struct PCQueueStruct
00091 {
00092 CircQueue head;
00093 #if CMK_SMP
00094 CmiMemorySMPSeparation_t pad1;
00095 #endif
00096 CircQueue CMK_SMP_volatile tail;
00097 #if CMK_SMP
00098 CmiMemorySMPSeparation_t pad2;
00099 #endif
00100 int len;
00101 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00102 CmiNodeLock lock;
00103 #endif
00104 }
00105 *PCQueue;
00106
00107
00108
00109
/*
 * Return block `dg` to the Cmi_freelist_circqueuestruct free list and bump
 * freeCount, all under CmiMemLock.
 *
 * The macro-local temporary has a name unlikely to collide with the caller's
 * argument; a plain `d` would shadow an argument that is itself named `d`.
 * The expansion stays a brace block so existing call sites keep compiling.
 */
#define FreeCircQueueStruct(dg) {\
  CircQueue pcq_freed_blk_;\
  CmiMemLock();\
  pcq_freed_blk_ = (dg);\
  pcq_freed_blk_->next = Cmi_freelist_circqueuestruct;\
  Cmi_freelist_circqueuestruct = pcq_freed_blk_;\
  freeCount++;\
  CmiMemUnlock();\
}
00119
/*
 * Obtain a queue block into `dg`, under CmiMemLock: reuse the first entry of
 * Cmi_freelist_circqueuestruct when there is one, otherwise allocate a zeroed
 * block.  The XT3_PCQUEUE_HACK variant uses malloc + memset instead of calloc.
 *
 * Fresh allocations are checked with _MEMCHECK before use.  The macro-local
 * temporary has a collision-resistant name and `dg` is parenthesized, so any
 * lvalue expression can be passed.
 *
 * NOTE(review): a block taken from the free list is handed back as-is (not
 * re-zeroed); confirm callers reset its fields before reuse.
 */
#if !XT3_PCQUEUE_HACK
#define MallocCircQueueStruct(dg) {\
  CircQueue pcq_new_blk_;\
  CmiMemLock();\
  pcq_new_blk_ = Cmi_freelist_circqueuestruct;\
  if (pcq_new_blk_ == (CircQueue)0) {\
    pcq_new_blk_ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));\
    _MEMCHECK(pcq_new_blk_);\
  }\
  else {\
    freeCount--;\
    Cmi_freelist_circqueuestruct = pcq_new_blk_->next;\
  }\
  (dg) = pcq_new_blk_;\
  CmiMemUnlock();\
}
#else
#define MallocCircQueueStruct(dg) {\
  CircQueue pcq_new_blk_;\
  CmiMemLock();\
  pcq_new_blk_ = Cmi_freelist_circqueuestruct;\
  if (pcq_new_blk_ == (CircQueue)0) {\
    pcq_new_blk_ = (CircQueue)malloc(sizeof(struct CircQueueStruct));\
    _MEMCHECK(pcq_new_blk_);\
    memset(pcq_new_blk_, 0, sizeof(struct CircQueueStruct));\
  }\
  else {\
    freeCount--;\
    Cmi_freelist_circqueuestruct = pcq_new_blk_->next;\
  }\
  (dg) = pcq_new_blk_;\
  CmiMemUnlock();\
}
#endif
00152
00153 static PCQueue PCQueueCreate(void)
00154 {
00155 CircQueue circ;
00156 PCQueue Q;
00157
00158
00159 #if !XT3_PCQUEUE_HACK
00160 circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
00161 #else
00162 circ = (CircQueue)malloc(sizeof(struct CircQueueStruct));
00163 circ = (CircQueue)memset(circ, 0, sizeof(struct CircQueueStruct));
00164 #endif
00165 Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
00166 _MEMCHECK(Q);
00167 Q->head = circ;
00168 Q->tail = circ;
00169 Q->len = 0;
00170 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00171 Q->lock = CmiCreateLock();
00172 #endif
00173 return Q;
00174 }
00175
00176 static void PCQueueDestroy(PCQueue Q)
00177 {
00178 CircQueue circ = Q->head;
00179 while (circ != Q->tail) {
00180 free(circ);
00181 circ = circ->next;
00182 }
00183 free(circ);
00184 free(Q);
00185 }
00186
00187 static int PCQueueEmpty(PCQueue Q)
00188 {
00189 return (Q->len == 0);
00190 }
00191
00192 static int PCQueueLength(PCQueue Q)
00193 {
00194 return Q->len;
00195 }
00196
00197 static char *PCQueueTop(PCQueue Q)
00198 {
00199 CircQueue circ; int pull; char *data;
00200
00201 #if CMK_PCQUEUE_LOCK
00202 if (Q->len == 0) return 0;
00203 CmiLock(Q->lock);
00204 #endif
00205 circ = Q->head;
00206 pull = circ->pull;
00207 data = circ->data[pull];
00208
00209 #if CMK_PCQUEUE_LOCK
00210 CmiUnlock(Q->lock);
00211 #endif
00212 return data;
00213 }
00214
00215
00216 static char *PCQueuePop(PCQueue Q)
00217 {
00218 CircQueue circ; int pull; char *data;
00219
00220 #if CMK_PCQUEUE_LOCK
00221 if (Q->len == 0) return 0;
00222 CmiLock(Q->lock);
00223 #endif
00224 circ = Q->head;
00225 pull = circ->pull;
00226 data = circ->data[pull];
00227
00228 PCQueue_CmiMemoryReadFence();
00229
00230 #if XT3_ONLY_PCQUEUE_WORKAROUND
00231 if (data && (Q->len > 0)) {
00232 #else
00233 if (data) {
00234 #endif
00235 circ->pull = (pull + 1);
00236 circ->data[pull] = 0;
00237 if (pull == PCQueueSize - 1) {
00238
00239 PCQueue_CmiMemoryReadFence();
00240
00241 Q->head = circ-> next;
00242 CmiAssert(Q->head != NULL);
00243
00244
00245 free(circ);
00246
00247
00248
00249 }
00250 PCQueue_CmiMemoryAtomicDecrement(Q->len);
00251 #if CMK_PCQUEUE_LOCK
00252 CmiUnlock(Q->lock);
00253 #endif
00254 return data;
00255 }
00256 else {
00257
00258 #if CMK_PCQUEUE_LOCK
00259 CmiUnlock(Q->lock);
00260 #endif
00261 return 0;
00262 }
00263 }
00264
00265 static void PCQueuePush(PCQueue Q, char *data)
00266 {
00267 CircQueue circ, circ1; int push;
00268
00269 #if CMK_PCQUEUE_LOCK|| CMK_PCQUEUE_PUSH_LOCK
00270 CmiLock(Q->lock);
00271 #endif
00272 circ1 = Q->tail;
00273 #ifdef PCQUEUE_MULTIQUEUE
00274 CmiMemoryAtomicFetchAndInc(circ1->push, push);
00275 #else
00276 push = circ1->push;
00277 circ1->push = (push + 1);
00278 #endif
00279 #ifdef PCQUEUE_MULTIQUEUE
00280 while (push >= PCQueueSize) {
00281
00282
00283 PCQueue_CmiMemoryReadFence();
00284 while (Q->tail == circ1);
00285 circ1 = Q->tail;
00286 CmiMemoryAtomicFetchAndInc(circ1->push, push);
00287 }
00288 #endif
00289
00290 if (push == (PCQueueSize -1)) {
00291
00292
00293
00294 #if !XT3_PCQUEUE_HACK
00295 circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
00296 #else
00297 circ = (CircQueue)malloc(sizeof(struct CircQueueStruct));
00298 circ = (CircQueue)memset(circ, 0, sizeof(struct CircQueueStruct));
00299 #endif
00300
00301
00302 #ifdef PCQUEUE_MULTIQUEUE
00303 PCQueue_CmiMemoryWriteFence();
00304 #endif
00305
00306 Q->tail->next = circ;
00307 Q->tail = circ;
00308 }
00309 PCQueue_CmiMemoryWriteFence();
00310
00311 circ1->data[push] = data;
00312 PCQueue_CmiMemoryAtomicIncrement(Q->len);
00313
00314 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00315 CmiUnlock(Q->lock);
00316 #endif
00317 }
00318
00319 #else
00320
/*
 * Queue header for the simple implementation: a single ring buffer of
 * PCQueueSize slots.  `head` is the consumer cursor, `tail` the producer
 * cursor; both wrap from `bufEnd` back to `data`.
 */
typedef struct PCQueueStruct
{
  char **head;                    /* next slot to pop */
#if CMK_SMP
  CmiMemorySMPSeparation_t pad1;  /* filler between head and tail */
#endif

  char** tail;                    /* next slot to push into */
#if CMK_SMP
  CmiMemorySMPSeparation_t pad2;  /* filler between tail and len */
#endif

  int len;                        /* incremented by push, decremented by pop */
#if CMK_SMP
  CmiMemorySMPSeparation_t pad3;  /* filler between len and the buffer bounds */
#endif

  const char **data;              /* start of the slot buffer */
  const char **bufEnd;            /* one past the last slot */

  /* Declared whenever any queue function uses Q->lock: creation and push
     take it under the push-lock configuration as well. */
#if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
  CmiNodeLock lock;
#endif

}
*PCQueue;
00351
00352 static PCQueue PCQueueCreate(void)
00353 {
00354 PCQueue Q;
00355
00356 Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
00357 Q->data = (const char **)malloc(sizeof(char *)*PCQueueSize);
00358 memset(Q->data, 0, sizeof(char *)*PCQueueSize);
00359 _MEMCHECK(Q);
00360 Q->head = (char **)Q->data;
00361 Q->tail = (char **)Q->data;
00362 Q->len = 0;
00363 Q->bufEnd = Q->data + PCQueueSize;
00364
00365 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00366 Q->lock = CmiCreateLock();
00367 #endif
00368
00369 return Q;
00370 }
00371
00372 static void PCQueueDestroy(PCQueue Q)
00373 {
00374 free(Q->data);
00375 free(Q);
00376 }
00377
00378 static int PCQueueEmpty(PCQueue Q)
00379 {
00380 return (Q->len == 0);
00381 }
00382
00383
00384 static int PCQueueLength(PCQueue Q)
00385 {
00386 return Q->len;
00387 }
00388 static char *PCQueueTop(PCQueue Q)
00389 {
00390
00391 char *data;
00392
00393 #if CMK_PCQUEUE_LOCK
00394 CmiLock(Q->lock);
00395 #endif
00396
00397 data = *(Q->head);
00398
00399
00400 #if CMK_PCQUEUE_LOCK
00401 CmiUnlock(Q->lock);
00402 #endif
00403
00404 return data;
00405 }
00406
00407 static char *PCQueuePop(PCQueue Q)
00408 {
00409
00410 char *data;
00411
00412 #if CMK_PCQUEUE_LOCK
00413 CmiLock(Q->lock);
00414 #endif
00415
00416 data = *(Q->head);
00417
00418
00419 PCQueue_CmiMemoryReadFence();
00420
00421 if(data){
00422 *(Q->head) = 0;
00423 Q->head++;
00424
00425 if (Q->head == (char **)Q->bufEnd ) {
00426 Q->head = (char **)Q->data;
00427 }
00428 PCQueue_CmiMemoryAtomicDecrement(Q->len);
00429
00430 }
00431
00432 #if CMK_PCQUEUE_LOCK
00433 CmiUnlock(Q->lock);
00434 #endif
00435
00436 return data;
00437 }
00438 static void PCQueuePush(PCQueue Q, char *data)
00439 {
00440 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00441 CmiLock(Q->lock);
00442 #endif
00443
00444 PCQueue_CmiMemoryWriteFence();
00445
00446
00447
00448 *(Q->tail) = data;
00449 Q->tail++;
00450
00451 if (Q->tail == (char **)Q->bufEnd) {
00452
00453
00454 Q->tail = (char **)Q->data;
00455 }
00456
00457 #if 0
00458 if(Q->head == Q->tail && Q->len>0){
00459 CmiAbort("Simple PCQueue is full!!\n");
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470 }
00471 #endif
00472
00473 PCQueue_CmiMemoryAtomicIncrement(Q->len);
00474
00475 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00476 CmiUnlock(Q->lock);
00477 #endif
00478 }
00479 #endif
00480
00481
00482 #endif
00483