00001
00018 #include <sys/types.h>
00019 #include <sys/mman.h>
00020 #include <unistd.h>
00021 #include <sys/stat.h>
00022 #include <fcntl.h>
00023 #include <errno.h>
00024 #include <signal.h>
00025
00026 #include "xpmem.h"
00027
00028
00029
00030
00031 #if XPMEM_OSSPINLOCK
00032 #include <libkern/OSAtomic.h>
00033 #elif XPMEM_LOCK
00034 #include <semaphore.h>
00035 #else
00036
00037 #define XPMEM_FENCE 1
00038 #endif
00039
00040 #define MEMDEBUG(x) //x
00041
00042 #define XPMEM_STATS 0
00043
00044 #define SENDQ_LIST 0
00045
00046
00047 #undef CmiMemoryWriteFence
00048 #if XPMEM_FENCE
00049 #ifdef POWER_PC
00050 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
00051 #else
00052 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
00053
00054 #endif
00055 #else
00056 #undef CmiMemoryWriteFence
00057 #define CmiMemoryWriteFence(startPtr,nBytes)
00058 #endif
00059
00060 #undef CmiMemoryReadFence
00061 #if XPMEM_FENCE
00062 #ifdef POWER_PC
00063 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
00064 #else
00065 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
00066
00067 #endif
00068 #else
00069 #define CmiMemoryReadFence(startPtr,nBytes)
00070 #endif
00071
00072 #if CMK_SMP
00073 #error "PXSHM can only be used in non-smp build of Charm++"
00074 #endif
00075
00076
00077
00078 enum entities {SENDER,RECEIVER};
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089 #define NAMESTRLEN 60
00090 #define PREFIXSTRLEN 50
00091
00092 static int XPMEMBUFLEN = (1024*1024*4);
00093 #define XPMEMMINSIZE (1*1024)
00094 #define XPMEMMAXSIZE (1024*1024)
00095
00096 static int SENDQSTARTSIZE = 256;
00097
00098
/* Header placed at the front of each shared message buffer.  Senders append
 * message payloads after the header and bump count/bytes; the receiving
 * process drains the buffer and resets both to zero (see emptyRecvBuf). */
typedef struct {
    int count;   /* number of messages currently queued in the buffer */
    int bytes;   /* total payload bytes currently in the buffer */

#if XPMEM_OSSPINLOCK
    OSSpinLock lock;   /* Darwin spinlock guarding buffer access */
#endif

#if XPMEM_FENCE
    /* Peterson-style mutual-exclusion flags for the lock-free variant: one
     * sender and one receiver contend per buffer.  Each flag is padded to
     * its own cache line to avoid false sharing. */
    volatile int flagSender;
    char pad1[CMI_CACHE_LINE_SIZE - sizeof(volatile int)];
    volatile int flagReceiver;
    char pad2[CMI_CACHE_LINE_SIZE - sizeof(volatile int)];
    volatile int turn;
#endif

} sharedBufHeader;
00117
00118
/* One mapped shared buffer: its header, the data region that follows the
 * header, and the XPMEM segment id it was created/attached from. */
typedef struct {
#if XPMEM_LOCK
    sem_t *mutex;          /* named POSIX semaphore guarding the buffer */
#endif
    sharedBufHeader *header;   /* mapped header; NULL for our own rank */
    char *data;                /* payload area, header + sizeof(sharedBufHeader) */
    __s64 segid;               /* XPMEM segment id backing this mapping */
} sharedBufData;
00127
/* A queued outbound message: the payload, its size, and a pointer to the
 * caller's reference count so delivery can release the message later. */
typedef struct OutgoingMsgRec
{
    char *data;      /* message payload (owned until delivered) */
    int *refcount;   /* caller's refcount; incremented while queued */
    int size;        /* payload size in bytes */
}
OutgoingMsgRec;
00135
/* Per-destination-rank circular queue of messages waiting for space in the
 * destination's shared buffer.  begin/end index into data[]; the queue is
 * grown (doubled) by pushSendQ when full. */
typedef struct {
    int size;         /* capacity of data[] */
    int begin;        /* index of oldest entry */
    int end;          /* index one past newest entry (wraps) */
    int numEntries;   /* current occupancy */
    int rank;         /* destination rank within this node */
#if SENDQ_LIST
    int next;         /* next rank in active-queue list; -2 = not listed */
#endif
    OutgoingMsgRec *data;

} XpmemSendQ;
00148
/* All per-process state of the XPMEM transport: node topology, the names
 * and mappings of the shared buffers, the send queues, and (optionally)
 * timing/counting statistics. */
