00001
00024 #if !defined(_WIN32) || defined(__CYGWIN__)
00025 #include <netinet/tcp.h>
00026 #include <sys/types.h>
00027 #include <sys/socket.h>
00028 #endif
00029
/* Non-zero: TCP_NODELAY is requested on every data socket (see open_tcp_sockets). */
#define NO_NAGLE_ALG 1
/* Non-zero: messages are split into packets bounded by PACKET_MAX. */
#define FRAGMENTATION 1

/* Upper bound, in bytes, used when sizing a single packet. */
#if !FRAGMENTATION
#define PACKET_MAX 1000000000
#else
#define PACKET_MAX 32767
#endif

/* Forward declarations: both are used before their definitions in this file. */
void ReceiveDatagram(int node);
int TransmitDatagram(int pe);
00041
00042
00043
00044
00045
00046
00047 typedef struct {
00048 int sleepMs;
00049 int nIdles;
00050 CmiState cs;
00051 } CmiIdleState;
00052
00053
00054 static CmiIdleState *CmiNotifyGetState(void)
00055 {
00056 CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
00057 s->sleepMs=0;
00058 s->nIdles=0;
00059 s->cs=CmiGetState();
00060 return s;
00061 }
00062
00063 static void CmiNotifyBeginIdle(CmiIdleState *s)
00064 {
00065 s->sleepMs=0;
00066 s->nIdles=0;
00067 }
00068
00069 static void CmiNotifyStillIdle(CmiIdleState *s)
00070 {
00071 #if CMK_SHARED_VARS_UNAVAILABLE
00072 CommunicationServer(10, 0);
00073 #else
00074 int nSpins=20;
00075 s->nIdles++;
00076 if (s->nIdles>nSpins) {
00077 s->sleepMs+=2;
00078 if (s->sleepMs>10) s->sleepMs=10;
00079 }
00080
00081 if (s->sleepMs>0) {
00082 MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
00083 CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
00084 CsdResetPeriodic();
00085 MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
00086 }
00087 #endif
00088 }
00089
00090 void CmiNotifyIdle(void) {
00091 CmiIdleState s;
00092 s.sleepMs=5;
00093 CmiNotifyStillIdle(&s);
00094 }
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
/*
 * Capacity shared by the per-node readiness tables and by the poll()
 * descriptor array declared in CMK_PIPE_DECL.
 * NOTE(review): nothing visible here checks CmiNumNodes() against this
 * capacity -- confirm a limit is enforced elsewhere.
 */
#define CMI_TCP_TABLE_SIZE 1000

/* Readiness flags indexed by node number; written by CheckSocketsReady and
   consumed by CmiCheckSocks. */
static char sockReadStates[CMI_TCP_TABLE_SIZE] = {0};
static char sockWriteStates[CMI_TCP_TABLE_SIZE] = {0};

#if CMK_USE_POLL

/* poll() flavour: local descriptor array, fill counter and timeout. */
#undef CMK_PIPE_DECL
#define CMK_PIPE_DECL(delayMs)                    \
        struct pollfd fds[CMI_TCP_TABLE_SIZE];    \
        int nFds_sto=0; int *nFds=&nFds_sto;      \
        int pollDelayMs=delayMs;

/* Registers afd for reading and, when node i has queued output, also for
   writing.  Relies on a variable `i` in the expansion context.
   NOTE(review): fds[(*nFds)-1] presumes CMK_PIPE_ADDREAD just appended one
   entry and advanced *nFds -- confirm against its definition. */
#define CMK_PIPE_ADDREADWRITE(afd)                \
        CMK_PIPE_ADDREAD(afd);                    \
        if (nodes[i].send_queue_h) fds[(*nFds)-1].events |= POLLOUT;

/* Tests POLLOUT on the entry at the current cursor without moving it. */
#undef CMK_PIPE_CHECKWRITE
#define CMK_PIPE_CHECKWRITE(afd)                  \
        fds[*nFds].revents&POLLOUT

/* Registers stdout forwarding, the charmrun control socket and every peer
   data socket.  Needs `i` declared by the user of the macro. */
#define CMK_PIPE_SETUP                                                  \
        CmiStdoutAdd(CMK_PIPE_SUB);                                     \
        if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); } \
        if (dataskt!=-1) {                                              \
          for (i=0; i<CmiNumNodes(); i++)                               \
          {                                                             \
            if (i != CmiMyNode()) {                                     \
              CMK_PIPE_ADDREADWRITE(nodes[i].sock);                     \
            }                                                           \
          }                                                             \
        }

