arch/elan/machine.c

Go to the documentation of this file.
00001 
00006 /* Charm++ Machine Layer for ELAN network interface 
00007 Developed by Sameer Kumar
00008 */
00009 
00010 #include <stdio.h>
00011 #include <sys/time.h>
00012 #include <assert.h>
00013 #include <errno.h>
00014 #include "converse.h"
00015 #include <elan/elan.h>
00016 /*#include <elan3/elan3.h>*/
00017 
00018 /*Support for ++debug: */
00019 #include <unistd.h> /*For getpid()*/
00020 #include <stdlib.h> /*For sleep()*/
00021 
00022 #include "machine.h"
00023 #include "pcqueue.h"
00024 
00025 #if CMK_PERSISTENT_COMM
00026 #include "persist_impl.h"
00027 #endif
00028 
/* Fallback copy of the version macros from elan/version.h, used when that
   header does not define them. */
#ifndef QSNETLIBS_VERSION_CODE
#define QSNETLIBS_VERSION(a,b,c)        (((a) << 16) + ((b) << 8) + (c))
#define QSNETLIBS_VERSION_CODE          QSNETLIBS_VERSION(1,3,0)
#endif

#define MAX_QLEN 1000
#define MAX_BYTES 1000000

/* When nonzero, idle polling paths also call elan_deviceCheck(). */
#define USE_SHM 0

/*
  To reduce the buffering needed for broadcasts and to spread the load away
  from the broadcasting node, CMK_BROADCAST_SPANNING_TREE enables the
  spanning-tree broadcast algorithm. The spanning-tree root is carried in the
  `root` field of the message header (see CMI_BROADCAST_ROOT).
*/
#define CMK_BROADCAST_SPANNING_TREE    1
#define BROADCAST_SPANNING_FACTOR      4

/* Header field accessors. Arguments and the whole expansion are
   parenthesized so callers may pass expressions such as `buf + offset`;
   each expansion is still an lvalue. */
#define CMI_BROADCAST_ROOT(msg)          (((CmiMsgHeaderBasic *)(msg))->root)
#define CMI_DEST_RANK(msg)               (((CmiMsgHeaderBasic *)(msg))->rank)
#define CMI_MESSAGE_SIZE(msg)            (((CmiMsgHeaderBasic *)(msg))->size)

/* Set the spanning-tree root of MSG. Expands to exactly one statement
   (do/while(0)), so it is safe in an unbraced if/else. The parameter is not
   called `root`, so it cannot be substituted into the `->root` member name. */
#if CMK_BROADCAST_SPANNING_TREE
#  define CMI_SET_BROADCAST_ROOT(msg, value) \
       do { CMI_BROADCAST_ROOT(msg) = (value); } while (0)
#else
#  define CMI_SET_BROADCAST_ROOT(msg, value)  do { } while (0)
#endif
00059 
00060 ELAN_BASE     *elan_base;
00061 ELAN_TPORT    *elan_port;
00062 ELAN_QUEUE    *elan_q;
00063 
00064 int enableGetBasedSend = 1;
00065 int enableBufferPooling = 0;
00066 
00067 int SMALL_MESSAGE_SIZE=4080;  /* Smallest message size queue 
00068                                  used for receiving short messages */
00069                                      
00070 int MID_MESSAGE_SIZE=65536;     /* Queue for larger messages 
00071                                    which need pre posted receives
00072                                    Message sizes greater will be 
00073                                    probe received adding 5us overhead*/
00074 #define SYNC_MESSAGE_SIZE MID_MESSAGE_SIZE * 10
00075                                /* Message sizes greater will be 
00076                                   sent synchronously thus avoiding copying*/
00077 
00078 #define NON_BLOCKING_MSG  4     /* Message sizes greater 
00079                                     than this will be sent asynchronously*/
00080 #define RECV_MSG_Q_SIZE  8   //Maximim queue size for short messages
00081 #define MID_MSG_Q_SIZE   4   //Maximum queue size for mid-range messages
00082 
00083 //Actual sizes, can also be set from the command line
00084 int smallQSize = RECV_MSG_Q_SIZE;
00085 int midQSize = MID_MSG_Q_SIZE;
00086 
00087 ELAN_EVENT *esmall[RECV_MSG_Q_SIZE], *emid[MID_MSG_Q_SIZE], *elarge;
00088 
00089 #define TAG_SMALL 0x1
00090 #define TAG_LARGE_HEADER 0x3     /* Header that a large message is coming*/
00091 #define TAG_GET_BASED_SEND 0x5     /* Header that a large message is coming
00092                                     as a get request*/
00093 #define TAG_MID   0x10
00094 #define TAG_LARGE 0x100
00095 
00096 /*Release sent messages status to check for*/
00097 #define BASIC_SEND 0
00098 #define GET_BASED_SEND 1
00099 #define RECEIVE_GET   2
00100 #define GET_FINISHED_RECEIVE 3
00101 
00102 int               _Cmi_mynode;    /* Which address space am I */
00103 int               _Cmi_mynodesize;/* Number of processors in my address space */
00104 int               _Cmi_numnodes;  /* Total number of address spaces */
00105 int               _Cmi_numpes;    /* Total number of processors */
00106 
00107 static int        Cmi_nodestart; /* First processor in this address space */ 
00108 CpvDeclare(void*, CmiLocalQueue);
00109 
00110 #define BLK_LEN  512
00111 
00112 static int MsgQueueLen=0;
00113 static int MsgQueueBytes=0;
00114 static int request_max;
00115 static int request_bytes;
00116 
00117 #include "pcqueue.h"
00118 PCQueue localSmallBufferQueue;
00119 PCQueue localMidBufferQueue;
00120 
00121 int outstandingMsgs[3000];
00122 
00123 int stretchFlag = 0;
00124 int blockingReceiveFlag = 0;
00125 
static void ConverseRunPE(int everReturn);

