00001 
00002 #ifndef __PPC_ATOMIC_QUEUE__
00003 #define __PPC_ATOMIC_QUEUE__
00004 
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "pcqueue.h"
00010 
00011 #define DEFAULT_SIZE         2048
00012 
00013 #define CMI_PPCQ_SUCCESS  0
00014 #define CMI_PPCQ_EAGAIN  -1
00015 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00025 
00026 #if CMK_PPC_ATOMIC_DEFAULT_IMPL
00027 #include "default_ppcq.h"
00028 #else
00029 
00030 #include "ppc_atomicq_impl.h"
00031 #endif
00032 
00033 #if 0
00034 void PPC_AtomicCounterAllocate (void **atomic_mem, size_t  atomic_memsize);
00035 ppc_atomic_type_t PPC_AtomicLoadIncrementBounded (volatile ppc_atomic_t *counter);
00036 void PPC_AtomicStore(volatile ppc_atomic_t *counter, ppc_atomic_type_t val);
00037 void PPC_AtomicReadFence();
00038 void PPC_AtomicWriteFence();
00039 #endif
00040 
/* Opaque element stored in the queue: an untyped pointer supplied by the caller. */
typedef void *PPCAtomicQueueElement;
00042 
00043 typedef struct _ppcatomicstate {
00044   volatile ppc_atomic_t Producer;
00045   volatile ppc_atomic_t UpperBound;
00046 #if CMK_BLUEGENEQ
00047   char pad[32 - 2*sizeof(ppc_atomic_t)];
00048 #endif
00049 } PPCAtomicState;
00050 
00051 typedef struct _ppcatomicq {
00052   PPCAtomicState              * _state;
00053   volatile void * volatile    * _array;
00054   volatile ppc_atomic_type_t    _consumer;
00055   int                           _qsize;
00056   int                           _useOverflowQ;
00057   PCQueue                       _overflowQ;   
00058   char                          _pad[24];     
00059 } PPCAtomicQueue; 
00060 
00061 void PPCAtomicQueueInit      (void            * atomic_mem,
00062   size_t            atomic_memsize,
00063   PPCAtomicQueue  * queue,
00064   int               use_overflow,
00065   int               nelem)
00066 {
00067   pami_result_t rc;
00068 
00069   
00070 #if CMK_BLUEGENEQ
00071   assert ( (((uintptr_t) atomic_mem) & (0x1F)) == 0 );
00072   assert (sizeof(PPCAtomicState) == 32); 
00073   assert (sizeof(PPCAtomicState) <= atomic_memsize);
00074 #endif
00075 
00076   queue->_useOverflowQ = use_overflow;
00077 
00078   int qsize = 2;
00079   while (qsize < nelem)
00080     qsize *= 2;
00081   queue->_qsize = qsize;
00082 
00083   queue->_state = (PPCAtomicState *) atomic_mem;
00084   queue->_overflowQ = PCQueueCreate();
00085   queue->_consumer = 0;
00086   PPC_AtomicStore(&queue->_state->Producer, 0);
00087   PPC_AtomicStore(&queue->_state->UpperBound, qsize);
00088 
00089   rc = (pami_result_t) posix_memalign ((void **)&queue->_array,
00090       128, 
00091       sizeof(PPCAtomicQueueElement) * qsize);
00092 
00093   assert(rc == PAMI_SUCCESS);
00094   memset((void*)queue->_array, 0, sizeof(PPCAtomicQueueElement)*qsize);
00095 }
00096 
00097 int PPCAtomicEnqueue (PPCAtomicQueue          * queue,
00098                       void                   * element)
00099 {
00100   
00101 
00102   int qsize_1 = queue->_qsize - 1;
00103   ppc_atomic_type_t index = PPC_AtomicLoadIncrementBounded(&queue->_state->Producer);
00104   PPC_AtomicWriteFence();
00105   if (index != CMI_PPC_ATOMIC_FAIL) {
00106     queue->_array[index & qsize_1] = element;
00107     return CMI_PPCQ_SUCCESS;
00108   }
00109 
00110   
00111   if (!queue->_useOverflowQ)
00112     return CMI_PPCQ_EAGAIN; 
00113 
00114   
00115   PCQueuePush(queue->_overflowQ, (char *)element);
00116 
00117   return CMI_PPCQ_SUCCESS;
00118 }
00119 
00120 void * PPCAtomicDequeue (PPCAtomicQueue    *queue)
00121 {
00122   ppc_atomic_type_t head, tail;
00123   tail = PPC_AQVal(queue->_state->Producer);
00124   head = queue->_consumer;
00125   int qsize_1 = queue->_qsize-1;
00126 
00127   volatile void *e = NULL;
00128   if (head < tail) {
00129     e = queue->_array[head & qsize_1];
00130     if (e == NULL)
00131       return NULL;
00132 
00133     queue->_array[head & qsize_1] = NULL;
00134     PPC_AtomicReadFence();
00135 
00136     head ++;
00137     queue->_consumer = head;
00138 
00139     
00140     
00141     ppc_atomic_type_t n = head + queue->_qsize;
00142 
00143     
00144     if ((n & 0xF) == 0)
00145       PPC_AtomicStore(&queue->_state->UpperBound, n);
00146     return (void*) e;
00147   }
00148 
00149   
00150   if (!queue->_useOverflowQ)
00151     return NULL;
00152 
00153   e = PCQueuePop (queue->_overflowQ);
00154   return (void *) e;
00155 }
00156 
00157 int PPCAtomicQueueEmpty (PPCAtomicQueue *queue) {
00158   return ( (PCQueueLength(queue->_overflowQ) == 0) &&
00159       (PPC_AQVal(queue->_state->Producer) == queue->_consumer) );
00160 }
00161 
00162 
00163 
00164 int PPCAtomicQueueSpinWait (PPCAtomicQueue    * queue,
00165                             int                n)
00166 {
00167   if (!PPCAtomicQueueEmpty(queue))
00168     return 0;  
00169 
00170   ppc_atomic_type_t head, tail;
00171   head = queue->_consumer;
00172 
00173   size_t i = n;
00174   do {
00175     tail = PPC_AQVal(queue->_state->Producer);
00176     i--;
00177   }
00178   
00179   while (head == tail && i != 0);
00180 
00181   return 0; 
00182 }
00183 
00184 
00185 
00186 int PPCAtomicQueue2QSpinWait (PPCAtomicQueue    * queue0,
00187                               PPCAtomicQueue    * queue1,
00188                               int                n)
00189 {
00190   if (!PPCAtomicQueueEmpty(queue0))
00191     return 0;  
00192 
00193   if (!PPCAtomicQueueEmpty(queue1))
00194     return 0;  
00195 
00196   ppc_atomic_type_t head0, tail0;
00197   ppc_atomic_type_t head1, tail1;
00198 
00199   head0 = queue0->_consumer;
00200   head1 = queue1->_consumer;
00201 
00202   size_t i = n;
00203   do {
00204     tail0 = PPC_AQVal(queue0->_state->Producer);
00205     tail1 = PPC_AQVal(queue1->_state->Producer);
00206     i --;
00207   } while (head0==tail0 && head1==tail1 && i!=0);
00208 
00209   return 0;
00210 }
00211 
00212 #endif