#else

/* Non-poll flavour: same registrations, with read and write interest added
   through separate macros. */
#define CMK_PIPE_SETUP                                                  \
        CmiStdoutAdd(CMK_PIPE_SUB);                                     \
        if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); } \
        if (dataskt!=-1) {                                              \
          for (i=0; i<CmiNumNodes(); i++)                               \
          {                                                             \
            if (i != CmiMyNode()) {                                     \
              CMK_PIPE_ADDREAD(nodes[i].sock);                          \
              if (nodes[i].send_queue_h) CMK_PIPE_ADDWRITE(nodes[i].sock);\
            }                                                           \
          }                                                             \
        }

#endif
00161
00162
00163 static void CmiCheckSocks()
00164 {
00165 int node;
00166 if (dataskt!=-1) {
00167 for (node=0; node<CmiNumNodes(); node++)
00168 {
00169 if (node == CmiMyNode()) continue;
00170 if (sockReadStates[node]) {
00171 MACHSTATE1(2,"go to ReceiveDatagram %d", node)
00172 ReceiveDatagram(node);
00173 }
00174 if (sockWriteStates[node]) {
00175 MACHSTATE1(2,"go to TransmitDatagram %d", node)
00176 TransmitDatagram(node);
00177 }
00178 }
00179 }
00180 }
00181
00182
00183
00184
00185 int CheckSocketsReady(int withDelayMs, int output)
00186 {
00187 int nreadable,i;
00188 CMK_PIPE_DECL(withDelayMs);
00189
00190 CMK_PIPE_SETUP;
00191 nreadable=CMK_PIPE_CALL();
00192
00193 ctrlskt_ready_read = 0;
00194 dataskt_ready_read = 0;
00195 dataskt_ready_write = 0;
00196
00197 if (nreadable == 0) {
00198 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00199 return nreadable;
00200 }
00201 if (nreadable==-1) {
00202 #if defined(_WIN32) && !defined(__CYGWIN__)
00203
00204
00205
00206
00207
00208 #else
00209 if (errno!=EINTR)
00210 KillEveryone("Socket error in CheckSocketsReady!\n");
00211 #endif
00212 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00213 return CheckSocketsReady(0, output);
00214 }
00215
00216 if (output) {
00217
00218 CmiStdoutCheck(CMK_PIPE_SUB);
00219 if (Cmi_charmrun_fd!=-1)
00220 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00221 if (dataskt!=-1) {
00222 for (i=0; i<CmiNumNodes(); i++)
00223 {
00224 if (i == CmiMyNode()) continue;
00225 if (nodes[i].send_queue_h) {
00226 sockWriteStates[i] = CMK_PIPE_CHECKWRITE(nodes[i].sock);
00227 if (sockWriteStates[i]) dataskt_ready_write = 1;
00228
00229 }
00230 else
00231 sockWriteStates[i] = 0;
00232 sockReadStates[i] = CMK_PIPE_CHECKREAD(nodes[i].sock);
00233 if (sockReadStates[i]) dataskt_ready_read = 1;
00234 }
00235 }
00236 }
00237 MACHSTATE(1,"} CheckSocketsReady")
00238 return nreadable;
00239 }
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256 static void CommunicationServer(int sleepTime, int where)
00257 {
00258 unsigned int nTimes=0;
00259 CmiCommLockOrElse({
00260 MACHSTATE(4,"Attempted to re-enter comm. server!")
00261 return;
00262 });
00263 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00264 MACHSTATE2(sleepTime?3:2,"CommunicationsServer(%d,%d) {",
00265 sleepTime,writeableAcks||writeableDgrams)
00266 #if !CMK_SHARED_VARS_UNAVAILABLE
00267 if (sleepTime!=0) {
00268 MACHSTATE(2,"CommServer going to sleep (NO LOCK)");
00269 if (CheckSocketsReady(sleepTime, 0)<=0) {
00270 MACHSTATE(2,"CommServer finished without anything happening.");
00271 }
00272 }
00273 sleepTime=0;
00274 #endif
00275 CmiCommLock();
00276
00277 if (Cmi_netpoll && where == 1) {
00278 if (CmiStdoutNeedsService()) {CmiStdoutService();}
00279 CmiCommUnlock();
00280 return;
00281 }
00282 CommunicationsClock();
00283
00284 if (sleepTime&&CmiGetState()->idle.hasMessages) sleepTime=0;
00285 while (CheckSocketsReady(sleepTime, 1)>0) {
00286 int again=0;
00287 sleepTime=0;
00288 CmiCheckSocks();
00289 if (ctrlskt_ready_read) {again=1;ctrl_getone();}
00290 if (dataskt_ready_read || dataskt_ready_write) {again=1;}
00291 if (CmiStdoutNeedsService()) {CmiStdoutService();}
00292 if (!again) break;
00293 if ((nTimes++ &16)==15) {
00294
00295 CommunicationsClock();
00296 break;
00297 }
00298 }
00299 CmiCommUnlock();
00300
00301
00302 if (where == 0 || where == 1)
00303 {
00304 #if CMK_IMMEDIATE_MSG
00305 CmiHandleImmediate();
00306 #endif
00307 }
00308
00309 MACHSTATE(2,"} CommunicationServer")
00310 }
00311
00312
00313 #if FRAGMENTATION
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323 static char * maxbuf = NULL;
00324
00325 static char * getMaxBuf() {
00326 char *buf;
00327 if (maxbuf == NULL)
00328 buf = (char *)CmiAlloc(PACKET_MAX);
00329 else {
00330 buf = maxbuf;
00331 maxbuf = NULL;
00332 }
00333 return buf;
00334 }
00335
00336 static void freeMaxBuf(char *buf) {
00337 if (maxbuf) CmiFree(buf);
00338 else maxbuf = buf;
00339 }
00340
00341 #endif
00342
00343 static void IntegrateMessageDatagram(char **msg, int len)
00344 {
00345 char *newmsg;
00346 int rank, srcpe, seqno, magic, broot, i;
00347 int size;
00348
00349 if (len >= DGRAM_HEADER_SIZE) {
00350 DgramHeaderBreak(*msg, rank, srcpe, magic, seqno, broot);
00351 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK)) {
00352 OtherNode node = nodes_by_pe[srcpe];
00353 newmsg = node->asm_msg;
00354 if (newmsg == NULL) {
00355 size = CmiMsgHeaderGetLength(*msg);
00356 if (size < len) KillEveryoneCode(4559312);
00357 #if FRAGMENTATION
00358 if (size == len) {
00359 newmsg = *msg;
00360 }
00361 else {
00362 newmsg = (char *)CmiAlloc(size);
00363 if (!newmsg)
00364 fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
00365 memcpy(newmsg, *msg, len);
00366 if (len == PACKET_MAX)
00367 freeMaxBuf(*msg);
00368 else
00369 CmiFree(*msg);
00370 }
00371 #else
00372 newmsg = *msg;
00373 #endif
00374 node->asm_rank = rank;
00375 node->asm_total = size;
00376 node->asm_fill = len;
00377 node->asm_msg = newmsg;
00378 } else {
00379 #if ! FRAGMENTATION
00380 CmiAssert(0);
00381 #else
00382 size = len - DGRAM_HEADER_SIZE;
00383 memcpy(newmsg + node->asm_fill, (*msg)+DGRAM_HEADER_SIZE, size);
00384 node->asm_fill += size;
00385 if (len == PACKET_MAX)
00386 freeMaxBuf(*msg);
00387 else
00388 CmiFree(*msg);
00389 #endif
00390 }
00391 if (node->asm_fill > node->asm_total)
00392 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00393 if (node->asm_fill == node->asm_total) {
00394
00395 #if CMK_BROADCAST_SPANNING_TREE
00396 if (rank == DGRAM_BROADCAST
00397 #if CMK_NODE_QUEUE_AVAILABLE
00398 || rank == DGRAM_NODEBROADCAST
00399 #endif
00400 )
00401 SendSpanningChildren(NULL, 0, node->asm_total, newmsg, broot, rank);
00402 #elif CMK_BROADCAST_HYPERCUBE
00403 if (rank == DGRAM_BROADCAST
00404 #if CMK_NODE_QUEUE_AVAILABLE
00405 || rank == DGRAM_NODEBROADCAST
00406 #endif
00407 )
00408 SendHypercube(NULL, 0, node->asm_total, newmsg, broot, rank);
00409 #endif
00410 if (rank == DGRAM_BROADCAST) {
00411 for (i=1; i<_Cmi_mynodesize; i++)
00412 CmiPushPE(i, CopyMsg(newmsg, node->asm_total));
00413 CmiPushPE(0, newmsg);
00414 } else {
00415 #if CMK_NODE_QUEUE_AVAILABLE
00416 if (rank==DGRAM_NODEMESSAGE || rank==DGRAM_NODEBROADCAST) {
00417 CmiPushNode(newmsg);
00418 }
00419 else
00420 #endif
00421 CmiPushPE(rank, newmsg);
00422 }
00423 node->asm_msg = 0;
00424 }
00425 }
00426 else {
00427 CmiPrintf("message ignored1: magic not agree:%d != %d!\n", magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00428 CmiPrintf("recv: rank:%d src:%d mag:%d\n", rank, srcpe, magic);
00429 }
00430 }
00431 else CmiPrintf("message ignored2!\n");
00432 }
00433
00434
00435 void ReceiveDatagram(int node)
00436 {
00437 static char *buf = NULL;
00438 int size;
00439 DgramHeader *head, temp;
00440 int newmsg = 0;
00441
00442 OtherNode nodeptr = &nodes[node];
00443
00444 SOCKET fd = nodeptr->sock;
00445 if (-1 == skt_recvN(fd, &size, sizeof(int)))
00446 KillEveryoneCode(4559318);
00447
00448 #if FRAGMENTATION
00449 if (size == PACKET_MAX)
00450 buf = getMaxBuf();
00451 else
00452 buf = (char *)CmiAlloc(size);
00453 #if 0
00454
00455 CmiAssert(size<=PACKET_MAX);
00456 if (nodeptr->asm_msg == NULL) {
00457 if (size == PACKET_MAX)
00458 buf = getMaxBuf();
00459 else
00460 buf = (char *)CmiAlloc(size);
00461 }
00462 else {
00463
00464 CmiAssert(nodeptr->asm_fill+size-DGRAM_HEADER_SIZE <= nodeptr->asm_total);
00465
00466 buf = (char*)nodeptr->asm_msg + nodeptr->asm_fill - DGRAM_HEADER_SIZE;
00467 head = (DgramHeader *)buf;
00468 temp = *head;
00469 newmsg = 1;
00470 }
00471 #endif
00472 #else
00473 buf = (char *)CmiAlloc(size);
00474 #endif
00475
00476 if (-1==skt_recvN(fd, buf, size))
00477 KillEveryoneCode(4559319);
00478
00479 IntegrateMessageDatagram(&buf, size);
00480
00481 #if FRAGMENTATION
00482
00483 if (newmsg) *head = temp;
00484 #endif
00485
00486 }
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497 int TransmitImplicitDgram(ImplicitDgram dg)
00498 {
00499 ChMessageHeader msg;
00500 char *data; DgramHeader *head; int len; DgramHeader temp;
00501 OtherNode dest;
00502 int retval;
00503
00504 MACHSTATE2(2," TransmitImplicitDgram (%d bytes) [%d]",dg->datalen,dg->seqno)
00505 len = dg->datalen+DGRAM_HEADER_SIZE;
00506 data = dg->dataptr;
00507 head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
00508 temp = *head;
00509 dest = dg->dest;
00510
00511 DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_charmrun_pid, len, dg->broot);
00512 LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
00513
00514
00515
00516
00517
00518
00519
00520 if (-1==skt_sendN(dest->sock,(const char *)&len,sizeof(len)))
00521 CmiAbort("EnqueueOutgoingDgram");
00522 if (-1==skt_sendN(dest->sock,(const char *)head,len))
00523 CmiAbort("EnqueueOutgoingDgram");
00524
00525 *head = temp;
00526 dest->stat_send_pkt++;
00527 return 1;
00528 }
00529
00530 int TransmitDatagram(int pe)
00531 {
00532 ImplicitDgram dg; OtherNode node;
00533 int count;
00534 unsigned int seqno;
00535
00536 node = nodes+pe;
00537 dg = node->send_queue_h;
00538 if (dg) {
00539 if (TransmitImplicitDgram(dg)) {
00540 node->send_queue_h = dg->next;
00541 if (node->send_queue_h == NULL) node->send_queue_t = NULL;
00542 DiscardImplicitDgram(dg);
00543 }
00544 }
00545 return 0;
00546 }
00547
00548 void EnqueueOutgoingDgram
00549 (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank, int broot)
00550 {
00551 int seqno, dst, src; ImplicitDgram dg;
00552 src = ogm->src;
00553 dst = ogm->dst;
00554 seqno = node->send_next;
00555 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00556 MallocImplicitDgram(dg);
00557 dg->dest = node;
00558 dg->srcpe = src;
00559 dg->rank = rank;
00560 dg->seqno = seqno;
00561 dg->broot = broot;
00562 dg->dataptr = ptr;
00563 dg->datalen = len;
00564 dg->ogm = ogm;
00565 ogm->refcount++;
00566 dg->next = 0;
00567 if (node->send_queue_h == 0) {
00568 node->send_queue_h = dg;
00569 node->send_queue_t = dg;
00570 } else {
00571 node->send_queue_t->next = dg;
00572 node->send_queue_t = dg;
00573 }
00574 }
00575
00576
00577 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00578 {
00579 int size; char *data;
00580
00581
00582
00583
00584 size = ogm->size - DGRAM_HEADER_SIZE;
00585 data = ogm->data + DGRAM_HEADER_SIZE;
00586 while (size > Cmi_dgram_max_data) {
00587 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00588 data += Cmi_dgram_max_data;
00589 size -= Cmi_dgram_max_data;
00590 }
00591 EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
00592 }
00593
00594
00595
00596
00597
00598
00599
00600
00601 void CmiMachineInit(char **argv)
00602 {
00603 #if FRAGMENTATION
00604 Cmi_dgram_max_data = PACKET_MAX - DGRAM_HEADER_SIZE;
00605 #else
00606 Cmi_dgram_max_data = PACKET_MAX;
00607 #endif
00608 }
00609
/* Machine-layer shutdown hook; this layer has nothing to release. */
void CmiMachineExit()
{
  /* intentionally empty */
}
00613
00614 static void open_tcp_sockets()
00615 {
00616 int i, ok, pe, flag;
00617 int mype, numpes;
00618 SOCKET skt;
00619 int val;
00620
00621 mype = _Cmi_mynode;
00622 numpes = _Cmi_numnodes;
00623 MACHSTATE2(2," open_tcp_sockets (%d:%d)", mype, numpes);
00624 for (i=0; i<mype; i++) {
00625 unsigned int clientPort;
00626 skt_ip_t clientIP;
00627 skt = skt_accept(dataskt, &clientIP,&clientPort);
00628 if (skt<0) KillEveryoneCode(98246554);
00629 #if NO_NAGLE_ALG
00630 flag = 1;
00631 ok = setsockopt(skt, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
00632 #endif
00633 ok = skt_recvN(skt, &pe, sizeof(int));
00634 if (ok<0) KillEveryoneCode(98246556);
00635 nodes[pe].sock = skt;
00636 #if FRAGMENTATION
00637 skt_setSockBuf(skt, PACKET_MAX*4);
00638 #endif
00639 #if 0
00640 #if !defined(_WIN32) || defined(__CYGWIN__)
00641 if ((val = fcntl(skt, F_GETFL, 0)) < 0) KillEveryoneCode(98246557);
00642 if (fcntl(skt, F_SETFL, val|O_NONBLOCK) < 0) KillEveryoneCode(98246558);
00643 #endif
00644 #endif
00645 }
00646 for (pe=mype+1; pe<numpes; pe++) {
00647 skt = skt_connect(nodes[pe].IP, nodes[pe].dataport, 300);
00648 if (skt<0) KillEveryoneCode(894788843);
00649 #if NO_NAGLE_ALG
00650 flag = 1;
00651 ok = setsockopt(skt, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
00652 #endif
00653 ok = skt_sendN(skt, &mype, sizeof(int));
00654 if (ok<0) KillEveryoneCode(98246556);
00655 nodes[pe].sock = skt;
00656 #if FRAGMENTATION
00657 skt_setSockBuf(skt, PACKET_MAX*4);
00658 #endif
00659 #if 0
00660 #if !defined(_WIN32) || defined(__CYGWIN__)
00661 if ((val = fcntl(skt, F_GETFL, 0)) < 0) KillEveryoneCode(98246557);
00662 if (fcntl(skt, F_SETFL, val|O_NONBLOCK) < 0) KillEveryoneCode(98246558);
00663 #endif
00664 #endif
00665 }
00666 }
00667
/* Communication start-up hook: opens the inter-node TCP connections.
   argv is not used. */
void CmiCommunicationInit(char **argv)
{
  (void)argv;
  open_tcp_sockets();
}
00672