00001
00031 #include <sys/types.h>
00032 #include <sys/mman.h>
00033 #include <unistd.h>
00034 #include <sys/stat.h>
00035 #include <fcntl.h>
00036 #include <errno.h>
00037 #include <signal.h>
00038
00039
00040
00041
00042
#if PXSHM_OSSPINLOCK
#include <libkern/OSAtomic.h>
#elif PXSHM_LOCK
#include <semaphore.h>
#else
/* Neither OS spin locks nor POSIX semaphores were requested: fall back to the
   flag/turn try-lock built from shared flags and memory fences. */
#define PXSHM_FENCE 1
#endif

/* The `//x` is a comment, so MEMDEBUG(...) expands to nothing and the
   CmiMemoryCheck() calls wrapped in it compile away. */
#define MEMDEBUG(x) //x

/* Set to 1 to collect per-process send/receive counters and timings. */
#define PXSHM_STATS 0

/* Set to 1 to keep non-empty send queues on a linked list instead of
   scanning every peer in flushAllSendQs(). */
#define SENDQ_LIST 0

#if ! CMK_SMP
#undef CmiMemoryWriteFence
#undef CmiMemoryReadFence
#if PXSHM_FENCE
/*
 * The PXSHM_FENCE try-lock stores its own flag and then loads the peer's
 * flag, which requires StoreLoad ordering. Neither x86 "sfence"/"lfence" nor
 * PowerPC "eieio" orders a store before a later load, so both parties could
 * observe the other's flag as 0 and enter together. Every call site in this
 * file issues CmiMemoryReadFence immediately followed by CmiMemoryWriteFence,
 * so making the write fence a full barrier restores the required ordering.
 * __sync_synchronize() is a GNU builtin; this block already assumed a
 * GNU-compatible compiler for its inline asm.
 */
#define CmiMemoryWriteFence(startPtr,nBytes) __sync_synchronize()
#ifdef POWER_PC
#define CmiMemoryReadFence(startPtr,nBytes) __asm__ __volatile__("eieio":::"memory")
#else
#define CmiMemoryReadFence(startPtr,nBytes) __asm__ __volatile__("lfence":::"memory")
#endif
#else
#define CmiMemoryWriteFence(startPtr,nBytes)
#define CmiMemoryReadFence(startPtr,nBytes)
#endif
#endif
00087
00088
00089
00090
00091
00092
00093
00094
00095
/* Roles a process can play on one shared buffer; stored in the header's
   `turn` field by the PXSHM_FENCE try-lock. */
enum entities { SENDER = 0, RECEIVER = 1 };

/* Capacity of one shared-memory object name and of the per-job name prefix. */
#define NAMESTRLEN 60
#define PREFIXSTRLEN 50

/* Payload capacity of each shared buffer (4 MB by default); CmiInitPxshm()
   lets CHARM_PXSHM_POOL_SIZE override it. */
static int SHMBUFLEN = 4 * 1024 * 1024;
/* Largest message (1 MB by default) that may use the shared-memory path;
   overridable through CHARM_PXSHM_MESSAGE_MAX_SIZE. */
static int SHMMAXSIZE = 1024 * 1024;

/* Initial capacity of each per-peer send queue; CmiInitPxshm() resets it to
   32 * nodesize. */
static int SENDQSTARTSIZE = 256;
00115
/*
 * Header at the start of every shared-memory buffer; the message payload
 * follows immediately after it (createShmObjectsAndSems sets
 * data = header + sizeof(sharedBufHeader)). Both processes that map a buffer
 * read it through this struct, so the layout must be identical in each.
 */
00117 typedef struct {
/* number of messages currently stored in the payload area */
00118 int count;
/* number of payload bytes currently in use */
00119 int bytes;
00120
/* PXSHM_OSSPINLOCK: spin lock guarding this buffer */
00121 #if PXSHM_OSSPINLOCK
00122 OSSpinLock lock;
00123 #endif
00124
/*
 * PXSHM_FENCE: state of the two-party flag/turn try-lock. Each side sets its
 * own flag to announce it wants the buffer and writes `turn` (SENDER or
 * RECEIVER); see CmiSendMessagePxshm, flushAllSendQs and emptyAllRecvBufs.
 * NOTE(review): pad1/pad2 presumably keep the flags on separate cache
 * lines -- confirm against the definition of CmiMemorySMPSeparation_t.
 */
00125 #if PXSHM_FENCE
00126 volatile int flagSender;
00127 CmiMemorySMPSeparation_t pad1;
00128 volatile int flagReceiver;
00129 CmiMemorySMPSeparation_t pad2;
00130 volatile int turn;
00131 #endif
00132
00133 } sharedBufHeader;
00134
00135
/* Process-local view of one shared buffer. */
00136 typedef struct {
/* PXSHM_LOCK: named POSIX semaphore guarding the buffer (from sem_open) */
00137 #if PXSHM_LOCK
00138 sem_t *mutex;
00139 #endif
/* start of the mapped shared object */
00140 sharedBufHeader *header;
/* payload area directly after the header */
00141 char *data;
00142 } sharedBufData;
00143
/* A message waiting in a send queue because it could not yet be copied into
   the destination's shared buffer. */
