arch/net/machine-gm.c

Go to the documentation of this file.
00001 
/* Select the GM 2.x API code paths when the GM headers advertise them. */
#if defined(GM_API_VERSION_2_0) && GM_API_VERSION >= GM_API_VERSION_2_0
#define CMK_USE_GM2    1
#endif
00037 
/* Idle policy for this layer: busy-wait (poll) instead of usleep. */
#undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
#define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
#undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
#define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0
00043 
/* One-sided "get" support backed by GM hardware: the handler ids
   (presumably Converse handler indices) and the handlers for the source
   and destination sides of a get. */
#if defined(__ONESIDED_IMPL) && defined(__ONESIDED_GM_HARDWARE)
#include "conv-onesided.h"
int getSrcHandler;
int getDestHandler;
void handleGetSrc(void *msg);
void handleGetDest(void *msg);
#endif
00053 
00054 static gm_alarm_t gmalarm;
00055 
00056 /*#define CMK_USE_CHECKSUM*/
00057 
/******************************************************************************
 *
 *  GM layer network statistics collection
 *
 *  Disabled by default; build with -DGM_STATS=1 to count sends per GM size
 *  class, the maximum send-queue length and fragmented messages.
 *  CmiMachineExit writes the counters to gm-stats.<pe>.
 *
 *****************************************************************************/

#ifndef GM_STATS
#define GM_STATS                0
#endif

#if GM_STATS
static FILE *gmf;                       /* one file per processor */
static int  *gm_stats;                  /* send count for each size class */
static int   possible_streamed = 0;     /* possible streaming counts */
static int   defrag = 0;                /* messages split into several packets */
static int   maxQueueLength = 0;        /* maximum send queue length */
#endif
00073 
/******************************************************************************
 *
 * Send messages pending queue (used internally)
 *
 *****************************************************************************/

/* Flow-control threshold: DeliverViaNetwork keeps polling the network while
   at least this many packets are queued. */
#define MAXPENDINGSEND  500

/* One outgoing packet waiting in (or taken from) a node's send queue.  The
   same record is passed to GM as the send-callback context. */
typedef struct PendingMsgStruct
{
  void *msg;                        /* DMA-able packet buffer */
  int length;                       /* packet length in bytes */
  int size;                         /* GM size class (gm_min_size_for_length) */
  int mach_id;                      /* receiver machine id */
  int dataport;                     /* receiver data port */
  int node_idx;                     /* receiver index into nodes[] */
  struct PendingMsgStruct *next;    /* next queued packet for the same node */
}
*PendingMsg;

/* Number of packets queued on all nodes and not yet handed to GM. */
static int pendinglen = 0;

/* Recycled PendingMsg records. */
static PendingMsg pend_freelist = NULL;
00100 
/* Push PendingMsg record `d` onto the free list for later reuse.
   Wrapped in do/while(0) so it behaves as one statement (safe in if/else). */
#define FreePendingMsg(d) do {          \
    (d)->next = pend_freelist;          \
    pend_freelist = (d);                \
  } while (0)

/* Set `d` (an lvalue) to a PendingMsg record, popping the free list or
   malloc'ing a fresh record when the list is empty (_MEMCHECK guards the
   allocation).  Single-statement form, safe inside if/else. */
#define MallocPendingMsg(d) do {                                    \
    (d) = pend_freelist;                                            \
    if ((d) == NULL) {                                              \
      (d) = (PendingMsg)malloc(sizeof(struct PendingMsgStruct));    \
      _MEMCHECK(d);                                                 \
    } else {                                                        \
      pend_freelist = (d)->next;                                    \
    }                                                               \
  } while (0)
00110 
00111 void enqueue_sending(char *msg, int length, OtherNode node, int size)
00112 {
00113   PendingMsg pm;
00114   MallocPendingMsg(pm);
00115   pm->msg = msg;
00116   pm->length = length;
00117   pm->mach_id = node->mach_id;
00118   pm->dataport = node->dataport;
00119   pm->node_idx = node-nodes;
00120   pm->size = size;
00121   pm->next = NULL;
00122   if (node->sendhead == NULL) {
00123     node->sendhead = node->sendtail = pm;
00124   }
00125   else {
00126     node->sendtail->next = pm;
00127     node->sendtail = pm;
00128   }
00129   pendinglen ++;
00130 #if GM_STATS
00131   if (pendinglen > maxQueueLength) maxQueueLength = pendinglen;
00132 #endif
00133 }
00134 
/* Head of `node`'s send queue, or NULL when the queue is empty. */
#define peek_sending(node) ((node)->sendhead)

/* Unlink the head of `node`'s send queue and decrement pendinglen.  The
   record is not freed: it is still the GM callback context.  sendtail may
   be left stale once the queue drains; enqueue_sending resets both ends
   whenever sendhead is NULL.  Single-statement form, safe inside if/else. */
#define dequeue_sending(node) do {                  \
    if ((node)->sendhead != NULL) {                 \
      (node)->sendhead = (node)->sendhead->next;    \
      pendinglen--;                                 \
    }                                               \
  } while (0)
00142 
00143 static void alarmcallback (void *context) {
00144   MACHSTATE(4,"GM Alarm callback executed")
00145 }
00146 static int processEvent(gm_recv_event_t *e);
00147 static void send_progress();
00148 static void alarmInterrupt(int arg);
00149 static int gmExit(int code,const char *msg);
00150 static char *getErrorMsg(gm_status_t status);
00151 
/******************************************************************************
 *
 * DMA message pool
 *
 * A small stack of recycled DMA-able send buffers, each maxMsgSize bytes.
 *
 *****************************************************************************/

