00001
00035
/*
 * MX_ACTIVE_MESSAGE selects the receive/completion strategy used below.
 * When nonzero, CmiMachineInit() registers recv_callback() with MX and all
 * completions are drained by PumpEvents(); when zero, the code polls with
 * PumpMsgs() and reaps finished sends with ReleaseSentMsgs().
 */
00036 #define MX_ACTIVE_MESSAGE 1
00037
00038
00039
00040
/*
 * Override any previously configured idle policy for this layer.
 * CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT is consulted in CmiNotifyStillIdle()
 * (non-active-message path only) to choose a non-blocking probe.
 * CMK_WHEN_PROCESSOR_IDLE_USLEEP is not referenced in the visible part of
 * this file.
 */
00041 #undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
00042 #undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
00043 #define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
00044 #define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0
00045
00046
00047
00048
00049
00050
00051
00052
00053 typedef struct PendingSentMsgStruct
00054 {
00055 OutgoingMsg ogm;
00056 char *data;
00057 struct PendingSentMsgStruct *next;
00058 mx_request_t handle;
00059 int flag;
00060 }
00061 *PendingSentMsg;
00062
00063 #define CMK_PMPOOL 1
00064
00065 #if CMK_PMPOOL
00066 #define MAXPMS 200
00067 static PendingSentMsg pmpool[MAXPMS];
00068 static int pmNums = 0;
00069
00070 #define putPool(pm) { \
00071 if (pmNums == MAXPMS) free(pm); \
00072 else pmpool[pmNums++] = pm; }
00073
00074 #define getPool(pm) { \
00075 if (pmNums == 0) {pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct));} \
00076 else { pm = pmpool[--pmNums]; }\
00077 }
00078 #else
00079 #define putPool(pm) { free(pm); }
00080 #define getPool(pm) { pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct)); _MEMCHECK(pm);}
00081 #endif
00082
00083 static PendingSentMsg sent_handles=NULL;
00084 static PendingSentMsg sent_handles_end=NULL;
00085
00086 #define NewPendingSentMsg(pm, ogm) \
00087 { getPool(pm); \
00088 pm->next=NULL; pm->ogm=ogm; pm->data=data; \
00089 MACHSTATE1(1,"alloc msg %p",pm);\
00090 }
00091
00092 #define InsertPendingSentMsg(pm) \
00093 { if(sent_handles_end==NULL) {sent_handles=pm;} \
00094 else {sent_handles_end->next=pm;} \
00095 sent_handles_end=pm; MACHSTATE(1,"Insert done");}
00096
00097 #define FreePendingSentMsg(pm) \
00098 { sent_handles=pm->next; \
00099 if (sent_handles == NULL) sent_handles_end = NULL; \
00100 if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);} \
00101 else CmiFree(pm->data); \
00102 putPool(pm); }
00103
/*
 * Match information used for every MX send, probe and receive in this file.
 * MATCH_FILTER is narrowed at start-up (CmiMachineInit ANDs it with
 * Cmi_charmrun_pid); MATCH_MASK has all bits set.
 */
00104 CmiUInt8 MATCH_FILTER = 0x11111111FFFFFFFFLL;
00105 CmiUInt8 MATCH_MASK = 0xffffffffffffffffLL;
00106
/* Forward declarations for helpers defined later in this file. */
00107 static int processMessage(char *msg, int len);
00108 static const char *getErrorMsg(mx_return_t rc);
00109 static void processStatusCode(mx_status_t status);
00110 static void PumpMsgs(int getone);
00111 static void ReleaseSentMsgs(void);
00112 #if MX_ACTIVE_MESSAGE
00113 static void PumpEvents(int getone);
/* Count of 4-byte barrier messages seen by PumpEvents() and not yet
 * consumed by recvBarrierMessage(). */
00114 static volatile int gotone = 0;
00115 #endif
00116
00117
00118
00119
00120
00121
/* Placeholder idle-state type; this layer keeps no per-idle-period state. */
typedef struct {
  char none;
} CmiIdleState;

