arch/net/machine-tcp.c

Go to the documentation of this file.
00001 
00024 #if !defined(_WIN32) || defined(__CYGWIN__)
00025 #include <netinet/tcp.h>
00026 #include <sys/types.h>
00027 #include <sys/socket.h>
00028 #endif
00029 
00030 #define NO_NAGLE_ALG            1
00031 #define FRAGMENTATION           1
00032 
00033 #if FRAGMENTATION
00034 #define PACKET_MAX              32767
00035 #else
00036 #define PACKET_MAX              1000000000
00037 #endif
00038 
00039 void ReceiveDatagram(int node);
00040 int TransmitDatagram(int pe);
00041 
00042 /******************************************************************************
00043  *
00044  * CmiNotifyIdle()-- wait until a packet comes in
00045  *
00046  *****************************************************************************/
00047 typedef struct {
00048   int sleepMs; /*Milliseconds to sleep while idle*/
00049   int nIdles; /*Number of times we've been idle in a row*/
00050   CmiState cs; /*Machine state*/
00051 } CmiIdleState;
00052 
00053 
00054 static CmiIdleState *CmiNotifyGetState(void) 
00055 {
00056   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
00057   s->sleepMs=0;
00058   s->nIdles=0;
00059   s->cs=CmiGetState();
00060   return s;
00061 }
00062 
00063 static void CmiNotifyBeginIdle(CmiIdleState *s)
00064 {
00065   s->sleepMs=0;
00066   s->nIdles=0;
00067 }
00068 
00069 static void CmiNotifyStillIdle(CmiIdleState *s)
00070 {
00071 #if CMK_SHARED_VARS_UNAVAILABLE
00072   CommunicationServer(10, 0);
00073 #else
00074   int nSpins=20; /*Number of times to spin before sleeping*/
00075   s->nIdles++;
00076   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
00077     s->sleepMs+=2;
00078     if (s->sleepMs>10) s->sleepMs=10;
00079   }
00080   /*Comm. thread will listen on sockets-- just sleep*/
00081   if (s->sleepMs>0) {
00082     MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
00083     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
00084     CsdResetPeriodic();         /* check ccd callbacks when I am awakened */
00085     MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
00086   }
00087 #endif
00088 }
00089 
00090 void CmiNotifyIdle(void) {
00091   CmiIdleState s;
00092   s.sleepMs=5; 
00093   CmiNotifyStillIdle(&s);
00094 }
00095 
00096 /****************************************************************************
00097  *                                                                          
00098  * CheckSocketsReady
00099  *
00100  * Checks both sockets to see which are readable and which are writeable.
00101  * We check all these things at the same time since this can be done for
00102  * free with ``select.'' The result is stored in global variables, since
00103  * this is essentially global state information and several routines need it.
00104  *
00105  ***************************************************************************/
00106 
00107 /*
00108   FIXME !
00109   current tcp version only allow the program to run on <= 1000 nodes
00110   due to the static fixed size array below.
00111   This can be easily fixed, however, I suspect tcp version won't scale well
00112   on large number of processors due to the checking of sockets, 
00113   so I don't bother.
00114 */
00115 
00116 static char sockReadStates[1000] = {0};
00117 static char sockWriteStates[1000] = {0};
00118 
00119 #if CMK_USE_POLL
00120 
00121 #undef CMK_PIPE_DECL
00122 #define CMK_PIPE_DECL(delayMs)  \
00123         struct pollfd  fds[1000];       \
00124         int nFds_sto=0; int *nFds=&nFds_sto; \
00125         int pollDelayMs=delayMs;
00126 
00127 #define CMK_PIPE_ADDREADWRITE(afd)      \
00128       CMK_PIPE_ADDREAD(afd);    \
00129       if (nodes[i].send_queue_h) fds[(*nFds)-1].events |= POLLOUT;
00130         
00131 #undef CMK_PIPE_CHECKWRITE
00132 #define CMK_PIPE_CHECKWRITE(afd)        \
00133         fds[*nFds].revents&POLLOUT
00134 
00135 #define CMK_PIPE_SETUP  \
00136         CmiStdoutAdd(CMK_PIPE_SUB);     \
00137         if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); } \
00138         if (dataskt!=-1) {      \
00139           for (i=0; i<CmiNumNodes(); i++)       \
00140           {     \
00141             if (i == CmiMyNode()) continue;     \
00142             CMK_PIPE_ADDREADWRITE(nodes[i].sock);       \
00143           }     \
00144         }       
00145 
00146 #else
00147 
00148 #define CMK_PIPE_SETUP  \
00149         CmiStdoutAdd(CMK_PIPE_SUB);     \
00150         if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); } \
00151         if (dataskt!=-1) {      \
00152           for (i=0; i<CmiNumNodes(); i++)       \
00153           {     \
00154             if (i == CmiMyNode()) continue;     \
00155             CMK_PIPE_ADDREAD(nodes[i].sock);    \
00156             if (nodes[i].send_queue_h) CMK_PIPE_ADDWRITE(nodes[i].sock);\
00157           }     \
00158         }               
00159 
00160 #endif
00161 
00162 /* check data sockets and invoking functions */
00163 static void CmiCheckSocks()
00164 {
00165   int node;
00166   if (dataskt!=-1) {
00167     for (node=0; node<CmiNumNodes(); node++)
00168     {
00169       if (node == CmiMyNode()) continue;
00170       if (sockReadStates[node]) {
00171         MACHSTATE1(2,"go to ReceiveDatagram %d", node)
00172         ReceiveDatagram(node);
00173       }
00174       if (sockWriteStates[node]) {
00175         MACHSTATE1(2,"go to TransmitDatagram %d", node)
00176         TransmitDatagram(node);
00177       }
00178     }
00179   }
00180 }
00181 
00182 /*
00183   when output = 1, this function is not thread safe
00184 */
00185 int CheckSocketsReady(int withDelayMs, int output)
00186 {   
00187   int nreadable,i;
00188   CMK_PIPE_DECL(withDelayMs);
00189 
00190   CMK_PIPE_SETUP;
00191   nreadable=CMK_PIPE_CALL();
00192 
00193   ctrlskt_ready_read = 0;
00194   dataskt_ready_read = 0;
00195   dataskt_ready_write = 0;
00196 
00197   if (nreadable == 0) {
00198     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00199     return nreadable;
00200   }
00201   if (nreadable==-1) {
00202 #if defined(_WIN32) && !defined(__CYGWIN__)
00203 /* Win32 socket seems to randomly return inexplicable errors
00204 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
00205         int err=WSAGetLastError();
00206         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errno,err);
00207 */
00208 #else
00209         if (errno!=EINTR)
00210                 KillEveryone("Socket error in CheckSocketsReady!\n");
00211 #endif
00212     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00213     return CheckSocketsReady(0, output);
00214   }
00215 
00216   if (output) {
00217 
00218     CmiStdoutCheck(CMK_PIPE_SUB);
00219     if (Cmi_charmrun_fd!=-1)
00220         ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00221     if (dataskt!=-1) {
00222       for (i=0; i<CmiNumNodes(); i++)
00223       {
00224         if (i == CmiMyNode()) continue;
00225         if (nodes[i].send_queue_h) {
00226           sockWriteStates[i] = CMK_PIPE_CHECKWRITE(nodes[i].sock);
00227           if (sockWriteStates[i]) dataskt_ready_write = 1;
00228           /* sockWriteStates[i] = dataskt_ready_write = 1; */
00229         }
00230         else
00231           sockWriteStates[i] = 0;
00232         sockReadStates[i] = CMK_PIPE_CHECKREAD(nodes[i].sock);
00233         if (sockReadStates[i])  dataskt_ready_read = 1;
00234       }
00235     }
00236   }
00237   MACHSTATE(1,"} CheckSocketsReady")
00238   return nreadable;
00239 }
00240 
00241 /***********************************************************************
00242  * CommunicationServer()
00243  * 
00244  * This function does the scheduling of the tasks related to the
00245  * message sends and receives. 
00246  * It first check the charmrun port for message, and poll the gm event
00247  * for send complete and outcoming messages.
00248  *
00249  ***********************************************************************/
00250 
00251 /*
00252 0: from smp thread
00253 1: from interrupt
00254 2: from worker thread
00255 */
00256 static void CommunicationServer(int sleepTime, int where)
00257 {
00258   unsigned int nTimes=0; /* Loop counter */
00259   CmiCommLockOrElse({
00260     MACHSTATE(4,"Attempted to re-enter comm. server!") 
00261     return;
00262   });
00263   LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00264   MACHSTATE2(sleepTime?3:2,"CommunicationsServer(%d,%d) {",
00265              sleepTime,writeableAcks||writeableDgrams)  
00266 #if !CMK_SHARED_VARS_UNAVAILABLE /*SMP mode: comm. lock is precious*/
00267   if (sleepTime!=0) {/*Sleep *without* holding the comm. lock*/
00268     MACHSTATE(2,"CommServer going to sleep (NO LOCK)");
00269     if (CheckSocketsReady(sleepTime, 0)<=0) {
00270       MACHSTATE(2,"CommServer finished without anything happening.");
00271     }
00272   }
00273   sleepTime=0;
00274 #endif
00275   CmiCommLock();
00276   /* in netpoll mode, only perform service to stdout */
00277   if (Cmi_netpoll && where == 1) {
00278     if (CmiStdoutNeedsService()) {CmiStdoutService();}
00279     CmiCommUnlock();
00280     return;
00281   }
00282   CommunicationsClock();
00283   /*Don't sleep if a signal has stored messages for us*/
00284   if (sleepTime&&CmiGetState()->idle.hasMessages) sleepTime=0;
00285   while (CheckSocketsReady(sleepTime, 1)>0) {
00286     int again=0;
00287     sleepTime=0;
00288     CmiCheckSocks();
00289     if (ctrlskt_ready_read) {again=1;ctrl_getone();}
00290     if (dataskt_ready_read || dataskt_ready_write) {again=1;}
00291     if (CmiStdoutNeedsService()) {CmiStdoutService();}
00292     if (!again) break; /* Nothing more to do */
00293     if ((nTimes++ &16)==15) {
00294       /*We just grabbed a whole pile of packets-- try to retire a few*/
00295       CommunicationsClock();
00296       break;
00297     }
00298   }
00299   CmiCommUnlock();
00300 
00301   /* when called by communication thread or in interrupt */
00302   if (where == 0 || where == 1)
00303   {
00304 #if CMK_IMMEDIATE_MSG
00305   CmiHandleImmediate();
00306 #endif
00307   }
00308 
00309   MACHSTATE(2,"} CommunicationServer") 
00310 }
00311 
00312 
00313 #if FRAGMENTATION
00314 /* keep one buffer of PACKET_MAX size to ensure copy free operation 
00315    1. for short message that is less than PACKET_MAX, 
00316       buffer of that size is allocated and directly pass up
00317    2. for long messages,
00318       for first packet, buffer of PACKET_MAX is allocated and can be reused
00319       as recv buffer, asm_msg of actual message size.
00320       for afterwards packets, recv buffer will not allocated and the real 
00321       message is used as recv buffer
00322 */
00323 static char * maxbuf = NULL;
00324 
00325 static char * getMaxBuf() {
00326   char *buf;
00327   if (maxbuf == NULL)
00328     buf = (char *)CmiAlloc(PACKET_MAX);
00329   else {
00330     buf = maxbuf; 
00331     maxbuf = NULL;
00332   }
00333   return buf;
00334 }
00335 
00336 static void freeMaxBuf(char *buf) {
00337   if (maxbuf) CmiFree(buf);
00338   else maxbuf = buf;
00339 }
00340 
00341 #endif
00342 
00343 static void IntegrateMessageDatagram(char **msg, int len)
00344 {
00345   char *newmsg;
00346   int rank, srcpe, seqno, magic, broot, i;
00347   int size;
00348   
00349   if (len >= DGRAM_HEADER_SIZE) {
00350     DgramHeaderBreak(*msg, rank, srcpe, magic, seqno, broot);
00351     if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK)) {
00352       OtherNode node = nodes_by_pe[srcpe];
00353       newmsg = node->asm_msg;
00354       if (newmsg == NULL) {
00355         size = CmiMsgHeaderGetLength(*msg);
00356         if (size < len) KillEveryoneCode(4559312);
00357 #if FRAGMENTATION
00358         if (size == len) {              /* whole message in one packet */
00359           newmsg = *msg;                /* directly use the buffer */
00360         }
00361         else {
00362           newmsg = (char *)CmiAlloc(size);
00363           if (!newmsg)
00364             fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
00365           memcpy(newmsg, *msg, len);
00366           if (len == PACKET_MAX) 
00367               freeMaxBuf(*msg);         /* free buffer, must be max size */
00368           else 
00369               CmiFree(*msg);
00370         }
00371 #else
00372         newmsg = *msg;
00373 #endif
00374         node->asm_rank = rank;
00375         node->asm_total = size;
00376         node->asm_fill = len;
00377         node->asm_msg = newmsg;
00378       } else {
00379 #if ! FRAGMENTATION
00380         CmiAssert(0);
00381 #else
00382         size = len - DGRAM_HEADER_SIZE;
00383         memcpy(newmsg + node->asm_fill, (*msg)+DGRAM_HEADER_SIZE, size);
00384         node->asm_fill += size;
00385         if (len == PACKET_MAX) 
00386               freeMaxBuf(*msg);         /* free buffer, must be max size */
00387         else 
00388               CmiFree(*msg);
00389 #endif
00390       }
00391       if (node->asm_fill > node->asm_total)
00392          CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00393       if (node->asm_fill == node->asm_total) {
00394         /* do it at integration - the following function may re-entrant */
00395 #if CMK_BROADCAST_SPANNING_TREE
00396         if (rank == DGRAM_BROADCAST
00397 #if CMK_NODE_QUEUE_AVAILABLE
00398           || rank == DGRAM_NODEBROADCAST
00399 #endif
00400            )
00401           SendSpanningChildren(NULL, 0, node->asm_total, newmsg, broot, rank);
00402 #elif CMK_BROADCAST_HYPERCUBE
00403         if (rank == DGRAM_BROADCAST
00404 #if CMK_NODE_QUEUE_AVAILABLE
00405           || rank == DGRAM_NODEBROADCAST
00406 #endif
00407            )
00408           SendHypercube(NULL, 0, node->asm_total, newmsg, broot, rank);
00409 #endif
00410         if (rank == DGRAM_BROADCAST) {
00411           for (i=1; i<_Cmi_mynodesize; i++)
00412             CmiPushPE(i, CopyMsg(newmsg, node->asm_total));
00413           CmiPushPE(0, newmsg);
00414         } else {
00415 #if CMK_NODE_QUEUE_AVAILABLE
00416            if (rank==DGRAM_NODEMESSAGE || rank==DGRAM_NODEBROADCAST) {
00417              CmiPushNode(newmsg);
00418            }
00419            else
00420 #endif
00421              CmiPushPE(rank, newmsg);
00422         }
00423         node->asm_msg = 0;
00424       }
00425     } 
00426     else {
00427       CmiPrintf("message ignored1: magic not agree:%d != %d!\n", magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00428       CmiPrintf("recv: rank:%d src:%d mag:%d\n", rank, srcpe, magic);
00429     }
00430   } 
00431   else CmiPrintf("message ignored2!\n");
00432 }
00433 
00434 
00435 void ReceiveDatagram(int node)
00436 {
00437   static char *buf = NULL;
00438   int size;
00439   DgramHeader *head, temp;
00440   int newmsg = 0;
00441 
00442   OtherNode nodeptr = &nodes[node];
00443 
00444   SOCKET fd = nodeptr->sock;
00445   if (-1 == skt_recvN(fd, &size, sizeof(int)))
00446     KillEveryoneCode(4559318);
00447 
00448 #if FRAGMENTATION
00449   if (size == PACKET_MAX)
00450       buf = getMaxBuf();
00451   else
00452       buf = (char *)CmiAlloc(size);
00453 #if 0
00454    /* buggy code */
00455   CmiAssert(size<=PACKET_MAX);
00456   if (nodeptr->asm_msg == NULL) {
00457     if (size == PACKET_MAX)
00458       buf = getMaxBuf();
00459     else
00460       buf = (char *)CmiAlloc(size);
00461   }
00462   else {
00463       /* this is not the first packet of a message */
00464     CmiAssert(nodeptr->asm_fill+size-DGRAM_HEADER_SIZE <= nodeptr->asm_total);
00465       /* find the dgram header start and save the header to temp */
00466     buf = (char*)nodeptr->asm_msg + nodeptr->asm_fill - DGRAM_HEADER_SIZE;
00467     head = (DgramHeader *)buf;
00468     temp = *head;
00469     newmsg = 1;
00470   }
00471 #endif
00472 #else
00473   buf = (char *)CmiAlloc(size);
00474 #endif
00475 
00476   if (-1==skt_recvN(fd, buf, size))
00477     KillEveryoneCode(4559319);
00478 
00479   IntegrateMessageDatagram(&buf, size);
00480 
00481 #if FRAGMENTATION
00482     /* restore header */
00483   if (newmsg) *head = temp;
00484 #endif
00485 
00486 }
00487 
00488 
00489 /***********************************************************************
00490  * DeliverViaNetwork()
00491  *
00492  * This function is responsible for all non-local transmission. It
00493  * first allocate a send token, if fails, put the send message to
00494  * penging message queue, otherwise invoke the GM send.
00495  ***********************************************************************/
00496 
00497 int TransmitImplicitDgram(ImplicitDgram dg)
00498 {
00499   ChMessageHeader msg;
00500   char *data; DgramHeader *head; int len; DgramHeader temp;
00501   OtherNode dest;
00502   int retval;
00503   
00504   MACHSTATE2(2,"  TransmitImplicitDgram (%d bytes) [%d]",dg->datalen,dg->seqno)
00505   len = dg->datalen+DGRAM_HEADER_SIZE;
00506   data = dg->dataptr;
00507   head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
00508   temp = *head;
00509   dest = dg->dest;
00510   /* first int is len of the packet */
00511   DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_charmrun_pid, len, dg->broot);
00512   LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
00513   /*
00514   ChMessageHeader_new("data", len, &msg);
00515   if (-1==skt_sendN(dest->sock,(const char *)&msg,sizeof(msg))) 
00516     CmiAbort("EnqueueOutgoingDgram"); 
00517   if (-1==skt_sendN(dest->sock,head,len))
00518     CmiAbort("EnqueueOutgoingDgram"); 
00519   */
00520   if (-1==skt_sendN(dest->sock,(const char *)&len,sizeof(len))) 
00521     CmiAbort("EnqueueOutgoingDgram"); 
00522   if (-1==skt_sendN(dest->sock,(const char *)head,len)) 
00523     CmiAbort("EnqueueOutgoingDgram"); 
00524     
00525   *head = temp;
00526   dest->stat_send_pkt++;
00527   return 1;
00528 }
00529 
00530 int TransmitDatagram(int pe)
00531 {
00532   ImplicitDgram dg; OtherNode node;
00533   int count;
00534   unsigned int seqno;
00535   
00536   node = nodes+pe;
00537   dg = node->send_queue_h;
00538   if (dg) {
00539     if (TransmitImplicitDgram(dg)) {
00540       node->send_queue_h = dg->next;
00541       if (node->send_queue_h == NULL) node->send_queue_t = NULL;
00542       DiscardImplicitDgram(dg);
00543     }
00544   }
00545   return 0;
00546 }
00547 
00548 void EnqueueOutgoingDgram
00549         (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank, int broot)
00550 {
00551   int seqno, dst, src; ImplicitDgram dg;
00552   src = ogm->src;
00553   dst = ogm->dst;
00554   seqno = node->send_next;
00555   node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00556   MallocImplicitDgram(dg);
00557   dg->dest = node;
00558   dg->srcpe = src;
00559   dg->rank = rank;
00560   dg->seqno = seqno;
00561   dg->broot = broot;
00562   dg->dataptr = ptr;
00563   dg->datalen = len;
00564   dg->ogm = ogm;
00565   ogm->refcount++;
00566   dg->next = 0;
00567   if (node->send_queue_h == 0) {
00568     node->send_queue_h = dg;
00569     node->send_queue_t = dg;
00570   } else {
00571     node->send_queue_t->next = dg;
00572     node->send_queue_t = dg;
00573   }
00574 }
00575 
00576 /* ignore copy, because it is safe to reuse the msg buffer after send */
00577 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00578 {
00579   int size; char *data;
00580 
00581 /*CmiPrintf("DeliverViaNetwork to %d\n", node->nodestart);*/
00582 /*CmiPrintf("send time: %fus\n", (CmiWallTimer()-t)*1.0e6); */
00583  
00584   size = ogm->size - DGRAM_HEADER_SIZE;
00585   data = ogm->data + DGRAM_HEADER_SIZE;
00586   while (size > Cmi_dgram_max_data) {
00587     EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00588     data += Cmi_dgram_max_data;
00589     size -= Cmi_dgram_max_data;
00590   }
00591   EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
00592 }
00593 
00594 /***********************************************************************
00595  * CmiMachineInit()
00596  *
00597  * This function intialize the GM board. Set receive buffer
00598  *
00599  ***********************************************************************/
00600 
00601 void CmiMachineInit(char **argv)
00602 {
00603 #if FRAGMENTATION
00604   Cmi_dgram_max_data = PACKET_MAX - DGRAM_HEADER_SIZE; 
00605 #else
00606   Cmi_dgram_max_data = PACKET_MAX;
00607 #endif
00608 }
00609 
00610 void CmiMachineExit()
00611 {
00612 }
00613 
00614 static void open_tcp_sockets()
00615 {
00616   int i, ok, pe, flag;
00617   int mype, numpes;
00618   SOCKET skt;
00619   int val;
00620 
00621   mype = _Cmi_mynode;
00622   numpes = _Cmi_numnodes;
00623   MACHSTATE2(2,"  open_tcp_sockets (%d:%d)", mype, numpes);
00624   for (i=0; i<mype; i++) {
00625     unsigned int clientPort;
00626     skt_ip_t clientIP;
00627     skt = skt_accept(dataskt, &clientIP,&clientPort);
00628     if (skt<0) KillEveryoneCode(98246554);
00629 #if NO_NAGLE_ALG
00630     flag = 1;
00631     ok = setsockopt(skt, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
00632 #endif
00633     ok = skt_recvN(skt, &pe, sizeof(int));
00634     if (ok<0) KillEveryoneCode(98246556);
00635     nodes[pe].sock = skt;
00636 #if FRAGMENTATION
00637     skt_setSockBuf(skt, PACKET_MAX*4);
00638 #endif
00639 #if 0
00640 #if !defined(_WIN32) || defined(__CYGWIN__)
00641     if ((val = fcntl(skt, F_GETFL, 0)) < 0) KillEveryoneCode(98246557);
00642     if (fcntl(skt, F_SETFL, val|O_NONBLOCK) < 0) KillEveryoneCode(98246558);
00643 #endif
00644 #endif
00645   }
00646   for (pe=mype+1; pe<numpes; pe++) {
00647     skt = skt_connect(nodes[pe].IP, nodes[pe].dataport, 300);
00648     if (skt<0) KillEveryoneCode(894788843);
00649 #if NO_NAGLE_ALG
00650     flag = 1;
00651     ok = setsockopt(skt, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
00652 #endif
00653     ok = skt_sendN(skt, &mype, sizeof(int));
00654     if (ok<0) KillEveryoneCode(98246556);
00655     nodes[pe].sock = skt;
00656 #if FRAGMENTATION
00657     skt_setSockBuf(skt, PACKET_MAX*4);
00658 #endif
00659 #if 0
00660 #if !defined(_WIN32) || defined(__CYGWIN__)
00661     if ((val = fcntl(skt, F_GETFL, 0)) < 0) KillEveryoneCode(98246557);
00662     if (fcntl(skt, F_SETFL, val|O_NONBLOCK) < 0) KillEveryoneCode(98246558);
00663 #endif
00664 #endif
00665   }
00666 }
00667 
00668 void CmiCommunicationInit(char **argv)
00669 {
00670   open_tcp_sockets();
00671 }
00672 

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1