00001
00027 #include <sys/types.h>
00028 #include <sys/mman.h>
00029 #include <unistd.h>
00030 #include <sys/stat.h>
00031 #include <fcntl.h>
00032 #include <errno.h>
00033
00034 #include <sys/shm.h>
00035 #include <sys/sem.h>
00036
/* Number of bits of a System V IPC key reserved for one node-local rank. */
#define SYSVSHM_BITS 5
/* Number of distinct values that fit in SYSVSHM_BITS bits. */
#define SYSVSHM_SIZE (1<<SYSVSHM_BITS)

/* Memory-debugging hook; expands to nothing. */
#define MEMDEBUG(x)

/* Set to 1 to compile in the send/receive statistics counters. */
#define SYSVSHM_STATS 0

/*
 * Semaphore helpers. Each one fills in a `struct sembuf sb` that must be in
 * scope at the point of use; the caller then passes &sb to semop().
 *
 * All three use SEM_UNDO so that every acquire is balanced by its release in
 * the kernel's per-process undo counter. Acquiring without SEM_UNDO while
 * releasing with it lets that counter drift by one per send until semop()
 * starts failing with ERANGE.
 *
 * The do { } while(0) wrapper makes each helper a single statement.
 */
/* Non-blocking acquire of semaphore number i. */
#define ACQUIRENW(i) do { sb.sem_num=(i); sb.sem_op=-1; sb.sem_flg=IPC_NOWAIT|SEM_UNDO; } while(0)
/* Blocking acquire of semaphore number i. */
#define ACQUIRE(i) do { sb.sem_num=(i); sb.sem_op=-1; sb.sem_flg=SEM_UNDO; } while(0)
/* Release of semaphore number i. */
#define RELEASE(i) do { sb.sem_num=(i); sb.sem_op=1; sb.sem_flg=SEM_UNDO; } while(0)

#define TESTARRAY 1

/*
 * IPC key construction. Keys are assembled from the charmrun pid and the
 * global ranks of the two endpoints, each rank reduced modulo SYSVSHM_SIZE.
 */
/* Key bits for the peer with node-local index i. */
#define SNDBITS(i) ((sysvshmContext->nodestart+(i))%SYSVSHM_SIZE)
/* Key bits for this process; the argument is accepted but not used. */
#define RCVBITS(i) ((sysvshmContext->nodestart+sysvshmContext->noderank)%SYSVSHM_SIZE)
/*
 * NOTE(review): sizeof(int) is a byte count, so sizeof(int)-2*SYSVSHM_SIZE
 * wraps around to a very large unsigned value and the modulo leaves the pid
 * unchanged. Presumably a bit count was intended; the value is kept as is so
 * that keys stay compatible with already-built peers -- confirm before
 * changing.
 */
#define PIDBITS ((Cmi_charmrun_pid)%(sizeof(int)-2*SYSVSHM_SIZE))
/* Shared-memory key of the buffer this process writes to reach peer i. */
#define SHMSNDNAME(i) ((PIDBITS<<(SYSVSHM_BITS*2))+(RCVBITS(i)<<SYSVSHM_BITS)+SNDBITS(i))
/* Shared-memory key of the buffer peer i writes to reach this process. */
#define SHMRCVNAME(i) ((PIDBITS<<(SYSVSHM_BITS*2))+(SNDBITS(i)<<SYSVSHM_BITS)+RCVBITS(i))
/* Semaphore-set key used together with the send buffer towards peer i. */
#define SEMSNDNAME(i) ((PIDBITS<<(SYSVSHM_BITS*2))+SNDBITS(i))
/* Semaphore-set key used together with the receive buffers of this process. */
#define SEMRCVNAME(i) ((PIDBITS<<(SYSVSHM_BITS*2))+RCVBITS(i))
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
/* Payload capacity, in bytes, of each shared-memory message buffer. */
#define SHMBUFLEN (1000 * 1000)

/* Number of slots a per-destination send queue starts out with. */
#define SENDQSTARTSIZE (1 << 7)
00071
00072
00073
/*
 * Bookkeeping record placed at the very start of a shared-memory segment;
 * the message payload follows directly after it.
 */
