arch/net/machine-dgram.c

Go to the documentation of this file.
00001 
00019 #if CMK_USE_AMMASSO
00020 #include "machine-ammasso.h"
00021 #endif
00022 
00023 #if CMK_USE_IBVERBS
00024 #include <infiniband/verbs.h>
00025 #endif
00026 
/* Size in bytes of DgramHeader; extract_args() aborts at startup if
   sizeof(DgramHeader) differs from this. */
#define DGRAM_HEADER_SIZE 8

/* The int at index 2 of a message buffer stores the message length, and
   CmiMsgNext reads/writes the first pointer-sized word of a message
   (presumably a link to the next message in a list — confirm).
   NOTE(review): these reinterpret the buffer through int* / void**, so they
   rely on the buffer being suitably aligned. */
#define CmiMsgHeaderSetLength(msg, len) (((int*)(msg))[2] = (len))
#define CmiMsgHeaderGetLength(msg)      (((int*)(msg))[2])
#define CmiMsgNext(msg) (*((void**)(msg)))

/* Masks matching the widths of the DgramHeader bit-fields below
   (rootpe/srcpe/seqno: 16 bits, magic: 8 bits).
   NOTE(review): DGRAM_SRCPE_MASK is a signed int constant, unlike the other
   16-bit masks which are unsigned. */
#define DGRAM_ROOTPE_MASK   (0xFFFFu)
#define DGRAM_SRCPE_MASK    (0xFFFF)
#define DGRAM_MAGIC_MASK    (0xFF)
#define DGRAM_SEQNO_MASK    (0xFFFFu)

/* Values of the 8-bit DgramHeader.dstrank field that appear to mark special
   datagram kinds rather than a destination rank (presumably ordinary ranks
   stay below DGRAM_DSTRANK_MAX — confirm against the senders). */
#if CMK_NODE_QUEUE_AVAILABLE
#define DGRAM_NODEBROADCAST (0xFA)
#define DGRAM_NODEMESSAGE   (0xFB)
#endif
#define DGRAM_DSTRANK_MAX   (0xFC)
#define DGRAM_SIMPLEKILL    (0xFD)
#define DGRAM_BROADCAST     (0xFE)
#define DGRAM_ACKNOWLEDGE   (0xFF)
00046 
/* DgramHeader overlays the first 4 fields of the converse CMK_MSG_HEADER_BASIC,
   defined in conv-common.h.  As such, its size and alignment are critical:
   extract_args() aborts at startup unless sizeof(DgramHeader) equals
   DGRAM_HEADER_SIZE. */
typedef struct {
        unsigned int seqno:16;  /* seq number in send window */
        unsigned int srcpe:16;  /* CmiMyPe of the sender */
        unsigned int dstrank:8; /* rank of destination processor (or a DGRAM_* special value) */
        unsigned int magic:8;   /* Low 8 bits of charmrun PID */
        unsigned int rootpe:16; /* broadcast root processor */
} DgramHeader;


/* An acknowledgement datagram: a header followed by a window buffer, which
   must hold at least Cmi_window_size + sizeof(unsigned int) bytes. */
typedef struct { DgramHeader head; char window[1024]; } DgramAck;

/* Defined elsewhere; presumably computes an 8-bit checksum over
   data[0..len-1] — confirm at the definition. */
extern unsigned char computeCheckSum(unsigned char *data, int len);
00062 
/* Fill in the DgramHeader stored at `ptr`.
   magic_ is reduced to its low 8 bits (DGRAM_MAGIC_MASK); the other values
   are stored into their bit-fields as given.  Every argument is
   parenthesized, so expression arguments such as `a && b` are masked as a
   whole.  The temporary uses a name that is unlikely to collide with the
   caller's variables, so passing a pointer named e.g. `header` is safe.
   Arguments are evaluated in the order ptr, seqno_, srcpe_, dstrank_,
   magic_, root_. */
#define DgramHeaderMake(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
   DgramHeader *dgh_make_hdr_ = (DgramHeader *)(ptr);  \
   dgh_make_hdr_->seqno = (seqno_); \
   dgh_make_hdr_->srcpe = (srcpe_); \
   dgh_make_hdr_->dstrank = (dstrank_); \
   dgh_make_hdr_->magic = (magic_) & DGRAM_MAGIC_MASK; \
   dgh_make_hdr_->rootpe = (root_); \
}
00071 
/* Unpack the DgramHeader stored at `ptr` into the lvalues dstrank_, srcpe_,
   magic_, seqno_ and root_ (assigned in the order seqno_, srcpe_, dstrank_,
   magic_, root_).  The temporary uses a name unlikely to collide with the
   caller's variables, so a pointer named e.g. `header` can be passed. */
