00001
00018 #ifndef __PCQUEUE__
00019 #define __PCQUEUE__
00020
00021 #include "conv-config.h"
00022
00023
00024
00025
00026
00027
00028
00029 #if CMK_SMP && !CMK_PCQUEUE_LOCK
00030
00031
00032 #if !CMK_SMP_NO_PCQUEUE_PUSH_LOCK
00033 #define CMK_PCQUEUE_PUSH_LOCK 1
00034 #endif
00035
00036 #endif
00037
00038 #if CMK_SMP
00039 #include <atomic>
00040 #define CMK_SMP_volatile volatile
00041 #else
00042 #define CMK_SMP_volatile
00043 #endif
00044
00045
00046
/* Memory-ordering helpers for PCQueue.
 * Non-SMP (or lock-protected) builds need no atomicity: the "atomic" ops
 * degrade to plain reads/writes and the fences to no-ops.
 * SMP lock-free builds map them onto C++11 <atomic> operations. */
#if !CMK_SMP || CMK_PCQUEUE_LOCK
#define PCQueue_CmiMemoryReadFence()
#define PCQueue_CmiMemoryWriteFence()
#define PCQueue_CmiMemoryAtomicIncrement(k, mem) k=k+1
#define PCQueue_CmiMemoryAtomicDecrement(k, mem) k=k-1
#define PCQueue_CmiMemoryAtomicLoad(k, mem) k
#define PCQueue_CmiMemoryAtomicStore(k, v, mem) k=v
#else
#define PCQueue_CmiMemoryReadFence CmiMemoryReadFence
#define PCQueue_CmiMemoryWriteFence CmiMemoryWriteFence
#define PCQueue_CmiMemoryAtomicIncrement(k, mem) std::atomic_fetch_add_explicit(&k, 1, mem)
#define PCQueue_CmiMemoryAtomicDecrement(k, mem) std::atomic_fetch_sub_explicit(&k, 1, mem)
#define PCQueue_CmiMemoryAtomicLoad(k, mem) std::atomic_load_explicit(&k, mem)
/* NOTE: no trailing ';' -- the previous definition ended in a semicolon,
 * which expands to a double statement and breaks unbraced if/else call sites */
#define PCQueue_CmiMemoryAtomicStore(k, v, mem) std::atomic_store_explicit(&k, v, mem)
#endif
00062
00063 #define PCQueueSize 0x100
00064
00071 #if !USE_SIMPLE_PCQUEUE
00072
/* One fixed-size segment (PCQueueSize slots) of a PCQueue.  Segments are
 * chained through `next`: the producer links a fresh segment when it fills
 * the last slot; the consumer frees a segment once it drains it.
 * Under SMP, producer-side fields (next/push) and the consumer-side field
 * (pull) are padded apart to avoid false sharing, and the data slots are
 * atomics so push/pop can synchronize on the slot pointer itself. */
typedef struct CircQueueStruct
{
  struct CircQueueStruct * CMK_SMP_volatile next; /* written by producer, read by consumer */
  int push;  /* next slot the producer will fill */
#if CMK_SMP
  char _pad1[CMI_CACHE_LINE_SIZE - (sizeof(struct CircQueueStruct *) + sizeof(int))];
#endif
  int pull;  /* next slot the consumer will drain */
#if CMK_SMP
  char _pad2[CMI_CACHE_LINE_SIZE - sizeof(int)];
  std::atomic<char *> data[PCQueueSize]; /* NULL = slot empty / not yet published */
#else
  char *data[PCQueueSize];
#endif
}
*CircQueue;
00089
/* Producer/consumer FIFO of messages (char*).  `head` is touched only by
 * the consumer, `tail` only by producers; `len` is the shared element
 * count.  Cache-line padding keeps the three on distinct lines under SMP.
 * The lock member exists only in the lock-based configurations. */
typedef struct PCQueueStruct
{
  CircQueue head; /* segment being drained (consumer side) */
#if CMK_SMP
  char _pad1[CMI_CACHE_LINE_SIZE - sizeof(CircQueue)];
#endif
  CircQueue CMK_SMP_volatile tail; /* segment being filled (producer side) */
#if CMK_SMP
  char _pad2[CMI_CACHE_LINE_SIZE - sizeof(CircQueue)];
  std::atomic<int> len;
#else
  int len;
#endif
#if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
  CmiNodeLock lock;
#endif
}
*PCQueue;
00108
00109 static PCQueue PCQueueCreate(void)
00110 {
00111 CircQueue circ;
00112 PCQueue Q;
00113
00114 circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
00115 Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
00116 _MEMCHECK(Q);
00117 Q->head = circ;
00118 Q->tail = circ;
00119 Q->len = 0;
00120 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00121 Q->lock = CmiCreateLock();
00122 #endif
00123 return Q;
00124 }
00125
00126 static void PCQueueDestroy(PCQueue Q)
00127 {
00128 CircQueue circ = Q->head;
00129 while (circ != Q->tail) {
00130 free(circ);
00131 circ = circ->next;
00132 }
00133 free(circ);
00134 free(Q);
00135 }
00136
00137 static int PCQueueEmpty(PCQueue Q)
00138 {
00139 return (PCQueue_CmiMemoryAtomicLoad(Q->len, std::memory_order_acquire) == 0);
00140 }
00141
00142 static int PCQueueLength(PCQueue Q)
00143 {
00144 return PCQueue_CmiMemoryAtomicLoad(Q->len, std::memory_order_acquire);
00145 }
00146
/* Peek at the oldest message without removing it; NULL when empty.
 * The relaxed `len` probe is only a fast-path filter -- the acquire load
 * of the slot is what actually decides whether a message is visible. */
