00001
00030 #include <sys/types.h>
00031 #include <sys/mman.h>
00032 #include <unistd.h>
00033 #include <sys/stat.h>
00034 #include <fcntl.h>
00035 #include <errno.h>
00036 #include <signal.h>
00037
00038
00039
00040
/*
 * Choose how the two processes sharing a buffer exclude each other:
 *   PXSHM_OSSPINLOCK - OSSpinLock stored in the shared buffer header
 *   PXSHM_LOCK       - named POSIX semaphore
 *   otherwise        - flag/turn protocol guarded by memory fences
 */
#if PXSHM_OSSPINLOCK
#  include <libkern/OSAtomic.h>
#elif PXSHM_LOCK
#  include <semaphore.h>
#else
/* No lock flavour requested: fall back to the fence-based protocol. */
#  define PXSHM_FENCE 1
#endif

/* Debug hook: the argument is discarded, so MEMDEBUG(...) expands to nothing. */
#define MEMDEBUG(x) /* x */

/* Non-zero compiles in the send/receive counters and timers of PxshmContext. */
#define PXSHM_STATS 0

/* Non-zero chains the non-empty send queues through PxshmSendQ::next. */
#define SENDQ_LIST 0
00055
00056
/*
 * In non-SMP builds that use the fence protocol, replace the read/write
 * fence macros with a full barrier from the compiler's __sync builtin.
 * The build is rejected when that builtin is not available.
 */
#if !CMK_SMP
#  if PXSHM_FENCE

#    undef CmiMemoryReadFence
#    undef CmiMemoryWriteFence

#    if CMK_C_SYNC_SYNCHRONIZE_PRIMITIVE
#      define CmiMemoryReadFence() __sync_synchronize()
#      define CmiMemoryWriteFence() __sync_synchronize()
#    else
#      error Cannot build PXSHM with non-SMP on a machine with no ASM for atomic fence
#    endif

#  endif /* PXSHM_FENCE */
#endif /* !CMK_SMP */
00072
00073
00074
/* Values stored in sharedBufHeader::turn by the fence-based protocol. */
enum entities {
  SENDER = 0,
  RECEIVER = 1
};
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
/* Bytes reserved for each shared-object name and for the common name prefix. */
#define NAMESTRLEN 60
#define PREFIXSTRLEN 50

/* Payload bytes per shared buffer; overridable via CHARM_PXSHM_POOL_SIZE. */
static int SHMBUFLEN = 4 * 1024 * 1024;
/* Largest message sent through pxshm; overridable via CHARM_PXSHM_MESSAGE_MAX_SIZE. */
static int SHMMAXSIZE = 1 * 1024 * 1024;

/* Initial send-queue capacity; CmiInitPxshm rescales it to 32 * nodesize. */
static int SENDQSTARTSIZE = 256;
00093
/*
 * Header placed at the start of every shared-memory buffer.
 * The field order is part of the cross-process layout and must not change.
 */
typedef struct {
  int count; /* messages currently stored in the buffer */
  int bytes; /* payload bytes currently in use */

#if PXSHM_OSSPINLOCK
  OSSpinLock lock; /* try-lock taken by sender and receiver */
#endif

#if PXSHM_FENCE
  /* Flag/turn words of the fence protocol; the pads separate the flags
   * (presumably to keep each on its own cache line - confirm). */
  volatile int flagSender;
  char pad1[CMI_CACHE_LINE_SIZE - sizeof(volatile int)];
  volatile int flagReceiver;
  char pad2[CMI_CACHE_LINE_SIZE - sizeof(volatile int)];
  volatile int turn; /* SENDER or RECEIVER */
#endif

} sharedBufHeader;
00113
00114 typedef struct {
00115 #if PXSHM_LOCK
00116 sem_t *mutex;
00117 #endif
00118 sharedBufHeader *header;
00119 char *data;
00120 } sharedBufData;
00121
/* One message waiting in a send queue. */
typedef struct OutgoingMsgRec {
  char *data;    /* message bytes */
  int *refcount; /* reference counter supplied by the caller of the send */
  int size;      /* message length in bytes */
} OutgoingMsgRec;
00127
00128 typedef struct {
00129 int size;
00130 int begin;
00131 int end;
00132 int numEntries;
00133 int rank;
00134 #if SENDQ_LIST
00135 int next;
00136 #endif
00137 OutgoingMsgRec *data;
00138
00139 } PxshmSendQ;
00140
00141 typedef struct {
00142 int nodesize;
00143 int noderank;
00144 int nodestart, nodeend;
00145 char prefixStr[PREFIXSTRLEN];
00146 char **recvBufNames;
00147 char **sendBufNames;
00148
00149 sharedBufData *recvBufs;
00150 sharedBufData *sendBufs;
00151
00152 PxshmSendQ **sendQs;
00153
00154 #if PXSHM_STATS
00155 int sendCount;
00156 int validCheckCount;
00157 int lockRecvCount;
00158 double validCheckTime;
00159 double sendTime;
00160 double commServerTime;
00161 #endif
00162
00163 } PxshmContext;
00164
00165 #if SENDQ_LIST
00166 static int sendQ_head_index = -1;
00167 #endif
00168
00169 PxshmContext *pxshmContext = NULL;
00170
00171 void calculateNodeSizeAndRank(char **);
00172 void setupSharedBuffers(void);
00173 void initAllSendQs(void);
00174
00175 void CmiExitPxshm(void);
00176
/**
 * Signal handler installed by CmiInitPxshm: tears down the pxshm state and
 * then lets the signal take its default action.
 *
 * Merely returning would re-execute the faulting instruction for
 * SIGSEGV/SIGBUS/SIGFPE/SIGILL (looping forever once the context is gone)
 * and would silently swallow SIGTERM/SIGINT/SIGQUIT. Restoring the default
 * disposition and re-raising ends the process with the original signal.
 *
 * @param signo  the signal being handled
 */
