00001
00018 #include <sys/types.h>
00019 #include <sys/mman.h>
00020 #include <unistd.h>
00021 #include <sys/stat.h>
00022 #include <fcntl.h>
00023 #include <errno.h>
00024 #include <signal.h>
00025
00026 #include "xpmem.h"
00027
00028
00029
00030
/* Select how access to a shared buffer is serialised: an OSSpinLock when
 * XPMEM_OSSPINLOCK is set, a POSIX semaphore when XPMEM_LOCK is set, and
 * otherwise the flag-and-fence scheme enabled by defining XPMEM_FENCE. */
00031 #if XPMEM_OSSPINLOCK
00032 #include <libkern/OSAtomic.h>
00033 #elif XPMEM_LOCK
00034 #include <semaphore.h>
00035 #else
00036
00037 #define XPMEM_FENCE 1
00038 #endif
00039
/* MEMDEBUG(x): as written the replacement list is empty (the trailing text is
 * a comment), so the wrapped expression is compiled out. */
00040 #define MEMDEBUG(x) //x
00041
/* Set to 1 to compile in the counters/timers of XpmemContext and the summary
 * printed by CmiExitXpmem(). */
00042 #define XPMEM_STATS 0
00043
/* Set to 1 to chain the non-empty send queues through XpmemSendQ::next so that
 * flushAllSendQs() walks that chain instead of scanning every rank. */
00044 #define SENDQ_LIST 0
00045
00046
/*
 * Write/read fences used by the flag-based (XPMEM_FENCE) buffer protocol.
 * Any earlier definitions are discarded first.  With XPMEM_FENCE enabled the
 * macros expand to an inline-assembly barrier ("eieio" when POWER_PC is
 * defined, "sfence"/"lfence" otherwise); without it they expand to nothing.
 * The two arguments are ignored in every variant.
 */
#undef CmiMemoryWriteFence
#undef CmiMemoryReadFence

#if XPMEM_FENCE
#  ifdef POWER_PC
#    define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#    define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#  else
#    define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
#    define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
#  endif
#else
#  define CmiMemoryWriteFence(startPtr,nBytes)
#  define CmiMemoryReadFence(startPtr,nBytes)
#endif
00071
/* Refuse to build this layer in an SMP build. */
#if CMK_SMP
#error "XPMEM can only be used in non-smp build of Charm++"
#endif
00075
00076
00077
/* The two parties sharing a buffer; these values are stored in the header's
 * `turn` field by the flag-based protocol. */
enum entities {
    SENDER = 0,
    RECEIVER = 1
};
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
/* Bytes reserved for each buffer-name string (NAMESTRLEN) and for the prefix
 * those names start with (PREFIXSTRLEN). */
00089 #define NAMESTRLEN 60
00090 #define PREFIXSTRLEN 50
00091
/* Payload bytes in each shared buffer: 4 MB unless CmiInitXpmem() replaces it
 * with the value of the CHARM_XPMEM_SIZE environment variable. */
00092 static int XPMEMBUFLEN = (1024*1024*4);
/* XPMEMMAXSIZE is the largest message size CmiValidXpmem() accepts.
 * XPMEMMINSIZE is not referenced anywhere else in this file. */
00093 #define XPMEMMINSIZE (1*1024)
00094 #define XPMEMMAXSIZE (1024*1024)
00095
/* Initial slot count of every send queue; CmiInitXpmem() overwrites it with
 * 32 * nodesize before the queues are created. */
00096 static int SENDQSTARTSIZE = 256;
00097
00098
/* Control block placed at the start of every shared buffer; the message
 * payload area follows it directly. */
00100 typedef struct {
/* Number of messages currently stored in the buffer (incremented by
 * sendMessage(), reset to 0 by emptyRecvBuf()). */
00101 int count;
/* Number of payload bytes currently used (advanced by sendMessage(), reset to
 * 0 by emptyRecvBuf()). */
00102 int bytes;
00103
00104 #if XPMEM_OSSPINLOCK
/* Spin lock guarding the buffer in the OSSpinLock variant. */
00105 OSSpinLock lock;
00106 #endif
00107
00108 #if XPMEM_FENCE
/* Flag-based variant: each side raises its own flag before touching the
 * buffer and writes the other side's identity into `turn`. */
00109 volatile int flagSender;
/* presumably padding that keeps the flags apart in memory -- the type is
 * declared outside this file; TODO confirm */
00110 CmiMemorySMPSeparation_t pad1;
00111 volatile int flagReceiver;
00112 CmiMemorySMPSeparation_t pad2;
00113 volatile int turn;
00114 #endif
00115
00116 } sharedBufHeader;
00117
00118
/* Per-peer, process-local handle to one shared buffer. */
00119 typedef struct {
00120 #if XPMEM_LOCK
/* Named semaphore guarding the buffer (semaphore variant only). */
00121 sem_t *mutex;
00122 #endif
/* Start of the mapped region, viewed as its control block. */
00123 sharedBufHeader *header;
/* First payload byte: header address plus sizeof(sharedBufHeader). */
00124 char *data;
/* XPMEM segment id of the region (returned by createXpmemObject() for receive
 * buffers, read from the peer's file for send buffers). */
00125 __s64 segid;
00126 } sharedBufData;
00127
/* One queued outgoing message, as stored in a send queue slot. */
00128 typedef struct OutgoingMsgRec
00129 {
/* Pointer to the message bytes. */
00130 char *data;
/* Pointer to the caller-supplied reference counter for this message. */
00131 int *refcount;
/* Message length in bytes. */
00132 int size;
00133 }
00134 OutgoingMsgRec;
00135
/* Growable circular queue of messages waiting to be copied into one peer's
 * shared buffer. */