static char *PCQueueTop(PCQueue Q)
{
  CircQueue circ; int pull; char *data;

  if (PCQueue_CmiMemoryAtomicLoad(Q->len, std::memory_order_relaxed) == 0) return 0;
#if CMK_PCQUEUE_LOCK
  CmiLock(Q->lock);
#endif
  circ = Q->head;
  pull = circ->pull;
  /* acquire pairs with the release store in PCQueuePush */
  data = PCQueue_CmiMemoryAtomicLoad(circ->data[pull], std::memory_order_acquire);

#if CMK_PCQUEUE_LOCK
  CmiUnlock(Q->lock);
#endif
  return data;
}
00164
00165
/* Dequeue the oldest message, or return NULL if the queue looks empty.
 * Consumer side only in the lock-free configuration.  A slot counts as
 * occupied only when its pointer is non-NULL, so the relaxed `len` probe
 * may race a concurrent push: we then return NULL and the caller retries. */
static char *PCQueuePop(PCQueue Q)
{
  CircQueue circ; int pull; char *data;

  if (PCQueue_CmiMemoryAtomicLoad(Q->len, std::memory_order_relaxed) == 0) return 0;
#if CMK_PCQUEUE_LOCK
  CmiLock(Q->lock);
#endif
  circ = Q->head;
  pull = circ->pull;
  /* acquire pairs with the release store in PCQueuePush */
  data = PCQueue_CmiMemoryAtomicLoad(circ->data[pull], std::memory_order_acquire);

  if (data) {
    circ->pull = (pull + 1);
    circ->data[pull] = 0; /* mark slot consumed */
    if (pull == PCQueueSize - 1) { /* drained the last slot: retire segment */

      PCQueue_CmiMemoryReadFence();

      /* the producer links the next segment before publishing into the
       * last slot, hence next must already be visible here */
      Q->head = circ-> next;
      CmiAssert(Q->head != NULL);

      free(circ);

    }
    PCQueue_CmiMemoryAtomicDecrement(Q->len, std::memory_order_release);
#if CMK_PCQUEUE_LOCK
    CmiUnlock(Q->lock);
#endif
    return data;
  }
  else {
    /* len said non-empty but the message is not published yet */
#if CMK_PCQUEUE_LOCK
    CmiUnlock(Q->lock);
#endif
    return 0;
  }
}
00208
/* Enqueue `data`.  With PCQUEUE_MULTIQUEUE several producers may push
 * concurrently (slots claimed by fetch-and-inc); otherwise the producer
 * side is serialized by the push lock or is a single thread.  The producer
 * that claims the LAST slot of a segment allocates and links the next
 * segment before publishing its own message. */
static void PCQueuePush(PCQueue Q, char *data)
{
  CircQueue circ, circ1; int push;

#if CMK_PCQUEUE_LOCK|| CMK_PCQUEUE_PUSH_LOCK
  CmiLock(Q->lock);
#endif
  circ1 = Q->tail;
#ifdef PCQUEUE_MULTIQUEUE
  CmiMemoryAtomicFetchAndInc(circ1->push, push);
#else
  push = circ1->push;
  circ1->push = (push + 1);
#endif
#ifdef PCQUEUE_MULTIQUEUE
  /* claimed an index past the end of this segment: wait until the owner of
   * the last slot installs a new tail, then claim a slot there instead */
  while (push >= PCQueueSize) {

    PCQueue_CmiMemoryReadFence();
    while (Q->tail == circ1);
    circ1 = Q->tail;
    CmiMemoryAtomicFetchAndInc(circ1->push, push);
  }
#endif

  if (push == (PCQueueSize -1)) { /* last slot: grow the queue by one segment */

    circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));

#ifdef PCQUEUE_MULTIQUEUE
    /* make the zero-initialized segment visible before linking it */
    PCQueue_CmiMemoryWriteFence();
#endif

    Q->tail->next = circ;
    Q->tail = circ;
  }

  /* release-publish the message; pairs with the acquire load in Pop/Top */
  PCQueue_CmiMemoryAtomicStore(circ1->data[push], data, std::memory_order_release);
  PCQueue_CmiMemoryAtomicIncrement(Q->len, std::memory_order_relaxed);

#if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
  CmiUnlock(Q->lock);
#endif
}
00255
00256 #else
00257
/* "Simple" PCQueue variant: a single fixed-capacity (PCQueueSize) ring
 * buffer instead of a chain of segments.  head/tail are cursors into
 * data[] that wrap at bufEnd; a NULL slot means empty.  Note the only
 * overflow check (in PCQueuePush) is compiled out with #if 0. */
