00001
00019 #if CMK_USE_AMMASSO
00020 #include "machine-ammasso.h"
00021 #endif
00022
00023 #if CMK_USE_IBVERBS
00024 #include <infiniband/verbs.h>
00025 #endif
00026
/* Size in bytes that DgramHeader must have; extract_args() aborts at startup
 * if sizeof(DgramHeader) differs from this value. */
#define DGRAM_HEADER_SIZE 8

/* Accessors for bookkeeping words kept at the front of a message buffer:
 * the length lives in the third int, the link pointer in the first word. */
#define CmiMsgHeaderSetLength(msg, len) (((int*)(msg))[2] = (len))
#define CmiMsgHeaderGetLength(msg)      (((int*)(msg))[2])
#define CmiMsgNext(msg)                 (*((void**)(msg)))

/* Bit masks matching the widths of the DgramHeader bit-fields. */
#define DGRAM_ROOTPE_MASK   (0xFFFFu)
#define DGRAM_SRCPE_MASK    (0xFFFF)
#define DGRAM_MAGIC_MASK    (0xFF)
#define DGRAM_SEQNO_MASK    (0xFFFFu)

/* Special values that fit the 8-bit dstrank header field.
 * NOTE(review): presumably values above DGRAM_DSTRANK_MAX select a packet
 * kind rather than a destination rank -- confirm in the transport code. */
#if CMK_NODE_QUEUE_AVAILABLE
#define DGRAM_NODEBROADCAST (0xFA)
#define DGRAM_NODEMESSAGE   (0xFB)
#endif
#define DGRAM_DSTRANK_MAX   (0xFC)
#define DGRAM_SIMPLEKILL    (0xFD)
#define DGRAM_BROADCAST     (0xFE)
#define DGRAM_ACKNOWLEDGE   (0xFF)
00046
00047
00048
/*
 * Packed header placed in front of every datagram.  The five bit-fields add
 * up to 64 bits; field order determines the layout and must not change.
 */
typedef struct {
  unsigned int seqno   : 16;  /* sequence number                        */
  unsigned int srcpe   : 16;  /* sending processor                      */
  unsigned int dstrank : 8;   /* destination rank or a DGRAM_* code     */
  unsigned int magic   : 8;   /* low 8 bits of the sender's magic value */
  unsigned int rootpe  : 16;  /* broadcast root                         */
} DgramHeader;
00056
00057
00058
00059 typedef struct { DgramHeader head; char window[1024]; } DgramAck;
00060
/* Checksum over len bytes starting at data; defined in another file. */
extern unsigned char computeCheckSum(unsigned char *data,
                                     int len);
00062
/*
 * Fill in the DgramHeader located at ptr.
 *   dstrank_  destination rank (or a DGRAM_* code)
 *   srcpe_    sending processor
 *   magic_    magic value; only its low 8 bits are stored
 *   seqno_    sequence number
 *   root_     broadcast root
 * Every argument is parenthesised so that expressions with low-precedence
 * operators (e.g. `a && b`) are masked/stored as a whole.
 */
#define DgramHeaderMake(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
   DgramHeader *header = (DgramHeader *)(ptr); \
   header->seqno = (seqno_); \
   header->srcpe = (srcpe_); \
   header->dstrank = (dstrank_); \
   header->magic = (magic_) & DGRAM_MAGIC_MASK; \
   header->rootpe = (root_); \
}
00071
/*
 * Unpack the DgramHeader located at ptr into the five caller-supplied
 * lvalues (dstrank_, srcpe_, magic_, seqno_, root_).
 */
#define DgramHeaderBreak(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
   DgramHeader *header = (DgramHeader *)(ptr); \
   (seqno_)   = header->seqno; \
   (srcpe_)   = header->srcpe; \
   (dstrank_) = header->dstrank; \
   (magic_)   = header->magic; \
   (root_)    = header->rootpe; \
}
00080
#ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
/*
 * Debugging aid: with probability 1/CMK_RANDOMLY_CORRUPT_MESSAGES, flip one
 * randomly chosen bit somewhere in the len bytes at data.
 *
 * An empty or invalid buffer (null data, len <= 0) is left untouched; the
 * previous code evaluated rand()%len unconditionally, which divides by zero
 * for len == 0 and indexes out of range for negative len.
 */
