00001
00019 #if CMK_USE_AMMASSO
00020 #include "machine-ammasso.h"
00021 #endif
00022
00023 #if CMK_USE_IBVERBS | CMK_USE_IBUD
00024 #include <infiniband/verbs.h>
00025 #endif
00026
/*
 * Expected sizeof(DgramHeader) in bytes; extract_args() aborts if the two
 * disagree and subtracts this from the maximum datagram size to obtain the
 * per-datagram payload capacity.
 */
#define DGRAM_HEADER_SIZE                8

/* The message length lives in the third int of the message buffer. */
#define CmiMsgHeaderGetLength(msg)       (((int*)(msg))[2])
#define CmiMsgHeaderSetLength(msg, len)  (((int*)(msg))[2] = (len))

/* The first pointer-sized word of a message is reused as a link pointer. */
#define CmiMsgNext(msg)                  (*((void**)(msg)))

/* Value masks: 16 bits for PE numbers and sequence numbers, 8 for magic. */
#define DGRAM_SEQNO_MASK                 (0xFFFFu)
#define DGRAM_SRCPE_MASK                 (0xFFFF)
#define DGRAM_ROOTPE_MASK                (0xFFFFu)
#define DGRAM_MAGIC_MASK                 (0xFF)

/*
 * Reserved codes at the top of the 8-bit range.
 * NOTE(review): the names suggest these travel in the header's dstrank
 * field to mark special datagrams -- confirm against the send/receive code.
 */
#define DGRAM_ACKNOWLEDGE                (0xFF)
#define DGRAM_BROADCAST                  (0xFE)
#define DGRAM_SIMPLEKILL                 (0xFD)
#define DGRAM_DSTRANK_MAX                (0xFC)
#if CMK_NODE_QUEUE_AVAILABLE
#define DGRAM_NODEMESSAGE                (0xFB)
#define DGRAM_NODEBROADCAST              (0xFA)
#endif
00046
00047
00048
/*
 * Header placed at the front of a datagram: 64 bits of bit-fields.
 * extract_args() verifies at start-up that sizeof(DgramHeader) equals
 * DGRAM_HEADER_SIZE, so the field order and widths must not change.
 */
typedef struct {
  unsigned int seqno   : 16;   /* sequence number                */
  unsigned int srcpe   : 16;   /* sending PE                     */
  unsigned int dstrank : 8;    /* destination rank / special code */
  unsigned int magic   : 8;    /* low 8 bits of the magic value  */
  unsigned int rootpe  : 16;   /* root PE                        */
} DgramHeader;



/* A datagram header followed by a 1024-byte window area. */
typedef struct {
  DgramHeader head;
  char        window[1024];
} DgramAck;
00060
00061 extern unsigned char computeCheckSum(unsigned char *data, int len);
00062
/*
 * Fill in the DgramHeader stored at `ptr`.
 *
 *   ptr      - address of the header (any pointer type; it is cast)
 *   dstrank_ - value for the dstrank field
 *   srcpe_   - value for the srcpe field
 *   magic_   - magic value; only its low 8 bits (DGRAM_MAGIC_MASK) are kept
 *   seqno_   - value for the seqno field
 *   root_    - value for the rootpe field
 *
 * Expands to a brace-enclosed statement block, not an expression.
 * Every argument is parenthesised, and the temporary has a macro-private
 * name so that a caller argument spelled `header` is not captured by it.
 */
#define DgramHeaderMake(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
   DgramHeader *dgramHeaderMake_hdr_ = (DgramHeader *)(ptr); \
   dgramHeaderMake_hdr_->seqno = (seqno_); \
   dgramHeaderMake_hdr_->srcpe = (srcpe_); \
   dgramHeaderMake_hdr_->dstrank = (dstrank_); \
   dgramHeaderMake_hdr_->magic = (magic_) & DGRAM_MAGIC_MASK; \
   dgramHeaderMake_hdr_->rootpe = (root_); \
}
00071
/*
 * Unpack the DgramHeader stored at `ptr` into the caller's variables.
 *
 *   ptr      - address of the header (any pointer type; it is cast)
 *   dstrank_, srcpe_, magic_, seqno_, root_
 *            - lvalues that receive the corresponding header fields
 *
 * Expands to a brace-enclosed statement block, not an expression.
 * The temporary has a macro-private name so that caller arguments spelled
 * `header` are not captured by it.
 */