/* There is no idle state to hand out, so callers always receive NULL. */
static CmiIdleState *CmiNotifyGetState(void)
{
  return NULL;
}
00127
00128 static void CmiNotifyStillIdle(CmiIdleState *s)
00129 {
00130 int sleep = 1;
00131 MACHSTATE(1,"CmiNotifyStillIdle {");
00132 #if 0
00133 CommunicationServer(0, COMM_SERVER_FROM_WORKER);
00134 #else
00135 #if MX_ACTIVE_MESSAGE
00136 CmiCommLock();
00137 PumpEvents(1);
00138 CmiCommUnlock();
00139 #else
00140 #if CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
00141 sleep = 0;
00142 #endif
00143 CmiCommLock();
00144 ReleaseSentMsgs();
00145 PumpMsgs(sleep);
00146 CmiCommUnlock();
00147 #endif
00148 #endif
00149 MACHSTATE(1,"} CmiNotifyStillIdle");
00150 }
00151
/* Public idle hook: performs one idle-time progress step with no state. */
void CmiNotifyIdle(void)
{
  CmiNotifyStillIdle(NULL);
}
00155
00156 static void CmiNotifyBeginIdle(CmiIdleState *s)
00157 {
00158 CmiNotifyStillIdle(s);
00159 }
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172 int CheckSocketsReady(int withDelayMs)
00173 {
00174 int nreadable;
00175 CMK_PIPE_DECL(withDelayMs);
00176
00177 CmiStdoutAdd(CMK_PIPE_SUB);
00178 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00179
00180 nreadable=CMK_PIPE_CALL();
00181 ctrlskt_ready_read = 0;
00182 dataskt_ready_read = 0;
00183 dataskt_ready_write = 0;
00184
00185 if (nreadable == 0) {
00186 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00187 return nreadable;
00188 }
00189 if (nreadable==-1) {
00190 CMK_PIPE_CHECKERR();
00191 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00192 return CheckSocketsReady(0);
00193 }
00194 CmiStdoutCheck(CMK_PIPE_SUB);
00195 if (Cmi_charmrun_fd!=-1)
00196 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00197 MACHSTATE(1,"} CheckSocketsReady")
00198 return nreadable;
00199 }
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212 static void ServiceCharmrun_nolock()
00213 {
00214 int again = 1;
00215 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
00216 while (again)
00217 {
00218 again = 0;
00219 CheckSocketsReady(0);
00220 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
00221 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
00222 }
00223 MACHSTATE(2,"} ServiceCharmrun_nolock end")
00224 }
00225
00226 static void PumpMsgs(int getone) {
00227 mx_return_t rc;
00228 mx_status_t status;
00229 uint32_t result;
00230 mx_segment_t buffer_desc;
00231 mx_request_t recv_handle;
00232
00233 MACHSTATE1(2,"PumpMsgs(%d) {", getone);
00234 while (1) {
00235 if (getone)
00236 rc = mx_probe(endpoint, 1, MATCH_FILTER, MATCH_MASK, &status, &result);
00237 else
00238 rc = mx_iprobe(endpoint, MATCH_FILTER, MATCH_MASK, &status, &result);
00239 if(rc != MX_SUCCESS) {
00240 const char *errmsg = getErrorMsg(rc);
00241 CmiPrintf("mx_iprobe error: %s\n", errmsg);
00242 CmiAbort("mx_iprobe Abort");
00243 }
00244 if (result == 0) {
00245 break;
00246 }
00247 MACHSTATE(2,"PumpMsgs recv one");
00248 buffer_desc.segment_length = status.msg_length;
00249 buffer_desc.segment_ptr = (char *) CmiAlloc(status.msg_length);
00250 MACHSTATE(1,"Non-blocking receive {")
00251 MACHSTATE1(1," size %d", status.msg_length);
00252 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
00253 if (rc != MX_SUCCESS) {
00254 const char *errmsg = getErrorMsg(rc);
00255 CmiPrintf("mx_irecv error: %s\n", errmsg);
00256 CmiAbort("Abort");
00257 }
00258 MACHSTATE1(1,"} Non-blocking receive return %d", rc);
00259 again:
00260 rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
00261
00262 MACHSTATE3(1,"mx_wait return rc=%d result=%d status=%d", rc, result, status.code);
00263 if (rc != MX_SUCCESS) {
00264 const char *errmsg = getErrorMsg(rc);
00265 CmiPrintf("mx_wait error: %s\n", errmsg);
00266 CmiAbort("Abort");
00267 }
00268 if(result==0) {
00269 CmiPrintf("mx_wait error: TIME OUT\n");
00270 goto again;
00271 }
00272 else {
00273 processMessage(buffer_desc.segment_ptr, buffer_desc.segment_length);
00274 }
00275 if (getone) break;
00276 }
00277 MACHSTATE1(2,"} PumpMsgs(%d)", getone);
00278 }
00279
00280 #if MX_ACTIVE_MESSAGE
00281 static void PumpEvents(int getone) {
00282 mx_return_t rc;
00283 mx_status_t status;
00284 uint32_t result;
00285 mx_segment_t buffer_desc;
00286 mx_request_t recv_handle;
00287
00288 while (1) {
00289 rc = mx_ipeek(endpoint, &recv_handle, &result);
00290 if (rc != MX_SUCCESS) {
00291 const char *errmsg = getErrorMsg(rc);
00292 CmiPrintf("mx_ipeek error: %s\n", errmsg);
00293 CmiAbort("Abort");
00294 }
00295 if (result == 0) break;
00296 rc = mx_test(endpoint, &recv_handle, &status, &result);
00297
00298 if (rc != MX_SUCCESS) {
00299 const char *errmsg = getErrorMsg(rc);
00300 CmiPrintf("mx_wait error: %s\n", errmsg);
00301 CmiAbort("Abort");
00302 }
00303 if(result==0) {
00304 CmiAbort("mx_test or wait: TIME OUT\n");
00305 }
00306 else {
00307 PendingSentMsg pm = (PendingSentMsg)status.context;
00308 if (pm->flag == 1) {
00309 if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);}
00310 else CmiFree(pm->data);
00311 }
00312 else if (pm->flag == 2) {
00313 #if MX_ACTIVE_MESSAGE
00314 if (status.msg_length == 4) {
00315 gotone ++;
00316 CmiFree(pm->data);
00317 }
00318 else
00319 #endif
00320 processMessage(pm->data, status.msg_length);
00321 }
00322 else {
00323 CmiAbort("Invalid PendingSentMsg!");
00324 }
00325 putPool(pm);
00326 }
00327 if (getone) break;
00328 }
00329 }
00330
00331
00332 void recv_callback(void * context, uint64_t match_info, int length)
00333 {
00334 mx_segment_t buffer_desc;
00335 mx_request_t recv_handle;
00336 mx_return_t rc;
00337 mx_status_t status;
00338 uint32_t result;
00339 PendingSentMsg pm;
00340
00341 buffer_desc.segment_length = length;
00342 buffer_desc.segment_ptr = (char *) CmiAlloc(length);
00343 getPool(pm);
00344 pm->flag = 2;
00345 pm->data = buffer_desc.segment_ptr;
00346 if (MATCH_FILTER != match_info) {
00347 CmiAbort("Invalid match_info");
00348 }
00349 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, pm, &recv_handle);
00350 if (rc != MX_SUCCESS) {
00351 const char *errmsg = getErrorMsg(rc);
00352 CmiPrintf("mx_irecv error: %s\n", errmsg);
00353 CmiAbort("Abort");
00354 }
00355 if (1) {
00356 rc = mx_test(endpoint, &recv_handle, &status, &result);
00357 if (rc != MX_SUCCESS) {
00358 const char *errmsg = getErrorMsg(rc);
00359 CmiPrintf("mx_wait error: %s\n", errmsg);
00360 CmiAbort("Abort");
00361 }
00362 if(result==0) {
00363 return;
00364 }
00365 else {
00366 processStatusCode(status);
00367 CmiPrintf("PUSH HERE\n");
00368 processMessage(pm->data, status.msg_length);
00369 putPool(pm);
00370 }
00371 }
00372 }
00373 #endif
00374
/*
 * Test one MX request without blocking.  `status` and `result` are
 * lvalues that receive the outcome of mx_test(); an MX error aborts.
 */
