00001
/* Select the GM 2.x API when the GM headers define GM_API_VERSION_2_0 and
   GM_API_VERSION is at least that version. */
#if defined(GM_API_VERSION_2_0) && GM_API_VERSION >= GM_API_VERSION_2_0
#define CMK_USE_GM2 1
#endif


/* Force the busy-wait idle strategy for this machine layer. */
#undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
#undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
#define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
#define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0

/* One-sided (RDMA) support over GM: handler indices and entry points,
   defined near the end of this file. */
#if defined(__ONESIDED_IMPL) && defined(__ONESIDED_GM_HARDWARE)
#include "conv-onesided.h"
int getSrcHandler;
int getDestHandler;
void handleGetSrc(void *msg);
void handleGetDest(void *msg);
#endif
00053
/* GM alarm handle. Initialized with gm_initialize_alarm() in CmiMachineInit;
   armed and cancelled only by the optional SLEEP_USING_ALARM path of
   CmiNotifyStillIdle. */
static gm_alarm_t gmalarm;
00055
00056
00057
00058
00059
00060
00061
00062
00063
/* Set to 1 to collect per-size send counts and queue statistics; they are
   written to gm-stats.<pe> by CmiMachineExit. */
#define GM_STATS 0

#if GM_STATS
static FILE *gmf;                   /* statistics output file */
static int *gm_stats;               /* sends per GM size class, [0..maxsize] */
static int possible_streamed = 0;   /* messages queued behind a freshly sent head */
static int defrag = 0;              /* messages split into several datagrams */
static int maxQueueLength = 0;      /* high-water mark of pendinglen */
#endif

/* DeliverViaNetwork polls the network while this many datagrams are queued. */
#define MAXPENDINGSEND 500

/* One queued (or in-flight) outgoing datagram. */
struct PendingMsgStruct
{
  void *msg;                        /* DMA buffer holding header + payload */
  int length;                       /* bytes in msg */
  int size;                         /* GM size class for length */
  int mach_id;                      /* destination GM node id */
  int dataport;                     /* destination GM port */
  int node_idx;                     /* index of destination in nodes[] */
  struct PendingMsgStruct *next;    /* next entry in queue or free list */
};
typedef struct PendingMsgStruct *PendingMsg;

/* Total number of datagrams waiting in all per-node send queues. */
static int pendinglen = 0;

/* Recycled PendingMsg records (see FreePendingMsg / MallocPendingMsg). */
static PendingMsg pend_freelist=NULL;
00100
/* Return PendingMsg record d to pend_freelist (the memory is kept for reuse).
   Wrapped in do/while(0) so it behaves as a single statement, e.g. in an
   unbraced if/else. */
#define FreePendingMsg(d) do { \
    (d)->next = pend_freelist; \
    pend_freelist = (d); \
  } while (0)

/* Set d to a PendingMsg record: the head of pend_freelist when non-empty,
   otherwise a freshly malloc'ed one (checked with _MEMCHECK). */
#define MallocPendingMsg(d) do { \
    (d) = pend_freelist; \
    if ((d) == NULL) { \
      (d) = (PendingMsg)malloc(sizeof(struct PendingMsgStruct)); \
      _MEMCHECK(d); \
    } else { \
      pend_freelist = (d)->next; \
    } \
  } while (0)
00110
00111 void enqueue_sending(char *msg, int length, OtherNode node, int size)
00112 {
00113 PendingMsg pm;
00114 MallocPendingMsg(pm);
00115 pm->msg = msg;
00116 pm->length = length;
00117 pm->mach_id = node->mach_id;
00118 pm->dataport = node->dataport;
00119 pm->node_idx = node-nodes;
00120 pm->size = size;
00121 pm->next = NULL;
00122 if (node->sendhead == NULL) {
00123 node->sendhead = node->sendtail = pm;
00124 }
00125 else {
00126 node->sendtail->next = pm;
00127 node->sendtail = pm;
00128 }
00129 pendinglen ++;
00130 #if GM_STATS
00131 if (pendinglen > maxQueueLength) maxQueueLength = pendinglen;
00132 #endif
00133 }
00134
/* Head of node's send queue, or NULL when empty. */
#define peek_sending(node) ((node)->sendhead)

/* Pop the head of node's send queue (no-op when empty) and decrement
   pendinglen. The record itself is not freed. sendtail is left stale when the
   queue empties; enqueue_sending checks sendhead, so that is harmless.
   Wrapped in do/while(0) so it is safe inside an unbraced if/else. */
#define dequeue_sending(node) do { \
    if ((node)->sendhead != NULL) { \
      (node)->sendhead = (node)->sendhead->next; \
      pendinglen --; \
    } \
  } while (0)
00142
00143 static void alarmcallback (void *context) {
00144 MACHSTATE(4,"GM Alarm callback executed")
00145 }
00146 static int processEvent(gm_recv_event_t *e);
00147 static void send_progress();
00148 static void alarmInterrupt(int arg);
00149 static int gmExit(int code,const char *msg);
00150 static char *getErrorMsg(gm_status_t status);
00151
00152
00153
00154
00155
00156
00157
/* Recycle send buffers through a small pool instead of gm_dma_malloc/free
   for every datagram. */
#define CMK_MSGPOOL 1

/* Capacity of the send-buffer pool. */
#define MAXMSGLEN 200

static char* msgpool[MAXMSGLEN];
static int msgNums = 0;

/* Byte capacity of every pooled buffer: the largest datagram length, set in
   CmiMachineInit. */
static int maxMsgSize = 0;

/* Return buffer msg to the pool, or DMA-free it when the pool is full.
   do/while(0) keeps it a single statement for unbraced if/else. */
#define putPool(msg) do { \
    if (msgNums == MAXMSGLEN) gm_dma_free(gmport, (msg)); \
    else msgpool[msgNums++] = (msg); \
  } while (0)

