00001
00035
/* When non-zero, PumpEvents() is compiled in and recv_callback() is
 * registered as the unexpected-message callback in CmiMachineInit(). */
#define MX_ACTIVE_MESSAGE 1

/* Idle policy for this machine layer: override any earlier setting so
 * that busy-waiting is selected and usleep-based idling is disabled. */
#undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
#undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
#define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0
#define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
00045
00046
00047
00048
00049
00050
00051
00052
00053 typedef struct PendingSentMsgStruct
00054 {
00055 OutgoingMsg ogm;
00056 char *data;
00057 struct PendingSentMsgStruct *next;
00058 mx_request_t handle;
00059 int flag;
00060 }
00061 *PendingSentMsg;
00062
00063 #define CMK_PMPOOL 1
00064
00065 #if CMK_PMPOOL
00066 #define MAXPMS 200
00067 static PendingSentMsg pmpool[MAXPMS];
00068 static int pmNums = 0;
00069
00070 #define putPool(pm) { \
00071 if (pmNums == MAXPMS) free(pm); \
00072 else pmpool[pmNums++] = pm; }
00073
00074 #define getPool(pm) { \
00075 if (pmNums == 0) {pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct));} \
00076 else { pm = pmpool[--pmNums]; }\
00077 }
00078 #else
00079 #define putPool(pm) { free(pm); }
00080 #define getPool(pm) { pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct)); _MEMCHECK(pm);}
00081 #endif
00082
00083 static PendingSentMsg sent_handles=NULL;
00084 static PendingSentMsg sent_handles_end=NULL;
00085
00086 #define NewPendingSentMsg(pm, ogm) \
00087 { getPool(pm); \
00088 pm->next=NULL; pm->ogm=ogm; pm->data=data; \
00089 MACHSTATE1(1,"alloc msg %p",pm);\
00090 }
00091
00092 #define InsertPendingSentMsg(pm) \
00093 { if(sent_handles_end==NULL) {sent_handles=pm;} \
00094 else {sent_handles_end->next=pm;} \
00095 sent_handles_end=pm; MACHSTATE(1,"Insert done");}
00096
00097 #define FreePendingSentMsg(pm) \
00098 { sent_handles=pm->next; \
00099 if (sent_handles == NULL) sent_handles_end = NULL; \
00100 if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);} \
00101 else CmiFree(pm->data); \
00102 putPool(pm); }
00103
00104 CmiUInt8 MATCH_FILTER = 0x11111111FFFFFFFFL;
00105 CmiUInt8 MATCH_MASK = 0xffffffffffffffffL;
00106
00107 static int processMessage(char *msg, int len);
00108 static const char *getErrorMsg(mx_return_t rc);
00109 static void processStatusCode(mx_status_t status);
00110 static void PumpMsgs(int getone);
00111 static void ReleaseSentMsgs(void);
00112 #if MX_ACTIVE_MESSAGE
00113 static void PumpEvents(int getone);
00114 #endif
00115
00116
00117
00118
00119
00120
/* Idle-state record. It carries a single placeholder member. */
typedef struct {
  char none;
} CmiIdleState;