#define test_send_complete(handle, status, result) \
  { \
    mx_return_t test_rc = mx_test(endpoint, &(handle), &(status), &(result)); \
    if (test_rc != MX_SUCCESS) { \
      MACHSTATE1(3," mx_test returns %d", test_rc); \
      CmiAbort("mx_test failed\n"); \
    } \
  }
00384
00385 static void ReleaseSentMsgs(void) {
00386 MACHSTATE(2,"ReleaseSentMsgs {");
00387 mx_return_t rc;
00388 mx_status_t status;
00389 unsigned int result;
00390 PendingSentMsg next, pm = sent_handles;
00391 while (pm!=NULL) {
00392 test_send_complete(pm->handle, status, result);
00393 next = pm->next;
00394 if(result!=0 && status.code == MX_STATUS_SUCCESS) {
00395 MACHSTATE1(2,"Sent complete. Free sent msg size %d", status.msg_length);
00396 FreePendingSentMsg(pm);
00397 }
00398 else
00399 break;
00400 pm = next;
00401 }
00402 MACHSTATE(2,"} ReleaseSentMsgs");
00403 }
00404
00405 static void CommunicationServer_nolock(int withDelayMs) {
00406 if (endpoint == NULL) return;
00407 MACHSTATE(2,"CommunicationServer_nolock start {")
00408 #if MX_ACTIVE_MESSAGE
00409 PumpEvents(0);
00410 #else
00411 PumpMsgs(0);
00412 ReleaseSentMsgs();
00413 #endif
00414 MACHSTATE(2,"}CommunicationServer_nolock end");
00415 }
00416
00417
00418
00419
00420
00421
00422
00423
00424 static void CommunicationServer(int withDelayMs, int where)
00425 {
00426
00427 if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
00428
00429 MACHSTATE2(2,"CommunicationServer(%d) from %d {",withDelayMs, where)
00430
00431 if (where == COMM_SERVER_FROM_WORKER && machine_initiated_shutdown) {
00432
00433 return;
00434 }
00435
00436 if (where == COMM_SERVER_FROM_INTERRUPT) {
00437
00438 if (!machine_initiated_shutdown) ServiceCharmrun_nolock();
00439 return;
00440 }
00441 else if (where == COMM_SERVER_FROM_SMP || where == COMM_SERVER_FROM_WORKER) {
00442 if (machine_initiated_shutdown) ServiceCharmrun_nolock();
00443 }
00444
00445 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00446
00447 CmiCommLock();
00448 CommunicationServer_nolock(withDelayMs);
00449 CmiCommUnlock();
00450
00451 #if CMK_IMMEDIATE_MSG
00452 if (where == COMM_SERVER_FROM_SMP)
00453 CmiHandleImmediate();
00454 #endif
00455
00456 MACHSTATE(2,"} CommunicationServer")
00457 }
00458
00459 void processFutureMessages(OtherNode node)
00460 {
00461 if (!CdsFifo_Empty(node->futureMsgs)) {
00462 int len = CdsFifo_Length(node->futureMsgs);
00463 CmiPrintf("[%d] processFutureMessages %d\n", CmiMyPe(), len);
00464 int i=0;
00465 while (i<len) {
00466 FutureMessage f = (FutureMessage)CdsFifo_Dequeue(node->futureMsgs);
00467 int status = processMessage(f->msg, f->len);
00468 free(f);
00469 i++;
00470 }
00471 }
00472 }
00473
00474 static int processMessage(char *msg, int len)
00475 {
00476 char *newmsg;
00477 int rank, srcpe, seqno, magic, i;
00478 unsigned int broot;
00479 int size;
00480 unsigned char checksum;
00481
00482 if (len >= DGRAM_HEADER_SIZE) {
00483 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00484
00485 MACHSTATE2(1, "Break header Cmi-charmrun_id=%d, magic=%d", Cmi_charmrun_pid, magic);
00486 MACHSTATE3(1, "srcpe=%d, seqno=%d, rank=%d", srcpe, seqno, rank);
00487
00488 #ifdef CMK_USE_CHECKSUM
00489 checksum = computeCheckSum(msg, len);
00490 if (checksum == 0)
00491 #else
00492 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
00493 #endif
00494 {
00495 OtherNode node = nodes_by_pe[srcpe];
00496
00497 if (seqno == node->recv_expect) {
00498 node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
00499 }
00500 else if (seqno < node->recv_expect) {
00501 CmiPrintf("[%d] Warning: Past packet received from PE %d (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00502 CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
00503 return 0;
00504 }
00505 else {
00506 CmiPrintf("[%d] Error detected - Packet out of order from PE %d (expecting: %d got: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00507
00508
00509
00510 FutureMessage f = (FutureMessage)malloc(sizeof(struct FutureMessageStruct));
00511 f->msg = msg;
00512 f->len = len;
00513 CdsFifo_Enqueue(node->futureMsgs, f);
00514 return 0;
00515 }
00516
00517 newmsg = node->asm_msg;
00518 if (newmsg == 0) {
00519 size = CmiMsgHeaderGetLength(msg);
00520 if (size != len) {
00521 newmsg = (char *)CmiAlloc(size);
00522 _MEMCHECK(newmsg);
00523 if (len > size) {
00524 CmiPrintf("size: %d, len:%d.\n", size, len);
00525 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00526 }
00527 memcpy(newmsg, msg, len);
00528 CmiFree(msg);
00529 }
00530 else
00531 newmsg = msg;
00532 node->asm_rank = rank;
00533 node->asm_total = size;
00534 node->asm_fill = len;
00535 node->asm_msg = newmsg;
00536 }
00537 else {
00538 size = len - DGRAM_HEADER_SIZE;
00539 if (node->asm_fill+size > node->asm_total) {
00540 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
00541 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00542 }
00543 memcpy(newmsg + node->asm_fill, msg+DGRAM_HEADER_SIZE, size);
00544 CmiFree(msg);
00545 node->asm_fill += size;
00546 }
00547
00548
00549 if (node->asm_fill == node->asm_total) {
00550 switch (rank) {
00551 case DGRAM_BROADCAST: {
00552 for (i=1; i<_Cmi_mynodesize; i++)
00553 CmiPushPE(i, CopyMsg(newmsg, node->asm_total));
00554 CmiPushPE(0, newmsg);
00555 break;
00556 }
00557 #if CMK_NODE_QUEUE_AVAILABLE
00558 case DGRAM_NODEBROADCAST:
00559 case DGRAM_NODEMESSAGE: {
00560 CmiPushNode(newmsg);
00561 break;
00562 }
00563 #endif
00564 default:
00565 CmiPushPE(rank, newmsg);
00566 }
00567 node->asm_msg = 0;
00568
00569 #if CMK_BROADCAST_SPANNING_TREE
00570 if (rank == DGRAM_BROADCAST
00571 #if CMK_NODE_QUEUE_AVAILABLE
00572 || rank == DGRAM_NODEBROADCAST
00573 #endif
00574 )
00575 SendSpanningChildren(NULL, 0, node->asm_total, newmsg, broot, rank);
00576 #elif CMK_BROADCAST_HYPERCUBE
00577 if (rank == DGRAM_BROADCAST
00578 #if CMK_NODE_QUEUE_AVAILABLE
00579 || rank == DGRAM_NODEBROADCAST
00580 #endif
00581 )
00582 SendHypercube(NULL, 0, node->asm_total, newmsg, broot, rank);
00583 #endif
00584 }
00585 processFutureMessages(node);
00586 }
00587 else {
00588 #ifdef CMK_USE_CHECKSUM
00589 CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum);
00590 #else
00591 CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n",
00592 CmiMyPe(), magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00593 #endif
00594 CmiPrintf("[%d] recved: rank:%d src:%d magic:%d seqno:%d len:%d\n", CmiMyPe(), rank, srcpe, magic, seqno,
00595 len);
00596 }
00597 }
00598 else {
00599 CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len);
00600 CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg));
00601 }
00602
00603 return 1;
00604 }
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614 void EnqueueOutgoingDgram
00615 (OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot, int copy)
00616 {
00617 int size, len, seqno;
00618 mx_return_t rc;
00619 mx_request_t sent_handle;
00620 mx_segment_t buffer_desc;
00621 uint32_t result;
00622 char *data;
00623
00624 len = dlen + DGRAM_HEADER_SIZE;;
00625
00626 if (copy) {
00627 data = CopyMsg(ptr-DGRAM_HEADER_SIZE, len);
00628 }
00629 else {
00630 data = ptr-DGRAM_HEADER_SIZE;
00631 ogm->refcount++;
00632 }
00633
00634 seqno = node->send_next;
00635 MACHSTATE5(1, "[%d] SEQNO: %d to node %d rank: %d %d", CmiMyPe(), seqno, node-nodes, rank, broot);
00636 DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, seqno, broot);
00637 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00638 #ifdef CMK_USE_CHECKSUM
00639 {
00640 DgramHeader *head = (DgramHeader *)data;
00641 head->magic ^= computeCheckSum(data, len);
00642 }
00643 #endif
00644
00645 MACHSTATE1(2, "EnqueueOutgoingDgram { len=%d", len);
00646
00647
00648
00649 buffer_desc.segment_ptr = data;
00650 buffer_desc.segment_length = len;
00651 PendingSentMsg pm;
00652 if (copy) ogm = NULL;
00653 NewPendingSentMsg(pm, ogm);
00654 pm->flag = 1;
00655 rc = mx_isend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, pm, &(pm->handle));
00656 if (rc != MX_SUCCESS) {
00657 MACHSTATE1(3," mx_isend returns %d", rc);
00658 CmiAbort("mx_isend failed\n");
00659 }
00660 #if !MX_ACTIVE_MESSAGE
00661 InsertPendingSentMsg(pm);
00662 #endif
00663 MACHSTATE(2, "} EnqueueOutgoingDgram");
00664 }
00665
00666
00667
00668 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00669 {
00670 int size; char *data;
00671
00672 size = ogm->size - DGRAM_HEADER_SIZE;
00673 data = ogm->data + DGRAM_HEADER_SIZE;
00674
00675 MACHSTATE3(2, "DeliverViaNetwork { : size:%d, to node mach_id=%d, nic=%ld", ogm->size, node->mach_id, node->nic_id);
00676
00677 while (size > Cmi_dgram_max_data) {
00678 copy = 1;
00679 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot, copy);
00680 data += Cmi_dgram_max_data;
00681 size -= Cmi_dgram_max_data;
00682 }
00683 if (size>0) EnqueueOutgoingDgram(ogm, data, size, node, rank, broot, copy);
00684
00685 MACHSTATE(2, "} DeliverViaNetwork");
00686 }
00687
00688 static void sendBarrierMessage(int pe)
00689 {
00690 mx_request_t send_handle;
00691 mx_segment_t buffer_desc;
00692 mx_return_t rc;
00693 mx_status_t status;
00694 uint32_t result;
00695 char msg[4];
00696
00697 OtherNode node = nodes + pe;
00698 buffer_desc.segment_ptr = msg;
00699 buffer_desc.segment_length = 4;
00700 rc = mx_isend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, NULL, &send_handle);
00701 do {
00702 rc = mx_test(endpoint, &send_handle, &status, &result);
00703 } while (rc != MX_SUCCESS || result==0);
00704 }
00705
00706 static void recvBarrierMessage()
00707 {
00708 mx_segment_t buffer_desc;
00709 char msg[4];
00710 mx_return_t rc;
00711 mx_status_t status;
00712 mx_request_t recv_handle;
00713 uint32_t result;
00714
00715 #if MX_ACTIVE_MESSAGE
00716 while (gotone == 0) {
00717 mx_progress(endpoint);
00718 PumpEvents(1);
00719 }
00720 gotone--;
00721 #else
00722 do {
00723 rc = mx_probe(endpoint, 100, MATCH_FILTER, MATCH_MASK, &status, &result);
00724 } while (result == 0);
00725 CmiAssert(status.msg_length == 4);
00726
00727 buffer_desc.segment_length = 4;
00728 buffer_desc.segment_ptr = msg;
00729 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
00730 do {
00731 rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
00732 } while (rc!=MX_SUCCESS || result == 0);
00733 #endif
00734 }
00735
00736
00737 int CmiBarrier()
00738 {
00739 int len, size, i;
00740 int status;
00741 int count = 0;
00742 OtherNode node;
00743 int numnodes = CmiNumNodes();
00744 if (CmiMyRank() == 0) {
00745
00746 if (CmiMyNode() != 0) {
00747 sendBarrierMessage(0);
00748 }
00749
00750 if (CmiMyNode() == 0)
00751 {
00752 for (count = 1; count < numnodes; count ++)
00753 {
00754 recvBarrierMessage();
00755 }
00756
00757 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00758 int p = i;
00759 if (p > numnodes - 1) break;
00760
00761 sendBarrierMessage(p);
00762 }
00763 }
00764
00765 if (CmiMyNode() != 0)
00766 {
00767 recvBarrierMessage();
00768 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00769 int p = CmiMyNode();
00770 p = BROADCAST_SPANNING_FACTOR*p + i;
00771 if (p > numnodes - 1) break;
00772 p = p%numnodes;
00773
00774 sendBarrierMessage(p);
00775 }
00776 }
00777 }
00778 CmiNodeAllBarrier();
00779
00780 return 0;
00781 }
00782
00783
/*
 * One-way barrier towards node 0: rank 0 of every other node sends a token
 * to node 0, and rank 0 of node 0 waits for one token per other node.
 * No release is sent back.  Every rank then joins the node-local barrier.
 *
 * Always returns 0.
 */
