arch/mpi/machine.c

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * $Source: /cvsroot/charm/src/arch/mpi/machine.c,v $
00003  * $Author: gzheng $
00004  * $Date: 2008-06-06 21:13:17 $
00005  * $Revision: 1.111 $
00006  *****************************************************************************/
00007 
00013 
00014 #include <stdio.h>
00015 #include <errno.h>
00016 #include "converse.h"
00017 #include <mpi.h>
00018 #if CMK_TIMER_USE_XT3_DCLOCK
00019 #include <catamount/dclock.h>
00020 #endif
00021 
00022 #ifdef AMPI
00023 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
00024 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
00025 #  error "Can't build Charm++ using AMPI version of mpi.h header"
00026 #endif
00027 
00028 /*Support for ++debug: */
00029 #include <unistd.h> /*For getpid()*/
00030 #include <stdlib.h> /*For sleep()*/
00031 
00032 #define MULTI_SENDQUEUE    0
00033 
00034 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
00035 #define CMK_SMP 1
00036 #endif
00037 
00038 #include "machine.h"
00039 
00040 #include "pcqueue.h"
00041 
00042 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
00043 
00044 #if CMK_VERSION_BLUEGENE
00045 #define MAX_QLEN 8
00046 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
00047 #else
00048 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
00049 #define MAX_QLEN 200
00050 #endif
00051 
00052 #if CMI_MPI_TRACE_USEREVENTS && !defined(CMK_OPTIMIZE) && ! CMK_TRACE_IN_CHARM
00053 CpvStaticDeclare(double, projTraceStart);
00054 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
00055 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
00056 #else
00057 # define  START_EVENT()
00058 # define  END_EVENT(x)
00059 #endif
00060 
00061 /*
00062     To reduce the buffer used in broadcast and distribute the load from
00063   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
00064   spanning tree broadcast algorithm.
00065     This will use the fourth short in message as an indicator of spanning tree
00066   root.
00067 */
00068 #if CMK_SMP
00069 #define CMK_BROADCAST_SPANNING_TREE    0
00070 #else
00071 #define CMK_BROADCAST_SPANNING_TREE    1
00072 #define CMK_BROADCAST_HYPERCUBE        0
00073 #endif
00074 
00075 #define BROADCAST_SPANNING_FACTOR      4
00076 
00077 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
00078 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
00079 
00080 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
00081 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
00082 
00083 /* FIXME: need a random number that everyone agrees ! */
00084 #define CHARM_MAGIC_NUMBER               126
00085 
00086 #if !CMK_OPTIMIZE
00087 static int checksum_flag = 0;
00088 #define CMI_SET_CHECKSUM(msg, len)      \
00089         if (checksum_flag)  {   \
00090           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
00091           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
00092         }
00093 #define CMI_CHECK_CHECKSUM(msg, len)    \
00094         if (checksum_flag)      \
00095           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
00096             CmiAbort("Fatal error: checksum doesn't agree!\n");
00097 #else
00098 #define CMI_SET_CHECKSUM(msg, len)
00099 #define CMI_CHECK_CHECKSUM(msg, len)
00100 #endif
00101 
00102 #if CMK_BROADCAST_SPANNING_TREE
00103 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
00104 #else
00105 #  define CMI_SET_BROADCAST_ROOT(msg, root)
00106 #endif
00107 
00108 #if CMK_BROADCAST_HYPERCUBE
00109 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
00110 #else
00111 #  define CMI_SET_CYCLE(msg, cycle)
00112 #endif
00113 
00114 
/* Posted receives: defining MPI_POST_RECV enables MPI_POST_RECV_COUNT
   pre-posted MPI_Irecv buffers of MPI_POST_RECV_SIZE bytes each.  Messages
   no larger than MPI_POST_RECV_SIZE are then sent with POST_RECV_TAG. */
