arch/net/machine-mx.c

Go to the documentation of this file.
00001 
/* Receive through MX's "unexpected message" callback (active-message mode)
 * rather than by explicitly probing for incoming messages. */
#define MX_ACTIVE_MESSAGE                     1

/* Datagram checksumming is off.  The code tests this symbol with #ifdef,
 * so defining it to any value (even 0) turns checksumming on. */
/*#define CMK_USE_CHECKSUM                      0*/

/* Idle policy: override whatever the build configured and busy-wait
 * instead of sleeping while the processor is idle. */
#undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
#define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT  1
#undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
#define CMK_WHEN_PROCESSOR_IDLE_USLEEP    0
00045 
00046 
00047 /******************************************************************************
00048  *
00049  * Send messages pending queue (used internally)
00050  *
00051  *****************************************************************************/
00052 
00053 typedef struct PendingSentMsgStruct
00054 {
00055   OutgoingMsg  ogm;
00056   char *data;
00057   struct PendingSentMsgStruct *next;
00058   mx_request_t handle;
00059   int flag;                     /* used for active message mode */
00060 }
00061 *PendingSentMsg;
00062 
00063 #define CMK_PMPOOL  1
00064 
00065 #if CMK_PMPOOL
00066 #define MAXPMS 200
00067 static PendingSentMsg pmpool[MAXPMS];
00068 static int pmNums = 0;
00069 
00070 #define putPool(pm)     {       \
00071   if (pmNums == MAXPMS) free(pm);       \
00072   else pmpool[pmNums++] = pm; } 
00073 
00074 #define getPool(pm)     {       \
00075   if (pmNums == 0) {pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct));}  \
00076   else { pm = pmpool[--pmNums]; }\
00077 }
00078 #else
00079 #define putPool(pm) { free(pm); }
00080 #define getPool(pm) { pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct)); _MEMCHECK(pm);}
00081 #endif
00082 
00083 static PendingSentMsg sent_handles=NULL;     /* head of queue  */
00084 static PendingSentMsg sent_handles_end=NULL; /* end of the queue */
00085 
00086 #define NewPendingSentMsg(pm, ogm) \
00087   { getPool(pm);        \
00088     pm->next=NULL; pm->ogm=ogm; pm->data=data; \
00089     MACHSTATE1(1,"alloc msg %p",pm);\
00090   }
00091 
00092 #define InsertPendingSentMsg(pm) \
00093   { if(sent_handles_end==NULL) {sent_handles=pm;}  \
00094     else {sent_handles_end->next=pm;} \
00095     sent_handles_end=pm; MACHSTATE(1,"Insert done");}
00096 
00097 #define FreePendingSentMsg(pm) \
00098    { sent_handles=pm->next;     \
00099      if (sent_handles == NULL) sent_handles_end = NULL; \
00100      if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);} \
00101      else CmiFree(pm->data); \
00102      putPool(pm); }
00103 
00104 CmiUInt8 MATCH_FILTER = 0x11111111FFFFFFFFL;
00105 CmiUInt8 MATCH_MASK   = 0xffffffffffffffffL;
00106 
00107 static int processMessage(char *msg, int len);
00108 static const char *getErrorMsg(mx_return_t rc);
00109 static void processStatusCode(mx_status_t status);
00110 static void PumpMsgs(int getone);
00111 static void ReleaseSentMsgs(void);
00112 #if MX_ACTIVE_MESSAGE
00113 static void PumpEvents(int getone);
00114 #endif
00115 
/******************************************************************************
 *
 * CmiNotifyIdle() -- wait until a packet comes in
 *
 *****************************************************************************/

/* This layer keeps no idle bookkeeping; the struct exists only to satisfy
 * the idle-notification interface. */
typedef struct {
  char none;
} CmiIdleState;