00144 typedef struct OutgoingMsgRec
00145 {
/* message bytes; sendMessage() CmiFree()s them once copied */
00146 char *data;
/* caller-owned counter adjusted while the message sits in a queue */
00147 int *refcount;
/* message length in bytes */
00148 int size;
00149 }
00150 OutgoingMsgRec;
00151
/* Growable circular queue of pending messages for one destination peer. */
00152 typedef struct {
/* capacity of `data` (doubled by pushSendQ when full) */
00153 int size;
/* index of the oldest entry */
00154 int begin;
/* index where the next entry will be stored */
00155 int end;
/* number of queued entries */
00156 int numEntries;
/* destination peer's index into pxshmContext->sendBufs */
00157 int rank;
/* SENDQ_LIST: next peer index in the list of non-empty queues; -1 ends the
   list and -2 marks a queue that is not linked in */
00158 #if SENDQ_LIST
00159 int next;
00160 #endif
/* ring storage of `size` records */
00161 OutgoingMsgRec *data;
00162
00163 } PxshmSendQ;
00164
/* All pxshm state of this process. */
00165 typedef struct {
/* number of processes sharing this physical node */
00166 int nodesize;
/* this process's index among them (0 .. nodesize-1) */
00167 int noderank;
/* global node numbers of the first and last of those processes */
00168 int nodestart,nodeend;
/* job-unique prefix of all shared-memory object names */
00169 char prefixStr[PREFIXSTRLEN];
/* per-peer object names (NULL at index noderank) */
00170 char **recvBufNames;
00171 char **sendBufNames;
00172
/* per-peer mapped buffers: recvBufs[i] is the object peer i writes into for
   this process, sendBufs[i] the one this process writes into for peer i */
00173 sharedBufData *recvBufs;
00174 sharedBufData *sendBufs;
00175
/* per-peer pending-message queues (NULL at index noderank) */
00176 PxshmSendQ **sendQs;
00177
00178
/* PXSHM_STATS: counters and accumulated CmiWallTimer() intervals */
00179 #if PXSHM_STATS
00180 int sendCount;
00181 int validCheckCount;
00182 int lockRecvCount;
00183 double validCheckTime;
00184 double sendTime;
00185 double commServerTime;
00186 #endif
00187
00188 } PxshmContext;
00189
/* SENDQ_LIST: index of the first non-empty send queue, -1 when none */
00190 #if SENDQ_LIST
00191 static int sendQ_head_index = -1;
00192 #endif
00193
/* Allocated by CmiInitPxshm(); freed and reset to NULL by CmiExitPxshm(). */
00194 PxshmContext *pxshmContext=NULL;
00195
00196
00197 void calculateNodeSizeAndRank(char **);
00198 void setupSharedBuffers();
00199 void initAllSendQs();
00200
00201 void CmiExitPxshm();
00202
/*
 * Handler that CmiInitPxshm() installs for fatal and termination signals.
 * Removes this process's shared-memory objects, then restores the default
 * disposition and re-raises the signal so the process terminates with it.
 *
 * Simply returning was wrong: for SIGSEGV/SIGBUS/SIGFPE/SIGILL the faulting
 * instruction is re-executed and faults again forever, and for
 * SIGINT/SIGTERM/SIGQUIT the program kept running after CmiExitPxshm() had
 * freed pxshmContext and set it to NULL.
 *
 * NOTE(review): CmiExitPxshm() may call free()/fprintf(), which are not
 * async-signal-safe; this cleanup is best effort.
 */
