00001
00024 #if !defined(_WIN32) || defined(__CYGWIN__)
00025 #include <netinet/tcp.h>
00026 #include <sys/types.h>
00027 #include <sys/socket.h>
00028 #endif
00029
/* When nonzero, skt_tcp_no_nagle() is applied to every data socket. */
#define NO_NAGLE_ALG 1
/* When nonzero, messages are split into packets of at most PACKET_MAX bytes. */
#define FRAGMENTATION 1

/* Upper bound, in bytes, used to size packets and the reusable receive buffer. */
#if !FRAGMENTATION
#define PACKET_MAX 1000000000
#else
#define PACKET_MAX 32767
#endif

/* Forward declarations: both are called by CmiCheckSocks() ahead of their definitions. */
int TransmitDatagram(int pe);
void ReceiveDatagram(int node);
00041
00042
00043
00044
00045
00046
00047 typedef struct {
00048 int sleepMs;
00049 int nIdles;
00050 CmiState cs;
00051 } CmiIdleState;
00052
00053
00054 static CmiIdleState *CmiNotifyGetState(void)
00055 {
00056 CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
00057 s->sleepMs=0;
00058 s->nIdles=0;
00059 s->cs=CmiGetState();
00060 return s;
00061 }
00062
00063 static void CmiNotifyBeginIdle(CmiIdleState *s)
00064 {
00065 s->sleepMs=0;
00066 s->nIdles=0;
00067 }
00068
00069 static void CmiNotifyStillIdle(CmiIdleState *s)
00070 {
00071 #if CMK_SHARED_VARS_UNAVAILABLE
00072 CommunicationServer(10, COMM_SERVER_FROM_SMP);
00073 #else
00074 int nSpins=20;
00075 s->nIdles++;
00076 if (s->nIdles>nSpins) {
00077 s->sleepMs+=2;
00078 if (s->sleepMs>10) s->sleepMs=10;
00079 }
00080
00081 if (s->sleepMs>0) {
00082 MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
00083 CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
00084 CsdResetPeriodic();
00085 MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
00086 }
00087 #endif
00088 }
00089
00090 void CmiNotifyIdle(void) {
00091 CmiIdleState s;
00092 s.sleepMs=5;
00093 CmiNotifyStillIdle(&s);
00094 }
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
/*
 * Per-node readiness flags, indexed by node number.  CheckSocketsReady()
 * fills them in and CmiCheckSocks() consumes them.  Static storage is
 * zero-initialised, so every entry starts out as "not ready".
 * NOTE(review): capacity is fixed at 1000 entries; no bounds check is visible
 * at the points of use.
 */
static char sockReadStates[1000];
static char sockWriteStates[1000];
00118
#if CMK_USE_POLL

/* Declares the pollfd table, its fill counter and the poll timeout. */
#undef CMK_PIPE_DECL
#define CMK_PIPE_DECL(delayMs)  \
        struct pollfd fds[1000]; \
        int nFds_sto=0; int *nFds=&nFds_sto; \
        int pollDelayMs=(delayMs);

/*
 * Registers afd for reading and, when node i has queued output, also for
 * writing.  Relies on a caller-scope loop variable `i`.
 * NOTE(review): indexing fds[(*nFds)-1] assumes CMK_PIPE_ADDREAD has just
 * advanced *nFds past the entry it filled -- confirm against its definition.
 * Wrapped in do/while(0) so the macro behaves as a single statement.
 */
#define CMK_PIPE_ADDREADWRITE(afd) \
        do { \
          CMK_PIPE_ADDREAD(afd); \
          if (nodes[i].send_queue_h) fds[(*nFds)-1].events |= POLLOUT; \
        } while (0)

/*
 * Nonzero when the current pollfd entry reports POLLOUT.  Does not advance
 * *nFds.  Parenthesised so it is safe inside larger expressions.
 */
#undef CMK_PIPE_CHECKWRITE
#define CMK_PIPE_CHECKWRITE(afd) \
        (fds[*nFds].revents&POLLOUT)