typedef struct {
    int nodesize;              /* processes sharing this physical node */
    int noderank;              /* our rank within the node */
    int nodestart,nodeend;     /* inclusive global rank range of this node */
    char prefixStr[PREFIXSTRLEN];  /* unique name prefix for buffers/semaphores */
    char **recvBufNames;       /* per-rank buffer/semaphore names (NULL for self) */
    char **sendBufNames;

    sharedBufData *recvBufs;   /* buffers we created; peers write into them */
    sharedBufData *sendBufs;   /* peers' buffers we attached; we write into them */

    XpmemSendQ **sendQs;       /* overflow queues, one per peer rank */

#if XPMEM_STATS
    int sendCount;
    int validCheckCount;
    int lockRecvCount;
    double validCheckTime;
    double sendTime;
    double commServerTime;
#endif

} XpmemContext;
00172
00173
00174 #if SENDQ_LIST
00175 static int sendQ_head_index = -1;
00176 #endif
00177
00178 XpmemContext *xpmemContext=NULL;
00179
00180
00181 void calculateNodeSizeAndRank(char **);
00182 void setupSharedBuffers(void);
00183 void initAllSendQs(void);
00184
00185 void CmiExitXpmem(void);
00186
/* Fatal-signal handler installed by CmiInitXpmem: tears down the transport
 * so shared resources are released even on a crash.
 * NOTE(review): CmiExitXpmem calls free(), which is not async-signal-safe;
 * tolerated here because the process is terminating anyway. */
static void cleanupOnAllSigs(int signo)
{
    CmiExitXpmem();
}
00191
00192 static int xpmem_fd;
00193
00194
00195
00196
00197
00198 void CmiInitXpmem(char **argv){
00199 char input[32];
00200 char *env;
00201
00202 MACHSTATE(3,"CminitXpmem start");
00203 xpmemContext = (XpmemContext *)calloc(1,sizeof(XpmemContext));
00204
00205 #if CMK_NET_VERSION
00206 if(Cmi_charmrun_pid <= 0){
00207 CmiAbort("pxshm must be run with charmrun");
00208 }
00209 #endif
00210 calculateNodeSizeAndRank(argv);
00211
00212 MACHSTATE1(3,"CminitXpmem %d calculateNodeSizeAndRank",xpmemContext->nodesize);
00213
00214 if(xpmemContext->nodesize == 1) return;
00215
00216 env = getenv("CHARM_XPMEM_SIZE");
00217 if (env) {
00218 XPMEMBUFLEN = CmiReadSize(env);
00219 }
00220 SENDQSTARTSIZE = 32 * xpmemContext->nodesize;
00221
00222 if (_Cmi_mynode == 0)
00223 CmiPrintf("Charm++> xpmem enabled: %d cores per node, buffer size: %.1fMB\n", xpmemContext->nodesize, XPMEMBUFLEN/1024.0/1024.0);
00224
00225 xpmem_fd = open("/dev/xpmem", O_RDWR);
00226 if (xpmem_fd == -1) {
00227 CmiAbort("Opening /dev/xpmem");
00228 }
00229
00230 #if CMK_CRAYXE || CMK_CRAYXC
00231 srand(getpid());
00232 int Cmi_charmrun_pid = rand();
00233 PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
00234 #elif !CMK_NET_VERSION
00235 #error "need a unique number"
00236 #endif
00237 snprintf(&(xpmemContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_xpmem_%d",Cmi_charmrun_pid);
00238
00239 MACHSTATE2(3,"CminitXpmem %s %d pre setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
00240
00241 setupSharedBuffers();
00242
00243 MACHSTATE2(3,"CminitXpmem %s %d setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
00244
00245 initAllSendQs();
00246
00247 MACHSTATE2(3,"CminitXpmem %s %d initAllSendQs",xpmemContext->prefixStr,xpmemContext->nodesize);
00248
00249 MACHSTATE2(3,"CminitXpmem %s %d done",xpmemContext->prefixStr,xpmemContext->nodesize);
00250
00251 struct sigaction sa;
00252 sa.sa_handler = cleanupOnAllSigs;
00253 sigemptyset(&sa.sa_mask);
00254 sa.sa_flags = SA_RESTART;
00255
00256 sigaction(SIGSEGV, &sa, NULL);
00257 sigaction(SIGFPE, &sa, NULL);
00258 sigaction(SIGILL, &sa, NULL);
00259 sigaction(SIGTERM, &sa, NULL);
00260 sigaction(SIGABRT, &sa, NULL);
00261 sigaction(SIGQUIT, &sa, NULL);
00262 sigaction(SIGBUS, &sa, NULL);
00263 };
00264
00265
00266
00267
00268
00269 static int pxshm_freed = 0;
00270 void tearDownSharedBuffers(void);
00271 void freeSharedBuffers(void);
00272
/* Release all XPMEM transport state.  Safe to call more than once and from
 * the signal handler: it returns immediately once xpmemContext is NULL. */
void CmiExitXpmem(void){
    int i=0;

    if (xpmemContext == NULL) return;

    if(xpmemContext->nodesize != 1) {

        /* Find the first rank != ours.  allocBufNameStrings carved all name
         * strings out of ONE contiguous allocation, and the first non-self
         * entry points at its base — so freeing that entry releases every
         * name string at once. */
        for(i=0;i<xpmemContext->nodesize;i++){
            if(i != xpmemContext->noderank){
                break;
            }
        }
        free(xpmemContext->recvBufNames[i]);
        free(xpmemContext->sendBufNames[i]);

        free(xpmemContext->recvBufNames);
        free(xpmemContext->sendBufNames);

        free(xpmemContext->recvBufs);
        free(xpmemContext->sendBufs);

    }
#if XPMEM_STATS
    CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,xpmemContext->sendCount,xpmemContext->sendTime,xpmemContext->validCheckCount,xpmemContext->validCheckTime,xpmemContext->commServerTime,xpmemContext->lockRecvCount);
#endif
    free(xpmemContext);
    xpmemContext = NULL;   /* makes subsequent calls no-ops */
}
00302
00303
00304
00305
00306
00307
00308 inline
00309 static int CmiValidXpmem(int node, int size){
00310 #if XPMEM_STATS
00311 xpmemContext->validCheckCount++;
00312 #endif
00313
00314
00315 return (node >= xpmemContext->nodestart && node <= xpmemContext->nodeend && size <= XPMEMMAXSIZE )? 1: 0;
00316 };
00317
00318
00319 inline int XpmemRank(int dstnode){
00320 return dstnode - xpmemContext->nodestart;
00321 }
00322
00323 inline void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount);
00324 inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ);
00325 inline int flushSendQ(XpmemSendQ *sendQ);
00326
/* Retry a queued message: unpack the record and hand it to sendMessage.
 * Returns sendMessage's result (1 = delivered, 0 = re-queued). */
inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
    return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
}
00330
00331
00332
00333
00334
00335
00336
00337
00338
/* Send a message to another process on this physical node through its
 * shared buffer.  Tries to acquire the destination buffer (spinlock,
 * semaphore, or fence protocol depending on build flags); on failure the
 * message is queued for a later flush, on success it is copied (or queued
 * behind earlier pending messages to preserve ordering).
 *
 * NOTE: the if/else braces below are intentionally split across the
 * preprocessor branches — exactly one acquire variant compiles in. */