#ifdef MPI_POST_RECV
#define MPI_POST_RECV_COUNT 10
#undef MPI_POST_RECV
#endif
#if MPI_POST_RECV_COUNT > 0
#warning "Using MPI posted receives which have not yet been tested"
#ifndef MPI_POST_RECV_SIZE
#define MPI_POST_RECV_SIZE 200
#endif
/* #undef  MPI_POST_RECV_DEBUG  */
CpvDeclare(unsigned long long, Cmi_posted_recv_total);
CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
CpvDeclare(char*,CmiPostedRecvBuffers);
#endif

/*
  MPI tags.  TAG carries ordinary Converse messages; PumpMsgs probes for it
  from MPI_ANY_SOURCE.  The tag is fixed (it is not rotated per message).
*/
#define TAG     1375

#if MPI_POST_RECV_COUNT > 0
#define POST_RECV_TAG     (TAG+1)
#endif

/* The CmiBarrierZero token must not share TAG. Otherwise PumpMsgs can
   receive the 1-byte token as if it were a Converse message, and
   CmiBarrierZero's MPI_Recv (any source) can steal a Converse message. */
#define BARRIER_ZERO_TAG  (TAG-1)
00148 
00149 /*
00150 static int mpi_tag = TAG;
00151 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
00152 */
00153 
00154 int               _Cmi_numpes;
00155 int               _Cmi_mynode;    /* Which address space am I */
00156 int               _Cmi_mynodesize;/* Number of processors in my address space */
00157 int               _Cmi_numnodes;  /* Total number of address spaces */
00158 int               _Cmi_numpes;    /* Total number of processors */
00159 static int        Cmi_nodestart; /* First processor in this address space */
00160 CpvDeclare(void*, CmiLocalQueue);
00161 
00162 /*Network progress utility variables. Period controls the rate at
00163   which the network poll is called */
00164 CpvDeclare(unsigned , networkProgressCount);
00165 int networkProgressPeriod;
00166 
00167 int               idleblock = 0;
00168 
00169 #define BLK_LEN  512
00170 
00171 #if CMK_NODE_QUEUE_AVAILABLE
00172 #define DGRAM_NODEMESSAGE   (0xFB)
00173 
00174 #define NODE_BROADCAST_OTHERS (-1)
00175 #define NODE_BROADCAST_ALL    (-2)
00176 #endif
00177 
00178 #if 0
00179 static void **recdQueue_blk;
00180 static unsigned int recdQueue_blk_len;
00181 static unsigned int recdQueue_first;
00182 static unsigned int recdQueue_len;
00183 static void recdQueueInit(void);
00184 static void recdQueueAddToBack(void *element);
00185 static void *recdQueueRemoveFromFront(void);
00186 #endif
00187 
00188 static void ConverseRunPE(int everReturn);
00189 static void CommunicationServer(int sleepTime);
00190 static void CommunicationServerThread(int sleepTime);
00191 
00192 typedef struct msg_list {
00193      char *msg;
00194      struct msg_list *next;
00195      int size, destpe;
00196      MPI_Request req;
00197 } SMSG_LIST;
00198 
00199 int MsgQueueLen=0;
00200 static int request_max;
00201 
00202 static SMSG_LIST *sent_msgs=0;
00203 static SMSG_LIST *end_sent=0;
00204 
00205 static int Cmi_dim;
00206 
00207 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
00208 
00209 #if NODE_0_IS_CONVHOST
00210 int inside_comm = 0;
00211 #endif
00212 
00213 void CmiAbort(const char *message);
00214 static void PerrorExit(const char *msg);
00215 
00216 void SendSpanningChildren(int size, char *msg);
00217 void SendHypercube(int size, char *msg);
00218 
/* Print `msg` followed by the text for the current errno (via perror), then
   terminate the whole process with status 1. */
static void PerrorExit(const char *msg)
{
  perror(msg);
  exit(1);
}

/* Defined in another translation unit: one-byte checksum over `len` bytes,
   used by CMI_SET_CHECKSUM / CMI_CHECK_CHECKSUM. */