#define CMK_MSGPOOL  1

/* Pool capacity; buffers returned to a full pool are freed back to GM. */
#define MAXMSGLEN  200

static char* msgpool[MAXMSGLEN];
static int msgNums = 0;

/* Length of the largest packet the receive buffers accept; set by
   CmiMachineInit and used as the size of every pooled buffer. */
static int maxMsgSize = 0;

/* Return buffer `msg` to the pool (or to GM if the pool is full).
   Single-statement form, safe inside if/else. */
#define putPool(msg) do {                                       \
    if (msgNums == MAXMSGLEN) gm_dma_free(gmport, (msg));       \
    else msgpool[msgNums++] = (msg);                            \
  } while (0)

/* Store a buffer in `msg`: pop one from the pool, or gm_dma_malloc a new
   maxMsgSize buffer when the pool is empty.  `len` is not evaluated: every
   pooled buffer is maxMsgSize bytes, so callers must not need more. */
#define getPool(msg, len) do {                                      \
    if (msgNums == 0) (msg) = gm_dma_malloc(gmport, maxMsgSize);    \
    else (msg) = msgpool[--msgNums];                                \
  } while (0)
00175 
00176 
/******************************************************************************
 *
 * CmiNotifyIdle() -- poll for incoming packets while the PE is idle
 *
 *****************************************************************************/

/* This layer keeps no idle state; the struct only satisfies the idle
   notification interface, and CmiNotifyGetState always returns NULL. */
typedef struct {
  char none;
} CmiIdleState;

static CmiIdleState *CmiNotifyGetState(void)
{
  return NULL;
}

