arch/net/machine-eth.c

Go to the documentation of this file.
00001 
00020 /******************************************************************************
00021  *
00022  * CmiNotifyIdle()-- wait until a packet comes in
00023  *
00024  *****************************************************************************/
00025 
00026 typedef struct {
00027   int sleepMs; /*Milliseconds to sleep while idle*/
00028   int nIdles; /*Number of times we've been idle in a row*/
00029   CmiState cs; /*Machine state*/
00030 } CmiIdleState;
00031 
00032 static CmiIdleState *CmiNotifyGetState(void)
00033 {
00034   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
00035   s->sleepMs=0;
00036   s->nIdles=0;
00037   s->cs=CmiGetState();
00038   return s;
00039 }
00040 
00041 static void CmiNotifyBeginIdle(CmiIdleState *s)
00042 {
00043   s->sleepMs=0;
00044   s->nIdles=0;
00045 
00046   MACHSTATE(3,"begin idle")
00047 }
00048 
00049 static void CmiNotifyStillIdle(CmiIdleState *s)
00050 {
00051 #if CMK_SHARED_VARS_UNAVAILABLE
00052   /*No comm. thread-- listen on sockets for incoming messages*/
00053   MACHSTATE(1,"idle commserver {")
00054   CommunicationServer(Cmi_idlepoll?0:10, 0);
00055   MACHSTATE(1,"} idle commserver")
00056 #else
00057 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00058   if(_Cmi_noprocforcommthread ){
00059 #endif
00060     int nSpins=20; /*Number of times to spin before sleeping*/
00061     s->nIdles++;
00062     if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
00063       s->sleepMs+=2;
00064       if (s->sleepMs>10) s->sleepMs=10;
00065     }
00066     /*Comm. thread will listen on sockets-- just sleep*/
00067     if (s->sleepMs>0) {
00068       MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
00069       CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
00070       CsdResetPeriodic();               /* check ccd callbacks when I am awakened */
00071       MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
00072     }
00073 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00074   }
00075 #endif
00076 #endif
00077 }
00078 
00079 void CmiNotifyIdle(void) {
00080   CmiIdleState s;
00081   s.sleepMs=5; 
00082   CmiNotifyStillIdle(&s);
00083 }
00084 
00085 /****************************************************************************
00086  *                                                                          
00087  * CheckSocketsReady
00088  *
00089  * Checks both sockets to see which are readable and which are writeable.
00090  * We check all these things at the same time since this can be done for
00091  * free with ``select.'' The result is stored in global variables, since
00092  * this is essentially global state information and several routines need it.
00093  *
00094  ***************************************************************************/
00095 
00096 int CheckSocketsReady(int withDelayMs)
00097 {   
00098   int nreadable,dataWrite=writeableDgrams || writeableAcks;
00099   CMK_PIPE_DECL(withDelayMs);
00100 
00101 
00102 #if CMK_USE_KQUEUE && 0
00103   // This implementation doesn't yet work, but potentially is much faster
00104 
00105   /* Only setup the CMK_PIPE structures the first time they are used. 
00106      This makes the kqueue implementation much faster.
00107   */
00108   static int first = 1;
00109   if(first){
00110     first = 0;
00111     CmiStdoutAdd(CMK_PIPE_SUB);
00112     if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
00113     else return 0; /* If there's no charmrun, none of this matters. */
00114     if (dataskt!=-1) {
00115       CMK_PIPE_ADDREAD(dataskt); 
00116       CMK_PIPE_ADDWRITE(dataskt);
00117     }
00118   }
00119 
00120 #else  
00121   CmiStdoutAdd(CMK_PIPE_SUB);  
00122   if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }  
00123   else return 0; /* If there's no charmrun, none of this matters. */  
00124   if (dataskt!=-1) {  
00125     { CMK_PIPE_ADDREAD(dataskt); }  
00126     if (dataWrite)  
00127       CMK_PIPE_ADDWRITE(dataskt);  
00128   }  
00129 #endif 
00130 
00131   nreadable=CMK_PIPE_CALL();
00132   ctrlskt_ready_read = 0;
00133   dataskt_ready_read = 0;
00134   dataskt_ready_write = 0;
00135 
00136   if (nreadable == 0) {
00137     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00138     return nreadable;
00139   }
00140   if (nreadable==-1) {
00141     CMK_PIPE_CHECKERR();
00142     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00143     return CheckSocketsReady(0);
00144   }
00145   
00146   CmiStdoutCheck(CMK_PIPE_SUB);
00147   if (Cmi_charmrun_fd!=-1) 
00148           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00149   if (dataskt!=-1) {
00150         dataskt_ready_read = CMK_PIPE_CHECKREAD(dataskt);
00151         if (dataWrite)
00152                 dataskt_ready_write = CMK_PIPE_CHECKWRITE(dataskt);
00153   }
00154   return nreadable;
00155 }
00156 
00157 /***********************************************************************
00158  * TransmitAckDatagram
00159  *
00160  * This function sends the ack datagram, after setting the window
00161  * array to show which of the datagrams in the current window have been
00162  * received. The sending side will then use this information to resend
00163  * packets, mark packets as received, etc. This system also prevents
00164  * multiple retransmissions/acks when acks are lost.
00165  ***********************************************************************/
00166 void TransmitAckDatagram(OtherNode node)
00167 {
00168   DgramAck ack; int i, seqno, slot; ExplicitDgram dg;
00169   int retval;
00170   
00171   seqno = node->recv_next;
00172   MACHSTATE2(3,"  TransmitAckDgram [seq %d to 'pe' %d]",seqno,node->nodestart)
00173   DgramHeaderMake(&ack, DGRAM_ACKNOWLEDGE, Cmi_nodestart, Cmi_charmrun_pid, seqno, 0);
00174   LOG(Cmi_clock, Cmi_nodestart, 'A', node->nodestart, seqno);
00175   for (i=0; i<Cmi_window_size; i++) {
00176     slot = seqno % Cmi_window_size;
00177     dg = node->recv_window[slot];
00178     ack.window[i] = (dg && (dg->seqno == seqno));
00179     seqno = ((seqno+1) & DGRAM_SEQNO_MASK);
00180   }
00181   memcpy(&ack.window[Cmi_window_size], &(node->send_ack_seqno), 
00182           sizeof(unsigned int));
00183   node->send_ack_seqno = ((node->send_ack_seqno + 1) & DGRAM_SEQNO_MASK);
00184   retval = (-1);
00185 #ifdef CMK_USE_CHECKSUM
00186   DgramHeader *head = (DgramHeader *)(&ack);
00187   head->magic ^= computeCheckSum((unsigned char*)&ack, DGRAM_HEADER_SIZE + Cmi_window_size + sizeof(unsigned int));
00188 #endif
00189   while(retval==(-1))
00190     retval = sendto(dataskt, (char *)&ack,
00191          DGRAM_HEADER_SIZE + Cmi_window_size + sizeof(unsigned int), 0,
00192          (struct sockaddr *)&(node->addr),
00193          sizeof(struct sockaddr_in));
00194   node->stat_send_ack++;
00195 }
00196 
00197 
00198 /***********************************************************************
00199  * TransmitImplicitDgram
00200  * TransmitImplicitDgram1
00201  *
00202  * These functions do the actual work of sending a UDP datagram.
00203  ***********************************************************************/
00204 void TransmitImplicitDgram(ImplicitDgram dg)
00205 {
00206   char *data; DgramHeader *head; int len; DgramHeader temp;
00207   OtherNode dest;
00208   int retval;
00209   
00210   MACHSTATE3(3,"  TransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
00211              dg->datalen,dg->seqno,dg->dest->nodestart)
00212   len = dg->datalen;
00213   data = dg->dataptr;
00214   head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
00215   temp = *head;
00216   dest = dg->dest;
00217   DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_charmrun_pid, dg->seqno, dg->broot);
00218 #ifdef CMK_USE_CHECKSUM
00219   head->magic ^= computeCheckSum((unsigned char*)head, len + DGRAM_HEADER_SIZE);
00220 #endif
00221   LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
00222   retval = (-1);
00223   while(retval==(-1))
00224     retval = sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
00225               (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
00226   *head = temp;
00227   dest->stat_send_pkt++;
00228 }
00229 
00230 void TransmitImplicitDgram1(ImplicitDgram dg)
00231 {
00232   char *data; DgramHeader *head; int len; DgramHeader temp;
00233   OtherNode dest;
00234   int retval;
00235 
00236   MACHSTATE3(4,"  RETransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
00237              dg->datalen,dg->seqno,dg->dest->nodestart)
00238   len = dg->datalen;
00239   data = dg->dataptr;
00240   head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
00241   temp = *head;
00242   dest = dg->dest;
00243   DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_charmrun_pid, dg->seqno, dg->broot);
00244 #ifdef CMK_USE_CHECKSUM
00245   head->magic ^= computeCheckSum((unsigned char *)head, len + DGRAM_HEADER_SIZE);
00246 #endif
00247   LOG(Cmi_clock, Cmi_nodestart, 'P', dest->nodestart, dg->seqno);
00248   retval = (-1);
00249   while (retval == (-1))
00250     retval = sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
00251               (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
00252   *head = temp;
00253   dest->stat_resend_pkt++;
00254 }
00255 
00256 
00257 /***********************************************************************
00258  * TransmitAcknowledgement
00259  *
00260  * This function sends the ack datagrams, after checking to see if the 
00261  * Recv Window is atleast half-full. After that, if the Recv window size 
00262  * is 0, then the count of un-acked datagrams, and the time at which
00263  * the ack should be sent is reset.
00264  ***********************************************************************/
00265 int TransmitAcknowledgement()
00266 {
00267   int skip; static int nextnode=0; OtherNode node;
00268   for (skip=0; skip<_Cmi_numnodes; skip++) {
00269     node = nodes+nextnode;
00270     nextnode = (nextnode + 1) % _Cmi_numnodes;
00271     if (node->recv_ack_cnt) {
00272       if ((node->recv_ack_cnt > Cmi_half_window) ||
00273           (Cmi_clock >= node->recv_ack_time)) {
00274         TransmitAckDatagram(node);
00275         if (node->recv_winsz) {
00276           node->recv_ack_cnt  = 1;
00277           node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
00278         } else {
00279           node->recv_ack_cnt  = 0;
00280           node->recv_ack_time = 0.0;
00281         }
00282         return 1;
00283       }
00284     }
00285   }
00286   return 0;
00287 }
00288 
00289 
00290 /***********************************************************************
00291  * TransmitDatagram()
00292  *
00293  * This function fills up the Send Window with the contents of the
00294  * Send Queue. It also sets the node->send_primer variable, which
00295  * indicates when a retransmission will be attempted.
00296  ***********************************************************************/
00297 int TransmitDatagram()
00298 {
00299   ImplicitDgram dg; OtherNode node;
00300   static int nextnode=0; int skip, count, slot;
00301   unsigned int seqno;
00302   
00303   for (skip=0; skip<_Cmi_numnodes; skip++) {
00304     node = nodes+nextnode;
00305     nextnode = (nextnode + 1) % _Cmi_numnodes;
00306     dg = node->send_queue_h;
00307     if (dg) {
00308       seqno = dg->seqno;
00309       slot = seqno % Cmi_window_size;
00310       if (node->send_window[slot] == 0) {
00311         node->send_queue_h = dg->next;
00312         node->send_window[slot] = dg;
00313         TransmitImplicitDgram(dg);
00314         if (seqno == ((node->send_last+1)&DGRAM_SEQNO_MASK))
00315           node->send_last = seqno;
00316         node->send_primer = Cmi_clock + Cmi_delay_retransmit;
00317         return 1;
00318       }
00319     }
00320     if (Cmi_clock > node->send_primer) {
00321       slot = (node->send_last % Cmi_window_size);
00322       for (count=0; count<Cmi_window_size; count++) {
00323         dg = node->send_window[slot];
00324         if (dg) break;
00325         slot = ((slot+Cmi_window_size-1) % Cmi_window_size);
00326       }
00327       if (dg) {
00328         TransmitImplicitDgram1(node->send_window[slot]);
00329         node->send_primer = Cmi_clock + Cmi_delay_retransmit;
00330         return 1;
00331       }
00332     }
00333   }
00334   return 0;
00335 }
00336 
00337 /***********************************************************************
00338  * EnqueOutgoingDgram()
00339  *
00340  * This function enqueues the datagrams onto the Send queue of the
00341  * sender, after setting appropriate data values into each of the
00342  * datagrams. 
00343  ***********************************************************************/
00344 void EnqueueOutgoingDgram
00345         (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank, int broot)
00346 {
00347   int seqno, dst, src; ImplicitDgram dg;
00348   src = ogm->src;
00349   dst = ogm->dst;
00350   seqno = node->send_next;
00351   node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00352   MallocImplicitDgram(dg);
00353   dg->dest = node;
00354   dg->srcpe = src;
00355   dg->rank = rank;
00356   dg->seqno = seqno;
00357   dg->broot = broot;
00358   dg->dataptr = ptr;
00359   dg->datalen = len;
00360   dg->ogm = ogm;
00361   ogm->refcount++;
00362   dg->next = 0;
00363   if (node->send_queue_h == 0) {
00364     node->send_queue_h = dg;
00365     node->send_queue_t = dg;
00366   } else {
00367     node->send_queue_t->next = dg;
00368     node->send_queue_t = dg;
00369   }
00370 }
00371 
00372 
00373 /***********************************************************************
00374  * DeliverViaNetwork()
00375  *
00376  * This function is responsible for all non-local transmission. This
00377  * function takes the outgoing messages, splits it into datagrams and
00378  * enqueues them into the Send Queue.
00379  ***********************************************************************/
00380 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00381 {
00382   int size; char *data;
00383   OtherNode myNode = nodes+CmiMyNode();
00384 
00385   MACHSTATE2(3,"DeliverViaNetwork %d-byte message to pe %d",
00386              ogm->size,node->nodestart+rank);
00387   size = ogm->size - DGRAM_HEADER_SIZE;
00388   data = ogm->data + DGRAM_HEADER_SIZE;
00389   writeableDgrams++;
00390   while (size > Cmi_dgram_max_data) {
00391     EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00392     data += Cmi_dgram_max_data;
00393     size -= Cmi_dgram_max_data;
00394   }
00395   EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
00396 
00397   myNode->sent_msgs++;
00398   myNode->sent_bytes += ogm->size;
00399   /*Try to immediately send the packets off*/
00400   writeableDgrams=1;
00401 
00402 }
00403 
00404 /***********************************************************************
00405  * AssembleDatagram()
00406  *
00407  * This function does the actual assembly of datagrams into a
00408  * message. node->asm_msg holds the current message being
00409  * assembled. Once the message assemble is complete (known by checking
00410  * if the total number of datagrams is equal to the number of datagrams
00411  * constituting the assembled message), the message is pushed into the
00412  * Producer-Consumer queue
00413  ***********************************************************************/
00414 void AssembleDatagram(OtherNode node, ExplicitDgram dg)
00415 {
00416   int i;
00417   unsigned int size; char *msg;
00418   OtherNode myNode = nodes+CmiMyNode();
00419   
00420   MACHSTATE3(2,"  AssembleDatagram [seq %d from 'pe' %d, packet len %d]",
00421         dg->seqno,node->nodestart,dg->len)
00422   LOG(Cmi_clock, Cmi_nodestart, 'X', dg->srcpe, dg->seqno);
00423   msg = node->asm_msg;
00424   if (msg == 0) {
00425     size = CmiMsgHeaderGetLength(dg->data);
00426     MACHSTATE3(4,"  Assemble new datagram seq %d from 'pe' %d, len %d",
00427         dg->seqno,node->nodestart,size)
00428     msg = (char *)CmiAlloc(size);
00429     if (!msg)
00430       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
00431     if (size < dg->len) KillEveryoneCode(4559312);
00432 #ifndef CMK_OPTIMIZE
00433     setMemoryTypeMessage(msg);
00434 #endif
00435     memcpy(msg, (char*)(dg->data), dg->len);
00436     node->asm_rank = dg->rank;
00437     node->asm_total = size;
00438     node->asm_fill = dg->len;
00439     node->asm_msg = msg;
00440   } else {
00441     size = dg->len - DGRAM_HEADER_SIZE;
00442     memcpy(msg + node->asm_fill, ((char*)(dg->data))+DGRAM_HEADER_SIZE, size);
00443     node->asm_fill += size;
00444   }
00445   MACHSTATE3(2,"  AssembleDatagram: now have %d of %d bytes from %d",
00446         node->asm_fill, node->asm_total, node->nodestart)
00447   if (node->asm_fill > node->asm_total) {
00448       fprintf(stderr, "\n\n\t\tLength mismatch!!\n\n");
00449       fflush(stderr);
00450       MACHSTATE4(5,"Length mismatch seq %d, from 'pe' %d, fill %d, total %d\n", dg->seqno,node->nodestart,node->asm_fill,node->asm_total)
00451       KillEveryoneCode(4559313);
00452   }
00453   if (node->asm_fill == node->asm_total) {
00454     /* spanning tree broadcast - send first to avoid invalid msg ptr */
00455 #if CMK_BROADCAST_SPANNING_TREE
00456     if (node->asm_rank == DGRAM_BROADCAST
00457 #if CMK_NODE_QUEUE_AVAILABLE 
00458           || node->asm_rank == DGRAM_NODEBROADCAST
00459 #endif
00460       )
00461         SendSpanningChildren(NULL, 0, node->asm_total, msg, dg->broot, dg->rank);
00462 #elif CMK_BROADCAST_HYPERCUBE
00463     if (node->asm_rank == DGRAM_BROADCAST
00464 #if CMK_NODE_QUEUE_AVAILABLE
00465           || node->asm_rank == DGRAM_NODEBROADCAST
00466 #endif
00467       )
00468         SendHypercube(NULL, 0, node->asm_total, msg, dg->broot, dg->rank);
00469 #endif
00470     if (node->asm_rank == DGRAM_BROADCAST) {
00471       int len = node->asm_total;
00472       for (i=1; i<_Cmi_mynodesize; i++)
00473          CmiPushPE(i, CopyMsg(msg, len));
00474       CmiPushPE(0, msg);
00475     } else {
00476 #if CMK_NODE_QUEUE_AVAILABLE
00477          if (node->asm_rank==DGRAM_NODEMESSAGE ||
00478              node->asm_rank==DGRAM_NODEBROADCAST) 
00479          {
00480            CmiPushNode(msg);
00481          }
00482          else
00483 #endif
00484            CmiPushPE(node->asm_rank, msg);
00485     }
00486     node->asm_msg = 0;
00487     myNode->recd_msgs++;
00488     myNode->recd_bytes += node->asm_total;
00489   }
00490   FreeExplicitDgram(dg);
00491 }
00492 
00493 
00494 /***********************************************************************
00495  * AssembleReceivedDatagrams()
00496  *
00497  * This function assembles the datagrams received so far, into a
00498  * single message. This also results in part of the Receive Window being 
00499  * freed.
00500  ***********************************************************************/
00501 void AssembleReceivedDatagrams(OtherNode node)
00502 {
00503   unsigned int next, slot; ExplicitDgram dg;
00504   next = node->recv_next;
00505   while (1) {
00506     slot = (next % Cmi_window_size);
00507     dg = node->recv_window[slot];
00508     if (dg == 0) break;
00509     AssembleDatagram(node, dg);
00510     node->recv_window[slot] = 0;
00511     node->recv_winsz--;
00512     next = ((next + 1) & DGRAM_SEQNO_MASK);
00513   }
00514   node->recv_next = next;
00515 }
00516 
00517 
00518 
00519 
00520 /************************************************************************
00521  * IntegrateMessageDatagram()
00522  *
00523  * This function integrates the received datagrams. It first
00524  * increments the count of un-acked datagrams. (This is to aid the
00525  * heuristic that an ack should be sent when the Receive window is half
00526  * full). If the current datagram is the first missing packet, then this 
00527  * means that the datagram that was missing in the incomplete sequence
00528  * of datagrams so far, has arrived, and hence the datagrams can be
00529  * assembled. 
00530  ************************************************************************/
00531 
00532 void IntegrateMessageDatagram(ExplicitDgram dg)
00533 {
00534   int seqno;
00535   unsigned int slot; OtherNode node;
00536 
00537   LOG(Cmi_clock, Cmi_nodestart, 'M', dg->srcpe, dg->seqno);
00538   MACHSTATE2(2,"  IntegrateMessageDatagram [seq %d from pe %d]", dg->seqno,dg->srcpe)
00539 
00540   node = nodes_by_pe[dg->srcpe];
00541   node->stat_recv_pkt++;
00542   seqno = dg->seqno;
00543   writeableAcks=1;
00544   node->recv_ack_cnt++;
00545   if (node->recv_ack_time == 0.0)
00546     node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
00547   if (((seqno - node->recv_next) & DGRAM_SEQNO_MASK) < Cmi_window_size) {
00548     slot = (seqno % Cmi_window_size);
00549     if (node->recv_window[slot] == 0) {
00550       node->recv_window[slot] = dg;
00551       node->recv_winsz++;
00552       if (seqno == node->recv_next)
00553         AssembleReceivedDatagrams(node);
00554       if (seqno > node->recv_expect)
00555         node->recv_ack_time = 0.0;
00556       if (seqno >= node->recv_expect)
00557         node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
00558       LOG(Cmi_clock, Cmi_nodestart, 'Y', node->recv_next, dg->seqno);
00559       return;
00560     }
00561   }
00562   LOG(Cmi_clock, Cmi_nodestart, 'y', node->recv_next, dg->seqno);
00563   FreeExplicitDgram(dg);
00564 }
00565 
00566 
00567 
00568 /***********************************************************************
00569  * IntegrateAckDatagram()
00570  * 
00571  * This function is called on the message sending side, on receipt of
00572  * an ack for a message that it sent. Since messages and acks could be 
00573  * lost, our protocol works in such a way that acks for higher sequence
00574  * numbered packets act as implict acks for lower sequence numbered
00575  * packets, in case the acks for the lower sequence numbered packets
00576  * were lost.
00577 
00578  * Recall that the Send and Receive windows are circular queues, and the
00579  * sequence numbers of the packets (datagrams) are monotically
00580  * increasing. Hence it is important to know for which sequence number
00581  * the ack is for, and to correspodinly relate that to tha actual packet 
00582  * sitting in the Send window. Since every 20th packet occupies the same
00583  * slot in the windows, a number of sanity checks are required for our
00584  * protocol to work. 
00585  * 1. If the ack number (first missing packet sequence number) is less
00586  * than the last ack number received then this ack can be ignored. 
00587 
00588  * 2. The last ack number received must be set to the current ack
00589  * sequence number (This is done only if 1. is not true).
00590 
00591  * 3. Now the whole Send window is examined, in a kind of reverse
00592  * order. The check starts from a sequence number = 20 + the first
00593  * missing packet's sequence number. For each of these sequence numbers, 
00594  * the slot in the Send window is checked for existence of a datagram
00595  * that should have been sent. If there is no datagram, then the search
00596  * advances. If there is a datagram, then the sequence number of that is 
00597  * checked with the expected sequence number for the current iteration
00598  * (This is decremented in each iteration of the loop).
00599 
00600  * If the sequence numbers do not match, then checks are made (for
00601  * the unlikely scenarios where the current slot sequence number is 
00602  * equal to the first missing packet's sequence number, and where
00603  * somehow, packets which have greater sequence numbers than allowed for 
00604  * the current window)
00605 
00606  * If the sequence numbers DO match, then the flag 'rxing' is
00607  * checked. The semantics for this flag is that : If any packet with a
00608  * greater sequence number than the current packet (and hence in the
00609  * previous iteration of the for loop) has been acked, then the 'rxing'
00610  * flag is set to 1, to imply that all the packets of lower sequence
00611  * number, for which the ack->window[] element does not indicate that the 
00612  * packet has been received, must be retransmitted.
00613  * 
00614  ***********************************************************************/
00615 
00616 void IntegrateAckDatagram(ExplicitDgram dg)
00617 {
00618   OtherNode node; DgramAck *ack; ImplicitDgram idg;
00619   int i; unsigned int slot, rxing, dgseqno, seqno, ackseqno;
00620   int diff;
00621   unsigned int tmp;
00622 
00623   node = nodes_by_pe[dg->srcpe];
00624   ack = ((DgramAck*)(dg->data));
00625   memcpy(&ackseqno, &(ack->window[Cmi_window_size]), sizeof(unsigned int));
00626   dgseqno = dg->seqno;
00627   seqno = (dgseqno + Cmi_window_size) & DGRAM_SEQNO_MASK;
00628   slot = seqno % Cmi_window_size;
00629   rxing = 0;
00630   node->stat_recv_ack++;
00631   LOG(Cmi_clock, Cmi_nodestart, 'R', node->nodestart, dg->seqno);
00632 
00633   tmp = node->recv_ack_seqno;
00634   /* check that the ack being received is actually appropriate */
00635   if ( !((node->recv_ack_seqno >= 
00636           ((DGRAM_SEQNO_MASK >> 1) + (DGRAM_SEQNO_MASK >> 2))) &&
00637          (ackseqno < (DGRAM_SEQNO_MASK >> 1))) &&
00638        (ackseqno <= node->recv_ack_seqno))
00639     {
00640       FreeExplicitDgram(dg);
00641       return;
00642     } 
00643   /* higher ack so adjust */
00644   node->recv_ack_seqno = ackseqno;
00645   writeableDgrams=1; /* May have freed up some send slots */
00646   
00647   for (i=Cmi_window_size-1; i>=0; i--) {
00648     slot--; if (slot== ((unsigned int)-1)) slot+=Cmi_window_size;
00649     seqno = (seqno-1) & DGRAM_SEQNO_MASK;
00650     idg = node->send_window[slot];
00651     if (idg) {
00652       if (idg->seqno == seqno) {
00653         if (ack->window[i]) {
00654           /* remove those that have been received and are within a window
00655              of the first missing packet */
00656           node->stat_ack_pkts++;
00657           LOG(Cmi_clock, Cmi_nodestart, 'r', node->nodestart, seqno);
00658           node->send_window[slot] = 0;
00659           DiscardImplicitDgram(idg);
00660           rxing = 1;
00661         } else if (rxing) {
00662           node->send_window[slot] = 0;
00663           idg->next = node->send_queue_h;
00664           if (node->send_queue_h == 0) {
00665             node->send_queue_t = idg;
00666           }
00667           node->send_queue_h = idg;
00668         }
00669       } else {
00670         diff = dgseqno >= idg->seqno ? 
00671           ((dgseqno - idg->seqno) & DGRAM_SEQNO_MASK) :
00672           ((dgseqno + (DGRAM_SEQNO_MASK - idg->seqno) + 1) & DGRAM_SEQNO_MASK);
00673           
00674         if ((diff <= 0) || (diff > Cmi_window_size))
00675         {
00676           continue;
00677         }
00678 
00679         /* if ack is really less than our packet seq (consider wrap around) */
00680         if (dgseqno < idg->seqno && (idg->seqno - dgseqno <= Cmi_window_size))
00681         {
00682           continue;
00683         }
00684         if (dgseqno == idg->seqno)
00685         {
00686           continue;
00687         }
00688         node->stat_ack_pkts++;
00689         LOG(Cmi_clock, Cmi_nodestart, 'o', node->nodestart, idg->seqno);
00690         node->send_window[slot] = 0;
00691         DiscardImplicitDgram(idg);
00692       }
00693     }
00694   }
00695   FreeExplicitDgram(dg);  
00696 }
00697 
00698 void ReceiveDatagram()
00699 {
00700   ExplicitDgram dg; int ok, magic;
00701   MACHLOCK_ASSERT(comm_flag,"ReceiveDatagram")
00702   MallocExplicitDgram(dg);
00703   ok = recv(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0);
00704   /*ok = recvfrom(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0, 0, 0);*/
00705   /* if (ok<0) { perror("recv"); KillEveryoneCode(37489437); } */
00706   if (ok < 0) {
00707     MACHSTATE1(4,"  recv dgram failed (errno=%d)",errno)
00708     FreeExplicitDgram(dg);
00709     if (errno == EINTR) return;  /* A SIGIO interrupted the receive */
00710     if (errno == EAGAIN) return; /* Just try again later */
00711 #if !defined(_WIN32) || defined(__CYGWIN__) 
00712     if (errno == EWOULDBLOCK) return; /* No more messages on that socket. */
00713     if (errno == ECONNREFUSED) return;  /* A "Host unreachable" ICMP packet came in */
00714 #endif
00715     CmiPrintf("ReceiveDatagram: recv: %s(%d)\n", strerror(errno), errno) ;
00716     KillEveryoneCode(37489437);
00717   }
00718   dg->len = ok;
00719 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
00720   /* randomly corrupt data and ack datagrams */
00721   randomCorrupt((char*)dg->data, dg->len);
00722 #endif
00723 
00724   if (ok >= DGRAM_HEADER_SIZE) {
00725     DgramHeaderBreak(dg->data, dg->rank, dg->srcpe, magic, dg->seqno, dg->broot);
00726     MACHSTATE3(2,"  recv dgram [seq %d, for rank %d, from pe %d]",
00727                dg->seqno,dg->rank,dg->srcpe)
00728 #ifdef CMK_USE_CHECKSUM
00729     if (computeCheckSum((unsigned char*)dg->data, dg->len) == 0)
00730 #else
00731     if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
00732 #endif
00733     {
00734       if (dg->rank == DGRAM_ACKNOWLEDGE)
00735         IntegrateAckDatagram(dg);
00736       else IntegrateMessageDatagram(dg);
00737     } else FreeExplicitDgram(dg);
00738   } else {
00739     MACHSTATE1(4,"  recv dgram failed (len=%d)",ok)
00740     FreeExplicitDgram(dg);
00741   }
00742 }
00743 
00744 
00745 /***********************************************************************
00746  * CommunicationServer()
00747  * 
00748  * This function does the scheduling of the tasks related to the
00749  * message sends and receives. It is called from the CmiGeneralSend()
00750  * function, and periodically from the CommunicationInterrupt() (in case 
00751  * of the single processor version), and from the comm_thread (for the
00752  * SMP version). Based on which of the data/control read/write sockets
00753  * are ready, the corresponding tasks are called
00754  *
00755  ***********************************************************************/
00756 void CmiHandleImmediate();
00757 
00758 /*
00759 0: from smp thread
00760 1: from interrupt
00761 2: from worker thread
00762 */
00763 static void CommunicationServer(int sleepTime, int where)
00764 {
00765   unsigned int nTimes=0; /* Loop counter */
00766   LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00767   MACHSTATE2(1,"CommunicationsServer(%d,%d)",
00768              sleepTime,writeableAcks||writeableDgrams)  
00769 #if !CMK_SHARED_VARS_UNAVAILABLE /*SMP mode: comm. lock is precious*/
00770   if (sleepTime!=0) {/*Sleep *without* holding the comm. lock*/
00771     MACHSTATE(1,"CommServer going to sleep (NO LOCK)");
00772     if (CheckSocketsReady(sleepTime)<=0) {
00773       MACHSTATE(1,"CommServer finished without anything happening.");
00774     }
00775   }
00776   sleepTime=0;
00777 #endif
00778   CmiCommLock();
00779   /* in netpoll mode, only perform service to stdout */
00780   if (Cmi_netpoll && where == 1) {
00781     if (CmiStdoutNeedsService()) {CmiStdoutService();}
00782     CmiCommUnlock();
00783     return;
00784   }
00785   CommunicationsClock();
00786   /*Don't sleep if a signal has stored messages for us*/
00787   if (sleepTime&&CmiGetState()->idle.hasMessages) sleepTime=0;
00788   while (CheckSocketsReady(sleepTime)>0) {
00789     int again=0;
00790       MACHSTATE(2,"CheckSocketsReady returned true");
00791     sleepTime=0;
00792     if (ctrlskt_ready_read) {again=1;ctrl_getone();}
00793     if (dataskt_ready_read) {again=1;ReceiveDatagram();}
00794     if (dataskt_ready_write) {
00795       if (writeableAcks) 
00796         if (0!=(writeableAcks=TransmitAcknowledgement())) again=1;
00797       if (writeableDgrams)
00798         if (0!=(writeableDgrams=TransmitDatagram())) again=1; 
00799     }
00800     if (CmiStdoutNeedsService()) {CmiStdoutService();}
00801     if (!again) break; /* Nothing more to do */
00802     if ((nTimes++ &16)==15) {
00803       /*We just grabbed a whole pile of packets-- try to retire a few*/
00804       CommunicationsClock();
00805     }
00806   }
00807   CmiCommUnlock();
00808 
00809   /* when called by communication thread or in interrupt */
00810   if (where == COMM_SERVER_FROM_SMP || where == COMM_SERVER_FROM_INTERRUPT) {
00811 #if CMK_IMMEDIATE_MSG
00812   CmiHandleImmediate();
00813 #endif
00814 #if CMK_PERSISTENT_COMM
00815   PumpPersistent();
00816 #endif
00817   }
00818 
00819   MACHSTATE(1,"} CommunicationServer") 
00820 }
00821 
/* Machine-layer initialisation hook; this layer has nothing to set up. */
void CmiMachineInit(char **argv)
{
  (void)argv;  /* intentionally unused */
}
00825 
/* Communication initialisation hook; this layer has nothing to set up. */
void CmiCommunicationInit(char **argv)
{
  (void)argv;  /* intentionally unused */
}
00829 
/* Machine-layer shutdown hook; this layer has nothing to tear down. */
void CmiMachineExit()
{
  /* intentionally empty */
}
00833 
00834 static void sendBarrierMessage(int pe)
00835 {
00836   char buf[32];
00837   OtherNode  node = nodes + pe;
00838   int retval = -1;
00839   while (retval == -1) {
00840      retval = sendto(dataskt, (char *)buf, 32, 0,
00841          (struct sockaddr *)&(node->addr),
00842          sizeof(struct sockaddr_in));
00843   }
00844 }
00845 
00846 static void recvBarrierMessage()
00847 {
00848   char buf[32];
00849   int nreadable, ok, s;
00850   
00851   if (dataskt!=-1) {
00852         do {
00853         CMK_PIPE_DECL(10);
00854         CMK_PIPE_ADDREAD(dataskt);
00855           nreadable=CMK_PIPE_CALL();
00856           if (nreadable == 0) continue;
00857           s = CMK_PIPE_CHECKREAD(dataskt);
00858           if (s) break;
00859         } while (1);
00860         ok = recv(dataskt,buf,32,0);
00861         CmiAssert(ok >= 0);
00862   }
00863 }
00864 
00865 /* happen at node level */
00866 /* must be called on every PE including communication processors */
00867 int CmiBarrier()
00868 {
00869   int len, size, i;
00870   int status;
00871   int count = 0;
00872   OtherNode node;
00873   int numnodes = CmiNumNodes();
00874 
00875   if (Cmi_netpoll == 0) return -1;
00876 
00877   if (CmiMyRank() == 0) {
00878     /* every one send to pe 0 */
00879     if (CmiMyNode() != 0) {
00880       sendBarrierMessage(0);
00881     }
00882     if (CmiMyNode() == 0) 
00883     {
00884       for (count = 1; count < numnodes; count ++) 
00885       {
00886         recvBarrierMessage();
00887       }
00888       /* pe 0 broadcast */
00889       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00890         int p = i;
00891         if (p > numnodes - 1) break;
00892         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
00893         sendBarrierMessage(p);
00894       }
00895     }
00896     /* non 0 node waiting */
00897     if (CmiMyNode() != 0) 
00898     {
00899       recvBarrierMessage();
00900       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00901         int p = CmiMyNode();
00902         p = BROADCAST_SPANNING_FACTOR*p + i;
00903         if (p > numnodes - 1) break;
00904         p = p%numnodes;
00905         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p);  */
00906         sendBarrierMessage(p);
00907       }
00908     }
00909   }
00910   CmiNodeAllBarrier();
00911   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
00912   return 0;
00913 }
00914 
00915 
00916 int CmiBarrierZero()
00917 {
00918   int i;
00919 
00920   if (Cmi_netpoll == 0) return -1;
00921 
00922   if (CmiMyRank() == 0) {
00923     if (CmiMyNode()) {
00924       sendBarrierMessage(0);
00925     }
00926     else {
00927       for (i=0; i<CmiNumNodes()-1; i++)
00928       {
00929         recvBarrierMessage();
00930       }
00931     }
00932   }
00933   CmiNodeAllBarrier();
00934   return 0;
00935 }

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1