static void cleanupOnAllSigs(int signo)
{
  CmiExitPxshm();
  signal(signo, SIG_DFL);
  raise(signo);
}
00207
00208
00209
00210
00211
00212 void CmiInitPxshm(char **argv){
00213 char *env;
00214 MACHSTATE(3,"CminitPxshm start");
00215
00216 pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));
00217
00218 #if CMK_NET_VERSION
00219 if(Cmi_charmrun_pid <= 0){
00220 CmiAbort("pxshm must be run with charmrun");
00221 }
00222 #endif
00223 calculateNodeSizeAndRank(argv);
00224 if(pxshmContext->nodesize == 1) return;
00225
00226 MACHSTATE1(3,"CminitPxshm %d calculateNodeSizeAndRank",pxshmContext->nodesize);
00227
00228 env = getenv("CHARM_PXSHM_POOL_SIZE");
00229 if (env) {
00230 SHMBUFLEN = CmiReadSize(env);
00231 }
00232 env = getenv("CHARM_PXSHM_MESSAGE_MAX_SIZE");
00233 if (env) {
00234 SHMMAXSIZE = CmiReadSize(env);
00235 }
00236 if (SHMMAXSIZE > SHMBUFLEN)
00237 CmiAbort("Error> Pxshm pool size is set too small in env variable CHARM_PXSHM_POOL_SIZE");
00238
00239 SENDQSTARTSIZE = 32 * pxshmContext->nodesize;
00240
00241 if (_Cmi_mynode == 0)
00242 printf("Charm++> pxshm enabled: %d cores per node, buffer size: %.1fMB\n", pxshmContext->nodesize, SHMBUFLEN/1024.0/1024.0);
00243
00244 #if CMK_CRAYXE
00245 srand(getpid());
00246 int Cmi_charmrun_pid = rand();
00247 PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
00248 #elif !CMK_NET_VERSION
00249 #error "need a unique number"
00250 #endif
00251 snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
00252
00253 MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
00254
00255 setupSharedBuffers();
00256
00257 MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
00258
00259 initAllSendQs();
00260
00261 MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);
00262
00263 MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
00264
00265 #if PXSHM_STATS
00266 pxshmContext->sendCount=0;
00267 pxshmContext->sendTime=0.0;
00268 pxshmContext->validCheckCount=0;
00269 pxshmContext->validCheckTime=0.0;
00270 pxshmContext->commServerTime = 0;
00271 pxshmContext->lockRecvCount = 0;
00272 #endif
00273
00274 signal(SIGSEGV, cleanupOnAllSigs);
00275 signal(SIGFPE, cleanupOnAllSigs);
00276 signal(SIGILL, cleanupOnAllSigs);
00277 signal(SIGTERM, cleanupOnAllSigs);
00278 signal(SIGABRT, cleanupOnAllSigs);
00279 signal(SIGQUIT, cleanupOnAllSigs);
00280 signal(SIGBUS, cleanupOnAllSigs);
00281 signal(SIGINT, cleanupOnAllSigs);
00282 signal(SIGTRAP, cleanupOnAllSigs);
00283
00284 #if 0
00285 char name[64];
00286 gethostname(name,64);
00287 printf("[%d] name: %s\n", myrank, name);
00288 #endif
00289 };
00290
00291
00292
00293
00294
00295 static int pxshm_freed = 0;
00296 void tearDownSharedBuffers();
00297 void freeSharedBuffers();
00298
00299 void CmiExitPxshm(){
00300 if (pxshmContext == NULL) return;
00301 if(pxshmContext->nodesize != 1){
00302 int i;
00303 if (!pxshm_freed)
00304 tearDownSharedBuffers();
00305
00306 for(i=0;i<pxshmContext->nodesize;i++){
00307 if(i != pxshmContext->noderank){
00308 break;
00309 }
00310 }
00311 free(pxshmContext->recvBufNames[i]);
00312 free(pxshmContext->sendBufNames[i]);
00313
00314 free(pxshmContext->recvBufNames);
00315 free(pxshmContext->sendBufNames);
00316
00317 free(pxshmContext->recvBufs);
00318 free(pxshmContext->sendBufs);
00319
00320 }
00321 #if PXSHM_STATS
00322 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
00323 #endif
00324 free(pxshmContext);
00325 pxshmContext = NULL;
00326 }
00327
00328
00329
00330
00331
00332
00333 inline
00334 static int CmiValidPxshm(int node, int size){
00335 #if PXSHM_STATS
00336 pxshmContext->validCheckCount++;
00337 #endif
00338
00339
00340
00341
00342
00343
00344 return (node >= pxshmContext->nodestart && node <= pxshmContext->nodeend && size <= SHMMAXSIZE )? 1: 0;
00345 };
00346
00347
00348 inline int PxshmRank(int dstnode){
00349 return dstnode - pxshmContext->nodestart;
00350 }
00351
00352 inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
00353 inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
00354 inline int flushSendQ(PxshmSendQ *q);
00355
00356 inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
00357 return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
00358 }
00359
00360
00361
00362
00363
00364
00365
00366
00367
/*
 * Send `msg` (`size` bytes) to global node `dstnode` through its shared-memory
 * buffer. `dstnode` must be another process on this physical node.
 *
 * - If the destination buffer's lock cannot be taken immediately, the message
 *   is appended to the per-destination send queue, *refcount is incremented
 *   and the function returns (the PXSHM_STATS counters are not updated on
 *   this path).
 * - Otherwise, if nothing is queued for the destination, sendMessage() copies
 *   the message into the buffer (CmiFree()ing msg), or queues it if the
 *   buffer is full.
 * - Otherwise the message is queued behind the older ones, so delivery order
 *   is preserved, and the queue is flushed (flushSendQ).
 *
 * NOTE(review): *refcount presumably counts queue entries that still
 * reference msg -- confirm against the callers / GarbageCollectMsg().
 */