extern unsigned char computeCheckSum(unsigned char *data, int len);
00226 
00227 /**************************  TIMER FUNCTIONS **************************/
00228 
00229 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
00230 
00231 /* MPI calls are not threadsafe, even the timer on some machines */
00232 static CmiNodeLock  timerLock = 0;
00233 static double starttimer = 0;
00234 static int _is_global = 0;
00235 
00236 int CmiTimerIsSynchronized()
00237 {
00238   int  flag;
00239   void *v;
00240 
00241   /*  check if it using synchronized timer */
00242   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
00243     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
00244   if (flag) {
00245     _is_global = *(int*)v;
00246     if (_is_global && CmiMyPe() == 0)
00247       printf("Charm++> MPI timer is synchronized!\n");
00248   }
00249   return _is_global;
00250 }
00251 
00252 void CmiTimerInit()
00253 {
00254   _is_global = CmiTimerIsSynchronized();
00255 
00256   if (CmiMyRank() == 0) {
00257     if (_is_global) {
00258       double minTimer;
00259 #if CMK_TIMER_USE_XT3_DCLOCK
00260       starttimer = dclock();
00261 #else
00262       starttimer = MPI_Wtime();
00263 #endif
00264 
00265       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
00266                                   MPI_COMM_WORLD );
00267       starttimer = minTimer;
00268     }
00269     else {
00270       /* we don't have a synchronous timer, set our own start time */
00271       CmiBarrier();
00272       CmiBarrier();
00273       CmiBarrier();
00274 #if CMK_TIMER_USE_XT3_DCLOCK
00275       starttimer = dclock();
00276 #else
00277       starttimer = MPI_Wtime();
00278 #endif
00279     }
00280     /*  timerLock = CmiCreateLock();  */
00281   }
00282   CmiNodeAllBarrier();          /* for smp */
00283 }
00284 
00285 double CmiTimer(void)
00286 {
00287   double t;
00288 #if CMK_SMP
00289   if (timerLock) CmiLock(timerLock);
00290 #endif
00291 #if CMK_TIMER_USE_XT3_DCLOCK
00292   t = dclock() - starttimer;
00293 #else
00294   t = MPI_Wtime() - starttimer;
00295 #endif
00296 #if CMK_SMP
00297   if (timerLock) CmiUnlock(timerLock);
00298 #endif
00299   return t;
00300 }
00301 
00302 double CmiWallTimer(void)
00303 {
00304   double t;
00305 #if CMK_SMP
00306   if (timerLock) CmiLock(timerLock);
00307 #endif
00308 #if CMK_TIMER_USE_XT3_DCLOCK
00309   t = dclock() - starttimer;
00310 #else
00311   t = MPI_Wtime() - starttimer;
00312 #endif
00313 #if CMK_SMP
00314   if (timerLock) CmiUnlock(timerLock);
00315 #endif
00316   return t;
00317 }
00318 
00319 double CmiCpuTimer(void)
00320 {
00321   double t;
00322 #if CMK_SMP
00323   if (timerLock) CmiLock(timerLock);
00324 #endif
00325 #if CMK_TIMER_USE_XT3_DCLOCK
00326   t = dclock() - starttimer;
00327 #else
00328   t = MPI_Wtime() - starttimer;
00329 #endif
00330 #if CMK_SMP
00331   if (timerLock) CmiUnlock(timerLock);
00332 #endif
00333   return t;
00334 }
00335 
00336 #endif
00337 
00338 int CmiBarrier()
00339 {
00340   if (CmiMyRank() == 0) {
00341 
00342     START_EVENT();
00343 
00344     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
00345         CmiAbort("Timernit: MPI_Barrier failed!\n");
00346 
00347     END_EVENT(10);
00348   }
00349   return 0;
00350 }
00351 
00352 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
00353 int CmiBarrierZero()
00354 {
00355   int i;
00356   if (CmiMyRank() == 0) {
00357     char msg[1];
00358     MPI_Status sts;
00359     if (CmiMyNode() == 0)  {
00360       for (i=0; i<CmiNumNodes()-1; i++) {
00361          START_EVENT();
00362 
00363          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
00364             CmiPrintf("MPI_Recv failed!\n");
00365 
00366          END_EVENT(30);
00367       }
00368     }
00369     else {
00370       START_EVENT();
00371 
00372       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
00373          printf("MPI_Send failed!\n");
00374 
00375       END_EVENT(20);
00376     }
00377   }
00378   CmiNodeAllBarrier();
00379   return 0;
00380 }
00381 
00382 typedef struct ProcState {
00383 #if MULTI_SENDQUEUE
00384 PCQueue      sendMsgBuf;       /* per processor message sending queue */
00385 #endif
00386 CmiNodeLock  recvLock;              /* for cs->recv */
00387 } ProcState;
00388 
00389 static ProcState  *procState;
00390 
00391 #if CMK_SMP
00392 
00393 #if !MULTI_SENDQUEUE
00394 static PCQueue sendMsgBuf;
00395 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
00396 #endif
00397 
00398 #endif
00399 
00400 /************************************************************
00401  *
00402  * Processor state structure
00403  *
00404  ************************************************************/
00405 
00406 /* fake Cmi_charmrun_fd */
00407 static int Cmi_charmrun_fd = 0;
00408 #include "machine-smp.c"
00409 
00410 CsvDeclare(CmiNodeState, NodeState);
00411 
00412 #include "immediate.c"
00413 
00414 #if ! CMK_SMP
00415 /************ non SMP **************/
00416 static struct CmiStateStruct Cmi_state;
00417 int _Cmi_mype;
00418 int _Cmi_myrank;
00419 
00420 void CmiMemLock() {}
00421 void CmiMemUnlock() {}
00422 
00423 #define CmiGetState() (&Cmi_state)
00424 #define CmiGetStateN(n) (&Cmi_state)
00425 
00426 void CmiYield(void) { sleep(0); }
00427 
00428 static void CmiStartThreads(char **argv)
00429 {
00430   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
00431   _Cmi_mype = Cmi_nodestart;
00432   _Cmi_myrank = 0;
00433 }
00434 #endif  /* non smp */
00435 
00436 /*Add a message to this processor's receive queue, pe is a rank */
00437 static void CmiPushPE(int pe,void *msg)
00438 {
00439   CmiState cs = CmiGetStateN(pe);
00440   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
00441 #if CMK_IMMEDIATE_MSG
00442   if (CmiIsImmediate(msg)) {
00443 /*
00444 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
00445     CmiHandleMessage(msg);
00446 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
00447 */
00449     CMI_DEST_RANK(msg) = pe;
00450     CmiPushImmediateMsg(msg);
00451     return;
00452   }
00453 #endif
00454 
00455 #if CMK_SMP
00456   CmiLock(procState[pe].recvLock);
00457 #endif
00458   PCQueuePush(cs->recv,msg);
00459 #if CMK_SMP
00460   CmiUnlock(procState[pe].recvLock);
00461 #endif
00462   CmiIdleLock_addMessage(&cs->idle);
00463   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
00464 }
00465 
#if CMK_NODE_QUEUE_AVAILABLE
/* Add a message to this node's shared receive queue (under CmiNodeRecvLock)
   and notify rank 0's idle lock. With CMK_IMMEDIATE_MSG, immediate messages
   are instead stamped with rank 0 and sent to the immediate queue. */