/* Registers stdout forwarding, the charmrun socket and every peer socket. */
#define CMK_PIPE_SETUP \
        CmiStdoutAdd(CMK_PIPE_SUB); \
        if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); } \
        if (dataskt!=-1) { \
          for (i=0; i<CmiNumNodes(); i++) \
          { \
            if (i == CmiMyNode()) continue; \
            CMK_PIPE_ADDREADWRITE(nodes[i].sock); \
          } \
        }

#else

/*
 * Registers stdout forwarding, the charmrun socket and every peer socket;
 * a peer socket is also registered for writing when its send queue is
 * non-empty.  The conditional CMK_PIPE_ADDWRITE is braced so that it stays
 * fully guarded even if that macro expands to more than one statement.
 */
#define CMK_PIPE_SETUP \
        CmiStdoutAdd(CMK_PIPE_SUB); \
        if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); } \
        if (dataskt!=-1) { \
          for (i=0; i<CmiNumNodes(); i++) \
          { \
            if (i == CmiMyNode()) continue; \
            CMK_PIPE_ADDREAD(nodes[i].sock); \
            if (nodes[i].send_queue_h) { CMK_PIPE_ADDWRITE(nodes[i].sock); } \
          } \
        }

#endif
00161
00162
00163 static void CmiCheckSocks()
00164 {
00165 int node;
00166 if (dataskt!=-1) {
00167 for (node=0; node<CmiNumNodes(); node++)
00168 {
00169 if (node == CmiMyNode()) continue;
00170 if (sockReadStates[node]) {
00171 MACHSTATE1(2,"go to ReceiveDatagram %d", node)
00172 ReceiveDatagram(node);
00173 }
00174 if (sockWriteStates[node]) {
00175 MACHSTATE1(2,"go to TransmitDatagram %d", node)
00176 TransmitDatagram(node);
00177 }
00178 }
00179 }
00180 }
00181
00182
00183
00184
00185 int CheckSocketsReady(int withDelayMs, int output)
00186 {
00187 int nreadable,i;
00188 CMK_PIPE_DECL(withDelayMs);
00189
00190 CMK_PIPE_SETUP;
00191 nreadable=CMK_PIPE_CALL();
00192
00193 ctrlskt_ready_read = 0;
00194 dataskt_ready_read = 0;
00195 dataskt_ready_write = 0;
00196
00197 if (nreadable == 0) {
00198 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00199 return nreadable;
00200 }
00201 if (nreadable==-1) {
00202 #if defined(_WIN32) && !defined(__CYGWIN__)
00203
00204
00205
00206
00207
00208 #else
00209 if (errno!=EINTR)
00210 KillEveryone("Socket error in CheckSocketsReady!\n");
00211 #endif
00212 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00213 return CheckSocketsReady(0, output);
00214 }
00215
00216 if (output) {
00217
00218 CmiStdoutCheck(CMK_PIPE_SUB);
00219 if (Cmi_charmrun_fd!=-1)
00220 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00221 if (dataskt!=-1) {
00222 for (i=0; i<CmiNumNodes(); i++)
00223 {
00224 if (i == CmiMyNode()) continue;
00225 if (nodes[i].send_queue_h) {
00226 sockWriteStates[i] = CMK_PIPE_CHECKWRITE(nodes[i].sock);
00227 if (sockWriteStates[i]) dataskt_ready_write = 1;
00228
00229 }
00230 else
00231 sockWriteStates[i] = 0;
00232 sockReadStates[i] = CMK_PIPE_CHECKREAD(nodes[i].sock);
00233 if (sockReadStates[i]) dataskt_ready_read = 1;
00234 }
00235 }
00236 }
00237 MACHSTATE(1,"} CheckSocketsReady")
00238 return nreadable;
00239 }
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251 static void CommunicationServer(int sleepTime, int where)
00252 {
00253 unsigned int nTimes=0;
00254 CmiCommLockOrElse({
00255 MACHSTATE(4,"Attempted to re-enter comm. server!")
00256 return;
00257 });
00258 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00259 MACHSTATE2(sleepTime?3:2,"CommunicationsServer(%d,%d) {",
00260 sleepTime,writeableAcks||writeableDgrams)
00261 #if !CMK_SHARED_VARS_UNAVAILABLE /*SMP mode: comm. lock is precious*/
00262 if (sleepTime!=0) {/*Sleep *without* holding the comm. lock*/
00263 MACHSTATE(2,"CommServer going to sleep (NO LOCK)");
00264 if (CheckSocketsReady(sleepTime, 0)<=0) {
00265 MACHSTATE(2,"CommServer finished without anything happening.");
00266 }
00267 }
00268 sleepTime=0;
00269 #endif
00270 CmiCommLock();
00271 /* in netpoll mode, only perform service to stdout */
00272 if (Cmi_netpoll && where == COMM_SERVER_FROM_INTERRUPT) {
00273 if (CmiStdoutNeedsService()) {CmiStdoutService();}
00274 CmiCommUnlock();
00275 return;
00276 }
00277 CommunicationsClock();
00278 /*Don't sleep if a signal has stored messages for us*/
00279 if (sleepTime&&CmiGetState()->idle.hasMessages) sleepTime=0;
00280 while (CheckSocketsReady(sleepTime, 1)>0) {
00281 int again=0;
00282 sleepTime=0;
00283 CmiCheckSocks();
00284 if (ctrlskt_ready_read) {again=1;ctrl_getone();}
00285 if (dataskt_ready_read || dataskt_ready_write) {again=1;}
00286 if (CmiStdoutNeedsService()) {CmiStdoutService();}
00287 if (!again) break; /* Nothing more to do */
00288 if ((nTimes++ &16)==15) {
00289 /*We just grabbed a whole pile of packets-- try to retire a few*/
00290 CommunicationsClock();
00291 break;
00292 }
00293 }
00294 CmiCommUnlock();
00295
00296 /* when called by communication thread or in interrupt */
00297 if (where == COMM_SERVER_FROM_SMP || where == COMM_SERVER_FROM_INTERRUPT)
00298 {
00299 #if CMK_IMMEDIATE_MSG
00300 CmiHandleImmediate();
00301 #endif
00302 }
00303
00304 MACHSTATE(2,"} CommunicationServer")
00305 }
00306
00307
#if FRAGMENTATION
/*
 * A single spare buffer of PACKET_MAX bytes is cached to avoid repeated
 * allocation of full-size receive buffers:
 *   1. A short message (smaller than PACKET_MAX) gets a buffer of exactly its
 *      own size, which is passed up directly.
 *   2. For a long message, the first packet is received into a PACKET_MAX
 *      buffer that can be reused afterwards, while the message itself is
 *      assembled in a separate buffer of the full message size.
 */