#define DgramHeaderBreak(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
   DgramHeader *dgramHeaderBreak_hdr_ = (DgramHeader *)(ptr); \
   (seqno_) = dgramHeaderBreak_hdr_->seqno; \
   (srcpe_) = dgramHeaderBreak_hdr_->srcpe; \
   (dstrank_) = dgramHeaderBreak_hdr_->dstrank; \
   (magic_) = dgramHeaderBreak_hdr_->magic; \
   (root_) = dgramHeaderBreak_hdr_->rootpe; \
}
00080
00081 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
00082 static void randomCorrupt(char *data, int len)
00083 {
00084 if (0==(rand()%CMK_RANDOMLY_CORRUPT_MESSAGES))
00085 {
00086 int badByte=rand()%len;
00087 int badBit=rand()%8;
00088 data[badByte]^=(1<<badBit);
00089 }
00090 }
00091 #endif
00092
/*
 * Negative sentinel values.
 * NOTE(review): by their names these stand in for a destination PE (or
 * node) when a message is broadcast -- confirm at the call sites.
 */
#define PE_BROADCAST_OTHERS    (-101)
#define PE_BROADCAST_ALL       (-102)

#if CMK_NODE_QUEUE_AVAILABLE
#define NODE_BROADCAST_OTHERS  (-201)
#define NODE_BROADCAST_ALL     (-202)
#endif
00100
00101
/* Tunables chosen by the setspeed_*() presets and command-line flags. */
static int    Cmi_max_dgram_size;      /* "+max_dgram_size"                 */
static int    Cmi_os_buffer_size;      /* "+os_buffer_size"                 */
static int    Cmi_window_size;         /* "+window_size"                    */
static int    Cmi_half_window;         /* Cmi_window_size / 2               */
static double Cmi_delay_retransmit;    /* seconds                           */
static double Cmi_ack_delay;           /* seconds                           */
/* Values derived from the tunables in extract_args(). */
static int    Cmi_dgram_max_data;      /* max dgram size minus header size  */
static int    Cmi_comm_periodic_delay; /* milliseconds, capped at 60        */
static int    Cmi_comm_clock_delay;    /* milliseconds                      */
/* Flags raised by CommunicationsClock() when the ack timer expires. */
static int    writeableAcks;
static int    writeableDgrams;
00112
00113 static void setspeed_atm()
00114 {
00115 Cmi_max_dgram_size = 2048;
00116 Cmi_os_buffer_size = 50000;
00117 Cmi_window_size = 16;
00118 Cmi_delay_retransmit = 0.0150;
00119 Cmi_ack_delay = 0.0035;
00120 }
00121
00122 static void setspeed_eth()
00123 {
00124 Cmi_max_dgram_size = 1400;
00125 Cmi_window_size = 32;
00126 Cmi_os_buffer_size = Cmi_window_size*Cmi_max_dgram_size;
00127 Cmi_delay_retransmit = 0.0400;
00128 Cmi_ack_delay = 0.0050;
00129 }
00130
00131 static void setspeed_gigabit()
00132 {
00133
00134 Cmi_max_dgram_size = 9000;
00135 Cmi_window_size = 8;
00136 Cmi_os_buffer_size = 200000;
00137 Cmi_delay_retransmit = 0.020;
00138 Cmi_ack_delay = 0.018;
00139 }
00140
00141 static void extract_args(char **argv)
00142 {
00143 int ms;
00144 setspeed_gigabit();
00145 if (CmiGetArgFlagDesc(argv,"+atm","Tune for a low-latency ATM network"))
00146 setspeed_atm();
00147 if (CmiGetArgFlagDesc(argv,"+eth","Tune for an ethernet network"))
00148 setspeed_eth();
00149 if (CmiGetArgFlagDesc(argv,"+giga","Tune for a gigabit network"))
00150 setspeed_gigabit();
00151 CmiGetArgIntDesc(argv,"+max_dgram_size",&Cmi_max_dgram_size,"Size of each UDP packet");
00152 CmiGetArgIntDesc(argv,"+window_size",&Cmi_window_size,"Number of unacknowledged packets");
00153
00154 if ( (DGRAM_SEQNO_MASK+1)%Cmi_window_size != 0)
00155 CmiAbort("Invalid window size!");
00156 CmiGetArgIntDesc(argv,"+os_buffer_size",&Cmi_os_buffer_size, "UDP socket's SO_RCVBUF/SO_SNDBUF");
00157 if (CmiGetArgIntDesc(argv,"+delay_retransmit",&ms, "Milliseconds to wait before retransmit"))
00158 Cmi_delay_retransmit=0.001*ms;
00159 if (CmiGetArgIntDesc(argv,"+ack_delay",&ms, "Milliseconds to wait before ack'ing"))
00160 Cmi_ack_delay=0.001*ms;
00161 extract_common_args(argv);
00162 Cmi_dgram_max_data = Cmi_max_dgram_size - DGRAM_HEADER_SIZE;
00163 Cmi_half_window = Cmi_window_size >> 1;
00164 if ((Cmi_window_size * Cmi_max_dgram_size) > Cmi_os_buffer_size)
00165 KillEveryone("Window size too big for OS buffer.");
00166 Cmi_comm_periodic_delay=(int)(1000*Cmi_delay_retransmit);
00167 if (Cmi_comm_periodic_delay>60) Cmi_comm_periodic_delay=60;
00168 Cmi_comm_clock_delay=(int)(1000*Cmi_ack_delay);
00169 if (sizeof(DgramHeader)!=DGRAM_HEADER_SIZE) {
00170 CmiAbort("DatagramHeader in machine-dgram.c is the wrong size!\n");
00171 }
00172 }
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
/*
 * Bookkeeping record for one message being sent.
 * OutgoingMsg is a pointer to this structure.
 */