#define DgramHeaderBreak(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
   DgramHeader *dgh_break_hdr_ = (DgramHeader *)(ptr);  \
   (seqno_) = dgh_break_hdr_->seqno; \
   (srcpe_) = dgh_break_hdr_->srcpe; \
   (dstrank_) = dgh_break_hdr_->dstrank; \
   (magic_) = dgh_break_hdr_->magic; \
   (root_) = dgh_break_hdr_->rootpe; \
}
00080 
00081 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
00082 static void randomCorrupt(char *data, int len)
00083 {
00084   if (0==(rand()%CMK_RANDOMLY_CORRUPT_MESSAGES))
00085   { /* insert one random bit flip into this message: */
00086     int badByte=rand()%len;
00087     int badBit=rand()%8;
00088     data[badByte]^=(1<<badBit);
00089   } 
00090 }
00091 #endif
00092 
/* Special negative destination codes.  NOTE(review): by their names these
   mean "broadcast to every PE except the sender" / "to every PE", plus the
   node-level equivalents when node queues are available — confirm against
   the code that sends with them. */
#define PE_BROADCAST_OTHERS (-101)
#define PE_BROADCAST_ALL    (-102)

#if CMK_NODE_QUEUE_AVAILABLE
#define NODE_BROADCAST_OTHERS (-201)
#define NODE_BROADCAST_ALL    (-202)
#endif
00100 
00101 /********* Startup and Command-line args ********/
/* Datagram-layer tuning parameters.  One of the setspeed_*() presets below
   fills in the first five; extract_args() then applies command-line
   overrides and derives the rest. */
static int    Cmi_max_dgram_size;      /* bytes per datagram, header included */
static int    Cmi_os_buffer_size;      /* socket SO_RCVBUF/SO_SNDBUF size */
static int    Cmi_window_size;         /* unacknowledged datagrams allowed */
static int    Cmi_half_window;         /* Cmi_window_size / 2 */
static double Cmi_delay_retransmit;    /* seconds to wait before retransmitting */
static double Cmi_ack_delay;           /* seconds to wait before acknowledging */
static int    Cmi_dgram_max_data;      /* payload bytes per datagram */
static int    Cmi_comm_periodic_delay; /* ms between periodic comm polls */
static int    Cmi_comm_clock_delay;    /* ms between CommunicationsClock runs */
/* Write-queue counts (to know when to sleep) */
static int writeableAcks;
static int writeableDgrams;

/* Preset for a low-latency ATM network (+atm). */
static void setspeed_atm()
{
  Cmi_ack_delay        = 0.0035;
  Cmi_delay_retransmit = 0.0150;
  Cmi_window_size      = 16;       /* alternative: 20 */
  Cmi_os_buffer_size   = 50000;
  Cmi_max_dgram_size   = 2048;
}

/* Preset for ethernet (+eth, also the default): the OS buffer is sized to
   hold one full window of maximum-size datagrams. */
static void setspeed_eth()
{
  Cmi_ack_delay        = 0.0050;
  Cmi_delay_retransmit = 0.0400;
  Cmi_window_size      = 32;       /* alternative: 40 */
  Cmi_max_dgram_size   = 1400;
  Cmi_os_buffer_size   = Cmi_max_dgram_size*Cmi_window_size;
}

/* Preset for a gigabit network (+giga): 9000-byte datagrams, small window. */
static void setspeed_gigabit()
{
  Cmi_ack_delay        = 0.018;
  Cmi_delay_retransmit = 0.020;
  Cmi_os_buffer_size   = 200000;
  Cmi_window_size      = 8;
  Cmi_max_dgram_size   = 9000;
}
00140 
00141 static void extract_args(char **argv)
00142 {
00143   int ms;
00144   setspeed_eth();
00145   if (CmiGetArgFlagDesc(argv,"+atm","Tune for a low-latency ATM network"))
00146     setspeed_atm();
00147   if (CmiGetArgFlagDesc(argv,"+eth","Tune for an ethernet network"))
00148     setspeed_eth();
00149   if (CmiGetArgFlagDesc(argv,"+giga","Tune for a gigabit network"))
00150     setspeed_gigabit();
00151   CmiGetArgIntDesc(argv,"+max_dgram_size",&Cmi_max_dgram_size,"Size of each UDP packet");
00152   CmiGetArgIntDesc(argv,"+window_size",&Cmi_window_size,"Number of unacknowledged packets");
00153   /* must divide for window protocol to work */
00154   if ( (DGRAM_SEQNO_MASK+1)%Cmi_window_size != 0)
00155     CmiAbort("Invalid window size!");
00156   CmiGetArgIntDesc(argv,"+os_buffer_size",&Cmi_os_buffer_size, "UDP socket's SO_RCVBUF/SO_SNDBUF");
00157   if (CmiGetArgIntDesc(argv,"+delay_retransmit",&ms, "Milliseconds to wait before retransmit"))
00158           Cmi_delay_retransmit=0.001*ms;
00159   if (CmiGetArgIntDesc(argv,"+ack_delay",&ms, "Milliseconds to wait before ack'ing"))
00160           Cmi_delay_retransmit=0.001*ms;
00161   extract_common_args(argv);
00162   Cmi_dgram_max_data = Cmi_max_dgram_size - DGRAM_HEADER_SIZE;
00163   Cmi_half_window = Cmi_window_size >> 1;
00164   if ((Cmi_window_size * Cmi_max_dgram_size) > Cmi_os_buffer_size)
00165     KillEveryone("Window size too big for OS buffer.");
00166   Cmi_comm_periodic_delay=(int)(1000*Cmi_delay_retransmit);
00167   if (Cmi_comm_periodic_delay>60) Cmi_comm_periodic_delay=60;
00168   Cmi_comm_clock_delay=(int)(1000*Cmi_ack_delay);
00169   if (sizeof(DgramHeader)!=DGRAM_HEADER_SIZE) {
00170     CmiAbort("DatagramHeader in machine-dgram.c is the wrong size!\n");
00171   }
00172 }
00173 
00174 /* Compare seqnos using modular arithmetic-- currently unused
00175 static int seqno_in_window(unsigned int seqno,unsigned int winStart)
00176 {
00177   return ((DGRAM_SEQNO_MASK&(seqno-winStart)) < Cmi_window_size);
00178 }
00179 static int seqno_lt(unsigned int seqA,unsigned int seqB)
00180 {
00181   unsigned int del=seqB-seqA;
00182   return (del>0u) && (del<(DGRAM_SEQNO_MASK/2));
00183 }
00184 static int seqno_le(unsigned int seqA,unsigned int seqB)
00185 {
00186   unsigned int del=seqB-seqA;
00187   return (del>=0u) && (del<(DGRAM_SEQNO_MASK/2));
00188 }
00189 */
00190 
00191 
00192 /*****************************************************************************
00193  *
00194  * Communication Structures
00195  *
00196  *****************************************************************************/
00197 
/* An outgoing message, sent as one or more ImplicitDgrams.
   GarbageCollectMsg() releases it once refcount reaches zero; freemode
   controls what it frees ('A': nothing yet, mode becomes 'X'; 'G': only this
   struct; anything else: data via CmiFree and this struct). */