static void CmiNotifyStillIdle(CmiIdleState *s);
00189 
00190 static void CmiNotifyBeginIdle(CmiIdleState *s)
00191 {
00192   CmiNotifyStillIdle(s);
00193 }
00194 
00195 
00196 
00197 static void CmiNotifyStillIdle(CmiIdleState *s)
00198 {
00199 #if CMK_SHARED_VARS_UNAVAILABLE
00200   /*No comm. thread-- listen on sockets for incoming messages*/
00201   int nreadable;
00202   gm_recv_event_t *e;
00203   int pollMs = 4;
00204 
00205 #define SLEEP_USING_ALARM 0
00206 #if SLEEP_USING_ALARM /*Enable the alarm, so we don't sleep forever*/
00207   gm_set_alarm (gmport, &gmalarm, (gm_u64_t) pollMs*1000, alarmcallback,
00208                     (void *)NULL );
00209 #endif
00210 
00211 #if SLEEP_USING_ALARM
00212   MACHSTATE(3,"Blocking on receive {")
00213   e = gm_blocking_receive_no_spin(gmport);
00214   MACHSTATE(3,"} receive returned");
00215 #else
00216   MACHSTATE(3,"CmiNotifyStillIdle NonBlocking on receive {")
00217   e = gm_receive(gmport);
00218   MACHSTATE(3,"} CmiNotifyStillIdle nonblocking receive returned");
00219 #endif
00220 
00221 #if SLEEP_USING_ALARM /*Cancel the alarm*/
00222   gm_cancel_alarm (&gmalarm);
00223 #endif
00224   
00225   /* have to handle this event now */
00226   CmiCommLock();
00227   nreadable = processEvent(e);
00228   CmiCommUnlock();
00229   if (nreadable) {
00230     return;
00231   }
00232 #else
00233   /*Comm. thread will listen on sockets-- just sleep*/
00234   CmiIdleLock_sleep(&CmiGetState()->idle,5);
00235 #endif
00236 }
00237 
00238 void CmiNotifyIdle(void) {
00239   CmiNotifyStillIdle(NULL);
00240 }
00241 
00242 /****************************************************************************
00243  *                                                                          
00244  * CheckSocketsReady
00245  *
00246  * Checks both sockets to see which are readable and which are writeable.
00247  * We check all these things at the same time since this can be done for
00248  * free with ``select.'' The result is stored in global variables, since
00249  * this is essentially global state information and several routines need it.
00250  *
00251  ***************************************************************************/
00252 
00253 int CheckSocketsReady(int withDelayMs)
00254 {   
00255   int nreadable;
00256   CMK_PIPE_DECL(withDelayMs);
00257 
00258   CmiStdoutAdd(CMK_PIPE_SUB);
00259   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00260 
00261   nreadable=CMK_PIPE_CALL();
00262   ctrlskt_ready_read = 0;
00263   dataskt_ready_read = 0;
00264   dataskt_ready_write = 0;
00265   
00266   if (nreadable == 0) {
00267     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00268     return nreadable;
00269   }
00270   if (nreadable==-1) {
00271     CMK_PIPE_CHECKERR();
00272     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00273     return CheckSocketsReady(0);
00274   }
00275   CmiStdoutCheck(CMK_PIPE_SUB);
00276   if (Cmi_charmrun_fd!=-1) 
00277           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00278   MACHSTATE(1,"} CheckSocketsReady")
00279   return nreadable;
00280 }
00281 
00282 /***********************************************************************
00283  * CommunicationServer()
00284  * 
00285  * This function does the scheduling of the tasks related to the
00286  * message sends and receives. 
00287  * It first check the charmrun port for message, and poll the gm event
00288  * for send complete and outcoming messages.
00289  *
00290  ***********************************************************************/
00291 
00292 /* always called from interrupt */
00293 static void ServiceCharmrun_nolock()
00294 {
00295   int again = 1;
00296   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
00297   while (again)
00298   {
00299   again = 0;
00300   CheckSocketsReady(0);
00301   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
00302   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
00303   }
00304   MACHSTATE(2,"} ServiceCharmrun_nolock end")
00305 }
00306 
00307 static void CommunicationServer_nolock(int withDelayMs) {
00308   gm_recv_event_t *e;
00309 
00310   MACHSTATE(2,"CommunicationServer_nolock start {")
00311   while (1) {
00312     MACHSTATE(3,"Non-blocking receive {")
00313     e = gm_receive(gmport);
00314     MACHSTATE(3,"} Non-blocking receive")
00315     if (!processEvent(e)) break;
00316   }
00317   MACHSTATE(2,"}CommunicationServer_nolock end")
00318 }
00319 
00320 /*
00321 0: from smp thread
00322 1: from interrupt
00323 2: from worker thread
00324    Note in netpoll mode, charmrun service is only performed in interrupt, 
00325  pingCharmrun is from sig alarm, so it is lock free 
00326 */
00327 static void CommunicationServer(int withDelayMs, int where)
00328 {
00329   /* standalone mode */
00330   if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
00331 
00332   MACHSTATE2(2,"CommunicationServer(%d) from %d {",withDelayMs, where)
00333 
00334   if (where == 1) {
00335     /* don't service charmrun if converse exits, this fixed a hang bug */
00336     if (!machine_initiated_shutdown) ServiceCharmrun_nolock();
00337     return;
00338   }
00339 
00340   LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00341 
00342   CmiCommLock();
00343   CommunicationServer_nolock(withDelayMs);
00344   CmiCommUnlock();
00345 
00346 #if CMK_IMMEDIATE_MSG
00347   if (where == 0)
00348   CmiHandleImmediate();
00349 #endif
00350 
00351   MACHSTATE(2,"} CommunicationServer")
00352 }
00353 
00354 static void processMessage(char *msg, int len)
00355 {
00356   char *newmsg;
00357   int rank, srcpe, seqno, magic, i;
00358   unsigned int broot;
00359   int size;
00360   unsigned char checksum;
00361   
00362   if (len >= DGRAM_HEADER_SIZE) {
00363     DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00364 #ifdef CMK_USE_CHECKSUM
00365     checksum = computeCheckSum(msg, len);
00366     if (checksum == 0)
00367 #else
00368     if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
00369 #endif
00370     {
00371       OtherNode node = nodes_by_pe[srcpe];
00372       /* check seqno */
00373       if (seqno == node->recv_expect) {
00374         node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
00375       }
00376       else if (seqno < node->recv_expect) {
00377         CmiPrintf("[%d] Warning: Past packet received from PE %d, something wrong with GM hardware? (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00378         CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
00379         return;
00380       }
00381       else {
00382          CmiPrintf("[%d] Error detected - Packet out of order from PE %d, something wrong with GM hardware? (expecting: %d got: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00383          CmiAbort("\n\n\t\tPacket out of order!!\n\n");
00384       }
00385       newmsg = node->asm_msg;
00386       if (newmsg == 0) {
00387         size = CmiMsgHeaderGetLength(msg);
00388         if (len > size) {
00389          CmiPrintf("size: %d, len:%d.\n", size, len);
00390          CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00391         }
00392         newmsg = (char *)CmiAlloc(size);
00393         _MEMCHECK(newmsg);
00394         memcpy(newmsg, msg, len);
00395         node->asm_rank = rank;
00396         node->asm_total = size;
00397         node->asm_fill = len;
00398         node->asm_msg = newmsg;
00399       } else {
00400         size = len - DGRAM_HEADER_SIZE;
00401         if (node->asm_fill+size > node->asm_total) {
00402          CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
00403          CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00404         }
00405         memcpy(newmsg + node->asm_fill, msg+DGRAM_HEADER_SIZE, size);
00406         node->asm_fill += size;
00407       }
00408       /* get a full packet */
00409       if (node->asm_fill == node->asm_total) {
00410         int total_size = node->asm_total;
00411         node->asm_msg = 0;
00412 
00413       /* do it after integration - the following function may re-entrant */
00414 #if CMK_BROADCAST_SPANNING_TREE
00415         if (rank == DGRAM_BROADCAST
00416 #if CMK_NODE_QUEUE_AVAILABLE
00417           || rank == DGRAM_NODEBROADCAST
00418 #endif
00419            )
00420           SendSpanningChildren(NULL, 0, total_size, newmsg, broot, rank);
00421 #elif CMK_BROADCAST_HYPERCUBE
00422         if (rank == DGRAM_BROADCAST
00423 #if CMK_NODE_QUEUE_AVAILABLE
00424           || rank == DGRAM_NODEBROADCAST
00425 #endif
00426            )
00427           SendHypercube(NULL, 0, total_size, newmsg, broot, rank);
00428 #endif
00429 
00430         switch (rank) {
00431         case DGRAM_BROADCAST: {
00432           for (i=1; i<_Cmi_mynodesize; i++)
00433             CmiPushPE(i, CopyMsg(newmsg, total_size));
00434           CmiPushPE(0, newmsg);
00435           break;
00436         }
00437 #if CMK_NODE_QUEUE_AVAILABLE
00438         case DGRAM_NODEBROADCAST: 
00439         case DGRAM_NODEMESSAGE: {
00440           CmiPushNode(newmsg);
00441           break;
00442         }
00443 #endif
00444         default:
00445           CmiPushPE(rank, newmsg);
00446         }    /* end of switch */
00447       }
00448     } 
00449     else {
00450 #ifdef CMK_USE_CHECKSUM
00451       CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum);
00452 #else
00453       CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n", 
00454                  CmiMyPe(), magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00455 #endif
00456       CmiPrintf("recved: rank:%d src:%d mag:%d seqno:%d len:%d\n", rank, srcpe, magic, seqno, len);
00457     }
00458   } 
00459   else {
00460       CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len);
00461       CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg));
00462   }
00463 }
00464 
00465 /* return 1 - recv'ed  0 - no msg */
00466 static int processEvent(gm_recv_event_t *e)
00467 {
00468   int size, len;
00469   char *msg, *buf;
00470   int status = 1;
00471   switch (gm_ntohc(e->recv.type))
00472   {
00473       /* avoid copy from message to buffer */
00474     case GM_FAST_PEER_RECV_EVENT:
00475     case GM_FAST_RECV_EVENT:
00476       MACHSTATE(4,"Incoming message")
00477       msg = gm_ntohp(e->recv.message);
00478       len = gm_ntohl(e->recv.length);
00479       processMessage(msg, len);
00480       break;
00481 /*
00482     case GM_FAST_HIGH_PEER_RECV_EVENT:
00483     case GM_FAST_HIGH_RECV_EVENT:
00484 */
00485     case GM_HIGH_RECV_EVENT:
00486     case GM_RECV_EVENT:
00487       MACHSTATE(4,"Incoming message")
00488       size = gm_ntohc(e->recv.size);
00489       msg = gm_ntohp(e->recv.buffer);
00490       len = gm_ntohl(e->recv.length);
00491       processMessage(msg, len);
00492       gm_provide_receive_buffer(gmport, msg, size, GM_HIGH_PRIORITY);
00493       break;
00494     case GM_NO_RECV_EVENT:
00495       return 0;
00496     case GM_ALARM_EVENT:
00497       status = 0;
00498     default:
00499       MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e->recv.type))
00500       gm_unknown(gmport, e);
00501   }
00502   return status;
00503 }
00504 
00505 
00506 #ifdef __FAULT__ 
00507 void drop_send_callback(struct gm_port *p, void *context, gm_status_t status)
00508 {
00509   PendingMsg out = (PendingMsg)context;
00510   void *msg = out->msg;
00511 
00512   printf("[%d] drop_send_callback dropped msg: %p\n", CmiMyPe(), msg);
00513 #if !CMK_MSGPOOL
00514   gm_dma_free(gmport, msg);
00515 #else
00516   putPool(msg);
00517 #endif
00518 
00519   FreePendingMsg(out);
00520 }
00521 #else
00522 void send_callback(struct gm_port *p, void *context, gm_status_t status);
00523 void drop_send_callback(struct gm_port *p, void *context, gm_status_t status)
00524 {
00525   PendingMsg out = (PendingMsg)context;
00526   void *msg = out->msg;
00527   gm_send_with_callback(gmport, msg, out->size, out->length,
00528                         GM_HIGH_PRIORITY, out->mach_id, out->dataport, 
00529                         send_callback, out);
00530 }
00531 #endif
00532 
00533 void send_callback(struct gm_port *p, void *context, gm_status_t status)
00534 {
00535   PendingMsg out = (PendingMsg)context;
00536   void *msg = out->msg;
00537   unsigned char cksum;
00538   OtherNode  node = nodes+out->node_idx;
00539 
00540   if (status != GM_SUCCESS) { 
00541     int srcpe, seqno, magic, broot;
00542     char rank;
00543     char *errmsg;
00544     DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00545     errmsg = getErrorMsg(status);
00546     CmiPrintf("GM Error> PE:%d send to msg %p node %d rank %d mach_id %d port %d len %d size %d failed to complete (error %d): %s\n", srcpe, msg, out->node_idx, rank, out->mach_id, out->dataport, out->length, out->size, status, errmsg); 
00547     switch (status) {
00548 #ifdef __FAULT__ 
00549       case GM_SEND_DROPPED: {
00550         OtherNode node = nodes + out->node_idx;
00551         if (out->mach_id == node->mach_id && out->dataport == node->dataport) {
00552           /* it not crashed, resent */
00553           gm_send_with_callback(gmport, msg, out->size, out->length, 
00554                             GM_HIGH_PRIORITY, out->mach_id, out->dataport, 
00555                             send_callback, out);
00556           return;
00557         }
00558       }
00559       default: {
00560         gm_drop_sends (gmport, GM_HIGH_PRIORITY, out->mach_id, out->dataport,
00561                                              drop_send_callback, out);
00562         return;
00563       }
00564 #else
00565       case GM_SEND_TIMED_OUT: {
00566         CmiPrintf("gm send_callback timeout, drop sends and re-enable send. \n");
00567         gm_drop_sends (gmport, GM_HIGH_PRIORITY, out->mach_id, out->dataport,
00568                          drop_send_callback, out);
00569         node->disable=1;        /* disable normal send */
00570         return;
00571       }
00572       case GM_SEND_DROPPED:
00573         CmiPrintf("Got DROPPED_SEND notification, resend\n");
00574         gm_send_with_callback(gmport, msg, out->size, out->length,
00575                             GM_HIGH_PRIORITY, out->mach_id, out->dataport, 
00576                             send_callback, out);
00577         return;
00578       default:
00579         CmiAbort("gm send_callback failed");
00580 #endif
00581     }
00582   }
00583 
00584 #ifdef CMK_USE_CHECKSUM
00585 /*
00586   {
00587   cksum = computeCheckSum((unsigned char*)msg, out->length);
00588   if (cksum != 0) {
00589     CmiPrintf("[%d] Message altered during send, checksum (%d) does not agree!\n", CmiMyPe(), cksum);
00590     CmiAbort("Myrinet error was detected!\n");
00591   }
00592   }
00593 */
00594 #endif
00595 
00596 #if !CMK_MSGPOOL
00597   gm_dma_free(gmport, msg);
00598 #else
00599   putPool(msg);
00600 #endif
00601 
00602   gm_free_send_token (gmport, GM_HIGH_PRIORITY);
00603   FreePendingMsg(out);
00604 
00605   /* send message pending in gm firmware */
00606   node->gm_pending --;
00607   if (node->disable && node->gm_pending == 0) node->disable = 0;
00608 
00609   /* since we have one free send token, start next send */
00610   send_progress();
00611 }
00612 
00613 static void send_progress()
00614 {
00615   static int nextnode = 0;
00616   int skip;
00617   OtherNode node;
00618   PendingMsg  out;
00619 
00620   while (1)
00621   {
00622     int sent = 0;
00623     for (skip=0; skip<_Cmi_numnodes; skip++) {
00624       node = nodes+nextnode;
00625       nextnode = (nextnode + 1) % _Cmi_numnodes;
00626       if (node->disable) continue;
00627       out = peek_sending(node);
00628       if (!out) continue;
00629       if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
00630         if (dataport == out->dataport) {
00631           gm_send_to_peer_with_callback(gmport, out->msg, out->size, 
00632                             out->length, GM_HIGH_PRIORITY, out->mach_id,
00633                             send_callback, out);
00634         }
00635         else {
00636           gm_send_with_callback(gmport, out->msg, out->size, out->length, 
00637                             GM_HIGH_PRIORITY, out->mach_id, out->dataport, 
00638                             send_callback, out);
00639         }
00640         node->gm_pending ++;
00641         sent=1;
00642         /* dequeue out, but not free it, used at callback */
00643         dequeue_sending(node);
00644 #if GM_STATS
00645         gm_stats[out->size] ++;
00646         /* if we streaming, count how many message we possibly can combine */
00647         {
00648         PendingMsg  curout;
00649         curout = peek_sending(node);
00650         if (curout) {
00651           out = curout->next;
00652           while (out) {
00653             possible_streamed ++;
00654             out = out->next;
00655           }
00656         }
00657         }
00658 #endif
00659       }
00660       else return;              /* no send token */
00661     }           /* end for */
00662     if (sent==0) return;
00663   }
00664 }
00665 
00666 
00667 /***********************************************************************
00668  * DeliverViaNetwork()
00669  *
00670  * This function is responsible for all non-local transmission. It
00671  * first allocate a send token, if fails, put the send message to
00672  * penging message queue, otherwise invoke the GM send.
00673  ***********************************************************************/
00674 
00675 void EnqueueOutgoingDgram
00676      (OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot)
00677 {
00678   char *buf;
00679   int size, len, seqno;
00680   int alloclen, allocSize;
00681 
00682 /* CmiPrintf("DeliverViaNetwork: size:%d\n", size); */
00683 
00684   /* don't have to worry about ref count because we do copy, and
00685      ogm can be free'ed right away */
00686   /* ogm->refcount++; */
00687 
00688   len = dlen + DGRAM_HEADER_SIZE;
00689 
00690   /* allocate DMAable memory to prepare sending */
00691   /* FIXME: another memory copy here from user buffer to DMAable buffer */
00692   /* which however means the user buffer is untouched and can be reused */
00693 #if !CMK_MSGPOOL
00694   buf = (char *)gm_dma_malloc(gmport, len);
00695 #else
00696   getPool(buf, len);
00697 #endif
00698   _MEMCHECK(buf);
00699 
00700   seqno = node->send_next;
00701   DgramHeaderMake(buf, rank, ogm->src, Cmi_charmrun_pid, seqno, broot);
00702   node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00703   memcpy(buf+DGRAM_HEADER_SIZE, ptr, dlen);
00704 #ifdef CMK_USE_CHECKSUM
00705   {
00706   DgramHeader *head = (DgramHeader *)buf;
00707   head->magic ^= computeCheckSum(buf, len);
00708   }
00709 #endif
00710   size = gm_min_size_for_length(len);
00711 
00712   /* if queue is not empty, enqueue msg. this is to guarantee the order */
00713   if (pendinglen != 0) {
00714     /* this potential screw up broadcast, because bcast packets can not be
00715        interrupted by other sends in CommunicationServer_nolock */
00716     enqueue_sending(buf, len, node, size);
00717     return;
00718   }
00719   enqueue_sending(buf, len, node, size);
00720   send_progress();
00721 }
00722 
00723 /* copy is ignored, since we always copy */
00724 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00725 {
00726   int size; char *data;
00727  
00728   size = ogm->size - DGRAM_HEADER_SIZE;
00729   data = ogm->data + DGRAM_HEADER_SIZE;
00730 #if GM_STATS
00731   if (size > Cmi_dgram_max_data) defrag ++;
00732 #endif
00733   while (size > Cmi_dgram_max_data) {
00734     EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00735     data += Cmi_dgram_max_data;
00736     size -= Cmi_dgram_max_data;
00737   }
00738   if (size>0) EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
00739 
00740   /* a simple flow control */
00741   while (pendinglen >= MAXPENDINGSEND) {
00742       /* pending max len exceeded, busy wait until get a token 
00743          Doing this surprisingly improve the performance by 2s for 200MB msg */
00744       MACHSTATE(4,"Polling until token available")
00745       CommunicationServer_nolock(0);
00746   }
00747 }
00748 
00749 /* simple barrier at machine layer */
00750 /* assuming no other flying messages */
00751 static void send_callback_nothing(struct gm_port *p, void *context, gm_status_t status)
00752 {
00753   gm_dma_free(gmport, context);
00754 }
00755 
00756 static void sendBarrierMessage(int pe)
00757 {
00758   int len = 32;
00759   char *buf = (char *)gm_dma_malloc(gmport, len);
00760   int size = gm_min_size_for_length(len);
00761   OtherNode  node = nodes + pe;
00762   CmiAssert(buf);
00763   gm_send_with_callback(gmport, buf, size, len,
00764               GM_HIGH_PRIORITY, node->mach_id, node->dataport,
00765               send_callback_nothing, buf);
00766 }
00767 
00768 static void recvBarrierMessage()
00769 {
00770   gm_recv_event_t *e;
00771   int size, len;
00772   char *msg;
00773   while (1) {
00774     e = gm_receive(gmport);
00775     switch (gm_ntohc(e->recv.type))
00776     {
00777       case GM_HIGH_RECV_EVENT:
00778       case GM_RECV_EVENT:
00779         MACHSTATE(4,"Incoming message")
00780         size = gm_ntohc(e->recv.size);
00781         msg = gm_ntohp(e->recv.buffer);
00782         len = gm_ntohl(e->recv.length);
00783         gm_provide_receive_buffer(gmport, msg, size, GM_HIGH_PRIORITY);
00784         return;
00785       case GM_NO_RECV_EVENT:
00786         continue ;
00787       default:
00788         MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e->recv.type))
00789         gm_unknown(gmport, e);
00790     }
00791   }
00792 }
00793 
00794 /* happen at node level */
00795 int CmiBarrier()
00796 {
00797   int len, size, i;
00798   int status;
00799   int count = 0;
00800   OtherNode node;
00801   int numnodes = CmiNumNodes();
00802   if (CmiMyRank() == 0) {
00803     /* every one send to pe 0 */
00804     if (CmiMyNode() != 0) {
00805       sendBarrierMessage(0);
00806     }
00807     /* printf("[%d] HERE\n", CmiMyPe()); */
00808     if (CmiMyNode() == 0) 
00809     {
00810       for (count = 1; count < numnodes; count ++) 
00811       {
00812         recvBarrierMessage();
00813       }
00814       /* pe 0 broadcast */
00815       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00816         int p = i;
00817         if (p > numnodes - 1) break;
00818         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
00819         sendBarrierMessage(p);
00820       }
00821     }
00822     /* non 0 node waiting */
00823     if (CmiMyNode() != 0) 
00824     {
00825       recvBarrierMessage();
00826       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00827         int p = CmiMyNode();
00828         p = BROADCAST_SPANNING_FACTOR*p + i;
00829         if (p > numnodes - 1) break;
00830         p = p%numnodes;
00831         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
00832         sendBarrierMessage(p);
00833       }
00834     }
00835   }
00836   CmiNodeAllBarrier();
00837   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
00838   return 0;
00839 }
00840 
/* Gather-only barrier: rank 0 of every other node sends node 0 a token and
   moves on; node 0's rank 0 waits for all of them.  All ranks then meet in
   CmiNodeAllBarrier().  Always returns 0. */