typedef struct OutgoingMsgStruct
{
  struct OutgoingMsgStruct *next;
  int   src;
  int   dst;
  int   size;
  char *data;
  int   refcount;   /* dropped by DiscardImplicitDgram(); 0 allows collection */
  int   freemode;   /* character code ('A', 'X', 'G', ...) examined by
                       GarbageCollectMsg() to decide what gets freed */
}
*OutgoingMsg;
00208
/*
 * A datagram that carries its own copy of the payload.
 * MallocExplicitDgram() allocates Cmi_max_dgram_size bytes beyond this
 * structure, so `data` is really the start of a larger buffer; it is
 * declared as double[1] and must stay that way to keep sizeof unchanged.
 * ExplicitDgram is a pointer to this structure.
 */
typedef struct ExplicitDgramStruct
{
  struct ExplicitDgramStruct *next;
  int          srcpe;
  int          rank;
  int          seqno;
  int          broot;
  unsigned int len;
  unsigned int dummy;
  double       data[1];
}
*ExplicitDgram;
00217
00218 typedef struct ImplicitDgramStruct
00219 {
00220 struct ImplicitDgramStruct *next;
00221 struct OtherNodeStruct *dest;
00222 int srcpe, rank, seqno, broot;
00223 char *dataptr;
00224 int datalen;
00225 OutgoingMsg ogm;
00226 }
00227 *ImplicitDgram;
00228
00229 struct PendingMsgStruct;
00230
00231
00232 #if CMK_USE_IBUD
00233 struct infiOtherNodeData;
00234 struct infiOtherNodeData *initinfiData(int node,int lid,int qpn,int psn);
00235 #endif
00236 #if CMK_USE_IBVERBS
00237 struct infiOtherNodeData;
00238 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]);
00239 void infiPostInitialRecvs();
00240 #endif
00241
00242 #if CMK_USE_SYSVSHM
00243 inline void CommunicationServerSysvshm();
00244 #endif
00245 #if CMK_USE_PXSHM
00246 inline void CommunicationServerPxshm();
00247 #endif
00248
00249 #if CMK_USE_AMMASSO
00250
00251
00252
00253
00254
00255
00256
00257
00258
/* Connection state of a queue pair; values are numbered from 1. */
typedef enum __qp_connection_state {
  QP_CONN_STATE_PRE_CONNECT       = 1,
  QP_CONN_STATE_CONNECTED         = 2,
  QP_CONN_STATE_CONNECTION_LOST   = 3,
  QP_CONN_STATE_CONNECTION_CLOSED = 4
} qp_connection_state_t;
00265 #endif
00266
/*
 * A message buffer together with its length.
 * FutureMessage is a pointer to this structure.
 */