/* Set msg to a pooled buffer, or DMA-allocate a maxMsgSize buffer when the
   pool is empty. len is accepted for symmetry but not used: every buffer is
   maxMsgSize bytes. */
#define getPool(msg, len) do { \
    if (msgNums == 0) (msg) = gm_dma_malloc(gmport, maxMsgSize); \
    else (msg) = msgpool[--msgNums]; \
  } while (0)
00175
00176
00177
00178
00179
00180
00181
/* Idle-notification state; this layer keeps none, so the struct is a
   placeholder and CmiNotifyGetState hands out NULL. */
typedef struct {
  char none;
} CmiIdleState;

static CmiIdleState *CmiNotifyGetState(void)
{
  return NULL;
}

static void CmiNotifyStillIdle(CmiIdleState *s);

/* Entering idle does the same work as remaining idle: poll once. */
static void CmiNotifyBeginIdle(CmiIdleState *s)
{
  CmiNotifyStillIdle(s);
}
00194
00195
00196
00197 static void CmiNotifyStillIdle(CmiIdleState *s)
00198 {
00199 #if CMK_SHARED_VARS_UNAVAILABLE
00200
00201 int nreadable;
00202 gm_recv_event_t *e;
00203 int pollMs = 4;
00204
00205 #define SLEEP_USING_ALARM 0
00206 #if SLEEP_USING_ALARM
00207 gm_set_alarm (gmport, &gmalarm, (gm_u64_t) pollMs*1000, alarmcallback,
00208 (void *)NULL );
00209 #endif
00210
00211 #if SLEEP_USING_ALARM
00212 MACHSTATE(3,"Blocking on receive {")
00213 e = gm_blocking_receive_no_spin(gmport);
00214 MACHSTATE(3,"} receive returned");
00215 #else
00216 MACHSTATE(3,"CmiNotifyStillIdle NonBlocking on receive {")
00217 e = gm_receive(gmport);
00218 MACHSTATE(3,"} CmiNotifyStillIdle nonblocking receive returned");
00219 #endif
00220
00221 #if SLEEP_USING_ALARM
00222 gm_cancel_alarm (&gmalarm);
00223 #endif
00224
00225
00226 CmiCommLock();
00227 nreadable = processEvent(e);
00228 CmiCommUnlock();
00229 if (nreadable) {
00230 return;
00231 }
00232 #else
00233
00234 CmiIdleLock_sleep(&CmiGetState()->idle,5);
00235 #endif
00236 }
00237
00238 void CmiNotifyIdle(void) {
00239 CmiNotifyStillIdle(NULL);
00240 }
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253 int CheckSocketsReady(int withDelayMs)
00254 {
00255 int nreadable;
00256 CMK_PIPE_DECL(withDelayMs);
00257
00258 CmiStdoutAdd(CMK_PIPE_SUB);
00259 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00260
00261 nreadable=CMK_PIPE_CALL();
00262 ctrlskt_ready_read = 0;
00263 dataskt_ready_read = 0;
00264 dataskt_ready_write = 0;
00265
00266 if (nreadable == 0) {
00267 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00268 return nreadable;
00269 }
00270 if (nreadable==-1) {
00271 CMK_PIPE_CHECKERR();
00272 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00273 return CheckSocketsReady(0);
00274 }
00275 CmiStdoutCheck(CMK_PIPE_SUB);
00276 if (Cmi_charmrun_fd!=-1)
00277 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00278 MACHSTATE(1,"} CheckSocketsReady")
00279 return nreadable;
00280 }
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293 static void ServiceCharmrun_nolock()
00294 {
00295 int again = 1;
00296 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
00297 while (again)
00298 {
00299 again = 0;
00300 CheckSocketsReady(0);
00301 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
00302 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
00303 }
00304 MACHSTATE(2,"} ServiceCharmrun_nolock end")
00305 }
00306
00307 static void CommunicationServer_nolock(int withDelayMs) {
00308 gm_recv_event_t *e;
00309
00310 MACHSTATE(2,"CommunicationServer_nolock start {")
00311 while (1) {
00312 MACHSTATE(3,"Non-blocking receive {")
00313 e = gm_receive(gmport);
00314 MACHSTATE(3,"} Non-blocking receive")
00315 if (!processEvent(e)) break;
00316 }
00317 MACHSTATE(2,"}CommunicationServer_nolock end")
00318 }
00319
00320
00321
00322
00323
00324
00325
00326
00327 static void CommunicationServer(int withDelayMs, int where)
00328 {
00329
00330 if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
00331
00332 MACHSTATE2(2,"CommunicationServer(%d) from %d {",withDelayMs, where)
00333
00334 if (where == 1) {
00335
00336 if (!machine_initiated_shutdown) ServiceCharmrun_nolock();
00337 return;
00338 }
00339
00340 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00341
00342 CmiCommLock();
00343 CommunicationServer_nolock(withDelayMs);
00344 CmiCommUnlock();
00345
00346 #if CMK_IMMEDIATE_MSG
00347 if (where == 0)
00348 CmiHandleImmediate();
00349 #endif
00350
00351 MACHSTATE(2,"} CommunicationServer")
00352 }
00353
00354 static void processMessage(char *msg, int len)
00355 {
00356 char *newmsg;
00357 int rank, srcpe, seqno, magic, i;
00358 unsigned int broot;
00359 int size;
00360 unsigned char checksum;
00361
00362 if (len >= DGRAM_HEADER_SIZE) {
00363 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00364 #ifdef CMK_USE_CHECKSUM
00365 checksum = computeCheckSum(msg, len);
00366 if (checksum == 0)
00367 #else
00368 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
00369 #endif
00370 {
00371 OtherNode node = nodes_by_pe[srcpe];
00372
00373 if (seqno == node->recv_expect) {
00374 node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
00375 }
00376 else if (seqno < node->recv_expect) {
00377 CmiPrintf("[%d] Warning: Past packet received from PE %d, something wrong with GM hardware? (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00378 CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
00379 return;
00380 }
00381 else {
00382 CmiPrintf("[%d] Error detected - Packet out of order from PE %d, something wrong with GM hardware? (expecting: %d got: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00383 CmiAbort("\n\n\t\tPacket out of order!!\n\n");
00384 }
00385 newmsg = node->asm_msg;
00386 if (newmsg == 0) {
00387 size = CmiMsgHeaderGetLength(msg);
00388 if (len > size) {
00389 CmiPrintf("size: %d, len:%d.\n", size, len);
00390 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00391 }
00392 newmsg = (char *)CmiAlloc(size);
00393 _MEMCHECK(newmsg);
00394 memcpy(newmsg, msg, len);
00395 node->asm_rank = rank;
00396 node->asm_total = size;
00397 node->asm_fill = len;
00398 node->asm_msg = newmsg;
00399 } else {
00400 size = len - DGRAM_HEADER_SIZE;
00401 if (node->asm_fill+size > node->asm_total) {
00402 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
00403 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00404 }
00405 memcpy(newmsg + node->asm_fill, msg+DGRAM_HEADER_SIZE, size);
00406 node->asm_fill += size;
00407 }
00408
00409 if (node->asm_fill == node->asm_total) {
00410 int total_size = node->asm_total;
00411 node->asm_msg = 0;
00412
00413
00414 #if CMK_BROADCAST_SPANNING_TREE
00415 if (rank == DGRAM_BROADCAST
00416 #if CMK_NODE_QUEUE_AVAILABLE
00417 || rank == DGRAM_NODEBROADCAST
00418 #endif
00419 )
00420 SendSpanningChildren(NULL, 0, total_size, newmsg, broot, rank);
00421 #elif CMK_BROADCAST_HYPERCUBE
00422 if (rank == DGRAM_BROADCAST
00423 #if CMK_NODE_QUEUE_AVAILABLE
00424 || rank == DGRAM_NODEBROADCAST
00425 #endif
00426 )
00427 SendHypercube(NULL, 0, total_size, newmsg, broot, rank);
00428 #endif
00429
00430 switch (rank) {
00431 case DGRAM_BROADCAST: {
00432 for (i=1; i<_Cmi_mynodesize; i++)
00433 CmiPushPE(i, CopyMsg(newmsg, total_size));
00434 CmiPushPE(0, newmsg);
00435 break;
00436 }
00437 #if CMK_NODE_QUEUE_AVAILABLE
00438 case DGRAM_NODEBROADCAST:
00439 case DGRAM_NODEMESSAGE: {
00440 CmiPushNode(newmsg);
00441 break;
00442 }
00443 #endif
00444 default:
00445 CmiPushPE(rank, newmsg);
00446 }
00447 }
00448 }
00449 else {
00450 #ifdef CMK_USE_CHECKSUM
00451 CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum);
00452 #else
00453 CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n",
00454 CmiMyPe(), magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00455 #endif
00456 CmiPrintf("recved: rank:%d src:%d mag:%d seqno:%d len:%d\n", rank, srcpe, magic, seqno, len);
00457 }
00458 }
00459 else {
00460 CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len);
00461 CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg));
00462 }
00463 }
00464
00465
00466 static int processEvent(gm_recv_event_t *e)
00467 {
00468 int size, len;
00469 char *msg, *buf;
00470 int status = 1;
00471 switch (gm_ntohc(e->recv.type))
00472 {
00473
00474 case GM_FAST_PEER_RECV_EVENT:
00475 case GM_FAST_RECV_EVENT:
00476 MACHSTATE(4,"Incoming message")
00477 msg = gm_ntohp(e->recv.message);
00478 len = gm_ntohl(e->recv.length);
00479 processMessage(msg, len);
00480 break;
00481
00482
00483
00484
00485 case GM_HIGH_RECV_EVENT:
00486 case GM_RECV_EVENT:
00487 MACHSTATE(4,"Incoming message")
00488 size = gm_ntohc(e->recv.size);
00489 msg = gm_ntohp(e->recv.buffer);
00490 len = gm_ntohl(e->recv.length);
00491 processMessage(msg, len);
00492 gm_provide_receive_buffer(gmport, msg, size, GM_HIGH_PRIORITY);
00493 break;
00494 case GM_NO_RECV_EVENT:
00495 return 0;
00496 case GM_ALARM_EVENT:
00497 status = 0;
00498 default:
00499 MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e->recv.type))
00500 gm_unknown(gmport, e);
00501 }
00502 return status;
00503 }
00504
00505
00506 #ifdef __FAULT__
00507 void drop_send_callback(struct gm_port *p, void *context, gm_status_t status)
00508 {
00509 PendingMsg out = (PendingMsg)context;
00510 void *msg = out->msg;
00511
00512 printf("[%d] drop_send_callback dropped msg: %p\n", CmiMyPe(), msg);
00513 #if !CMK_MSGPOOL
00514 gm_dma_free(gmport, msg);
00515 #else
00516 putPool(msg);
00517 #endif
00518
00519 FreePendingMsg(out);
00520 }
00521 #else
00522 void send_callback(struct gm_port *p, void *context, gm_status_t status);
00523 void drop_send_callback(struct gm_port *p, void *context, gm_status_t status)
00524 {
00525 PendingMsg out = (PendingMsg)context;
00526 void *msg = out->msg;
00527 gm_send_with_callback(gmport, msg, out->size, out->length,
00528 GM_HIGH_PRIORITY, out->mach_id, out->dataport,
00529 send_callback, out);
00530 }
00531 #endif
00532
/*
 * GM completion callback for a datagram posted by send_progress (or re-posted
 * by the error paths below). context is the datagram's PendingMsg record.
 *
 * On error the failure is printed, then handled per build:
 *  - __FAULT__: a dropped send is re-posted if the node's (mach_id, dataport)
 *    still matches the record; otherwise (and for any other error)
 *    gm_drop_sends is issued with drop_send_callback.
 *  - default: a timeout issues gm_drop_sends (drop_send_callback re-posts) and
 *    disables the node; a dropped send is re-posted; anything else aborts.
 * Those paths return early, keeping the send token and the record in use.
 *
 * On success: the buffer goes back to the pool (or is DMA-freed), the send
 * token and record are released, the node's in-flight count drops (the node
 * is re-enabled when it reaches 0), and send_progress() pushes more data.
 */