void CmiSendMessageXpmem(char *msg, int size, int dstnode, int *refcount)
{
#if XPMEM_STATS
    double _startSendTime = CmiWallTimer();
#endif

    LrtsPrepareEnvelope(msg, size);

    int dstRank = XpmemRank(dstnode);
    MEMDEBUG(CmiMemoryCheck());

    MACHSTATE4(3,"Send Msg Xpmem ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
    MACHSTATE4(3,"Send Msg Xpmem ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);

    CmiAssert(dstRank >=0 && dstRank != xpmemContext->noderank);

    sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstRank]);
    XpmemSendQ *sendQ = xpmemContext->sendQs[dstRank];

#if XPMEM_OSSPINLOCK
    if(! OSSpinLockTry(&dstBuf->header->lock)){
#elif XPMEM_LOCK
    if(sem_trywait(dstBuf->mutex) < 0){
#elif XPMEM_FENCE
    /* fence protocol: announce intent, yield the turn, then back off if the
     * receiver is already active on this buffer */
    dstBuf->header->flagSender = 1;
    dstBuf->header->turn = RECEIVER;
    CmiMemoryReadFence(0,0);
    CmiMemoryWriteFence(0,0);

    if(dstBuf->header->flagReceiver){
        dstBuf->header->flagSender = 0;
#endif

        /* buffer busy: queue the message and keep a reference alive */
#if SENDQ_LIST
        if (sendQ->numEntries == 0 && sendQ->next == -2) {
            sendQ->next = sendQ_head_index;
            sendQ_head_index = dstRank;
        }
#endif
        pushSendQ(sendQ, msg, size, refcount);
        (*refcount)++;
        MEMDEBUG(CmiMemoryCheck());
        return;
    }else{

        /* buffer acquired */

        if(sendQ->numEntries == 0){
            /* nothing pending: try the direct copy */
            int ret = sendMessage(msg,size,refcount,dstBuf,sendQ);
#if SENDQ_LIST
            if (sendQ->numEntries > 0 && sendQ->next == -2)
            {
                sendQ->next = sendQ_head_index;
                sendQ_head_index = dstRank;
            }
#endif
            MACHSTATE(3,"Xpmem Send succeeded immediately");
        }else{
            /* older messages are pending: queue behind them (extra ref keeps
             * the message alive across the flush) and drain in order */
            (*refcount)+=2;
            pushSendQ(sendQ,msg,size,refcount);
            MACHSTATE3(3,"Xpmem ogm %p pushed to sendQ length %d refcount %d",ogm,sendQ->numEntries,ogm->refcount);
            int sent = flushSendQ(sendQ);
            (*refcount)--;
            MACHSTATE1(3,"Xpmem flushSendQ sent %d messages",sent);
        }

        /* release the buffer with the matching variant */
#if XPMEM_OSSPINLOCK
        OSSpinLockUnlock(&dstBuf->header->lock);
#elif XPMEM_LOCK
        sem_post(dstBuf->mutex);
#elif XPMEM_FENCE
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);
        dstBuf->header->flagSender = 0;
#endif
    }
#if XPMEM_STATS
    xpmemContext->sendCount ++;
    xpmemContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
    MEMDEBUG(CmiMemoryCheck());

};
00426
00427 inline void emptyAllRecvBufs(void);
00428 inline void flushAllSendQs(void);
00429
00430
00431
00432
00433
/* Transport progress engine, called from the scheduler/idle loop: drain all
 * incoming shared buffers, then retry every send queue with pending
 * messages. */
inline void CommunicationServerXpmem(void)
{
#if XPMEM_STATS
    double _startCommServerTime =CmiWallTimer();
#endif
    MEMDEBUG(CmiMemoryCheck());

    emptyAllRecvBufs();
    flushAllSendQs();

#if XPMEM_STATS
    xpmemContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif
    MEMDEBUG(CmiMemoryCheck());
};
00449
/* Idle-loop callback: keep making transport progress while the PE is idle. */
static void CmiNotifyStillIdleXpmem(CmiIdleState *s){
    CommunicationServerXpmem();
}
00453
00454
/* Begin-idle callback.  Delegates to the machine layer's generic
 * CmiNotifyStillIdle rather than CmiNotifyStillIdleXpmem above —
 * presumably intentional so the full idle path runs; TODO(review): confirm
 * against the enclosing machine layer. */
static void CmiNotifyBeginIdleXpmem(CmiIdleState *s)
{
    CmiNotifyStillIdle(s);
}
00459
/* Derive this process's node topology from the +nodesize command-line flag:
 * fills xpmemContext's nodesize, noderank, nodestart and nodeend, clamping
 * the last node's size when the total process count is not a multiple of
 * nodesize. */
void calculateNodeSizeAndRank(char **argv)
{
    xpmemContext->nodesize=1;   /* default: one process per node */
    MACHSTATE(3,"calculateNodeSizeAndRank start");

    CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node");
    MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",xpmemContext->nodesize);

    xpmemContext->noderank = _Cmi_mynode % (xpmemContext->nodesize);

    MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",xpmemContext->noderank);

    xpmemContext->nodestart = _Cmi_mynode -xpmemContext->noderank;

    MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");

    xpmemContext->nodeend = xpmemContext->nodestart + xpmemContext->nodesize -1;

    /* last node may be smaller than +nodesize */
    if(xpmemContext->nodeend >= _Cmi_numnodes){
        xpmemContext->nodeend = _Cmi_numnodes-1;
        xpmemContext->nodesize = (xpmemContext->nodeend - xpmemContext->nodestart) +1;
    }

    MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",xpmemContext->nodestart,xpmemContext->nodesize,xpmemContext->noderank);
}
00485
00486 void allocBufNameStrings(char ***bufName);
00487 void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames);
00488 void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames);
00489 void removeXpmemFiles(void);
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
/* Build the per-peer buffer/semaphore names, create our receive buffers,
 * attach the peers' buffers as our send buffers (barriers keep creation and
 * attachment in lockstep across the node), then clean up the temporary
 * segid files and unlink the named semaphores. */