typedef struct FutureMessageStruct {
  char *msg;   /* message bytes      */
  int   len;   /* length of the data */
} *FutureMessage;
00271
00272 typedef struct OtherNodeStruct
00273 {
00274 int nodestart, nodesize;
00275 skt_ip_t IP;
00276 unsigned int mach_id;
00277 unsigned int dataport;
00278 struct sockaddr_in addr;
00279 #if CMK_USE_TCP
00280 SOCKET sock;
00281 #endif
00282 #if CMK_USE_MX
00283 CmiUInt8 nic_id;
00284 mx_endpoint_addr_t endpoint_addr;
00285 CdsFifo futureMsgs;
00286 #endif
00287
00288 unsigned int send_last;
00289 ImplicitDgram *send_window;
00290 ImplicitDgram send_queue_h;
00291 ImplicitDgram send_queue_t;
00292 unsigned int send_next;
00293 unsigned int send_good;
00294 double send_primer;
00295 unsigned int send_ack_seqno;
00296 int retransmit_leash;
00297 #if CMK_USE_GM
00298 struct PendingMsgStruct *sendhead, *sendtail;
00299 int disable;
00300 int gm_pending;
00301 #endif
00302
00303 #if CMK_USE_IBVERBS | CMK_USE_IBUD
00304 struct infiOtherNodeData *infiData;
00305 #endif
00306
00307 #if CMK_USE_AMMASSO
00308
00309
00310 cc_uint32_t recv_cq_depth;
00311 cc_cq_handle_t recv_cq;
00312 cc_uint32_t send_cq_depth;
00313 cc_cq_handle_t send_cq;
00314 cc_qp_id_t qp_id;
00315 cc_qp_handle_t qp;
00316
00317 int myNode;
00318
00319
00320 LIST_DEFINE(AmmassoBuffer,recv_buf);
00321
00322
00323
00324
00325 AmmassoBuffer *secondLastRecvBuf;
00326
00327
00328
00329 LIST_DEFINE(AmmassoBuffer,pending);
00330
00331
00332
00333 ammasso_ack_t *remoteAck;
00334
00335
00336 int messagesNotYetAcknowledged;
00337
00338
00339 cc_sq_wr_t *ack_sq_wr;
00340
00341
00342 ammasso_ack_t *directAck;
00343
00344 LIST_DEFINE(AmmassoToken,sendTokens);
00345 LIST_DEFINE(AmmassoToken,usedTokens);
00346
00347
00348
00349 ammasso_ack_t localAck;
00350
00351
00352 CmiNodeLock sendBufLock;
00353 CmiNodeLock send_next_lock;
00354 CmiNodeLock recv_expect_lock;
00355
00356 cc_ep_handle_t ep;
00357 cc_ep_handle_t cr;
00358
00359 cc_qp_query_attrs_t qp_attrs;
00360 cc_stag_index_t qp_attrs_stag_index;
00361
00362 int posted;
00363
00364 cc_inet_addr_t address;
00365 cc_inet_port_t port;
00366 qp_connection_state_t connectionState;
00367
00368 cc_stag_t remote_recv_stag;
00369 cc_uint64_t remote_starting_to;
00370 int recv_UseIndex;
00371
00372
00373 int max_used_tokens;
00374
00375 #endif
00376
00377 int asm_rank;
00378 int asm_total;
00379 int asm_fill;
00380 char *asm_msg;
00381
00382 int recv_ack_cnt;
00383 double recv_ack_time;
00384 unsigned int recv_expect;
00385 ExplicitDgram *recv_window;
00386 int recv_winsz;
00387 unsigned int recv_next;
00388 unsigned int recv_ack_seqno;
00389
00390 unsigned int stat_total_intr;
00391 unsigned int stat_proc_intr;
00392 unsigned int stat_send_pkt;
00393 unsigned int stat_resend_pkt;
00394 unsigned int stat_send_ack;
00395 unsigned int stat_recv_pkt;
00396 unsigned int stat_recv_ack;
00397 unsigned int stat_ack_pkts;
00398 unsigned int stat_consec_resend;
00399
00400 int sent_msgs;
00401 int recd_msgs;
00402 int sent_bytes;
00403 int recd_bytes;
00404 }
00405 *OtherNode;
00406
00407 static void OtherNode_init(OtherNode node)
00408 {
00409 int i;
00410 node->send_primer = 1.0e30;
00411 node->retransmit_leash = 1;
00412 node->send_last=0;
00413 node->send_window =
00414 (ImplicitDgram*)malloc(Cmi_window_size*sizeof(ImplicitDgram));
00415 for (i=0;i<Cmi_window_size;i++) node->send_window[i]=NULL;
00416 node->send_queue_h=node->send_queue_t=NULL;
00417 node->send_next=0;
00418 node->send_good=(unsigned int)(-1);
00419 node->send_ack_seqno=0;
00420 #if CMK_USE_GM
00421 node->sendhead = node->sendtail = NULL;
00422 node->disable = 0;
00423 node->gm_pending = 0;
00424 #endif
00425 #if CMK_USE_MX
00426 node->futureMsgs = CdsFifo_Create();
00427 #endif
00428
00429
00430
00431
00432
00433
00434
00435
00436 node->asm_rank=0;
00437 node->asm_total=0;
00438 node->asm_fill=0;
00439 node->asm_msg=0;
00440
00441 node->recv_ack_cnt=0;
00442 node->recv_ack_time=1.0e30;
00443 node->recv_ack_seqno=0;
00444 node->recv_expect=0;
00445 node->recv_window =
00446 (ExplicitDgram*)malloc(Cmi_window_size*sizeof(ExplicitDgram));
00447 for (i=0;i<Cmi_window_size;i++) node->recv_window[i]=NULL;
00448 node->recv_winsz=0;
00449 node->recv_next=0;
00450
00451 node->stat_total_intr=0;
00452 node->stat_proc_intr=0;
00453 node->stat_send_pkt=0;
00454 node->stat_resend_pkt=0;
00455 node->stat_send_ack=0;
00456 node->stat_recv_pkt=0;
00457 node->stat_recv_ack=0;
00458 node->stat_ack_pkts=0;
00459
00460 node->sent_msgs = 0;
00461 node->recd_msgs = 0;
00462 node->sent_bytes = 0;
00463 node->recd_bytes = 0;
00464 }
00465
00466 static OtherNode *nodes_by_pe;
00467 static OtherNode nodes;
00468
00469 #ifdef CMK_USE_SPECIAL_MESSAGE_QUEUE_CHECK
00470
00472 int CmiLongSendQueue(int forNode,int longerThan) {
00473 int ret=0;
00474 ImplicitDgram dg;
00475 CmiCommLock();
00476 dg=nodes[forNode].send_queue_h;
00477 while (longerThan>0 && dg) {
00478 longerThan-=dg->datalen;
00479 dg=dg->next;
00480 }
00481 CmiCommUnlock();
00482 return ret;
00483 }
00484 #endif
00485
00486 extern void CmiGmConvertMachineID(unsigned int *mach_id);
00487 extern void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port);
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501 static void node_addresses_store(ChMessage *msg)
00502 {
00503 ChMessageInt_t *n32=(ChMessageInt_t *)msg->data;
00504 ChNodeinfo *d=(ChNodeinfo *)(n32+1);
00505 int nodestart;
00506 int i,j,n;
00507 MACHSTATE(1,"node_addresses_store {");
00508 _Cmi_numnodes=ChMessageInt(n32[0]);
00509
00510 #if CMK_USE_IBVERBS
00511 ChInfiAddr *remoteInfiAddr = (ChInfiAddr *) (&msg->data[sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes]);
00512 if (Cmi_charmrun_fd == -1) {
00513 d = &((ChSingleNodeinfo*)n32)->info;
00514 }
00515 else if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes +sizeof(ChInfiAddr)*_Cmi_numnodes )
00516 !=(unsigned int)msg->len)
00517 {printf("Node table has inconsistent length!");machine_exit(1);}
00518
00519 #else
00520
00521 if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes)
00522 !=(unsigned int)msg->len)
00523 {printf("Node table has inconsistent length!");machine_exit(1);}
00524 #endif
00525 nodes = (OtherNode)malloc(_Cmi_numnodes * sizeof(struct OtherNodeStruct));
00526 nodestart=0;
00527 for (i=0; i<_Cmi_numnodes; i++) {
00528 nodes[i].nodestart = nodestart;
00529 nodes[i].nodesize = ChMessageInt(d[i].nPE);
00530 MACHSTATE2(3,"node %d nodesize %d",i,nodes[i].nodesize);
00531 nodes[i].mach_id = ChMessageInt(d[i].mach_id);
00532 #if CMK_USE_MX
00533 nodes[i].nic_id = ChMessageLong(d[i].nic_id);
00534 #endif
00535
00536 #if CMK_USE_IBUD
00537 nodes[i].infiData=initinfiData(i,ChMessageInt(d[i].qp.lid),ChMessageInt(d[i].qp.qpn),ChMessageInt(d[i].qp.psn));
00538 #endif
00539
00540 #if CMK_USE_GM
00541 CmiGmConvertMachineID(& nodes[i].mach_id);
00542 #endif
00543 nodes[i].IP=d[i].IP;
00544 if (i==_Cmi_mynode) {
00545 Cmi_nodestart=nodes[i].nodestart;
00546 _Cmi_mynodesize=nodes[i].nodesize;
00547 Cmi_self_IP=nodes[i].IP;
00548 }
00549
00550 #if CMK_USE_IBVERBS
00551 if(i != _Cmi_mynode){
00552 int addr[3];
00553 addr[0] =ChMessageInt(remoteInfiAddr[i].lid);
00554 addr[1] =ChMessageInt(remoteInfiAddr[i].qpn);
00555 addr[2] =ChMessageInt(remoteInfiAddr[i].psn);
00556 nodes[i].infiData = initInfiOtherNodeData(i,addr);
00557 }
00558 #else
00559 nodes[i].dataport = ChMessageInt(d[i].dataport);
00560 nodes[i].addr = skt_build_addr(nodes[i].IP,nodes[i].dataport);
00561 #endif
00562
00563 #if CMK_USE_TCP
00564 nodes[i].sock = INVALID_SOCKET;
00565 #endif
00566 nodestart+=nodes[i].nodesize;
00567
00568 #if CMK_USE_AMMASSO
00569 CmiAmmassoNodeAddressesStoreHandler(nodes[i].nodestart, &(nodes[i].addr), nodes[i].dataport);
00570 #endif
00571
00572 }
00573 _Cmi_numpes=nodestart;
00574 n = _Cmi_numpes;
00575 #ifdef CMK_CPV_IS_SMP
00576 n += _Cmi_numnodes;
00577 #endif
00578 nodes_by_pe = (OtherNode*)malloc(n * sizeof(OtherNode));
00579 _MEMCHECK(nodes_by_pe);
00580 for (i=0; i<_Cmi_numnodes; i++) {
00581 OtherNode node = nodes + i;
00582 OtherNode_init(node);
00583 for (j=0; j<node->nodesize; j++)
00584 nodes_by_pe[j + node->nodestart] = node;
00585 }
00586 #ifdef CMK_CPV_IS_SMP
00587
00588 for (i=_Cmi_numpes; i<_Cmi_numpes+_Cmi_numnodes; i++) {
00589 OtherNode node = nodes + i-_Cmi_numpes;
00590 nodes_by_pe[i] = node;
00591 }
00592 #endif
00593 #if CMK_USE_IBVERBS
00594 infiPostInitialRecvs();
00595 #endif
00596 MACHSTATE(1,"} node_addresses_store");
00597 }
00598
00602 static char statstr[10000];
00603
00604 void printNetStatistics(void)
00605 {
00606 char tmpstr[1024];
00607 OtherNode myNode;
00608 int i;
00609 unsigned int send_pkt=0, resend_pkt=0, recv_pkt=0, send_ack=0;
00610 unsigned int recv_ack=0, ack_pkts=0;
00611
00612 myNode = nodes+CmiMyNode();
00613 sprintf(tmpstr, "***********************************\n");
00614 strcpy(statstr, tmpstr);
00615 sprintf(tmpstr, "Net Statistics For Node %u\n", CmiMyNode());
00616 strcat(statstr, tmpstr);
00617 sprintf(tmpstr, "Interrupts: %u \tProcessed: %u\n",
00618 myNode->stat_total_intr, myNode->stat_proc_intr);
00619 strcat(statstr, tmpstr);
00620 sprintf(tmpstr, "Total Msgs Sent: %u \tTotal Bytes Sent: %u\n",
00621 myNode->sent_msgs, myNode->sent_bytes);
00622 strcat(statstr, tmpstr);
00623 sprintf(tmpstr, "Total Msgs Recv: %u \tTotal Bytes Recv: %u\n",
00624 myNode->recd_msgs, myNode->recd_bytes);
00625 strcat(statstr, tmpstr);
00626 sprintf(tmpstr, "***********************************\n");
00627 strcat(statstr, tmpstr);
00628 sprintf(tmpstr, "[Num]\tSENDTO\tRESEND\tRECV\tACKSTO\tACKSFRM\tPKTACK\n");
00629 strcat(statstr,tmpstr);
00630 sprintf(tmpstr, "=====\t======\t======\t====\t======\t=======\t======\n");
00631 strcat(statstr,tmpstr);
00632 for(i=0;i<CmiNumNodes();i++) {
00633 OtherNode node = nodes+i;
00634 sprintf(tmpstr, "[%u]\t%u\t%u\t%u\t%u\t%u\t%u\n",
00635 i, node->stat_send_pkt, node->stat_resend_pkt,
00636 node->stat_recv_pkt, node->stat_send_ack,
00637 node->stat_recv_ack, node->stat_ack_pkts);
00638 strcat(statstr, tmpstr);
00639 send_pkt += node->stat_send_pkt;
00640 recv_pkt += node->stat_recv_pkt;
00641 resend_pkt += node->stat_resend_pkt;
00642 send_ack += node->stat_send_ack;
00643 recv_ack += node->stat_recv_ack;
00644 ack_pkts += node->stat_ack_pkts;
00645 }
00646 sprintf(tmpstr, "[TOTAL]\t%u\t%u\t%u\t%u\t%u\t%u\n",
00647 send_pkt, resend_pkt,
00648 recv_pkt, send_ack,
00649 recv_ack, ack_pkts);
00650 strcat(statstr, tmpstr);
00651 sprintf(tmpstr, "***********************************\n");
00652 strcat(statstr, tmpstr);
00653 CmiPrintf(statstr);
00654 }
00655
00656
00657
00658
00659 static ExplicitDgram Cmi_freelist_explicit;
00660 static ImplicitDgram Cmi_freelist_implicit;
00661
00662
/*
 * FreeImplicitDgram(dg): push `dg` onto the Cmi_freelist_implicit free
 * list.  Nothing is returned to the heap.
 *
 * Expands to a brace-enclosed statement block.  The temporary has a
 * macro-private name so that a caller variable called `d` is not captured.
 */