/* Control message sent with TAG_GET_BASED_SEND in place of a large payload.
   ElanGetBasedSend builds it and handleGetHeader consumes it; the receiver
   then fetches the payload itself with elan_get. */
typedef struct {
    char header[CmiMsgHeaderSizeBytes]; /* Converse header; the sender only sets the broadcast root */
    int size;                           /* payload size in bytes */
    char* src_addr;                     /* payload address in the sender's address space */
    char* flag_addr;                    /* address of the sender's SMSG_LIST.done field */
} GetHeader;

/* One entry of the outstanding-communication list headed by sent_msgs.
   Holds outgoing sends (queued or in flight) and, for get-based transfers,
   the incoming remote gets started by handleGetHeader. */
typedef struct msg_list {
    ELAN_EVENT *e;          /* event of the tport send or elan_get; NULL while still queued */
    char *msg;              /* buffer released with CmiFree when the entry is retired */
    struct msg_list *next;
    int size, destpe;       /* payload bytes; peer PE (the sender's PE for incoming gets) */
    int sent;               /* nonzero once the transfer has been started */
    
    /* Fields for get-based sends */
    int status;             /* BASIC_SEND, GET_BASED_SEND, RECEIVE_GET or GET_FINISHED_RECEIVE */
    long done;              /* completion flag read by CmiReleaseSentMessages for non-basic
                               entries; for GET_BASED_SEND its address is shipped to the
                               receiver, presumably so the receiver can set it remotely
                               (see processGetEnv) -- confirm */
    long *flag_addr;        /* incoming gets: remote address of the sender's `done` */
    char *gmsg;             /* outgoing get-based sends: the GetHeader buffer */
    char *newmsg;           /* incoming gets: local destination buffer of elan_get */
    int is_broadcast;       /* from ElanSendFn's flag; broadcasts never use get-based send */
} SMSG_LIST;
00150 
00151 
00152 static int Cmi_dim;
00153 
00154 static SMSG_LIST *sent_msgs=0;
00155 static SMSG_LIST *end_sent=0;
00156 static SMSG_LIST *cur_unsent=0;
00157 
00158 void ElanSendQueuedMessages();
00159 static int CmiReleaseSentMessages();
00160 
00161 void ElanGetBasedSend(SMSG_LIST *ptr);
00162 void handleGetHeader(char *msg, int src);
00163 void processGetEnv(SMSG_LIST *ptr);
00164 
00165 #if NODE_0_IS_CONVHOST
00166 int inside_comm = 0;
00167 #endif
00168 
00169 double starttimer;
00170 
00171 void CmiAbort(const char *message);
00172 static void PerrorExit(const char *msg);
00173 
00174 void SendSpanningChildren(int size, char *msg);
00175 
/* Elan-layer prefix placed in front of Converse's own chunk header. */
typedef struct __elanChunkHeader {
  int type;
  int size;
} ElanChunkHeader;

/* Complete header that precedes a user payload: the Elan prefix followed by
   the Converse chunk header. */
typedef struct __chunkHeader {
  ElanChunkHeader elan;
  CmiChunkHeader conv;
} ChunkHeader;

/* Field accessors; BUF must point at the start of the Elan chunk (the ChunkHeader). */
#define TYPE_FIELD(buf)       (((ChunkHeader*)(buf))->elan.type)
#define SIZE_FIELD(buf)       (((ChunkHeader*)(buf))->elan.size)
#define CONV_SIZE_FIELD(buf)  (((ChunkHeader*)(buf))->conv.size)
#define REF_FIELD(buf)        (((ChunkHeader*)(buf))->conv.ref)

/* CONV_BUF_START: pointer to the Elan chunk -> pointer to the Converse chunk */
#define CONV_BUF_START(res)     ((char*)(res) + sizeof(ElanChunkHeader))

/* MACHINE_BUF_START: pointer to the Converse chunk -> pointer to the Elan chunk */
#define MACHINE_BUF_START(res)  ((char*)(res) - sizeof(ElanChunkHeader))

/* USER_BUF_START: pointer to the user payload -> pointer to the Elan chunk */
#define USER_BUF_START(res)     ((char*)(res) - sizeof(ChunkHeader))

/* Presumably the values stored in TYPE_FIELD; none of them is used in the
   code shown here -- confirm. */
#define DYNAMIC_MESSAGE 0
#define STATIC_MESSAGE 1
#define ELAN_MESSAGE 3
00203 
/*
 * Fatal-error helper: print MSG followed by the text for the current errno
 * (via perror) and terminate the process with exit status 1.
 */