00368 void CmiSendMessagePxshm(char *msg, int size, int dstnode, int *refcount)
00369 {
00370 #if PXSHM_STATS
00371 double _startSendTime = CmiWallTimer();
00372 #endif
00373
00374 LrtsPrepareEnvelope(msg, size);
00375
/* index of the destination among the processes on this node */
00376 int dstRank = PxshmRank(dstnode);
00377 MEMDEBUG(CmiMemoryCheck());
00378
00379
00380
00381
00382
00383
00384 CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
00385
00386 sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
00387 PxshmSendQ *sendQ = pxshmContext->sendQs[dstRank];
00388
/*
 * Try-lock the destination buffer. Exactly one branch below is compiled.
 * Each branch opens an `if` whose body runs when the lock was NOT acquired.
 * PXSHM_FENCE branch:
 *  - announce interest (flagSender) and give the turn to the receiver;
 *  - fence;
 *  - back off (clearing flagSender) whenever the receiver has announced
 *    interest too.
 */
00389 #if PXSHM_OSSPINLOCK
00390 if(! OSSpinLockTry(&dstBuf->header->lock)){
00391 #elif PXSHM_LOCK
00392 if(sem_trywait(dstBuf->mutex) < 0){
00393 #elif PXSHM_FENCE
00394 dstBuf->header->flagSender = 1;
00395 dstBuf->header->turn = RECEIVER;
00396 CmiMemoryReadFence(0,0);
00397 CmiMemoryWriteFence(0,0);
00398
00399 if(dstBuf->header->flagReceiver){
00400 dstBuf->header->flagSender = 0;
00401 #endif
00402
/* Lock not acquired: queue the message for a later flushAllSendQs(). With
   SENDQ_LIST, a queue that was empty and unlinked (next == -2) is linked in
   at the head of the list. */
00404 #if SENDQ_LIST
00405 if (sendQ->numEntries == 0 && sendQ->next == -2) {
00406 sendQ->next = sendQ_head_index;
00407 sendQ_head_index = dstRank;
00408 }
00409 #endif
00410 pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
00411 (*refcount)++;
00412 MEMDEBUG(CmiMemoryCheck());
00413 return;
00414 }else{
/* Lock acquired. */
00415
00416
00417
00418
00419
00420 if(pxshmContext->sendQs[dstRank]->numEntries == 0){
00421
/* Nothing queued ahead of this message: try to copy it in directly. `ret`
   (1 = copied, 0 = queued) is not used further. */
00422 int ret = sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
00423 #if SENDQ_LIST
00424 if (sendQ->numEntries > 0 && sendQ->next == -2)
00425 {
00426 sendQ->next = sendQ_head_index;
00427 sendQ_head_index = dstRank;
00428 }
00429 #endif
00430 MACHSTATE(3,"Pxshm Send succeeded immediately");
00431 }else{
/*
 * Older messages are queued: append this one behind them, then flush.
 * For this message's own entry, the counter changes as follows:
 *  - +2 here;
 *  - -1 when flushSendQ() pops the entry;
 *  - +1 more if sendMessage() has to re-queue it;
 *  - -1 below.
 * Net change: 0 if the message went out, +1 if it is still queued.
 */
00432 (*refcount)+=2;
00433 pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
00434
00435 int sent = flushSendQ(sendQ);
00436 (*refcount)--;
00437 MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
00438 }
00439
00440
/* Release the lock taken above. */
00441 #if PXSHM_OSSPINLOCK
00442 OSSpinLockUnlock(&dstBuf->header->lock);
00443 #elif PXSHM_LOCK
00444 sem_post(dstBuf->mutex);
00445 #elif PXSHM_FENCE
00446 CmiMemoryReadFence(0,0);
00447 CmiMemoryWriteFence(0,0);
00448 dstBuf->header->flagSender = 0;
00449 #endif
00450 }
00451 #if PXSHM_STATS
00452 pxshmContext->sendCount ++;
00453 pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
00454 #endif
00455 MEMDEBUG(CmiMemoryCheck());
00456
00457 };
00458
/* Declared `static` so the later `inline void emptyAllRecvBufs(){...}` and
   `inline void flushAllSendQs(){...}` definitions get internal linkage. As
   plain C99 `inline` functions with no `extern` declaration they would be
   inline-only definitions, and any call that is not inlined (e.g. at -O0)
   would fail to link. */
static inline void emptyAllRecvBufs();
static inline void flushAllSendQs();
00461
00462
00463
00464
00465
/*
 * One progress step of the shared-memory transport:
 *  1. deliver whatever peers have written into this process's receive
 *     buffers (emptyAllRecvBufs);
 *  2. retry messages waiting in the per-peer send queues (flushAllSendQs).
 * With PXSHM_STATS the elapsed wall time is added to commServerTime.
 *
 * NOTE(review): this is plain C99 `inline` without `static`/`extern`; unless
 * an external definition exists elsewhere, a call that is not inlined may
 * fail to link -- confirm how callers reach it.
 */
inline void CommunicationServerPxshm(){
#if PXSHM_STATS
  const double commStart = CmiWallTimer();
#endif

  MEMDEBUG(CmiMemoryCheck());

  emptyAllRecvBufs();   /* inbound first ... */
  flushAllSendQs();     /* ... then outbound */

#if PXSHM_STATS
  pxshmContext->commServerTime += CmiWallTimer() - commStart;
#endif

  MEMDEBUG(CmiMemoryCheck());
}
00482
00483 static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
00484 CommunicationServerPxshm();
00485 }
00486
00487
00488 static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
00489 {
00490 CmiNotifyStillIdle(s);
00491 }
00492
00493
00494 void calculateNodeSizeAndRank(char **argv){
00495 pxshmContext->nodesize=1;
00496 MACHSTATE(3,"calculateNodeSizeAndRank start");
00497
00498 CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
00499 MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);
00500
00501 pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
00502
00503 MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);
00504
00505 pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;
00506
00507 MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
00508
00509 pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;
00510
00511 if(pxshmContext->nodeend >= _Cmi_numnodes){
00512 pxshmContext->nodeend = _Cmi_numnodes-1;
00513 pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
00514 }
00515
00516 MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
00517 }
00518
00519 void allocBufNameStrings(char ***bufName);
00520 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532 void setupSharedBuffers(){
00533 int i=0;
00534
00535 allocBufNameStrings(&(pxshmContext->recvBufNames));
00536
00537 MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
00538 MEMDEBUG(CmiMemoryCheck());
00539
00540 allocBufNameStrings((&pxshmContext->sendBufNames));
00541
00542 MACHSTATE(3,"allocBufNameStrings for sendBufNames done");
00543
00544 for(i=0;i<pxshmContext->nodesize;i++){
00545 if(i != pxshmContext->noderank){
00546 snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
00547 MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
00548 snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
00549 MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
00550 }
00551 }
00552
00553 createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
00554 createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);
00555
00556 for(i=0;i<pxshmContext->nodesize;i++){
00557 if(i != pxshmContext->noderank){
00558
00559 pxshmContext->sendBufs[i].header->count = 0;
00560 pxshmContext->sendBufs[i].header->bytes = 0;
00561 }
00562 }
00563
00564 #if CMK_SMP && CMK_CRAYXE
00565 if (PMI_Barrier() != GNI_RC_SUCCESS) return;
00566 #else
00567 if (CmiBarrier() != 0) return;
00568 #endif
00569 freeSharedBuffers();
00570 pxshm_freed = 1;
00571 }
00572
00573 void allocBufNameStrings(char ***bufName){
00574 int i,count;
00575
00576 int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
00577 char *tmp = malloc(totalAlloc);
00578
00579 MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
00580
00581 *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);
00582
00583 for(i=0,count=0;i<pxshmContext->nodesize;i++){
00584 if(i != pxshmContext->noderank){
00585 (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
00586 count++;
00587 }else{
00588 (*bufName)[i] = NULL;
00589 }
00590 }
00591 }
00592
00593 void createShmObject(char *name,int size,char **pPtr);
00594
00595 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
00596 int i=0;
00597
00598 *bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));
00599
00600 for(i=0;i<pxshmContext->nodesize;i++){
00601 if(i != pxshmContext->noderank){
00602 createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
00603 memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
00604 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00605 #if PXSHM_OSSPINLOCK
00606 (*bufs)[i].header->lock = 0;
00607 #elif PXSHM_LOCK
00608 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
00609 #endif
00610 }else{
00611 (*bufs)[i].header = NULL;
00612 (*bufs)[i].data = NULL;
00613 #if PXSHM_LOCK
00614 (*bufs)[i].mutex = NULL;
00615 #endif
00616 }
00617 }
00618 }
00619
00620
/*
 * Open (creating if necessary) the POSIX shared-memory object `name`, size it
 * to `size` bytes and map it read/write, MAP_SHARED, into *pPtr.
 * shm_open is retried up to 100 times; from the 11th failed attempt onwards
 * each failure is reported on stderr.
 *
 * Fixes over the previous version:
 *  - mmap() reports failure with MAP_FAILED, never NULL, so the old
 *    `*pPtr != NULL` check could not catch a failed mapping;
 *  - a failing ftruncate() was ignored, leaving the object shorter than the
 *    mapping.
 * Both are now reported and asserted.
 */