int CmiBarrierZero()
{
  int remaining;

  if (CmiMyRank() == 0) {
    if (CmiMyNode() == 0) {
      for (remaining = CmiNumNodes() - 1; remaining > 0; remaining--)
        recvBarrierMessage();
    } else {
      sendBarrierMessage(0);
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
00860 
00861 /***********************************************************************
00862  * CmiMachineInit()
00863  *
00864  * This function intialize the GM board. Set receive buffer
00865  *
00866  ***********************************************************************/
00867 
00868 static int maxsize;
00869 
00870 void CmiMachineInit(char **argv)
00871 {
00872   int dataport_max=16; /*number of largest GM port to check*/
00873   gm_status_t status;
00874   int device, i, j;
00875   int retry = 10;
00876   char *buf;
00877   int mlen;
00878 
00879   MACHSTATE(3,"CmiMachineInit {");
00880 
00881   gmport = NULL;
00882   /* standalone mode */
00883   if (dataport == -1) return; 
00884 
00885   /* try a few times init gm */
00886   for (i=0; i<retry; i++) {
00887     status = gm_init();
00888     if (status == GM_SUCCESS) break;
00889     sleep(1);
00890   }
00891   if (status != GM_SUCCESS) { 
00892     printf("Cannot open GM library (does the machine have a GM card?)\n");
00893     gm_perror("gm_init", status); 
00894     return; 
00895   }
00896   
00897   device = 0;
00898   for (dataport=2;dataport<dataport_max;dataport++) {
00899     char portname[200];
00900     sprintf(portname, "converse_port%d_%d", Cmi_charmrun_pid, _Cmi_mynode);
00901 #if CMK_USE_GM2
00902     status = gm_open(&gmport, device, dataport, portname, GM_API_VERSION_2_0);
00903 #else
00904     status = gm_open(&gmport, device, dataport, portname, GM_API_VERSION_1_1);
00905 #endif
00906     if (status == GM_SUCCESS) { break; }
00907   }
00908   if (dataport==dataport_max) 
00909   { /* Couldn't open any GM port... */
00910     dataport=0;
00911     return;
00912   }
00913   
00914   /* get our node id */
00915   status = gm_get_node_id(gmport, (unsigned int *)&Cmi_mach_id);
00916   if (status != GM_SUCCESS) { gm_perror("gm_get_node_id", status); return; }
00917 #if CMK_USE_GM2
00918   gm_node_id_to_global_id(gmport, (unsigned int)Cmi_mach_id, (unsigned int*)&Cmi_mach_id);
00919 #endif
00920   
00921   /* default abort will take care of gm clean up */
00922   skt_set_abort(gmExit);
00923 
00924   /* set up recv buffer */
00925 /*
00926   maxsize = gm_min_size_for_length(4096);
00927   Cmi_dgram_max_data = 4096 - DGRAM_HEADER_SIZE;
00928 */
00929   maxsize = 16;
00930   CmiGetArgIntDesc(argv,"+gm_maxsize",&maxsize,"maximum packet size in rank (2^maxsize)");
00931 
00932 #if GM_STATS
00933   gm_stats = (int*)malloc((maxsize+1) * sizeof(int));
00934   for (i=0; i<=maxsize; i++) gm_stats[i] = 0;
00935 #endif
00936 
00937   for (i=1; i<=maxsize; i++) {
00938     int len = gm_max_length_for_size(i);
00939     int num = 2;
00940 
00941     maxMsgSize = len;
00942 
00943     if (i<5) num = 0;
00944     else if (i<7)  num = 4;
00945     else if (i<13)  num = 20;
00946     else if (i<17)  num = 10;
00947     else if (i>22) num = 1;
00948     for (j=0; j<num; j++) {
00949       buf = gm_dma_malloc(gmport, len);
00950       _MEMCHECK(buf);
00951       gm_provide_receive_buffer(gmport, buf, i, GM_HIGH_PRIORITY);
00952     }
00953   }
00954   Cmi_dgram_max_data = maxMsgSize - DGRAM_HEADER_SIZE;
00955 
00956   status = gm_set_acceptable_sizes (gmport, GM_HIGH_PRIORITY, (1<<(maxsize+1))-1);
00957 
00958   gm_free_send_tokens (gmport, GM_HIGH_PRIORITY,
00959                        gm_num_send_tokens (gmport));
00960 
00961 #if CMK_MSGPOOL
00962   msgpool[msgNums++]  = gm_dma_malloc(gmport, maxMsgSize);
00963 #endif
00964 
00965   /* alarm will ping charmrun */
00966   gm_initialize_alarm(&gmalarm);
00967 
00968   MACHSTATE(3,"} CmiMachineInit");
00969 }
00970 
/* Machine-layer communication init hook.  The GM layer has no work to do
 * here, so the command-line arguments are ignored. */
void CmiCommunicationInit(char **argv)
{
  (void)argv; /* intentionally unused */
}
00974 
/* Machine-layer shutdown hook.
 *
 * When GM_STATS is enabled, writes the per-size-class send counters, the
 * maximum send-queue length, the possible-streaming count and the defrag
 * count gathered during the run to "gm-stats.<pe>".  Statistics are
 * best-effort: if they were never allocated (CmiMachineInit returns early
 * when no GM port could be opened) or the file cannot be created, the dump
 * is skipped instead of dereferencing a NULL pointer. */
void CmiMachineExit()
{
#if GM_STATS
  int i;
  int mype = CmiMyPe();
  char fname[128];

  if (gm_stats == NULL) return;   /* CmiMachineInit bailed out before allocating */

  snprintf(fname, sizeof fname, "gm-stats.%d", mype);
  gmf = fopen(fname, "w");
  if (gmf == NULL) {
    perror(fname);                /* report, but keep shutting down */
    return;
  }
  /* Starts at size class 5: CmiMachineInit posts no receive buffers below it. */
  for (i=5; i<=maxsize; i++)  {
    fprintf(gmf, "[%d] size:%d count:%d\n", mype, i, gm_stats[i]);
  }
  fprintf(gmf, "[%d] max quelen: %d possible streaming: %d  defrag: %d \n", mype, maxQueueLength, possible_streamed, defrag);
  fclose(gmf);
  gmf = NULL;
#endif
}
00991 
/* Convert *mach_id in place from a GM2 global id (the form CmiMachineInit
 * stores in Cmi_mach_id via gm_node_id_to_global_id) back into a GM node id
 * for gmport.  Leaves *mach_id unchanged if the lookup fails, if no GM port
 * is open, or on builds without CMK_USE_GM2. */
void CmiGmConvertMachineID(unsigned int *mach_id)
{
#if CMK_USE_GM2 
    gm_status_t status;
    unsigned int newid;   /* gm_global_id_to_node_id writes an unsigned int */
    /* Skip if running without charmrun, or if no GM port was ever opened:
       querying GM through a NULL port is never valid. */
    if (gmport == NULL) return;
    status = gm_global_id_to_node_id(gmport, *mach_id, &newid);
    if (status == GM_SUCCESS) *mach_id = newid;
#endif
}
01003 
01004 /* make sure other gm nodes are accessible in routing table */
01005 void CmiCheckGmStatus()
01006 {
01007   int i;
01008   int doabort = 0;
01009   if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
01010   if (gmport == NULL) machine_exit(1);
01011   for (i=0; i<_Cmi_numnodes; i++) {
01012     gm_status_t status;
01013     char uid[6], str[100];
01014     unsigned int mach_id=nodes[i].mach_id;
01015     status = gm_node_id_to_unique_id(gmport, mach_id, uid);
01016     if (status != GM_SUCCESS || ( uid[0]==0 && uid[1]== 0 
01017          && uid[2]==0 && uid[3]==0 && uid[4]==0 && uid[5]==0)) { 
01018       CmiPrintf("Error> gm node %d can't contact node %d. \n", CmiMyPe(), i);
01019       doabort = 1;
01020     }
01021     /*CmiPrintf("[%d]: %d mach:%d ip:%d %d %d %d\n", CmiMyPe(), i, mach_id, nodes[i].IP,uid[0], uid[3], uid[5]);*/
01022   }
01023   if (doabort) CmiAbort("");
01024 }
01025 
/* Abort hook installed with skt_set_abort() in CmiMachineInit.  Prints the
 * socket error to stderr and shuts the machine layer down with CODE. */
static int gmExit(int code,const char *msg)
{
  fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
  machine_exit(code);
  /* Presumably unreachable if machine_exit() terminates the process (confirm).
     Returning a value keeps this non-void function well-defined either way. */
  return code;
}
01031 
01032 
01033 static char *getErrorMsg(gm_status_t status)
01034 {
01035   char *errmsg;
01036   switch (status) {
01037   case GM_SEND_TIMED_OUT:
01038     errmsg = "send time out"; break;
01039   case GM_SEND_REJECTED:
01040     errmsg = "send rejected"; break;
01041   case GM_SEND_TARGET_NODE_UNREACHABLE:
01042     errmsg = "target node unreachable"; break;
01043   case GM_SEND_TARGET_PORT_CLOSED:
01044     errmsg = "target port closed"; break;
01045   case GM_SEND_DROPPED:
01046     errmsg = "send dropped"; break;
01047   default:
01048     errmsg = "unknown error"; break;
01049   }
01050   return errmsg;
01051 }
01052 
01053 #ifdef __ONESIDED_IMPL
01054 #ifdef __ONESIDED_GM_HARDWARE
01055 
/* A user callback paired with the opaque argument it should receive.
 * NOTE(review): CmiRdmaCallbackFn is presumably declared in conv-onesided.h
 * (included under the same #ifdefs) — confirm its signature there. */
struct CmiCb {
  CmiRdmaCallbackFn fn;   /* callback to invoke */
  void *param;            /* opaque argument handed to fn */
};
typedef struct CmiCb CmiCb;
01061 
/* Status handle for a one-sided (RMA) operation.
 * NOTE(review): `type` presumably tells which member of `ready` is live
 * (a completion flag vs. a callback) — confirm against the code that
 * creates and completes these handles. */
struct CmiRMA {
  int type;
  union {
    int completed;   /* completion flag — presumably polled by the caller */
    CmiCb *cb;       /* callback — presumably invoked when the operation finishes */
  } ready;
};
typedef struct CmiRMA CmiRMA;
01070 
/* Message that carries only a pointer to an RMA status handle after the
 * reserved message-header space.
 * NOTE(review): `stat` is a raw pointer, so it is presumably only meaningful
 * in the process that created the handle — confirm. */
struct CmiRMAMsg {
  char core[CmiMsgHeaderSizeBytes];  /* reserved space for the message header */
  CmiRMA* stat;                      /* status handle this message refers to */
};
typedef struct CmiRMAMsg CmiRMAMsg;
01076 
/* Message describing a one-sided put request.
 * NOTE(review): the field meanings below are inferred from their names —
 * confirm against the code that builds and handles this message.  The field
 * order defines the message layout, so do not reorder fields. */
struct RMAPutMsg {
  char core[CmiMsgHeaderSizeBytes];  /* reserved space for the message header */
  void *Saddr;            /* presumably the source buffer address */
  void *Taddr;            /* presumably the target buffer address */
  unsigned int size;      /* transfer length — presumably in bytes */
  unsigned int targetId;  /* presumably the GM node id of the target */
  unsigned int sourceId;  /* presumably the GM node id of the source */
  unsigned int dataport;  /* presumably the GM port number to use */
  CmiRMA *stat;           /* status handle for this operation */
};
typedef struct RMAPutMsg RMAPutMsg;
01088 
01089 void *CmiDMAAlloc(int size) {
01090   void *addr = gm_dma_calloc(gmport, 1, gm_max_length_for_size(size));
01091   //gm_allow_remote_memory_access(gmport);
01092   return addr;
01093 }
01094 
01095 int CmiRegisterMemory(void *addr, unsigned int size) {
01096   gm_status_t status;
01097   status = gm_register_memory(gmport, addr, size);
01098   if (status != GM_SUCCESS) { 
01099     CmiPrintf("Cannot register memory at %p of size %d\n",addr,size);
01100     gm_perror("registerMemory", status); 
01101     return 0;
01102   }
01103   //gm_allow_remote_memory_access(gmport);
01104   return 1;
01105 }
01106 
01107 int CmiUnRegisterMemory(v