static void PerrorExit(const char *msg)
{
    perror(msg);   /* "msg: <strerror(errno)>" on stderr */
    exit(1);       /* never returns */
}
00209 
/**************************  TIMER FUNCTIONS **************************/

#if CMK_TIMER_USE_SPECIAL

#include <sys/timers.h>

/* Seconds elapsed since the last CmiTimerInit(), read from the Elan clock.
   The raw difference is divided by 1e9, so elan_clock presumably counts
   nanoseconds. */
static double ElanSecondsSinceInit(void)
{
    double now = elan_clock(elan_base->state);
    return (now - starttimer) / 1e9;
}

/* Record the Elan clock reading that all timers below are relative to. */
void CmiTimerInit(void)
{
    starttimer = elan_clock(elan_base->state);
}

double CmiTimer(void)     { return ElanSecondsSinceInit(); }

double CmiWallTimer(void) { return ElanSecondsSinceInit(); }

/* This layer has no separate CPU clock: "CPU time" is the same elapsed
   wall-clock time as CmiWallTimer(). */
double CmiCpuTimer(void)  { return ElanSecondsSinceInit(); }

#endif
00236 
/* Not referenced in this part of the file -- confirm whether it is still used. */
static PCQueue   msgBuf;

/************************************************************
 * 
 * Processor state structure
 *
 ************************************************************/

/*****
      SMP version: to be extended later; currently only the non-SMP
      version is implemented.
***************/

/* Presumably provides CmiState, CmiStateInit and the idle-lock helpers
   used below -- confirm. */
#include "machine-smp.c"

CsvDeclare(CmiNodeState, NodeState);

/* The single processor state of this process (non-SMP: one PE per process). */
static struct CmiStateStruct Cmi_state;
int _Cmi_mype;
int _Cmi_myrank;

#include "immediate.c"

/* Memory locking is a no-op in this non-SMP layer. */
void CmiMemLock(void) {}
void CmiMemUnlock(void) {}

/* Every PE/rank maps to the one static state; CmiGetStateN ignores N. */
#define CmiGetState() (&Cmi_state)
#define CmiGetStateN(n) (&Cmi_state)
00264 
/* Set up this process's single PE. Calls CmiStateInit(Cmi_nodestart, 0,
   &Cmi_state) (presumably PE number and rank), then records the PE/rank
   globals. argv is unused, and no threads are started despite the name
   (non-SMP layer). */
static void CmiStartThreads(char **argv)
{
  CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
  _Cmi_mype = Cmi_nodestart;
  _Cmi_myrank = 0;
}
00271 
/* Add a received message to this processor's receive queue.
   With CMK_IMMEDIATE_MSG, immediate messages are handed to
   CmiPushImmediateMsg instead of being queued. PE is only used for the debug
   trace: CmiGetStateN ignores it, since there is a single state per process.
   Callers in this file pass CMI_DEST_RANK(msg). */