typedef struct OutgoingMsgStruct
{
  struct OutgoingMsgStruct *next;
  int   src, dst;
  int   size;
  char *data;
  int   refcount;   /* decremented by DiscardImplicitDgram for each dgram dropped */
  int   freemode;   /* 'A', 'G', 'X' or other -- see GarbageCollectMsg */
}
*OutgoingMsg;

/* A datagram as received (OtherNodeStruct.recv_window holds these).
   MallocExplicitDgram allocates Cmi_max_dgram_size extra bytes past the
   struct, so `data` is a variable-length trailer; its double type
   presumably just forces alignment. */
typedef struct ExplicitDgramStruct
{
  struct ExplicitDgramStruct *next;
  int  srcpe, rank, seqno, broot;
  unsigned int len, dummy; /* dummy to fix bug in rs6k alignment */
  double data[1];
}
*ExplicitDgram;

/* A datagram queued for, or sent but not yet acknowledged by, `dest`
   (OtherNodeStruct.send_queue_h/_t and send_window hold these).
   NOTE(review): dataptr/datalen presumably describe a slice of ogm->data
   rather than an owned copy -- confirm in the transport code. */
typedef struct ImplicitDgramStruct
{
  struct ImplicitDgramStruct *next;
  struct OtherNodeStruct *dest;
  int srcpe, rank, seqno, broot;
  char  *dataptr;
  int    datalen;
  OutgoingMsg ogm;
}
*ImplicitDgram;
00228 
/* Forward declaration; OtherNodeStruct keeps a GM send queue of these. */
struct PendingMsgStruct;

#if CMK_USE_IBVERBS
/* InfiniBand per-node state, created by node_addresses_store() from each
   remote node's (lid, qpn, psn) triple; infiPostInitialRecvs() is called
   once the whole node table has been stored. */
struct infiOtherNodeData;
struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]);
void    infiPostInitialRecvs();
#endif

/* Shared-memory communication servers; CommunicationPeriodic() polls these
   when they are built in. */
#if CMK_USE_SYSVSHM
inline void CommunicationServerSysvshm();
#endif
#if CMK_USE_PXSHM
inline void CommunicationServerPxshm();
#endif
00243 
#if CMK_USE_AMMASSO

/*
 * State Machine for Queue Pair Connection State (machine layer state for QP)
 *  PRE_CONNECT  --->  CONNECTED  ---> CONNECTION_LOST
 *                        |    /|\              |
 *                       \|/    \---------------/
 *           CONNECTION_CLOSED
 */

typedef enum __qp_connection_state {
  QP_CONN_STATE_PRE_CONNECT = 1,     /* Connection is being attempted and no successful connection has been made yet           */
  QP_CONN_STATE_CONNECTED,           /* Connection has been established                                                        */
  QP_CONN_STATE_CONNECTION_LOST,     /* Connection is being attempted and there has been an established connection in the past */
  QP_CONN_STATE_CONNECTION_CLOSED    /* Connection closed                                                                      */
} qp_connection_state_t;
#endif
  
/* A message buffer together with its length.  Not used by the code visible
   here; presumably consumed by an included transport file -- confirm. */
typedef struct FutureMessageStruct {
  char *msg;
  int len;
} *FutureMessage;
00266 
/*
 * Per-remote-node state.  It holds:
 *  - the node's identity and address from the node table;
 *  - the sliding-window send/receive state used for datagram delivery;
 *  - transport-specific members;
 *  - statistics counters reported by printNetStatistics().
 * One entry per node lives in `nodes`.  The entries are filled by
 * node_addresses_store() and OtherNode_init().
 */
