00001
00031 #include <sys/types.h>
00032 #include <sys/mman.h>
00033 #include <unistd.h>
00034 #include <sys/stat.h>
00035 #include <fcntl.h>
00036 #include <errno.h>
00037
00038
00039
00040
/*
 * Select how each shared-memory buffer is protected against concurrent
 * access by its sender and its receiver:
 *   PXSHM_OSSPINLOCK - an OSSpinLock stored in the shared header,
 *   PXSHM_LOCK       - a named POSIX semaphore per buffer,
 *   otherwise        - a two-party Peterson lock built from the flag/turn
 *                      fields of the shared header plus memory fences
 *                      (PXSHM_FENCE).
 */
#if PXSHM_OSSPINLOCK
#include <libkern/OSAtomic.h>
#elif PXSHM_LOCK
#include <semaphore.h>
#else
#define PXSHM_FENCE 1
#endif

/* Expands to nothing; the commented-out body marks where the debug
   statement x would be emitted if memory checking were re-enabled. */
#define MEMDEBUG(x) //x

/* Set to 1 to collect per-process send / receive timing counters, which
   CmiExitPxshm prints. */
#define PXSHM_STATS 0

/*
 * Fences for the Peterson lock (PXSHM_FENCE only).
 *
 * Peterson's algorithm needs store->load ordering: each side's stores to its
 * own flag and to `turn` must be globally visible before it loads the other
 * side's flag. x86 `sfence`/`lfence` and PowerPC `eieio` do not order an
 * earlier store against a later load on ordinary write-back memory, so both
 * parties could enter the critical section at once. The write fence is
 * therefore a full barrier (`mfence` on x86, `sync` on PowerPC). The lock
 * code below always issues it after the read fence, so the pair acts as a
 * full fence both when acquiring and before releasing.
 *
 * `__asm__ __volatile__` is used instead of `asm volatile` so the macros
 * also build in strict ISO modes (e.g. -std=c11), where `asm` is not a
 * keyword.
 */
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryWriteFence(startPtr,nBytes) __asm__ __volatile__("sync":::"memory")
#define CmiMemoryReadFence(startPtr,nBytes) __asm__ __volatile__("eieio":::"memory")
#else
#define CmiMemoryWriteFence(startPtr,nBytes) __asm__ __volatile__("mfence":::"memory")
#define CmiMemoryReadFence(startPtr,nBytes) __asm__ __volatile__("lfence":::"memory")
#endif
#else
#define CmiMemoryWriteFence(startPtr,nBytes)
#define CmiMemoryReadFence(startPtr,nBytes)
#endif
00078
00079
00080
/* The two parties that share one buffer under the Peterson lock
   (PXSHM_FENCE): sharedBufHeader.turn holds one of these values. Each side
   hands the turn to the other before checking the other's flag. */
00081 enum entities {SENDER,RECEIVER};
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
/* Bytes reserved for each shared-memory object / semaphore name
   (see allocBufNameStrings). */
00092 #define NAMESTRLEN 50
/* Capacity of PxshmContext.prefixStr ("charm_pxshm_<charmrun pid>"). */
00093 #define PREFIXSTRLEN 30
00094
/* Payload capacity in bytes of each per-pair shared buffer; the
   sharedBufHeader is mapped in front of it. Messages must be strictly
   smaller than this to use pxshm (CmiValidPxshm). */
00095 #define SHMBUFLEN 1000000
00096
/* Initial capacity (entries) of each per-destination overflow send queue;
   pushSendQ doubles it whenever it fills up. */
00097 #define SENDQSTARTSIZE 128
00098
00099
/*
 * Header at the start of every shared-memory segment; the message payload
 * (SHMBUFLEN bytes) follows it directly. The sending and the receiving
 * process both map the same segment, so this layout must be identical in
 * both of them.
 */