typedef struct PCQueueStruct
{
  char **head; /* consumer cursor into data[] */
#if CMK_SMP
  char _pad1[CMI_CACHE_LINE_SIZE - sizeof(char**)];
#endif

  char** tail; /* producer cursor into data[] */
#if CMK_SMP
  char _pad2[CMI_CACHE_LINE_SIZE - sizeof(char**)];
  std::atomic<int> len;
#else
  int len;
#endif
#if CMK_SMP
  char _pad3[CMI_CACHE_LINE_SIZE - sizeof(int)];
#endif

  const char **data;   /* ring storage, PCQueueSize slots */
  const char **bufEnd; /* one past the last slot */

#if CMK_PCQUEUE_LOCK
  CmiNodeLock lock;
#endif

}
*PCQueue;
00289
00290 static PCQueue PCQueueCreate(void)
00291 {
00292 PCQueue Q;
00293
00294 Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
00295 Q->data = (const char **)malloc(sizeof(char *)*PCQueueSize);
00296 memset(Q->data, 0, sizeof(char *)*PCQueueSize);
00297 _MEMCHECK(Q);
00298 Q->head = (char **)Q->data;
00299 Q->tail = (char **)Q->data;
00300 Q->len = 0;
00301 Q->bufEnd = Q->data + PCQueueSize;
00302
00303 #if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
00304 Q->lock = CmiCreateLock();
00305 #endif
00306
00307 return Q;
00308 }
00309
/* Release the ring storage and the queue descriptor.  Messages still
 * queued are NOT freed -- drain first if the queue owns them. */
static void PCQueueDestroy(PCQueue Q)
{
  free(Q->data);
  free(Q);
}
00315
00316 static int PCQueueEmpty(PCQueue Q)
00317 {
00318 return (PCQueue_CmiMemoryAtomicLoad(Q->len, std::memory_order_acquire) == 0);
00319 }
00320
00321 static int PCQueueLength(PCQueue Q)
00322 {
00323 return PCQueue_CmiMemoryAtomicLoad(Q->len, std::memory_order_acquire);
00324 }
/* Peek at the oldest message without consuming it.
 * NULL means empty: pop NULLs each slot it drains. */
static char *PCQueueTop(PCQueue Q)
{

  char *data;

#if CMK_PCQUEUE_LOCK
  CmiLock(Q->lock);
#endif

  data = *(Q->head);

#if CMK_PCQUEUE_LOCK
  CmiUnlock(Q->lock);
#endif

  return data;
}
00343
/* Dequeue the oldest message or return NULL.  Emptiness is detected by a
 * NULL slot rather than by len, so a pop racing an in-flight push simply
 * returns NULL until the producer's store becomes visible. */
static char *PCQueuePop(PCQueue Q)
{

  char *data;

#if CMK_PCQUEUE_LOCK
  CmiLock(Q->lock);
#endif

  data = *(Q->head);

  /* pairs with the write fence in PCQueuePush */
  PCQueue_CmiMemoryReadFence();

  if(data){
    *(Q->head) = 0; /* hand the slot back to the producer */
    Q->head++;

    if (Q->head == (char **)Q->bufEnd ) { /* wrap the consumer cursor */
      Q->head = (char **)Q->data;
    }
    PCQueue_CmiMemoryAtomicDecrement(Q->len, std::memory_order_release);

  }

#if CMK_PCQUEUE_LOCK
  CmiUnlock(Q->lock);
#endif

  return data;
}
/* Enqueue `data` at the tail.  The write fence orders the message body
 * before the slot store; the consumer's read fence pairs with it.
 * NOTE(review): the ring-full abort below is compiled out (#if 0), so
 * nothing here detects overflow -- confirm callers bound the number of
 * outstanding messages by PCQueueSize. */
static void PCQueuePush(PCQueue Q, char *data)
{
#if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
  CmiLock(Q->lock);
#endif

  PCQueue_CmiMemoryWriteFence();

  *(Q->tail) = data;
  Q->tail++;

  if (Q->tail == (char **)Q->bufEnd) { /* wrap the producer cursor */
    Q->tail = (char **)Q->data;
  }

#if 0
  /* disabled overflow detection */
  if(Q->head == Q->tail && Q->len>0){
    CmiAbort("Simple PCQueue is full!!\n");
  }
#endif

  PCQueue_CmiMemoryAtomicIncrement(Q->len, std::memory_order_release);

#if CMK_PCQUEUE_LOCK || CMK_PCQUEUE_PUSH_LOCK
  CmiUnlock(Q->lock);
#endif
}
00416 #endif
00417
00418
00419 #if CMK_LOCKLESS_QUEUE
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446 #include <stdlib.h>
00447 #include <stdint.h>
00448 #include <limits.h>
00449 #include <sched.h>
00450
/* A DataNode is one segment of message slots (array of char*). */
typedef char** DataNode;

/* Sizing knobs defined elsewhere in the runtime: DataNodeSize/MaxDataNodes
 * size the segments, QueueUpperBound bounds total capacity, the *Wrap
 * values are the matching index masks, and messageQueueOverflow counts
 * producer overflow events (see ReportOverflow). */
extern int DataNodeSize;
extern int MaxDataNodes;
extern int QueueUpperBound;
extern int DataNodeWrap;
extern int QueueWrap;
extern int messageQueueOverflow;

/* Capacity of the segment-reuse pools; FreeNodeWrap is the index mask
 * (NodePoolSize is a power of two). */