static char * maxbuf = NULL;

/* Hand out the cached PACKET_MAX buffer if there is one, else allocate a new one. */
static char * getMaxBuf() {
  char *cached = maxbuf;
  if (cached != NULL) {
    maxbuf = NULL;
    return cached;
  }
  return (char *)CmiAlloc(PACKET_MAX);
}

/* Return a PACKET_MAX buffer: keep it as the spare if the slot is empty, else free it. */
static void freeMaxBuf(char *buf) {
  if (maxbuf == NULL) {
    maxbuf = buf;
    return;
  }
  CmiFree(buf);
}

#endif
00337
00338 static void IntegrateMessageDatagram(char **msg, int len)
00339 {
00340 char *newmsg;
00341 int rank, srcpe, seqno, magic, broot, i;
00342 int size;
00343
00344 if (len >= DGRAM_HEADER_SIZE) {
00345 DgramHeaderBreak(*msg, rank, srcpe, magic, seqno, broot);
00346 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK)) {
00347 OtherNode node = nodes_by_pe[srcpe];
00348 newmsg = node->asm_msg;
00349 if (newmsg == NULL) {
00350 size = CmiMsgHeaderGetLength(*msg);
00351 if (size < len) KillEveryoneCode(4559312);
00352 #if FRAGMENTATION
00353 if (size == len) { /* whole message in one packet */
00354 newmsg = *msg; /* directly use the buffer */
00355 }
00356 else {
00357 newmsg = (char *)CmiAlloc(size);
00358 if (!newmsg)
00359 fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
00360 memcpy(newmsg, *msg, len);
00361 if (len == PACKET_MAX)
00362 freeMaxBuf(*msg); /* free buffer, must be max size */
00363 else
00364 CmiFree(*msg);
00365 }
00366 #else
00367 newmsg = *msg;
00368 #endif
00369 node->asm_rank = rank;
00370 node->asm_total = size;
00371 node->asm_fill = len;
00372 node->asm_msg = newmsg;
00373 } else {
00374 #if ! FRAGMENTATION
00375 CmiAssert(0);
00376 #else
00377 size = len - DGRAM_HEADER_SIZE;
00378 memcpy(newmsg + node->asm_fill, (*msg)+DGRAM_HEADER_SIZE, size);
00379 node->asm_fill += size;
00380 if (len == PACKET_MAX)
00381 freeMaxBuf(*msg); /* free buffer, must be max size */
00382 else
00383 CmiFree(*msg);
00384 #endif
00385 }
00386 if (node->asm_fill > node->asm_total)
00387 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00388 if (node->asm_fill == node->asm_total) {
00389 /* do it at integration - the following function may re-entrant */
00390 #if CMK_BROADCAST_SPANNING_TREE
00391 if (rank == DGRAM_BROADCAST
00392 #if CMK_NODE_QUEUE_AVAILABLE
00393 || rank == DGRAM_NODEBROADCAST
00394 #endif
00395 )
00396 SendSpanningChildren(NULL, 0, node->asm_total, newmsg, broot, rank);
00397 #elif CMK_BROADCAST_HYPERCUBE
00398 if (rank == DGRAM_BROADCAST
00399 #if CMK_NODE_QUEUE_AVAILABLE
00400 || rank == DGRAM_NODEBROADCAST
00401 #endif
00402 )
00403 SendHypercube(NULL, 0, node->asm_total, newmsg, broot, rank);
00404 #endif
00405 if (rank == DGRAM_BROADCAST) {
00406 for (i=1; i<_Cmi_mynodesize; i++)
00407 CmiPushPE(i, CopyMsg(newmsg, node->asm_total));
00408 CmiPushPE(0, newmsg);
00409 } else {
00410 #if CMK_NODE_QUEUE_AVAILABLE
00411 if (rank==DGRAM_NODEMESSAGE || rank==DGRAM_NODEBROADCAST) {
00412 CmiPushNode(newmsg);
00413 }
00414 else
00415 #endif
00416 CmiPushPE(rank, newmsg);
00417 }
00418 node->asm_msg = 0;
00419 }
00420 }
00421 else {
00422 CmiPrintf("message ignored1: magic not agree:%d != %d!\n", magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00423 CmiPrintf("recv: rank:%d src:%d mag:%d\n", rank, srcpe, magic);
00424 }
00425 }
00426 else CmiPrintf("message ignored2!\n");
00427 }
00428
00429
00430 void ReceiveDatagram(int node)
00431 {
00432 static char *buf = NULL;
00433 int size;
00434 DgramHeader *head, temp;
00435 int newmsg = 0;
00436
00437 OtherNode nodeptr = &nodes[node];
00438
00439 SOCKET fd = nodeptr->sock;
00440 if (-1 == skt_recvN(fd, &size, sizeof(int)))
00441 KillEveryoneCode(4559318);
00442
00443 #if FRAGMENTATION
00444 if (size == PACKET_MAX)
00445 buf = getMaxBuf();
00446 else
00447 buf = (char *)CmiAlloc(size);
00448 #if 0
00449 /* buggy code */
00450 CmiAssert(size<=PACKET_MAX);
00451 if (nodeptr->asm_msg == NULL) {
00452 if (size == PACKET_MAX)
00453 buf = getMaxBuf();
00454 else
00455 buf = (char *)CmiAlloc(size);
00456 }
00457 else {
00458 /* this is not the first packet of a message */
00459 CmiAssert(nodeptr->asm_fill+size-DGRAM_HEADER_SIZE <= nodeptr->asm_total);
00460 /* find the dgram header start and save the header to temp */
00461 buf = (char*)nodeptr->asm_msg + nodeptr->asm_fill - DGRAM_HEADER_SIZE;
00462 head = (DgramHeader *)buf;
00463 temp = *head;
00464 newmsg = 1;
00465 }
00466 #endif
00467 #else
00468 buf = (char *)CmiAlloc(size);
00469 #endif
00470
00471 if (-1==skt_recvN(fd, buf, size))
00472 KillEveryoneCode(4559319);
00473
00474 IntegrateMessageDatagram(&buf, size);
00475
00476 #if FRAGMENTATION
00477 /* restore header */
00478 if (newmsg) *head = temp;
00479 #endif
00480
00481 }
00482
00483
/***********************************************************************
 * DeliverViaNetwork()
 *
 * This function is responsible for all non-local transmission. It
 * first allocates a send token; if that fails, it puts the message on
 * the pending message queue, otherwise it invokes the GM send.
 ***********************************************************************/