/* Returns the idle-state object for the idle hooks; always NULL here. */
static CmiIdleState *CmiNotifyGetState(void)
{
  return NULL;
}
00126
00127 static void CmiNotifyStillIdle(CmiIdleState *s)
00128 {
00129 int sleep = 1;
00130 MACHSTATE(1,"CmiNotifyStillIdle {");
00131 #if 0
00132 CommunicationServer(0,2);
00133 #else
00134 #if MX_ACTIVE_MESSAGE
00135 CmiCommLock();
00136 PumpEvents(1);
00137 CmiCommUnlock();
00138 #else
00139 #if CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
00140 sleep = 0;
00141 #endif
00142 CmiCommLock();
00143 ReleaseSentMsgs();
00144 PumpMsgs(sleep);
00145 CmiCommUnlock();
00146 #endif
00147 #endif
00148 MACHSTATE(1,"} CmiNotifyStillIdle");
00149 }
00150
00151 void CmiNotifyIdle(void) {
00152 CmiNotifyStillIdle(NULL);
00153 }
00154
00155 static void CmiNotifyBeginIdle(CmiIdleState *s)
00156 {
00157 CmiNotifyStillIdle(s);
00158 }
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 int CheckSocketsReady(int withDelayMs)
00172 {
00173 int nreadable;
00174 CMK_PIPE_DECL(withDelayMs);
00175
00176 CmiStdoutAdd(CMK_PIPE_SUB);
00177 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00178
00179 nreadable=CMK_PIPE_CALL();
00180 ctrlskt_ready_read = 0;
00181 dataskt_ready_read = 0;
00182 dataskt_ready_write = 0;
00183
00184 if (nreadable == 0) {
00185 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00186 return nreadable;
00187 }
00188 if (nreadable==-1) {
00189 CMK_PIPE_CHECKERR();
00190 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00191 return CheckSocketsReady(0);
00192 }
00193 CmiStdoutCheck(CMK_PIPE_SUB);
00194 if (Cmi_charmrun_fd!=-1)
00195 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00196 MACHSTATE(1,"} CheckSocketsReady")
00197 return nreadable;
00198 }
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211 static void ServiceCharmrun_nolock()
00212 {
00213 int again = 1;
00214 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
00215 while (again)
00216 {
00217 again = 0;
00218 CheckSocketsReady(0);
00219 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
00220 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
00221 }
00222 MACHSTATE(2,"} ServiceCharmrun_nolock end")
00223 }
00224
00225 static void PumpMsgs(int getone) {
00226 mx_return_t rc;
00227 mx_status_t status;
00228 uint32_t result;
00229 mx_segment_t buffer_desc;
00230 mx_request_t recv_handle;
00231
00232 MACHSTATE1(2,"PumpMsgs(%d) {", getone);
00233 while (1) {
00234 if (getone)
00235 rc = mx_probe(endpoint, 1, MATCH_FILTER, MATCH_MASK, &status, &result);
00236 else
00237 rc = mx_iprobe(endpoint, MATCH_FILTER, MATCH_MASK, &status, &result);
00238 if(rc != MX_SUCCESS) {
00239 const char *errmsg = getErrorMsg(rc);
00240 CmiPrintf("mx_iprobe error: %s\n", errmsg);
00241 CmiAbort("mx_iprobe Abort");
00242 }
00243 if (result == 0) {
00244 break;
00245 }
00246 MACHSTATE(2,"PumpMsgs recv one");
00247 buffer_desc.segment_length = status.msg_length;
00248 buffer_desc.segment_ptr = (char *) CmiAlloc(status.msg_length);
00249 MACHSTATE(1,"Non-blocking receive {")
00250 MACHSTATE1(1," size %d", status.msg_length);
00251 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
00252 if (rc != MX_SUCCESS) {
00253 const char *errmsg = getErrorMsg(rc);
00254 CmiPrintf("mx_irecv error: %s\n", errmsg);
00255 CmiAbort("Abort");
00256 }
00257 MACHSTATE1(1,"} Non-blocking receive return %d", rc);
00258 again:
00259 rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
00260
00261 MACHSTATE3(1,"mx_wait return rc=%d result=%d status=%d", rc, result, status.code);
00262 if (rc != MX_SUCCESS) {
00263 const char *errmsg = getErrorMsg(rc);
00264 CmiPrintf("mx_wait error: %s\n", errmsg);
00265 CmiAbort("Abort");
00266 }
00267 if(result==0) {
00268 CmiPrintf("mx_wait error: TIME OUT\n");
00269 goto again;
00270 }
00271 else {
00272 processMessage(buffer_desc.segment_ptr, buffer_desc.segment_length);
00273 }
00274 if (getone) break;
00275 }
00276 MACHSTATE1(2,"} PumpMsgs(%d)", getone);
00277 }
00278
00279 #if MX_ACTIVE_MESSAGE
00280 static void PumpEvents(int getone) {
00281 mx_return_t rc;
00282 mx_status_t status;
00283 uint32_t result;
00284 mx_segment_t buffer_desc;
00285 mx_request_t recv_handle;
00286
00287 while (1) {
00288 rc = mx_ipeek(endpoint, &recv_handle, &result);
00289 if (rc != MX_SUCCESS) {
00290 const char *errmsg = getErrorMsg(rc);
00291 CmiPrintf("mx_ipeek error: %s\n", errmsg);
00292 CmiAbort("Abort");
00293 }
00294 if (result == 0) break;
00295 rc = mx_test(endpoint, &recv_handle, &status, &result);
00296
00297 if (rc != MX_SUCCESS) {
00298 const char *errmsg = getErrorMsg(rc);
00299 CmiPrintf("mx_wait error: %s\n", errmsg);
00300 CmiAbort("Abort");
00301 }
00302 if(result==0) {
00303 CmiAbort("mx_test or wait: TIME OUT\n");
00304 }
00305 else {
00306 PendingSentMsg pm = (PendingSentMsg)status.context;
00307 if (pm->flag == 1) {
00308 if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);}
00309 else CmiFree(pm->data);
00310 }
00311 else if (pm->flag == 2) {
00312 processMessage(pm->data, status.msg_length);
00313 }
00314 else {
00315 CmiAbort("Invalid PendingSentMsg!");
00316 }
00317 putPool(pm);
00318 }
00319 if (getone) break;
00320 }
00321 }
00322
00323
00324 void recv_callback(void * context, uint64_t match_info, int length)
00325 {
00326 mx_segment_t buffer_desc;
00327 mx_request_t recv_handle;
00328 mx_return_t rc;
00329 mx_status_t status;
00330 uint32_t result;
00331 PendingSentMsg pm;
00332
00333 buffer_desc.segment_length = length;
00334 buffer_desc.segment_ptr = (char *) CmiAlloc(length);
00335 getPool(pm);
00336 pm->flag = 2;
00337 pm->data = buffer_desc.segment_ptr;
00338 if (MATCH_FILTER != match_info) {
00339 CmiAbort("Invalid match_info");
00340 }
00341 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, pm, &recv_handle);
00342 if (rc != MX_SUCCESS) {
00343 const char *errmsg = getErrorMsg(rc);
00344 CmiPrintf("mx_irecv error: %s\n", errmsg);
00345 CmiAbort("Abort");
00346 }
00347 if (1) {
00348 rc = mx_test(endpoint, &recv_handle, &status, &result);
00349 if (rc != MX_SUCCESS) {
00350 const char *errmsg = getErrorMsg(rc);
00351 CmiPrintf("mx_wait error: %s\n", errmsg);
00352 CmiAbort("Abort");
00353 }
00354 if(result==0) {
00355 return;
00356 }
00357 else {
00358 processStatusCode(status);
00359 CmiPrintf("PUSH HERE\n");
00360 processMessage(pm->data, status.msg_length);
00361 putPool(pm);
00362 }
00363 }
00364 }
00365 #endif
00366
/*
 * Run mx_test on `handle`, storing the outcome in the caller's `status`
 * and `result` variables (their addresses are taken, so both must be
 * lvalues). A failing mx_test call aborts the run.
 */