#define NodePoolSize 0x100
#define FreeNodeWrap (NodePoolSize - 1)
00464
/* Record one queue-overflow event in the global counter. */
void ReportOverflow()
{
  CmiMemoryAtomicIncrement(messageQueueOverflow);
}
00469
00470
00471
00472
00473
00474
/* Lock-free ring of spare DataNodes used to recycle segment memory.
 * nodes[] stores DataNode pointers as uintptr_t; 0 marks a vacant slot.
 * push/pull cursors wrap with FreeNodeWrap. */
typedef struct FreeNodePoolStruct
{
  std::atomic<unsigned int> push;
  std::atomic<unsigned int> pull;
  std::atomic<uintptr_t> nodes[NodePoolSize];
} *FreeNodePool;
00481
00482
00483
00484
00485
00486
/* Multi-producer single-consumer queue: a virtual ring realized as
 * MaxDataNodes segments of DataNodeSize slots.  `push` is claimed by
 * producers via fetch-add; `pull` belongs to the single consumer and so
 * needs no atomicity.  Padding keeps the hot fields on separate lines. */
typedef struct MPSCQueueStruct
{
  std::atomic<unsigned int> push;
  char pad1[CMI_CACHE_LINE_SIZE - sizeof(std::atomic<unsigned int>)];
  unsigned int pull; /* consumer-private cursor */
  char pad2[CMI_CACHE_LINE_SIZE - sizeof(unsigned int)];
  std::atomic<uintptr_t> *nodes; /* MaxDataNodes segment pointers; 0 = absent */
  char pad3[CMI_CACHE_LINE_SIZE - sizeof(std::atomic<uintptr_t> *)];
  FreeNodePool freeNodePool; /* recycled segments */
} *MPSCQueue;
00497
/* Distance from pull to push in modular (wrap-around) arithmetic.
 * Unsigned subtraction already computes (push - pull) mod 2^32, which is
 * correct whether or not push has wrapped past UINT_MAX.  The previous
 * explicit branch returned (UINT_MAX - pull + push) for the wrapped case,
 * which under-counts by one (e.g. push=0, pull=UINT_MAX gave 0, not 1). */
static unsigned int WrappedDifference(unsigned int push, unsigned int pull)
{
  return push - pull;
}
00511
00512 static int QueueFull(unsigned int push, unsigned int pull)
00513 {
00514 int difference = WrappedDifference(push, pull);
00515
00516
00517
00518
00519
00520 if(difference >= (QueueUpperBound - 2*DataNodeSize))
00521 return 1;
00522 else
00523 return 0;
00524 }
00525
00526
00527 static DataNode DataNodeCreate(void)
00528 {
00529 DataNode node = (DataNode)malloc(sizeof(char*)*DataNodeSize);
00530 int i;
00531 for(i = 0; i < DataNodeSize; ++i) node[i] = NULL;
00532 return node;
00533 }
00534
00535
00536 static FreeNodePool FreeNodePoolCreate(void)
00537 {
00538 FreeNodePool free_q;
00539
00540 free_q = (FreeNodePool)malloc(sizeof(struct FreeNodePoolStruct));
00541 std::atomic_store_explicit(&free_q->push, 0u, std::memory_order_relaxed);
00542 std::atomic_store_explicit(&free_q->pull, 0u, std::memory_order_relaxed);
00543
00544 int i;
00545 for(i = 0; i < NodePoolSize; ++i)
00546 std::atomic_store_explicit(&free_q->nodes[i], (uintptr_t)NULL, std::memory_order_relaxed);
00547
00548 return free_q;
00549 }
00550
00551
00552 static void FreeNodePoolDestroy(FreeNodePool q)
00553 {
00554 int i;
00555 for(i = 0; i < NodePoolSize; ++i)
00556 {
00557 if((uintptr_t)atomic_load_explicit(&q->nodes[i], std::memory_order_acquire) != (uintptr_t)NULL)
00558 {
00559 DataNode n = (DataNode)atomic_load_explicit(&q->nodes[i], std::memory_order_acquire);
00560 free(n);
00561 }
00562 }
00563 free(q);
00564 }
00565
/* Take a recycled DataNode from the pool, or malloc a fresh one when the
 * pool is empty.  The pull cursor is claimed by CAS so multiple callers
 * can race safely; after winning a slot we may still spin until the
 * matching add_free_node() has actually published the pointer. */