00491
00492 int TransmitImplicitDgram(ImplicitDgram dg)
00493 {
00494 ChMessageHeader msg;
00495 char *data; DgramHeader *head; int len; DgramHeader temp;
00496 OtherNode dest;
00497 int retval;
00498
00499 MACHSTATE2(2," TransmitImplicitDgram (%d bytes) [%d]",dg->datalen,dg->seqno)
00500 len = dg->datalen+DGRAM_HEADER_SIZE;
00501 data = dg->dataptr;
00502 head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
00503 temp = *head;
00504 dest = dg->dest;
00505 /* first int is len of the packet */
00506 DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_charmrun_pid, len, dg->broot);
00507 LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
00508 /*
00509 ChMessageHeader_new("data", len, &msg);
00510 if (-1==skt_sendN(dest->sock,(const char *)&msg,sizeof(msg)))
00511 CmiAbort("EnqueueOutgoingDgram");
00512 if (-1==skt_sendN(dest->sock,head,len))
00513 CmiAbort("EnqueueOutgoingDgram");
00514 */
00515 if (-1==skt_sendN(dest->sock,(const char *)&len,sizeof(len)))
00516 CmiAbort("EnqueueOutgoingDgram");
00517 if (-1==skt_sendN(dest->sock,(const char *)head,len))
00518 CmiAbort("EnqueueOutgoingDgram");
00519
00520 *head = temp;
00521 dest->stat_send_pkt++;
00522 return 1;
00523 }
00524
00525 int TransmitDatagram(int pe)
00526 {
00527 ImplicitDgram dg; OtherNode node;
00528 int count;
00529 unsigned int seqno;
00530
00531 node = nodes+pe;
00532 dg = node->send_queue_h;
00533 if (dg) {
00534 if (TransmitImplicitDgram(dg)) {
00535 node->send_queue_h = dg->next;
00536 if (node->send_queue_h == NULL) node->send_queue_t = NULL;
00537 DiscardImplicitDgram(dg);
00538 }
00539 }
00540 return 0;
00541 }
00542
00543 void EnqueueOutgoingDgram
00544 (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank, int broot)
00545 {
00546 int seqno, dst, src; ImplicitDgram dg;
00547 src = ogm->src;
00548 dst = ogm->dst;
00549 seqno = node->send_next;
00550 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00551 MallocImplicitDgram(dg);
00552 dg->dest = node;
00553 dg->srcpe = src;
00554 dg->rank = rank;
00555 dg->seqno = seqno;
00556 dg->broot = broot;
00557 dg->dataptr = ptr;
00558 dg->datalen = len;
00559 dg->ogm = ogm;
00560 ogm->refcount++;
00561 dg->next = 0;
00562 if (node->send_queue_h == 0) {
00563 node->send_queue_h = dg;
00564 node->send_queue_t = dg;
00565 } else {
00566 node->send_queue_t->next = dg;
00567 node->send_queue_t = dg;
00568 }
00569 }
00570
00571 /* ignore copy, because it is safe to reuse the msg buffer after send */
00572 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00573 {
00574 int size; char *data;
00575
00576 /*CmiPrintf("DeliverViaNetwork to %d\n", node->nodestart);*/
00577 /*CmiPrintf("send time: %fus\n", (CmiWallTimer()-t)*1.0e6); */
00578
00579 size = ogm->size - DGRAM_HEADER_SIZE;
00580 data = ogm->data + DGRAM_HEADER_SIZE;
00581 while (size > Cmi_dgram_max_data) {
00582 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00583 data += Cmi_dgram_max_data;
00584 size -= Cmi_dgram_max_data;
00585 }
00586 EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
00587 }
00588
/***********************************************************************
 * CmiMachineInit()
 *
 * This function initializes the GM board and sets the receive buffer.
 *
 ***********************************************************************/