typedef struct OtherNodeStruct
{
  int nodestart, nodesize;   /* first PE number on this node, number of PEs */
  skt_ip_t IP;
  unsigned int mach_id;
  unsigned int dataport;     /* data (UDP) port from the node table */
  struct sockaddr_in addr;   /* built from IP and dataport (non-IBVERBS builds) */
#if CMK_USE_TCP
  SOCKET        sock;           /* for TCP */
#endif
#if CMK_USE_MX
  CmiUInt8 nic_id;
  mx_endpoint_addr_t       endpoint_addr;
  CdsFifo                  futureMsgs;   /* out-of-order */
#endif

  unsigned int             send_last;    /* seqno of last dgram sent */
  ImplicitDgram           *send_window;  /* datagrams sent, not acked */
  ImplicitDgram            send_queue_h; /* head of send queue */
  ImplicitDgram            send_queue_t; /* tail of send queue */
  unsigned int             send_next;    /* next seqno to go into queue */
  unsigned int             send_good;    /* last acknowledged seqno */
  double                   send_primer;  /* time to send retransmit */
  unsigned int             send_ack_seqno; /* next ack seqno to send */
  int                      retransmit_leash; /*Maximum number of packets to retransmit*/
#if CMK_USE_GM
  struct PendingMsgStruct *sendhead, *sendtail;  /* gm send queue */
  int                      disable;
  int                      gm_pending;
#endif

#if CMK_USE_IBVERBS
        struct infiOtherNodeData *infiData;
#endif

#if CMK_USE_AMMASSO
  /* DMK : TODO : If any of these can be shared, then they can be moved to mycb_t in "machine-ammasso.c"  */

  cc_uint32_t            recv_cq_depth;
  cc_cq_handle_t         recv_cq;
  cc_uint32_t            send_cq_depth;
  cc_cq_handle_t         send_cq;
  cc_qp_id_t             qp_id;      /* Queue Pair ID      */
  cc_qp_handle_t         qp;         /* Queue Pair Handle  */

  int                    myNode;

  // list of buffers currently allocated to this particular node
  LIST_DEFINE(AmmassoBuffer,recv_buf);
  // This is used when deallocating buffers: it is set to the buffer previous to
  // the last in the list of recv_buf, so that the list can be broken, and the
  // buffers released without the usage of either a double linked list or a
  // linear search
  AmmassoBuffer          *secondLastRecvBuf;

  // when allocating new buffers, they remain here until the other node confirms
  // their allocation
  LIST_DEFINE(AmmassoBuffer,pending);

  // number of the next ACK to be sent to this node, it is piggybacked on every
  // message
  ammasso_ack_t          *remoteAck;

  // how many messages have been received since the last ACK was sent
  int                    messagesNotYetAcknowledged;

  // the following is used to send the ACK without a message
  cc_sq_wr_t             *ack_sq_wr;

  // the following is a pointer to where the ACK will arrive (if directly sent)
  ammasso_ack_t          *directAck;

  LIST_DEFINE(AmmassoToken,sendTokens); // linked list of available tokens
  LIST_DEFINE(AmmassoToken,usedTokens); // they are waiting for an ACK

  // number of ACKs received from this node, it is compared against the incoming
  // ACK to release send buffers
  ammasso_ack_t          localAck;

  // NOTE: these locks are probably not needed, try to see if they can be deleted
  CmiNodeLock            sendBufLock;
  CmiNodeLock            send_next_lock;
  CmiNodeLock            recv_expect_lock;

  cc_ep_handle_t         ep;
  cc_ep_handle_t         cr;

  cc_qp_query_attrs_t      qp_attrs;
  cc_stag_index_t          qp_attrs_stag_index;

  int                    posted;

  cc_inet_addr_t         address;  /* local if passive side of connection, remote if active side of connection */
  cc_inet_port_t         port;     /* local if passive side of connection, remote if active side of connection */
  qp_connection_state_t  connectionState;  /* State of the connection (connected, lost, etc) */

  cc_stag_t              remote_recv_stag;
  cc_uint64_t            remote_starting_to;
  int                    recv_UseIndex;

  /* Used by DYNAMIC ALLOCATOR */
  int  max_used_tokens;

#endif

  /* NOTE(review): presumably state for reassembling a message that arrives
     in several datagrams -- confirm in the transport code. */
  int                      asm_rank;
  int                      asm_total;
  int                      asm_fill;
  char                    *asm_msg;
  
  int                      recv_ack_cnt; /* number of unacked dgrams */
  double                   recv_ack_time;/* time when ack should be sent */
  unsigned int             recv_expect;  /* next dgram to expect */
  ExplicitDgram           *recv_window;  /* Packets received, not integrated */
  int                      recv_winsz;   /* Number of packets in recv window */
  unsigned int             recv_next;    /* Seqno of first missing packet */
  unsigned int             recv_ack_seqno; /* last ack seqno received */

  unsigned int             stat_total_intr; /* Total Number of Interrupts */
  unsigned int             stat_proc_intr;  /* Processed Interrupts */
  unsigned int             stat_send_pkt;   /* number of packets sent */
  unsigned int             stat_resend_pkt; /* number of packets resent */
  unsigned int             stat_send_ack;   /* number of acks sent */
  unsigned int             stat_recv_pkt;   /* number of packets received */
  unsigned int             stat_recv_ack;   /* number of acks received */
  unsigned int             stat_ack_pkts;   /* packets acked */
  unsigned int             stat_consec_resend; /*Packets retransmitted since last ack*/ 

  /* Message/byte totals printed by printNetStatistics() */
  int sent_msgs;
  int recd_msgs;
  int sent_bytes;
  int recd_bytes;
}
*OtherNode;
00401 
00402 static void OtherNode_init(OtherNode node)
00403 {
00404     int i;
00405     node->send_primer = 1.0e30; /*Don't retransmit until needed*/
00406     node->retransmit_leash = 1; /*Start with short leash*/
00407     node->send_last=0;
00408     node->send_window =
00409       (ImplicitDgram*)malloc(Cmi_window_size*sizeof(ImplicitDgram));
00410     for (i=0;i<Cmi_window_size;i++) node->send_window[i]=NULL;
00411     node->send_queue_h=node->send_queue_t=NULL;
00412     node->send_next=0;
00413     node->send_good=(unsigned int)(-1);
00414     node->send_ack_seqno=0;
00415 #if CMK_USE_GM
00416     node->sendhead = node->sendtail = NULL;
00417     node->disable = 0;
00418     node->gm_pending = 0;
00419 #endif
00420 #if CMK_USE_MX
00421     node->futureMsgs = CdsFifo_Create();
00422 #endif
00423 
00424     /*
00425     TODO: The initial values of the Ammasso related members will be set by the machine layer
00426           as the QPs are being created (along with any initial values).  After all the details
00427           of the layer are figured out, put some defaults here just so they are initialized to
00428           known values.  (Though, it should not be a problem that they are not initialized here yet.)
00429     */
00430 
00431     node->asm_rank=0;
00432     node->asm_total=0;
00433     node->asm_fill=0;
00434     node->asm_msg=0;
00435     
00436     node->recv_ack_cnt=0;
00437     node->recv_ack_time=1.0e30;
00438     node->recv_ack_seqno=0;
00439     node->recv_expect=0;
00440     node->recv_window =
00441       (ExplicitDgram*)malloc(Cmi_window_size*sizeof(ExplicitDgram));
00442     for (i=0;i<Cmi_window_size;i++) node->recv_window[i]=NULL;    
00443     node->recv_winsz=0;
00444     node->recv_next=0;
00445 
00446     node->stat_total_intr=0;
00447     node->stat_proc_intr=0;
00448     node->stat_send_pkt=0;
00449     node->stat_resend_pkt=0;
00450     node->stat_send_ack=0; 
00451     node->stat_recv_pkt=0;      
00452     node->stat_recv_ack=0;        
00453     node->stat_ack_pkts=0;
00454 
00455     node->sent_msgs = 0;
00456     node->recd_msgs = 0;
00457     node->sent_bytes = 0;
00458     node->recd_bytes = 0;
00459 }
00460 
/* Both are allocated and filled by node_addresses_store().  Several PEs of
   one node share the same OtherNode in nodes_by_pe.  With CMK_CPV_IS_SMP,
   nodes_by_pe has _Cmi_numnodes extra trailing entries (one per node) for
   the communication threads. */