static void randomCorrupt(char *data, int len)
{
  if (!data || len <= 0)
    return;
  if (0 == (rand() % CMK_RANDOMLY_CORRUPT_MESSAGES))
  {
    int badByte = rand() % len;
    int badBit = rand() % 8;
    data[badByte] ^= (char)(1 << badBit);
  }
}
#endif
00092
/* Negative pseudo-PE numbers used to request a broadcast instead of a
 * point-to-point send. */
#define PE_BROADCAST_OTHERS   (-101)
#define PE_BROADCAST_ALL      (-102)

#if CMK_NODE_QUEUE_AVAILABLE
/* Node-level counterparts of the PE broadcast codes. */
#define NODE_BROADCAST_OTHERS (-201)
#define NODE_BROADCAST_ALL    (-202)
#endif
00100
00101
/* Network tuning parameters; filled in by setspeed_*() and extract_args(). */
static int    Cmi_max_dgram_size;      /* size of each UDP packet                  */
static int    Cmi_os_buffer_size;      /* UDP socket's SO_RCVBUF/SO_SNDBUF         */
static int    Cmi_window_size;         /* number of unacknowledged packets         */
static int    Cmi_half_window;         /* Cmi_window_size >> 1                     */
static double Cmi_delay_retransmit;    /* seconds to wait before retransmit        */
static double Cmi_ack_delay;           /* seconds to wait before ack'ing           */
static int    Cmi_dgram_max_data;      /* Cmi_max_dgram_size - DGRAM_HEADER_SIZE   */
static int    Cmi_comm_periodic_delay; /* ms, from Cmi_delay_retransmit, max 60    */
static int    Cmi_comm_clock_delay;    /* ms, from Cmi_ack_delay                   */
/* Flags raised by CommunicationsClock() when the ack timer expires. */
static int    writeableAcks;
static int    writeableDgrams;
00112
00113 static void setspeed_atm()
00114 {
00115 Cmi_max_dgram_size = 2048;
00116 Cmi_os_buffer_size = 50000;
00117 Cmi_window_size = 16;
00118 Cmi_delay_retransmit = 0.0150;
00119 Cmi_ack_delay = 0.0035;
00120 }
00121
00122 static void setspeed_eth()
00123 {
00124 Cmi_max_dgram_size = 1400;
00125 Cmi_window_size = 32;
00126 Cmi_os_buffer_size = Cmi_window_size*Cmi_max_dgram_size;
00127 Cmi_delay_retransmit = 0.0400;
00128 Cmi_ack_delay = 0.0050;
00129 }
00130
00131 static void setspeed_gigabit()
00132 {
00133
00134 Cmi_max_dgram_size = 9000;
00135 Cmi_window_size = 8;
00136 Cmi_os_buffer_size = 200000;
00137 Cmi_delay_retransmit = 0.020;
00138 Cmi_ack_delay = 0.018;
00139 }
00140
00141 static void extract_args(char **argv)
00142 {
00143 int ms;
00144 setspeed_eth();
00145 if (CmiGetArgFlagDesc(argv,"+atm","Tune for a low-latency ATM network"))
00146 setspeed_atm();
00147 if (CmiGetArgFlagDesc(argv,"+eth","Tune for an ethernet network"))
00148 setspeed_eth();
00149 if (CmiGetArgFlagDesc(argv,"+giga","Tune for a gigabit network"))
00150 setspeed_gigabit();
00151 CmiGetArgIntDesc(argv,"+max_dgram_size",&Cmi_max_dgram_size,"Size of each UDP packet");
00152 CmiGetArgIntDesc(argv,"+window_size",&Cmi_window_size,"Number of unacknowledged packets");
00153
00154 if ( (DGRAM_SEQNO_MASK+1)%Cmi_window_size != 0)
00155 CmiAbort("Invalid window size!");
00156 CmiGetArgIntDesc(argv,"+os_buffer_size",&Cmi_os_buffer_size, "UDP socket's SO_RCVBUF/SO_SNDBUF");
00157 if (CmiGetArgIntDesc(argv,"+delay_retransmit",&ms, "Milliseconds to wait before retransmit"))
00158 Cmi_delay_retransmit=0.001*ms;
00159 if (CmiGetArgIntDesc(argv,"+ack_delay",&ms, "Milliseconds to wait before ack'ing"))
00160 Cmi_delay_retransmit=0.001*ms;
00161 extract_common_args(argv);
00162 Cmi_dgram_max_data = Cmi_max_dgram_size - DGRAM_HEADER_SIZE;
00163 Cmi_half_window = Cmi_window_size >> 1;
00164 if ((Cmi_window_size * Cmi_max_dgram_size) > Cmi_os_buffer_size)
00165 KillEveryone("Window size too big for OS buffer.");
00166 Cmi_comm_periodic_delay=(int)(1000*Cmi_delay_retransmit);
00167 if (Cmi_comm_periodic_delay>60) Cmi_comm_periodic_delay=60;
00168 Cmi_comm_clock_delay=(int)(1000*Cmi_ack_delay);
00169 if (sizeof(DgramHeader)!=DGRAM_HEADER_SIZE) {
00170 CmiAbort("DatagramHeader in machine-dgram.c is the wrong size!\n");
00171 }
00172 }
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
/* Bookkeeping record for one outgoing message; OutgoingMsg is a pointer to
 * it. */