/* No idle state is ever allocated, so this always yields NULL. */
static CmiIdleState *CmiNotifyGetState(void)
{
  CmiIdleState *no_state = NULL;
  return no_state;
}
00126 
00127 static void CmiNotifyStillIdle(CmiIdleState *s)
00128 {
00129   int sleep = 1;
00130   MACHSTATE(1,"CmiNotifyStillIdle {");
00131 #if 0
00132   CommunicationServer(0,2);
00133 #else
00134 #if MX_ACTIVE_MESSAGE
00135   CmiCommLock();
00136   PumpEvents(1);
00137   CmiCommUnlock();
00138 #else
00139 #if CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
00140   sleep = 0;
00141 #endif
00142   CmiCommLock();
00143   ReleaseSentMsgs();
00144   PumpMsgs(sleep);                      /* busy waiting */
00145   CmiCommUnlock();
00146 #endif
00147 #endif
00148   MACHSTATE(1,"} CmiNotifyStillIdle");
00149 }
00150 
00151 void CmiNotifyIdle(void) {
00152   CmiNotifyStillIdle(NULL);
00153 }
00154 
00155 static void CmiNotifyBeginIdle(CmiIdleState *s)
00156 {
00157   CmiNotifyStillIdle(s);
00158 }
00159 
00160 /****************************************************************************
00161  *                                                                          
00162  * CheckSocketsReady
00163  *
00164  * Checks both sockets to see which are readable and which are writeable.
00165  * We check all these things at the same time since this can be done for
00166  * free with ``select.'' The result is stored in global variables, since
00167  * this is essentially global state information and several routines need it.
00168  *
00169  ***************************************************************************/
00170 
00171 int CheckSocketsReady(int withDelayMs)
00172 {   
00173   int nreadable;
00174   CMK_PIPE_DECL(withDelayMs);
00175 
00176   CmiStdoutAdd(CMK_PIPE_SUB);
00177   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00178 
00179   nreadable=CMK_PIPE_CALL();
00180   ctrlskt_ready_read = 0;
00181   dataskt_ready_read = 0;
00182   dataskt_ready_write = 0;
00183   
00184   if (nreadable == 0) {
00185     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00186     return nreadable;
00187   }
00188   if (nreadable==-1) {
00189     CMK_PIPE_CHECKERR();
00190     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00191     return CheckSocketsReady(0);
00192   }
00193   CmiStdoutCheck(CMK_PIPE_SUB);
00194   if (Cmi_charmrun_fd!=-1) 
00195           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00196   MACHSTATE(1,"} CheckSocketsReady")
00197   return nreadable;
00198 }
00199 
00200 /***********************************************************************
00201  * CommunicationServer()
00202  * 
00203  * This function does the scheduling of the tasks related to the
00204  * message sends and receives. 
00205  * It first check the charmrun port for message, and poll the gm event
00206  * for send complete and outcoming messages.
00207  *
00208  ***********************************************************************/
00209 
00210 /* always called from interrupt */
00211 static void ServiceCharmrun_nolock()
00212 {
00213   int again = 1;
00214   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
00215   while (again)
00216   {
00217   again = 0;
00218   CheckSocketsReady(0);
00219   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
00220   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
00221   }
00222   MACHSTATE(2,"} ServiceCharmrun_nolock end")
00223 }
00224 
00225 static void PumpMsgs(int getone) {
00226   mx_return_t rc;
00227   mx_status_t status;
00228   uint32_t result;
00229   mx_segment_t buffer_desc;
00230   mx_request_t recv_handle; 
00231 
00232   MACHSTATE1(2,"PumpMsgs(%d) {", getone);
00233   while (1) {
00234     if (getone)
00235       rc = mx_probe(endpoint, 1, MATCH_FILTER, MATCH_MASK, &status, &result);
00236     else
00237       rc = mx_iprobe(endpoint, MATCH_FILTER, MATCH_MASK, &status, &result);
00238     if(rc != MX_SUCCESS) {
00239       const char *errmsg = getErrorMsg(rc);
00240       CmiPrintf("mx_iprobe error: %s\n", errmsg);
00241       CmiAbort("mx_iprobe Abort");
00242     } 
00243     if (result == 0) {         /* no incoming */
00244       break;
00245     }
00246     MACHSTATE(2,"PumpMsgs recv one");
00247     buffer_desc.segment_length = status.msg_length;
00248     buffer_desc.segment_ptr = (char *) CmiAlloc(status.msg_length);
00249     MACHSTATE(1,"Non-blocking receive {")
00250     MACHSTATE1(1," size %d", status.msg_length); 
00251     rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
00252     if (rc != MX_SUCCESS) {
00253       const char *errmsg = getErrorMsg(rc);
00254       CmiPrintf("mx_irecv error: %s\n", errmsg);
00255       CmiAbort("Abort");
00256     }
00257     MACHSTATE1(1,"} Non-blocking receive return %d", rc);
00258 again:
00259     rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
00260     /*rc = mx_test(endpoint, &recv_handle, &status, &result);*/
00261     MACHSTATE3(1,"mx_wait return rc=%d result=%d status=%d", rc, result, status.code);
00262     if (rc != MX_SUCCESS) {
00263       const char *errmsg = getErrorMsg(rc);
00264       CmiPrintf("mx_wait error: %s\n", errmsg);
00265       CmiAbort("Abort");
00266     }
00267     if(result==0) {
00268       CmiPrintf("mx_wait error: TIME OUT\n");
00269       goto again;
00270     }
00271     else {
00272       processMessage(buffer_desc.segment_ptr, buffer_desc.segment_length);
00273     }
00274     if (getone) break;
00275   }    /* end while */
00276   MACHSTATE1(2,"} PumpMsgs(%d)", getone);
00277 }
00278 
00279 #if MX_ACTIVE_MESSAGE
00280 static void PumpEvents(int getone) {
00281   mx_return_t rc;
00282   mx_status_t status;
00283   uint32_t result;
00284   mx_segment_t buffer_desc;
00285   mx_request_t recv_handle; 
00286 
00287   while (1) {
00288     rc = mx_ipeek(endpoint, &recv_handle, &result);
00289     if (rc != MX_SUCCESS) {
00290       const char *errmsg = getErrorMsg(rc);
00291       CmiPrintf("mx_ipeek error: %s\n", errmsg);
00292       CmiAbort("Abort");
00293     }
00294     if (result == 0) break;
00295     rc = mx_test(endpoint, &recv_handle, &status, &result);
00296     /*rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);*/
00297     if (rc != MX_SUCCESS) {
00298       const char *errmsg = getErrorMsg(rc);
00299       CmiPrintf("mx_wait error: %s\n", errmsg);
00300       CmiAbort("Abort");
00301     }
00302     if(result==0) {
00303       CmiAbort("mx_test or wait: TIME OUT\n");  /* this should never happen */
00304     }
00305     else {
00306       PendingSentMsg pm = (PendingSentMsg)status.context;
00307       if (pm->flag == 1) {    /* send */
00308         if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);}
00309         else CmiFree(pm->data);
00310       }
00311       else if (pm->flag == 2) {                  /* recv */
00312         processMessage(pm->data, status.msg_length);
00313       }
00314       else {
00315         CmiAbort("Invalid PendingSentMsg!");
00316       }
00317       putPool(pm);
00318     }
00319     if (getone) break;
00320   }
00321 }
00322 
00323 /* active message model */
00324 void recv_callback(void * context, uint64_t match_info, int length)
00325 {
00326   mx_segment_t buffer_desc;
00327   mx_request_t recv_handle; 
00328   mx_return_t rc;
00329   mx_status_t status;
00330   uint32_t result;
00331   PendingSentMsg pm;
00332 
00333   buffer_desc.segment_length = length;
00334   buffer_desc.segment_ptr = (char *) CmiAlloc(length);
00335   getPool(pm);
00336   pm->flag = 2;
00337   pm->data = buffer_desc.segment_ptr;
00338   if (MATCH_FILTER != match_info) {
00339     CmiAbort("Invalid match_info");
00340   }
00341   rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, pm, &recv_handle);
00342   if (rc != MX_SUCCESS) {
00343     const char *errmsg = getErrorMsg(rc);
00344     CmiPrintf("mx_irecv error: %s\n", errmsg);
00345     CmiAbort("Abort");
00346   }
00347   if (1) {
00348     rc = mx_test(endpoint, &recv_handle, &status, &result);
00349     if (rc != MX_SUCCESS) {
00350       const char *errmsg = getErrorMsg(rc);
00351       CmiPrintf("mx_wait error: %s\n", errmsg);
00352       CmiAbort("Abort");
00353     }
00354     if(result==0) {
00355       return;
00356     }
00357     else {
00358       processStatusCode(status);
00359       CmiPrintf("PUSH HERE\n");
00360       processMessage(pm->data, status.msg_length);
00361       putPool(pm);
00362     }
00363   }
00364 }
00365 #endif
00366 
/* Poll, without blocking, whether the MX request `handle` has completed.
 * mx_test writes the completion flag into `result` (non-zero once the
 * request has completed, as ReleaseSentMsgs relies on) and fills `status`.
 * Aborts if mx_test itself fails.  Expands to a single statement; its
 * private return-code variable does not shadow a caller's `rc`. */