00595
00596 void CmiMachineInit(char **argv)
00597 {
00598 #if FRAGMENTATION
00599 Cmi_dgram_max_data = PACKET_MAX - DGRAM_HEADER_SIZE;
00600 #else
00601 Cmi_dgram_max_data = PACKET_MAX;
00602 #endif
00603 }
00604
/* Machine-layer shutdown hook; this layer has nothing to do here. */
void CmiMachineExit()
{
  /* intentionally empty */
}
00608
00609 static void open_tcp_sockets()
00610 {
00611 int i, ok, pe, flag;
00612 int mype, numpes;
00613 SOCKET skt;
00614 int val;
00615
00616 mype = _Cmi_mynode;
00617 numpes = _Cmi_numnodes;
00618 MACHSTATE2(2," open_tcp_sockets (%d:%d)", mype, numpes);
00619 for (i=0; i<mype; i++) {
00620 unsigned int clientPort;
00621 skt_ip_t clientIP;
00622 skt = skt_accept(dataskt, &clientIP,&clientPort);
00623 if (skt<0) KillEveryoneCode(98246554);
00624 #if NO_NAGLE_ALG
00625 skt_tcp_no_nagle(skt);
00626 #endif
00627 ok = skt_recvN(skt, &pe, sizeof(int));
00628 if (ok<0) KillEveryoneCode(98246556);
00629 nodes[pe].sock = skt;
00630 #if FRAGMENTATION
00631 skt_setSockBuf(skt, PACKET_MAX*4);
00632 #endif
00633 #if 0
00634 #if !defined(_WIN32) || defined(__CYGWIN__)
00635 if ((val = fcntl(skt, F_GETFL, 0)) < 0) KillEveryoneCode(98246557);
00636 if (fcntl(skt, F_SETFL, val|O_NONBLOCK) < 0) KillEveryoneCode(98246558);
00637 #endif
00638 #endif
00639 }
00640 for (pe=mype+1; pe<numpes; pe++) {
00641 skt = skt_connect(nodes[pe].IP, nodes[pe].dataport, 300);
00642 if (skt<0) KillEveryoneCode(894788843);
00643 #if NO_NAGLE_ALG
00644 skt_tcp_no_nagle(skt);
00645 #endif
00646 ok = skt_sendN(skt, &mype, sizeof(int));
00647 if (ok<0) KillEveryoneCode(98246556);
00648 nodes[pe].sock = skt;
00649 #if FRAGMENTATION
00650 skt_setSockBuf(skt, PACKET_MAX*4);
00651 #endif
00652 #if 0
00653 #if !defined(_WIN32) || defined(__CYGWIN__)
00654 if ((val = fcntl(skt, F_GETFL, 0)) < 0) KillEveryoneCode(98246557);
00655 if (fcntl(skt, F_SETFL, val|O_NONBLOCK) < 0) KillEveryoneCode(98246558);
00656 #endif
00657 #endif
00658 }
00659 }
00660
/* Communication start-up hook: opens the TCP connections to all other nodes. `argv` is not used. */
void CmiCommunicationInit(char **argv)
{
  open_tcp_sockets();
}
00665