static void CmiPushPE(int pe,void *msg)
{
  CmiState cs=CmiGetStateN(pe);
  MACHSTATE1(2,"Pushing message into %d's queue",pe);
#if CMK_IMMEDIATE_MSG
  if (CmiIsImmediate(msg)) {
      /*CmiPrintf("[node %d] Immediate Message %d %d {{. \n", CmiMyNode(), CmiGetHandler(msg), _ImmediateMsgHandlerIdx);*/
      /*CmiHandleMessage(msg);*/
      CmiPushImmediateMsg(msg);
      /*CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());*/
      return;
  }
#endif
  /* Update the idle lock before publishing the message on the queue. */
  CmiIdleLock_addMessage(&cs->idle); 
  PCQueuePush(cs->recv,msg);
}
00289 
00290 #ifndef CmiMyPe
00291 int CmiMyPe(void)
00292 {
00293   return CmiGetState()->pe;
00294 }
00295 #endif
00296 
00297 #ifndef CmiMyRank
00298 int CmiMyRank(void)
00299 {
00300   return CmiGetState()->rank;
00301 }
00302 #endif
00303 
00304 #ifndef CmiNodeFirst
00305 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
00306 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
00307 #endif
00308 
00309 #ifndef CmiNodeOf
00310 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
00311 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
00312 #endif
00313 
00314 int CmiAllAsyncMsgsSent(void)
00315 {
00316    SMSG_LIST *msg_tmp = NULL; 
00317 
00318    int done;
00319    
00320    CmiReleaseSentMessages();
00321 
00322    msg_tmp = sent_msgs;
00323    while((msg_tmp != cur_unsent) && (msg_tmp->e != NULL)){
00324        done = 0;
00325     
00326        if(elan_tportTxDone(msg_tmp->e))
00327            done = 1;
00328        else
00329 #if USE_SHM 
00330            elan_deviceCheck(elan_base->state);
00331 #else 
00332        ;
00333 #endif
00334 
00335        if(!done)
00336            return 0;
00337        msg_tmp = msg_tmp->next;
00338        //    MsgQueueLen--;
00339    }
00340    return 1;
00341 }
00342 
00343 int CmiAsyncMsgSent(CmiCommHandle c) {
00344      
00345   SMSG_LIST *msg_tmp = sent_msgs;
00346   int done;
00347 
00348   while ((msg_tmp) && (msg_tmp ->e != NULL) && 
00349          ((CmiCommHandle)(msg_tmp->e) != c))
00350       msg_tmp = msg_tmp->next;
00351   
00352   if(msg_tmp) {
00353     done = 0;
00354     
00355     if(elan_tportTxDone(msg_tmp->e))
00356         done = 1;
00357     else 
00358 #if USE_SHM 
00359            elan_deviceCheck(elan_base->state);
00360 #else 
00361        ;
00362 #endif
00363     
00364     return ((done)?1:0);
00365   } else {
00366       return 1;
00367   }
00368 }
00369 
00370 void CmiReleaseCommHandle(CmiCommHandle c)
00371 {
00372   return;
00373 }
00374 
00375 void release_pmsg_list();
00376 
00377 #define MAX_RELEASE_POLL 4096
00378 
00379 static int CmiReleaseSentMessages(void)
00380 {
00381     SMSG_LIST *msg_tmp=sent_msgs;
00382     SMSG_LIST *prev=0;
00383     SMSG_LIST *temp;
00384     int done;
00385     int locked = 0;
00386     
00387 #ifndef CMK_OPTIMIZE 
00388     double rel_start_time = CmiWallTimer();
00389 #endif
00390 
00391 #if CMK_PERSISTENT_COMM
00392   release_pmsg_list();
00393 #endif
00394 
00395   int ncheck = MAX_RELEASE_POLL;
00396 
00397   while(msg_tmp != NULL && ncheck > 0){
00398       if(msg_tmp->sent) {
00399           done =0;
00400 
00401           if(msg_tmp->status == BASIC_SEND) {
00402               if(elan_tportTxDone(msg_tmp->e)) {
00403                   elan_tportTxWait(msg_tmp->e);
00404                   done = 1;
00405               }          
00406               else 
00407 #if USE_SHM 
00408                   elan_deviceCheck(elan_base->state);
00409 #else 
00410               ;
00411 #endif
00412           }
00413           else {
00414               processGetEnv(msg_tmp);
00415               done = msg_tmp->done;
00416           }
00417           
00418           if(done) {
00419               MsgQueueLen--;
00420               MsgQueueBytes -= msg_tmp->size;
00421               
00422               outstandingMsgs[msg_tmp->destpe] = 0;
00423               
00424               /* Release the message */
00425               temp = msg_tmp->next;
00426               if(prev==0)  /* first message */
00427                   sent_msgs = temp;
00428               else
00429                   prev->next = temp;
00430               
00431               CmiFree(msg_tmp->msg);
00432               CmiFree(msg_tmp);
00433               msg_tmp = temp;
00434           } else {
00435               prev = msg_tmp;
00436               msg_tmp = msg_tmp->next;
00437               ncheck --;
00438           }
00439       }
00440       else {
00441           prev = msg_tmp;
00442           msg_tmp = msg_tmp->next;
00443       }
00444   }
00445   
00446   //if(msg_tmp)
00447   //  elan_deviceCheck(elan_base->state);
00448 
00449   end_sent = prev;
00450 
00451 #if CMK_PERSISTENT_COMM
00452   release_pmsg_list();
00453 #endif
00454 
00455 #ifndef CMK_OPTIMIZE 
00456 #if ! CMK_TRACE_IN_CHARM
00457   {
00458   double rel_end_time = CmiWallTimer();
00459   if(rel_end_time > rel_start_time + 50/1e6)
00460       traceUserBracketEvent(20, rel_start_time, rel_end_time);
00461   }
00462 #endif
00463 #endif
00464 
00465   //So messages not finished sending
00466   if(msg_tmp != cur_unsent)
00467       return 0;
00468   else
00469       //All messages sent
00470       return 1;
00471 }
00472 
00473 /* retflag = 0, receive as many messages as can be 
00474    and then post another receive
00475    retflag = 1, receive the first message and return 
00476    retflag =3 blocking receives
00477 
00478    Pump Msgs posts a circular queue of receives. The main idea 
00479    is that if a large number of receives are being posted only 
00480    minimum receive events should be polled. 
00481 
00482    So there are two indices, event_idx and post_idx. event_idx points
00483    to the first posted receive. So if a large number of messages come
00484    for a particular tag then this position in the queue will receive
00485    the first message. The new receives should be posted from
00486    post_idx. Notice that event_idx and post_idx are not the same. If a
00487    large number of messages are received then receives should be
00488    posted from the position of where first message was received.
00489 
00490 */
00491 int PumpMsgs(int retflag)
00492 {
00493 
00494     static char recv_small_done[RECV_MSG_Q_SIZE]; /*A list of flags which 
00495                                                     tells if a particular 
00496                                                     slot in the queue has 
00497                                                     a receive posted or not. */
00498     static char recv_mid_done[MID_MSG_Q_SIZE];  
00499     static int recv_large_done = 0;
00500     
00501     static char *sbuf[RECV_MSG_Q_SIZE];    /* Buffer of pointer to the 
00502                                               messages received */
00503     static char *mbuf[MID_MSG_Q_SIZE];
00504     static char *lbuf;
00505     
00506     static int event_idx = 0;             /* As defined earlier */
00507     static int post_idx = 0;
00508 
00509     static int event_m_idx = 0;
00510     static int post_m_idx = 0;
00511 
00512     static int nlarge_torecv = 0; /*this variable specifies how many
00513                                     large messages need to be received
00514                                     before we can block again.*/
00515 
00516     static int step1 = 0;   /* Large message are received in two
00517                                steps, first the envelope is probed by
00518                                posting a receive and then memory is
00519                                allocated for the message and the
00520                                message is finally received */
00521 
00522     int flg, res, rcount, mcount;
00523     char *msg = 0;
00524     
00525     int recd=0;
00526 #if QSNETLIBS_VERSION_CODE > QSNETLIBS_VERSION(1,4,0)
00527     unsigned long size= 0;
00528 #else
00529     int size= 0;
00530 #endif
00531     int tag=0;
00532     int src=-1;
00533     
00534 #ifndef CMK_OPTIMIZE 
00535     double pmp_start_time = CmiWallTimer();
00536 #endif
00537 
00538     int ecount = 0, emcount = 0;
00539 
00540 #if CMK_PERSISTENT_COMM
00541     if (PumpPersistent()) return 1;
00542 #endif
00543         
00544     while(1) {
00545         msg = 0;
00546         
00547         ecount = 0;
00548         for(rcount = 0; rcount < smallQSize; rcount ++){
00549             ecount = (rcount + post_idx) % smallQSize;
00550             if(!recv_small_done[ecount]) {
00551                 sbuf[ecount] = (char *) CmiAlloc(SMALL_MESSAGE_SIZE);
00552                 
00553                 esmall[ecount] = elan_tportRxStart(elan_port, 0, 0, 0, 1, 
00554                                                    TAG_SMALL, sbuf[ecount], 
00555                                                    SMALL_MESSAGE_SIZE);
00556                 recv_small_done[ecount] = 1;
00557             }
00558             else {
00559                 ecount = (ecount + smallQSize - 1) % smallQSize;
00560                 break;
00561             }
00562         }
00563         post_idx = ecount + 1;
00564 
00565         emcount = 0;
00566         for(mcount = 0; mcount < midQSize; mcount ++){
00567             emcount = (mcount + post_m_idx) % midQSize;
00568             if(!recv_mid_done[emcount]) {
00569                 mbuf[emcount] = (char *) CmiAlloc(MID_MESSAGE_SIZE);
00570                                                 
00571                 emid[emcount] = elan_tportRxStart(elan_port, 0, 0, 0, -1, 
00572                                                   TAG_MID, mbuf[emcount], 
00573                                                   MID_MESSAGE_SIZE);
00574                 recv_mid_done[emcount] = 1;
00575             }
00576             else {
00577                 emcount = (emcount + midQSize - 1) % midQSize;
00578                 break;
00579             }
00580         }
00581         post_m_idx = emcount + 1;        
00582         
00583         if(!recv_large_done) {
00584             elarge = elan_tportRxStart(elan_port, ELAN_TPORT_RXPROBE, 0, 0, 
00585                                        -1, TAG_LARGE, NULL, 0);
00586             recv_large_done = 1;
00587         }
00588     
00589         if(!step1 && elan_tportRxDone(elarge)) {
00590             elan_tportRxWait(elarge, NULL, NULL, &size );
00591       
00592             lbuf = (char *) CmiAlloc(size);
00593             elarge = elan_tportRxStart(elan_port, 0, 0, 0, -1, TAG_LARGE, 
00594                                        lbuf,size);
00595             step1 = 1;
00596         }
00597         
00598         if(step1 && elan_tportRxDone(elarge)) {
00599             elan_tportRxWait(elarge, NULL, NULL, &size);
00600             
00601             msg = lbuf;
00602             recv_large_done = 0;
00603             flg = 1;
00604             
00605             CmiPushPE(CMI_DEST_RANK(msg), msg);
00606 #if CMK_BROADCAST_SPANNING_TREE
00607             if (CMI_BROADCAST_ROOT(msg))
00608                 SendSpanningChildren(size, msg);
00609 #endif
00610             step1 = 0;
00611             
00612             if(blockingReceiveFlag)
00613                 nlarge_torecv --;
00614         }
00615 
00616         emcount = 0;
00617         for(mcount = 0; mcount < midQSize; mcount ++){
00618             emcount = (mcount + event_m_idx) % midQSize;
00619             if(elan_tportRxDone(emid[emcount])) {
00620                 elan_tportRxWait(emid[emcount], NULL, NULL, &size);
00621                 
00622                 msg = mbuf[emcount];
00623                 mbuf[emcount] = NULL;
00624                 
00625                 recv_mid_done[emcount] = 0;
00626                 flg = 1;
00627                 
00628                 CmiPushPE(CMI_DEST_RANK(msg), msg);
00629                 
00630 #if CMK_BROADCAST_SPANNING_TREE
00631                 if (CMI_BROADCAST_ROOT(msg))
00632                     SendSpanningChildren(size, msg);
00633 #endif
00634                 if(blockingReceiveFlag)
00635                     nlarge_torecv --;
00636             }
00637             else {
00638 #if USE_SHM 
00639                 elan_deviceCheck(elan_base->state);
00640 #else 
00641                 ;
00642 #endif
00643                 emcount = (emcount + midQSize - 1) % midQSize;
00644                 break;
00645             }
00646         }
00647         event_m_idx = emcount + 1;
00648         
00649         ecount = 0;
00650         for(rcount = 0; rcount < smallQSize; rcount ++){
00651             ecount = (rcount + event_idx) % smallQSize;
00652             if(elan_tportRxDone(esmall[ecount]) || 
00653                (retflag == 3 && nlarge_torecv == 0 && !flg)) {
00654                 elan_tportRxWait(esmall[ecount], &src, &tag, &size );
00655                 
00656                 msg = sbuf[ecount];
00657                 sbuf[ecount] = NULL;
00658                 
00659                 recv_small_done[ecount] = 0;
00660                     
00661                 if(tag == TAG_SMALL) {
00662                     flg = 1;
00663                     CmiPushPE(CMI_DEST_RANK(msg), msg);                
00664 #if CMK_BROADCAST_SPANNING_TREE
00665                     if (CMI_BROADCAST_ROOT(msg))
00666                         SendSpanningChildren(size, msg);
00667 #endif
00668                 }
00669                 else if(tag == TAG_LARGE_HEADER) {
00670                     //CmiPrintf("[%d] Received Header\n", CmiMyPe());
00671                     nlarge_torecv ++;
00672                     CmiFree(msg);
00673                 }
00674                 else if(tag == TAG_GET_BASED_SEND) {
00675                     handleGetHeader(msg, src);
00676                 }                    
00677 
00678                 if(retflag == 3)
00679                     retflag = 1;
00680             }
00681             else {
00682 #if USE_SHM 
00683                 elan_deviceCheck(elan_base->state);
00684 #else 
00685                 ;
00686 #endif
00687                 ecount = (ecount + smallQSize - 1) % smallQSize;
00688                 break;
00689             }
00690         }
00691         event_idx = ecount + 1;
00692         
00693 #if CMK_PERSISTENT_COMM
00694         PumpPersistent();
00695 #endif
00696         
00697         if(!flg) {
00698 #ifndef CMK_OPTIMIZE 
00699 #if ! CMK_TRACE_IN_CHARM
00700 
00701             double pmp_end_time = CmiWallTimer();
00702             if(pmp_end_time > pmp_start_time + 50/1e6)
00703                 traceUserBracketEvent(10, pmp_start_time, pmp_end_time);
00704 #endif
00705 #endif
00706 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00707             CmiHandleImmediate();
00708 #endif
00709             return recd;    
00710         }
00711         
00712         if (retflag) {
00713 #ifndef CMK_OPTIMIZE 
00714 #if ! CMK_TRACE_IN_CHARM
00715             double pmp_end_time = CmiWallTimer();
00716             if(pmp_end_time > pmp_start_time + 50/1e6)
00717                 traceUserBracketEvent(10, pmp_start_time, pmp_end_time);
00718 #endif
00719 #endif
00720 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00721             CmiHandleImmediate();
00722 #endif
00723             return flg;
00724         }
00725         
00726         recd = 1;
00727         flg = 0;
00728     }
00729 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00730     CmiHandleImmediate();
00731 #endif
00732     return recd;
00733 }
00734 
00735 void *remote_get(void * srcptr, void *destptr, int size, int srcPE){
00736     return (void *)elan_get(elan_base->state, srcptr, destptr, size, srcPE);
00737 }
00738 
00739 int remote_get_done(void *e){
00740     ELAN_EVENT *evt = (ELAN_EVENT *)e;
00741 
00742     int flag = elan_poll(evt, ELAN_POLL_EVENT);
00743     return flag;
00744 }
00745 
00746 /*
00747 void remote_get_wait_all(){
00748     elan_getWaitAll(elan_base->state, ELAN_WAIT_EVENT);
00749 }
00750 
00751 void remote_put_wait_all(){
00752     elan_putWaitAll(elan_base->state, ELAN_WAIT_EVENT);
00753 }
00754 */
00755 
00756 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
00757 void *CmiGetNonLocal(void)
00758 {
00759     register CmiState cs = CmiGetState();
00760     register void *msg = NULL;
00761     CmiIdleLock_checkMessage(&cs->idle);
00762     
00763     msg =  PCQueuePop(cs->recv); 
00764     
00765     if(msg) {
00766         return msg;
00767     }
00768 
00769     //get new messages and flush receive buffers
00770     PumpMsgs(1);           // PumpMsgs(0)
00771     //we are idle do more work
00772     CmiReleaseSentMessages();
00773     ElanSendQueuedMessages();
00774 
00775     msg =  PCQueuePop(cs->recv); 
00776     return msg;
00777 }
00778 
/* One progress pass: release completed sends, receive every message that is
   currently available (PumpMsgs(0)), then start sends that were queued. */