#define test_send_complete(handle, status, result)                            \
  do {                                                                        \
    mx_return_t test_rc_ = mx_test(endpoint, &(handle), &(status), &(result));\
    if (test_rc_ != MX_SUCCESS) {                                             \
      MACHSTATE1(3," mx_test returns %d", test_rc_);                          \
      CmiAbort("mx_test failed\n");                                           \
    }                                                                         \
  } while (0)
00376 
00377 static void ReleaseSentMsgs(void) {
00378     MACHSTATE(2,"ReleaseSentMsgs {");
00379     mx_return_t rc;
00380     mx_status_t status;
00381     unsigned int result;
00382     PendingSentMsg next, pm = sent_handles;
00383     while (pm!=NULL) {
00384       test_send_complete(pm->handle, status, result);
00385       next = pm->next;
00386       if(result!=0 && status.code == MX_STATUS_SUCCESS) {
00387         MACHSTATE1(2,"Sent complete. Free sent msg size %d", status.msg_length);
00388         FreePendingSentMsg(pm);
00389       }
00390       else 
00391         break;
00392       pm = next;
00393     }
00394     MACHSTATE(2,"} ReleaseSentMsgs");
00395 }
00396  
/* Drive MX progress once without waiting; this function takes no lock
 * itself.  Active-message mode runs PumpEvents(0).  Otherwise it receives
 * everything waiting with PumpMsgs(0), then reaps completed sends.
 * withDelayMs is part of the interface but is not used. */