static OtherNode *nodes_by_pe;  /* OtherNodes indexed by processor number */
static OtherNode  nodes;        /* Indexed only by ``node number'' */
00463 
00464 #ifdef CMK_USE_SPECIAL_MESSAGE_QUEUE_CHECK
00465 
00467 int CmiLongSendQueue(int forNode,int longerThan) {
00468         int ret=0;
00469         ImplicitDgram dg;
00470         CmiCommLock();
00471         dg=nodes[forNode].send_queue_h;
00472         while (longerThan>0 && dg) {
00473                 longerThan-=dg->datalen;
00474                 dg=dg->next;
00475         }
00476         CmiCommUnlock();
00477         return ret;
00478 }
00479 #endif
00480 
/* Transport hooks defined elsewhere.  node_addresses_store() calls
   CmiGmConvertMachineID under CMK_USE_GM and
   CmiAmmassoNodeAddressesStoreHandler under CMK_USE_AMMASSO. */
extern void CmiGmConvertMachineID(unsigned int *mach_id);
extern void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port);
00483 
00484 /* initnode node table reply format:
00485  +------------------------------------------------------- 
00486  | 4 bytes  |   Number of nodes n                       ^
00487  |          |   (big-endian binary integer)       4+12*n bytes
00488  +-------------------------------------------------     |
00489  ^  |        (one entry for each node)            ^     |
00490  |  | 4 bytes  |   Number of PEs for this node    |     |
00491  n  | 4 bytes  |   IP address of this node   12*n bytes |
00492  |  | 4 bytes  |   Data (UDP) port of this node   |     |
00493  v  |          |   (big-endian binary integers)   v     v
00494  ---+----------------------------------------------------
00495 */
00496 static void node_addresses_store(ChMessage *msg)
00497 {
00498   ChMessageInt_t *n32=(ChMessageInt_t *)msg->data;
00499   ChNodeinfo *d=(ChNodeinfo *)(n32+1);
00500   int nodestart;
00501   int i,j,n;
00502   MACHSTATE(1,"node_addresses_store {");        
00503   _Cmi_numnodes=ChMessageInt(n32[0]);
00504 
00505 #if CMK_USE_IBVERBS
00506         ChInfiAddr *remoteInfiAddr = (ChInfiAddr *) (&msg->data[sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes]);
00507   if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes +sizeof(ChInfiAddr)*_Cmi_numnodes )
00508          !=(unsigned int)msg->len)
00509     {printf("Node table has inconsistent length!");machine_exit(1);}
00510 
00511 #else
00512 
00513   if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes)
00514          !=(unsigned int)msg->len)
00515     {printf("Node table has inconsistent length!");machine_exit(1);}
00516 #endif//CMK_USE_IBVERBS
00517   nodes = (OtherNode)malloc(_Cmi_numnodes * sizeof(struct OtherNodeStruct));
00518   nodestart=0;
00519   for (i=0; i<_Cmi_numnodes; i++) {
00520     nodes[i].nodestart = nodestart;
00521     nodes[i].nodesize  = ChMessageInt(d[i].nPE);
00522     MACHSTATE2(3,"node %d nodesize %d",i,nodes[i].nodesize);
00523     nodes[i].mach_id = ChMessageInt(d[i].mach_id);
00524 #if CMK_USE_MX
00525     nodes[i].nic_id = ChMessageLong(d[i].nic_id);
00526 #endif
00527 #if CMK_USE_GM
00528     CmiGmConvertMachineID(& nodes[i].mach_id);
00529 #endif
00530     nodes[i].IP=d[i].IP;
00531     if (i==_Cmi_mynode) {
00532       Cmi_nodestart=nodes[i].nodestart;
00533       _Cmi_mynodesize=nodes[i].nodesize;
00534       Cmi_self_IP=nodes[i].IP;
00535     }
00536 #if CMK_USE_IBVERBS
00537                 {
00538                         if(i != _Cmi_mynode){
00539                                 int addr[3];
00540                                 addr[0] =ChMessageInt(remoteInfiAddr[i].lid);
00541                                 addr[1] =ChMessageInt(remoteInfiAddr[i].qpn);
00542                                 addr[2] =ChMessageInt(remoteInfiAddr[i].psn);
00543                                 nodes[i].infiData = initInfiOtherNodeData(i,addr);
00544                         }
00545                 }
00546 #else
00547     nodes[i].dataport = ChMessageInt(d[i].dataport);
00548     nodes[i].addr = skt_build_addr(nodes[i].IP,nodes[i].dataport);
00549 #endif
00550 
00551 #if CMK_USE_TCP
00552     nodes[i].sock = INVALID_SOCKET;
00553 #endif
00554     nodestart+=nodes[i].nodesize;
00555 
00556 #if CMK_USE_AMMASSO
00557     CmiAmmassoNodeAddressesStoreHandler(nodes[i].nodestart, &(nodes[i].addr), nodes[i].dataport);
00558 #endif
00559 
00560   }
00561   _Cmi_numpes=nodestart;
00562   n = _Cmi_numpes;
00563 #ifdef CMK_CPV_IS_SMP
00564   n += _Cmi_numnodes;
00565 #endif
00566   nodes_by_pe = (OtherNode*)malloc(n * sizeof(OtherNode));
00567   _MEMCHECK(nodes_by_pe);
00568   for (i=0; i<_Cmi_numnodes; i++) {
00569     OtherNode node = nodes + i;
00570     OtherNode_init(node);
00571     for (j=0; j<node->nodesize; j++)
00572       nodes_by_pe[j + node->nodestart] = node;
00573   }
00574 #ifdef CMK_CPV_IS_SMP
00575   /* index for communication threads */
00576   for (i=_Cmi_numpes; i<_Cmi_numpes+_Cmi_numnodes; i++) {
00577     OtherNode node = nodes + i-_Cmi_numpes;
00578     nodes_by_pe[i] = node;
00579   }
00580 #endif
00581 #if CMK_USE_IBVERBS
00582         infiPostInitialRecvs();
00583 #endif
00584   MACHSTATE(1,"} node_addresses_store");
00585 }
00586 
00590 static char statstr[10000];
00591 
00592 void printNetStatistics(void)
00593 {
00594   char tmpstr[1024];
00595   OtherNode myNode;
00596   int i;
00597   unsigned int send_pkt=0, resend_pkt=0, recv_pkt=0, send_ack=0;
00598   unsigned int recv_ack=0, ack_pkts=0;
00599 
00600   myNode = nodes+CmiMyNode();
00601   sprintf(tmpstr, "***********************************\n");
00602   strcpy(statstr, tmpstr);
00603   sprintf(tmpstr, "Net Statistics For Node %u\n", CmiMyNode());
00604   strcat(statstr, tmpstr);
00605   sprintf(tmpstr, "Interrupts: %u \tProcessed: %u\n",
00606                   myNode->stat_total_intr, myNode->stat_proc_intr);
00607   strcat(statstr, tmpstr);
00608   sprintf(tmpstr, "Total Msgs Sent: %u \tTotal Bytes Sent: %u\n",
00609                   myNode->sent_msgs, myNode->sent_bytes);
00610   strcat(statstr, tmpstr);
00611   sprintf(tmpstr, "Total Msgs Recv: %u \tTotal Bytes Recv: %u\n",
00612                   myNode->recd_msgs, myNode->recd_bytes);
00613   strcat(statstr, tmpstr);
00614   sprintf(tmpstr, "***********************************\n");
00615   strcat(statstr, tmpstr);
00616   sprintf(tmpstr, "[Num]\tSENDTO\tRESEND\tRECV\tACKSTO\tACKSFRM\tPKTACK\n");
00617   strcat(statstr,tmpstr);
00618   sprintf(tmpstr, "=====\t======\t======\t====\t======\t=======\t======\n");
00619   strcat(statstr,tmpstr);
00620   for(i=0;i<CmiNumNodes();i++) {
00621     OtherNode node = nodes+i;
00622     sprintf(tmpstr, "[%u]\t%u\t%u\t%u\t%u\t%u\t%u\n",
00623                      i, node->stat_send_pkt, node->stat_resend_pkt,
00624                      node->stat_recv_pkt, node->stat_send_ack,
00625                      node->stat_recv_ack, node->stat_ack_pkts);
00626     strcat(statstr, tmpstr);
00627     send_pkt += node->stat_send_pkt;
00628     recv_pkt += node->stat_recv_pkt;
00629     resend_pkt += node->stat_resend_pkt;
00630     send_ack += node->stat_send_ack;
00631     recv_ack += node->stat_recv_ack;
00632     ack_pkts += node->stat_ack_pkts;
00633   }
00634   sprintf(tmpstr, "[TOTAL]\t%u\t%u\t%u\t%u\t%u\t%u\n",
00635                      send_pkt, resend_pkt,
00636                      recv_pkt, send_ack,
00637                      recv_ack, ack_pkts);
00638   strcat(statstr, tmpstr);
00639   sprintf(tmpstr, "***********************************\n");
00640   strcat(statstr, tmpstr);
00641   CmiPrintf(statstr);
00642 }
00643 
00644 
00645 /************** free list management *****************/
00646 
/* Heads of singly linked free lists of recycled datagram structs.  The
   Free*Dgram macros below push onto them and the Malloc*Dgram macros pop
   from them; the visible code never returns their memory to the heap. */