00136 typedef struct {
/* Number of slots allocated in `data`. */
00137 int size;
/* Index of the oldest queued record (next one popSendQ() returns). */
00138 int begin;
/* Index of the slot the next pushSendQ() fills. */
00139 int end;
/* Number of records currently queued. */
00140 int numEntries;
/* Index of the destination in the per-rank arrays of XpmemContext. */
00141 int rank;
00142 #if SENDQ_LIST
/* Next rank in the chain of non-empty queues; initSendQ() and
 * flushAllSendQs() store -2 while the queue is not on the chain. */
00143 int next;
00144 #endif
/* Slot array of `size` records. */
00145 OutgoingMsgRec *data;
00146
00147 } XpmemSendQ;
00148
/* All process-wide state of the XPMEM layer. */
00149 typedef struct {
/* Number of ranks in this process's group (from +nodesize, clipped at the
 * last node by calculateNodeSizeAndRank()). */
00150 int nodesize;
/* This process's index within its group. */
00151 int noderank;
/* Global node numbers of the first and last member of the group. */
00152 int nodestart,nodeend;
/* Common prefix of the buffer names ("charm_xpmem_<id>"). */
00153 char prefixStr[PREFIXSTRLEN];
/* Per-rank buffer names; the entry for our own rank is NULL. */
00154 char **recvBufNames;
00155 char **sendBufNames;
00156
/* Per-rank buffers we read from (recvBufs) and write into (sendBufs). */
00157 sharedBufData *recvBufs;
00158 sharedBufData *sendBufs;
00159
/* Per-rank queues of messages not yet copied; NULL for our own rank. */
00160 XpmemSendQ **sendQs;
00161
00162 #if XPMEM_STATS
/* Counters and accumulated wall-clock times printed by CmiExitXpmem(). */
00163 int sendCount;
00164 int validCheckCount;
00165 int lockRecvCount;
00166 double validCheckTime;
00167 double sendTime;
00168 double commServerTime;
00169 #endif
00170
00171 } XpmemContext;
00172
00173
00174 #if SENDQ_LIST
/* Rank of the first queue on the chain of non-empty send queues; -1 when the
 * chain is empty. */
00175 static int sendQ_head_index = -1;
00176 #endif
00177
/* The single layer context: allocated by CmiInitXpmem(), released and reset to
 * NULL by CmiExitXpmem(). */
00178 XpmemContext *xpmemContext=NULL;
00179
00180
/* Set-up helpers defined later in this file. */
00181 void calculateNodeSizeAndRank(char **);
00182 void setupSharedBuffers();
00183 void initAllSendQs();
00184
/* Shutdown entry point, declared here because the signal handler calls it. */
00185 void CmiExitXpmem();
00186
/**
 * Signal handler installed by CmiInitXpmem() for the fatal/termination
 * signals.  Releases the XPMEM layer state and then lets the signal take its
 * default action.
 *
 * @param signo  number of the signal being delivered.
 */
static void cleanupOnAllSigs(int signo)
{
    CmiExitXpmem();

    /* Simply returning would resume the faulting instruction for
     * SIGSEGV/SIGBUS/SIGFPE/SIGILL (re-raising the signal forever) and would
     * silently swallow SIGTERM/SIGQUIT.  Restore the default disposition and
     * deliver the signal again so the process terminates as requested. */
    signal(signo, SIG_DFL);
    raise(signo);
}
00191
/* File descriptor of /dev/xpmem, opened by CmiInitXpmem() and used for every
 * XPMEM ioctl in this file. */