void createShmObject(char *name,int size,char **pPtr){
  int fd = -1;
  const int flags = O_RDWR | O_CREAT;
  int open_repeat_count = 0;
  void *addr;

  while (fd < 0 && open_repeat_count < 100) {
    open_repeat_count++;
    fd = shm_open(name, flags, S_IRUSR | S_IWUSR);

    if (fd < 0 && open_repeat_count > 10) {
      fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
      fflush(stderr);
    }
  }

  CmiAssert(fd >= 0);

  if (ftruncate(fd, size) < 0) {
    fprintf(stderr,"Error from ftruncate %s while sizing %s \n", strerror(errno), name);
    fflush(stderr);
    CmiAssert(0);
  }

  addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (addr == MAP_FAILED) {
    fprintf(stderr,"Error from mmap %s while mapping %s \n", strerror(errno), name);
    fflush(stderr);
  }
  CmiAssert(addr != MAP_FAILED);
  *pPtr = (char *)addr;

  /* The mapping stays valid after the descriptor is closed. */
  close(fd);
}
00647
00648 void freeSharedBuffers(){
00649 int i;
00650 for(i= 0;i<pxshmContext->nodesize;i++){
00651 if(i != pxshmContext->noderank){
00652 if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
00653 fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
00654 }
00655 #if PXSHM_LOCK
00656 sem_unlink(pxshmContext->recvBufNames[i]);
00657 #endif
00658 }
00659 }
00660 };
00661
00662 void tearDownSharedBuffers(){
00663 int i;
00664 for(i= 0;i<pxshmContext->nodesize;i++){
00665 if(i != pxshmContext->noderank){
00666 if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
00667 fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
00668 }
00669 #if PXSHM_LOCK
00670 sem_close(pxshmContext->recvBufs[i].mutex);
00671 sem_close(pxshmContext->sendBufs[i].mutex);
00672 sem_unlink(pxshmContext->recvBufNames[i]);
00673 pxshmContext->recvBufs[i].mutex = NULL;
00674 pxshmContext->sendBufs[i].mutex = NULL;
00675 #endif
00676 }
00677 }
00678 };
00679
00680
00681 void initSendQ(PxshmSendQ *q,int size,int rank);
00682
00683 void initAllSendQs(){
00684 int i=0;
00685 pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
00686 for(i=0;i<pxshmContext->nodesize;i++){
00687 if(i != pxshmContext->noderank){
00688 (pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
00689 initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE,i);
00690 }else{
00691 (pxshmContext->sendQs)[i] = NULL;
00692 }
00693 }
00694 };
00695
00696
00697
00698
00699
00700
00701
00702
00703 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
00704
00705 if(dstBuf->header->bytes+size <= SHMBUFLEN){
00707 dstBuf->header->count++;
00708 memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
00709 dstBuf->header->bytes += size;
00710
00711 CmiFree(msg);
00712 return 1;
00713 }
00714
00715
00716
00717
00718 pushSendQ(dstSendQ,msg,size,refcount);
00719 (*refcount)++;
00720
00721 return 0;
00722 }
00723
00724 inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);
00725
00726
00727
00728
00729
00730
00731 inline int flushSendQ(PxshmSendQ *dstSendQ){
00732 sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstSendQ->rank]);
00733 int count=dstSendQ->numEntries;
00734 int sent=0;
00735 while(count > 0){
00736 OutgoingMsgRec *ogm = popSendQ(dstSendQ);
00737 (*ogm->refcount)--;
00738 MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,ogm->refcount);
00739 int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
00740 if(ret==1){
00741 sent++;
00742 #if CMK_NET_VERSION
00743 GarbageCollectMsg(ogm);
00744 #endif
00745 }
00746 count--;
00747 }
00748 return sent;
00749 }
00750
00751 inline void emptyRecvBuf(sharedBufData *recvBuf);
00752
/*
 * Deliver pending messages from every peer's receive buffer.
 * A buffer is emptied only when both hold:
 *  - its message count (read before taking the lock) is non-zero;
 *  - its lock can be taken without waiting.
 * Otherwise it is left for a later call.
 */