typedef struct OutgoingMsgStruct
{
  struct OutgoingMsgStruct *next;  /* list link                                */
  int src;                         /* source                                   */
  int dst;                         /* destination                              */
  int size;                        /* size of data                             */
  char *data;                      /* message bytes                            */
  int refcount;                    /* references still held; see
                                      GarbageCollectMsg()                      */
  int freemode;                    /* character code ('A', 'X', 'G', ...)
                                      steering GarbageCollectMsg()             */
}
*OutgoingMsg;
00208
/*
 * A datagram that carries its own copy of the payload.  MallocExplicitDgram
 * allocates Cmi_max_dgram_size extra bytes behind this struct, so data[]
 * extends past its declared single element.
 */
typedef struct ExplicitDgramStruct
{
  struct ExplicitDgramStruct *next;  /* list / freelist link */
  int srcpe;
  int rank;
  int seqno;
  int broot;
  unsigned int len;
  unsigned int dummy;
  double data[1];                    /* start of the payload storage */
}
*ExplicitDgram;
00217
00218 typedef struct ImplicitDgramStruct
00219 {
00220 struct ImplicitDgramStruct *next;
00221 struct OtherNodeStruct *dest;
00222 int srcpe, rank, seqno, broot;
00223 char *dataptr;
00224 int datalen;
00225 OutgoingMsg ogm;
00226 }
00227 *ImplicitDgram;
00228
/* Forward declarations for types and functions supplied by the
 * transport-specific files. */
struct PendingMsgStruct;

#if CMK_USE_IBVERBS
struct infiOtherNodeData;
struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]);
void infiPostInitialRecvs();
#endif

#if CMK_USE_SYSVSHM
inline void CommunicationServerSysvshm();
#endif
#if CMK_USE_PXSHM
inline void CommunicationServerPxshm();
#endif
00243
00244 #if CMK_USE_AMMASSO
00245
00246
00247
00248
00249
00250
00251
00252
00253
/* Connection state of a queue pair; numbering starts at 1. */
typedef enum __qp_connection_state {
  QP_CONN_STATE_PRE_CONNECT = 1,       /* 1 */
  QP_CONN_STATE_CONNECTED,             /* 2 */
  QP_CONN_STATE_CONNECTION_LOST,       /* 3 */
  QP_CONN_STATE_CONNECTION_CLOSED      /* 4 */
} qp_connection_state_t;
00260 #endif
00261
/* A message buffer together with its length; FutureMessage is a pointer
 * to it. */