static void CommunicationServer_nolock(int withDelayMs) {
  (void)withDelayMs;
  MACHSTATE(2,"CommunicationServer_nolock start {");
#if !MX_ACTIVE_MESSAGE
  PumpMsgs(0);
  ReleaseSentMsgs();
#else
  PumpEvents(0);
#endif
  MACHSTATE(2,"}CommunicationServer_nolock end");
}
00407 
00408 /*
00409 0: from smp thread
00410 1: from interrupt
00411 2: from worker thread
00412    Note in netpoll mode, charmrun service is only performed in interrupt, 
00413  pingCharmrun is from sig alarm, so it is lock free 
00414 */
00415 static void CommunicationServer(int withDelayMs, int where)
00416 {
00417   /* standalone mode */
00418   if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
00419 
00420   MACHSTATE2(2,"CommunicationServer(%d) from %d {",withDelayMs, where)
00421 
00422   if (where == 2 && machine_initiated_shutdown) {
00423       /* Converse exit, wait for pingCharm to quit */
00424     return;
00425   }
00426 
00427   if (where == 1) {
00428       /* don't service charmrun if converse exits, this fixed a hang bug */
00429     if (!machine_initiated_shutdown) ServiceCharmrun_nolock();
00430     return;
00431   }
00432 
00433   LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
00434 
00435   CmiCommLock();
00436   CommunicationServer_nolock(withDelayMs);
00437   CmiCommUnlock();
00438 
00439 #if CMK_IMMEDIATE_MSG
00440   if (where == 0)
00441     CmiHandleImmediate();
00442 #endif
00443 
00444   MACHSTATE(2,"} CommunicationServer")
00445 }
00446 
00447 void processFutureMessages(OtherNode node)
00448 {
00449   if (!CdsFifo_Empty(node->futureMsgs)) {
00450     int len = CdsFifo_Length(node->futureMsgs);
00451     CmiPrintf("[%d] processFutureMessages %d\n", CmiMyPe(), len);
00452     int i=0;
00453     while (i<len) {
00454       FutureMessage f = (FutureMessage)CdsFifo_Dequeue(node->futureMsgs);
00455       int status = processMessage(f->msg, f->len);
00456       free(f);
00457       i++;
00458     }
00459   }
00460 }
00461 
00462 static int processMessage(char *msg, int len)
00463 {
00464   char *newmsg;
00465   int rank, srcpe, seqno, magic, i;
00466   unsigned int broot;
00467   int size;
00468   unsigned char checksum;
00469 
00470   if (len >= DGRAM_HEADER_SIZE) {
00471     DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00472 
00473     MACHSTATE2(1, "Break header Cmi-charmrun_id=%d, magic=%d", Cmi_charmrun_pid, magic);
00474     MACHSTATE3(1, "srcpe=%d, seqno=%d, rank=%d", srcpe, seqno, rank);    
00475  
00476 #ifdef CMK_USE_CHECKSUM
00477     checksum = computeCheckSum(msg, len);
00478     if (checksum == 0)
00479 #else
00480     if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
00481 #endif
00482     {
00483       OtherNode node = nodes_by_pe[srcpe];
00484       /* check seqno */
00485       if (seqno == node->recv_expect) {
00486         node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
00487       }
00488       else if (seqno < node->recv_expect) {
00489         CmiPrintf("[%d] Warning: Past packet received from PE %d (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00490         CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
00491         return 0;
00492       }
00493       else {
00494         CmiPrintf("[%d] Error detected - Packet out of order from PE %d (expecting: %d got: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
00495 /*
00496         CmiAbort("\n\n\t\tPacket out of order!!\n\n");
00497 */
00498         FutureMessage f = (FutureMessage)malloc(sizeof(struct FutureMessageStruct));
00499         f->msg = msg;
00500         f->len = len;
00501         CdsFifo_Enqueue(node->futureMsgs, f);
00502         return 0;
00503       }
00504 
00505       newmsg = node->asm_msg;
00506       if (newmsg == 0) {
00507         size = CmiMsgHeaderGetLength(msg);
00508         if (size != len) {
00509           newmsg = (char *)CmiAlloc(size);
00510           _MEMCHECK(newmsg);
00511           if (len > size) {
00512            CmiPrintf("size: %d, len:%d.\n", size, len);
00513            CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00514           }
00515           memcpy(newmsg, msg, len);
00516           CmiFree(msg);                 /* free original msg */
00517         }
00518         else 
00519           newmsg = msg;
00520         node->asm_rank = rank;
00521         node->asm_total = size;
00522         node->asm_fill = len;
00523         node->asm_msg = newmsg;
00524       }
00525       else {
00526         size = len - DGRAM_HEADER_SIZE;
00527         if (node->asm_fill+size > node->asm_total) {
00528          CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
00529          CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00530         }
00531         memcpy(newmsg + node->asm_fill, msg+DGRAM_HEADER_SIZE, size);
00532         CmiFree(msg);                   /* free original msg */
00533         node->asm_fill += size;
00534       }
00535         
00536       /* get a full packet */
00537       if (node->asm_fill == node->asm_total) {
00538         switch (rank) {
00539         case DGRAM_BROADCAST: {
00540           for (i=1; i<_Cmi_mynodesize; i++)
00541             CmiPushPE(i, CopyMsg(newmsg, node->asm_total));
00542           CmiPushPE(0, newmsg);
00543           break;
00544         }
00545 #if CMK_NODE_QUEUE_AVAILABLE
00546         case DGRAM_NODEBROADCAST: 
00547         case DGRAM_NODEMESSAGE: {
00548           CmiPushNode(newmsg);
00549           break;
00550         }
00551 #endif
00552         default:
00553           CmiPushPE(rank, newmsg);
00554         }
00555         node->asm_msg = 0;
00556           /* do it after integration - the following function may re-entrant */
00557 #if CMK_BROADCAST_SPANNING_TREE
00558       if (rank == DGRAM_BROADCAST
00559 #if CMK_NODE_QUEUE_AVAILABLE
00560           || rank == DGRAM_NODEBROADCAST
00561 #endif
00562          )
00563         SendSpanningChildren(NULL, 0, node->asm_total, newmsg, broot, rank);
00564 #elif CMK_BROADCAST_HYPERCUBE
00565       if (rank == DGRAM_BROADCAST
00566 #if CMK_NODE_QUEUE_AVAILABLE
00567           || rank == DGRAM_NODEBROADCAST
00568 #endif
00569          )
00570         SendHypercube(NULL, 0, node->asm_total, newmsg, broot, rank);
00571 #endif
00572       }
00573       processFutureMessages(node);
00574     } 
00575     else {   /* checksum failed */
00576 #ifdef CMK_USE_CHECKSUM
00577       CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum);
00578 #else
00579       CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n", 
00580                  CmiMyPe(), magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
00581 #endif
00582       CmiPrintf("[%d] recved: rank:%d src:%d magic:%d seqno:%d len:%d\n", CmiMyPe(), rank, srcpe, magic, seqno,
00583 len);
00584     }
00585   }
00586   else {
00587       CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len);
00588       CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg));
00589   }
00590 
00591   return 1;
00592 }
00593 
00594 /***********************************************************************
00595  * DeliverViaNetwork()
00596  *
00597  * This function is responsible for all non-local transmission. It
00598  * first allocate a send token, if fails, put the send message to
00599  * penging message queue, otherwise invoke the GM send.
00600  ***********************************************************************/
00601 
00602 void EnqueueOutgoingDgram
00603      (OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot, int copy)
00604 {
00605   int size, len, seqno;
00606   mx_return_t rc;
00607   mx_request_t sent_handle;
00608   mx_segment_t buffer_desc;
00609   uint32_t result;
00610   char *data;
00611 
00612   len = dlen + DGRAM_HEADER_SIZE;;
00613 
00614   if (copy) {
00615     data = CopyMsg(ptr-DGRAM_HEADER_SIZE, len);
00616   }
00617   else {
00618     data = ptr-DGRAM_HEADER_SIZE;
00619     ogm->refcount++; 
00620   }
00621 
00622   seqno = node->send_next;
00623   MACHSTATE5(1, "[%d] SEQNO: %d to node %d rank: %d %d", CmiMyPe(), seqno, node-nodes, rank, broot);
00624   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, seqno, broot);
00625   node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
00626 #ifdef CMK_USE_CHECKSUM
00627   {
00628   DgramHeader *head = (DgramHeader *)data;
00629   head->magic ^= computeCheckSum(data, len);
00630   }
00631 #endif
00632 
00633   MACHSTATE1(2, "EnqueueOutgoingDgram { len=%d", len);
00634   /* MX will put outgoing message in queue and progress to send */
00635   /* Note: Assume that MX provides unlimited buffers 
00636        so no user maintain is required */
00637   buffer_desc.segment_ptr = data;
00638   buffer_desc.segment_length = len;
00639   PendingSentMsg pm;
00640   if (copy)  ogm = NULL;
00641   NewPendingSentMsg(pm, ogm);
00642   pm->flag = 1;
00643   rc = mx_isend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, pm, &(pm->handle));
00644   if (rc != MX_SUCCESS) {
00645     MACHSTATE1(3," mx_isend returns %d", rc);
00646     CmiAbort("mx_isend failed\n");
00647   }
00648 #if !MX_ACTIVE_MESSAGE
00649   InsertPendingSentMsg(pm);
00650 #endif
00651   MACHSTATE(2, "} EnqueueOutgoingDgram");
00652 }
00653 
00654 /* can not guarantee that buffer is not altered after return, so it is not
00655 safe */
00656 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
00657 {
00658   int size; char *data;
00659 
00660   size = ogm->size - DGRAM_HEADER_SIZE;
00661   data = ogm->data + DGRAM_HEADER_SIZE;
00662 
00663   MACHSTATE3(2, "DeliverViaNetwork { : size:%d, to node mach_id=%d, nic=%ld", ogm->size, node->mach_id, node->nic_id);
00664 
00665   while (size > Cmi_dgram_max_data) {
00666     copy = 1;     /* since we are packetizing, we need to copy anyway now */
00667     EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot, copy);
00668     data += Cmi_dgram_max_data;
00669     size -= Cmi_dgram_max_data;
00670   }
00671   if (size>0) EnqueueOutgoingDgram(ogm, data, size, node, rank, broot, copy);
00672 
00673   MACHSTATE(2, "} DeliverViaNetwork");
00674 }
00675 
00676 static void sendBarrierMessage(int pe)
00677 {
00678   mx_request_t send_handle;
00679   mx_segment_t buffer_desc;
00680   mx_return_t rc;
00681   mx_status_t status;
00682   uint32_t result;
00683   char msg[10];
00684 
00685   OtherNode  node = nodes + pe;
00686   buffer_desc.segment_ptr = msg;
00687   buffer_desc.segment_length = 10;
00688   rc = mx_issend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, NULL, &send_handle);
00689   do {
00690     rc = mx_test(endpoint, &send_handle, &status, &result);
00691   } while (rc == MX_SUCCESS && !result);
00692 }
00693 
00694 static void recvBarrierMessage()
00695 {
00696   mx_segment_t buffer_desc;
00697   char msg[10];
00698   mx_return_t rc;
00699   mx_status_t status;
00700   mx_request_t recv_handle;
00701   uint32_t result;
00702 
00703   buffer_desc.segment_length = 10;
00704   buffer_desc.segment_ptr = msg;
00705   rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
00706   rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
00707 }
00708 
00709 /* happen at node level */
00710 int CmiBarrier()
00711 {
00712   int len, size, i;
00713   int status;
00714   int count = 0;
00715   OtherNode node;
00716   int numnodes = CmiNumNodes();
00717   if (CmiMyRank() == 0) {
00718     /* every one send to pe 0 */
00719     if (CmiMyNode() != 0) {
00720       sendBarrierMessage(0);
00721     }
00722     /* printf("[%d] HERE\n", CmiMyPe()); */
00723     if (CmiMyNode() == 0) 
00724     {
00725       for (count = 1; count < numnodes; count ++) 
00726       {
00727         recvBarrierMessage();
00728       }
00729       /* pe 0 broadcast */
00730       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00731         int p = i;
00732         if (p > numnodes - 1) break;
00733         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
00734         sendBarrierMessage(p);
00735       }
00736     }
00737     /* non 0 node waiting */
00738     if (CmiMyNode() != 0) 
00739     {
00740       recvBarrierMessage();
00741       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
00742         int p = CmiMyNode();
00743         p = BROADCAST_SPANNING_FACTOR*p + i;
00744         if (p > numnodes - 1) break;
00745         p = p%numnodes;
00746         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
00747         sendBarrierMessage(p);
00748       }
00749     }
00750   }
00751   CmiNodeAllBarrier();
00752   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
00753   return 0;
00754 }
00755 
/* Gather-only barrier.  Rank 0 of every non-zero node sends one token to
 * node 0, and node 0's rank 0 waits for CmiNumNodes()-1 tokens.  No release
 * is sent back.  All ranks then meet in CmiNodeAllBarrier.  Always returns 0. */