00753 inline void emptyAllRecvBufs(){
00754 int i;
00755 for(i=0;i<pxshmContext->nodesize;i++){
00756 if(i != pxshmContext->noderank){
00757 sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
00758 if(recvBuf->header->count > 0){
00759
00760 #if PXSHM_STATS
00761 pxshmContext->lockRecvCount++;
00762 #endif
00763
/*
 * Try-lock. Each compiled branch opens an `if` whose body runs when the lock
 * was NOT acquired (nothing is done then).
 * PXSHM_FENCE branch:
 *  - announce interest and give the turn to the sender;
 *  - fence;
 *  - back off whenever the sender has announced interest.
 */
00764 #if PXSHM_OSSPINLOCK
00765 if(! OSSpinLockTry(&recvBuf->header->lock)){
00766 #elif PXSHM_LOCK
00767 if(sem_trywait(recvBuf->mutex) < 0){
00768 #elif PXSHM_FENCE
00769 recvBuf->header->flagReceiver = 1;
00770 recvBuf->header->turn = SENDER;
00771 CmiMemoryReadFence(0,0);
00772 CmiMemoryWriteFence(0,0);
00773
00774 if((recvBuf->header->flagSender)){
00775 recvBuf->header->flagReceiver = 0;
00776 #endif
00777 }else{
/* Lock acquired: deliver everything, then release the lock. */
00778
00779
00780 MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
00781 emptyRecvBuf(recvBuf);
00782
00783 #if PXSHM_OSSPINLOCK
00784 OSSpinLockUnlock(&recvBuf->header->lock);
00785 #elif PXSHM_LOCK
00786 sem_post(recvBuf->mutex);
00787 #elif PXSHM_FENCE
00788 CmiMemoryReadFence(0,0);
00789 CmiMemoryWriteFence(0,0);
00790 recvBuf->header->flagReceiver = 0;
00791 #endif
00792
00793 }
00794
00795 }
00796 }
00797 }
00798 };
00799
/*
 * Try to flush every non-empty per-peer send queue. A queue is flushed only
 * if its destination buffer's lock can be taken without waiting; otherwise
 * it is left for the next call.
 * With SENDQ_LIST, only queues linked from sendQ_head_index are visited, and
 * queues found empty afterwards are unlinked (next = -2).
 */
00800 inline void flushAllSendQs(){
00801 int i;
00802 #if SENDQ_LIST
00803 int index_prev = -1;
00804
00805 i = sendQ_head_index;
00806 while (i!= -1) {
00807 PxshmSendQ *sendQ = pxshmContext->sendQs[i];
00808 CmiAssert(i != pxshmContext->noderank);
00809 if(sendQ->numEntries > 0){
00810 #else
00811 for(i=0;i<pxshmContext->nodesize;i++) {
00812 if (i == pxshmContext->noderank) continue;
00813 PxshmSendQ *sendQ = pxshmContext->sendQs[i];
00814 if(sendQ->numEntries > 0) {
00815 #endif
00816
/*
 * Try-lock. Each compiled branch opens an `if` whose body runs when the lock
 * WAS acquired.
 * PXSHM_FENCE branch: proceed unless the receiver has announced interest AND
 * turn is still RECEIVER (i.e. the receiver has not since yielded by writing
 * turn = SENDER). This is less strict than CmiSendMessagePxshm, which backs
 * off whenever flagReceiver is set.
 */
00817 #if PXSHM_OSSPINLOCK
00818 if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
00819 #elif PXSHM_LOCK
00820 if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
00821 #elif PXSHM_FENCE
00822 pxshmContext->sendBufs[i].header->flagSender = 1;
00823 pxshmContext->sendBufs[i].header->turn = RECEIVER;
00824 CmiMemoryReadFence(0,0);
00825 CmiMemoryWriteFence(0,0);
00826 if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
00827 #endif
00828
00829 MACHSTATE1(3,"flushSendQ %d",i);
00830 flushSendQ(sendQ);
00831
/* Release the lock. */
00832 #if PXSHM_OSSPINLOCK
00833 OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
00834 #elif PXSHM_LOCK
00835 sem_post(pxshmContext->sendBufs[i].mutex);
00836 #elif PXSHM_FENCE
00837 CmiMemoryReadFence(0,0);
00838 CmiMemoryWriteFence(0,0);
00839 pxshmContext->sendBufs[i].header->flagSender = 0;
00840 #endif
00841 }else{
/* Lock not acquired: withdraw the interest announced above (PXSHM_FENCE). */
00842
00843 #if PXSHM_FENCE
00844 pxshmContext->sendBufs[i].header->flagSender = 0;
00845 #endif
00846
00847 }
00848
00849 }
/* SENDQ_LIST: unlink a queue that is now empty, otherwise advance. */
00850 #if SENDQ_LIST
00851 if (sendQ->numEntries == 0) {
00852 if (index_prev != -1)
00853 pxshmContext->sendQs[index_prev]->next = sendQ->next;
00854 else
00855 sendQ_head_index = sendQ->next;
00856 i = sendQ->next;
00857 sendQ->next = -2;
00858 }
00859 else {
00860 index_prev = i;
00861 i = sendQ->next;
00862 }
00863 #endif
00864 }
00865 };
00866
00867 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
00868
00869 void emptyRecvBuf(sharedBufData *recvBuf){
00870 int numMessages = recvBuf->header->count;
00871 int i=0;
00872
00873 char *ptr=recvBuf->data;
00874
00875 for(i=0;i<numMessages;i++){
00876 int size;
00877 int rank, srcpe, seqno, magic, i;
00878 unsigned int broot;
00879 char *msg = ptr;
00880 char *newMsg;
00881
00882 #if CMK_NET_VERSION
00883 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00884 size = CmiMsgHeaderGetLength(msg);
00885 #else
00886 size = CmiGetMsgSize(msg);
00887 #endif
00888
00889 newMsg = (char *)CmiAlloc(size);
00890 memcpy(newMsg,msg,size);
00891
00892 #if CMK_NET_VERSION
00893 handoverPxshmMessage(newMsg,size,rank,broot);
00894 #else
00895 handleOneRecvedMsg(size, newMsg);
00896 #endif
00897
00898 ptr += size;
00899
00900 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
00901 }
00902 #if 1
00903 if(ptr - recvBuf->data != recvBuf->header->bytes){
00904 CmiPrintf("[%d] ptr - recvBuf->data %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
00905 }
00906 #endif
00907 CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00908 recvBuf->header->count=0;
00909 recvBuf->header->bytes=0;
00910 }
00911
00912
00913 #if CMK_NET_VERSION
/*
 * Net builds only: hand a message received over pxshm to the scheduler.
 *  - rank: the destination rank extracted from the datagram header;
 *  - broot: the broadcast root, used when forwarding a broadcast.
 * For DGRAM_BROADCAST (and DGRAM_NODEBROADCAST when node queues exist), the
 * message is first forwarded along the spanning tree or hypercube.
 * Delivery:
 *  - DGRAM_BROADCAST: pushed to PE 0;
 *  - any other rank: pushed to PE `rank`.
 *
 * NOTE(review): the function asserts rank == 0 yet then handles
 * DGRAM_BROADCAST. Unless DGRAM_BROADCAST is 0, the broadcast branches can
 * only run with assertions compiled out -- confirm which behaviour is
 * intended.
 */
00914 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
00915 CmiAssert(rank == 0);
00916 #if CMK_BROADCAST_SPANNING_TREE
00917 if (rank == DGRAM_BROADCAST
00918 #if CMK_NODE_QUEUE_AVAILABLE
00919 || rank == DGRAM_NODEBROADCAST
00920 #endif
00921 ){
00922 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
00923 }
00924 #elif CMK_BROADCAST_HYPERCUBE
00925 if (rank == DGRAM_BROADCAST
00926 #if CMK_NODE_QUEUE_AVAILABLE
00927 || rank == DGRAM_NODEBROADCAST
00928 #endif
00929 ){
00930 SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
00931 }
00932 #endif
00933
00934 switch (rank) {
00935 case DGRAM_BROADCAST: {
00936 CmiPushPE(0, newmsg);
00937 break;
00938 }
00939 default:
00940 {
00941
00942 CmiPushPE(rank, newmsg);
00943 }
00944 }
00945 }
00946 #endif
00947
00948
00949
00950
00951
00952
00953 void initSendQ(PxshmSendQ *q,int size, int rank){
00954 q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
00955
00956 q->size = size;
00957 q->numEntries = 0;
00958
00959 q->begin = 0;
00960 q->end = 0;
00961
00962 q->rank = rank;
00963 #if SENDQ_LIST
00964 q->next = -2;
00965 #endif
00966 }
00967
00968 void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
00969 if(q->numEntries == q->size){
00970
00971 OutgoingMsgRec *oldData = q->data;
00972 int newSize = q->size<<1;
00973 q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
00974
00975 CmiAssert(q->begin == q->end);
00976
00977 CmiAssert(q->begin < q->size);
00978 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));
00979
00980 if(q->end!=0){
00981 memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
00982 }
00983 free(oldData);
00984 q->begin = 0;
00985 q->end = q->size;
00986 q->size = newSize;
00987 }
00988 OutgoingMsgRec *omg = &q->data[q->end];
00989 omg->size = size;
00990 omg->data = msg;
00991 omg->refcount = refcount;
00992 (q->end)++;
00993 if(q->end >= q->size){
00994 q->end -= q->size;
00995 }
00996 q->numEntries++;
00997 }
00998
00999 OutgoingMsgRec * popSendQ(PxshmSendQ *q){
01000 OutgoingMsgRec * ret;
01001 if(0 == q->numEntries){
01002 return NULL;
01003 }
01004
01005 ret = &q->data[q->begin];
01006 (q->begin)++;
01007 if(q->begin >= q->size){
01008 q->begin -= q->size;
01009 }
01010
01011 q->numEntries--;
01012 return ret;
01013 }