void setupSharedBuffers(void){
    int i=0;

    allocBufNameStrings(&(xpmemContext->recvBufNames));

    allocBufNameStrings((&xpmemContext->sendBufNames));

    /* name format: <prefix>_<writerNode>_<readerNode>, so our recv name for
     * rank i equals rank i's send name for us */
    for(i=0;i<xpmemContext->nodesize;i++){
        if(i != xpmemContext->noderank){
            snprintf(xpmemContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,xpmemContext->noderank+xpmemContext->nodestart,i+xpmemContext->nodestart);
            MACHSTATE2(3,"recvBufName %s with rank %d",xpmemContext->recvBufNames[i],i)
            snprintf(xpmemContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,i+xpmemContext->nodestart,xpmemContext->noderank+xpmemContext->nodestart);
            MACHSTATE2(3,"sendBufName %s with rank %d",xpmemContext->sendBufNames[i],i);
        }
    }

    createRecvXpmemAndSems(&(xpmemContext->recvBufs),xpmemContext->recvBufNames);
    CmiBarrier();   /* every rank must publish its segids before anyone attaches */
    createSendXpmemAndSems(&(xpmemContext->sendBufs),xpmemContext->sendBufNames);
    CmiBarrier();   /* every rank must attach before the segid files go away */
    removeXpmemFiles();
    freeSharedBuffers();

    for(i=0;i<xpmemContext->nodesize;i++){
        if(i != xpmemContext->noderank){
            /* start each outgoing buffer empty */
            xpmemContext->sendBufs[i].header->count = 0;
            xpmemContext->sendBufs[i].header->bytes = 0;
        }
    }
}
00533
00534 void allocBufNameStrings(char ***bufName)
00535 {
00536 int i,count;
00537 int totalAlloc = sizeof(char)*NAMESTRLEN*(xpmemContext->nodesize-1);
00538 char *tmp = malloc(totalAlloc);
00539
00540 MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
00541
00542 *bufName = (char **)malloc(sizeof(char *)*xpmemContext->nodesize);
00543 for(i=0,count=0;i<xpmemContext->nodesize;i++){
00544 if(i != xpmemContext->noderank){
00545 (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
00546 count++;
00547 }else{
00548 (*bufName)[i] = NULL;
00549 }
00550 }
00551 }
00552
00553 __s64 createXpmemObject(int size,char **pPtr)
00554 {
00555 struct xpmem_cmd_make make_info;
00556 int ret;
00557
00558 *pPtr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0);
00559 if (*pPtr == MAP_FAILED) {
00560 perror("Creating mapping.");
00561 return -1;
00562 }
00563 make_info.vaddr = (__u64) *pPtr;
00564 make_info.size = size;
00565 make_info.permit_type = XPMEM_PERMIT_MODE;
00566 make_info.permit_value = (__u64) 0600;
00567 ret = ioctl(xpmem_fd, XPMEM_CMD_MAKE, &make_info);
00568 if (ret != 0) {
00569 perror("xpmem_make");
00570 CmiAbort("xpmem_make");
00571 }
00572 return make_info.segid;
00573 }
00574
/* Attach a peer's exported XPMEM segment into our address space:
 * XPMEM_CMD_GET acquires an access permit (apid) for the segid, then
 * XPMEM_CMD_ATTACH maps `size` bytes of it; *pPtr receives the local
 * address.  Aborts on either ioctl failing. */