static ExplicitDgram Cmi_freelist_explicit;
static ImplicitDgram Cmi_freelist_implicit;
/*static OutgoingMsg   Cmi_freelist_outgoing;*/
00650 
/* The free-list macros below use temporaries with names that are unlikely
   to collide with the caller's variables.  With a plain `d`, a call such as
   MallocImplicitDgram(d) would shadow the caller's `d` and never assign
   it. */

/* Push dg onto the implicit-datagram free list (no memory is released). */
#define FreeImplicitDgram(dg) {\
  ImplicitDgram fl_idg_ = (dg);\
  fl_idg_->next = Cmi_freelist_implicit;\
  Cmi_freelist_implicit = fl_idg_;\
}

/* Set dg to a recycled ImplicitDgram popped from the free list, or to a
   freshly malloc'd one (checked with _MEMCHECK) when the list is empty. */
#define MallocImplicitDgram(dg) {\
  ImplicitDgram fl_idg_ = Cmi_freelist_implicit;\
  if (fl_idg_==0) {fl_idg_ = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\
             _MEMCHECK(fl_idg_);\
  } else Cmi_freelist_implicit = fl_idg_->next;\
  (dg) = fl_idg_;\
}

/* Push dg onto the explicit-datagram free list (no memory is released). */
#define FreeExplicitDgram(dg) {\
  ExplicitDgram fl_edg_ = (dg);\
  fl_edg_->next = Cmi_freelist_explicit;\
  Cmi_freelist_explicit = fl_edg_;\
}