static void CmiPushNode(void *msg)
{
  CmiState rank0;
  MACHSTATE(3,"Pushing message into NodeRecv queue");
#if CMK_IMMEDIATE_MSG
  if (CmiIsImmediate(msg)) {
    CMI_DEST_RANK(msg) = 0;
    CmiPushImmediateMsg(msg);
    return;
  }
#endif
  CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
  PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
  CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);

  rank0 = CmiGetStateN(0);
  CmiIdleLock_addMessage(&rank0->idle);
}
#endif
00487 
00488 #ifndef CmiMyPe
00489 int CmiMyPe(void)
00490 {
00491   return CmiGetState()->pe;
00492 }
00493 #endif
00494 
00495 #ifndef CmiMyRank
00496 int CmiMyRank(void)
00497 {
00498   return CmiGetState()->rank;
00499 }
00500 #endif
00501 
00502 #ifndef CmiNodeFirst
00503 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
00504 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
00505 #endif
00506 
00507 #ifndef CmiNodeOf
00508 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
00509 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
00510 #endif
00511 
00512 static size_t CmiAllAsyncMsgsSent(void)
00513 {
00514    SMSG_LIST *msg_tmp = sent_msgs;
00515    MPI_Status sts;
00516    int done;
00517 
00518    while(msg_tmp!=0) {
00519     done = 0;
00520     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00521       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
00522     if(!done)
00523       return 0;
00524     msg_tmp = msg_tmp->next;
00525 /*    MsgQueueLen--; ????? */
00526    }
00527    return 1;
00528 }
00529 
00530 int CmiAsyncMsgSent(CmiCommHandle c) {
00531 
00532   SMSG_LIST *msg_tmp = sent_msgs;
00533   int done;
00534   MPI_Status sts;
00535 
00536   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
00537     msg_tmp = msg_tmp->next;
00538   if(msg_tmp) {
00539     done = 0;
00540     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00541       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
00542     return ((done)?1:0);
00543   } else {
00544     return 1;
00545   }
00546 }
00547 
00548 void CmiReleaseCommHandle(CmiCommHandle c)
00549 {
00550   return;
00551 }
00552 
00553 #if CMK_VERSION_BLUEGENE
00554 extern void MPID_Progress_test();
00555 #endif
00556 
00557 void CmiReleaseSentMessages(void)
00558 {
00559   SMSG_LIST *msg_tmp=sent_msgs;
00560   SMSG_LIST *prev=0;
00561   SMSG_LIST *temp;
00562   int done;
00563   MPI_Status sts;
00564 
00565 
00566 #if CMK_VERSION_BLUEGENE
00567   MPID_Progress_test();
00568 #endif
00569 
00570   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
00571   while(msg_tmp!=0) {
00572     done =0;
00573     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
00574       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
00575     if(done) {
00576       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
00577       MsgQueueLen--;
00578       /* Release the message */
00579       temp = msg_tmp->next;
00580       if(prev==0)  /* first message */
00581         sent_msgs = temp;
00582       else
00583         prev->next = temp;
00584       CmiFree(msg_tmp->msg);
00585       CmiFree(msg_tmp);
00586       msg_tmp = temp;
00587     } else {
00588       prev = msg_tmp;
00589       msg_tmp = msg_tmp->next;
00590     }
00591   }
00592   end_sent = prev;
00593   MACHSTATE(2,"} CmiReleaseSentMessages end");
00594 }
00595 
00596 int PumpMsgs(void)
00597 {
00598   int nbytes, flg, res;
00599   char *msg;
00600   MPI_Status sts;
00601   int recd=0;
00602 
00603 #if CMK_VERSION_BLUEGENE
00604   MPID_Progress_test();
00605 #endif
00606 
00607   MACHSTATE(2,"PumpMsgs begin {");
00608 
00609   while(1) {
00610     /* First check posted recvs then do  probe unmatched outstanding messages */
00611 #if MPI_POST_RECV_COUNT > 0 
00612     int completed_index=-1;
00613     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
00614         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
00615     if(flg){
00616         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
00617             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
00618 
00619         recd = 1;
00620         msg = (char *) CmiAlloc(nbytes);
00621         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
00622         /* and repost the recv */
00623 
00624         START_EVENT();
00625 
00626         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
00627             MPI_POST_RECV_SIZE,
00628             MPI_BYTE,
00629             MPI_ANY_SOURCE,
00630             POST_RECV_TAG,
00631             MPI_COMM_WORLD,
00632             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
00633                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00634 
00635         END_EVENT(50);
00636 
00637         CpvAccess(Cmi_posted_recv_total)++;
00638     }
00639     else {
00640         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
00641         if(res != MPI_SUCCESS)
00642         CmiAbort("MPI_Iprobe failed\n");
00643         if(!flg) break;
00644         recd = 1;
00645         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00646         msg = (char *) CmiAlloc(nbytes);
00647 
00648         START_EVENT();
00649 
00650         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
00651             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00652 
00653         END_EVENT(30);
00654 
00655         CpvAccess(Cmi_unposted_recv_total)++;
00656     }
00657 #else
00658     /* Original version */
00659     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
00660     if(res != MPI_SUCCESS)
00661       CmiAbort("MPI_Iprobe failed\n");
00662 
00663     if(!flg) break;
00664     recd = 1;
00665     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00666     msg = (char *) CmiAlloc(nbytes);
00667 
00668     START_EVENT();
00669 
00670     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
00671       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00672 
00673     END_EVENT(30);
00674 #endif
00675 
00676     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
00677     CMI_CHECK_CHECKSUM(msg, nbytes);
00678     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
00679       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
00680       CmiFree(msg);
00681       CmiAbort("Abort!\n");
00682       continue;
00683     }
00684 #if CMK_NODE_QUEUE_AVAILABLE
00685     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
00686       CmiPushNode(msg);
00687     else
00688 #endif
00689       CmiPushPE(CMI_DEST_RANK(msg), msg);
00690 
00691 #if CMK_BROADCAST_SPANNING_TREE
00692     if (CMI_BROADCAST_ROOT(msg))
00693       SendSpanningChildren(nbytes, msg);
00694 #elif CMK_BROADCAST_HYPERCUBE
00695     if (CMI_GET_CYCLE(msg))
00696       SendHypercube(nbytes, msg);
00697 #endif
00698   }
00699 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00700   CmiHandleImmediate();
00701 #endif
00702   MACHSTATE(2,"} PumpMsgs end ");
00703   return recd;
00704 }
00705 
00706 /* blocking version */
00707 static void PumpMsgsBlocking(void)
00708 {
00709   static int maxbytes = 20000000;
00710   static char *buf = NULL;
00711   int nbytes, flg;
00712   MPI_Status sts;
00713   char *msg;
00714   int recd=0;
00715 
00716   if (!PCQueueEmpty(CmiGetState()->recv)) return;
00717   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
00718   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
00719   if (sent_msgs)  return;
00720 
00721 #if 0
00722   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
00723 #endif
00724 
00725   if (buf == NULL) {
00726     buf = (char *) CmiAlloc(maxbytes);
00727     _MEMCHECK(buf);
00728   }
00729 
00730 
00731 #if MPI_POST_RECV_COUNT > 0
00732 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
00733 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
00734 #endif
00735 
00736   START_EVENT();
00737 
00738   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
00739       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
00740 
00741   END_EVENT(30);
00742 
00743    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00744    msg = (char *) CmiAlloc(nbytes);
00745    memcpy(msg, buf, nbytes);
00746 
00747 #if CMK_NODE_QUEUE_AVAILABLE
00748    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
00749       CmiPushNode(msg);
00750    else
00751 #endif
00752       CmiPushPE(CMI_DEST_RANK(msg), msg);
00753 
00754 #if CMK_BROADCAST_SPANNING_TREE
00755    if (CMI_BROADCAST_ROOT(msg))
00756       SendSpanningChildren(nbytes, msg);
00757 #elif CMK_BROADCAST_HYPERCUBE
00758    if (CMI_GET_CYCLE(msg))
00759       SendHypercube(nbytes, msg);
00760 #endif
00761 }
00762 
00763 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
00764 
#if CMK_SMP