#define FreeImplicitDgram(dg) {\
  ImplicitDgram freeImplicitDgram_tmp_=(dg);\
  freeImplicitDgram_tmp_->next = Cmi_freelist_implicit;\
  Cmi_freelist_implicit = freeImplicitDgram_tmp_;\
}

/*
 * MallocImplicitDgram(dg): store into the lvalue `dg` a datagram record
 * popped from the free list, or a freshly malloc'ed one (checked with
 * _MEMCHECK) when the list is empty.  The record's contents are not
 * initialised.
 *
 * Expands to a brace-enclosed statement block.  With the old temporary
 * name, MallocImplicitDgram(d) assigned the temporary to itself and left
 * the caller's `d` unchanged.
 */
#define MallocImplicitDgram(dg) {\
  ImplicitDgram mallocImplicitDgram_tmp_ = Cmi_freelist_implicit;\
  if (mallocImplicitDgram_tmp_==0) {\
    mallocImplicitDgram_tmp_ = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\
    _MEMCHECK(mallocImplicitDgram_tmp_);\
  } else Cmi_freelist_implicit = mallocImplicitDgram_tmp_->next;\
  (dg) = mallocImplicitDgram_tmp_;\
}
00676
/*
 * FreeExplicitDgram(dg): push `dg` onto the Cmi_freelist_explicit free
 * list.  Nothing is returned to the heap.
 *
 * Expands to a brace-enclosed statement block.  The temporary has a
 * macro-private name so that a caller variable called `d` is not captured.
 */