00101 typedef struct {
/* Number of messages currently packed in the payload. */
00102 int count;
/* Number of payload bytes currently in use. */
00103 int bytes;
00104
00105 #if PXSHM_OSSPINLOCK
/* Spin lock shared by sender and receiver (PXSHM_OSSPINLOCK mode). */
00106 OSSpinLock lock;
00107 #endif
00108
00109 #if PXSHM_FENCE
/* Peterson-lock state (PXSHM_FENCE mode): each side raises its own flag
   while it wants the buffer; `turn` (SENDER or RECEIVER) breaks ties. */
00110 volatile int flagSender;
00111 volatile int flagReceiver;
00112 volatile int turn;
00113 #endif
00114
00115 } sharedBufHeader;
00116
00117
/* This process's handle on one shared buffer. */
00118 typedef struct {
00119 #if PXSHM_LOCK
/* Named POSIX semaphore guarding the buffer (PXSHM_LOCK mode). */
00120 sem_t *mutex;
00121 #endif
/* Start of the mapped segment. */
00122 sharedBufHeader *header;
/* Payload area, immediately after the header. */
00123 char *data;
00124 } sharedBufData;
00125
/*
 * Growable circular FIFO of messages that could not yet be copied into one
 * destination's shared buffer (lock busy or buffer full).
 */
00126 typedef struct {
/* Capacity of data[] in entries. */
00127 int size;
/* Index of the oldest entry (next one popped). */
00128 int begin;
/* Index of the next free slot (wraps modulo size). */
00129 int end;
/* Number of entries currently queued. */
00130 int numEntries;
00131
/* Ring storage of `size` entries. */
00132 OutgoingMsg *data;
00133
00134 } PxshmSendQ;
00135
/*
 * Per-process state of the pxshm layer. The per-peer arrays are indexed by
 * rank within the physical node (0..nodesize-1); the slot for this
 * process's own rank is NULL / unused.
 */