static DataNode get_free_node(FreeNodePool q)
{
  DataNode node;
  unsigned int push;
  unsigned int pull = std::atomic_load_explicit(&q->pull, std::memory_order_acquire);

  do
  {
    push = std::atomic_load_explicit(&q->push, std::memory_order_acquire);

    /* pool empty: fall back to the allocator */
    if(pull == push)
      return DataNodeCreate();

  } while(!std::atomic_compare_exchange_weak_explicit(&q->pull, &pull, (pull + 1) & FreeNodeWrap, std::memory_order_release, std::memory_order_relaxed));

  /* wait for this slot's producer to finish publishing, then vacate it */
  while((node = (DataNode)atomic_load_explicit(&q->nodes[pull], std::memory_order_acquire)) == NULL);
  std::atomic_store_explicit(&q->nodes[pull], (uintptr_t)NULL, std::memory_order_release);

  return node;
}
00588
00589 static void add_free_node(FreeNodePool q, DataNode available)
00590 {
00591 unsigned int pull;
00592 unsigned int push = std::atomic_load_explicit(&q->push, std::memory_order_acquire);
00593
00594
00595 do
00596 {
00597
00598 if((uintptr_t)atomic_load_explicit(&q->nodes[push], std::memory_order_acquire) != (uintptr_t)NULL)
00599 {
00600 free(available);
00601 return;
00602 }
00603
00604 } while(!std::atomic_compare_exchange_weak_explicit(&q->push, &push, (push + 1) & FreeNodeWrap, std::memory_order_release, std::memory_order_relaxed));
00605
00606 std::atomic_store_explicit(&q->nodes[push], (uintptr_t)available, std::memory_order_release);
00607 }
00608
00609 static MPSCQueue MPSCQueueCreate(void)
00610 {
00611
00612 MPSCQueue Q = (MPSCQueue)malloc(sizeof(struct MPSCQueueStruct));
00613 Q->nodes = (std::atomic<uintptr_t>*)malloc(sizeof(std::atomic<uintptr_t>)*MaxDataNodes);
00614 Q->freeNodePool = FreeNodePoolCreate();
00615 Q->pull = 0;
00616 std::atomic_store_explicit(&Q->push, 0u, std::memory_order_relaxed);
00617
00618 unsigned int i;
00619 for(i = 0; i < MaxDataNodes; ++i)
00620 {
00621 std::atomic_store_explicit(&Q->nodes[i], (uintptr_t)NULL, std::memory_order_relaxed);
00622 }
00623
00624 return Q;
00625 }
00626
00627 static void MPSCQueueDestroy(MPSCQueue Q)
00628 {
00629
00630 unsigned int i, j;
00631 for(i = 0; i < MaxDataNodes; ++i)
00632 if((uintptr_t)atomic_load_explicit(&Q->nodes[i], std::memory_order_acquire) != (uintptr_t)NULL)
00633 {
00634 free((DataNode)atomic_load_explicit(&Q->nodes[i], std::memory_order_acquire));
00635 }
00636
00637 FreeNodePoolDestroy(Q->freeNodePool);
00638 free(Q->nodes);
00639 free(Q);
00640 }
00641
00642
/* Map a ring position to the index of its segment in Q->nodes:
 * QueueWrap masks the position into the ring, and DataNodeSize
 * consecutive positions share one segment. */
static inline unsigned int get_node_index(unsigned int value)
{
  return ((value & QueueWrap) / DataNodeSize);
}
00647
00648
/* Resolve the segment a producer must write slot `push_idx` into.
 * The producer that claimed the FIRST slot of a segment installs it;
 * every other producer spins until that installation becomes visible. */
static DataNode get_push_node(MPSCQueue Q, unsigned int push_idx)
{
  unsigned int node_idx = get_node_index(push_idx);

  /* first slot of the segment: we are the installer */
  if((push_idx & DataNodeWrap) == 0)
  {
    DataNode new_node = get_free_node(Q->freeNodePool);
    std::atomic_store_explicit(&Q->nodes[node_idx], (uintptr_t)new_node, std::memory_order_release);
    return new_node;
  }
  else
  {
    /* wait for the installing producer */
    DataNode node;
    while((node = (DataNode)atomic_load_explicit(&Q->nodes[node_idx], std::memory_order_acquire)) == NULL);
    return node;
  }
}
00668
00669
/* Consumer-side segment lookup; returns NULL if the segment holding
 * `pull_idx` has not been installed yet. */
static inline DataNode get_pop_node(MPSCQueue Q, unsigned int pull_idx)
{
  unsigned int node_idx = get_node_index(pull_idx);

  return (DataNode)atomic_load_explicit(&Q->nodes[node_idx], std::memory_order_relaxed);
}
00676
00677
/* After popping slot `pull_idx`, retire the segment once its final slot
 * has been consumed: hand it to the free pool and clear its table entry. */
static void check_mem_reclamation(MPSCQueue Q, unsigned int pull_idx, DataNode node)
{
  unsigned int node_idx = get_node_index(pull_idx);

  /* just drained the last slot of this segment */
  if((pull_idx & DataNodeWrap) == (DataNodeSize - 1))
  {
    add_free_node(Q->freeNodePool, node);
    std::atomic_store_explicit(&Q->nodes[node_idx], (uintptr_t)NULL, std::memory_order_relaxed);
  }
}
00689
00690 static int MPSCQueueEmpty(MPSCQueue Q)
00691 {
00692 unsigned int push = std::atomic_load_explicit(&Q->push, std::memory_order_relaxed);
00693 unsigned int pull = Q->pull;
00694 return WrappedDifference(push, pull) == 0;
00695 }
00696
00697 static int MPSCQueueLength(MPSCQueue Q)
00698 {
00699 unsigned int push = std::atomic_load_explicit(&Q->push, std::memory_order_relaxed);
00700 unsigned int pull = Q->pull;
00701 return (int)WrappedDifference(push, pull);
00702 }
00703
/* Peek at the oldest message without consuming it (consumer thread only).
 * Returns NULL when empty, or when the segment/message at the pull
 * position has been claimed but not yet published. */