typedef struct FutureMessageStruct {
  char *msg;  /* message bytes   */
  int len;    /* length of msg   */
} *FutureMessage;
00266
00267 typedef struct OtherNodeStruct
00268 {
00269 int nodestart, nodesize;
00270 skt_ip_t IP;
00271 unsigned int mach_id;
00272 unsigned int dataport;
00273 struct sockaddr_in addr;
00274 #if CMK_USE_TCP
00275 SOCKET sock;
00276 #endif
00277 #if CMK_USE_MX
00278 CmiUInt8 nic_id;
00279 mx_endpoint_addr_t endpoint_addr;
00280 CdsFifo futureMsgs;
00281 #endif
00282
00283 unsigned int send_last;
00284 ImplicitDgram *send_window;
00285 ImplicitDgram send_queue_h;
00286 ImplicitDgram send_queue_t;
00287 unsigned int send_next;
00288 unsigned int send_good;
00289 double send_primer;
00290 unsigned int send_ack_seqno;
00291 int retransmit_leash;
00292 #if CMK_USE_GM
00293 struct PendingMsgStruct *sendhead, *sendtail;
00294 int disable;
00295 int gm_pending;
00296 #endif
00297
00298 #if CMK_USE_IBVERBS
00299 struct infiOtherNodeData *infiData;
00300 #endif
00301
00302 #if CMK_USE_AMMASSO
00303
00304
00305 cc_uint32_t recv_cq_depth;
00306 cc_cq_handle_t recv_cq;
00307 cc_uint32_t send_cq_depth;
00308 cc_cq_handle_t send_cq;
00309 cc_qp_id_t qp_id;
00310 cc_qp_handle_t qp;
00311
00312 int myNode;
00313
00314
00315 LIST_DEFINE(AmmassoBuffer,recv_buf);
00316
00317
00318
00319
00320 AmmassoBuffer *secondLastRecvBuf;
00321
00322
00323
00324 LIST_DEFINE(AmmassoBuffer,pending);
00325
00326
00327
00328 ammasso_ack_t *remoteAck;
00329
00330
00331 int messagesNotYetAcknowledged;
00332
00333
00334 cc_sq_wr_t *ack_sq_wr;
00335
00336
00337 ammasso_ack_t *directAck;
00338
00339 LIST_DEFINE(AmmassoToken,sendTokens);
00340 LIST_DEFINE(AmmassoToken,usedTokens);
00341
00342
00343
00344 ammasso_ack_t localAck;
00345
00346
00347 CmiNodeLock sendBufLock;
00348 CmiNodeLock send_next_lock;
00349 CmiNodeLock recv_expect_lock;
00350
00351 cc_ep_handle_t ep;
00352 cc_ep_handle_t cr;
00353
00354 cc_qp_query_attrs_t qp_attrs;
00355 cc_stag_index_t qp_attrs_stag_index;
00356
00357 int posted;
00358
00359 cc_inet_addr_t address;
00360 cc_inet_port_t port;
00361 qp_connection_state_t connectionState;
00362
00363 cc_stag_t remote_recv_stag;
00364 cc_uint64_t remote_starting_to;
00365 int recv_UseIndex;
00366
00367
00368 int max_used_tokens;
00369
00370 #endif
00371
00372 int asm_rank;
00373 int asm_total;
00374 int asm_fill;
00375 char *asm_msg;
00376
00377 int recv_ack_cnt;
00378 double recv_ack_time;
00379 unsigned int recv_expect;
00380 ExplicitDgram *recv_window;
00381 int recv_winsz;
00382 unsigned int recv_next;
00383 unsigned int recv_ack_seqno;
00384
00385 unsigned int stat_total_intr;
00386 unsigned int stat_proc_intr;
00387 unsigned int stat_send_pkt;
00388 unsigned int stat_resend_pkt;
00389 unsigned int stat_send_ack;
00390 unsigned int stat_recv_pkt;
00391 unsigned int stat_recv_ack;
00392 unsigned int stat_ack_pkts;
00393 unsigned int stat_consec_resend;
00394
00395 int sent_msgs;
00396 int recd_msgs;
00397 int sent_bytes;
00398 int recd_bytes;
00399 }
00400 *OtherNode;
00401
00402 static void OtherNode_init(OtherNode node)
00403 {
00404 int i;
00405 node->send_primer = 1.0e30;
00406 node->retransmit_leash = 1;
00407 node->send_last=0;
00408 node->send_window =
00409 (ImplicitDgram*)malloc(Cmi_window_size*sizeof(ImplicitDgram));
00410 for (i=0;i<Cmi_window_size;i++) node->send_window[i]=NULL;
00411 node->send_queue_h=node->send_queue_t=NULL;
00412 node->send_next=0;
00413 node->send_good=(unsigned int)(-1);
00414 node->send_ack_seqno=0;
00415 #if CMK_USE_GM
00416 node->sendhead = node->sendtail = NULL;
00417 node->disable = 0;
00418 node->gm_pending = 0;
00419 #endif
00420 #if CMK_USE_MX
00421 node->futureMsgs = CdsFifo_Create();
00422 #endif
00423
00424
00425
00426
00427
00428
00429
00430
00431 node->asm_rank=0;
00432 node->asm_total=0;
00433 node->asm_fill=0;
00434 node->asm_msg=0;
00435
00436 node->recv_ack_cnt=0;
00437 node->recv_ack_time=1.0e30;
00438 node->recv_ack_seqno=0;
00439 node->recv_expect=0;
00440 node->recv_window =
00441 (ExplicitDgram*)malloc(Cmi_window_size*sizeof(ExplicitDgram));
00442 for (i=0;i<Cmi_window_size;i++) node->recv_window[i]=NULL;
00443 node->recv_winsz=0;
00444 node->recv_next=0;
00445
00446 node->stat_total_intr=0;
00447 node->stat_proc_intr=0;
00448 node->stat_send_pkt=0;
00449 node->stat_resend_pkt=0;
00450 node->stat_send_ack=0;
00451 node->stat_recv_pkt=0;
00452 node->stat_recv_ack=0;
00453 node->stat_ack_pkts=0;
00454
00455 node->sent_msgs = 0;
00456 node->recd_msgs = 0;
00457 node->sent_bytes = 0;
00458 node->recd_bytes = 0;
00459 }
00460
00461 static OtherNode *nodes_by_pe;
00462 static OtherNode nodes;
00463
#ifdef CMK_USE_SPECIAL_MESSAGE_QUEUE_CHECK