00192 static int xpmem_fd;
00193
00194
00195
00196
00197
00198 void CmiInitXpmem(char **argv){
00199 char input[32];
00200 char *env;
00201
00202 MACHSTATE(3,"CminitXpmem start");
00203 xpmemContext = (XpmemContext *)calloc(1,sizeof(XpmemContext));
00204
00205 #if CMK_NET_VERSION
00206 if(Cmi_charmrun_pid <= 0){
00207 CmiAbort("pxshm must be run with charmrun");
00208 }
00209 #endif
00210 calculateNodeSizeAndRank(argv);
00211
00212 MACHSTATE1(3,"CminitXpmem %d calculateNodeSizeAndRank",xpmemContext->nodesize);
00213
00214 if(xpmemContext->nodesize == 1) return;
00215
00216 env = getenv("CHARM_XPMEM_SIZE");
00217 if (env) {
00218 XPMEMBUFLEN = CmiReadSize(env);
00219 }
00220 SENDQSTARTSIZE = 32 * xpmemContext->nodesize;
00221
00222 if (_Cmi_mynode == 0)
00223 CmiPrintf("Charm++> xpmem enabled: %d cores per node, buffer size: %.1fMB\n", xpmemContext->nodesize, XPMEMBUFLEN/1024.0/1024.0);
00224
00225 xpmem_fd = open("/dev/xpmem", O_RDWR);
00226 if (xpmem_fd == -1) {
00227 CmiAbort("Opening /dev/xpmem");
00228 }
00229
00230 #if CMK_CRAYXE
00231 srand(getpid());
00232 int Cmi_charmrun_pid = rand();
00233 PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
00234 #elif !CMK_NET_VERSION
00235 #error "need a unique number"
00236 #endif
00237 snprintf(&(xpmemContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_xpmem_%d",Cmi_charmrun_pid);
00238
00239 MACHSTATE2(3,"CminitXpmem %s %d pre setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
00240
00241 setupSharedBuffers();
00242
00243 MACHSTATE2(3,"CminitXpmem %s %d setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
00244
00245 initAllSendQs();
00246
00247 MACHSTATE2(3,"CminitXpmem %s %d initAllSendQs",xpmemContext->prefixStr,xpmemContext->nodesize);
00248
00249 MACHSTATE2(3,"CminitXpmem %s %d done",xpmemContext->prefixStr,xpmemContext->nodesize);
00250
00251 signal(SIGSEGV, cleanupOnAllSigs);
00252 signal(SIGFPE, cleanupOnAllSigs);
00253 signal(SIGILL, cleanupOnAllSigs);
00254 signal(SIGTERM, cleanupOnAllSigs);
00255 signal(SIGABRT, cleanupOnAllSigs);
00256 signal(SIGQUIT, cleanupOnAllSigs);
00257 signal(SIGBUS, cleanupOnAllSigs);
00258 };
00259
00260
00261
00262
00263
/* Not referenced anywhere else in this file. */
00264 static int pxshm_freed = 0;
/* Semaphore clean-up helpers defined later in this file. */
00265 void tearDownSharedBuffers();
00266 void freeSharedBuffers();
00267
00268 void CmiExitXpmem(){
00269 int i=0;
00270
00271 if (xpmemContext == NULL) return;
00272
00273 if(xpmemContext->nodesize != 1) {
00274
00275
00276 for(i=0;i<xpmemContext->nodesize;i++){
00277 if(i != xpmemContext->noderank){
00278 break;
00279 }
00280 }
00281 free(xpmemContext->recvBufNames[i]);
00282 free(xpmemContext->sendBufNames[i]);
00283
00284 free(xpmemContext->recvBufNames);
00285 free(xpmemContext->sendBufNames);
00286
00287 free(xpmemContext->recvBufs);
00288 free(xpmemContext->sendBufs);
00289
00290 }
00291 #if XPMEM_STATS
00292 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,xpmemContext->sendCount,xpmemContext->sendTime,xpmemContext->validCheckCount,xpmemContext->validCheckTime,xpmemContext->commServerTime,xpmemContext->lockRecvCount);
00293 #endif
00294 free(xpmemContext);
00295 xpmemContext = NULL;
00296 }
00297
00298
00299
00300
00301
00302
00303 inline
00304 static int CmiValidXpmem(int node, int size){
00305 #if XPMEM_STATS
00306 xpmemContext->validCheckCount++;
00307 #endif
00308
00309
00310 return (node >= xpmemContext->nodestart && node <= xpmemContext->nodeend && size <= XPMEMMAXSIZE )? 1: 0;
00311 };
00312
00313
00314 inline int XpmemRank(int dstnode){
00315 return dstnode - xpmemContext->nodestart;
00316 }
00317
/* Forward declarations of the queue/transfer primitives defined later in
 * this file. */
00318 inline void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount);
00319 inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ);
00320 inline int flushSendQ(XpmemSendQ *sendQ);
00321
00322 inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
00323 return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
00324 }
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334 void CmiSendMessageXpmem(char *msg, int size, int dstnode, int *refcount)
00335 {
00336 #if XPMEM_STATS
00337 double _startSendTime = CmiWallTimer();
00338 #endif
00339
00340 LrtsPrepareEnvelope(msg, size);
00341
00342 int dstRank = XpmemRank(dstnode);
00343 MEMDEBUG(CmiMemoryCheck());
00344
00345 MACHSTATE4(3,"Send Msg Xpmem ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
00346 MACHSTATE4(3,"Send Msg Xpmem ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
00347
00348 CmiAssert(dstRank >=0 && dstRank != xpmemContext->noderank);
00349
00350 sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstRank]);
00351 XpmemSendQ *sendQ = xpmemContext->sendQs[dstRank];
00352
00353 #if XPMEM_OSSPINLOCK
00354 if(! OSSpinLockTry(&dstBuf->header->lock)){
00355 #elif XPMEM_LOCK
00356 if(sem_trywait(dstBuf->mutex) < 0){
00357 #elif XPMEM_FENCE
00358 dstBuf->header->flagSender = 1;
00359 dstBuf->header->turn = RECEIVER;
00360 CmiMemoryReadFence(0,0);
00361 CmiMemoryWriteFence(0,0);
00362
00363 if(dstBuf->header->flagReceiver){
00364 dstBuf->header->flagSender = 0;
00365 #endif
00366
00368 #if SENDQ_LIST
00369 if (sendQ->numEntries == 0 && sendQ->next == -2) {
00370 sendQ->next = sendQ_head_index;
00371 sendQ_head_index = dstRank;
00372 }
00373 #endif
00374 pushSendQ(sendQ, msg, size, refcount);
00375 (*refcount)++;
00376 MEMDEBUG(CmiMemoryCheck());
00377 return;
00378 }else{
00379
00380
00381
00382
00383 if(sendQ->numEntries == 0){
00384
00385 int ret = sendMessage(msg,size,refcount,dstBuf,sendQ);
00386 #if SENDQ_LIST
00387 if (sendQ->numEntries > 0 && sendQ->next == -2)
00388 {
00389 sendQ->next = sendQ_head_index;
00390 sendQ_head_index = dstRank;
00391 }
00392 #endif
00393 MACHSTATE(3,"Xpmem Send succeeded immediately");
00394 }else{
00395 (*refcount)+=2;
00396 pushSendQ(sendQ,msg,size,refcount);
00397 MACHSTATE3(3,"Xpmem ogm %p pushed to sendQ length %d refcount %d",ogm,sendQ->numEntries,ogm->refcount);
00398 int sent = flushSendQ(sendQ);
00399 (*refcount)--;
00400 MACHSTATE1(3,"Xpmem flushSendQ sent %d messages",sent);
00401 }
00402
00403
00404 #if XPMEM_OSSPINLOCK
00405 OSSpinLockUnlock(&dstBuf->header->lock);
00406 #elif XPMEM_LOCK
00407 sem_post(dstBuf->mutex);
00408 #elif XPMEM_FENCE
00409 CmiMemoryReadFence(0,0);
00410 CmiMemoryWriteFence(0,0);
00411 dstBuf->header->flagSender = 0;
00412 #endif
00413 }
00414 #if XPMEM_STATS
00415 xpmemContext->sendCount ++;
00416 xpmemContext->sendTime += (CmiWallTimer()-_startSendTime);
00417 #endif
00418 MEMDEBUG(CmiMemoryCheck());
00419
00420 };
00421
/* Forward declarations of the two halves of the communication server,
 * defined later in this file. */
00422 inline void emptyAllRecvBufs();
00423 inline void flushAllSendQs();
00424
00425
00426
00427
00428
/**
 * One progress step of the XPMEM layer: first drain every receive buffer,
 * then retry every pending send queue.  With XPMEM_STATS enabled the elapsed
 * wall-clock time is added to commServerTime.
 */
inline void CommunicationServerXpmem()
{
#if XPMEM_STATS
  double serverStart = CmiWallTimer();
#endif
  MEMDEBUG(CmiMemoryCheck());

  /* incoming first, then outgoing */
  emptyAllRecvBufs();
  flushAllSendQs();

#if XPMEM_STATS
  xpmemContext->commServerTime += (CmiWallTimer()-serverStart);
#endif
  MEMDEBUG(CmiMemoryCheck());
}
00444
00445 static void CmiNotifyStillIdleXpmem(CmiIdleState *s){
00446 CommunicationServerXpmem();
00447 }
00448
00449
/* Begin-idle callback: forwards the idle state to CmiNotifyStillIdle().
 * NOTE(review): this calls CmiNotifyStillIdle(), which is defined outside this
 * file, not CmiNotifyStillIdleXpmem() above -- confirm that is intended. */
00450 static void CmiNotifyBeginIdleXpmem(CmiIdleState *s)
00451 {
00452 CmiNotifyStillIdle(s);
00453 }
00454
00455 void calculateNodeSizeAndRank(char **argv)
00456 {
00457 xpmemContext->nodesize=1;
00458 MACHSTATE(3,"calculateNodeSizeAndRank start");
00459
00460 CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node");
00461 MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",xpmemContext->nodesize);
00462
00463 xpmemContext->noderank = _Cmi_mynode % (xpmemContext->nodesize);
00464
00465 MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",xpmemContext->noderank);
00466
00467 xpmemContext->nodestart = _Cmi_mynode -xpmemContext->noderank;
00468
00469 MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
00470
00471 xpmemContext->nodeend = xpmemContext->nodestart + xpmemContext->nodesize -1;
00472
00473 if(xpmemContext->nodeend >= _Cmi_numnodes){
00474 xpmemContext->nodeend = _Cmi_numnodes-1;
00475 xpmemContext->nodesize = (xpmemContext->nodeend - xpmemContext->nodestart) +1;
00476 }
00477
00478 MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",xpmemContext->nodestart,xpmemContext->nodesize,xpmemContext->noderank);
00479 }
00480
/* Forward declarations of the buffer set-up helpers used by
 * setupSharedBuffers() and defined later in this file. */
00481 void allocBufNameStrings(char ***bufName);
00482 void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames);
00483 void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames);
00484 void removeXpmemFiles();
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497 void setupSharedBuffers(){
00498 int i=0;
00499
00500 allocBufNameStrings(&(xpmemContext->recvBufNames));
00501
00502 allocBufNameStrings((&xpmemContext->sendBufNames));
00503
00504 for(i=0;i<xpmemContext->nodesize;i++){
00505 if(i != xpmemContext->noderank){
00506 snprintf(xpmemContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,xpmemContext->noderank+xpmemContext->nodestart,i+xpmemContext->nodestart);
00507 MACHSTATE2(3,"recvBufName %s with rank %d",xpmemContext->recvBufNames[i],i)
00508 snprintf(xpmemContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,i+xpmemContext->nodestart,xpmemContext->noderank+xpmemContext->nodestart);
00509 MACHSTATE2(3,"sendBufName %s with rank %d",xpmemContext->sendBufNames[i],i);
00510 }
00511 }
00512
00513 createRecvXpmemAndSems(&(xpmemContext->recvBufs),xpmemContext->recvBufNames);
00514 CmiBarrier();
00515 createSendXpmemAndSems(&(xpmemContext->sendBufs),xpmemContext->sendBufNames);
00516 CmiBarrier();
00517 removeXpmemFiles();
00518 freeSharedBuffers();
00519
00520 for(i=0;i<xpmemContext->nodesize;i++){
00521 if(i != xpmemContext->noderank){
00522
00523 xpmemContext->sendBufs[i].header->count = 0;
00524 xpmemContext->sendBufs[i].header->bytes = 0;
00525 }
00526 }
00527 }
00528
00529 void allocBufNameStrings(char ***bufName)
00530 {
00531 int i,count;
00532 int totalAlloc = sizeof(char)*NAMESTRLEN*(xpmemContext->nodesize-1);
00533 char *tmp = malloc(totalAlloc);
00534
00535 MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
00536
00537 *bufName = (char **)malloc(sizeof(char *)*xpmemContext->nodesize);
00538 for(i=0,count=0;i<xpmemContext->nodesize;i++){
00539 if(i != xpmemContext->noderank){
00540 (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
00541 count++;
00542 }else{
00543 (*bufName)[i] = NULL;
00544 }
00545 }
00546 }
00547
00548 __s64 createXpmemObject(int size,char **pPtr)
00549 {
00550 struct xpmem_cmd_make make_info;
00551 int ret;
00552
00553 *pPtr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0);
00554 if (*pPtr == MAP_FAILED) {
00555 perror("Creating mapping.");
00556 return -1;
00557 }
00558 make_info.vaddr = (__u64) *pPtr;
00559 make_info.size = size;
00560 make_info.permit_type = XPMEM_PERMIT_MODE;
00561 make_info.permit_value = (__u64) 0600;
00562 ret = ioctl(xpmem_fd, XPMEM_CMD_MAKE, &make_info);
00563 if (ret != 0) {
00564 perror("xpmem_make");
00565 CmiAbort("xpmem_make");
00566 }
00567 return make_info.segid;
00568 }
00569
00570 void attachXpmemObject(__s64 segid, int size, char **pPtr)
00571 {
00572 int ret;
00573 __s64 apid;
00574 struct xpmem_cmd_get get_info;
00575 struct xpmem_cmd_attach attach_info;
00576
00577 get_info.segid = segid;
00578 get_info.flags = XPMEM_RDWR;
00579 get_info.permit_type = XPMEM_PERMIT_MODE;
00580 get_info.permit_value = (__u64) NULL;
00581 ret = ioctl(xpmem_fd, XPMEM_CMD_GET, &get_info);
00582 if (ret != 0) {
00583 CmiAbort("xpmem_get");
00584 }
00585 apid = get_info.apid;
00586
00587 attach_info.apid = get_info.apid;
00588 attach_info.offset = 0;
00589 attach_info.size = size;
00590 attach_info.vaddr = (__u64) NULL;
00591 attach_info.fd = xpmem_fd;
00592 attach_info.flags = 0;
00593
00594 ret = ioctl(xpmem_fd, XPMEM_CMD_ATTACH, &attach_info);
00595 if (ret != 0) {
00596 CmiAbort("xpmem_attach");
00597 }
00598
00599 *pPtr = (void *)attach_info.vaddr;
00600 }
00601
00602 void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames){
00603 int i=0;
00604 __s64 *segid_arr;
00605 int size, pagesize = getpagesize();
00606
00607 *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
00608 segid_arr = malloc(sizeof(__s64)*xpmemContext->nodesize);
00609
00610 size = XPMEMBUFLEN+sizeof(sharedBufHeader);
00611 size = ((~(pagesize-1))&(size+pagesize-1));
00612
00613 for(i=0;i<xpmemContext->nodesize;i++){
00614 if(i != xpmemContext->noderank) {
00615 (*bufs)[i].segid = segid_arr[i] = createXpmemObject(size,(char **)&((*bufs)[i].header));
00616 memset(((*bufs)[i].header), 0, size);
00617 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00618 #if XPMEM_OSSPINLOCK
00619 (*bufs)[i].header->lock = 0;
00620 #elif XPMEM_LOCK
00621 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
00622 #endif
00623 }else{
00624 (*bufs)[i].header = NULL;
00625 (*bufs)[i].data = NULL;
00626 #if XPMEM_LOCK
00627 (*bufs)[i].mutex = NULL;
00628 #endif
00629 }
00630 }
00631
00632 int fd;
00633 char fname[128];
00634 sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
00635 fd = open(fname, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR);
00636 if (fd == -1) {
00637 CmiAbort("createShmObjectsAndSems failed");
00638 }
00639 write(fd, segid_arr, sizeof(__s64*)*xpmemContext->nodesize);
00640 close(fd);
00641 free(segid_arr);
00642 }
00643
00644 void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames)
00645 {
00646 int i;
00647 int size, pagesize;
00648
00649 pagesize = getpagesize();
00650 size = XPMEMBUFLEN+sizeof(sharedBufHeader);
00651 size = ((~(pagesize-1))&(size+pagesize-1));
00652
00653 *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
00654
00655 for(i=0;i<xpmemContext->nodesize;i++){
00656 if(i != xpmemContext->noderank) {
00657 __s64 segid;
00658 char fname[128];
00659 int fd;
00660 sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+i);
00661 fd = open(fname, O_RDONLY);
00662 if (fd == -1) {
00663 CmiAbort("createShmObjectsAndSems failed");
00664 }
00665 lseek(fd, xpmemContext->noderank*sizeof(__s64), SEEK_SET);
00666 read(fd, &segid, sizeof(__s64*));
00667 close(fd);
00668 (*bufs)[i].segid = segid;
00669 attachXpmemObject(segid, size,(char **)&((*bufs)[i].header));
00670 memset(((*bufs)[i].header), 0, XPMEMBUFLEN+sizeof(sharedBufHeader));
00671 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00672 #if XPMEM_OSSPINLOCK
00673 (*bufs)[i].header->lock = 0;
00674 #elif XPMEM_LOCK
00675 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
00676 #endif
00677 }else{
00678 (*bufs)[i].header = NULL;
00679 (*bufs)[i].data = NULL;
00680 #if XPMEM_LOCK
00681 (*bufs)[i].mutex = NULL;
00682 #endif
00683 }
00684 }
00685 }
00686
00687 void removeXpmemFiles()
00688 {
00689 char fname[64];
00690 sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
00691 unlink(fname);
00692 }
00693
00694 void freeSharedBuffers(){
00695 int i;
00696 for(i= 0;i<xpmemContext->nodesize;i++){
00697 if(i != xpmemContext->noderank){
00698 #if XPMEM_LOCK
00699 sem_unlink(xpmemContext->sendBufNames[i]);
00700 sem_unlink(xpmemContext->recvBufNames[i]);
00701 #endif
00702 }
00703 }
00704 }
00705
00706 void tearDownSharedBuffers(){
00707 int i;
00708 for(i= 0;i<xpmemContext->nodesize;i++){
00709 if(i != xpmemContext->noderank){
00710 #if XPMEM_LOCK
00711 sem_close(xpmemContext->recvBufs[i].mutex);
00712 sem_close(xpmemContext->sendBufs[i].mutex);
00713 sem_unlink(xpmemContext->sendBufNames[i]);
00714 sem_unlink(xpmemContext->recvBufNames[i]);
00715 xpmemContext->recvBufs[i].mutex = NULL;
00716 xpmemContext->sendBufs[i].mutex = NULL;
00717 #endif
00718 }
00719 }
00720 };
00721
/* Forward declaration; the definition is near the end of this file. */
00722 void initSendQ(XpmemSendQ *q,int size,int rank);
00723
00724 void initAllSendQs(){
00725 int i=0;
00726 xpmemContext->sendQs = (XpmemSendQ **) malloc(sizeof(XpmemSendQ *)*xpmemContext->nodesize);
00727 for(i=0;i<xpmemContext->nodesize;i++){
00728 if(i != xpmemContext->noderank){
00729 xpmemContext->sendQs[i] = (XpmemSendQ *)calloc(1, sizeof(XpmemSendQ));
00730 initSendQ((xpmemContext->sendQs)[i],SENDQSTARTSIZE,i);
00731 }else{
00732 xpmemContext->sendQs[i] = NULL;
00733 }
00734 }
00735 };
00736
00737
00738
00739
00740
00741
00742
00743
00744 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
00745
00746 if(dstBuf->header->bytes+size <= XPMEMBUFLEN){
00748 dstBuf->header->count++;
00749 CmiMemcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
00750 dstBuf->header->bytes += size;
00751 MACHSTATE4(3,"Xpmem send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
00752 CmiFree(msg);
00753 return 1;
00754 }
00755
00756
00757
00758
00759 pushSendQ(dstSendQ,msg,size,refcount);
00760 (*refcount)++;
00761 MACHSTATE3(3,"Xpmem send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
00762 return 0;
00763 }
00764
/* Forward declaration; the definition is at the end of this file. */
00765 inline OutgoingMsgRec* popSendQ(XpmemSendQ *q);
00766
00767
00768
00769
00770
00771
00772 inline int flushSendQ(XpmemSendQ *dstSendQ){
00773 sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstSendQ->rank]);
00774 int count=dstSendQ->numEntries;
00775 int sent=0;
00776 while(count > 0){
00777 OutgoingMsgRec *ogm = popSendQ(dstSendQ);
00778 (*ogm->refcount)--;
00779 MACHSTATE4(3,"Xpmem trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,ogm->refcount);
00780 int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
00781 if(ret==1){
00782 sent++;
00783 #if CMK_NET_VERSION
00784 GarbageCollectMsg(ogm);
00785 #endif
00786 }
00787 count--;
00788 }
00789 return sent;
00790 }
00791
/* Forward declaration; the definition follows flushAllSendQs(). */
00792 inline void emptyRecvBuf(sharedBufData *recvBuf);
00793
/* Visit the receive buffer of every peer rank and, for each one that holds at
 * least one message, try to acquire it without blocking; when acquired, hand
 * its contents to emptyRecvBuf() and release it.  A buffer that cannot be
 * acquired is skipped. */
