arch/util/pcqueue.h

Go to the documentation of this file.
00001 
00018 #ifndef __PCQUEUE__
00019 #define __PCQUEUE__
00020 
00021 
00022 /*****************************************************************************
00023  * #define CMK_PCQUEUE_LOCK
00024  * PCQueue doesn't need any lock, the lock here is only 
00025  * for debugging and testing purpose! it only make sense in smp version
00026  ****************************************************************************/
00027 /*#define CMK_PCQUEUE_LOCK  1 */
00028 
00029 #if CMK_PCQUEUE_LOCK || !defined(CMK_USE_MFENCE) || !CMK_USE_MFENCE
00030 #undef smp_rmb
00031 #undef smp_wmb
00032 #define smp_rmb()
00033 #define smp_wmb()
00034 #endif
00035 
00036 #define PCQueueSize 0x100
00037 
00042 typedef struct CmiMemorySMPSeparation_t {
00043         unsigned char padding[128];
00044 } CmiMemorySMPSeparation_t;
00045 
00046 typedef struct CircQueueStruct
00047 {
00048   struct CircQueueStruct *next;
00049   int push;
00050   int pull;
00051   char *data[PCQueueSize];
00052 }
00053 *CircQueue;
00054 
00055 typedef struct PCQueueStruct
00056 {
00057   CircQueue head;
00058   CircQueue tail;
00059   int  len;
00060 #ifdef CMK_PCQUEUE_LOCK
00061   CmiNodeLock  lock;
00062 #endif
00063 }
00064 *PCQueue;
00065 
00066 /* static CircQueue Cmi_freelist_circqueuestruct = 0;
00067    static int freeCount = 0; */
00068 
00069 #define FreeCircQueueStruct(dg) {\
00070   CircQueue d;\
00071   CmiMemLock();\
00072   d=(dg);\
00073   d->next = Cmi_freelist_circqueuestruct;\
00074   Cmi_freelist_circqueuestruct = d;\
00075   freeCount++;\
00076   CmiMemUnlock();\
00077 }
00078 
00079 #if !CMK_XT3
00080 #define MallocCircQueueStruct(dg) {\
00081   CircQueue d;\
00082   CmiMemLock();\
00083   d = Cmi_freelist_circqueuestruct;\
00084   if (d==(CircQueue)0){\
00085     d = ((CircQueue)calloc(1, sizeof(struct CircQueueStruct))); \
00086   }\
00087   else{\
00088     freeCount--;\
00089     Cmi_freelist_circqueuestruct = d->next;\
00090     }\
00091   dg = d;\
00092   CmiMemUnlock();\
00093 }
00094 #else
00095 #define MallocCircQueueStruct(dg) {\
00096   CircQueue d;\
00097   CmiMemLock();\
00098   d = Cmi_freelist_circqueuestruct;\
00099   if (d==(CircQueue)0){\
00100     d = ((CircQueue)malloc(sizeof(struct CircQueueStruct))); \
00101     d = ((CircQueue)memset(d, 0, sizeof(struct CircQueueStruct))); \
00102   }\
00103   else{\
00104     freeCount--;\
00105     Cmi_freelist_circqueuestruct = d->next;\
00106     }\
00107   dg = d;\
00108   CmiMemUnlock();\
00109 }
00110 #endif
00111 
00112 PCQueue PCQueueCreate(void)
00113 {
00114   CircQueue circ;
00115   PCQueue Q;
00116 
00117   /* MallocCircQueueStruct(circ); */
00118 #if !CMK_XT3
00119   circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
00120 #else
00121   circ = (CircQueue)malloc(sizeof(struct CircQueueStruct));
00122   circ = (CircQueue)memset(circ, 0, sizeof(struct CircQueueStruct));
00123 #endif
00124   Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
00125   _MEMCHECK(Q);
00126   Q->head = circ;
00127   Q->tail = circ;
00128   Q->len = 0;
00129 #ifdef CMK_PCQUEUE_LOCK
00130   Q->lock = CmiCreateLock();
00131 #endif
00132   return Q;
00133 }
00134 
00135 int PCQueueEmpty(PCQueue Q)
00136 {
00137   CircQueue circ = Q->head;
00138   char *data = circ->data[circ->pull];
00139   return (data == 0);
00140 }
00141 
00142 int PCQueueLength(PCQueue Q)
00143 {
00144   return Q->len;
00145 }
00146 
00147 char *PCQueuePop(PCQueue Q)
00148 {
00149   CircQueue circ; int pull; char *data;
00150 
00151 #ifdef CMK_PCQUEUE_LOCK
00152     if (Q->len == 0) return 0;            /* len is accurate when using lock */
00153     CmiLock(Q->lock);
00154 #endif
00155     circ = Q->head;
00156 
00157     smp_rmb();
00158 
00159     pull = circ->pull;
00160     data = circ->data[pull];
00161 #if XT3_ONLY_PCQUEUE_WORKAROUND
00162     if (data && (Q->len > 0)) {
00163 #else
00164     if (data) {
00165 #endif
00166       circ->pull = (pull + 1);
00167       circ->data[pull] = 0;
00168       if (pull == PCQueueSize - 1) { /* just pulled the data from the last slot
00169                                      of this buffer */
00170         smp_rmb();
00171         Q->head = circ-> next; /* next buffer must exist, because "Push"  */
00172         CmiAssert(Q->head != NULL);
00173         
00174         /* FreeCircQueueStruct(circ); */
00175         free(circ);
00176         
00177         /* links in the next buffer *before* filling */
00178                                /* in the last slot. See below. */
00179       }
00180       Q->len --;
00181 #ifdef CMK_PCQUEUE_LOCK
00182       CmiUnlock(Q->lock);
00183 #endif
00184       return data;
00185     }
00186     else { /* queue seems to be empty. The producer may be adding something
00187               to it, but its ok to report queue is empty. */
00188 #ifdef CMK_PCQUEUE_LOCK
00189       CmiUnlock(Q->lock);
00190 #endif
00191       return 0;
00192     }
00193 }
00194 
00195 void PCQueuePush(PCQueue Q, char *data)
00196 {
00197   CircQueue circ, circ1; int push;
00198   
00199 #ifdef CMK_PCQUEUE_LOCK
00200   CmiLock(Q->lock);
00201 #endif
00202   circ1 = Q->tail;
00203   push = circ1->push;
00204   if (push == (PCQueueSize -1)) { /* last slot is about to be filled */
00205     /* this way, the next buffer is linked in before data is filled in 
00206        in the last slot of this buffer */
00207 
00208 #if !CMK_XT3
00209     circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
00210 #else
00211     circ = (CircQueue)malloc(sizeof(struct CircQueueStruct));
00212     circ = (CircQueue)memset(circ, 0, sizeof(struct CircQueueStruct));
00213 #endif
00214     /* MallocCircQueueStruct(circ); */
00215 
00216     smp_wmb();
00217 
00218     Q->tail->next = circ;
00219     Q->tail = circ;
00220 
00221     smp_wmb();
00222   }
00223   circ1->data[push] = data;
00224   circ1->push = (push + 1);
00225   Q->len ++;
00226   smp_wmb();
00227 
00228 #ifdef CMK_PCQUEUE_LOCK
00229   CmiUnlock(Q->lock);
00230 #endif
00231 }
00232 
00233 #endif
00234 

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1