static void cleanupOnAllSigs(int signo)
{
  CmiExitPxshm();

  /* The signal is blocked while its handler runs, so the re-raised instance
   * is delivered, with default action, as soon as this handler returns. */
  signal(signo, SIG_DFL);
  raise(signo);
}
00178
00179
00180
00181
00182
00183 void CmiInitPxshm(char **argv)
00184 {
00185 char *env;
00186 MACHSTATE(3, "CminitPxshm start");
00187
00188 pxshmContext = (PxshmContext *) calloc(1, sizeof(PxshmContext));
00189
00190 CmiDeprecateArgInt(argv, "+nodesize", "Number of cores in this node",
00191 "Charmrun> Deprecation warning: charmrun now figures "
00192 "out the nodesize on its own.");
00193
00194 calculateNodeSizeAndRank(argv);
00195 if (pxshmContext->nodesize == 1)
00196 return;
00197
00198 MACHSTATE1(3, "CminitPxshm %d calculateNodeSizeAndRank",
00199 pxshmContext->nodesize);
00200
00201 env = getenv("CHARM_PXSHM_POOL_SIZE");
00202 if (env) {
00203 SHMBUFLEN = CmiReadSize(env);
00204 }
00205 env = getenv("CHARM_PXSHM_MESSAGE_MAX_SIZE");
00206 if (env) {
00207 SHMMAXSIZE = CmiReadSize(env);
00208 }
00209 if (SHMMAXSIZE > SHMBUFLEN)
00210 CmiAbort("Error> Pxshm pool size is set too small in env variable "
00211 "CHARM_PXSHM_POOL_SIZE");
00212
00213 SENDQSTARTSIZE = 32 * pxshmContext->nodesize;
00214
00215 if (_Cmi_mynode == 0)
00216 printf("Charm++> pxshm enabled: %d cores per node, buffer size: %.1fMB\n",
00217 pxshmContext->nodesize, SHMBUFLEN / 1024.0 / 1024.0);
00218
00219 #if CMK_CRAYXE || CMK_CRAYXC
00220 srand(getpid());
00221 int Cmi_charmrun_pid = rand();
00222 PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
00223 snprintf(&(pxshmContext->prefixStr[0]), PREFIXSTRLEN - 1, "charm_pxshm_%d",
00224 Cmi_charmrun_pid);
00225 #elif CMK_NET_VERSION
00226 srand(Cmi_charmrun_pid);
00227 snprintf(&(pxshmContext->prefixStr[0]), PREFIXSTRLEN - 1, "charm_pxshm_%d",
00228 rand());
00229 #else
00230 #error PXSHM does not support the machine layer you are building on; please contact Charm++ develpers to report this.
00231 #endif
00232
00233 MACHSTATE2(3, "CminitPxshm %s %d pre setupSharedBuffers",
00234 pxshmContext->prefixStr, pxshmContext->nodesize);
00235
00236 setupSharedBuffers();
00237
00238 MACHSTATE2(3, "CminitPxshm %s %d setupSharedBuffers", pxshmContext->prefixStr,
00239 pxshmContext->nodesize);
00240
00241 initAllSendQs();
00242
00243 MACHSTATE2(3, "CminitPxshm %s %d initAllSendQs", pxshmContext->prefixStr,
00244 pxshmContext->nodesize);
00245
00246 MACHSTATE2(3, "CminitPxshm %s %d done", pxshmContext->prefixStr,
00247 pxshmContext->nodesize);
00248
00249 #if PXSHM_STATS
00250 pxshmContext->sendCount = 0;
00251 pxshmContext->sendTime = 0.0;
00252 pxshmContext->validCheckCount = 0;
00253 pxshmContext->validCheckTime = 0.0;
00254 pxshmContext->commServerTime = 0;
00255 pxshmContext->lockRecvCount = 0;
00256 #endif
00257 struct sigaction sa;
00258 sa.sa_handler = cleanupOnAllSigs;
00259 sigemptyset(&sa.sa_mask);
00260 sa.sa_flags = SA_RESTART;
00261
00262 sigaction(SIGSEGV, &sa, NULL);
00263 sigaction(SIGFPE, &sa, NULL);
00264 sigaction(SIGILL, &sa, NULL);
00265 sigaction(SIGTERM, &sa, NULL);
00266 sigaction(SIGABRT, &sa, NULL);
00267 sigaction(SIGQUIT, &sa, NULL);
00268 sigaction(SIGBUS, &sa, NULL);
00269 sigaction(SIGINT, &sa, NULL);
00270 sigaction(SIGTRAP, &sa, NULL);
00271 #if 0
00272 char name[64];
00273 gethostname(name,64);
00274 printf("[%d] name: %s\n", myrank, name);
00275 #endif
00276 };
00277
00278
00279
00280
00281
00282 static int pxshm_freed = 0;
00283 void tearDownSharedBuffers(void);
00284 void freeSharedBuffers(void);
00285
00286 void CmiExitPxshm(void)
00287 {
00288 if (pxshmContext == NULL)
00289 return;
00290 if (pxshmContext->nodesize != 1) {
00291 int i;
00292 if (!pxshm_freed)
00293 tearDownSharedBuffers();
00294
00295 for (i = 0; i < pxshmContext->nodesize; i++) {
00296 if (i != pxshmContext->noderank) {
00297 break;
00298 }
00299 }
00300 free(pxshmContext->recvBufNames[i]);
00301 free(pxshmContext->sendBufNames[i]);
00302
00303 free(pxshmContext->recvBufNames);
00304 free(pxshmContext->sendBufNames);
00305
00306 free(pxshmContext->recvBufs);
00307 free(pxshmContext->sendBufs);
00308 }
00309 #if PXSHM_STATS
00310 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime "
00311 "%.6lf commServerTime %6lf lockRecvCount %d \n",
00312 _Cmi_mynode, pxshmContext->sendCount, pxshmContext->sendTime,
00313 pxshmContext->validCheckCount, pxshmContext->validCheckTime,
00314 pxshmContext->commServerTime, pxshmContext->lockRecvCount);
00315 #endif
00316 free(pxshmContext);
00317 pxshmContext = NULL;
00318 }
00319
00320
00321
00322
00323
00324
00325 inline static int CmiValidPxshm(int node, int size)
00326 {
00327 #if PXSHM_STATS
00328 pxshmContext->validCheckCount++;
00329 #endif
00330
00331
00332
00333
00334
00335
00336
00337 return (node >= pxshmContext->nodestart && node <= pxshmContext->nodeend &&
00338 size <= SHMMAXSIZE)
00339 ? 1
00340 : 0;
00341 };
00342
00343 int PxshmRank(int dstnode) { return dstnode - pxshmContext->nodestart; }
00344
00345 inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
00346 inline int sendMessage(char *msg, int size, int *refcount,
00347 sharedBufData *dstBuf, PxshmSendQ *dstSendQ);
00348 int flushSendQ(PxshmSendQ *q);
00349
00350 int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,
00351 PxshmSendQ *dstSendQ)
00352 {
00353 return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
00354 }
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364 void CmiSendMessagePxshm(char *msg, int size, int dstnode, int *refcount)
00365 {
00366 #if PXSHM_STATS
00367 double _startSendTime = CmiWallTimer();
00368 #endif
00369
00370 LrtsPrepareEnvelope(msg, size);
00371
00372 int dstRank = PxshmRank(dstnode);
00373 MEMDEBUG(CmiMemoryCheck());
00374
00375
00376
00377
00378
00379
00380
00381
00382 CmiAssert(dstRank >= 0 && dstRank != pxshmContext->noderank);
00383
00384 sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
00385 PxshmSendQ *sendQ = pxshmContext->sendQs[dstRank];
00386
00387 #if PXSHM_OSSPINLOCK
00388 if (!OSSpinLockTry(&dstBuf->header->lock)) {
00389 #elif PXSHM_LOCK
00390 if (sem_wait(dstBuf->mutex) != 0) {
00391
00392 #elif PXSHM_FENCE
00393 dstBuf->header->flagSender = 1;
00394 dstBuf->header->turn = RECEIVER;
00395 CmiMemoryReadFence();
00396 CmiMemoryWriteFence();
00397
00398 if (dstBuf->header->flagReceiver) {
00399 dstBuf->header->flagSender = 0;
00400 #endif
00401
00403 #if SENDQ_LIST
00404 if (sendQ->numEntries == 0 && sendQ->next == -2) {
00405 sendQ->next = sendQ_head_index;
00406 sendQ_head_index = dstRank;
00407 }
00408 #endif
00409 pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
00410 (*refcount)++;
00411 MEMDEBUG(CmiMemoryCheck());
00412 return;
00413 } else {
00414
00415
00416
00417
00418
00419 if (pxshmContext->sendQs[dstRank]->numEntries == 0) {
00420
00421 int ret = sendMessage(msg, size, refcount, dstBuf,
00422 pxshmContext->sendQs[dstRank]);
00423 #if SENDQ_LIST
00424 if (sendQ->numEntries > 0 && sendQ->next == -2) {
00425 sendQ->next = sendQ_head_index;
00426 sendQ_head_index = dstRank;
00427 }
00428 #endif
00429 MACHSTATE(3, "Pxshm Send succeeded immediately");
00430 } else {
00431 (*refcount) +=
00432 2;
00433 pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
00434
00435
00436
00437 int sent = flushSendQ(sendQ);
00438 (*refcount)--;
00439
00440 MACHSTATE1(3, "Pxshm flushSendQ sent %d messages", sent);
00441 }
00442
00443
00444 #if PXSHM_OSSPINLOCK
00445 OSSpinLockUnlock(&dstBuf->header->lock);
00446 #elif PXSHM_LOCK
00447 sem_post(dstBuf->mutex);
00448 #elif PXSHM_FENCE
00449 CmiMemoryReadFence();
00450 CmiMemoryWriteFence();
00451 dstBuf->header->flagSender = 0;
00452 #endif
00453 }
00454 #if PXSHM_STATS
00455 pxshmContext->sendCount++;
00456 pxshmContext->sendTime += (CmiWallTimer() - _startSendTime);
00457 #endif
00458 MEMDEBUG(CmiMemoryCheck());
00459 };
00460
void emptyAllRecvBufs(void);
void flushAllSendQs(void);

/**
 * One progress step of the pxshm layer: drain every receive buffer, then try
 * to push out every non-empty send queue.
 */
void CommunicationServerPxshm(void)
{
#if PXSHM_STATS
  const double startedAt = CmiWallTimer();
#endif

  MEMDEBUG(CmiMemoryCheck());

  emptyAllRecvBufs();
  flushAllSendQs();

#if PXSHM_STATS
  pxshmContext->commServerTime += (CmiWallTimer() - startedAt);
#endif

  MEMDEBUG(CmiMemoryCheck());
}
00485
00486 static void CmiNotifyStillIdlePxshm(CmiIdleState *s)
00487 {
00488 CommunicationServerPxshm();
00489 }
00490
00491 static void CmiNotifyBeginIdlePxshm(CmiIdleState *s) { CmiNotifyStillIdle(s); }
00492
00493 void calculateNodeSizeAndRank(char **argv)
00494 {
00495 pxshmContext->nodesize = 1;
00496 MACHSTATE(3, "calculateNodeSizeAndRank start");
00497
00498
00499
00500 #if CMK_CRAYXE || CMK_CRAYXC
00501
00502
00503
00504
00505 pxshmContext->nodesize =
00506 (CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(CmiMyNode())) +
00507 CmiMyNodeSize() - 1) /
00508 CmiMyNodeSize();
00509 #else
00510 pxshmContext->nodesize = _Cmi_myphysnode_numprocesses;
00511 #endif
00512 MACHSTATE1(3, "calculateNodeSizeAndRank argintdesc %d",
00513 pxshmContext->nodesize);
00514
00515 pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
00516
00517 MACHSTATE1(3, "calculateNodeSizeAndRank noderank %d", pxshmContext->noderank);
00518
00519 pxshmContext->nodestart = _Cmi_mynode - pxshmContext->noderank;
00520
00521 MACHSTATE(3, "calculateNodeSizeAndRank nodestart ");
00522
00523 pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize - 1;
00524
00525 if (pxshmContext->nodeend >= _Cmi_numnodes) {
00526 pxshmContext->nodeend = _Cmi_numnodes - 1;
00527 pxshmContext->nodesize =
00528 (pxshmContext->nodeend - pxshmContext->nodestart) + 1;
00529 }
00530
00531 MACHSTATE3(3, "calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",
00532 pxshmContext->nodestart, pxshmContext->nodesize,
00533 pxshmContext->noderank);
00534 }
00535
00536 void allocBufNameStrings(char ***bufName);
00537 void createShmObjectsAndSems(sharedBufData **bufs, char **bufNames);
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552 void setupSharedBuffers(void)
00553 {
00554 int i = 0;
00555
00556 allocBufNameStrings(&(pxshmContext->recvBufNames));
00557
00558 MACHSTATE(3, "allocBufNameStrings for recvBufNames done");
00559 MEMDEBUG(CmiMemoryCheck());
00560
00561 allocBufNameStrings((&pxshmContext->sendBufNames));
00562
00563 MACHSTATE(3, "allocBufNameStrings for sendBufNames done");
00564
00565 for (i = 0; i < pxshmContext->nodesize; i++) {
00566 if (i != pxshmContext->noderank) {
00567 snprintf(pxshmContext->recvBufNames[i], NAMESTRLEN - 1, "%s_%d_%d",
00568 pxshmContext->prefixStr,
00569 pxshmContext->noderank + pxshmContext->nodestart,
00570 i + pxshmContext->nodestart);
00571 MACHSTATE2(3, "recvBufName %s with rank %d",
00572 pxshmContext->recvBufNames[i], i)
00573 snprintf(pxshmContext->sendBufNames[i], NAMESTRLEN - 1, "%s_%d_%d",
00574 pxshmContext->prefixStr, i + pxshmContext->nodestart,
00575 pxshmContext->noderank + pxshmContext->nodestart);
00576 MACHSTATE2(3, "sendBufName %s with rank %d",
00577 pxshmContext->sendBufNames[i], i);
00578 }
00579 }
00580
00581 createShmObjectsAndSems(&(pxshmContext->recvBufs),
00582 pxshmContext->recvBufNames);
00583 createShmObjectsAndSems(&(pxshmContext->sendBufs),
00584 pxshmContext->sendBufNames);
00585
00586 for (i = 0; i < pxshmContext->nodesize; i++) {
00587 if (i != pxshmContext->noderank) {
00588
00589 pxshmContext->sendBufs[i].header->count = 0;
00590 pxshmContext->sendBufs[i].header->bytes = 0;
00591 }
00592 }
00593
00594 return LrtsBarrier();
00595 }
00596
00597 void allocBufNameStrings(char ***bufName)
00598 {
00599 int i, count;
00600
00601 int totalAlloc = sizeof(char) * NAMESTRLEN * (pxshmContext->nodesize - 1);
00602 char *tmp = (char *)malloc(totalAlloc);
00603
00604 MACHSTATE2(3, "allocBufNameStrings tmp %p totalAlloc %d", tmp, totalAlloc);
00605
00606 *bufName = (char **) malloc(sizeof(char *) * pxshmContext->nodesize);
00607
00608 for (i = 0, count = 0; i < pxshmContext->nodesize; i++) {
00609 if (i != pxshmContext->noderank) {
00610 (*bufName)[i] = &(tmp[count * NAMESTRLEN * sizeof(char)]);
00611 count++;
00612 } else {
00613 (*bufName)[i] = NULL;
00614 }
00615 }
00616 }
00617
00618 void createShmObject(char *name, int size, char **pPtr);
00619
00620 void createShmObjectsAndSems(sharedBufData **bufs, char **bufNames)
00621 {
00622 int i = 0;
00623
00624 *bufs =
00625 (sharedBufData *) calloc(pxshmContext->nodesize, sizeof(sharedBufData));
00626
00627 for (i = 0; i < pxshmContext->nodesize; i++) {
00628 if (i != pxshmContext->noderank) {
00629 createShmObject(bufNames[i], SHMBUFLEN + sizeof(sharedBufHeader),
00630 (char **) &((*bufs)[i].header));
00631 memset(((*bufs)[i].header), 0, SHMBUFLEN + sizeof(sharedBufHeader));
00632 (*bufs)[i].data =
00633 ((char *) ((*bufs)[i].header)) + sizeof(sharedBufHeader);
00634 #if PXSHM_OSSPINLOCK
00635 (*bufs)[i].header->lock =
00636 0;
00637 #elif PXSHM_LOCK
00638 (*bufs)[i].mutex = sem_open(bufNames[i], O_CREAT, S_IRUSR | S_IWUSR, 1);
00639 #endif
00640 } else {
00641 (*bufs)[i].header = NULL;
00642 (*bufs)[i].data = NULL;
00643 #if PXSHM_LOCK
00644 (*bufs)[i].mutex = NULL;
00645 #endif
00646 }
00647 }
00648 }
00649
/**
 * Open (creating if necessary) the POSIX shared-memory object `name`, size
 * it to `size` bytes and map it read/write into this process.
 *
 * shm_open is attempted up to 100 times; failures after the tenth attempt
 * are reported on stderr.
 *
 * @param name  shared-object name
 * @param size  bytes to map
 * @param pPtr  receives the mapped address
 */
void createShmObject(char *name, int size, char **pPtr)
{
  const int flags = O_RDWR | O_CREAT;
  int fd = -1;
  int attempt;

  for (attempt = 1; fd < 0 && attempt <= 100; attempt++) {
    fd = shm_open(name, flags, S_IRUSR | S_IWUSR);

    if (fd < 0 && attempt > 10) {
      fprintf(stderr, "Error(attempt=%d) from shm_open %s while opening %s \n",
              attempt, strerror(errno), name);
      fflush(stderr);
    }
  }

  CmiAssert(fd >= 0);

  /* Both peers open and size the same object, so the result is deliberately
   * not treated as fatal. NOTE(review): some platforms may reject resizing
   * an already-sized object - confirm before adding a hard check. */
  ftruncate(fd, size);

  *pPtr = (char *) mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  /* mmap reports failure with MAP_FAILED, never with NULL. */
  if (*pPtr == (char *) MAP_FAILED) {
    fprintf(stderr, "Error from mmap %s while mapping %s \n", strerror(errno),
            name);
    fflush(stderr);
  }
  CmiAssert(*pPtr != (char *) MAP_FAILED);

  /* The mapping stays valid after the descriptor is closed. */
  close(fd);
}
00682
00683 void freeSharedBuffers(void)
00684 {
00685 int i;
00686 for (i = 0; i < pxshmContext->nodesize; i++) {
00687 if (i != pxshmContext->noderank) {
00688 if (shm_unlink(pxshmContext->recvBufNames[i]) < 0) {
00689 fprintf(stderr, "Error from shm_unlink %s \n", strerror(errno));
00690 }
00691 #if PXSHM_LOCK
00692 sem_unlink(pxshmContext->recvBufNames[i]);
00693 #endif
00694 }
00695 }
00696 };
00697
00698 void tearDownSharedBuffers(void)
00699 {
00700 int i;
00701 for (i = 0; i < pxshmContext->nodesize; i++) {
00702 if (i != pxshmContext->noderank) {
00703 if (shm_unlink(pxshmContext->recvBufNames[i]) < 0) {
00704 fprintf(stderr, "Error from shm_unlink %s \n", strerror(errno));
00705 }
00706 #if PXSHM_LOCK
00707 sem_close(pxshmContext->recvBufs[i].mutex);
00708 sem_close(pxshmContext->sendBufs[i].mutex);
00709 sem_unlink(pxshmContext->recvBufNames[i]);
00710 pxshmContext->recvBufs[i].mutex = NULL;
00711 pxshmContext->sendBufs[i].mutex = NULL;
00712 #endif
00713 }
00714 }
00715 };
00716
00717 void initSendQ(PxshmSendQ *q, int size, int rank);
00718
00719 void initAllSendQs(void)
00720 {
00721 int i = 0;
00722 pxshmContext->sendQs =
00723 (PxshmSendQ **) malloc(sizeof(PxshmSendQ *) * pxshmContext->nodesize);
00724 for (i = 0; i < pxshmContext->nodesize; i++) {
00725 if (i != pxshmContext->noderank) {
00726 (pxshmContext->sendQs)[i] = (PxshmSendQ *) calloc(1, sizeof(PxshmSendQ));
00727 initSendQ((pxshmContext->sendQs)[i], SENDQSTARTSIZE, i);
00728 } else {
00729 (pxshmContext->sendQs)[i] = NULL;
00730 }
00731 }
00732 };
00733
00734
00735
00736
00737
00738
00739
00740 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,
00741 PxshmSendQ *dstSendQ)
00742 {
00743
00744 if (dstBuf->header->bytes + size <= SHMBUFLEN) {
00746 dstBuf->header->count++;
00747 memcpy(dstBuf->data + dstBuf->header->bytes, msg, size);
00748 dstBuf->header->bytes += size;
00749
00750
00751
00752 CmiFree(msg);
00753 return 1;
00754 }
00755
00756
00757
00758
00759 pushSendQ(dstSendQ, msg, size, refcount);
00760 (*refcount)++;
00761
00762
00763 return 0;
00764 }
00765
00766 inline OutgoingMsgRec *popSendQ(PxshmSendQ *q);
00767
00768
00769
00770
00771
00772
00773 inline int flushSendQ(PxshmSendQ *dstSendQ)
00774 {
00775 sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstSendQ->rank]);
00776 int count = dstSendQ->numEntries;
00777 int sent = 0;
00778 while (count > 0) {
00779 OutgoingMsgRec *ogm = popSendQ(dstSendQ);
00780 (*ogm->refcount)--;
00781 MACHSTATE4(3, "Pxshm trysending ogm %p size %d to dstRank %d refcount %d",
00782 ogm, ogm->size, dstSendQ->rank, ogm->refcount);
00783 int ret = sendMessageRec(ogm, dstBuf, dstSendQ);
00784 if (ret == 1) {
00785 sent++;
00786 }
00787 count--;
00788 }
00789 return sent;
00790 }
00791
00792 inline void emptyRecvBuf(sharedBufData *recvBuf);
00793
00794 inline void emptyAllRecvBufs(void)
00795 {
00796 int i;
00797 for (i = 0; i < pxshmContext->nodesize; i++) {
00798 if (i != pxshmContext->noderank) {
00799 sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
00800 if (recvBuf->header->count > 0) {
00801
00802 #if PXSHM_STATS
00803 pxshmContext->lockRecvCount++;
00804 #endif
00805
00806 #if PXSHM_OSSPINLOCK
00807 if (!OSSpinLockTry(&recvBuf->header->lock)) {
00808 #elif PXSHM_LOCK
00809 if (sem_wait(recvBuf->mutex) != 0) {
00810 #elif PXSHM_FENCE
00811 recvBuf->header->flagReceiver = 1;
00812 recvBuf->header->turn = SENDER;
00813 CmiMemoryReadFence();
00814 CmiMemoryWriteFence();
00815
00816 if ((recvBuf->header->flagSender)) {
00817 recvBuf->header->flagReceiver = 0;
00818 #endif
00819 } else {
00820
00821 MACHSTATE1(3, "emptyRecvBuf to be called for rank %d", i);
00822 emptyRecvBuf(recvBuf);
00823
00824 #if PXSHM_OSSPINLOCK
00825 OSSpinLockUnlock(&recvBuf->header->lock);
00826 #elif PXSHM_LOCK
00827 sem_post(recvBuf->mutex);
00828 #elif PXSHM_FENCE
00829 CmiMemoryReadFence();
00830 CmiMemoryWriteFence();
00831 recvBuf->header->flagReceiver = 0;
00832 #endif
00833 }
00834 }
00835 }
00836 }
00837 };
00838
00839 inline void flushAllSendQs(void)
00840 {
00841 int i;
00842 #if SENDQ_LIST
00843 int index_prev = -1;
00844
00845 i = sendQ_head_index;
00846 while (i != -1) {
00847 PxshmSendQ *sendQ = pxshmContext->sendQs[i];
00848 CmiAssert(i != pxshmContext->noderank);
00849 if (sendQ->numEntries > 0) {
00850 #else
00851 for (i = 0; i < pxshmContext->nodesize; i++) {
00852 if (i == pxshmContext->noderank)
00853 continue;
00854 PxshmSendQ *sendQ = pxshmContext->sendQs[i];
00855 if (sendQ->numEntries > 0) {
00856 #endif
00857
00858 #if PXSHM_OSSPINLOCK
00859 if (OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)) {
00860 #elif PXSHM_LOCK
00861 if (sem_wait(pxshmContext->sendBufs[i].mutex) == 0) {
00862 #elif PXSHM_FENCE
00863 pxshmContext->sendBufs[i].header->flagSender = 1;
00864 pxshmContext->sendBufs[i].header->turn = RECEIVER;
00865 CmiMemoryReadFence();
00866 CmiMemoryWriteFence();
00867 if (!(pxshmContext->sendBufs[i].header->flagReceiver &&
00868 pxshmContext->sendBufs[i].header->turn == RECEIVER)) {
00869 #endif
00870
00871 MACHSTATE1(3, "flushSendQ %d", i);
00872 flushSendQ(sendQ);
00873
00874 #if PXSHM_OSSPINLOCK
00875 OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
00876 #elif PXSHM_LOCK
00877 sem_post(pxshmContext->sendBufs[i].mutex);
00878 #elif PXSHM_FENCE
00879 CmiMemoryReadFence();
00880 CmiMemoryWriteFence();
00881 pxshmContext->sendBufs[i].header->flagSender = 0;
00882 #endif
00883 } else {
00884
00885 #if PXSHM_FENCE
00886 pxshmContext->sendBufs[i].header->flagSender = 0;
00887 #endif
00888 }
00889 }
00890 #if SENDQ_LIST
00891 if (sendQ->numEntries == 0) {
00892 if (index_prev != -1)
00893 pxshmContext->sendQs[index_prev]->next = sendQ->next;
00894 else
00895 sendQ_head_index = sendQ->next;
00896 i = sendQ->next;
00897 sendQ->next = -2;
00898 } else {
00899 index_prev = i;
00900 i = sendQ->next;
00901 }
00902 #endif
00903 }
00904 };
00905
00906 void emptyRecvBuf(sharedBufData *recvBuf)
00907 {
00908 int numMessages = recvBuf->header->count;
00909 int i = 0;
00910
00911 char *ptr = recvBuf->data;
00912
00913 for (i = 0; i < numMessages; i++) {
00914 int size;
00915 int rank, srcpe, seqno, magic, i;
00916 unsigned int broot;
00917 char *msg = ptr;
00918 char *newMsg;
00919
00920 size = CMI_MSG_SIZE(msg);
00921
00922 newMsg = (char *) CmiAlloc(size);
00923 memcpy(newMsg, msg, size);
00924
00925 handleOneRecvedMsg(size, newMsg);
00926
00927 ptr += size;
00928
00929 MACHSTATE3(
00930 3,
00931 "message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",
00932 size, ptr - recvBuf->data, recvBuf->header->bytes);
00933 }
00934 #if 1
00935 if (ptr - recvBuf->data != recvBuf->header->bytes) {
00936 CmiPrintf("[%d] ptr - recvBuf->data %d recvBuf->header->bytes %d "
00937 "numMessages %d \n",
00938 _Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,
00939 numMessages);
00940 }
00941 #endif
00942 CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00943 recvBuf->header->count = 0;
00944 recvBuf->header->bytes = 0;
00945 }
00946
00947
00948
00949
00950
00951 void initSendQ(PxshmSendQ *q, int size, int rank)
00952 {
00953 q->data = (OutgoingMsgRec *) calloc(size, sizeof(OutgoingMsgRec));
00954
00955 q->size = size;
00956 q->numEntries = 0;
00957
00958 q->begin = 0;
00959 q->end = 0;
00960
00961 q->rank = rank;
00962 #if SENDQ_LIST
00963 q->next = -2;
00964 #endif
00965 }
00966
00967 void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount)
00968 {
00969 if (q->numEntries == q->size) {
00970
00971 OutgoingMsgRec *oldData = q->data;
00972 int newSize = q->size << 1;
00973 q->data = (OutgoingMsgRec *) calloc(newSize, sizeof(OutgoingMsgRec));
00974
00975 CmiAssert(q->begin == q->end);
00976
00977 CmiAssert(q->begin < q->size);
00978 memcpy(&(q->data[0]), &(oldData[q->begin]),
00979 sizeof(OutgoingMsgRec) * (q->size - q->begin));
00980
00981 if (q->end != 0) {
00982 memcpy(&(q->data[(q->size - q->begin)]), &(oldData[0]),
00983 sizeof(OutgoingMsgRec) * (q->end));
00984 }
00985 free(oldData);
00986 q->begin = 0;
00987 q->end = q->size;
00988 q->size = newSize;
00989 }
00990 OutgoingMsgRec *omg = &q->data[q->end];
00991 omg->size = size;
00992 omg->data = msg;
00993 omg->refcount = refcount;
00994 (q->end)++;
00995 if (q->end >= q->size) {
00996 q->end -= q->size;
00997 }
00998 q->numEntries++;
00999 }
01000
01001 OutgoingMsgRec *popSendQ(PxshmSendQ *q)
01002 {
01003 OutgoingMsgRec *ret;
01004 if (0 == q->numEntries) {
01005 return NULL;
01006 }
01007
01008 ret = &q->data[q->begin];
01009 (q->begin)++;
01010 if (q->begin >= q->size) {
01011 q->begin -= q->size;
01012 }
01013
01014 q->numEntries--;
01015 return ret;
01016 }