typedef struct {
  int count; /* number of messages currently stored in the buffer */
  int bytes; /* number of payload bytes currently in use */
} sharedBufHeader;
00079
00080
00081 typedef struct {
00082 int semid;
00083 int shmid;
00084 sharedBufHeader *header;
00085 char *data;
00086 } sharedBufData;
00087
00088 typedef struct {
00089 int size;
00090 int begin;
00091 int end;
00092 int numEntries;
00093
00094 OutgoingMsg *data;
00095
00096 } SysvshmSendQ;
00097
00098 typedef struct {
00099 int nodesize;
00100 int noderank;
00101 int nodestart,nodeend;
00102
00103 ushort *semarray;
00104
00105 int *sendbufnames;
00106 int *recvbufnames;
00107
00108 sharedBufData *recvBufs;
00109 sharedBufData *sendBufs;
00110
00111 SysvshmSendQ **sendQs;
00112
00113
00114 #if SYSVSHM_STATS
00115 int sendCount;
00116 int validCheckCount;
00117 int lockRecvCount;
00118 double validCheckTime;
00119 double sendTime;
00120 double commServerTime;
00121 #endif
00122
00123 } SysvshmContext;
00124
00125 SysvshmContext *sysvshmContext=NULL;
00126
00127
00128 void calculateNodeSizeAndRank(char **);
00129 void setupSharedBuffers();
00130 void initAllSendQs();
00131
00132
00133
00134
00135
00136 void CmiInitSysvshm(char **argv){
00137 MACHSTATE(3,"CminitSysvshm start");
00138 sysvshmContext = (SysvshmContext *)malloc(sizeof(SysvshmContext));
00139
00140 if(Cmi_charmrun_pid <= 0){
00141 CmiAbort("sysvshm must be run with charmrun");
00142 }
00143 calculateNodeSizeAndRank(argv);
00144 if(sysvshmContext->nodesize == 1){
00145 return;
00146 }
00147 MACHSTATE1(3,"CminitSysvshm %d calculateNodeSizeAndRank",sysvshmContext->nodesize);
00148
00149 setupSharedBuffers();
00150
00151 MACHSTATE2(3,"CminitSysvshm %d %d setupSharedBuffers",Cmi_charmrun_pid,sysvshmContext->nodesize);
00152
00153 initAllSendQs();
00154
00155 MACHSTATE2(3,"CminitSysvshm %d %d initAllSendQs",Cmi_charmrun_pid,sysvshmContext->nodesize);
00156
00157 MACHSTATE2(3,"CminitSysvshm %d %d done",Cmi_charmrun_pid,sysvshmContext->nodesize);
00158
00159
00160 #if SYSVSHM_STATS
00161 sysvshmContext->sendCount=0;
00162 sysvshmContext->sendTime=0.0;
00163 sysvshmContext->validCheckCount=0;
00164 sysvshmContext->validCheckTime=0.0;
00165 sysvshmContext->commServerTime = 0;
00166 sysvshmContext->lockRecvCount = 0;
00167 #endif
00168
00169 };
00170
00171
00172
00173
00174
00175 void tearDownSharedBuffers();
00176
00177 void CmiExitSysvshm(){
00178 int i=0;
00179
00180 if(sysvshmContext->nodesize != 1){
00181 tearDownSharedBuffers();
00182
00183
00184 free(sysvshmContext->recvbufnames);
00185 free(sysvshmContext->sendbufnames);
00186 free(sysvshmContext->semarray);
00187
00188 free(sysvshmContext->recvBufs);
00189 free(sysvshmContext->sendBufs);
00190 }
00191 #if SYSVSHM_STATS
00192 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",
00193 _Cmi_mynode,sysvshmContext->sendCount,sysvshmContext->sendTime,sysvshmContext->validCheckCount,sysvshmContext->validCheckTime,sysvshmContext->commServerTime,sysvshmContext->lockRecvCount);
00194 #endif
00195 free(sysvshmContext);
00196 }
00197
00198
00199
00200
00201
00202 inline int CmiValidSysvshm(OutgoingMsg ogm, OtherNode node){
00203 #if SYSVSHM_STATS
00204 sysvshmContext->validCheckCount++;
00205 #endif
00206
00207 if(ogm->dst >= sysvshmContext->nodestart && ogm->dst <= sysvshmContext->nodeend && ogm->size < SHMBUFLEN ){
00208 return 1;
00209 }else{
00210 return 0;
00211 }
00212 };
00213
00214
00215 inline int SysvshmRank(int dst){
00216 return dst - sysvshmContext->nodestart;
00217 }
00218 inline void pushSendQ(SysvshmSendQ *q,OutgoingMsg msg);
00219 inline int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,SysvshmSendQ *dstSendQ);
00220 inline int flushSendQ(int dstRank);
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230 void CmiSendMessageSysvshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot){
00231 struct sembuf sb;
00232
00233 #if SYSVSHM_STATS
00234 double _startSendTime = CmiWallTimer();
00235 #endif
00236
00237
00238 int dstRank = SysvshmRank(ogm->dst);
00239 MEMDEBUG(CmiMemoryCheck());
00240
00241 DgramHeaderMake(ogm->data,rank,ogm->src,Cmi_charmrun_pid,1, broot);
00242
00243
00244 MACHSTATE4(3,"Send Msg Sysvshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
00245
00246 CmiAssert(dstRank >=0 && dstRank != sysvshmContext->noderank);
00247
00248 sharedBufData *dstBuf = &(sysvshmContext->sendBufs[dstRank]);
00249
00250 ACQUIRENW(sysvshmContext->noderank);
00251 if(semop(dstBuf->semid, &sb, 1)<0) {
00255 pushSendQ(sysvshmContext->sendQs[dstRank],ogm);
00256 ogm->refcount++;
00257 MEMDEBUG(CmiMemoryCheck());
00258 return;
00259 }else{
00260
00261
00262
00263
00264 if(sysvshmContext->sendQs[dstRank]->numEntries == 0){
00265
00266 int ret = sendMessage(ogm,dstBuf,sysvshmContext->sendQs[dstRank]);
00267 MACHSTATE(3,"Sysvshm Send succeeded immediately");
00268 }else{
00269 ogm->refcount+=2;
00270 pushSendQ(sysvshmContext->sendQs[dstRank],ogm);
00271 MACHSTATE3(3,"Sysvshm ogm %p pushed to sendQ length %d refcount %d",ogm,sysvshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
00272 int sent = flushSendQ(dstRank);
00273 ogm->refcount--;
00274 MACHSTATE1(3,"Sysvshm flushSendQ sent %d messages",sent);
00275 }
00276
00277 RELEASE(sysvshmContext->noderank);
00278 CmiAssert(semop(dstBuf->semid, &sb, 1)>=0);
00279 }
00280 #if SYSVSHM_STATS
00281 sysvshmContext->sendCount ++;
00282 sysvshmContext->sendTime += (CmiWallTimer()-_startSendTime);
00283 #endif
00284 MEMDEBUG(CmiMemoryCheck());
00285
00286 };
00287
inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/*
 * One round of shared-memory progress: first drain every receive buffer,
 * then retry every pending send queue.
 */
inline void CommunicationServerSysvshm(){
#if SYSVSHM_STATS
  double roundStart = CmiWallTimer();
#endif

  MEMDEBUG(CmiMemoryCheck());

  emptyAllRecvBufs();
  flushAllSendQs();

#if SYSVSHM_STATS
  sysvshmContext->commServerTime += (CmiWallTimer()-roundStart);
#endif

  MEMDEBUG(CmiMemoryCheck());
}
00311
00312 static void CmiNotifyStillIdleSysvshm(CmiIdleState *s){
00313 CommunicationServerSysvshm();
00314 }
00315
00316
00317 static void CmiNotifyBeginIdleSysvshm(CmiIdleState *s)
00318 {
00319 CmiNotifyStillIdle(s);
00320 }
00321
00322
00323 void calculateNodeSizeAndRank(char **argv){
00324 sysvshmContext->nodesize=1;
00325 MACHSTATE(3,"calculateNodeSizeAndRank start");
00326 CmiGetArgIntDesc(argv, "+nodesize", &(sysvshmContext->nodesize),"Number of cores in this node");
00327 MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",sysvshmContext->nodesize);
00328
00329 sysvshmContext->noderank = _Cmi_mynode % (sysvshmContext->nodesize);
00330
00331 MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",sysvshmContext->noderank);
00332
00333 sysvshmContext->nodestart = _Cmi_mynode -sysvshmContext->noderank;
00334
00335 MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
00336
00337 sysvshmContext->nodeend = sysvshmContext->nodestart + sysvshmContext->nodesize -1;
00338
00339 if(sysvshmContext->nodeend >= _Cmi_numnodes){
00340 sysvshmContext->nodeend = _Cmi_numnodes-1;
00341 sysvshmContext->nodesize = (sysvshmContext->nodeend - sysvshmContext->nodestart) +1;
00342 }
00343
00344 MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",sysvshmContext->nodestart,sysvshmContext->nodesize,sysvshmContext->noderank);
00345 }
00346
00347 void createShmObjectsAndSems(sharedBufData **bufs, int *bufnames,int issend);
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357 void setupSharedBuffers(){
00358 int i=0;
00359
00360 CmiAssert((sysvshmContext->recvbufnames=(int *)malloc(sizeof(int)*(sysvshmContext->nodesize)))!=NULL);
00361 CmiAssert((sysvshmContext->sendbufnames=(int *)malloc(sizeof(int)*(sysvshmContext->nodesize)))!=NULL);
00362 CmiAssert((sysvshmContext->semarray= (ushort *)malloc(sizeof(int)*(sysvshmContext->nodesize)))!=NULL);
00363
00364
00365 for(i=0;i<sysvshmContext->nodesize;i++){
00366 sysvshmContext->semarray[i]=1;
00367 if(i != sysvshmContext->noderank){
00368 sysvshmContext->sendbufnames[i]=SHMSNDNAME(i);
00369 sysvshmContext->recvbufnames[i]=SHMRCVNAME(i);
00370 }
00371 }
00372
00373 createShmObjectsAndSems(&(sysvshmContext->recvBufs),sysvshmContext->recvbufnames,0);
00374 createShmObjectsAndSems(&(sysvshmContext->sendBufs),sysvshmContext->sendbufnames,1);
00375
00376 for(i=0;i<sysvshmContext->nodesize;i++){
00377 if(i != sysvshmContext->noderank){
00378 CmiAssert(sysvshmContext->sendBufs[i].header->count == 0);
00379 sysvshmContext->sendBufs[i].header->count = 0;
00380 sysvshmContext->sendBufs[i].header->bytes = 0;
00381 }
00382 }
00383 }
00384
00385
00386
00387 void createShmObjectsAndSems(sharedBufData **bufs, int *bufnames,int issend) {
00388 int i;
00389 int name;
00390
00391 union semun {
00392 int val;
00393 struct semid_ds *buf;
00394 ushort *array;
00395 } arg;
00396 struct semid_ds seminfo;
00397 arg.array=sysvshmContext->semarray;
00398
00399 *bufs = (sharedBufData *)malloc(sizeof(sharedBufData)*sysvshmContext->nodesize);
00400
00401 for(i=0;i<sysvshmContext->nodesize;i++){
00402 if(i != sysvshmContext->noderank){
00403 if(issend)
00404 name=SEMSNDNAME(i);
00405 else
00406 name=SEMRCVNAME(i);
00407
00408 if(((*bufs)[i].semid=semget(name,sysvshmContext->nodesize,0666|IPC_CREAT|IPC_EXCL))>=0) {
00409 CmiAssert((semctl((*bufs)[i].semid,sysvshmContext->nodesize,SETALL,arg))>=0);
00410 } else if(errno==EEXIST) {
00411 CmiAssert((((*bufs)[i].semid)=semget(name,sysvshmContext->nodesize,0666))>=0);
00412 } else {
00413 tearDownSharedBuffers();
00414 CmiPrintf("problem getting sem : %s\n",strerror(errno));
00415 CmiAbort("sem\n");
00416 }
00417
00418 (*bufs)[i].shmid=-1;
00419 (*bufs)[i].shmid=shmget(bufnames[i],SHMBUFLEN+sizeof(sharedBufHeader),0666|IPC_CREAT|IPC_EXCL);
00420 if(errno==EEXIST)
00421 (*bufs)[i].shmid=shmget(bufnames[i],SHMBUFLEN+sizeof(sharedBufHeader),0666);
00422 CmiAssert(((*bufs)[i].shmid)>0);
00423
00424 CmiAssert(((*bufs)[i].header=shmat((*bufs)[i].shmid, (void *)0, 0))>0);
00425 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00426 }else{
00427 (*bufs)[i].shmid=-1;
00428 (*bufs)[i].semid=-1;
00429 (*bufs)[i].header = NULL;
00430 (*bufs)[i].data = NULL;
00431 }
00432 }
00433 }
00434
00435
00436 void tearDownSharedBuffers(){
00437 int i,j,ret;
00438 struct shmid_ds arg;
00439 for(i= 0;i<sysvshmContext->nodesize;i++){
00440 if(i != sysvshmContext->noderank){
00441
00442 shmdt(sysvshmContext->sendBufs[i].header);
00443 shmdt(sysvshmContext->recvBufs[i].header);
00444
00445 shmctl(sysvshmContext->sendBufs[i].shmid,IPC_STAT,&arg);
00446 if(arg.shm_nattch==0) {
00447 shmctl(sysvshmContext->sendBufs[i].shmid,IPC_RMID,NULL);
00448 shmctl(sysvshmContext->recvBufs[i].shmid,IPC_RMID,NULL);
00449 semctl(sysvshmContext->sendBufs[i].semid,0, IPC_RMID,0);
00450 semctl(sysvshmContext->recvBufs[i].semid,0, IPC_RMID,0);
00451 }
00452 }
00453 }
00454 };
00455
00456
00457 void initSendQ(SysvshmSendQ *q,int size);
00458
00459 void initAllSendQs(){
00460 int i=0;
00461 sysvshmContext->sendQs = (SysvshmSendQ **) malloc(sizeof(SysvshmSendQ *)*sysvshmContext->nodesize);
00462 for(i=0;i<sysvshmContext->nodesize;i++){
00463 if(i != sysvshmContext->noderank){
00464 (sysvshmContext->sendQs)[i] = (SysvshmSendQ *)malloc(sizeof(SysvshmSendQ));
00465 initSendQ((sysvshmContext->sendQs)[i],SENDQSTARTSIZE);
00466 }else{
00467 (sysvshmContext->sendQs)[i] = NULL;
00468 }
00469 }
00470 };
00471
00472
00473
00474
00475
00476
00477
00478
00479 int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,SysvshmSendQ *dstSendQ){
00480
00481 if(dstBuf->header->bytes+ogm->size <= SHMBUFLEN){
00483 dstBuf->header->count++;
00484 memcpy(dstBuf->data+dstBuf->header->bytes,ogm->data,ogm->size);
00485 dstBuf->header->bytes += ogm->size;
00486 MACHSTATE4(3,"Sysvshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
00487 return 1;
00488 }
00489
00490
00491
00492 printf("send buffer is too full\n");
00493 pushSendQ(dstSendQ,ogm);
00494 ogm->refcount++;
00495 MACHSTATE3(3,"Sysvshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
00496 return 0;
00497 }
00498
00499 inline OutgoingMsg popSendQ(SysvshmSendQ *q);
00500
00501
00502
00503
00504
00505
00506 inline int flushSendQ(int dstRank){
00507 sharedBufData *dstBuf = &(sysvshmContext->sendBufs[dstRank]);
00508 SysvshmSendQ *dstSendQ = sysvshmContext->sendQs[dstRank];
00509 int count=dstSendQ->numEntries;
00510 int sent=0;
00511 while(count > 0){
00512 OutgoingMsg ogm = popSendQ(dstSendQ);
00513 ogm->refcount--;
00514 MACHSTATE4(3,"Sysvshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
00515 int ret = sendMessage(ogm,dstBuf,dstSendQ);
00516 if(ret==1){
00517 sent++;
00518 GarbageCollectMsg(ogm);
00519 }
00520 count--;
00521 }
00522 return sent;
00523 }
00524
00525 inline void emptyRecvBuf(sharedBufData *recvBuf);
00526
00527 inline void emptyAllRecvBufs(){
00528 struct sembuf sb;
00529 int i;
00530 int j,ret;
00531 union semun {
00532 int val;
00533 struct semid_ds *buf;
00534 ushort array[1];
00535 } arg;
00536 for(i=0;i<sysvshmContext->nodesize;i++){
00537 if(i != sysvshmContext->noderank){
00538 sharedBufData *recvBuf = &(sysvshmContext->recvBufs[i]);
00539 if(recvBuf->header->count > 0){
00540
00541 #if SYSVSHM_STATS
00542 sysvshmContext->lockRecvCount++;
00543 #endif
00544
00545 ACQUIRE(i);
00546 if(semop(recvBuf->semid, &sb, 1)>=0) {
00547 MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
00548 emptyRecvBuf(recvBuf);
00549 RELEASE(i);
00550 CmiAssert((semop(recvBuf->semid, &sb, 1))>=0);
00551 }
00552
00553 }
00554 }
00555 }
00556 };
00557
00558 inline void flushAllSendQs(){
00559 struct sembuf sb;
00560 int i=0;
00561
00562 for(i=0;i<sysvshmContext->nodesize;i++){
00563 if(i != sysvshmContext->noderank && sysvshmContext->sendQs[i]->numEntries > 0){
00564 ACQUIRE(sysvshmContext->noderank);
00565 if(semop(sysvshmContext->sendBufs[i].semid, &sb, 1)>=0) {
00566 MACHSTATE1(3,"flushSendQ %d",i);
00567 flushSendQ(i);
00568 RELEASE(sysvshmContext->noderank);
00569 CmiAssert(semop(sysvshmContext->sendBufs[i].semid, &sb, 1)>=0);
00570 }
00571
00572 }
00573 }
00574 };
00575
00576 void static inline handoverSysvshmMessage(char *newmsg,int total_size,int rank,int broot);
00577
00578 void emptyRecvBuf(sharedBufData *recvBuf){
00579 int numMessages = recvBuf->header->count;
00580 int i=0;
00581
00582 char *ptr=recvBuf->data;
00583
00584 for(i=0;i<numMessages;i++){
00585 int size;
00586 int rank, srcpe, seqno, magic, i;
00587 unsigned int broot;
00588 char *msg = ptr;
00589 char *newMsg;
00590
00591 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00592 size = CmiMsgHeaderGetLength(msg);
00593
00594 newMsg = (char *)CmiAlloc(size);
00595 memcpy(newMsg,msg,size);
00596
00597 handoverSysvshmMessage(newMsg,size,rank,broot);
00598
00599 ptr += size;
00600
00601 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
00602 }
00603 CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00604 recvBuf->header->count=0;
00605 recvBuf->header->bytes=0;
00606 }
00607
00608
00609 void static inline handoverSysvshmMessage(char *newmsg,int total_size,int rank,int broot){
00610 CmiAssert(rank == 0);
00611 #if CMK_BROADCAST_SPANNING_TREE
00612 if (rank == DGRAM_BROADCAST
00613 #if CMK_NODE_QUEUE_AVAILABLE
00614 || rank == DGRAM_NODEBROADCAST
00615 #endif
00616 ){
00617 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
00618 }
00619 #elif CMK_BROADCAST_HYPERCUBE
00620 if (rank == DGRAM_BROADCAST
00621 #if CMK_NODE_QUEUE_AVAILABLE
00622 || rank == DGRAM_NODEBROADCAST
00623 #endif
00624 ){
00625 SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
00626 }
00627 #endif
00628
00629 switch (rank) {
00630 case DGRAM_BROADCAST: {
00631 CmiPushPE(0, newmsg);
00632 break;
00633 }
00634 default:
00635 {
00636
00637 CmiPushPE(rank, newmsg);
00638 }
00639 }
00640 }
00641
00642
00643
00644
00645
00646
00647 void initSendQ(SysvshmSendQ *q,int size){
00648 q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*size);
00649
00650 q->size = size;
00651 q->numEntries = 0;
00652
00653 q->begin = 0;
00654 q->end = 0;
00655 }
00656
00657 void pushSendQ(SysvshmSendQ *q,OutgoingMsg msg){
00658 if(q->numEntries == q->size){
00659
00660 OutgoingMsg *oldData = q->data;
00661 int newSize = q->size<<1;
00662 q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*newSize);
00663
00664
00665 CmiAssert(q->begin == q->end);
00666
00667 CmiAssert(q->begin < q->size);
00668 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsg)*(q->size - q->begin));
00669
00670 if(q->end != 0){
00671 memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsg)*(q->end));
00672 }
00673 free(oldData);
00674 q->begin = 0;
00675 q->end = q->size;
00676 q->size = newSize;
00677 }
00678 q->data[q->end] = msg;
00679 (q->end)++;
00680 if(q->end >= q->size){
00681 q->end -= q->size;
00682 }
00683 q->numEntries++;
00684 }
00685
00686 OutgoingMsg popSendQ(SysvshmSendQ *q){
00687 OutgoingMsg ret;
00688 if(0 == q->numEntries){
00689 return NULL;
00690 }
00691
00692 ret = q->data[q->begin];
00693 (q->begin)++;
00694 if(q->begin >= q->size){
00695 q->begin -= q->size;
00696 }
00697
00698 q->numEntries--;
00699 return ret;
00700 }