00001
00020
00021
00022
00023
00024
00025
00026 typedef struct {
00027 int sleepMs;
00028 int nIdles;
00029 CmiState cs;
00030 } CmiIdleState;
00031
00032 static CmiIdleState *CmiNotifyGetState(void)
00033 {
00034 CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
00035 s->sleepMs=0;
00036 s->nIdles=0;
00037 s->cs=CmiGetState();
00038 return s;
00039 }
00040
00041 static void CmiNotifyBeginIdle(CmiIdleState *s)
00042 {
00043 s->sleepMs=0;
00044 s->nIdles=0;
00045
00046 MACHSTATE(3,"begin idle")
00047 }
00048
00049 static void CmiNotifyStillIdle(CmiIdleState *s)
00050 {
00051 #if CMK_SHARED_VARS_UNAVAILABLE
00052
00053 MACHSTATE(1,"idle commserver {")
00054 CommunicationServer(Cmi_idlepoll?0:10, 0);
00055 MACHSTATE(1,"} idle commserver")
00056 #else
00057 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00058 if(_Cmi_noprocforcommthread ){
00059 #endif
00060 int nSpins=20;
00061 s->nIdles++;
00062 if (s->nIdles>nSpins) {
00063 s->sleepMs+=2;
00064 if (s->sleepMs>10) s->sleepMs=10;
00065 }
00066
00067 if (s->sleepMs>0) {
00068 MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
00069 CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
00070 CsdResetPeriodic();
00071 MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
00072 }
00073 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00074 }
00075 #endif
00076 #endif
00077 }
00078
00079 void CmiNotifyIdle(void) {
00080 CmiIdleState s;
00081 s.sleepMs=5;
00082 CmiNotifyStillIdle(&s);
00083 }
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096 int CheckSocketsReady(int withDelayMs)
00097 {
00098 int nreadable,dataWrite=writeableDgrams || writeableAcks;
00099 CMK_PIPE_DECL(withDelayMs);
00100
00101
00102 #if CMK_USE_KQUEUE && 0
00103
00104
00105
00106
00107
00108 static int first = 1;
00109 if(first){
00110 first = 0;
00111 CmiStdoutAdd(CMK_PIPE_SUB);
00112 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
00113 else return 0;
00114 if (dataskt!=-1) {
00115 CMK_PIPE_ADDREAD(dataskt);
00116 CMK_PIPE_ADDWRITE(dataskt);
00117 }
00118 }
00119
00120 #else
00121 CmiStdoutAdd(CMK_PIPE_SUB);
00122 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
00123 else return 0;
00124 if (dataskt!=-1) {
00125 { CMK_PIPE_ADDREAD(dataskt); }
00126 if (dataWrite)
00127 CMK_PIPE_ADDWRITE(dataskt);
00128 }
00129 #endif
00130
00131 nreadable=CMK_PIPE_CALL();
00132 ctrlskt_ready_read = 0;
00133 dataskt_ready_read = 0;
00134 dataskt_ready_write = 0;
00135
00136 if (nreadable == 0) {
00137 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00138 return nreadable;
00139 }
00140 if (nreadable==-1) {
00141 CMK_PIPE_CHECKERR();
00142 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00143 return CheckSocketsReady(0);
00144 }
00145
00146 CmiStdoutCheck(CMK_PIPE_SUB);
00147 if (Cmi_charmrun_fd!=-1)
00148 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00149 if (dataskt!=-1) {
00150 dataskt_ready_read = CMK_PIPE_CHECKREAD(dataskt);
00151 if (dataWrite)
00152 dataskt_ready_write = CMK_PIPE_CHECKWRITE(dataskt);
00153 }
00154 return nreadable;
00155 }
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166 void TransmitAckDatagram(OtherNode node)
00167 {
00168 DgramAck ack; int i, seqno, slot; ExplicitDgram dg;
00169 int retval;
00170
00171 seqno = node->recv_next;
00172 MACHSTATE2(3," TransmitAckDgram [seq %d to 'pe' %d]",seqno,node->nodestart)
00173 DgramHeaderMake(&ack, DGRAM_ACKNOWLEDGE, Cmi_nodestart, Cmi_charmrun_pid, seqno, 0);
00174 LOG(Cmi_clock, Cmi_nodestart, 'A', node->nodestart, seqno);
00175 for (i=0; i<Cmi_window_size; i++) {
00176 slot = seqno % Cmi_window_size;
00177 dg = node->recv_window[slot];
00178 ack.window[i] = (dg && (dg->seqno == seqno));
00179 seqno = ((seqno+1) & DGRAM_SEQNO_MASK);
00180 }
00181 memcpy(&ack.window[Cmi_window_size], &(node->send_ack_seqno),
00182 sizeof(unsigned int));
00183 node->send_ack_seqno = ((node->send_ack_seqno + 1) & DGRAM_SEQNO_MASK);
00184 retval = (-1);
00185 #ifdef CMK_USE_CHECKSUM
00186 DgramHeader *head = (DgramHeader *)(&ack);
00187 head->magic ^= computeCheckSum((unsigned char*)&ack, DGRAM_HEADER_SIZE + Cmi_window_size + sizeof(unsigned int));
00188 #endif
00189 while(retval==(-1))
00190 retval = sendto(dataskt, (char *)&ack,
00191 DGRAM_HEADER_SIZE + Cmi_window_size + sizeof(unsigned int), 0,
00192 (struct sockaddr *)&(node->addr),
00193 sizeof(struct sockaddr_in));
00194 node->stat_send_ack++;
00195 }
00196
00197
00198
00199
00200
00201
00202
00203
00204 void TransmitImplicitDgram(ImplicitDgram dg)
00205 {
00206 char *data; DgramHeader *head; int len; DgramHeader temp;
00207 OtherNode dest;
00208 int retval;
00209
00210 MACHSTATE3(3," TransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
00211 dg->datalen,dg->seqno,dg->dest->nodestart)
00212 len = dg->datalen;
00213 data = dg->dataptr;
00214 head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
00215 temp = *head;
00216 dest = dg->dest;
00217 DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_charmrun_pid, dg->seqno, dg->broot);
00218 #ifdef CMK_USE_CHECKSUM
00219 head->magic ^= computeCheckSum((unsigned char*)head, len + DGRAM_HEADER_SIZE);
00220 #endif
00221 LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
00222 retval = (-1);
00223 while(retval==(-1))
00224 retval = sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
00225 (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
00226 *head = temp;
00227 dest->stat_send_pkt++;
00228 }
00229
00230 void TransmitImplicitDgram1(ImplicitDgram dg)
00231 {
00232 char *data; DgramHeader *head; int len; DgramHeader temp;
00233 OtherNode dest;
00234 int retval;
00235
00236 MACHSTATE3(4," RETransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
00237 dg->datalen,dg->seqno,dg->dest->nodestart)
00238 len = dg->datalen;
00239 data = dg->dataptr;
00240 head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
00241 temp = *head;
00242 dest = dg->dest;
00243 DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_charmrun_pid, dg->seqno, dg->broot);
00244 #ifdef CMK_USE_CHECKSUM
00245 head->magic ^= computeCheckSum((unsigned char *)head, len + DGRAM_HEADER_SIZE);
00246 #endif
00247 LOG(Cmi_clock, Cmi_nodestart, 'P', dest->nodestart, dg->seqno);
00248 retval = (-1);
00249 while (retval == (-1))
00250 retval = sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
00251 (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
00252 *head = temp;
00253 dest->stat_resend_pkt++;
00254 }
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265 int TransmitAcknowledgement()
00266 {
00267 int skip; static int nextnode=0; OtherNode node;
00268 for (skip=0; skip<_Cmi_numnodes; skip++) {
00269 node = nodes+nextnode;
00270 nextnode = (nextnode + 1) % _Cmi_numnodes;
00271 if (node->recv_ack_cnt) {
00272 if ((node->recv_ack_cnt > Cmi_half_window) ||
00273 (Cmi_clock >= node->recv_ack_time)) {
00274 TransmitAckDatagram(node);
00275 if (node->recv_winsz) {
00276 node->recv_ack_cnt = 1;
00277 node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
00278 } else {
00279 node->recv_ack_cnt = 0;
00280 node->recv_ack_time = 0.0;
00281 }
00282 return 1;
00283 }
00284 }
00285 }
00286 return 0;
00287 }
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297 int TransmitDatagram()
00298 {
00299 ImplicitDgram dg; OtherNode node;
00300 static int nextnode=0; int skip, count, slot;
00301 unsigned int seqno;
00302
00303 for (skip=0; skip<_Cmi_numnodes; skip++) {
00304 node = nodes+nextnode;
00305 nextnode = (nextnode + 1) % _Cmi_numnodes;
00306 dg = node->send_queue_h;
00307 if (dg) {
00308 seqno = dg->seqno;
00309 slot = seqno % Cmi_window_size;
00310 if (node->send_window[slot] == 0) {
00311 node->send_queue_h = dg->next;
00312 node->send_window[slot] = dg;
00313 TransmitImplicitDgram(dg);
00314 if (seqno == ((node->send_last+1)&DGRAM_SEQNO_MASK))
00315 node->send_last = seqno;
00316 node->send_primer = Cmi_clock + Cmi_delay_retransmit;
00317 return 1;
00318 }
00319 }
00320 if (Cmi_clock > node->send_primer) {
00321 slot = (node->send_last % Cmi_window_size);
00322 for (count=0; count<Cmi_window_size; count++) {
00323 dg = node->send_window[slot];
00324 if (dg) break;
00325 slot = ((slot+Cmi_window_size-1) % Cmi_window_size);
00326 }
00327 if (dg) {
00328 TransmitImplicitDgram1(node->send_window[slot]);
00329 node->send_primer = Cmi_clock + Cmi_delay_retransmit;
00330 return 1;
00331 }
00332 }
00333 }
00334 return 0;
00335 }
00336
00337
00338
00339
00340
00341
00342
00343
00344 void EnqueueOutgoingDgram
00345 (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank, int broot)
00346 {
00347 int seqno, dst, src; ImplicitDgram dg;
00348 src = ogm->src;
00349 dst = ogm->dst;
00350 seqno = node->send_next;
00351 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00352 MallocImplicitDgram(dg);
00353 dg->dest = node;
00354 dg->srcpe = src;
00355 dg->rank = rank;
00356 dg->seqno = seqno;
00357 dg->broot = broot;
00358 dg->dataptr = ptr;
00359 dg->datalen = len;
00360 dg->ogm = ogm;
00361 ogm->refcount++;
00362 dg->next = 0;
00363 if (node->send_queue_h == 0) {
00364 node->send_queue_h = dg;
00365 node->send_queue_t = dg;
00366 } else {
00367 node->send_queue_t->next = dg;
00368 node->send_queue_t = dg;
00369 }
00370 }
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00381 {
00382 int size; char *data;
00383 OtherNode myNode = nodes+CmiMyNode();
00384
00385 MACHSTATE2(3,"DeliverViaNetwork %d-byte message to pe %d",
00386 ogm->size,node->nodestart+rank);
00387 size = ogm->size - DGRAM_HEADER_SIZE;
00388 data = ogm->data + DGRAM_HEADER_SIZE;
00389 writeableDgrams++;
00390 while (size > Cmi_dgram_max_data) {
00391 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00392 data += Cmi_dgram_max_data;
00393 size -= Cmi_dgram_max_data;
00394 }
00395 EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
00396
00397 myNode->sent_msgs++;
00398 myNode->sent_bytes += ogm->size;
00399
00400 writeableDgrams=1;
00401
00402 }
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414 void AssembleDatagram(OtherNode node, ExplicitDgram dg)
00415 {
00416 int i;
00417 unsigned int size; char *msg;
00418 OtherNode myNode = nodes+CmiMyNode();
00419
00420 MACHSTATE3(2," AssembleDatagram [seq %d from 'pe' %d, packet len %d]",
00421 dg->seqno,node->nodestart,dg->len)
00422 LOG(Cmi_clock, Cmi_nodestart, 'X', dg->srcpe, dg->seqno);
00423 msg = node->asm_msg;
00424 if (msg == 0) {
00425 size = CmiMsgHeaderGetLength(dg->data);
00426 MACHSTATE3(4," Assemble new datagram seq %d from 'pe' %d, len %d",
00427 dg->seqno,node->nodestart,size)
00428 msg = (char *)CmiAlloc(size);
00429 if (!msg)
00430 fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
00431 if (size < dg->len) KillEveryoneCode(4559312);
00432 #ifndef CMK_OPTIMIZE
00433 setMemoryTypeMessage(msg);
00434 #endif
00435 memcpy(msg, (char*)(dg->data), dg->len);
00436 node->asm_rank = dg->rank;
00437 node->asm_total = size;
00438 node->asm_fill = dg->len;
00439 node->asm_msg = msg;
00440 } else {
00441 size = dg->len - DGRAM_HEADER_SIZE;
00442 memcpy(msg + node->asm_fill, ((char*)(dg->data))+DGRAM_HEADER_SIZE, size);
00443 node->asm_fill += size;
00444 }
00445 MACHSTATE3(2," AssembleDatagram: now have %d of %d bytes from %d",
00446 node->asm_fill, node->asm_total, node->nodestart)
00447 if (node->asm_fill > node->asm_total) {
00448 fprintf(stderr, "\n\n\t\tLength mismatch!!\n\n");
00449 fflush(stderr);
00450 MACHSTATE4(5,"Length mismatch seq %d, from 'pe' %d, fill %d, total %d\n", dg->seqno,node->nodestart,node->asm_fill,node->asm_total)
00451 KillEveryoneCode(4559313);
00452 }
00453 if (node->asm_fill == node->asm_total) {
00454
00455 #if CMK_BROADCAST_SPANNING_TREE
00456 if (node->asm_rank == DGRAM_BROADCAST
00457 #if CMK_NODE_QUEUE_AVAILABLE
00458 || node->asm_rank == DGRAM_NODEBROADCAST
00459 #endif
00460 )
00461 SendSpanningChildren(NULL, 0, node->asm_total, msg, dg->broot, dg->rank);
00462 #elif CMK_BROADCAST_HYPERCUBE
00463 if (node->asm_rank == DGRAM_BROADCAST
00464 #if CMK_NODE_QUEUE_AVAILABLE
00465 || node->asm_rank == DGRAM_NODEBROADCAST
00466 #endif
00467 )
00468 SendHypercube(NULL, 0, node->asm_total, msg, dg->broot, dg->rank);
00469 #endif
00470 if (node->asm_rank == DGRAM_BROADCAST) {
00471 int len = node->asm_total;
00472 for (i=1; i<_Cmi_mynodesize; i++)
00473 CmiPushPE(i, CopyMsg(msg, len));
00474 CmiPushPE(0, msg);
00475 } else {
00476 #if CMK_NODE_QUEUE_AVAILABLE
00477 if (node->asm_rank==DGRAM_NODEMESSAGE ||
00478 node->asm_rank==DGRAM_NODEBROADCAST)
00479 {
00480 CmiPushNode(msg);
00481 }
00482 else
00483 #endif
00484 CmiPushPE(node->asm_rank, msg);
00485 }
00486 node->asm_msg = 0;
00487 myNode->recd_msgs++;
00488 myNode->recd_bytes += node->asm_total;
00489 }
00490 FreeExplicitDgram(dg);
00491 }
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501 void AssembleReceivedDatagrams(OtherNode node)
00502 {
00503 unsigned int next, slot; ExplicitDgram dg;
00504 next = node->recv_next;
00505 while (1) {
00506 slot = (next % Cmi_window_size);
00507 dg = node->recv_window[slot];
00508 if (dg == 0) break;
00509 AssembleDatagram(node, dg);
00510 node->recv_window[slot] = 0;
00511 node->recv_winsz--;
00512 next = ((next + 1) & DGRAM_SEQNO_MASK);
00513 }
00514 node->recv_next = next;
00515 }
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532 void IntegrateMessageDatagram(ExplicitDgram dg)
00533 {
00534 int seqno;
00535 unsigned int slot; OtherNode node;
00536
00537 LOG(Cmi_clock, Cmi_nodestart, 'M', dg->srcpe, dg->seqno);
00538 MACHSTATE2(2," IntegrateMessageDatagram [seq %d from pe %d]", dg->seqno,dg->srcpe)
00539
00540 node = nodes_by_pe[dg->srcpe];
00541 node->stat_recv_pkt++;
00542 seqno = dg->seqno;
00543 writeableAcks=1;
00544 node->recv_ack_cnt++;
00545 if (node->recv_ack_time == 0.0)
00546 node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
00547 if (((seqno - node->recv_next) & DGRAM_SEQNO_MASK) < Cmi_window_size) {
00548 slot = (seqno % Cmi_window_size);
00549 if (node->recv_window[slot] == 0) {
00550 node->recv_window[slot] = dg;
00551 node->recv_winsz++;
00552 if (seqno == node->recv_next)
00553 AssembleReceivedDatagrams(node);
00554 if (seqno > node->recv_expect)
00555 node->recv_ack_time = 0.0;
00556 if (seqno >= node->recv_expect)
00557 node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
00558 LOG(Cmi_clock, Cmi_nodestart, 'Y', node->recv_next, dg->seqno);
00559 return;
00560 }
00561 }
00562 LOG(Cmi_clock, Cmi_nodestart, 'y', node->recv_next, dg->seqno);
00563 FreeExplicitDgram(dg);
00564 }
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616 void IntegrateAckDatagram(ExplicitDgram dg)
00617 {
00618 OtherNode node; DgramAck *ack; ImplicitDgram idg;
00619 int i; unsigned int slot, rxing, dgseqno, seqno, ackseqno;
00620 int diff;
00621 unsigned int tmp;
00622
00623 node = nodes_by_pe[dg->srcpe];
00624 ack = ((DgramAck*)(dg->data));
00625 memcpy(&ackseqno, &(ack->window[Cmi_window_size]), sizeof(unsigned int));
00626 dgseqno = dg->seqno;
00627 seqno = (dgseqno + Cmi_window_size) & DGRAM_SEQNO_MASK;
00628 slot = seqno % Cmi_window_size;
00629 rxing = 0;
00630 node->stat_recv_ack++;
00631 LOG(Cmi_clock, Cmi_nodestart, 'R', node->nodestart, dg->seqno);
00632
00633 tmp = node->recv_ack_seqno;
00634
00635 if ( !((node->recv_ack_seqno >=
00636 ((DGRAM_SEQNO_MASK >> 1) + (DGRAM_SEQNO_MASK >> 2))) &&
00637 (ackseqno < (DGRAM_SEQNO_MASK >> 1))) &&
00638 (ackseqno <= node->recv_ack_seqno))
00639 {
00640 FreeExplicitDgram(dg);
00641 return;
00642 }
00643
00644 node->recv_ack_seqno = ackseqno;
00645 writeableDgrams=1;
00646
00647 for (i=Cmi_window_size-1; i>=0; i--) {
00648 slot--; if (slot== ((unsigned int)-1)) slot+=Cmi_window_size;
00649 seqno = (seqno-1) & DGRAM_SEQNO_MASK;
00650 idg = node->send_window[slot];
00651 if (idg) {
00652 if (idg->seqno == seqno) {
00653 if (ack->window[i]) {
00654
00655
00656 node->stat_ack_pkts++;
00657 LOG(Cmi_clock, Cmi_nodestart, 'r', node->nodestart, seqno);
00658 node->send_window[slot] = 0;
00659 DiscardImplicitDgram(idg);
00660 rxing = 1;
00661 } else if (rxing) {
00662 node->send_window[slot] = 0;
00663 idg->next = node->send_queue_h;
00664 if (node->send_queue_h == 0) {
00665 node->send_queue_t = idg;
00666 }
00667 node->send_queue_h = idg;
00668 }
00669 } else {
00670 diff = dgseqno >= idg->seqno ?
00671 ((dgseqno - idg->seqno) & DGRAM_SEQNO_MASK) :
00672 ((dgseqno + (DGRAM_SEQNO_MASK - idg->seqno) + 1) & DGRAM_SEQNO_MASK);
00673
00674 if ((diff <= 0) || (diff > Cmi_window_size))
00675 {
00676 continue;
00677 }
00678
00679
00680 if (dgseqno < idg->seqno && (idg->seqno - dgseqno <= Cmi_window_size))
00681 {
00682 continue;
00683 }
00684 if (dgseqno == idg->seqno)
00685 {
00686 continue;
00687 }
00688 node->stat_ack_pkts++;
00689 LOG(Cmi_clock, Cmi_nodestart, 'o', node->nodestart, idg->seqno);
00690 node->send_window[slot] = 0;
00691 DiscardImplicitDgram(idg);
00692 }
00693 }
00694 }
00695 FreeExplicitDgram(dg);
00696 }
00697
00698 void ReceiveDatagram()
00699 {
00700 ExplicitDgram dg; int ok, magic;
00701 MACHLOCK_ASSERT(comm_flag,"ReceiveDatagram")
00702 MallocExplicitDgram(dg);
00703 ok = recv(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0);
00704
00705
00706 if (ok < 0) {
00707 MACHSTATE1(4," recv dgram failed (errno=%d)",errno)
00708 FreeExplicitDgram(dg);
00709 if (errno == EINTR) return;
00710 if (errno == EAGAIN) return;
00711 #if !defined(_WIN32) || defined(__CYGWIN__)
00712 if (errno == EWOULDBLOCK) return;
00713 if (errno == ECONNREFUSED) return;
00714 #endif
00715 CmiPrintf("ReceiveDatagram: recv: %s(%d)\n", strerror(errno), errno) ;
00716 KillEveryoneCode(37489437);
00717 }
00718 dg->len = ok;
00719 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
00720
00721 randomCorrupt((char*)dg->data, dg->len);
00722 #endif
00723
00724 if (ok >= DGRAM_HEADER_SIZE) {
00725 DgramHeaderBreak(dg->data, dg->rank, dg->srcpe, magic, dg->seqno, dg->broot);
00726 MACHSTATE3(2," recv dgram [seq %d, for rank %d, from pe %d]",
00727 dg->seqno,dg->rank,dg->srcpe)
00728 #ifdef CMK_USE_CHECKSUM
00729 if (computeCheckSum((unsigned char*)dg->data, dg->len) == 0)
00730 #else
00731 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
00732 #endif
00733 {
00734 if (dg->rank == DGRAM_ACKNOWLEDGE)
00735 IntegrateAckDatagram(dg);
00736 else IntegrateMessageDatagram(dg);
00737 } else FreeExplicitDgram(dg);
00738 } else {
00739 MACHSTATE1(4," recv dgram failed (len=%d)",ok)
00740 FreeExplicitDgram(dg);
00741 }
00742 }
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756 void CmiHandleImmediate();
00757
00758
00759
00760
00761
00762
00763 static void CommunicationServer(int sleepTime, int where)
00764 {
00765 unsigned int nTimes=0;
00766 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00767 MACHSTATE2(1,"CommunicationsServer(%d,%d)",
00768 sleepTime,writeableAcks||writeableDgrams)
00769 #if !CMK_SHARED_VARS_UNAVAILABLE
00770 if (sleepTime!=0) {
00771 MACHSTATE(1,"CommServer going to sleep (NO LOCK)");
00772 if (CheckSocketsReady(sleepTime)<=0) {
00773 MACHSTATE(1,"CommServer finished without anything happening.");
00774 }
00775 }
00776 sleepTime=0;
00777 #endif
00778 CmiCommLock();
00779
00780 if (Cmi_netpoll && where == 1) {
00781 if (CmiStdoutNeedsService()) {CmiStdoutService();}
00782 CmiCommUnlock();
00783 return;
00784 }
00785 CommunicationsClock();
00786
00787 if (sleepTime&&CmiGetState()->idle.hasMessages) sleepTime=0;
00788 while (CheckSocketsReady(sleepTime)>0) {
00789 int again=0;
00790 MACHSTATE(2,"CheckSocketsReady returned true");
00791 sleepTime=0;
00792 if (ctrlskt_ready_read) {again=1;ctrl_getone();}
00793 if (dataskt_ready_read) {again=1;ReceiveDatagram();}
00794 if (dataskt_ready_write) {
00795 if (writeableAcks)
00796 if (0!=(writeableAcks=TransmitAcknowledgement())) again=1;
00797 if (writeableDgrams)
00798 if (0!=(writeableDgrams=TransmitDatagram())) again=1;
00799 }
00800 if (CmiStdoutNeedsService()) {CmiStdoutService();}
00801 if (!again) break;
00802 if ((nTimes++ &16)==15) {
00803
00804 CommunicationsClock();
00805 }
00806 }
00807 CmiCommUnlock();
00808
00809
00810 if (where == COMM_SERVER_FROM_SMP || where == COMM_SERVER_FROM_INTERRUPT) {
00811 #if CMK_IMMEDIATE_MSG
00812 CmiHandleImmediate();
00813 #endif
00814 #if CMK_PERSISTENT_COMM
00815 PumpPersistent();
00816 #endif
00817 }
00818
00819 MACHSTATE(1,"} CommunicationServer")
00820 }
00821
00822 void CmiMachineInit(char **argv)
00823 {
00824 }
00825
00826 void CmiCommunicationInit(char **argv)
00827 {
00828 }
00829
00830 void CmiMachineExit()
00831 {
00832 }
00833
00834 static void sendBarrierMessage(int pe)
00835 {
00836 char buf[32];
00837 OtherNode node = nodes + pe;
00838 int retval = -1;
00839 while (retval == -1) {
00840 retval = sendto(dataskt, (char *)buf, 32, 0,
00841 (struct sockaddr *)&(node->addr),
00842 sizeof(struct sockaddr_in));
00843 }
00844 }
00845
00846 static void recvBarrierMessage()
00847 {
00848 char buf[32];
00849 int nreadable, ok, s;
00850
00851 if (dataskt!=-1) {
00852 do {
00853 CMK_PIPE_DECL(10);
00854 CMK_PIPE_ADDREAD(dataskt);
00855 nreadable=CMK_PIPE_CALL();
00856 if (nreadable == 0) continue;
00857 s = CMK_PIPE_CHECKREAD(dataskt);
00858 if (s) break;
00859 } while (1);
00860 ok = recv(dataskt,buf,32,0);
00861 CmiAssert(ok >= 0);
00862 }
00863 }
00864
00865
00866
00867 int CmiBarrier()
00868 {
00869 int len, size, i;
00870 int status;
00871 int count = 0;
00872 OtherNode node;
00873 int numnodes = CmiNumNodes();
00874
00875 if (Cmi_netpoll == 0) return -1;
00876
00877 if (CmiMyRank() == 0) {
00878
00879 if (CmiMyNode() != 0) {
00880 sendBarrierMessage(0);
00881 }
00882 if (CmiMyNode() == 0)
00883 {
00884 for (count = 1; count < numnodes; count ++)
00885 {
00886 recvBarrierMessage();
00887 }
00888
00889 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00890 int p = i;
00891 if (p > numnodes - 1) break;
00892
00893 sendBarrierMessage(p);
00894 }
00895 }
00896
00897 if (CmiMyNode() != 0)
00898 {
00899 recvBarrierMessage();
00900 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00901 int p = CmiMyNode();
00902 p = BROADCAST_SPANNING_FACTOR*p + i;
00903 if (p > numnodes - 1) break;
00904 p = p%numnodes;
00905
00906 sendBarrierMessage(p);
00907 }
00908 }
00909 }
00910 CmiNodeAllBarrier();
00911
00912 return 0;
00913 }
00914
00915
00916 int CmiBarrierZero()
00917 {
00918 int i;
00919
00920 if (Cmi_netpoll == 0) return -1;
00921
00922 if (CmiMyRank() == 0) {
00923 if (CmiMyNode()) {
00924 sendBarrierMessage(0);
00925 }
00926 else {
00927 for (i=0; i<CmiNumNodes()-1; i++)
00928 {
00929 recvBarrierMessage();
00930 }
00931 }
00932 }
00933 CmiNodeAllBarrier();
00934 return 0;
00935 }