/* Set dg to a recycled ExplicitDgram, or to a new one with room for
   Cmi_max_dgram_size bytes after the struct (checked with _MEMCHECK). */
#define MallocExplicitDgram(dg) {\
  ExplicitDgram fl_edg_ = Cmi_freelist_explicit;\
  if (fl_edg_==0) { fl_edg_ = ((ExplicitDgram)malloc \
                   (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\
              _MEMCHECK(fl_edg_);\
  } else Cmi_freelist_explicit = fl_edg_->next;\
  (dg) = fl_edg_;\
}

/* Careful with these next two, need concurrency control */

#define FreeOutgoingMsg(m) (free(m))
#define MallocOutgoingMsg(m)\
    {((m)=(OutgoingMsg)malloc(sizeof(struct OutgoingMsgStruct))); _MEMCHECK(m);}
00685 
00686 /****************************************************************************
00687  *                                                                          
00688  * CheckSocketsReady
00689  *
00690  * Checks both sockets to see which are readable and which are writeable.
00691  * We check all these things at the same time since this can be done for
00692  * free with ``select.'' The result is stored in global variables, since
00693  * this is essentially global state information and several routines need it.
00694  *
00695  ***************************************************************************/
00696 
/* Results of the socket readiness check described above.  They are
   presumably set non-zero by CheckSocketsReady (defined in an included
   transport file -- confirm) when the control socket is readable and when
   the data socket is readable / writeable. */
static int ctrlskt_ready_read;
static int dataskt_ready_read;
static int dataskt_ready_write;
00700 
00701 /******************************************************************************
00702  *
00703  * Transmission Code
00704  *
00705  *****************************************************************************/
00706 
00707 void GarbageCollectMsg(OutgoingMsg ogm)
00708 {
00709   MACHSTATE2(3,"GarbageCollectMsg called on ogm %p refcount %d",ogm,ogm->refcount);
00710         if (ogm->refcount == 0) {
00711     if (ogm->freemode == 'A') {
00712       ogm->freemode = 'X';
00713     } else {
00714       if (ogm->freemode != 'G') CmiFree(ogm->data);
00715       FreeOutgoingMsg(ogm);
00716     }
00717   }
00718 }
00719 
00720 void DiscardImplicitDgram(ImplicitDgram dg)
00721 {
00722   OutgoingMsg ogm;
00723   ogm = dg->ogm;
00724   ogm->refcount--;
00725   GarbageCollectMsg(ogm);
00726   FreeImplicitDgram(dg);
00727 }
00728 
00729 /*
00730  Check the real-time clock and perform periodic tasks.
00731  Must be called with comm. lock held.
00732  */
00733 static double Cmi_ack_last, Cmi_check_last;
00734 static void CommunicationsClock(void)
00735 {
00736   MACHSTATE(1,"CommunicationsClock");
00737   Cmi_clock = GetClock();
00738   if (Cmi_clock > Cmi_ack_last + 0.5*Cmi_ack_delay) {
00739     MACHSTATE(2,"CommunicationsClock timing out acks");    
00740     Cmi_ack_last=Cmi_clock;
00741     writeableAcks=1;
00742     writeableDgrams=1;
00743   }
00744   
00745   if (Cmi_clock > Cmi_check_last + Cmi_check_delay) {
00746     MACHSTATE(4,"CommunicationsClock pinging charmrun");       
00747     Cmi_check_last = Cmi_clock; 
00748     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
00749   }
00750 }
00751 
00752 #if CMK_SHARED_VARS_UNAVAILABLE
00753 static void CommunicationsClockCaller(void *ignored)
00754 {
00755   CmiCommLock();
00756   CommunicationsClock();
00757   CmiCommUnlock();
00758   CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);  
00759 }
00760 
00761 static void CommunicationPeriodic(void) 
00762 { /*Poll on the communications server*/
00763 #if CMK_USE_SYSVSHM
00764         CommunicationServerSysvshm();
00765 #endif
00766 #if CMK_USE_PXSHM
00767         CommunicationServerPxshm();
00768 #endif
00769   CommunicationServer(0, COMM_SERVER_FROM_SMP);
00770 }
00771 
00772 static void CommunicationPeriodicCaller(void *ignored)
00773 {
00774   CommunicationPeriodic();
00775   CcdCallFnAfter((CcdVoidFn)CommunicationPeriodicCaller,NULL,Cmi_comm_periodic_delay);
00776 }
00777 #endif
00778 
/* common hardware dependent API */
/* NOTE(review): presumably implemented by the transport file included just
   below (machine-gm.c, machine-mx.c, machine-eth.c, ...) -- confirm. */