#define FreeExplicitDgram(dg) {\
  ExplicitDgram freeExplicitDgram_tmp_=(dg);\
  freeExplicitDgram_tmp_->next = Cmi_freelist_explicit;\
  Cmi_freelist_explicit = freeExplicitDgram_tmp_;\
}

/*
 * MallocExplicitDgram(dg): store into the lvalue `dg` a datagram record
 * popped from the free list, or a freshly malloc'ed one (checked with
 * _MEMCHECK) when the list is empty.  A fresh record is allocated with
 * Cmi_max_dgram_size extra bytes after the structure for the payload.
 * The record's contents are not initialised.
 *
 * Expands to a brace-enclosed statement block.  With the old temporary
 * name, MallocExplicitDgram(d) assigned the temporary to itself and left
 * the caller's `d` unchanged.
 */
#define MallocExplicitDgram(dg) {\
  ExplicitDgram mallocExplicitDgram_tmp_ = Cmi_freelist_explicit;\
  if (mallocExplicitDgram_tmp_==0) {\
    mallocExplicitDgram_tmp_ = ((ExplicitDgram)malloc \
                   (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\
    _MEMCHECK(mallocExplicitDgram_tmp_);\
  } else Cmi_freelist_explicit = mallocExplicitDgram_tmp_->next;\
  (dg) = mallocExplicitDgram_tmp_;\
}
00691
00692
00693
/*
 * MallocOutgoingMsg(m): heap-allocate one OutgoingMsgStruct into the
 * lvalue `m` and check the result with _MEMCHECK.  Expands to a
 * brace-enclosed statement block; the record is not initialised.
 */
#define MallocOutgoingMsg(m) { \
    (m=(OutgoingMsg)malloc(sizeof(struct OutgoingMsgStruct))); \
    _MEMCHECK(m); \
  }