/*
 * Report whether the outgoing datagram queue of node forNode is long.
 *
 * Walks the queue under the comm lock, subtracting each datagram's datalen
 * from longerThan, and stops as soon as the budget is used up.
 *
 * Returns 1 when the queued bytes reach longerThan (or longerThan was not
 * positive to begin with), 0 when the queue ends first.
 *
 * Fix: the result of the walk is now actually returned; previously `ret`
 * was never assigned and the function always returned 0.
 */
int CmiLongSendQueue(int forNode,int longerThan) {
  int ret=0;
  ImplicitDgram dg;
  CmiCommLock();
  dg=nodes[forNode].send_queue_h;
  while (longerThan>0 && dg) {
    longerThan-=dg->datalen;
    dg=dg->next;
  }
  ret=(longerThan<=0);
  CmiCommUnlock();
  return ret;
}
#endif
00480
/* Transport hooks called from node_addresses_store(); defined elsewhere. */
extern void CmiGmConvertMachineID(unsigned int *mach_id);
extern void CmiAmmassoNodeAddressesStoreHandler(int pe,
                                                struct sockaddr_in *addr,
                                                int port);
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496 static void node_addresses_store(ChMessage *msg)
00497 {
00498 ChMessageInt_t *n32=(ChMessageInt_t *)msg->data;
00499 ChNodeinfo *d=(ChNodeinfo *)(n32+1);
00500 int nodestart;
00501 int i,j,n;
00502 MACHSTATE(1,"node_addresses_store {");
00503 _Cmi_numnodes=ChMessageInt(n32[0]);
00504
00505 #if CMK_USE_IBVERBS
00506 ChInfiAddr *remoteInfiAddr = (ChInfiAddr *) (&msg->data[sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes]);
00507 if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes +sizeof(ChInfiAddr)*_Cmi_numnodes )
00508 !=(unsigned int)msg->len)
00509 {printf("Node table has inconsistent length!");machine_exit(1);}
00510
00511 #else
00512
00513 if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes)
00514 !=(unsigned int)msg->len)
00515 {printf("Node table has inconsistent length!");machine_exit(1);}
00516 #endif//CMK_USE_IBVERBS
00517 nodes = (OtherNode)malloc(_Cmi_numnodes * sizeof(struct OtherNodeStruct));
00518 nodestart=0;
00519 for (i=0; i<_Cmi_numnodes; i++) {
00520 nodes[i].nodestart = nodestart;
00521 nodes[i].nodesize = ChMessageInt(d[i].nPE);
00522 MACHSTATE2(3,"node %d nodesize %d",i,nodes[i].nodesize);
00523 nodes[i].mach_id = ChMessageInt(d[i].mach_id);
00524 #if CMK_USE_MX
00525 nodes[i].nic_id = ChMessageLong(d[i].nic_id);
00526 #endif
00527 #if CMK_USE_GM
00528 CmiGmConvertMachineID(& nodes[i].mach_id);
00529 #endif
00530 nodes[i].IP=d[i].IP;
00531 if (i==_Cmi_mynode) {
00532 Cmi_nodestart=nodes[i].nodestart;
00533 _Cmi_mynodesize=nodes[i].nodesize;
00534 Cmi_self_IP=nodes[i].IP;
00535 }
00536 #if CMK_USE_IBVERBS
00537 {
00538 if(i != _Cmi_mynode){
00539 int addr[3];
00540 addr[0] =ChMessageInt(remoteInfiAddr[i].lid);
00541 addr[1] =ChMessageInt(remoteInfiAddr[i].qpn);
00542 addr[2] =ChMessageInt(remoteInfiAddr[i].psn);
00543 nodes[i].infiData = initInfiOtherNodeData(i,addr);
00544 }
00545 }
00546 #else
00547 nodes[i].dataport = ChMessageInt(d[i].dataport);
00548 nodes[i].addr = skt_build_addr(nodes[i].IP,nodes[i].dataport);
00549 #endif
00550
00551 #if CMK_USE_TCP
00552 nodes[i].sock = INVALID_SOCKET;
00553 #endif
00554 nodestart+=nodes[i].nodesize;
00555
00556 #if CMK_USE_AMMASSO
00557 CmiAmmassoNodeAddressesStoreHandler(nodes[i].nodestart, &(nodes[i].addr), nodes[i].dataport);
00558 #endif
00559
00560 }
00561 _Cmi_numpes=nodestart;
00562 n = _Cmi_numpes;
00563 #ifdef CMK_CPV_IS_SMP
00564 n += _Cmi_numnodes;
00565 #endif
00566 nodes_by_pe = (OtherNode*)malloc(n * sizeof(OtherNode));
00567 _MEMCHECK(nodes_by_pe);
00568 for (i=0; i<_Cmi_numnodes; i++) {
00569 OtherNode node = nodes + i;
00570 OtherNode_init(node);
00571 for (j=0; j<node->nodesize; j++)
00572 nodes_by_pe[j + node->nodestart] = node;
00573 }
00574 #ifdef CMK_CPV_IS_SMP
00575
00576 for (i=_Cmi_numpes; i<_Cmi_numpes+_Cmi_numnodes; i++) {
00577 OtherNode node = nodes + i-_Cmi_numpes;
00578 nodes_by_pe[i] = node;
00579 }
00580 #endif
00581 #if CMK_USE_IBVERBS
00582 infiPostInitialRecvs();
00583 #endif
00584 MACHSTATE(1,"} node_addresses_store");
00585 }
00586
00590 static char statstr[10000];
00591
00592 void printNetStatistics(void)
00593 {
00594 char tmpstr[1024];
00595 OtherNode myNode;
00596 int i;
00597 unsigned int send_pkt=0, resend_pkt=0, recv_pkt=0, send_ack=0;
00598 unsigned int recv_ack=0, ack_pkts=0;
00599
00600 myNode = nodes+CmiMyNode();
00601 sprintf(tmpstr, "***********************************\n");
00602 strcpy(statstr, tmpstr);
00603 sprintf(tmpstr, "Net Statistics For Node %u\n", CmiMyNode());
00604 strcat(statstr, tmpstr);
00605 sprintf(tmpstr, "Interrupts: %u \tProcessed: %u\n",
00606 myNode->stat_total_intr, myNode->stat_proc_intr);
00607 strcat(statstr, tmpstr);
00608 sprintf(tmpstr, "Total Msgs Sent: %u \tTotal Bytes Sent: %u\n",
00609 myNode->sent_msgs, myNode->sent_bytes);
00610 strcat(statstr, tmpstr);
00611 sprintf(tmpstr, "Total Msgs Recv: %u \tTotal Bytes Recv: %u\n",
00612 myNode->recd_msgs, myNode->recd_bytes);
00613 strcat(statstr, tmpstr);
00614 sprintf(tmpstr, "***********************************\n");
00615 strcat(statstr, tmpstr);
00616 sprintf(tmpstr, "[Num]\tSENDTO\tRESEND\tRECV\tACKSTO\tACKSFRM\tPKTACK\n");
00617 strcat(statstr,tmpstr);
00618 sprintf(tmpstr, "=====\t======\t======\t====\t======\t=======\t======\n");
00619 strcat(statstr,tmpstr);
00620 for(i=0;i<CmiNumNodes();i++) {
00621 OtherNode node = nodes+i;
00622 sprintf(tmpstr, "[%u]\t%u\t%u\t%u\t%u\t%u\t%u\n",
00623 i, node->stat_send_pkt, node->stat_resend_pkt,
00624 node->stat_recv_pkt, node->stat_send_ack,
00625 node->stat_recv_ack, node->stat_ack_pkts);
00626 strcat(statstr, tmpstr);
00627 send_pkt += node->stat_send_pkt;
00628 recv_pkt += node->stat_recv_pkt;
00629 resend_pkt += node->stat_resend_pkt;
00630 send_ack += node->stat_send_ack;
00631 recv_ack += node->stat_recv_ack;
00632 ack_pkts += node->stat_ack_pkts;
00633 }
00634 sprintf(tmpstr, "[TOTAL]\t%u\t%u\t%u\t%u\t%u\t%u\n",
00635 send_pkt, resend_pkt,
00636 recv_pkt, send_ack,
00637 recv_ack, ack_pkts);
00638 strcat(statstr, tmpstr);
00639 sprintf(tmpstr, "***********************************\n");
00640 strcat(statstr, tmpstr);
00641 CmiPrintf(statstr);
00642 }
00643
00644
00645
00646
00647 static ExplicitDgram Cmi_freelist_explicit;
00648 static ImplicitDgram Cmi_freelist_implicit;
00649
00650
/* Return the ImplicitDgram dg to the freelist (it is pushed on the front;
 * nothing is handed back to the heap). */