static int inexit = 0;
static CmiNodeLock  exitLock = 0;

/* Return 1 when no outgoing message is waiting in the send queue(s),
   0 otherwise. */
static int MsgQueueEmpty()
{
#if MULTI_SENDQUEUE
  int i;
  for (i=0; i<_Cmi_mynodesize; i++)
    if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
  return 1;
#else
  return PCQueueEmpty(sendMsgBuf);
#endif
}

static int SendMsgBuf();

/* test if all processors recv queues are empty */
static int RecvQueueEmpty()
{
  int i;
  for (i=0; i<_Cmi_mynodesize; i++) {
    CmiState cs=CmiGetStateN(i);
    if (!PCQueueEmpty(cs->recv)) return 0;
  }
  return 1;
}

/* One step of the SMP communication thread: receive pending messages,
   release completed sends, and hand queued sends to MPI.

   Once every rank of the node has entered exit (inexit == CmiMyNodeSize()),
   it instead:
   - keeps flushing until nothing is left to send;
   - meets all nodes in MPI_Barrier;
   - finalizes MPI (except under CMK_MPI_VMI) and exits the process.

   sleepTime is currently unused. */
static void CommunicationServer(int sleepTime)
{
  PumpMsgs();
  CmiReleaseSentMessages();
  SendMsgBuf();

  if (inexit == CmiMyNodeSize()) {
    MACHSTATE(2, "CommunicationServer exiting {");
    /* Loop until the send queue is empty and every MPI_Isend has completed.
       Receive queues are not waited on here. */
    while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
      CmiReleaseSentMessages();
      SendMsgBuf();
      PumpMsgs();
    }
    MACHSTATE(2, "CommunicationServer barrier begin {");

    START_EVENT();

    if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
      CmiAbort("ConverseExit: MPI_Barrier failed!\n");

    END_EVENT(10);

    MACHSTATE(2, "} CommunicationServer barrier end");
#if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
    if (CmiMyNode() == 0){
      CmiPrintf("End of program\n");
    }
#endif
    MACHSTATE(2, "} CommunicationServer EXIT");
#if ! CMK_MPI_VMI
    MPI_Finalize();
#endif
    exit(0);
  }
}