#define test_send_complete(handle, status, result) \
  { \
    mx_return_t test_rc_ = mx_test(endpoint, &(handle), &(status), &(result)); \
    if (test_rc_ != MX_SUCCESS) { \
      MACHSTATE1(3," mx_test returns %d", test_rc_); \
      CmiAbort("mx_test failed\n"); \
    } \
  }
00376
00377 static void ReleaseSentMsgs(void) {
00378 MACHSTATE(2,"ReleaseSentMsgs {");
00379 mx_return_t rc;
00380 mx_status_t status;
00381 unsigned int result;
00382 PendingSentMsg next, pm = sent_handles;
00383 while (pm!=NULL) {
00384 test_send_complete(pm->handle, status, result);
00385 next = pm->next;
00386 if(result!=0 && status.code == MX_STATUS_SUCCESS) {
00387 MACHSTATE1(2,"Sent complete. Free sent msg size %d", status.msg_length);
00388 FreePendingSentMsg(pm);
00389 }
00390 else
00391 break;
00392 pm = next;
00393 }
00394 MACHSTATE(2,"} ReleaseSentMsgs");
00395 }
00396
/*
 * One pass of network progress; the caller is expected to hold the comm
 * lock (CommunicationServer takes it around this call).
 * withDelayMs is accepted for interface compatibility and not used.
 */