void send_callback(struct gm_port *p, void *context, gm_status_t status)
{
  PendingMsg out = (PendingMsg)context;
  void *msg = out->msg;
  unsigned char cksum;
  OtherNode node = nodes+out->node_idx;

  if (status != GM_SUCCESS) {
    int srcpe, seqno, magic, broot;
    char rank;
    char *errmsg;
    DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
    errmsg = getErrorMsg(status);
    CmiPrintf("GM Error> PE:%d send to msg %p node %d rank %d mach_id %d port %d len %d size %d failed to complete (error %d): %s\n", srcpe, msg, out->node_idx, rank, out->mach_id, out->dataport, out->length, out->size, status, errmsg);
    switch (status) {
#ifdef __FAULT__
    case GM_SEND_DROPPED: {
      OtherNode node = nodes + out->node_idx;
      /* Re-post only if the destination still lives where it did when the
         datagram was queued. */
      if (out->mach_id == node->mach_id && out->dataport == node->dataport) {

        gm_send_with_callback(gmport, msg, out->size, out->length,
                              GM_HIGH_PRIORITY, out->mach_id, out->dataport,
                              send_callback, out);
        return;
      }
    }
    /* fallthrough: destination changed, drop instead */
    default: {
      gm_drop_sends (gmport, GM_HIGH_PRIORITY, out->mach_id, out->dataport,
                     drop_send_callback, out);
      return;
    }
#else
    case GM_SEND_TIMED_OUT: {
      CmiPrintf("gm send_callback timeout, drop sends and re-enable send. \n");
      gm_drop_sends (gmport, GM_HIGH_PRIORITY, out->mach_id, out->dataport,
                     drop_send_callback, out);
      /* Stop send_progress from feeding this node until its in-flight count
         drains back to zero (cleared on the success path below). */
      node->disable=1;
      return;
    }
    case GM_SEND_DROPPED:
      CmiPrintf("Got DROPPED_SEND notification, resend\n");
      gm_send_with_callback(gmport, msg, out->size, out->length,
                            GM_HIGH_PRIORITY, out->mach_id, out->dataport,
                            send_callback, out);
      return;
    default:
      CmiAbort("gm send_callback failed");
#endif
    }
  }

#ifdef CMK_USE_CHECKSUM

#endif

  /* Success: release the buffer, token and record. */
#if !CMK_MSGPOOL
  gm_dma_free(gmport, msg);
#else
  putPool(msg);
#endif

  gm_free_send_token (gmport, GM_HIGH_PRIORITY);
  FreePendingMsg(out);


  node->gm_pending --;
  if (node->disable && node->gm_pending == 0) node->disable = 0;

  /* A token was just freed; use it for any queued datagrams. */
  send_progress();
}
00612
/*
 * Move queued datagrams from the per-node send queues into GM.
 *
 * Visits nodes round-robin. The static cursor nextnode persists across calls,
 * so service resumes after the last node visited. A node is skipped when it
 * is disabled or its queue is empty. Otherwise one high-priority send token
 * is taken and the queue head is posted, with send_callback as completion:
 * gm_send_to_peer_with_callback when the destination dataport equals ours,
 * gm_send_with_callback otherwise.
 * Returns as soon as no send token is available, or after a full pass over
 * all nodes posts nothing.
 */