#define FreeImplicitDgram(dg) {\
  ImplicitDgram d = (dg);\
  d->next = Cmi_freelist_implicit;\
  Cmi_freelist_implicit = d;\
}

/* Store a usable ImplicitDgram in the lvalue dg: the front of the freelist
 * if there is one, otherwise a freshly malloc'ed (and _MEMCHECK'ed) record.
 * The record's fields are not initialised. */
#define MallocImplicitDgram(dg) {\
  ImplicitDgram d = Cmi_freelist_implicit;\
  if (d != 0) {\
    Cmi_freelist_implicit = d->next;\
  } else {\
    d = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\
    _MEMCHECK(d);\
  }\
  dg = d;\
}
00664
/* Return the ExplicitDgram dg to the freelist (it is pushed on the front;
 * nothing is handed back to the heap). */
#define FreeExplicitDgram(dg) {\
  ExplicitDgram d = (dg);\
  d->next = Cmi_freelist_explicit;\
  Cmi_freelist_explicit = d;\
}

/* Store a usable ExplicitDgram in the lvalue dg: the front of the freelist
 * if there is one, otherwise a freshly malloc'ed (and _MEMCHECK'ed) record
 * with Cmi_max_dgram_size extra bytes behind the struct for the payload.
 * The record's fields are not initialised. */