static void CommunicationServer_nolock(int withDelayMs) {
  MACHSTATE(2,"CommunicationServer_nolock start {");
#if !MX_ACTIVE_MESSAGE
  PumpMsgs(0);
  ReleaseSentMsgs();
#else
  PumpEvents(0);
#endif
  MACHSTATE(2,"}CommunicationServer_nolock end");
}
00407
00408
00409
00410
00411
00412
00413
00414
00415 static void CommunicationServer(int withDelayMs, int where)
00416 {
00417
00418 if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
00419
00420 MACHSTATE2(2,"CommunicationServer(%d) from %d {",withDelayMs, where)
00421
00422 if (where == 2 && machine_initiated_shutdown) {
00423
00424 return;
00425 }
00426
00427 if (where == 1) {
00428
00429 if (!machine_initiated_shutdown) ServiceCharmrun_nolock();
00430 return;
00431 }
00432
00433 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00434
00435 CmiCommLock();
00436 CommunicationServer_nolock(withDelayMs);
00437 CmiCommUnlock();
00438
00439 #if CMK_IMMEDIATE_MSG
00440 if (where == 0)
00441 CmiHandleImmediate();
00442 #endif
00443
00444 MACHSTATE(2,"} CommunicationServer")
00445 }
00446
00447 void processFutureMessages(OtherNode node)
00448 {
00449 if (!CdsFifo_Empty(node->futureMsgs)) {
00450 int len = CdsFifo_Length(node->futureMsgs);
00451 CmiPrintf("[%d] processFutureMessages %d\n", CmiMyPe(), len);
00452 int i=0;
00453 while (i<len) {
00454 FutureMessage f = (FutureMessage)CdsFifo_Dequeue(node->futureMsgs);
00455 int status = processMessage(f->msg, f->len);
00456 free(f);
00457 i++;
00458 }
00459 }
00460 }
00461
00462 static int processMessage(char *msg, int len)
00463 {
00464 char *newmsg;
00465 int rank, srcpe, seqno, magic, i;
00466 unsigned int broot;
00467 int size;
00468 unsigned char checksum;
00469
00470 if (len >= DGRAM_HEADER_SIZE) {
00471 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00472
00473 MACHSTATE2(1, "Break header Cmi-charmrun_id=%d, magic=%d", Cmi_charmrun_pid, magic);
00474 MACHSTATE3(1, "srcpe=%d, seqno=%d, rank=%d", srcpe, seqno, rank);
00475
00476 #ifdef CMK_USE_CHECKSUM
00477 checksum = computeCheckSum(msg, len);
00478 if (checksum == 0)
00479 #else
00480 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
00481 #endif
00482 {
00483 OtherNode node = nodes_by_pe[srcpe];
00484
00485 if (seqno == node->recv_expect) {
00486 node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
00487 }
00488 else if (seqno < node->recv_expect) {
00489 CmiPrintf("[%d] Warning: Past packet received from PE %d (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00490 CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
00491 return 0;
00492 }
00493 else {
00494 CmiPrintf("[%d] Error detected - Packet out of order from PE %d (expecting: %d got: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00495
00496
00497
00498 FutureMessage f = (FutureMessage)malloc(sizeof(struct FutureMessageStruct));
00499 f->msg = msg;
00500 f->len = len;
00501 CdsFifo_Enqueue(node->futureMsgs, f);
00502 return 0;
00503 }
00504
00505 newmsg = node->asm_msg;
00506 if (newmsg == 0) {
00507 size = CmiMsgHeaderGetLength(msg);
00508 if (size != len) {
00509 newmsg = (char *)CmiAlloc(size);
00510 _MEMCHECK(newmsg);
00511 if (len > size) {
00512 CmiPrintf("size: %d, len:%d.\n", size, len);
00513 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00514 }
00515 memcpy(newmsg, msg, len);
00516 CmiFree(msg);
00517 }
00518 else
00519 newmsg = msg;
00520 node->asm_rank = rank;
00521 node->asm_total = size;
00522 node->asm_fill = len;
00523 node->asm_msg = newmsg;
00524 }
00525 else {
00526 size = len - DGRAM_HEADER_SIZE;
00527 if (node->asm_fill+size > node->asm_total) {
00528 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
00529 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00530 }
00531 memcpy(newmsg + node->asm_fill, msg+DGRAM_HEADER_SIZE, size);
00532 CmiFree(msg);
00533 node->asm_fill += size;
00534 }
00535
00536
00537 if (node->asm_fill == node->asm_total) {
00538 switch (rank) {
00539 case DGRAM_BROADCAST: {
00540 for (i=1; i<_Cmi_mynodesize; i++)
00541 CmiPushPE(i, CopyMsg(newmsg, node->asm_total));
00542 CmiPushPE(0, newmsg);
00543 break;
00544 }
00545 #if CMK_NODE_QUEUE_AVAILABLE
00546 case DGRAM_NODEBROADCAST:
00547 case DGRAM_NODEMESSAGE: {
00548 CmiPushNode(newmsg);
00549 break;
00550 }
00551 #endif
00552 default:
00553 CmiPushPE(rank, newmsg);
00554 }
00555 node->asm_msg = 0;
00556
00557 #if CMK_BROADCAST_SPANNING_TREE
00558 if (rank == DGRAM_BROADCAST
00559 #if CMK_NODE_QUEUE_AVAILABLE
00560 || rank == DGRAM_NODEBROADCAST
00561 #endif
00562 )
00563 SendSpanningChildren(NULL, 0, node->asm_total, newmsg, broot, rank);
00564 #elif CMK_BROADCAST_HYPERCUBE
00565 if (rank == DGRAM_BROADCAST
00566 #if CMK_NODE_QUEUE_AVAILABLE
00567 || rank == DGRAM_NODEBROADCAST
00568 #endif
00569 )
00570 SendHypercube(NULL, 0, node->asm_total, newmsg, broot, rank);
00571 #endif
00572 }
00573 processFutureMessages(node);
00574 }
00575 else {
00576 #ifdef CMK_USE_CHECKSUM
00577 CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum);
00578 #else
00579 CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n",
00580 CmiMyPe(), magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00581 #endif
00582 CmiPrintf("[%d] recved: rank:%d src:%d magic:%d seqno:%d len:%d\n", CmiMyPe(), rank, srcpe, magic, seqno,
00583 len);
00584 }
00585 }
00586 else {
00587 CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len);
00588 CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg));
00589 }
00590
00591 return 1;
00592 }
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602 void EnqueueOutgoingDgram
00603 (OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot, int copy)
00604 {
00605 int size, len, seqno;
00606 mx_return_t rc;
00607 mx_request_t sent_handle;
00608 mx_segment_t buffer_desc;
00609 uint32_t result;
00610 char *data;
00611
00612 len = dlen + DGRAM_HEADER_SIZE;;
00613
00614 if (copy) {
00615 data = CopyMsg(ptr-DGRAM_HEADER_SIZE, len);
00616 }
00617 else {
00618 data = ptr-DGRAM_HEADER_SIZE;
00619 ogm->refcount++;
00620 }
00621
00622 seqno = node->send_next;
00623 MACHSTATE5(1, "[%d] SEQNO: %d to node %d rank: %d %d", CmiMyPe(), seqno, node-nodes, rank, broot);
00624 DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, seqno, broot);
00625 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00626 #ifdef CMK_USE_CHECKSUM
00627 {
00628 DgramHeader *head = (DgramHeader *)data;
00629 head->magic ^= computeCheckSum(data, len);
00630 }
00631 #endif
00632
00633 MACHSTATE1(2, "EnqueueOutgoingDgram { len=%d", len);
00634
00635
00636
00637 buffer_desc.segment_ptr = data;
00638 buffer_desc.segment_length = len;
00639 PendingSentMsg pm;
00640 if (copy) ogm = NULL;
00641 NewPendingSentMsg(pm, ogm);
00642 pm->flag = 1;
00643 rc = mx_isend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, pm, &(pm->handle));
00644 if (rc != MX_SUCCESS) {
00645 MACHSTATE1(3," mx_isend returns %d", rc);
00646 CmiAbort("mx_isend failed\n");
00647 }
00648 #if !MX_ACTIVE_MESSAGE
00649 InsertPendingSentMsg(pm);
00650 #endif
00651 MACHSTATE(2, "} EnqueueOutgoingDgram");
00652 }
00653
00654
00655
00656 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00657 {
00658 int size; char *data;
00659
00660 size = ogm->size - DGRAM_HEADER_SIZE;
00661 data = ogm->data + DGRAM_HEADER_SIZE;
00662
00663 MACHSTATE3(2, "DeliverViaNetwork { : size:%d, to node mach_id=%d, nic=%ld", ogm->size, node->mach_id, node->nic_id);
00664
00665 while (size > Cmi_dgram_max_data) {
00666 copy = 1;
00667 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot, copy);
00668 data += Cmi_dgram_max_data;
00669 size -= Cmi_dgram_max_data;
00670 }
00671 if (size>0) EnqueueOutgoingDgram(ogm, data, size, node, rank, broot, copy);
00672
00673 MACHSTATE(2, "} DeliverViaNetwork");
00674 }
00675
00676 static void sendBarrierMessage(int pe)
00677 {
00678 mx_request_t send_handle;
00679 mx_segment_t buffer_desc;
00680 mx_return_t rc;
00681 mx_status_t status;
00682 uint32_t result;
00683 char msg[10];
00684
00685 OtherNode node = nodes + pe;
00686 buffer_desc.segment_ptr = msg;
00687 buffer_desc.segment_length = 10;
00688 rc = mx_issend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, NULL, &send_handle);
00689 do {
00690 rc = mx_test(endpoint, &send_handle, &status, &result);
00691 } while (rc == MX_SUCCESS && !result);
00692 }
00693
00694 static void recvBarrierMessage()
00695 {
00696 mx_segment_t buffer_desc;
00697 char msg[10];
00698 mx_return_t rc;
00699 mx_status_t status;
00700 mx_request_t recv_handle;
00701 uint32_t result;
00702
00703 buffer_desc.segment_length = 10;
00704 buffer_desc.segment_ptr = msg;
00705 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
00706 rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
00707 }
00708
00709
00710 int CmiBarrier()
00711 {
00712 int len, size, i;
00713 int status;
00714 int count = 0;
00715 OtherNode node;
00716 int numnodes = CmiNumNodes();
00717 if (CmiMyRank() == 0) {
00718
00719 if (CmiMyNode() != 0) {
00720 sendBarrierMessage(0);
00721 }
00722
00723 if (CmiMyNode() == 0)
00724 {
00725 for (count = 1; count < numnodes; count ++)
00726 {
00727 recvBarrierMessage();
00728 }
00729
00730 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00731 int p = i;
00732 if (p > numnodes - 1) break;
00733
00734 sendBarrierMessage(p);
00735 }
00736 }
00737
00738 if (CmiMyNode() != 0)
00739 {
00740 recvBarrierMessage();
00741 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00742 int p = CmiMyNode();
00743 p = BROADCAST_SPANNING_FACTOR*p + i;
00744 if (p > numnodes - 1) break;
00745 p = p%numnodes;
00746
00747 sendBarrierMessage(p);
00748 }
00749 }
00750 }
00751 CmiNodeAllBarrier();
00752
00753 return 0;
00754 }
00755
00756
/*
 * One-way barrier towards node 0: rank 0 of every other node sends a
 * token to node 0, and rank 0 of node 0 receives one token per remote
 * node. No release is sent back. All ranks then meet in
 * CmiNodeAllBarrier(). Always returns 0.
 */