static void send_progress()
{
  static int nextnode = 0;
  int skip;
  OtherNode node;
  PendingMsg out;

  while (1)
  {
    int sent = 0;
    for (skip=0; skip<_Cmi_numnodes; skip++) {
      node = nodes+nextnode;
      nextnode = (nextnode + 1) % _Cmi_numnodes;
      if (node->disable) continue;
      out = peek_sending(node);
      if (!out) continue;
      if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
        if (dataport == out->dataport) {
          gm_send_to_peer_with_callback(gmport, out->msg, out->size,
                                        out->length, GM_HIGH_PRIORITY, out->mach_id,
                                        send_callback, out);
        }
        else {
          gm_send_with_callback(gmport, out->msg, out->size, out->length,
                                GM_HIGH_PRIORITY, out->mach_id, out->dataport,
                                send_callback, out);
        }
        /* In flight until send_callback runs. The record now belongs to GM,
           so it is only unlinked from the queue here, not freed. */
        node->gm_pending ++;
        sent=1;

        dequeue_sending(node);
#if GM_STATS
        gm_stats[out->size] ++;

        /* Count the messages still queued behind the new queue head. */
        {
          PendingMsg curout;
          curout = peek_sending(node);
          if (curout) {
            out = curout->next;
            while (out) {
              possible_streamed ++;
              out = out->next;
            }
          }
        }
#endif
      }
      else return;    /* out of send tokens; send_callback will call us again */
    }
    if (sent==0) return;
  }
}
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675 void EnqueueOutgoingDgram
00676 (OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot)
00677 {
00678 char *buf;
00679 int size, len, seqno;
00680 int alloclen, allocSize;
00681
00682
00683
00684
00685
00686
00687
00688 len = dlen + DGRAM_HEADER_SIZE;
00689
00690
00691
00692
00693 #if !CMK_MSGPOOL
00694 buf = (char *)gm_dma_malloc(gmport, len);
00695 #else
00696 getPool(buf, len);
00697 #endif
00698 _MEMCHECK(buf);
00699
00700 seqno = node->send_next;
00701 DgramHeaderMake(buf, rank, ogm->src, Cmi_charmrun_pid, seqno, broot);
00702 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00703 memcpy(buf+DGRAM_HEADER_SIZE, ptr, dlen);
00704 #ifdef CMK_USE_CHECKSUM
00705 {
00706 DgramHeader *head = (DgramHeader *)buf;
00707 head->magic ^= computeCheckSum(buf, len);
00708 }
00709 #endif
00710 size = gm_min_size_for_length(len);
00711
00712
00713 if (pendinglen != 0) {
00714
00715
00716 enqueue_sending(buf, len, node, size);
00717 return;
00718 }
00719 enqueue_sending(buf, len, node, size);
00720 send_progress();
00721 }
00722
00723
00724 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00725 {
00726 int size; char *data;
00727
00728 size = ogm->size - DGRAM_HEADER_SIZE;
00729 data = ogm->data + DGRAM_HEADER_SIZE;
00730 #if GM_STATS
00731 if (size > Cmi_dgram_max_data) defrag ++;
00732 #endif
00733 while (size > Cmi_dgram_max_data) {
00734 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00735 data += Cmi_dgram_max_data;
00736 size -= Cmi_dgram_max_data;
00737 }
00738 if (size>0) EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
00739
00740
00741 while (pendinglen >= MAXPENDINGSEND) {
00742
00743
00744 MACHSTATE(4,"Polling until token available")
00745 CommunicationServer_nolock(0);
00746 }
00747 }
00748
00749
00750
00751 static void send_callback_nothing(struct gm_port *p, void *context, gm_status_t status)
00752 {
00753 gm_dma_free(gmport, context);
00754 }
00755
00756 static void sendBarrierMessage(int pe)
00757 {
00758 int len = 32;
00759 char *buf = (char *)gm_dma_malloc(gmport, len);
00760 int size = gm_min_size_for_length(len);
00761 OtherNode node = nodes + pe;
00762 CmiAssert(buf);
00763 gm_send_with_callback(gmport, buf, size, len,
00764 GM_HIGH_PRIORITY, node->mach_id, node->dataport,
00765 send_callback_nothing, buf);
00766 }
00767
00768 static void recvBarrierMessage()
00769 {
00770 gm_recv_event_t *e;
00771 int size, len;
00772 char *msg;
00773 while (1) {
00774 e = gm_receive(gmport);
00775 switch (gm_ntohc(e->recv.type))
00776 {
00777 case GM_HIGH_RECV_EVENT:
00778 case GM_RECV_EVENT:
00779 MACHSTATE(4,"Incoming message")
00780 size = gm_ntohc(e->recv.size);
00781 msg = gm_ntohp(e->recv.buffer);
00782 len = gm_ntohl(e->recv.length);
00783 gm_provide_receive_buffer(gmport, msg, size, GM_HIGH_PRIORITY);
00784 return;
00785 case GM_NO_RECV_EVENT:
00786 continue ;
00787 default:
00788 MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e->recv.type))
00789 gm_unknown(gmport, e);
00790 }
00791 }
00792 }
00793
00794
00795 int CmiBarrier()
00796 {
00797 int len, size, i;
00798 int status;
00799 int count = 0;
00800 OtherNode node;
00801 int numnodes = CmiNumNodes();
00802 if (CmiMyRank() == 0) {
00803
00804 if (CmiMyNode() != 0) {
00805 sendBarrierMessage(0);
00806 }
00807
00808 if (CmiMyNode() == 0)
00809 {
00810 for (count = 1; count < numnodes; count ++)
00811 {
00812 recvBarrierMessage();
00813 }
00814
00815 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00816 int p = i;
00817 if (p > numnodes - 1) break;
00818
00819 sendBarrierMessage(p);
00820 }
00821 }
00822
00823 if (CmiMyNode() != 0)
00824 {
00825 recvBarrierMessage();
00826 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00827 int p = CmiMyNode();
00828 p = BROADCAST_SPANNING_FACTOR*p + i;
00829 if (p > numnodes - 1) break;
00830 p = p%numnodes;
00831
00832 sendBarrierMessage(p);
00833 }
00834 }
00835 }
00836 CmiNodeAllBarrier();
00837
00838 return 0;
00839 }
00840
00841
/* One-way barrier: every non-root node's rank 0 reports to node 0, and node
   0 waits until all have reported. Then all ranks of each node sync.
   Always returns 0. */