00136 typedef struct {
/* Processes on this physical node (+nodesize, clipped at the last node). */
00137 int nodesize;
/* This process's rank within the physical node. */
00138 int noderank;
/* First and last global node numbers on this physical node (inclusive). */
00139 int nodestart,nodeend;
/* Common prefix of all shared-object names: "charm_pxshm_<charmrun pid>". */
00140 char prefixStr[PREFIXSTRLEN];
/* Names of the segments received from / sent to each peer rank. */
00141 char **recvBufNames;
00142 char **sendBufNames;
00143
/* Mapped receive / send buffers, per peer rank. */
00144 sharedBufData *recvBufs;
00145 sharedBufData *sendBufs;
00146
/* Per-peer overflow queues of messages not yet copied into sendBufs. */
00147 PxshmSendQ **sendQs;
00148
00149
/* Optional counters and timers, printed by CmiExitPxshm. */
00150 #if PXSHM_STATS
00151 int sendCount;
00152 int validCheckCount;
00153 int lockRecvCount;
00154 double validCheckTime;
00155 double sendTime;
00156 double commServerTime;
00157 #endif
00158
00159 } PxshmContext;
00160
00161
00162
/* Global pxshm state: allocated by CmiInitPxshm, freed by CmiExitPxshm. */
00163 PxshmContext *pxshmContext=NULL;
00164
00165
00166 void calculateNodeSizeAndRank(char **);
00167 void setupSharedBuffers();
00168 void initAllSendQs();
00169
00170
00171
00172
00173
00174 void CmiInitPxshm(char **argv){
00175 MACHSTATE(3,"CminitPxshm start");
00176 pxshmContext = (PxshmContext *)malloc(sizeof(PxshmContext));
00177
00178 if(Cmi_charmrun_pid <= 0){
00179 CmiAbort("pxshm must be run with charmrun");
00180 }
00181 calculateNodeSizeAndRank(argv);
00182 if(pxshmContext->nodesize == 1){
00183 return;
00184 }
00185
00186 MACHSTATE1(3,"CminitPxshm %d calculateNodeSizeAndRank",pxshmContext->nodesize);
00187
00188 snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
00189
00190
00191 MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
00192
00193 setupSharedBuffers();
00194
00195 MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
00196
00197
00198 initAllSendQs();
00199
00200 MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);
00201
00202 MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
00203
00204
00205 #if PXSHM_STATS
00206 pxshmContext->sendCount=0;
00207 pxshmContext->sendTime=0.0;
00208 pxshmContext->validCheckCount=0;
00209 pxshmContext->validCheckTime=0.0;
00210 pxshmContext->commServerTime = 0;
00211 pxshmContext->lockRecvCount = 0;
00212 #endif
00213
00214 };
00215
00216
00217
00218
00219
00220 void tearDownSharedBuffers();
00221
00222 void CmiExitPxshm(){
00223 int i=0;
00224
00225 if(pxshmContext->nodesize != 1){
00226 tearDownSharedBuffers();
00227
00228 for(i=0;i<pxshmContext->nodesize;i++){
00229 if(i != pxshmContext->noderank){
00230 break;
00231 }
00232 free(pxshmContext->recvBufNames[i]);
00233 free(pxshmContext->sendBufNames[i]);
00234 }
00235 free(pxshmContext->recvBufNames);
00236 free(pxshmContext->sendBufNames);
00237
00238 free(pxshmContext->recvBufs);
00239 free(pxshmContext->sendBufs);
00240
00241 }
00242 #if PXSHM_STATS
00243 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
00244 #endif
00245 free(pxshmContext);
00246 }
00247
00248
00249
00250
00251
00252 inline int CmiValidPxshm(OutgoingMsg ogm, OtherNode node){
00253 #if PXSHM_STATS
00254 pxshmContext->validCheckCount++;
00255 #endif
00256
00257
00258
00259
00260
00261 if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
00262 return 1;
00263 }else{
00264 return 0;
00265 }
00266 };
00267
00268
00269 inline int PxshmRank(int dst){
00270 return dst - pxshmContext->nodestart;
00271 }
00272 inline void pushSendQ(PxshmSendQ *q,OutgoingMsg msg);
00273 inline int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
00274 inline int flushSendQ(int dstRank);
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284 void CmiSendMessagePxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot){
00285
00286
00287 #if PXSHM_STATS
00288 double _startSendTime = CmiWallTimer();
00289 #endif
00290
00291
00292 int dstRank = PxshmRank(ogm->dst);
00293 MEMDEBUG(CmiMemoryCheck());
00294
00295 DgramHeaderMake(ogm->data,rank,ogm->src,Cmi_charmrun_pid,1, broot);
00296
00297
00298 MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
00299
00300 CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
00301
00302 sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
00303
00304
00305 #if PXSHM_OSSPINLOCK
00306 if(! OSSpinLockTry(&dstBuf->header->lock)){
00307 #elif PXSHM_LOCK
00308 if(sem_trywait(dstBuf->mutex) < 0){
00309 #elif PXSHM_FENCE
00310 dstBuf->header->flagSender = 1;
00311 dstBuf->header->turn = RECEIVER;
00312 CmiMemoryReadFence(0,0);
00313 CmiMemoryWriteFence(0,0);
00314 if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
00315 dstBuf->header->flagSender = 0;
00316 #endif
00317
00321 pushSendQ(pxshmContext->sendQs[dstRank],ogm);
00322 ogm->refcount++;
00323 MEMDEBUG(CmiMemoryCheck());
00324 return;
00325 }else{
00326
00327
00328
00329
00330
00331 if(pxshmContext->sendQs[dstRank]->numEntries == 0){
00332
00333 int ret = sendMessage(ogm,dstBuf,pxshmContext->sendQs[dstRank]);
00334 MACHSTATE(3,"Pxshm Send succeeded immediately");
00335 }else{
00336 ogm->refcount+=2;
00337 pushSendQ(pxshmContext->sendQs[dstRank],ogm);
00338 MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
00339 int sent = flushSendQ(dstRank);
00340 ogm->refcount--;
00341 MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
00342 }
00343
00344
00345 #if PXSHM_OSSPINLOCK
00346 OSSpinLockUnlock(&dstBuf->header->lock);
00347 #elif PXSHM_LOCK
00348 sem_post(dstBuf->mutex);
00349 #elif PXSHM_FENCE
00350 CmiMemoryReadFence(0,0);
00351 CmiMemoryWriteFence(0,0);
00352 dstBuf->header->flagSender = 0;
00353 #endif
00354 }
00355 #if PXSHM_STATS
00356 pxshmContext->sendCount ++;
00357 pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
00358 #endif
00359 MEMDEBUG(CmiMemoryCheck());
00360
00361 };
00362
inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/*
 * One polling pass of the pxshm layer.
 *
 * Delivers whatever is waiting in the receive buffers, then tries to push
 * queued outgoing messages into the send buffers. With PXSHM_STATS the
 * time spent here is accumulated in commServerTime.
 */
inline void CommunicationServerPxshm(){
#if PXSHM_STATS
  const double passStart = CmiWallTimer();
#endif

  MEMDEBUG(CmiMemoryCheck());

  emptyAllRecvBufs();   /* incoming traffic first */
  flushAllSendQs();     /* then retry deferred sends */

#if PXSHM_STATS
  pxshmContext->commServerTime += CmiWallTimer() - passStart;
#endif

  MEMDEBUG(CmiMemoryCheck());
}
00386
00387 static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
00388 CommunicationServerPxshm();
00389 }
00390
00391
00392 static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
00393 {
00394 CmiNotifyStillIdle(s);
00395 }
00396
00397
00398 void calculateNodeSizeAndRank(char **argv){
00399 pxshmContext->nodesize=1;
00400 MACHSTATE(3,"calculateNodeSizeAndRank start");
00401
00402 CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
00403 MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);
00404
00405 pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
00406
00407 MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);
00408
00409 pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;
00410
00411 MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
00412
00413 pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;
00414
00415 if(pxshmContext->nodeend >= _Cmi_numnodes){
00416 pxshmContext->nodeend = _Cmi_numnodes-1;
00417 pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
00418 }
00419
00420 MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
00421 }
00422
00423 void allocBufNameStrings(char ***bufName);
00424 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436 void setupSharedBuffers(){
00437 int i=0;
00438
00439 allocBufNameStrings(&(pxshmContext->recvBufNames));
00440
00441 MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
00442 MEMDEBUG(CmiMemoryCheck());
00443
00444 allocBufNameStrings((&pxshmContext->sendBufNames));
00445
00446 MACHSTATE(3,"allocBufNameStrings for sendBufNames done");
00447
00448 for(i=0;i<pxshmContext->nodesize;i++){
00449 if(i != pxshmContext->noderank){
00450 snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
00451 MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
00452 snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
00453 MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
00454 }
00455 }
00456
00457 createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
00458 createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);
00459
00460 for(i=0;i<pxshmContext->nodesize;i++){
00461 if(i != pxshmContext->noderank){
00462 CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
00463 pxshmContext->sendBufs[i].header->count = 0;
00464 pxshmContext->sendBufs[i].header->bytes = 0;
00465 }
00466 }
00467 }
00468
00469 void allocBufNameStrings(char ***bufName){
00470 int i,count;
00471
00472 int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
00473 char *tmp = malloc(totalAlloc);
00474
00475 MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
00476
00477 *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);
00478
00479 for(i=0,count=0;i<pxshmContext->nodesize;i++){
00480 if(i != pxshmContext->noderank){
00481 (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
00482 count++;
00483 }else{
00484 (*bufName)[i] = NULL;
00485 }
00486 }
00487 }
00488
00489 void createShmObject(char *name,int size,char **pPtr);
00490
00491 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
00492 int i=0;
00493
00494 *bufs = (sharedBufData *)malloc(sizeof(sharedBufData)*pxshmContext->nodesize);
00495
00496 for(i=0;i<pxshmContext->nodesize;i++){
00497 if(i != pxshmContext->noderank){
00498 createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
00499 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00500 #if PXSHM_OSSPINLOCK
00501 (*bufs)[i].header->lock = 0;
00502 #elif PXSHM_LOCK
00503 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
00504 #endif
00505 }else{
00506 (*bufs)[i].header = NULL;
00507 (*bufs)[i].data = NULL;
00508 #if PXSHM_LOCK
00509 (*bufs)[i].mutex = NULL;
00510 #endif
00511 }
00512 }
00513 }
00514
00515
00516
/*
 * Open (creating if necessary) the POSIX shared-memory object `name`, make
 * it `size` bytes, and map it read/write, MAP_SHARED, into this process.
 * On return *pPtr is the mapping's address. Any failure aborts: CmiAssert
 * may be compiled out, so failures are checked explicitly.
 *
 * shm_open is retried up to 100 times; failures from the 11th attempt on
 * are reported on stderr.
 */