static char *MPSCQueueTop(MPSCQueue Q)
{
  unsigned int pull = Q->pull;
  unsigned int push = std::atomic_load_explicit(&Q->push, std::memory_order_acquire);

  DataNode node = get_pop_node(Q, pull);

  if(pull == push || node == NULL) return NULL;

  unsigned int node_pull = pull & DataNodeWrap;

  char * data = node[node_pull];
  return data;
}
00718
/* Dequeue the oldest message (single consumer).  Returns NULL when empty,
 * or when the next slot has been claimed by a producer but its message is
 * not visible yet -- the caller simply retries later. */
static char *MPSCQueuePop(MPSCQueue Q)
{
  unsigned int pull = Q->pull;
  unsigned int push = std::atomic_load_explicit(&Q->push, std::memory_order_acquire);
  if(pull == push)
    return NULL;

  DataNode node = get_pop_node(Q, pull);
  if(node == NULL)
    return NULL;

  unsigned int node_pull = pull & DataNodeWrap;

  char * data = node[node_pull];
  if(data == NULL)
    return NULL;

  /* consume the slot, advance the private cursor, maybe retire segment */
  node[node_pull] = NULL;

  Q->pull = (pull + 1);

  check_mem_reclamation(Q, pull, node);

  return data;
}
00744
/* Enqueue `data` (any producer thread).  A slot index is claimed with
 * fetch-add; while the ring looks full the producer records the overflow
 * and yields until the consumer catches up.
 * NOTE(review): the slot is claimed BEFORE the full check, and the final
 * store is a plain write -- correctness appears to rely on consumers
 * treating NULL slots as "not yet published"; confirm against the
 * consumer-side code. */
static void MPSCQueuePush(MPSCQueue Q, char *data)
{
  unsigned int push = std::atomic_fetch_add_explicit(&Q->push, 1u, std::memory_order_release);
  unsigned int pull = Q->pull;

  while(QueueFull(push, pull))
  {
    ReportOverflow();
    /* give the consumer a chance to drain, then re-read its cursor */
    sched_yield();
    pull = Q->pull;
  }

  DataNode node = get_push_node(Q, push);
  node[push & DataNodeWrap] = data;
}
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792
00793
00794
00795
00796
00797
00798
/* MPMC segment: DataNodeSize atomic message slots plus a count of pops
 * performed on it, used to decide when the segment can be recycled. */
typedef struct MPMCDataNodeStruct
{
  std::atomic<uintptr_t> *data;
  std::atomic<unsigned int> num_popped;
} *MPMCDataNode;
00804
00805
00806
00807
00808
00809
/* Lock-free ring of spare MPMC segments, mirroring FreeNodePoolStruct.
 * nodes[] stores MPMCDataNode pointers as uintptr_t; 0 = vacant slot. */
typedef struct FreeMPMCNodePoolStruct
{
  std::atomic<unsigned int> push;
  std::atomic<unsigned int> pull;
  std::atomic<uintptr_t> nodes[NodePoolSize];
} *FreeMPMCNodePool;
00816
00817
00818
00819
00820
00821
/* Multi-producer multi-consumer queue; same segmented-ring layout as
 * MPSCQueue, but `pull` is atomic too since consumers race to advance it.
 * NOTE(review): queueOverflowed is declared but never initialized or used
 * anywhere in this file -- confirm intended use before relying on it
 * (std::atomic_flag requires explicit initialization). */
typedef struct MPMCQueueStruct
{
  std::atomic<unsigned int> push;
  char pad1[CMI_CACHE_LINE_SIZE - sizeof(std::atomic<unsigned int>)];
  std::atomic<unsigned int> pull;
  char pad2[CMI_CACHE_LINE_SIZE - sizeof(std::atomic<unsigned int>)];
  std::atomic<uintptr_t> *nodes;
  char pad3[CMI_CACHE_LINE_SIZE - sizeof(std::atomic<uintptr_t> *)];
  FreeMPMCNodePool freeMPMCNodePool;
  char pad4[CMI_CACHE_LINE_SIZE - sizeof(FreeMPMCNodePool)];
  std::atomic_flag queueOverflowed;

} *MPMCQueue;
00835
00836
00837 static MPMCDataNode MPMCDataNodeCreate(void)
00838 {
00839 MPMCDataNode node = (MPMCDataNode)malloc(sizeof(struct MPMCDataNodeStruct));
00840 node->data = (std::atomic<uintptr_t>*)malloc(sizeof(std::atomic<uintptr_t>)*DataNodeSize);
00841 std::atomic_store_explicit(&node->num_popped, 0u, std::memory_order_relaxed);
00842 int i;
00843 for(i = 0; i < DataNodeSize; ++i) std::atomic_store_explicit(&node->data[i], (uintptr_t)NULL, std::memory_order_relaxed);
00844 return node;
00845 }
00846
00847
00848 static FreeMPMCNodePool FreeMPMCNodePoolCreate(void)
00849 {
00850 FreeMPMCNodePool free_q;
00851
00852 free_q = (FreeMPMCNodePool)malloc(sizeof(struct FreeMPMCNodePoolStruct));
00853 std::atomic_store_explicit(&free_q->push, 0u, std::memory_order_relaxed);
00854 std::atomic_store_explicit(&free_q->pull, 0u, std::memory_order_relaxed);
00855
00856 int i;
00857 for(i = 0; i < NodePoolSize; ++i)
00858 std::atomic_store_explicit(&free_q->nodes[i], (uintptr_t)NULL, std::memory_order_relaxed);
00859
00860 return free_q;
00861 }
00862
00863
00864 static void FreeMPMCNodePoolDestroy(FreeMPMCNodePool q)
00865 {
00866 int i;
00867 for(i = 0; i < NodePoolSize; ++i)
00868 {
00869 if((uintptr_t)atomic_load_explicit(&q->nodes[i], std::memory_order_relaxed) != (uintptr_t)NULL)
00870 {
00871 MPMCDataNode n = (MPMCDataNode)atomic_load_explicit(&q->nodes[i], std::memory_order_relaxed);
00872 free(n->data);
00873 free(n);
00874 }
00875 }
00876 free(q);
00877 }
00878
/* Take a recycled segment from the pool, or allocate a new one when the
 * pool is empty.  Same CAS-claim protocol as the MPSC pool: win a pull
 * slot, then spin until its producer has actually published the pointer. */