int CmiBarrierZero()
{
  int remaining;

  if (CmiMyRank() == 0) {
    if (CmiMyNode() == 0) {
      for (remaining = CmiNumNodes() - 1; remaining > 0; remaining--)
        recvBarrierMessage();
    } else {
      sendBarrierMessage(0);
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
00860
00861
00862
00863
00864
00865
00866
00867
00868 static int maxsize;
00869
00870 void CmiMachineInit(char **argv)
00871 {
00872 int dataport_max=16;
00873 gm_status_t status;
00874 int device, i, j;
00875 int retry = 10;
00876 char *buf;
00877 int mlen;
00878
00879 MACHSTATE(3,"CmiMachineInit {");
00880
00881 gmport = NULL;
00882
00883 if (dataport == -1) return;
00884
00885
00886 for (i=0; i<retry; i++) {
00887 status = gm_init();
00888 if (status == GM_SUCCESS) break;
00889 sleep(1);
00890 }
00891 if (status != GM_SUCCESS) {
00892 printf("Cannot open GM library (does the machine have a GM card?)\n");
00893 gm_perror("gm_init", status);
00894 return;
00895 }
00896
00897 device = 0;
00898 for (dataport=2;dataport<dataport_max;dataport++) {
00899 char portname[200];
00900 sprintf(portname, "converse_port%d_%d", Cmi_charmrun_pid, _Cmi_mynode);
00901 #if CMK_USE_GM2
00902 status = gm_open(&gmport, device, dataport, portname, GM_API_VERSION_2_0);
00903 #else
00904 status = gm_open(&gmport, device, dataport, portname, GM_API_VERSION_1_1);
00905 #endif
00906 if (status == GM_SUCCESS) { break; }
00907 }
00908 if (dataport==dataport_max)
00909 {
00910 dataport=0;
00911 return;
00912 }
00913
00914
00915 status = gm_get_node_id(gmport, (unsigned int *)&Cmi_mach_id);
00916 if (status != GM_SUCCESS) { gm_perror("gm_get_node_id", status); return; }
00917 #if CMK_USE_GM2
00918 gm_node_id_to_global_id(gmport, (unsigned int)Cmi_mach_id, (unsigned int*)&Cmi_mach_id);
00919 #endif
00920
00921
00922 skt_set_abort(gmExit);
00923
00924
00925
00926
00927
00928
00929 maxsize = 16;
00930 CmiGetArgIntDesc(argv,"+gm_maxsize",&maxsize,"maximum packet size in rank (2^maxsize)");
00931
00932 #if GM_STATS
00933 gm_stats = (int*)malloc((maxsize+1) * sizeof(int));
00934 for (i=0; i<=maxsize; i++) gm_stats[i] = 0;
00935 #endif
00936
00937 for (i=1; i<=maxsize; i++) {
00938 int len = gm_max_length_for_size(i);
00939 int num = 2;
00940
00941 maxMsgSize = len;
00942
00943 if (i<5) num = 0;
00944 else if (i<7) num = 4;
00945 else if (i<13) num = 20;
00946 else if (i<17) num = 10;
00947 else if (i>22) num = 1;
00948 for (j=0; j<num; j++) {
00949 buf = gm_dma_malloc(gmport, len);
00950 _MEMCHECK(buf);
00951 gm_provide_receive_buffer(gmport, buf, i, GM_HIGH_PRIORITY);
00952 }
00953 }
00954 Cmi_dgram_max_data = maxMsgSize - DGRAM_HEADER_SIZE;
00955
00956 status = gm_set_acceptable_sizes (gmport, GM_HIGH_PRIORITY, (1<<(maxsize+1))-1);
00957
00958 gm_free_send_tokens (gmport, GM_HIGH_PRIORITY,
00959 gm_num_send_tokens (gmport));
00960
00961 #if CMK_MSGPOOL
00962 msgpool[msgNums++] = gm_dma_malloc(gmport, maxMsgSize);
00963 #endif
00964
00965
00966 gm_initialize_alarm(&gmalarm);
00967
00968 MACHSTATE(3,"} CmiMachineInit");
00969 }
00970
/* No communication setup is needed beyond CmiMachineInit for GM. */
void CmiCommunicationInit(char **argv)
{
}

/*
 * Machine-layer shutdown hook. With GM_STATS enabled, writes per-size send
 * counts and queue statistics to gm-stats.<pe>. If that file cannot be
 * opened, a message is printed and the statistics are skipped.
 */
void CmiMachineExit()
{
#if GM_STATS
  int i;
  int mype = CmiMyPe();
  char fname[128];

  snprintf(fname, sizeof fname, "gm-stats.%d", mype);
  gmf = fopen(fname, "w");
  if (gmf == NULL) {
    CmiPrintf("[%d] cannot open %s for GM statistics\n", mype, fname);
    return;
  }
  for (i=5; i<=maxsize; i++) {
    fprintf(gmf, "[%d] size:%d count:%d\n", mype, i, gm_stats[i]);
  }
  fprintf(gmf, "[%d] max quelen: %d possible streaming: %d defrag: %d \n", mype, maxQueueLength, possible_streamed, defrag);
  fclose(gmf);
  gmf = NULL;
#endif
}
00991
/*
 * With the GM 2 API, translate *mach_id from a GM global id to the local GM
 * node id in place. It is left unchanged if the lookup fails, or when
 * Cmi_charmrun_pid is 0 and no GM port is open. No-op without CMK_USE_GM2.
 */
void CmiGmConvertMachineID(unsigned int *mach_id)
{
#if CMK_USE_GM2
  gm_status_t status;
  unsigned int newid;   /* gm_global_id_to_node_id writes an unsigned int */

  if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
  status = gm_global_id_to_node_id(gmport, *mach_id, &newid);
  if (status == GM_SUCCESS) *mach_id = newid;
#else
  (void)mach_id;
#endif
}
01003
01004
01005 void CmiCheckGmStatus()
01006 {
01007 int i;
01008 int doabort = 0;
01009 if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
01010 if (gmport == NULL) machine_exit(1);
01011 for (i=0; i<_Cmi_numnodes; i++) {
01012 gm_status_t status;
01013 char uid[6], str[100];
01014 unsigned int mach_id=nodes[i].mach_id;
01015 status = gm_node_id_to_unique_id(gmport, mach_id, uid);
01016 if (status != GM_SUCCESS || ( uid[0]==0 && uid[1]== 0
01017 && uid[2]==0 && uid[3]==0 && uid[4]==0 && uid[5]==0)) {
01018 CmiPrintf("Error> gm node %d can't contact node %d. \n", CmiMyPe(), i);
01019 doabort = 1;
01020 }
01021
01022 }
01023 if (doabort) CmiAbort("");
01024 }
01025
/*
 * Abort handler installed with skt_set_abort: report the socket error on
 * stderr and exit the machine layer with code.
 * machine_exit() is presumably not expected to return. The explicit return
 * keeps this int function well-defined if it ever does. The value -1 carries
 * no further meaning.
 */
static int gmExit(int code,const char *msg)
{
  fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
  machine_exit(code);
  return -1;
}
01031
01032
01033 static char *getErrorMsg(gm_status_t status)
01034 {
01035 char *errmsg;
01036 switch (status) {
01037 case GM_SEND_TIMED_OUT:
01038 errmsg = "send time out"; break;
01039 case GM_SEND_REJECTED:
01040 errmsg = "send rejected"; break;
01041 case GM_SEND_TARGET_NODE_UNREACHABLE:
01042 errmsg = "target node unreachable"; break;
01043 case GM_SEND_TARGET_PORT_CLOSED:
01044 errmsg = "target port closed"; break;
01045 case GM_SEND_DROPPED:
01046 errmsg = "send dropped"; break;
01047 default:
01048 errmsg = "unknown error"; break;
01049 }
01050 return errmsg;
01051 }
01052
#ifdef __ONESIDED_IMPL
#ifdef __ONESIDED_GM_HARDWARE

/* User completion callback (and its argument) for the *Cb put/get variants. */
struct CmiCb {
  CmiRdmaCallbackFn fn;
  void *param;
};
typedef struct CmiCb CmiCb;

/* Completion record for one put/get.
   type == 1: polled with CmiWaitTest; ready.completed becomes 1 when done.
   otherwise: ready.cb is invoked when done, then it and this record are freed. */
struct CmiRMA {
  int type;
  union {
    int completed;
    CmiCb *cb;
  } ready;
};
typedef struct CmiRMA CmiRMA;

/* Converse message returning a completion record to the requester
   (handled by handleGetSrc). */
struct CmiRMAMsg {
  char core[CmiMsgHeaderSizeBytes];
  CmiRMA* stat;
};
typedef struct CmiRMAMsg CmiRMAMsg;

/* Describes one directed transfer. Used as the GM callback context for puts
   and gets, and as the Converse request message for gets (hence the leading
   Converse header). */
struct RMAPutMsg {
  char core[CmiMsgHeaderSizeBytes];
  void *Saddr;
  void *Taddr;
  unsigned int size;
  unsigned int targetId;
  unsigned int sourceId;
  unsigned int dataport;
  CmiRMA *stat;
};
typedef struct RMAPutMsg RMAPutMsg;

/* Allocate zeroed, DMA-able GM memory.
   NOTE(review): `size` is passed to gm_max_length_for_size(), i.e. it is
   treated as a GM size class (as in CmiMachineInit), not a byte count —
   confirm that callers pass a size class. */
void *CmiDMAAlloc(int size) {
  void *addr = gm_dma_calloc(gmport, 1, gm_max_length_for_size(size));

  return addr;
}
01094
01095 int CmiRegisterMemory(void *addr, unsigned int size) {
01096 gm_status_t status;
01097 status = gm_register_memory(gmport, addr, size);
01098 if (status != GM_SUCCESS) {
01099 CmiPrintf("Cannot register memory at %p of size %d\n",addr,size);
01100 gm_perror("registerMemory", status);
01101 return 0;
01102 }
01103
01104 return 1;
01105 }
01106
01107 int CmiUnRegisterMemory(void *addr, unsigned int size) {
01108 gm_status_t status;
01109 status = gm_deregister_memory(gmport, addr, size);
01110 if (status != GM_SUCCESS) {
01111 CmiPrintf("Cannot unregister memory at %p of size %d\n",addr,size);
01112 gm_perror("UnregisterMemory", status);
01113 return 0;
01114 }
01115 return 1;
01116 }
01117
01118 void put_callback(struct gm_port *p, void *context, gm_status_t status)
01119 {
01120 RMAPutMsg *out = (RMAPutMsg*)context;
01121 unsigned int destMachId = (nodes_by_pe[out->targetId])->mach_id;
01122 if (status != GM_SUCCESS) {
01123 switch (status) {
01124 case GM_SEND_DROPPED:
01125 CmiPrintf("Got DROPPED_PUT notification, resend\n");
01126 gm_directed_send_with_callback(gmport,out->Saddr,(gm_remote_ptr_t)(out->Taddr),
01127 out->size,GM_HIGH_PRIORITY,
01128 destMachId, out->dataport,
01129 put_callback, out);
01130 return;
01131 default:
01132 CmiAbort("gm send_callback failed");
01133 }
01134 }
01135 if(out->stat->type==1) {
01136 out->stat->ready.completed = 1;
01137
01138 }
01139 else {
01140 (*(out->stat->ready.cb->fn))(out->stat->ready.cb->param);
01141
01142 CmiFree(out->stat->ready.cb);
01143 CmiFree(out->stat);
01144 }
01145 gm_free_send_token (gmport, GM_HIGH_PRIORITY);
01146 return;
01147 }
01148
01149 void *CmiPut(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size) {
01150 unsigned int dataport = (nodes_by_pe[targetId])->dataport;
01151 unsigned int destMachId = (nodes_by_pe[targetId])->mach_id;
01152 RMAPutMsg *context = (RMAPutMsg*)CmiAlloc(sizeof(RMAPutMsg));
01153 context->Saddr = Saddr;
01154 context->Taddr = Taddr;
01155 context->size = size;
01156 context->targetId = targetId;
01157 context->sourceId = sourceId;
01158 context->dataport = dataport;
01159 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
01160 context->stat->type = 1;
01161 context->stat->ready.completed = 0;
01162 void *stat = (void*)(context->stat);
01163
01164 if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
01165
01166 gm_directed_send_with_callback(gmport,Saddr,(gm_remote_ptr_t)Taddr,size,
01167 GM_HIGH_PRIORITY,destMachId,dataport,
01168 put_callback,(void*)context);
01169 }
01170 return stat;
01171 }
01172
01173 void CmiPutCb(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size, CmiRdmaCallbackFn fn, void *param) {
01174 unsigned int dataport = (nodes_by_pe[targetId])->dataport;
01175 unsigned int destMachId = (nodes_by_pe[targetId])->mach_id;
01176 RMAPutMsg *context = (RMAPutMsg*)CmiAlloc(sizeof(RMAPutMsg));
01177
01178 context->Saddr = Saddr;
01179 context->Taddr = Taddr;
01180 context->size = size;
01181 context->targetId = targetId;
01182 context->sourceId = sourceId;
01183 context->dataport = dataport;
01184 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
01185 context->stat->type = 0;
01186 context->stat->ready.cb = (CmiCb*)CmiAlloc(sizeof(CmiCb));
01187 context->stat->ready.cb->fn = fn;
01188 context->stat->ready.cb->param = param;
01189
01190 if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
01191
01192 gm_directed_send_with_callback(gmport,Saddr,(gm_remote_ptr_t)Taddr,size,
01193 GM_HIGH_PRIORITY,destMachId,dataport,
01194 put_callback,(void*)context);
01195 }
01196 return;
01197 }
01198
01199 void handleGetSrc(void *msg) {
01200 CmiRMA* stat = ((CmiRMAMsg*)msg)->stat;
01201 if(stat->type==1) {
01202 stat->ready.completed = 1;
01203
01204 }
01205 else {
01206 (*(stat->ready.cb->fn))(stat->ready.cb->param);
01207
01208 CmiFree(stat->ready.cb);
01209 CmiFree(stat);
01210 }
01211 CmiFree(msg);
01212 return;
01213 }
01214
01215 void get_callback_dest(struct gm_port *p, void *context, gm_status_t status)
01216 {
01217 RMAPutMsg *out = (RMAPutMsg*)context;
01218 int sizeRmaStat = sizeof(CmiRMAMsg);
01219 char *msgRmaStat;
01220 unsigned int srcMachId = (nodes_by_pe[out->targetId])->mach_id;
01221 if (status != GM_SUCCESS) {
01222 switch (status) {
01223 case GM_SEND_DROPPED:
01224 CmiPrintf("Got DROPPED_PUT notification, resend\n");
01225 gm_directed_send_with_callback(gmport,out->Saddr,(int)(out->Taddr),
01226 out->size,GM_HIGH_PRIORITY,
01227 out->targetId, out->dataport,
01228 get_callback_dest, out);
01229 return;
01230 default:
01231 CmiAbort("gm send_callback failed");
01232 }
01233 }
01234 gm_free_send_token (gmport, GM_HIGH_PRIORITY);
01235
01236 msgRmaStat = (void*)CmiAlloc(sizeRmaStat);
01237 ((CmiRMAMsg*)msgRmaStat)->stat = out->stat;
01238 CmiSetHandler(msgRmaStat,getSrcHandler);
01239 CmiSyncSendAndFree(out->targetId,sizeRmaStat,msgRmaStat);
01240 return;
01241 }
01242
01243 void handleGetDest(void *msg) {
01244 RMAPutMsg *context1 = (RMAPutMsg*)msg;
01245 RMAPutMsg *context = (RMAPutMsg*)CmiAlloc(sizeof(RMAPutMsg));
01246 unsigned int srcMachId = (nodes_by_pe[context1->sourceId])->mach_id;
01247 context->Saddr = context1->Taddr;
01248 context->Taddr = context1->Saddr;
01249 context->size = context1->size;
01250 context->targetId = context1->sourceId;
01251 context->sourceId = context1->targetId;
01252 context->dataport = context1->dataport;
01253 context->stat = context1->stat;
01254
01255 if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
01256
01257 gm_directed_send_with_callback(gmport,context->Saddr,(gm_remote_ptr_t)context->Taddr,context->size,
01258 GM_HIGH_PRIORITY,srcMachId,context->dataport,
01259 get_callback_dest,(void*)context);
01260 }
01261 CmiFree(msg);
01262 return;
01263 }
01264
01265
01266
01267
01268 void *CmiGet(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size) {
01269 int dataport = (nodes_by_pe[targetId])->dataport;
01270 int sizeRma;
01271 char *msgRma;
01272 RMAPutMsg *context;
01273 sizeRma = sizeof(RMAPutMsg);
01274 msgRma = (void*)CmiAlloc(sizeRma);
01275
01276 context = (RMAPutMsg*)msgRma;
01277 context->Saddr = Saddr;
01278 context->Taddr = Taddr;
01279 context->size = size;
01280 context->targetId = targetId;
01281 context->sourceId = sourceId;
01282 context->dataport = dataport;
01283 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
01284 context->stat->type = 1;
01285 context->stat->ready.completed = 0;
01286 void *stat = (void*)(context->stat);
01287
01288 CmiSetHandler(msgRma,getDestHandler);
01289 CmiSyncSendAndFree(targetId,sizeRma,msgRma);
01290
01291 return stat;
01292 }
01293
01294 void CmiGetCb(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size, CmiRdmaCallbackFn fn, void *param) {
01295 int dataport = (nodes_by_pe[targetId])->dataport;
01296 int sizeRma;
01297 char *msgRma;
01298 RMAPutMsg *context;
01299 sizeRma = sizeof(RMAPutMsg);
01300 msgRma = (void*)CmiAlloc(sizeRma);
01301
01302 context = (RMAPutMsg*)msgRma;
01303 context->Saddr = Saddr;
01304 context->Taddr = Taddr;
01305 context->size = size;
01306 context->targetId = targetId;
01307 context->sourceId = sourceId;
01308 context->dataport = dataport;
01309 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
01310 context->stat->type = 0;
01311 context->stat->ready.cb = (CmiCb*)CmiAlloc(sizeof(CmiCb));
01312 context->stat->ready.cb->fn = fn;
01313 context->stat->ready.cb->param = param;
01314
01315 CmiSetHandler(msgRma,getDestHandler);
01316 CmiSyncSendAndFree(targetId,sizeRma,msgRma);
01317 return;
01318 }
01319
01320
01321 int CmiWaitTest(void *obj){
01322 CmiRMA *stat = (CmiRMA*)obj;
01323 return stat->ready.completed;
01324 }
01325
01326 #endif
01327 #endif
01328