void createShmObject(char *name,int size,char **pPtr){
  int fd=-1;
  int attempt=0;
  void *addr;

  while(fd<0 && attempt < 100){
    attempt++;
    fd = shm_open(name,O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);

    if(fd < 0 && attempt > 10){
      fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",attempt, strerror(errno),name);
      fflush(stderr);
    }
  }
  if(fd < 0){
    CmiAbort("pxshm: shm_open failed");
  }

  if(ftruncate(fd,size) != 0){
    /* Some systems (e.g. macOS) refuse to resize a shared-memory object that
       already has a size, which happens when the peer created it first.
       That is acceptable as long as the existing object is big enough. */
    int truncErrno = errno;
    struct stat st;
    if(fstat(fd,&st) != 0 || st.st_size < size){
      fprintf(stderr,"Error from ftruncate %s while sizing %s \n",strerror(truncErrno),name);
      fflush(stderr);
      close(fd);
      CmiAbort("pxshm: cannot size shared memory object");
    }
  }

  /* mmap signals failure with MAP_FAILED, not NULL. */
  addr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if(addr == MAP_FAILED){
    fprintf(stderr,"Error from mmap %s while mapping %s \n",strerror(errno),name);
    fflush(stderr);
    close(fd);
    CmiAbort("pxshm: mmap failed");
  }
  *pPtr = (char *)addr;

  /* The mapping remains valid after the descriptor is closed. */
  close(fd);
}
00543
00544 void tearDownSharedBuffers(){
00545 int i;
00546 for(i= 0;i<pxshmContext->nodesize;i++){
00547 if(i != pxshmContext->noderank){
00548 if( shm_unlink(pxshmContext->recvBufNames[i]) < 0){
00549 fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
00550 }
00551
00552 #if PXSHM_LOCK
00553 sem_close(pxshmContext->recvBufs[i].mutex);
00554 sem_unlink(pxshmContext->recvBufNames[i]);
00555 sem_close(pxshmContext->sendBufs[i].mutex);
00556 #endif
00557 }
00558 }
00559 };
00560
00561
00562 void initSendQ(PxshmSendQ *q,int size);
00563
00564 void initAllSendQs(){
00565 int i=0;
00566 pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
00567 for(i=0;i<pxshmContext->nodesize;i++){
00568 if(i != pxshmContext->noderank){
00569 (pxshmContext->sendQs)[i] = (PxshmSendQ *)malloc(sizeof(PxshmSendQ));
00570 initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
00571 }else{
00572 (pxshmContext->sendQs)[i] = NULL;
00573 }
00574 }
00575 };
00576
00577
00578
00579
00580
00581
00582
00583
00584 int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
00585
00586 if(dstBuf->header->bytes+ogm->size <= SHMBUFLEN){
00588 dstBuf->header->count++;
00589 memcpy(dstBuf->data+dstBuf->header->bytes,ogm->data,ogm->size);
00590 dstBuf->header->bytes += ogm->size;
00591 MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
00592 return 1;
00593 }
00594
00595
00596
00597 printf("send buffer is too full\n");
00598 pushSendQ(dstSendQ,ogm);
00599 ogm->refcount++;
00600 MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
00601 return 0;
00602 }
00603
00604 inline OutgoingMsg popSendQ(PxshmSendQ *q);
00605
00606
00607
00608
00609
00610
00611 inline int flushSendQ(int dstRank){
00612 sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
00613 PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];
00614 int count=dstSendQ->numEntries;
00615 int sent=0;
00616 while(count > 0){
00617 OutgoingMsg ogm = popSendQ(dstSendQ);
00618 ogm->refcount--;
00619 MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
00620 int ret = sendMessage(ogm,dstBuf,dstSendQ);
00621 if(ret==1){
00622 sent++;
00623 GarbageCollectMsg(ogm);
00624 }
00625 count--;
00626 }
00627 return sent;
00628 }
00629
00630 inline void emptyRecvBuf(sharedBufData *recvBuf);
00631
00632 inline void emptyAllRecvBufs(){
00633 int i;
00634 for(i=0;i<pxshmContext->nodesize;i++){
00635 if(i != pxshmContext->noderank){
00636 sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
00637 if(recvBuf->header->count > 0){
00638
00639 #if PXSHM_STATS
00640 pxshmContext->lockRecvCount++;
00641 #endif
00642
00643
00644 #if PXSHM_OSSPINLOCK
00645 if(! OSSpinLockTry(&recvBuf->header->lock)){
00646 #elif PXSHM_LOCK
00647 if(sem_trywait(recvBuf->mutex) < 0){
00648 #elif PXSHM_FENCE
00649 recvBuf->header->flagReceiver = 1;
00650 recvBuf->header->turn = SENDER;
00651 CmiMemoryReadFence(0,0);
00652 CmiMemoryWriteFence(0,0);
00653 if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
00654 recvBuf->header->flagReceiver = 0;
00655 #endif
00656 }else{
00657
00658
00659 MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
00660 emptyRecvBuf(recvBuf);
00661
00662 #if PXSHM_OSSPINLOCK
00663 OSSpinLockUnlock(&recvBuf->header->lock);
00664 #elif PXSHM_LOCK
00665 sem_post(recvBuf->mutex);
00666 #elif PXSHM_FENCE
00667 CmiMemoryReadFence(0,0);
00668 CmiMemoryWriteFence(0,0);
00669 recvBuf->header->flagReceiver = 0;
00670 #endif
00671
00672 }
00673
00674 }
00675 }
00676 }
00677 };
00678
00679 inline void flushAllSendQs(){
00680 int i=0;
00681
00682 for(i=0;i<pxshmContext->nodesize;i++){
00683 if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){
00684
00685 #if PXSHM_OSSPINLOCK
00686 if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
00687 #elif PXSHM_LOCK
00688 if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
00689 #elif PXSHM_FENCE
00690 pxshmContext->sendBufs[i].header->flagSender = 1;
00691 pxshmContext->sendBufs[i].header->turn = RECEIVER;
00692 CmiMemoryReadFence(0,0);
00693 CmiMemoryWriteFence(0,0);
00694 if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
00695 #endif
00696
00697 MACHSTATE1(3,"flushSendQ %d",i);
00698 flushSendQ(i);
00699
00700
00701
00702 #if PXSHM_OSSPINLOCK
00703 OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
00704 #elif PXSHM_LOCK
00705 sem_post(pxshmContext->sendBufs[i].mutex);
00706 #elif PXSHM_FENCE
00707 CmiMemoryReadFence(0,0);
00708 CmiMemoryWriteFence(0,0);
00709 pxshmContext->sendBufs[i].header->flagSender = 0;
00710 #endif
00711 }else{
00712
00713 #if PXSHM_FENCE
00714 pxshmContext->sendBufs[i].header->flagSender = 0;
00715 #endif
00716
00717 }
00718
00719 }
00720 }
00721 };
00722
00723 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
00724
00725 void emptyRecvBuf(sharedBufData *recvBuf){
00726 int numMessages = recvBuf->header->count;
00727 int i=0;
00728
00729 char *ptr=recvBuf->data;
00730
00731 for(i=0;i<numMessages;i++){
00732 int size;
00733 int rank, srcpe, seqno, magic, i;
00734 unsigned int broot;
00735 char *msg = ptr;
00736 char *newMsg;
00737
00738 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00739 size = CmiMsgHeaderGetLength(msg);
00740
00741 newMsg = (char *)CmiAlloc(size);
00742 memcpy(newMsg,msg,size);
00743
00744 handoverPxshmMessage(newMsg,size,rank,broot);
00745
00746 ptr += size;
00747
00748 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
00749 }
00750
00751
00752
00753
00754 CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00755 recvBuf->header->count=0;
00756 recvBuf->header->bytes=0;
00757 }
00758
00759
00760 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
00761 CmiAssert(rank == 0);
00762 #if CMK_BROADCAST_SPANNING_TREE
00763 if (rank == DGRAM_BROADCAST
00764 #if CMK_NODE_QUEUE_AVAILABLE
00765 || rank == DGRAM_NODEBROADCAST
00766 #endif
00767 ){
00768 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
00769 }
00770 #elif CMK_BROADCAST_HYPERCUBE
00771 if (rank == DGRAM_BROADCAST
00772 #if CMK_NODE_QUEUE_AVAILABLE
00773 || rank == DGRAM_NODEBROADCAST
00774 #endif
00775 ){
00776 SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
00777 }
00778 #endif
00779
00780 switch (rank) {
00781 case DGRAM_BROADCAST: {
00782 CmiPushPE(0, newmsg);
00783 break;
00784 }
00785 default:
00786 {
00787
00788 CmiPushPE(rank, newmsg);
00789 }
00790 }
00791 }
00792
00793
00794
00795
00796
00797
00798 void initSendQ(PxshmSendQ *q,int size){
00799 q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*size);
00800
00801 q->size = size;
00802 q->numEntries = 0;
00803
00804 q->begin = 0;
00805 q->end = 0;
00806 }
00807
00808 void pushSendQ(PxshmSendQ *q,OutgoingMsg msg){
00809 if(q->numEntries == q->size){
00810
00811 OutgoingMsg *oldData = q->data;
00812 int newSize = q->size<<1;
00813 q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*newSize);
00814
00815
00816 CmiAssert(q->begin == q->end);
00817
00818 CmiAssert(q->begin < q->size);
00819 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsg)*(q->size - q->begin));
00820
00821 if(q->end != 0){
00822 memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsg)*(q->end));
00823 }
00824 free(oldData);
00825 q->begin = 0;
00826 q->end = q->size;
00827 q->size = newSize;
00828 }
00829 q->data[q->end] = msg;
00830 (q->end)++;
00831 if(q->end >= q->size){
00832 q->end -= q->size;
00833 }
00834 q->numEntries++;
00835 }
00836
00837 OutgoingMsg popSendQ(PxshmSendQ *q){
00838 OutgoingMsg ret;
00839 if(0 == q->numEntries){
00840 return NULL;
00841 }
00842
00843 ret = q->data[q->begin];
00844 (q->begin)++;
00845 if(q->begin >= q->size){
00846 q->begin -= q->size;
00847 }
00848
00849 q->numEntries--;
00850 return ret;
00851 }