/* FreeOutgoingMsg(m): return a record obtained above to the heap. */
#define FreeOutgoingMsg(m) (free(m))
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
/*
 * Socket readiness flags.
 * NOTE(review): by their names these record whether the control socket is
 * readable and whether the data socket is readable / writable -- they are
 * not set anywhere in the visible code, confirm against the socket poller.
 */
static int ctrlskt_ready_read;
static int dataskt_ready_read;
static int dataskt_ready_write;
00712
00713
00714
00715
00716
00717
00718
00719 void GarbageCollectMsg(OutgoingMsg ogm)
00720 {
00721 MACHSTATE2(3,"GarbageCollectMsg called on ogm %p refcount %d",ogm,ogm->refcount);
00722 if (ogm->refcount == 0) {
00723 if (ogm->freemode == 'A') {
00724 ogm->freemode = 'X';
00725 } else {
00726 if (ogm->freemode != 'G') CmiFree(ogm->data);
00727 FreeOutgoingMsg(ogm);
00728 }
00729 }
00730 }
00731
00732 void DiscardImplicitDgram(ImplicitDgram dg)
00733 {
00734 OutgoingMsg ogm;
00735 ogm = dg->ogm;
00736 ogm->refcount--;
00737 GarbageCollectMsg(ogm);
00738 FreeImplicitDgram(dg);
00739 }
00740
00741
00742
00743
00744
00745 static double Cmi_ack_last, Cmi_check_last;
00746 static void CommunicationsClock(void)
00747 {
00748 MACHSTATE(1,"CommunicationsClock");
00749 Cmi_clock = GetClock();
00750 if (Cmi_clock > Cmi_ack_last + 0.5*Cmi_ack_delay) {
00751 MACHSTATE(2,"CommunicationsClock timing out acks");
00752 Cmi_ack_last=Cmi_clock;
00753 writeableAcks=1;
00754 writeableDgrams=1;
00755 }
00756
00757 if (Cmi_clock > Cmi_check_last + Cmi_check_delay) {
00758 MACHSTATE(4,"CommunicationsClock pinging charmrun");
00759 Cmi_check_last = Cmi_clock;
00760 ctrl_sendone_nolock("ping",NULL,0,NULL,0);
00761 }
00762 }
00763
00764 #if CMK_SHARED_VARS_UNAVAILABLE
00765 static void CommunicationsClockCaller(void *ignored)
00766 {
00767 CmiCommLock();
00768 CommunicationsClock();
00769 CmiCommUnlock();
00770 CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
00771 }
00772
00773 static void CommunicationPeriodic(void)
00774 {
00775 #if CMK_USE_SYSVSHM
00776 CommunicationServerSysvshm();
00777 #endif
00778 #if CMK_USE_PXSHM
00779 CommunicationServerPxshm();
00780 #endif
00781 CommunicationServer(0, COMM_SERVER_FROM_SMP);
00782 }
00783
00784 static void CommunicationPeriodicCaller(void *ignored)
00785 {
00786 CommunicationPeriodic();
00787 CcdCallFnAfter((CcdVoidFn)CommunicationPeriodicCaller,NULL,Cmi_comm_periodic_delay);
00788 }
00789 #endif
00790
00791
00792
00793 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy);
00794
00795 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int nodesend);
00796 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int curcycle, int nodesend);
00797
00798 #if CMK_USE_GM
00799
00800 #include "machine-gm.c"
00801
00802 #elif CMK_USE_MX
00803
00804 #include "machine-mx.c"
00805
00806 #elif CMK_USE_AMMASSO
00807
00808 #include "machine-ammasso.c"
00809
00810 #elif CMK_USE_TCP
00811
00812 #include "machine-tcp.c"
00813
00814 #elif CMK_USE_IBVERBS
00815
00816 #include "machine-ibverbs.c"
00817
00818 #elif CMK_USE_IBUD
00819 #include "machine-ibud.c"
00820
00821 #else
00822
00823 #include "machine-eth.c"
00824
00825 #endif
00826
00827 #if CMK_USE_SYSVSHM
00828 #include "machine-sysvshm.c"
00829 #endif
00830 #if CMK_USE_PXSHM
00831 #include "machine-pxshm.c"
00832 #endif
00833
00834