void attachXpmemObject(__s64 segid, int size, char **pPtr)
{
    int ret;
    __s64 apid;
    struct xpmem_cmd_get get_info;
    struct xpmem_cmd_attach attach_info;

    get_info.segid = segid;
    get_info.flags = XPMEM_RDWR;
    get_info.permit_type = XPMEM_PERMIT_MODE;
    get_info.permit_value = (__u64) NULL;
    ret = ioctl(xpmem_fd, XPMEM_CMD_GET, &get_info);
    if (ret != 0) {
        CmiAbort("xpmem_get");
    }
    apid = get_info.apid;

    attach_info.apid = get_info.apid;
    attach_info.offset = 0;               /* map from the start of the segment */
    attach_info.size = size;
    attach_info.vaddr = (__u64) NULL;     /* let the kernel choose the address */
    attach_info.fd = xpmem_fd;
    attach_info.flags = 0;

    ret = ioctl(xpmem_fd, XPMEM_CMD_ATTACH, &attach_info);
    if (ret != 0) {
        CmiAbort("xpmem_attach");
    }

    *pPtr = (void *)attach_info.vaddr;
}
00606
00607 void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames){
00608 int i=0;
00609 __s64 *segid_arr;
00610 int size, pagesize = getpagesize();
00611
00612 *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
00613 segid_arr = malloc(sizeof(__s64)*xpmemContext->nodesize);
00614
00615 size = XPMEMBUFLEN+sizeof(sharedBufHeader);
00616 size = ((~(pagesize-1))&(size+pagesize-1));
00617
00618 for(i=0;i<xpmemContext->nodesize;i++){
00619 if(i != xpmemContext->noderank) {
00620 (*bufs)[i].segid = segid_arr[i] = createXpmemObject(size,(char **)&((*bufs)[i].header));
00621 memset(((*bufs)[i].header), 0, size);
00622 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00623 #if XPMEM_OSSPINLOCK
00624 (*bufs)[i].header->lock = 0;
00625 #elif XPMEM_LOCK
00626 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
00627 #endif
00628 }else{
00629 (*bufs)[i].header = NULL;
00630 (*bufs)[i].data = NULL;
00631 #if XPMEM_LOCK
00632 (*bufs)[i].mutex = NULL;
00633 #endif
00634 }
00635 }
00636
00637 int fd;
00638 char fname[128];
00639 sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
00640 fd = open(fname, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR);
00641 if (fd == -1) {
00642 CmiAbort("createShmObjectsAndSems failed");
00643 }
00644 write(fd, segid_arr, sizeof(__s64*)*xpmemContext->nodesize);
00645 close(fd);
00646 free(segid_arr);
00647 }
00648
00649 void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames)
00650 {
00651 int i;
00652 int size, pagesize;
00653
00654 pagesize = getpagesize();
00655 size = XPMEMBUFLEN+sizeof(sharedBufHeader);
00656 size = ((~(pagesize-1))&(size+pagesize-1));
00657
00658 *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
00659
00660 for(i=0;i<xpmemContext->nodesize;i++){
00661 if(i != xpmemContext->noderank) {
00662 __s64 segid;
00663 char fname[128];
00664 int fd;
00665 sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+i);
00666 fd = open(fname, O_RDONLY);
00667 if (fd == -1) {
00668 CmiAbort("createShmObjectsAndSems failed");
00669 }
00670 lseek(fd, xpmemContext->noderank*sizeof(__s64), SEEK_SET);
00671 read(fd, &segid, sizeof(__s64*));
00672 close(fd);
00673 (*bufs)[i].segid = segid;
00674 attachXpmemObject(segid, size,(char **)&((*bufs)[i].header));
00675 memset(((*bufs)[i].header), 0, XPMEMBUFLEN+sizeof(sharedBufHeader));
00676 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00677 #if XPMEM_OSSPINLOCK
00678 (*bufs)[i].header->lock = 0;
00679 #elif XPMEM_LOCK
00680 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
00681 #endif
00682 }else{
00683 (*bufs)[i].header = NULL;
00684 (*bufs)[i].data = NULL;
00685 #if XPMEM_LOCK
00686 (*bufs)[i].mutex = NULL;
00687 #endif
00688 }
00689 }
00690 }
00691
00692 void removeXpmemFiles(void)
00693 {
00694 char fname[64];
00695 sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
00696 unlink(fname);
00697 }
00698
00699 void freeSharedBuffers(void){
00700 int i;
00701 for(i= 0;i<xpmemContext->nodesize;i++){
00702 if(i != xpmemContext->noderank){
00703 #if XPMEM_LOCK
00704 sem_unlink(xpmemContext->sendBufNames[i]);
00705 sem_unlink(xpmemContext->recvBufNames[i]);
00706 #endif
00707 }
00708 }
00709 }
00710
00711 void tearDownSharedBuffers(void){
00712 int i;
00713 for(i= 0;i<xpmemContext->nodesize;i++){
00714 if(i != xpmemContext->noderank){
00715 #if XPMEM_LOCK
00716 sem_close(xpmemContext->recvBufs[i].mutex);
00717 sem_close(xpmemContext->sendBufs[i].mutex);
00718 sem_unlink(xpmemContext->sendBufNames[i]);
00719 sem_unlink(xpmemContext->recvBufNames[i]);
00720 xpmemContext->recvBufs[i].mutex = NULL;
00721 xpmemContext->sendBufs[i].mutex = NULL;
00722 #endif
00723 }
00724 }
00725 };
00726
00727 void initSendQ(XpmemSendQ *q,int size,int rank);
00728
00729 void initAllSendQs(void){
00730 int i=0;
00731 xpmemContext->sendQs = (XpmemSendQ **) malloc(sizeof(XpmemSendQ *)*xpmemContext->nodesize);
00732 for(i=0;i<xpmemContext->nodesize;i++){
00733 if(i != xpmemContext->noderank){
00734 xpmemContext->sendQs[i] = (XpmemSendQ *)calloc(1, sizeof(XpmemSendQ));
00735 initSendQ((xpmemContext->sendQs)[i],SENDQSTARTSIZE,i);
00736 }else{
00737 xpmemContext->sendQs[i] = NULL;
00738 }
00739 }
00740 };
00741
00742
00743
00744
00745
00746
00747
00748
/* Copy one message into the destination's shared buffer if it fits.
 * Caller must already hold the buffer (lock/semaphore/fence).
 * Returns 1 and frees the message on success; returns 0 after re-queueing
 * it (with an extra reference) when the buffer is full. */