#define MallocExplicitDgram(dg) {\
  ExplicitDgram d = Cmi_freelist_explicit;\
  if (d != 0) {\
    Cmi_freelist_explicit = d->next;\
  } else {\
    d = ((ExplicitDgram)malloc \
         (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\
    _MEMCHECK(d);\
  }\
  dg = d;\
}
00679
00680
00681
/* OutgoingMsg records are not pooled: they go straight to malloc/free. */
#define FreeOutgoingMsg(m) (free(m))
/* Allocate an uninitialised OutgoingMsg record into the lvalue m. */
#define MallocOutgoingMsg(m)\
   {\
     (m=(OutgoingMsg)malloc(sizeof(struct OutgoingMsgStruct)));\
     _MEMCHECK(m);\
   }
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
/* Socket readiness flags.
 * NOTE(review): presumably set by the transport's socket polling code,
 * which is not part of this section -- confirm. */
static int ctrlskt_ready_read;   /* control socket: readable */
static int dataskt_ready_read;   /* data socket: readable    */
static int dataskt_ready_write;  /* data socket: writable    */
00700
00701
00702
00703
00704
00705
00706
00707 void GarbageCollectMsg(OutgoingMsg ogm)
00708 {
00709 MACHSTATE2(3,"GarbageCollectMsg called on ogm %p refcount %d",ogm,ogm->refcount);
00710 if (ogm->refcount == 0) {
00711 if (ogm->freemode == 'A') {
00712 ogm->freemode = 'X';
00713 } else {
00714 if (ogm->freemode != 'G') CmiFree(ogm->data);
00715 FreeOutgoingMsg(ogm);
00716 }
00717 }
00718 }
00719
00720 void DiscardImplicitDgram(ImplicitDgram dg)
00721 {
00722 OutgoingMsg ogm;
00723 ogm = dg->ogm;
00724 ogm->refcount--;
00725 GarbageCollectMsg(ogm);
00726 FreeImplicitDgram(dg);
00727 }
00728
00729
00730
00731
00732
00733 static double Cmi_ack_last, Cmi_check_last;
00734 static void CommunicationsClock(void)
00735 {
00736 MACHSTATE(1,"CommunicationsClock");
00737 Cmi_clock = GetClock();
00738 if (Cmi_clock > Cmi_ack_last + 0.5*Cmi_ack_delay) {
00739 MACHSTATE(2,"CommunicationsClock timing out acks");
00740 Cmi_ack_last=Cmi_clock;
00741 writeableAcks=1;
00742 writeableDgrams=1;
00743 }
00744
00745 if (Cmi_clock > Cmi_check_last + Cmi_check_delay) {
00746 MACHSTATE(4,"CommunicationsClock pinging charmrun");
00747 Cmi_check_last = Cmi_clock;
00748 ctrl_sendone_nolock("ping",NULL,0,NULL,0);
00749 }
00750 }
00751
00752 #if CMK_SHARED_VARS_UNAVAILABLE
00753 static void CommunicationsClockCaller(void *ignored)
00754 {
00755 CmiCommLock();
00756 CommunicationsClock();
00757 CmiCommUnlock();
00758 CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
00759 }
00760
00761 static void CommunicationPeriodic(void)
00762 {
00763 #if CMK_USE_SYSVSHM
00764 CommunicationServerSysvshm();
00765 #endif
00766 #if CMK_USE_PXSHM
00767 CommunicationServerPxshm();
00768 #endif
00769 CommunicationServer(0, COMM_SERVER_FROM_SMP);
00770 }
00771
00772 static void CommunicationPeriodicCaller(void *ignored)
00773 {
00774 CommunicationPeriodic();
00775 CcdCallFnAfter((CcdVoidFn)CommunicationPeriodicCaller,NULL,Cmi_comm_periodic_delay);
00776 }
00777 #endif
00778
00779
00780
00781 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy);
00782
00783 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int nodesend);
00784 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int curcycle, int nodesend);
00785
00786 #if CMK_USE_GM
00787
00788 #include "machine-gm.c"
00789
00790 #elif CMK_USE_MX
00791
00792 #include "machine-mx.c"
00793
00794 #elif CMK_USE_AMMASSO
00795
00796 #include "machine-ammasso.c"
00797 #define BARRIER_NULL 1
00798
00799 #elif CMK_USE_TCP
00800
00801 #include "machine-tcp.c"
00802 #define BARRIER_NULL 1
00803
00804 #elif CMK_USE_IBVERBS
00805
00806 #include "machine-ibverbs.c"
00807
00808
00809 #else
00810
00811 #include "machine-eth.c"
00812
00813 #endif
00814
00815 #if CMK_USE_SYSVSHM
00816 #include "machine-sysvshm.c"
00817 #endif
00818 #if CMK_USE_PXSHM
00819 #include "machine-pxshm.c"
00820 #endif
00821
00822
00823
00824 #if BARRIER_NULL
/* Stub used when BARRIER_NULL is set: performs no barrier and always
 * returns -1. */
int CmiBarrier()
{
  const int result = -1;
  return result;
}
00829
/* Stub used when BARRIER_NULL is set: performs no barrier and always
 * returns -1. */
int CmiBarrierZero()
{
  const int result = -1;
  return result;
}
00834 #endif
00835