#endif
00845 
/* Entry point for communication progress. In SMP builds it runs one
   CommunicationServer step; with CMK_IMMEDIATE_MSG it then processes pending
   immediate messages. In non-SMP builds without immediate messages it does
   nothing. */
static void CommunicationServerThread(int sleepTime)
{
#if CMK_SMP
  CommunicationServer(sleepTime);
#endif
#if CMK_IMMEDIATE_MSG
  CmiHandleImmediate();
#endif
}
00855 
#if CMK_NODE_QUEUE_AVAILABLE
/* Pop the next message from this node's shared receive queue, under
   CmiNodeRecvLock. Returns whatever PCQueuePop yields (presumably NULL when
   the queue is empty). */
char *CmiGetNonLocalNodeQ(void)
{
  char *popped;
  CmiState self = CmiGetState();

  CmiIdleLock_checkMessage(&self->idle);
  MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
  CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
  popped = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
  CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
  MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
  return popped;
}
#endif
00872 
00873 void *CmiGetNonLocal(void)
00874 {
00875   static int count=0;
00876   CmiState cs = CmiGetState();
00877   void *msg;
00878   CmiIdleLock_checkMessage(&cs->idle);
00879   /* although it seems that lock is not needed, I found it crashes very often
00880      on mpi-smp without lock */
00881 
00882 #if ! CMK_SMP
00883   CmiReleaseSentMessages();
00884   PumpMsgs();
00885 #endif
00886 
00887   /* CmiLock(procState[cs->rank].recvLock); */
00888   msg =  PCQueuePop(cs->recv);
00889   /* CmiUnlock(procState[cs->rank].recvLock); */
00890 
00891 /*
00892   if (msg) {
00893     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
00894   else {
00895     count++;
00896     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
00897   }
00898 */
00899 #if ! CMK_SMP
00900   if (no_outstanding_sends) {
00901     while (MsgQueueLen>0) {
00902       CmiReleaseSentMessages();
00903       PumpMsgs();
00904     }
00905   }
00906 
00907   if(!msg) {
00908     CmiReleaseSentMessages();
00909     if (PumpMsgs())
00910       return  PCQueuePop(cs->recv);
00911     else
00912       return 0;
00913   }
00914 #endif
00915   return msg;
00916 }
00917 
00918 /* called in non-smp mode */
00919 void CmiNotifyIdle(void)
00920 {
00921   CmiReleaseSentMessages();
00922   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
00923 }
00924 
00925 
00926 /********************************************************
00927     The call to probe immediate messages has been renamed to
00928     CmiMachineProgressImpl
00929 ******************************************************/
00930 /* user call to handle immediate message, only useful in non SMP version
00931    using polling method to schedule message.
00932 */
00933 /*
00934 #if CMK_IMMEDIATE_MSG
00935 void CmiProbeImmediateMsg()
00936 {
00937 #if !CMK_SMP
00938   PumpMsgs();
00939   CmiHandleImmediate();
00940 #endif
00941 }
00942 #endif
00943 */
00944 
/* Network progress function: polls the network for incoming messages.
   On some implementations this also flushes receive buffers. */