static MPMCDataNode mpmc_get_free_node(FreeMPMCNodePool q)
{
  unsigned int push;
  unsigned int pull = std::atomic_load_explicit(&q->pull, std::memory_order_acquire);
  do
  {
    push = std::atomic_load_explicit(&q->push, std::memory_order_acquire);

    /* pool empty: fall back to the allocator */
    if(pull == push)
      return MPMCDataNodeCreate();

  } while(!std::atomic_compare_exchange_weak_explicit(&q->pull, &pull, (pull + 1) & FreeNodeWrap, std::memory_order_release, std::memory_order_relaxed));

  /* wait for this slot's producer to finish publishing, then vacate it */
  MPMCDataNode node;
  while((node = (MPMCDataNode)atomic_load_explicit(&q->nodes[pull], std::memory_order_acquire)) == NULL);
  std::atomic_store_explicit(&q->nodes[pull], (uintptr_t)NULL, std::memory_order_release);

  return node;
}
00898
00899 static void mpmc_add_free_node(FreeMPMCNodePool q, MPMCDataNode available)
00900 {
00901
00902 unsigned int pull;
00903 unsigned int push = std::atomic_load_explicit(&q->push, std::memory_order_acquire);
00904 do
00905 {
00906
00907 if((uintptr_t)atomic_load_explicit(&q->nodes[push], std::memory_order_acquire) != (uintptr_t)NULL)
00908 {
00909 free(available->data);
00910 free(available);
00911 return;
00912 }
00913
00914 } while(!std::atomic_compare_exchange_weak_explicit(&q->push, &push, (push + 1) & FreeNodeWrap, std::memory_order_release, std::memory_order_relaxed));
00915
00916 std::atomic_store_explicit(&q->nodes[push], (uintptr_t)available, std::memory_order_release);
00917 }
00918
00919 static MPMCQueue MPMCQueueCreate(void)
00920 {
00921
00922 MPMCQueue Q = (MPMCQueue)malloc(sizeof(struct MPMCQueueStruct));
00923 Q->nodes = (std::atomic<uintptr_t>*)malloc(sizeof(std::atomic<uintptr_t>)*MaxDataNodes);
00924 Q->freeMPMCNodePool = FreeMPMCNodePoolCreate();
00925 std::atomic_store_explicit(&Q->pull, 0u, std::memory_order_relaxed);
00926 std::atomic_store_explicit(&Q->push, 0u, std::memory_order_relaxed);
00927
00928 unsigned int i;
00929 for(i = 0; i < MaxDataNodes; ++i)
00930 {
00931 std::atomic_store_explicit(&Q->nodes[i], (uintptr_t)NULL, std::memory_order_relaxed);
00932 }
00933
00934 return Q;
00935 }
00936
00937 static void MPMCQueueDestroy(MPMCQueue Q)
00938 {
00939
00940 unsigned int i, j;
00941 for(i = 0; i < MaxDataNodes; ++i)
00942 {
00943 if((uintptr_t)atomic_load_explicit(&Q->nodes[i], std::memory_order_relaxed) != (uintptr_t)NULL)
00944 {
00945 MPMCDataNode n = (MPMCDataNode)atomic_load_explicit(&Q->nodes[i], std::memory_order_relaxed);
00946 free(n->data);
00947 free(n);
00948 }
00949 }
00950
00951 FreeMPMCNodePoolDestroy(Q->freeMPMCNodePool);
00952 free(Q->nodes);
00953 free(Q);
00954 }
00955
00956
/* Resolve (installing on demand) the segment for push position `push_idx`.
 * Unlike the MPSC variant, ANY producer may install the segment: each
 * racing producer speculatively creates one and CASes it into the table;
 * losers free their copy and use the winner's. */