int CmiBarrierZero()
{
  if (CmiMyRank() == 0) {
    if (CmiMyNode() == 0) {
      int received = 0;
      while (received < CmiNumNodes()-1) {
        recvBarrierMessage();
        received++;
      }
    }
    else {
      sendBarrierMessage(0);
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
00775
00776
00777
00778
00779
00780
00781
00782
/* File-local size value; nothing in the visible part of this file reads or writes it. */
static int maxsize;
00784
00785 void CmiMachineInit(char **argv)
00786 {
00787 MACHSTATE(3,"CmiMachineInit {");
00788 mx_return_t rc;
00789 endpoint = NULL;
00790
00791 if (dataport == -1) return;
00792
00793 rc = mx_init();
00794 if (rc != MX_SUCCESS) {
00795 MACHSTATE1(3," mx_init returns %d", rc);
00796 printf("Cannot open MX library (does the machine have a GM card?)\n");
00797 return;
00798 }
00799
00800 rc = mx_open_endpoint(MX_ANY_NIC, MX_ANY_ENDPOINT, MX_FILTER, 0, 0, &endpoint);
00801 if (rc != MX_SUCCESS) {
00802 MACHSTATE1(3," open endpoint address returns %d", rc);
00803 printf("Cannot open endpoint address\n");
00804 return;
00805 }
00806
00807
00808 rc = mx_get_endpoint_addr(endpoint, &endpoint_addr);
00809 if (rc != MX_SUCCESS) {
00810 MACHSTATE1(3," get endpoint address returns %d", rc);
00811 printf("Cannot get endpoint address\n");
00812 return;
00813 }
00814
00815
00816 rc = mx_decompose_endpoint_addr(endpoint_addr, &Cmi_nic_id, (uint32_t*)&Cmi_mach_id);
00817 if (rc != MX_SUCCESS) {
00818 MACHSTATE1(3," mx_decompose_endpoint returns %d", rc);
00819 printf("Cannot decompose endpoint address\n");
00820 return;
00821 }
00822
00823 dataport = 1;
00824
00825 MATCH_FILTER &= Cmi_charmrun_pid;
00826
00827 Cmi_dgram_max_data = 1024-DGRAM_HEADER_SIZE;
00828
00829 #if MX_ACTIVE_MESSAGE
00830 mx_register_unexp_callback(endpoint, recv_callback, NULL);
00831 #endif
00832
00833 MACHSTATE(3,"} CmiMachineInit");
00834 }
00835
/* Communication-init hook; this layer has nothing to do here. */
void CmiCommunicationInit(char **argv)
{
  (void)argv;
}
00839
00840 void CmiMachineExit()
00841 {
00842 MACHSTATE(3, "CmiMachineExit {");
00843 mx_return_t rc;
00844 if (endpoint) {
00845 rc = mx_close_endpoint(endpoint);
00846 if(rc!=MX_SUCCESS){
00847 MACHSTATE1(3, "mx_close_endpoint returns %d", rc);
00848 printf("Can't do mx_close_endpoint\n");
00849 return;
00850 }
00851 rc = mx_finalize();
00852 if(rc!=MX_SUCCESS){
00853 MACHSTATE1(3, "mx_finalize returns %d", rc);
00854 printf("Can't do mx_finalize\n");
00855 return;
00856 }
00857 }
00858 MACHSTATE(3, "} CmiMachineExit");
00859 }
00860
00861
00862 void CmiMXMakeConnection()
00863 {
00864 int i;
00865 int doabort = 0;
00866 if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
00867 if (endpoint == NULL) machine_exit(1);
00868 MACHSTATE(3,"CmiMXMakeConnection {");
00869 for (i=0; i<_Cmi_numnodes; i++) {
00870 mx_return_t rc;
00871 char ip_str[128];
00872 skt_print_ip(ip_str, nodes[i].IP);
00873 rc = mx_connect(endpoint, nodes[i].nic_id, nodes[i].mach_id, MX_FILTER, MX_INFINITE, &nodes[i].endpoint_addr);
00874 if (rc != MX_SUCCESS) {
00875 CmiPrintf("Error> mx node %d can't contact node %d. \n", CmiMyPe(), i);
00876 doabort = 1;
00877 }
00878 }
00879 if (doabort) CmiAbort("CmiMXMakeConnection");
00880 MACHSTATE(3,"}CmiMXMakeConnection");
00881 }
00882
00883 static const char *getErrorMsg(mx_return_t rc)
00884 {
00885
00886
00887
00888
00889
00890
00891
00892
00893 return mx_strerror(rc);
00894 }
00895
00896 static void processStatusCode(mx_status_t status){
00897 const char *str = mx_strstatus(status.code);
00898 CmiPrintf("processStatusCode: %s\n", str);
00899 MACHSTATE1(4, "%s", str);
00900 }
00901