int CmiBarrierZero()
{
  if (CmiMyRank() == 0) {
    if (CmiMyNode() != 0) {
      sendBarrierMessage(0);
    }
    else {
      int received = 0;
      while (received < CmiNumNodes()-1) {
        recvBarrierMessage();
        received++;
      }
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
00802
00803
00804
00805
00806
00807
00808
00809
00810 static int maxsize;
00811
00812 void CmiMachineInit(char **argv)
00813 {
00814 MACHSTATE(3,"CmiMachineInit {");
00815 mx_return_t rc;
00816 endpoint = NULL;
00817
00818 if (dataport == -1) return;
00819
00820 rc = mx_init();
00821 if (rc != MX_SUCCESS) {
00822 MACHSTATE1(3," mx_init returns %d", rc);
00823 printf("Cannot open MX library (does the machine have a GM card?)\n");
00824 return;
00825 }
00826
00827 rc = mx_open_endpoint(MX_ANY_NIC, MX_ANY_ENDPOINT, MX_FILTER, 0, 0, &endpoint);
00828 if (rc != MX_SUCCESS) {
00829 MACHSTATE1(3," open endpoint address returns %d", rc);
00830 printf("Cannot open endpoint address\n");
00831 return;
00832 }
00833
00834
00835 rc = mx_get_endpoint_addr(endpoint, &endpoint_addr);
00836 if (rc != MX_SUCCESS) {
00837 MACHSTATE1(3," get endpoint address returns %d", rc);
00838 printf("Cannot get endpoint address\n");
00839 return;
00840 }
00841
00842
00843 rc = mx_decompose_endpoint_addr(endpoint_addr, &Cmi_nic_id, (uint32_t*)&Cmi_mach_id);
00844 if (rc != MX_SUCCESS) {
00845 MACHSTATE1(3," mx_decompose_endpoint returns %d", rc);
00846 printf("Cannot decompose endpoint address\n");
00847 return;
00848 }
00849
00850 dataport = 1;
00851
00852 MATCH_FILTER &= Cmi_charmrun_pid;
00853
00854 Cmi_dgram_max_data = 1024-DGRAM_HEADER_SIZE;
00855
00856 #if MX_ACTIVE_MESSAGE
00857 mx_register_unexp_callback(endpoint, recv_callback, NULL);
00858 #endif
00859
00860 MACHSTATE(3,"} CmiMachineInit");
00861 }
00862
/* Defined further down in this file. */
void CmiMXMakeConnection();

/*
 * Communication start-up hook: establishes the MX connections to all nodes.
 * argv is unused.
 */
void CmiCommunicationInit(char **argv)
{
  CmiMXMakeConnection();
}
00869
00870 void CmiMachineExit()
00871 {
00872 MACHSTATE(3, "CmiMachineExit {");
00873 mx_return_t rc;
00874 if (endpoint) {
00875 rc = mx_close_endpoint(endpoint);
00876 if(rc!=MX_SUCCESS){
00877 MACHSTATE1(3, "mx_close_endpoint returns %d", rc);
00878 printf("Can't do mx_close_endpoint\n");
00879 return;
00880 }
00881 endpoint = NULL;
00882 rc = mx_finalize();
00883 if(rc!=MX_SUCCESS){
00884 MACHSTATE1(3, "mx_finalize returns %d", rc);
00885 printf("Can't do mx_finalize\n");
00886 return;
00887 }
00888 }
00889 MACHSTATE(3, "} CmiMachineExit");
00890 }
00891
00892
00893 void CmiMXMakeConnection()
00894 {
00895 int i;
00896 int doabort = 0;
00897 if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
00898 if (endpoint == NULL) machine_exit(1);
00899 MACHSTATE(3,"CmiMXMakeConnection {");
00900 for (i=0; i<_Cmi_numnodes; i++) {
00901 mx_return_t rc;
00902 char ip_str[128];
00903 skt_print_ip(ip_str, nodes[i].IP);
00904 rc = mx_connect(endpoint, nodes[i].nic_id, nodes[i].mach_id, MX_FILTER, MX_INFINITE, &nodes[i].endpoint_addr);
00905 if (rc != MX_SUCCESS) {
00906 CmiPrintf("Error> mx node %d can't contact node %d. \n", CmiMyPe(), i);
00907 doabort = 1;
00908 }
00909 }
00910 if (doabort) CmiAbort("CmiMXMakeConnection");
00911 MACHSTATE(3,"}CmiMXMakeConnection");
00912 }
00913
00914 static const char *getErrorMsg(mx_return_t rc)
00915 {
00916
00917
00918
00919
00920
00921
00922
00923
00924 return mx_strerror(rc);
00925 }
00926
00927 static void processStatusCode(mx_status_t status){
00928 const char *str = mx_strstatus(status.code);
00929 CmiPrintf("processStatusCode: %s\n", str);
00930 MACHSTATE1(4, "%s", str);
00931 }
00932