static MPMCDataNode mpmc_get_push_node(MPMCQueue Q, unsigned int push_idx)
{
  unsigned int node_idx = get_node_index(push_idx);

  MPMCDataNode node = (MPMCDataNode)atomic_load(&Q->nodes[node_idx]);

  if(node == NULL)
  {
    /* speculatively create a segment and race to install it */
    MPMCDataNode new_node = MPMCDataNodeCreate();
    if(std::atomic_compare_exchange_strong(&Q->nodes[node_idx], (uintptr_t *)&node, (uintptr_t)new_node))
      return new_node;
    else
    {
      /* another producer won the race: discard ours, use theirs */
      free(new_node->data);
      free(new_node);
      return (MPMCDataNode)atomic_load(&Q->nodes[node_idx]);
    }
  }
  else
  {
    return node;
  }
}
00981
00982
/* Consumer-side segment lookup; returns NULL if the segment holding
 * `pull_idx` has not been installed yet. */
static inline MPMCDataNode mpmc_get_pop_node(MPMCQueue Q, unsigned int pull_idx)
{
  unsigned int node_idx = get_node_index(pull_idx);

  return (MPMCDataNode)atomic_load_explicit(&Q->nodes[node_idx], std::memory_order_relaxed);
}
00989
00990
/* Count this pop against the segment; the consumer that performs the
 * final (DataNodeSize-th) pop recycles the segment and clears its entry. */
static void mpmc_check_mem_reclamation(MPMCQueue Q, unsigned int pull_idx, MPMCDataNode node)
{
  unsigned int node_idx = get_node_index(pull_idx);

  unsigned int node_popped = std::atomic_fetch_add_explicit(&node->num_popped, 1u, std::memory_order_relaxed);

  /* we performed the last pop from this segment */
  if(node_popped == DataNodeSize - 1)
  {
    mpmc_add_free_node(Q->freeMPMCNodePool, node);
    std::atomic_store_explicit(&Q->nodes[node_idx], (uintptr_t)NULL, std::memory_order_relaxed);
  }
}
01004
01005 static int MPMCQueueEmpty(MPMCQueue Q)
01006 {
01007 return (atomic_load_explicit(&Q->push, std::memory_order_relaxed) == std::atomic_load_explicit(&Q->pull, std::memory_order_relaxed));
01008 }
01009
01010 static int MPMCQueueLength(MPMCQueue Q)
01011 {
01012 return std::atomic_load_explicit(&Q->push, std::memory_order_relaxed) - std::atomic_load_explicit(&Q->pull, std::memory_order_relaxed);
01013 }
01014
/* Peek at the oldest message.  Returns NULL when empty or when the pull
 * position's segment is absent; otherwise spins until the message at that
 * position is actually published (a claimed-but-unwritten slot is NULL). */
static char *MPMCQueueTop(MPMCQueue Q)
{
  unsigned int pull = std::atomic_load_explicit(&Q->pull, std::memory_order_acquire);
  unsigned int push = std::atomic_load_explicit(&Q->push, std::memory_order_acquire);
  MPMCDataNode node = mpmc_get_pop_node(Q, pull);

  if(pull == push || node == NULL)
    return NULL;

  unsigned int node_pull = pull & DataNodeWrap;

  char * data;
  while((data = (char *)atomic_load_explicit(&node->data[node_pull], std::memory_order_acquire)) == NULL);

  return data;
}
01031
/* Dequeue the oldest message (any consumer thread).  Consumers race to
 * advance `pull` by CAS; the winner owns that slot, spins until its
 * producer has published the message, consumes it, and possibly recycles
 * the segment. */
static char *MPMCQueuePop(MPMCQueue Q)
{
  unsigned int pull;
  unsigned int push;
  MPMCDataNode node;
  do
  {
    pull = std::atomic_load_explicit(&Q->pull, std::memory_order_acquire);
    push = std::atomic_load_explicit(&Q->push, std::memory_order_acquire);

    node = mpmc_get_pop_node(Q, pull);

    if(pull == push || node == NULL)
      return NULL;

  } while(!std::atomic_compare_exchange_weak_explicit(&Q->pull, &pull, pull + 1, std::memory_order_release, std::memory_order_relaxed));

  unsigned int node_pull = pull & DataNodeWrap;

  /* wait for this slot's producer, then mark the slot consumed */
  char * data;
  while((data = (char *)atomic_load_explicit(&node->data[node_pull], std::memory_order_acquire)) == NULL);
  std::atomic_store_explicit(&node->data[node_pull], (uintptr_t)NULL, std::memory_order_release);

  mpmc_check_mem_reclamation(Q, pull, node);

  return data;
}
01059
/* Enqueue `data` (any producer thread).  Claim a slot index with
 * fetch-add, back off (recording the overflow) while the ring looks full,
 * then release-publish the message into the claimed slot; consumers spin
 * on the slot until this store is visible. */
static void MPMCQueuePush(MPMCQueue Q, void *data)
{
  unsigned int push = std::atomic_fetch_add_explicit(&Q->push, 1u, std::memory_order_release);
  unsigned int pull = std::atomic_load_explicit(&Q->pull, std::memory_order_acquire);

  while(QueueFull(push, pull))
  {
    ReportOverflow();
    /* give consumers a chance to drain, then re-check */
    sched_yield();
    pull = std::atomic_load_explicit(&Q->pull, std::memory_order_acquire);
  }

  MPMCDataNode node = mpmc_get_push_node(Q, push & QueueWrap);
  std::atomic_store_explicit(&node->data[push & DataNodeWrap], (uintptr_t)data, std::memory_order_release);
}
01076
01077
01078 #endif
01079
01080
01081
01082 #endif
01083