00794 inline void emptyAllRecvBufs(){
00795 int i;
00796 for(i=0;i<xpmemContext->nodesize;i++){
00797 if(i != xpmemContext->noderank){
00798 sharedBufData *recvBuf = &(xpmemContext->recvBufs[i]);
/* The counter is read before the buffer is acquired. */
00799 if(recvBuf->header->count > 0){
00800
00801 #if XPMEM_STATS
00802 xpmemContext->lockRecvCount++;
00803 #endif
00804
/* Acquisition attempt; in every variant the `if` branch opened here is the
 * FAILURE case and the `else` below is the success case. */
00805 #if XPMEM_OSSPINLOCK
00806 if(! OSSpinLockTry(&recvBuf->header->lock)){
00807 #elif XPMEM_LOCK
00808 if(sem_trywait(recvBuf->mutex) < 0){
00809 #elif XPMEM_FENCE
/* Flag variant: raise our flag, give the turn to the sender, fence, then back
 * off if the sender's flag is raised. */
00810 recvBuf->header->flagReceiver = 1;
00811 recvBuf->header->turn = SENDER;
00812 CmiMemoryReadFence(0,0);
00813 CmiMemoryWriteFence(0,0);
00814
00815 if((recvBuf->header->flagSender)){
00816 recvBuf->header->flagReceiver = 0;
00817 #endif
00818 }else{
00819
00820
00821 MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
00822 emptyRecvBuf(recvBuf);
00823
/* Release the buffer. */
00824 #if XPMEM_OSSPINLOCK
00825 OSSpinLockUnlock(&recvBuf->header->lock);
00826 #elif XPMEM_LOCK
00827 sem_post(recvBuf->mutex);
00828 #elif XPMEM_FENCE
00829 CmiMemoryReadFence(0,0);
00830 CmiMemoryWriteFence(0,0);
00831 recvBuf->header->flagReceiver = 0;
00832 #endif
00833
00834 }
00835
00836 }
00837 }
00838 }
00839 };
00840
/* Retry delivery for every send queue that still holds messages.  For each
 * such queue the peer's send buffer is acquired without blocking; on success
 * flushSendQ() is run and the buffer released, otherwise the queue is left
 * for a later call.
 * With SENDQ_LIST only the queues on the chain starting at sendQ_head_index
 * are visited, and queues that became empty are unlinked from the chain;
 * without it every rank other than our own is scanned. */
00841 inline void flushAllSendQs(){
00842 int i;
00843 #if SENDQ_LIST
/* Rank of the previous queue that stayed on the chain, -1 if none yet. */
00844 int index_prev = -1;
00845
00846 i = sendQ_head_index;
00847 while (i!= -1) {
00848 XpmemSendQ *sendQ = xpmemContext->sendQs[i];
00849 CmiAssert(i != xpmemContext->noderank);
00850 if(sendQ->numEntries > 0){
00851 #else
00852 for(i=0;i<xpmemContext->nodesize;i++) {
00853 if (i == xpmemContext->noderank) continue;
00854 XpmemSendQ *sendQ = xpmemContext->sendQs[i];
00855 if(sendQ->numEntries > 0) {
00856 #endif
00857
/* Acquisition attempt; unlike the other functions in this file, the `if`
 * branch opened here is the SUCCESS case. */
00858 #if XPMEM_OSSPINLOCK
00859 if(OSSpinLockTry(&xpmemContext->sendBufs[i].header->lock)){
00860 #elif XPMEM_LOCK
00861 if(sem_trywait(xpmemContext->sendBufs[i].mutex) >= 0){
00862 #elif XPMEM_FENCE
/* Flag variant: raise our flag, give the turn to the receiver, fence; we may
 * proceed unless the receiver's flag is raised and the turn is still the
 * receiver's. */
00863 xpmemContext->sendBufs[i].header->flagSender = 1;
00864 xpmemContext->sendBufs[i].header->turn = RECEIVER;
00865 CmiMemoryReadFence(0,0);
00866 CmiMemoryWriteFence(0,0);
00867 if(!(xpmemContext->sendBufs[i].header->flagReceiver && xpmemContext->sendBufs[i].header->turn == RECEIVER)){
00868 #endif
00869
00870 MACHSTATE1(3,"flushSendQ %d",i);
00871 flushSendQ(sendQ);
00872
/* Release the buffer. */
00873 #if XPMEM_OSSPINLOCK
00874 OSSpinLockUnlock(&xpmemContext->sendBufs[i].header->lock);
00875 #elif XPMEM_LOCK
00876 sem_post(xpmemContext->sendBufs[i].mutex);
00877 #elif XPMEM_FENCE
00878 CmiMemoryReadFence(0,0);
00879 CmiMemoryWriteFence(0,0);
00880 xpmemContext->sendBufs[i].header->flagSender = 0;
00881 #endif
00882 }else{
00883
/* Could not acquire: in the flag variant withdraw our claim. */
00884 #if XPMEM_FENCE
00885 xpmemContext->sendBufs[i].header->flagSender = 0;
00886 #endif
00887
00888 }
00889 }
00890 #if SENDQ_LIST
/* Advance along the chain; a queue that is now empty is unlinked and its
 * `next` is set to -2 to mark it as off the chain. */
00891 if (sendQ->numEntries == 0) {
00892 if (index_prev != -1)
00893 xpmemContext->sendQs[index_prev]->next = sendQ->next;
00894 else
00895 sendQ_head_index = sendQ->next;
00896 i = sendQ->next;
00897 sendQ->next = -2;
00898 }
00899 else {
00900 index_prev = i;
00901 i = sendQ->next;
00902 }
00903 #endif
00904 }
00905 };
00906
#if CMK_NET_VERSION
/* Forward declaration of the hand-over routine that emptyRecvBuf() calls in
 * the net build; it is defined after emptyRecvBuf() under the same guard. */
void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
#endif
00908
00909 void emptyRecvBuf(sharedBufData *recvBuf){
00910 int numMessages = recvBuf->header->count;
00911 int i;
00912
00913 char *ptr=recvBuf->data;
00914
00915 for(i=0;i<numMessages;i++){
00916 int size;
00917 int rank, srcpe, seqno, magic, i;
00918 unsigned int broot;
00919 char *msg = ptr;
00920 char *newMsg;
00921
00922 #if CMK_NET_VERSION
00923 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00924 size = CmiMsgHeaderGetLength(msg);
00925 #else
00926 size = CmiGetMsgSize(msg);
00927 #endif
00928
00929 newMsg = (char *)CmiAlloc(size);
00930 memcpy(newMsg,msg,size);
00931
00932 #if CMK_NET_VERSION
00933 handoverPxshmMessage(newMsg,size,rank,broot);
00934 #else
00935 handleOneRecvedMsg(size, newMsg);
00936 #endif
00937
00938 ptr += size;
00939
00940 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
00941 }
00942 #if 1
00943 if(ptr - recvBuf->data != recvBuf->header->bytes){
00944 CmiPrintf("[%d] ptr - recvBuf->data %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
00945 }
00946 #endif
00947 CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00948 recvBuf->header->count=0;
00949 recvBuf->header->bytes=0;
00950 }
00951
00952
/* Net build only: hand a received message to the runtime.  When `rank` is a
 * broadcast rank and a spanning-tree or hypercube broadcast is configured,
 * the message is first forwarded with SendSpanningChildren()/SendHypercube();
 * it is then pushed to PE 0 for DGRAM_BROADCAST or to PE `rank` otherwise.
 * NOTE(review): the assertion below requires rank == 0, so with assertions
 * enabled the broadcast branches are only reachable if DGRAM_BROADCAST is 0
 * -- confirm against the definition of DGRAM_BROADCAST. */
00953 #if CMK_NET_VERSION
00954 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
00955 CmiAssert(rank == 0);
00956 #if CMK_BROADCAST_SPANNING_TREE
00957 if (rank == DGRAM_BROADCAST
00958 #if CMK_NODE_QUEUE_AVAILABLE
00959 || rank == DGRAM_NODEBROADCAST
00960 #endif
00961 ){
00962 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
00963 }
00964 #elif CMK_BROADCAST_HYPERCUBE
00965 if (rank == DGRAM_BROADCAST
00966 #if CMK_NODE_QUEUE_AVAILABLE
00967 || rank == DGRAM_NODEBROADCAST
00968 #endif
00969 ){
00970 SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
00971 }
00972 #endif
00973
/* Local delivery. */
00974 switch (rank) {
00975 case DGRAM_BROADCAST: {
00976 CmiPushPE(0, newmsg);
00977 break;
00978 }
00979 default:
00980 {
00981
00982 CmiPushPE(rank, newmsg);
00983 }
00984 }
00985 }
00986 #endif
00987
00988
00989
00990
00991
00992
00993 void initSendQ(XpmemSendQ *q,int size,int rank){
00994 q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
00995
00996 q->size = size;
00997 q->numEntries = 0;
00998
00999 q->begin = 0;
01000 q->end = 0;
01001
01002 q->rank = rank;
01003 #if SENDQ_LIST
01004 q->next = -2;
01005 #endif
01006 }
01007
01008 void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount){
01009 if(q->numEntries == q->size){
01010
01011 OutgoingMsgRec *oldData = q->data;
01012 int newSize = q->size<<1;
01013 q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
01014
01015 CmiAssert(q->begin == q->end);
01016
01017 CmiAssert(q->begin < q->size);
01018 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));
01019
01020 if(q->end!=0){
01021 memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
01022 }
01023 free(oldData);
01024 q->begin = 0;
01025 q->end = q->size;
01026 q->size = newSize;
01027 }
01028 OutgoingMsgRec *omg = &q->data[q->end];
01029 omg->size = size;
01030 omg->data = msg;
01031 omg->refcount = refcount;
01032 (q->end)++;
01033 if(q->end >= q->size){
01034 q->end -= q->size;
01035 }
01036 q->numEntries++;
01037 }
01038
01039 OutgoingMsgRec * popSendQ(XpmemSendQ *q){
01040 OutgoingMsgRec * ret;
01041 if(0 == q->numEntries){
01042 return NULL;
01043 }
01044
01045 ret = &q->data[q->begin];
01046 (q->begin)++;
01047 if(q->begin >= q->size){
01048 q->begin -= q->size;
01049 }
01050
01051 q->numEntries--;
01052 return ret;
01053 }