00001
00002 #ifndef __PPC_ATOMIC_QUEUE__
00003 #define __PPC_ATOMIC_QUEUE__
00004
00005 #include <pthread.h>
00006 #include <stdio.h>
00007 #include <assert.h>
00008 #include <stdint.h>
00009 #include "pcqueue.h"
00010
00011 #define DEFAULT_SIZE 2048
00012
00013 #define CMI_PPCQ_SUCCESS 0
00014 #define CMI_PPCQ_EAGAIN -1
00015
00017
00018
00019
00020
00021
00022
00023
00025
00026 #if CMK_PPC_ATOMIC_DEFAULT_IMPL
00027 #include "default_ppcq.h"
00028 #else
00029
00030 #include "ppc_atomicq_impl.h"
00031 #endif
00032
00033 #if 0
00034 void PPC_AtomicCounterAllocate (void **atomic_mem, size_t atomic_memsize);
00035 ppc_atomic_type_t PPC_AtomicLoadIncrementBounded (volatile ppc_atomic_t *counter);
00036 void PPC_AtomicStore(volatile ppc_atomic_t *counter, ppc_atomic_type_t val);
00037 void PPC_AtomicReadFence();
00038 void PPC_AtomicWriteFence();
00039 #endif
00040
00041 typedef void* PPCAtomicQueueElement;
00042
00043 typedef struct _ppcatomicstate {
00044 volatile ppc_atomic_t Producer;
00045 volatile ppc_atomic_t UpperBound;
00046 #if CMK_BLUEGENEQ
00047 char pad[32 - 2*sizeof(ppc_atomic_t)];
00048 #endif
00049 } PPCAtomicState;
00050
00051 typedef struct _ppcatomicq {
00052 PPCAtomicState * _state;
00053 volatile void * volatile * _array;
00054 volatile ppc_atomic_type_t _consumer;
00055 int _qsize;
00056 int _useOverflowQ;
00057 PCQueue _overflowQ;
00058 char _pad[24];
00059 } PPCAtomicQueue;
00060
00061 void PPCAtomicQueueInit (void * atomic_mem,
00062 size_t atomic_memsize,
00063 PPCAtomicQueue * queue,
00064 int use_overflow,
00065 int nelem)
00066 {
00067 pami_result_t rc;
00068
00069
00070 #if CMK_BLUEGENEQ
00071 assert ( (((uintptr_t) atomic_mem) & (0x1F)) == 0 );
00072 assert (sizeof(PPCAtomicState) == 32);
00073 assert (sizeof(PPCAtomicState) <= atomic_memsize);
00074 #endif
00075
00076 queue->_useOverflowQ = use_overflow;
00077
00078 int qsize = 2;
00079 while (qsize < nelem)
00080 qsize *= 2;
00081 queue->_qsize = qsize;
00082
00083 queue->_state = (PPCAtomicState *) atomic_mem;
00084 queue->_overflowQ = PCQueueCreate();
00085 queue->_consumer = 0;
00086 PPC_AtomicStore(&queue->_state->Producer, 0);
00087 PPC_AtomicStore(&queue->_state->UpperBound, qsize);
00088
00089 rc = (pami_result_t) posix_memalign ((void **)&queue->_array,
00090 128,
00091 sizeof(PPCAtomicQueueElement) * qsize);
00092
00093 assert(rc == PAMI_SUCCESS);
00094 memset((void*)queue->_array, 0, sizeof(PPCAtomicQueueElement)*qsize);
00095 }
00096
00097 int PPCAtomicEnqueue (PPCAtomicQueue * queue,
00098 void * element)
00099 {
00100
00101
00102 int qsize_1 = queue->_qsize - 1;
00103 ppc_atomic_type_t index = PPC_AtomicLoadIncrementBounded(&queue->_state->Producer);
00104 PPC_AtomicWriteFence();
00105 if (index != CMI_PPC_ATOMIC_FAIL) {
00106 queue->_array[index & qsize_1] = element;
00107 return CMI_PPCQ_SUCCESS;
00108 }
00109
00110
00111 if (!queue->_useOverflowQ)
00112 return CMI_PPCQ_EAGAIN;
00113
00114
00115 PCQueuePush(queue->_overflowQ, (char *)element);
00116
00117 return CMI_PPCQ_SUCCESS;
00118 }
00119
00120 void * PPCAtomicDequeue (PPCAtomicQueue *queue)
00121 {
00122 ppc_atomic_type_t head, tail;
00123 tail = PPC_AQVal(queue->_state->Producer);
00124 head = queue->_consumer;
00125 int qsize_1 = queue->_qsize-1;
00126
00127 volatile void *e = NULL;
00128 if (head < tail) {
00129 e = queue->_array[head & qsize_1];
00130 if (e == NULL)
00131 return NULL;
00132
00133 queue->_array[head & qsize_1] = NULL;
00134 PPC_AtomicReadFence();
00135
00136 head ++;
00137 queue->_consumer = head;
00138
00139
00140
00141 ppc_atomic_type_t n = head + queue->_qsize;
00142
00143
00144 if ((n & 0xF) == 0)
00145 PPC_AtomicStore(&queue->_state->UpperBound, n);
00146 return (void*) e;
00147 }
00148
00149
00150 if (!queue->_useOverflowQ)
00151 return NULL;
00152
00153 e = PCQueuePop (queue->_overflowQ);
00154 return (void *) e;
00155 }
00156
00157 int PPCAtomicQueueEmpty (PPCAtomicQueue *queue) {
00158 return ( (PCQueueLength(queue->_overflowQ) == 0) &&
00159 (PPC_AQVal(queue->_state->Producer) == queue->_consumer) );
00160 }
00161
00162
00163
00164 int PPCAtomicQueueSpinWait (PPCAtomicQueue * queue,
00165 int n)
00166 {
00167 if (!PPCAtomicQueueEmpty(queue))
00168 return 0;
00169
00170 ppc_atomic_type_t head, tail;
00171 head = queue->_consumer;
00172
00173 size_t i = n;
00174 do {
00175 tail = PPC_AQVal(queue->_state->Producer);
00176 i--;
00177 }
00178
00179 while (head == tail && i != 0);
00180
00181 return 0;
00182 }
00183
00184
00185
00186 int PPCAtomicQueue2QSpinWait (PPCAtomicQueue * queue0,
00187 PPCAtomicQueue * queue1,
00188 int n)
00189 {
00190 if (!PPCAtomicQueueEmpty(queue0))
00191 return 0;
00192
00193 if (!PPCAtomicQueueEmpty(queue1))
00194 return 0;
00195
00196 ppc_atomic_type_t head0, tail0;
00197 ppc_atomic_type_t head1, tail1;
00198
00199 head0 = queue0->_consumer;
00200 head1 = queue1->_consumer;
00201
00202 size_t i = n;
00203 do {
00204 tail0 = PPC_AQVal(queue0->_state->Producer);
00205 tail1 = PPC_AQVal(queue1->_state->Producer);
00206 i --;
00207 } while (head0==tail0 && head1==tail1 && i!=0);
00208
00209 return 0;
00210 }
00211
00212 #endif