void CmiPing() {
    CmiReleaseSentMessages();
    PumpMsgs(0);
    ElanSendQueuedMessages();
}
00784 
00785 
00786 void enableBlockingReceives(){
00787     blockingReceiveFlag = 1;
00788 }
00789 
00790 void disableBlockingReceives(){
00791     blockingReceiveFlag = 0;
00792 }
00793 
00794 static int toggle = 0;  //Blocking receive posted only after all idle
00795                         //handlers are called
00796 
00797 void CmiNotifyIdle(void)
00798 {
00799     static int previousSleepTime = 0;
00800     CmiReleaseSentMessages();
00801     ElanSendQueuedMessages();
00802 
00803     PumpMsgs(1);    
00804     toggle = 0;
00805 }
00806 
00807 void CmiNotifyStillIdle(void)
00808 {
00809     static int previousSleepTime = 0;
00810     CmiReleaseSentMessages();
00811     ElanSendQueuedMessages();
00812     
00813     if(!PumpMsgs(1) && blockingReceiveFlag && toggle){
00814         if (!PCQueueEmpty(CmiGetState()->recv)) return; 
00815         if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
00816         if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
00817         if (sent_msgs) return;
00818         if (cur_unsent) return;
00819         PumpMsgs(3); 
00820     }
00821     toggle = 1;
00822 }
00823 
00824 
00825  
#if CMK_IMMEDIATE_MSG
/* Poll the network so pending immediate messages get processed. Received
   immediate messages go through CmiPushPE -> CmiPushImmediateMsg, and in
   non-SMP builds PumpMsgs calls CmiHandleImmediate before returning. */