int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){

    if(dstBuf->header->bytes+size <= XPMEMBUFLEN){
        /* append after the bytes already queued in the buffer */
        dstBuf->header->count++;
        CmiMemcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
        dstBuf->header->bytes += size;
        MACHSTATE4(3,"Xpmem send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
        CmiFree(msg);
        return 1;
    }

    /* no room: park the message on the send queue for a later flush */
    pushSendQ(dstSendQ,msg,size,refcount);
    (*refcount)++;
    MACHSTATE3(3,"Xpmem send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
    return 0;
}
00769
00770 inline OutgoingMsgRec* popSendQ(XpmemSendQ *q);
00771
00772
00773
00774
00775
00776
/* Drain as much of a send queue as fits in the destination buffer.  Caller
 * must hold the buffer.  Each pop drops the reference pushSendQ added;
 * sendMessageRec re-queues (re-referencing) any message that still does not
 * fit.  Returns the number of messages actually delivered. */
inline int flushSendQ(XpmemSendQ *dstSendQ){
    sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstSendQ->rank]);
    int count=dstSendQ->numEntries;
    int sent=0;
    while(count > 0){
        OutgoingMsgRec *ogm = popSendQ(dstSendQ);
        (*ogm->refcount)--;   /* release the queue's reference */
        MACHSTATE4(3,"Xpmem trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,ogm->refcount);
        int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
        if(ret==1){
            sent++;
#if CMK_NET_VERSION
            GarbageCollectMsg(ogm);
#endif
        }
        count--;   /* bounded by the original occupancy: re-queued messages are not retried this pass */
    }
    return sent;
}
00796
00797 inline void emptyRecvBuf(sharedBufData *recvBuf);
00798
/* Poll every peer's receive buffer; for each non-empty one, try to acquire
 * it (spinlock / semaphore / fence protocol per build flags), drain it with
 * emptyRecvBuf, and release it.
 *
 * NOTE: as in CmiSendMessageXpmem, the if/else braces are split across the
 * preprocessor branches — exactly one acquire variant compiles in. */
inline void emptyAllRecvBufs(void){
    int i;
    for(i=0;i<xpmemContext->nodesize;i++){
        if(i != xpmemContext->noderank){
            sharedBufData *recvBuf = &(xpmemContext->recvBufs[i]);
            if(recvBuf->header->count > 0){

#if XPMEM_STATS
                xpmemContext->lockRecvCount++;
#endif

#if XPMEM_OSSPINLOCK
                if(! OSSpinLockTry(&recvBuf->header->lock)){
#elif XPMEM_LOCK
                if(sem_trywait(recvBuf->mutex) < 0){
#elif XPMEM_FENCE
                /* announce intent and yield the turn; back off if the
                 * sender is active on this buffer */
                recvBuf->header->flagReceiver = 1;
                recvBuf->header->turn = SENDER;
                CmiMemoryReadFence(0,0);
                CmiMemoryWriteFence(0,0);

                if((recvBuf->header->flagSender)){
                    recvBuf->header->flagReceiver = 0;
#endif
                }else{

                    /* buffer acquired: drain it, then release */
                    MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
                    emptyRecvBuf(recvBuf);

#if XPMEM_OSSPINLOCK
                    OSSpinLockUnlock(&recvBuf->header->lock);
#elif XPMEM_LOCK
                    sem_post(recvBuf->mutex);
#elif XPMEM_FENCE
                    CmiMemoryReadFence(0,0);
                    CmiMemoryWriteFence(0,0);
                    recvBuf->header->flagReceiver = 0;
#endif

                }

            }
        }
    }
};
00845
/* Retry every send queue with pending messages.  With SENDQ_LIST the walk
 * follows the intrusive list of active queues (unlinking queues that drain
 * completely); otherwise it scans all peer ranks.  Each destination buffer
 * is acquired with the configured variant before flushing.
 *
 * NOTE: the loop/if braces are split across the preprocessor branches —
 * exactly one variant compiles in. */
inline void flushAllSendQs(void){
    int i;
#if SENDQ_LIST
    int index_prev = -1;

    i = sendQ_head_index;
    while (i!= -1) {
        XpmemSendQ *sendQ = xpmemContext->sendQs[i];
        CmiAssert(i != xpmemContext->noderank);
        if(sendQ->numEntries > 0){
#else
    for(i=0;i<xpmemContext->nodesize;i++) {
        if (i == xpmemContext->noderank) continue;
        XpmemSendQ *sendQ = xpmemContext->sendQs[i];
        if(sendQ->numEntries > 0) {
#endif

#if XPMEM_OSSPINLOCK
            if(OSSpinLockTry(&xpmemContext->sendBufs[i].header->lock)){
#elif XPMEM_LOCK
            if(sem_trywait(xpmemContext->sendBufs[i].mutex) >= 0){
#elif XPMEM_FENCE
            /* fence protocol: announce intent, yield the turn, proceed only
             * if the receiver is not actively draining this buffer */
            xpmemContext->sendBufs[i].header->flagSender = 1;
            xpmemContext->sendBufs[i].header->turn = RECEIVER;
            CmiMemoryReadFence(0,0);
            CmiMemoryWriteFence(0,0);
            if(!(xpmemContext->sendBufs[i].header->flagReceiver && xpmemContext->sendBufs[i].header->turn == RECEIVER)){
#endif

                MACHSTATE1(3,"flushSendQ %d",i);
                flushSendQ(sendQ);

#if XPMEM_OSSPINLOCK
                OSSpinLockUnlock(&xpmemContext->sendBufs[i].header->lock);
#elif XPMEM_LOCK
                sem_post(xpmemContext->sendBufs[i].mutex);
#elif XPMEM_FENCE
                CmiMemoryReadFence(0,0);
                CmiMemoryWriteFence(0,0);
                xpmemContext->sendBufs[i].header->flagSender = 0;
#endif
            }else{

                /* acquisition failed: retract our intent (fence build only) */
#if XPMEM_FENCE
                xpmemContext->sendBufs[i].header->flagSender = 0;
#endif

            }
        }
#if SENDQ_LIST
        if (sendQ->numEntries == 0) {
            /* fully drained: unlink from the active list */
            if (index_prev != -1)
                xpmemContext->sendQs[index_prev]->next = sendQ->next;
            else
                sendQ_head_index = sendQ->next;
            i = sendQ->next;
            sendQ->next = -2;
        }
        else {
            index_prev = i;
            i = sendQ->next;
        }
#endif
    }
};
00911
00912 void static inline handoverXpmemMessage(char *newmsg,int total_size,int rank,int broot);
00913
00914 void emptyRecvBuf(sharedBufData *recvBuf){
00915 int numMessages = recvBuf->header->count;
00916 int i;
00917
00918 char *ptr=recvBuf->data;
00919
00920 for(i=0;i<numMessages;i++){
00921 int size;
00922 int rank, srcpe, seqno, magic, i;
00923 unsigned int broot;
00924 char *msg = ptr;
00925 char *newMsg;
00926
00927 #if CMK_NET_VERSION
00928 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00929 size = CMI_MSG_SIZE(msg);
00930 #else
00931 size = CmiGetMsgSize(msg);
00932 #endif
00933
00934 newMsg = (char *)CmiAlloc(size);
00935 memcpy(newMsg,msg,size);
00936
00937 #if CMK_NET_VERSION
00938 handoverPxshmMessage(newMsg,size,rank,broot);
00939 #else
00940 handleOneRecvedMsg(size, newMsg);
00941 #endif
00942
00943 ptr += size;
00944
00945 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
00946 }
00947 #if 1
00948 if(ptr - recvBuf->data != recvBuf->header->bytes){
00949 CmiPrintf("[%d] ptr - recvBuf->data %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
00950 }
00951 #endif
00952 CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00953 recvBuf->header->count=0;
00954 recvBuf->header->bytes=0;
00955 }
00956
00957
00958 #if CMK_NET_VERSION
/* (CMK_NET_VERSION only) Deliver a received message to the local PE, first
 * forwarding broadcast messages down the spanning tree / hypercube.
 * NOTE(review): the assert pins rank to 0 while the code below also handles
 * DGRAM_BROADCAST ranks — looks contradictory; confirm which builds enable
 * the assert. */
void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
    CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
    if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
        || rank == DGRAM_NODEBROADCAST
#endif
        ){
        /* propagate to our children before local delivery */
        SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
    }
#elif CMK_BROADCAST_HYPERCUBE
    if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
        || rank == DGRAM_NODEBROADCAST
#endif
        ){
        SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
    }
#endif

    switch (rank) {
    case DGRAM_BROADCAST: {
        /* broadcasts are delivered to PE 0 of this process */
        CmiPushPE(0, newmsg);
        break;
    }
    default:
        {

            CmiPushPE(rank, newmsg);
        }
    }
}
00991 #endif
00992
00993
00994
00995
00996
00997
00998 void initSendQ(XpmemSendQ *q,int size,int rank){
00999 q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
01000
01001 q->size = size;
01002 q->numEntries = 0;
01003
01004 q->begin = 0;
01005 q->end = 0;
01006
01007 q->rank = rank;
01008 #if SENDQ_LIST
01009 q->next = -2;
01010 #endif
01011 }
01012
/* Append a message record to the circular send queue, doubling its capacity
 * first when full.  The caller manages the refcount increment. */
void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount){
    if(q->numEntries == q->size){
        /* grow: copy the wrapped contents into a fresh buffer, oldest first */
        OutgoingMsgRec *oldData = q->data;
        int newSize = q->size<<1;
        q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));

        /* a full circular queue has begin == end */
        CmiAssert(q->begin == q->end);

        CmiAssert(q->begin < q->size);
        /* segment from begin to the end of the old array... */
        memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));

        /* ...then the wrapped segment from the front of the old array */
        if(q->end!=0){
            memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
        }
        free(oldData);
        q->begin = 0;
        q->end = q->size;   /* old capacity = current occupancy */
        q->size = newSize;
    }
    OutgoingMsgRec *omg = &q->data[q->end];
    omg->size = size;
    omg->data = msg;
    omg->refcount = refcount;
    (q->end)++;
    if(q->end >= q->size){
        q->end -= q->size;   /* wrap */
    }
    q->numEntries++;
}
01043
01044 OutgoingMsgRec * popSendQ(XpmemSendQ *q){
01045 OutgoingMsgRec * ret;
01046 if(0 == q->numEntries){
01047 return NULL;
01048 }
01049
01050 ret = &q->data[q->begin];
01051 (q->begin)++;
01052 if(q->begin >= q->size){
01053 q->begin -= q->size;
01054 }
01055
01056 q->numEntries--;
01057 return ret;
01058 }