/*void EnqueueOutgoingDgram(OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot);*/
void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy);

void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int nodesend);
void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int curcycle, int nodesend);
00785 
00786 #if CMK_USE_GM
00787 
00788 #include "machine-gm.c"
00789 
00790 #elif CMK_USE_MX
00791 
00792 #include "machine-mx.c"
00793 
00794 #elif CMK_USE_AMMASSO
00795 
00796 #include "machine-ammasso.c"
00797 #define BARRIER_NULL           1
00798 
00799 #elif CMK_USE_TCP
00800 
00801 #include "machine-tcp.c"
00802 #define BARRIER_NULL           1
00803 
00804 #elif CMK_USE_IBVERBS
00805 
00806 #include "machine-ibverbs.c"
00807 /*#define BARRIER_NULL           1*/
00808 
00809 #else
00810 
00811 #include "machine-eth.c"
00812 
00813 #endif
00814 
00815 #if CMK_USE_SYSVSHM
00816 #include "machine-sysvshm.c"
00817 #endif
00818 #if CMK_USE_PXSHM
00819 #include "machine-pxshm.c"
00820 #endif
00821 
00822 
00823 
00824 #if  BARRIER_NULL
/* Barrier stubs for transports that define BARRIER_NULL.  Both return -1
   immediately without synchronizing anything (presumably meaning "barrier
   not supported" to callers -- confirm). */
int CmiBarrier()
{
  const int no_barrier = -1;
  return no_barrier;
}

int CmiBarrierZero()
{
  const int no_barrier = -1;
  return no_barrier;
}
00834 #endif
00835 

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1