void CmiProbeImmediateMsg()
{
    PumpMsgs(0);
}
#endif
00832 /********************* MESSAGE SEND FUNCTIONS ******************/
00833 
/* Deliver MSG to this PE without touching the network. Immediate messages
   are handled right away; others are counted for quiescence detection and
   appended to the local queue. Takes ownership of MSG. */
static void CmiSendSelf(char *msg)
{
#if CMK_IMMEDIATE_MSG
    if (CmiIsImmediate(msg)) {
      /* CmiBecomeNonImmediate(msg); */
      CmiHandleImmediateMessage(msg);
      return;
    }
#endif
    CQdCreate(CpvAccess(cQdState), 1);
    CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
}
00846 
00847 void CmiSyncSendFn(int destPE, int size, char *msg)
00848 {
00849     CmiState cs = CmiGetState();
00850     
00851     char *dupmsg;
00852     dupmsg = (char *) CmiAlloc(size);
00853     memcpy(dupmsg, msg, size);
00854     
00855     //  CmiPrintf("Setting root to %d\n", 0);
00856     CMI_SET_BROADCAST_ROOT(dupmsg, 0);
00857     
00858     if (cs->pe==destPE)
00859         CmiSendSelf(dupmsg);
00860     else
00861         CmiAsyncSendFn(destPE, size, dupmsg);
00862 }
00863 
00864 void ElanBasicSendFn(SMSG_LIST * ptr){
00865     int tag = 0, sync_mode = 0;
00866     int tiny_msg = 0;
00867     
00868     ptr->status = BASIC_SEND;
00869     
00870     if (ptr->size <= SMALL_MESSAGE_SIZE)
00871         tag = TAG_SMALL;
00872     else if (ptr->size < MID_MESSAGE_SIZE)
00873         tag = TAG_MID;
00874     else {        
00875         if(!ptr->is_broadcast && enableGetBasedSend) {
00876             ElanGetBasedSend(ptr);
00877             return;
00878         }
00879         
00880         tag = TAG_LARGE;
00881     }
00882 
00883     //if(ptr->size > SYNC_MESSAGE_SIZE)
00884     //  sync_mode = ELAN_TPORT_TXSYNC;
00885     
00886     tiny_msg = 0; //A sizeof(int) byte message 
00887     //sent to wake up a blocked process
00888     
00889     //WAKE A PROCESS SLEEPING ON A BLOCKING RECEIVE UP,
00890     //WITH A SMALL MESSAGE THAT MATCHES THE TAG OF A SMALL MESSAGE
00891     //BUT IS ACTUALLY A MID OR LARGE MESSAGE
00892     if(ptr->size > SMALL_MESSAGE_SIZE && blockingReceiveFlag) {
00893         elan_tportTxWait(elan_tportTxStart(elan_port, 0, ptr->destpe, 
00894                                            CmiMyPe(), TAG_LARGE_HEADER, 
00895                                            &tiny_msg, sizeof(int)));
00896     }
00897 
00898     ptr->e = elan_tportTxStart(elan_port, sync_mode, ptr->destpe, CmiMyPe(),
00899                                tag, ptr->msg, ptr->size);
00900     ptr->sent = 1;
00901     
00902     MsgQueueLen++;
00903     MsgQueueBytes += ptr->size;
00904     
00905     outstandingMsgs[ptr->destpe] = 1;
00906 }
00907 
00908 CmiCommHandle ElanSendFn(int destPE, int size, char *msg, int flag)
00909 {
00910     CmiState cs = CmiGetState();
00911     SMSG_LIST *msg_tmp;
00912     CmiUInt2  rank, node;
00913     
00914     if(destPE == cs->pe) {
00915         char *dupmsg = (char *) CmiAlloc(size);
00916         memcpy(dupmsg, msg, size);
00917         CmiSendSelf(dupmsg);
00918         return 0;
00919     }
00920     
00921     CQdCreate(CpvAccess(cQdState), 1);
00922 #if CMK_PERSISTENT_COMM
00923     if (phs) {
00924         CmiAssert(phsSize == 1);
00925         CmiSendPersistentMsg(*phs, destPE, size, msg);
00926         return NULL;
00927     }
00928 #endif
00929     
00930     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
00931     msg_tmp->msg = msg;
00932     msg_tmp->next = 0;
00933     msg_tmp->size = size;
00934     msg_tmp->sent = 0;
00935     msg_tmp->e = NULL;
00936     msg_tmp->destpe = destPE;
00937     msg_tmp->is_broadcast = flag;
00938 
00939     if ((MsgQueueLen > request_max || MsgQueueBytes > request_bytes) 
00940         && (!flag)) {
00941         CmiReleaseSentMessages();
00942         PumpMsgs(1); //PumpMsgs(0) 
00943     }
00944 
00945     ElanSendQueuedMessages();
00946 
00947     if(MsgQueueLen > request_max || MsgQueueBytes > request_bytes 
00948        || outstandingMsgs[destPE]){
00949         
00950         if(sent_msgs==0)
00951             sent_msgs = msg_tmp;
00952         else
00953             end_sent->next = msg_tmp;
00954         end_sent = msg_tmp;
00955         
00956         if(cur_unsent == 0)
00957             cur_unsent = msg_tmp;
00958         
00959     }
00960     else{        
00961         ElanBasicSendFn(msg_tmp);
00962 
00963         if(sent_msgs==0)
00964             sent_msgs = msg_tmp;
00965         else
00966             end_sent->next = msg_tmp;
00967         end_sent = msg_tmp;
00968         
00969         return (CmiCommHandle) msg_tmp->e;
00970     }
00971     return NULL;
00972 }
00973 
00974 void ElanSendQueuedMessages() {
00975     SMSG_LIST * ptr = cur_unsent, *new_unsent = NULL;
00976     while (MsgQueueLen <= request_max && MsgQueueBytes <= request_bytes 
00977            && ptr != NULL) {
00978 
00979         if(!outstandingMsgs[ptr->destpe] && !ptr->sent)
00980             ElanBasicSendFn(ptr);
00981         else if ((!ptr->sent) && (new_unsent == NULL))
00982             new_unsent = ptr;
00983         
00984         ptr = ptr->next;
00985     }
00986     
00987     if(new_unsent)
00988         cur_unsent = new_unsent;
00989     else
00990         cur_unsent = ptr;
00991 }
00992 
00993 void ElanGetBasedSend(SMSG_LIST *msg_tmp){
00994     //CmiPrintf("using get based send\n");
00995     GetHeader *gmsg = (GetHeader *) CmiAlloc(sizeof(GetHeader));
00996 
00997     gmsg->src_addr = msg_tmp->msg;
00998     gmsg->size = msg_tmp->size;
00999     CMI_SET_BROADCAST_ROOT(gmsg, 0);
01000     
01001     msg_tmp->sent = 1;
01002     msg_tmp->e = NULL;
01003     msg_tmp->status = GET_BASED_SEND;
01004     msg_tmp->done = 0;
01005     
01006     msg_tmp->gmsg = (char *)gmsg;
01007     gmsg->flag_addr = (char *)&(msg_tmp->done);
01008     
01009     msg_tmp->e = elan_tportTxStart(elan_port, 0, msg_tmp->destpe, CmiMyPe(), 
01010                                    TAG_GET_BASED_SEND, gmsg, sizeof(GetHeader));
01011     
01012     MsgQueueLen++;
01013     MsgQueueBytes += msg_tmp->size;
01014     
01015     outstandingMsgs[msg_tmp->destpe] = 1;
01016 }
01017 
01018 void handleGetHeader(char *msg, int src){
01019     GetHeader *gmsg = (GetHeader *) msg;
01020 
01021     char *newmsg = CmiAlloc(gmsg->size);
01022     
01023     SMSG_LIST *msg_tmp = (SMSG_LIST *)CmiAlloc(sizeof(SMSG_LIST));
01024     msg_tmp->msg = msg;
01025     msg_tmp->next = 0;
01026     msg_tmp->size = gmsg->size;
01027     msg_tmp->sent = 1;
01028     msg_tmp->destpe = src;
01029     msg_tmp->status = RECEIVE_GET;
01030     msg_tmp->done = 0;
01031     msg_tmp->flag_addr = (long *)gmsg->flag_addr;
01032     msg_tmp->newmsg = newmsg;
01033     
01034     msg_tmp->e = elan_get(elan_base->state, gmsg->src_addr, msg_tmp->newmsg, gmsg->size, src);
01035     
01036     if(sent_msgs==0) {
01037         sent_msgs = msg_tmp;
01038         end_sent = msg_tmp;
01039     }
01040     else {
01041         msg_tmp->next = sent_msgs;
01042         sent_msgs = msg_tmp;
01043     }
01044 }
01045 
/* Constant 1 kept in a global variable, presumably so its address can serve
   as the data source when a receiver writes a sender's `done` flag in
   processGetEnv -- confirm. */
long trueFlag = 1;
01047 void processGetEnv(SMSG_LIST *ptr){
01048 
01049     if(ptr->status == BASIC_SEND)
01050         return;
01051 
01052     if(ptr->status == GET_BASED_SEND) {
01053         if(ptr->gmsg != NULL) {
01054