#if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl()
{
#if CMK_SMP
    /* Not implemented yet: the communication server does not seem to be
       thread safe, so SMP builds do nothing here.
       Candidate call: CommunicationServerThread(0); */
#else
    PumpMsgs();
#if CMK_IMMEDIATE_MSG
    CmiHandleImmediate();
#endif
#endif
}
#endif
00962 
00963 /********************* MESSAGE SEND FUNCTIONS ******************/
00964 
00965 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
00966 
00967 static void CmiSendSelf(char *msg)
00968 {
00969 #if CMK_IMMEDIATE_MSG
00970     if (CmiIsImmediate(msg)) {
00971       /* CmiBecomeNonImmediate(msg); */
00972       CmiPushImmediateMsg(msg);
00973       CmiHandleImmediate();
00974       return;
00975     }
00976 #endif
00977     CQdCreate(CpvAccess(cQdState), 1);
00978     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00979 }
00980 
00981 void CmiSyncSendFn(int destPE, int size, char *msg)
00982 {
00983   CmiState cs = CmiGetState();
00984   char *dupmsg = (char *) CmiAlloc(size);
00985   memcpy(dupmsg, msg, size);
00986 
00987   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
00988 
00989   if (cs->pe==destPE) {
00990     CmiSendSelf(dupmsg);
00991   }
00992   else
00993     CmiAsyncSendFn_(destPE, size, dupmsg);
00994 }
00995 
00996 #if CMK_SMP
00997 
00998 /* called by communication thread in SMP */
00999 static int SendMsgBuf()
01000 {
01001   SMSG_LIST *msg_tmp;
01002   char *msg;
01003   int node, rank, size;
01004   int i;
01005   int sent = 0;
01006 
01007   MACHSTATE(2,"SendMsgBuf begin {");
01008 #if MULTI_SENDQUEUE
01009   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
01010   {
01011     if (!PCQueueEmpty(procState[i].sendMsgBuf))
01012     {
01013       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
01014 #else
01015     /* single message sending queue */
01016     /* CmiLock(sendMsgBufLock); */
01017     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
01018     /* CmiUnlock(sendMsgBufLock); */
01019     while (NULL != msg_tmp)
01020     {
01021 #endif
01022       node = msg_tmp->destpe;
01023       size = msg_tmp->size;
01024       msg = msg_tmp->msg;
01025       msg_tmp->next = 0;
01026       while (MsgQueueLen > request_max) {
01027         CmiReleaseSentMessages();
01028         PumpMsgs();
01029       }
01030       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
01031       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
01032       CMI_SET_CHECKSUM(msg, size);
01033 
01034 #if MPI_POST_RECV_COUNT > 0
01035         if(size <= MPI_POST_RECV_SIZE){
01036 
01037           START_EVENT();
01038           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
01039                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
01040 
01041           STOP_EVENT(40);
01042         }
01043         else {
01044             START_EVENT();
01045             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
01046                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
01047             STOP_EVENT(40);
01048         }
01049 #else
01050