int CmiBarrierZero()
{
  if (CmiMyRank() == 0) {
    if (CmiMyNode() == 0) {
      int pending;
      for (pending = CmiNumNodes() - 1; pending > 0; pending--)
        recvBarrierMessage();
    }
    else {
      sendBarrierMessage(0);
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
00775 
00776 /***********************************************************************
00777  * CmiMachineInit()
00778  *
00779  * This function intialize the GM board. Set receive buffer
00780  *
00781  ***********************************************************************/
00782 
00783 static int maxsize;
00784 
00785 void CmiMachineInit(char **argv)
00786 {
00787   MACHSTATE(3,"CmiMachineInit {");
00788   mx_return_t  rc;
00789   endpoint = NULL;
00790   /* standalone mode */
00791   if (dataport == -1) return; 
00792 
00793   rc = mx_init();
00794   if (rc != MX_SUCCESS) { 
00795     MACHSTATE1(3," mx_init returns %d", rc);
00796     printf("Cannot open MX library (does the machine have a GM card?)\n");
00797     return; 
00798   }
00799 
00800   rc = mx_open_endpoint(MX_ANY_NIC, MX_ANY_ENDPOINT, MX_FILTER, 0, 0, &endpoint);
00801   if (rc != MX_SUCCESS) {
00802     MACHSTATE1(3," open endpoint address returns %d", rc);
00803     printf("Cannot open endpoint address\n");
00804     return;
00805   }
00806 
00807   /* get endpoint address of local endpoint */
00808   rc = mx_get_endpoint_addr(endpoint, &endpoint_addr);
00809   if (rc != MX_SUCCESS) {
00810     MACHSTATE1(3," get endpoint address returns %d", rc);
00811     printf("Cannot get endpoint address\n");
00812     return;
00813   }
00814 
00815   /* get NIC id and endpoint id */      
00816   rc = mx_decompose_endpoint_addr(endpoint_addr, &Cmi_nic_id, (uint32_t*)&Cmi_mach_id);
00817   if (rc != MX_SUCCESS) {
00818     MACHSTATE1(3," mx_decompose_endpoint returns %d", rc);
00819     printf("Cannot decompose endpoint address\n");
00820     return;
00821   }
00822 
00823   dataport = 1;     /* fake it so that charmrun checking won't fail */
00824 
00825   MATCH_FILTER &= Cmi_charmrun_pid;
00826 
00827   Cmi_dgram_max_data = 1024-DGRAM_HEADER_SIZE;
00828 
00829 #if  MX_ACTIVE_MESSAGE
00830   mx_register_unexp_callback(endpoint, recv_callback, NULL);
00831 #endif
00832 
00833   MACHSTATE(3,"} CmiMachineInit");
00834 }
00835 
/* This layer does no communication setup at this point; the hook exists
 * only to satisfy the machine-layer interface. */
void CmiCommunicationInit(char **argv)
{
  (void)argv;   /* unused */
}
00839 
00840 void CmiMachineExit()
00841 {
00842   MACHSTATE(3, "CmiMachineExit {");
00843   mx_return_t  rc;
00844   if (endpoint) {
00845     rc = mx_close_endpoint(endpoint);
00846     if(rc!=MX_SUCCESS){
00847       MACHSTATE1(3, "mx_close_endpoint returns %d", rc);
00848       printf("Can't do mx_close_endpoint\n");           
00849       return;   
00850     }
00851     rc = mx_finalize();
00852     if(rc!=MX_SUCCESS){
00853       MACHSTATE1(3, "mx_finalize returns %d", rc);
00854       printf("Can't do mx_finalize\n");
00855       return;
00856     }
00857   }
00858   MACHSTATE(3, "} CmiMachineExit");
00859 }
00860 
00861 /* make sure other gm nodes are accessible in routing table */
00862 void CmiMXMakeConnection()
00863 {
00864   int i;
00865   int doabort = 0;
00866   if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
00867   if (endpoint == NULL) machine_exit(1);
00868   MACHSTATE(3,"CmiMXMakeConnection {");
00869   for (i=0; i<_Cmi_numnodes; i++) {
00870     mx_return_t  rc;
00871     char ip_str[128];
00872     skt_print_ip(ip_str, nodes[i].IP);
00873     rc = mx_connect(endpoint, nodes[i].nic_id, nodes[i].mach_id, MX_FILTER, MX_INFINITE, &nodes[i].endpoint_addr); 
00874     if (rc != MX_SUCCESS) {
00875       CmiPrintf("Error> mx node %d can't contact node %d. \n", CmiMyPe(), i);
00876       doabort = 1;
00877     }
00878   }
00879   if (doabort) CmiAbort("CmiMXMakeConnection");
00880   MACHSTATE(3,"}CmiMXMakeConnection");
00881 }
00882 
00883 static const char *getErrorMsg(mx_return_t rc)
00884 {
00885 /*
00886   char *errmsg;
00887   switch (rc) {
00888   case MX_SUCCESS:  return "MX_SUCCESS";
00889   case MX_NO_RESOURCES: return "MX_NO_RESOURCES";
00890   };
00891   return "Unknown MX error message";
00892 */
00893   return mx_strerror(rc);
00894 }
00895 
00896 static void processStatusCode(mx_status_t status){
00897   const char *str = mx_strstatus(status.code);
00898   CmiPrintf("processStatusCode: %s\n", str);
00899   MACHSTATE1(4, "%s", str);
00900 }
00901 

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1