00001
00006
00007
00008
00009
00010 #include <stdio.h>
00011 #include <sys/time.h>
00012 #include <assert.h>
00013 #include <errno.h>
00014 #include "converse.h"
00015 #include <elan/elan.h>
00016
00017
00018
00019 #include <unistd.h>
00020 #include <stdlib.h>
00021
00022 #include "machine.h"
00023 #include "pcqueue.h"
00024
00025 #if CMK_PERSISTENT_COMM
00026 #include "persist_impl.h"
00027 #endif
00028
00029
00030 #ifndef QSNETLIBS_VERSION_CODE
00031 #define QSNETLIBS_VERSION(a,b,c) (((a) << 16) + ((b) << 8) + (c))
00032 #define QSNETLIBS_VERSION_CODE QSNETLIBS_VERSION(1,3,0)
00033 #endif
00034
00035 #define MAX_QLEN 1000
00036 #define MAX_BYTES 1000000
00037
00038 #define USE_SHM 0
00039
00040
00041
00042
00043
00044
00045
00046
00047 #define CMK_BROADCAST_SPANNING_TREE 1
00048 #define BROADCAST_SPANNING_FACTOR 4
00049
00050 #define CMI_BROADCAST_ROOT(msg) ((CmiMsgHeaderBasic *)msg)->root
00051 #define CMI_DEST_RANK(msg) ((CmiMsgHeaderBasic *)msg)->rank
00052 #define CMI_MESSAGE_SIZE(msg) ((CmiMsgHeaderBasic *)msg)->size
00053
00054 #if CMK_BROADCAST_SPANNING_TREE
00055 # define CMI_SET_BROADCAST_ROOT(msg, root) CMI_BROADCAST_ROOT(msg) = (root);
00056 #else
00057 # define CMI_SET_BROADCAST_ROOT(msg, root)
00058 #endif
00059
00060 ELAN_BASE *elan_base;
00061 ELAN_TPORT *elan_port;
00062 ELAN_QUEUE *elan_q;
00063
00064 int enableGetBasedSend = 1;
00065 int enableBufferPooling = 0;
00066
00067 int SMALL_MESSAGE_SIZE=4080;
00068
00069
00070 int MID_MESSAGE_SIZE=65536;
00071
00072
00073
00074 #define SYNC_MESSAGE_SIZE MID_MESSAGE_SIZE * 10
00075
00076
00077
00078 #define NON_BLOCKING_MSG 4
00079
00080 #define RECV_MSG_Q_SIZE 8 //Maximim queue size for short messages
00081 #define MID_MSG_Q_SIZE 4 //Maximum queue size for mid-range messages
00082
00083
00084 int smallQSize = RECV_MSG_Q_SIZE;
00085 int midQSize = MID_MSG_Q_SIZE;
00086
00087 ELAN_EVENT *esmall[RECV_MSG_Q_SIZE], *emid[MID_MSG_Q_SIZE], *elarge;
00088
00089 #define TAG_SMALL 0x1
00090 #define TAG_LARGE_HEADER 0x3
00091 #define TAG_GET_BASED_SEND 0x5
00092
00093 #define TAG_MID 0x10
00094 #define TAG_LARGE 0x100
00095
00096
00097 #define BASIC_SEND 0
00098 #define GET_BASED_SEND 1
00099 #define RECEIVE_GET 2
00100 #define GET_FINISHED_RECEIVE 3
00101
00102 int _Cmi_mynode;
00103 int _Cmi_mynodesize;
00104 int _Cmi_numnodes;
00105 int _Cmi_numpes;
00106
00107 static int Cmi_nodestart;
00108 CpvDeclare(void*, CmiLocalQueue);
00109
00110 #define BLK_LEN 512
00111
00112 static int MsgQueueLen=0;
00113 static int MsgQueueBytes=0;
00114 static int request_max;
00115 static int request_bytes;
00116
00117 #include "pcqueue.h"
00118 PCQueue localSmallBufferQueue;
00119 PCQueue localMidBufferQueue;
00120
00121 int outstandingMsgs[3000];
00122
00123 int stretchFlag = 0;
00124 int blockingReceiveFlag = 0;
00125
00126 static void ConverseRunPE(int everReturn);
00127
00128 typedef struct {
00129 char header[CmiMsgHeaderSizeBytes];
00130 int size;
00131 char* src_addr;
00132 char* flag_addr;
00133 } GetHeader;
00134
00135 typedef struct msg_list {
00136 ELAN_EVENT *e;
00137 char *msg;
00138 struct msg_list *next;
00139 int size, destpe;
00140 int sent;
00141
00142
00143 int status;
00144 long done;
00145 long *flag_addr;
00146 char *gmsg;
00147 char *newmsg;
00148 int is_broadcast;
00149 } SMSG_LIST;
00150
00151
00152 static int Cmi_dim;
00153
00154 static SMSG_LIST *sent_msgs=0;
00155 static SMSG_LIST *end_sent=0;
00156 static SMSG_LIST *cur_unsent=0;
00157
00158 void ElanSendQueuedMessages();
00159 static int CmiReleaseSentMessages();
00160
00161 void ElanGetBasedSend(SMSG_LIST *ptr);
00162 void handleGetHeader(char *msg, int src);
00163 void processGetEnv(SMSG_LIST *ptr);
00164
00165 #if NODE_0_IS_CONVHOST
00166 int inside_comm = 0;
00167 #endif
00168
00169 double starttimer;
00170
00171 void CmiAbort(const char *message);
00172 static void PerrorExit(const char *msg);
00173
00174 void SendSpanningChildren(int size, char *msg);
00175
00176 typedef struct __elanChunkHeader {
00177 int type;
00178 int size;
00179 } ElanChunkHeader;
00180
00181 typedef struct __chunkHeader {
00182 ElanChunkHeader elan;
00183 CmiChunkHeader conv;
00184 } ChunkHeader;
00185
00186 #define TYPE_FIELD(buf) (((ChunkHeader*)(buf))->elan.type)
00187 #define SIZE_FIELD(buf) (((ChunkHeader*)(buf))->elan.size)
00188 #define CONV_SIZE_FIELD(buf) (((ChunkHeader*)(buf))->conv.size)
00189 #define REF_FIELD(buf) (((ChunkHeader*)(buf))->conv.ref)
00190
00191
00192 #define CONV_BUF_START(res) ((char*)(res) + sizeof(ElanChunkHeader))
00193
00194
00195 #define MACHINE_BUF_START(res) ((char*)(res) - sizeof(ElanChunkHeader))
00196
00197
00198 #define USER_BUF_START(res) ((char*)(res) - sizeof(ChunkHeader))
00199
00200 #define DYNAMIC_MESSAGE 0
00201 #define STATIC_MESSAGE 1
00202 #define ELAN_MESSAGE 3
00203
00204 static void PerrorExit(const char *msg)
00205 {
00206 perror(msg);
00207 exit(1);
00208 }
00209
00210
00211
00212 #if CMK_TIMER_USE_SPECIAL
00213
00214 #include <sys/timers.h>
00215 void CmiTimerInit(void)
00216 {
00217 starttimer = elan_clock(elan_base->state);
00218 }
00219
00220 double CmiTimer(void)
00221 {
00222 return (elan_clock(elan_base->state) - starttimer)/1e9;
00223 }
00224
00225 double CmiWallTimer(void)
00226 {
00227 return (elan_clock(elan_base->state) - starttimer)/1e9;
00228 }
00229
00230 double CmiCpuTimer(void)
00231 {
00232 return (elan_clock(elan_base->state) - starttimer)/1e9;
00233 }
00234
00235 #endif
00236
00237 static PCQueue msgBuf;
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249 #include "machine-smp.c"
00250
00251 CsvDeclare(CmiNodeState, NodeState);
00252
00253 static struct CmiStateStruct Cmi_state;
00254 int _Cmi_mype;
00255 int _Cmi_myrank;
00256
00257 #include "immediate.c"
00258
00259 void CmiMemLock(void) {}
00260 void CmiMemUnlock(void) {}
00261
00262 #define CmiGetState() (&Cmi_state)
00263 #define CmiGetStateN(n) (&Cmi_state)
00264
00265 static void CmiStartThreads(char **argv)
00266 {
00267 CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
00268 _Cmi_mype = Cmi_nodestart;
00269 _Cmi_myrank = 0;
00270 }
00271
00272
00273 static void CmiPushPE(int pe,void *msg)
00274 {
00275 CmiState cs=CmiGetStateN(pe);
00276 MACHSTATE1(2,"Pushing message into %d's queue",pe);
00277 #if CMK_IMMEDIATE_MSG
00278 if (CmiIsImmediate(msg)) {
00279
00280
00281 CmiPushImmediateMsg(msg);
00282
00283 return;
00284 }
00285 #endif
00286 CmiIdleLock_addMessage(&cs->idle);
00287 PCQueuePush(cs->recv,msg);
00288 }
00289
00290 #ifndef CmiMyPe
00291 int CmiMyPe(void)
00292 {
00293 return CmiGetState()->pe;
00294 }
00295 #endif
00296
00297 #ifndef CmiMyRank
00298 int CmiMyRank(void)
00299 {
00300 return CmiGetState()->rank;
00301 }
00302 #endif
00303
00304 #ifndef CmiNodeFirst
00305 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
00306 int CmiNodeSize(int node) { return _Cmi_mynodesize; }
00307 #endif
00308
00309 #ifndef CmiNodeOf
00310 int CmiNodeOf(int pe) { return (pe/_Cmi_mynodesize); }
00311 int CmiRankOf(int pe) { return pe%_Cmi_mynodesize; }
00312 #endif
00313
00314 int CmiAllAsyncMsgsSent(void)
00315 {
00316 SMSG_LIST *msg_tmp = NULL;
00317
00318 int done;
00319
00320 CmiReleaseSentMessages();
00321
00322 msg_tmp = sent_msgs;
00323 while((msg_tmp != cur_unsent) && (msg_tmp->e != NULL)){
00324 done = 0;
00325
00326 if(elan_tportTxDone(msg_tmp->e))
00327 done = 1;
00328 else
00329 #if USE_SHM
00330 elan_deviceCheck(elan_base->state);
00331 #else
00332 ;
00333 #endif
00334
00335 if(!done)
00336 return 0;
00337 msg_tmp = msg_tmp->next;
00338
00339 }
00340 return 1;
00341 }
00342
00343 int CmiAsyncMsgSent(CmiCommHandle c) {
00344
00345 SMSG_LIST *msg_tmp = sent_msgs;
00346 int done;
00347
00348 while ((msg_tmp) && (msg_tmp ->e != NULL) &&
00349 ((CmiCommHandle)(msg_tmp->e) != c))
00350 msg_tmp = msg_tmp->next;
00351
00352 if(msg_tmp) {
00353 done = 0;
00354
00355 if(elan_tportTxDone(msg_tmp->e))
00356 done = 1;
00357 else
00358 #if USE_SHM
00359 elan_deviceCheck(elan_base->state);
00360 #else
00361 ;
00362 #endif
00363
00364 return ((done)?1:0);
00365 } else {
00366 return 1;
00367 }
00368 }
00369
00370 void CmiReleaseCommHandle(CmiCommHandle c)
00371 {
00372 return;
00373 }
00374
00375 void release_pmsg_list();
00376
00377 #define MAX_RELEASE_POLL 4096
00378
00379 static int CmiReleaseSentMessages(void)
00380 {
00381 SMSG_LIST *msg_tmp=sent_msgs;
00382 SMSG_LIST *prev=0;
00383 SMSG_LIST *temp;
00384 int done;
00385 int locked = 0;
00386
00387 #ifndef CMK_OPTIMIZE
00388 double rel_start_time = CmiWallTimer();
00389 #endif
00390
00391 #if CMK_PERSISTENT_COMM
00392 release_pmsg_list();
00393 #endif
00394
00395 int ncheck = MAX_RELEASE_POLL;
00396
00397 while(msg_tmp != NULL && ncheck > 0){
00398 if(msg_tmp->sent) {
00399 done =0;
00400
00401 if(msg_tmp->status == BASIC_SEND) {
00402 if(elan_tportTxDone(msg_tmp->e)) {
00403 elan_tportTxWait(msg_tmp->e);
00404 done = 1;
00405 }
00406 else
00407 #if USE_SHM
00408 elan_deviceCheck(elan_base->state);
00409 #else
00410 ;
00411 #endif
00412 }
00413 else {
00414 processGetEnv(msg_tmp);
00415 done = msg_tmp->done;
00416 }
00417
00418 if(done) {
00419 MsgQueueLen--;
00420 MsgQueueBytes -= msg_tmp->size;
00421
00422 outstandingMsgs[msg_tmp->destpe] = 0;
00423
00424
00425 temp = msg_tmp->next;
00426 if(prev==0)
00427 sent_msgs = temp;
00428 else
00429 prev->next = temp;
00430
00431 CmiFree(msg_tmp->msg);
00432 CmiFree(msg_tmp);
00433 msg_tmp = temp;
00434 } else {
00435 prev = msg_tmp;
00436 msg_tmp = msg_tmp->next;
00437 ncheck --;
00438 }
00439 }
00440 else {
00441 prev = msg_tmp;
00442 msg_tmp = msg_tmp->next;
00443 }
00444 }
00445
00446
00447
00448
00449 end_sent = prev;
00450
00451 #if CMK_PERSISTENT_COMM
00452 release_pmsg_list();
00453 #endif
00454
00455 #ifndef CMK_OPTIMIZE
00456 #if ! CMK_TRACE_IN_CHARM
00457 {
00458 double rel_end_time = CmiWallTimer();
00459 if(rel_end_time > rel_start_time + 50/1e6)
00460 traceUserBracketEvent(20, rel_start_time, rel_end_time);
00461 }
00462 #endif
00463 #endif
00464
00465
00466 if(msg_tmp != cur_unsent)
00467 return 0;
00468 else
00469
00470 return 1;
00471 }
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491 int PumpMsgs(int retflag)
00492 {
00493
00494 static char recv_small_done[RECV_MSG_Q_SIZE];
00495
00496
00497
00498 static char recv_mid_done[MID_MSG_Q_SIZE];
00499 static int recv_large_done = 0;
00500
00501 static char *sbuf[RECV_MSG_Q_SIZE];
00502
00503 static char *mbuf[MID_MSG_Q_SIZE];
00504 static char *lbuf;
00505
00506 static int event_idx = 0;
00507 static int post_idx = 0;
00508
00509 static int event_m_idx = 0;
00510 static int post_m_idx = 0;
00511
00512 static int nlarge_torecv = 0;
00513
00514
00515
00516 static int step1 = 0;
00517
00518
00519
00520
00521
00522 int flg, res, rcount, mcount;
00523 char *msg = 0;
00524
00525 int recd=0;
00526 #if QSNETLIBS_VERSION_CODE > QSNETLIBS_VERSION(1,4,0)
00527 unsigned long size= 0;
00528 #else
00529 int size= 0;
00530 #endif
00531 int tag=0;
00532 int src=-1;
00533
00534 #ifndef CMK_OPTIMIZE
00535 double pmp_start_time = CmiWallTimer();
00536 #endif
00537
00538 int ecount = 0, emcount = 0;
00539
00540 #if CMK_PERSISTENT_COMM
00541 if (PumpPersistent()) return 1;
00542 #endif
00543
00544 while(1) {
00545 msg = 0;
00546
00547 ecount = 0;
00548 for(rcount = 0; rcount < smallQSize; rcount ++){
00549 ecount = (rcount + post_idx) % smallQSize;
00550 if(!recv_small_done[ecount]) {
00551 sbuf[ecount] = (char *) CmiAlloc(SMALL_MESSAGE_SIZE);
00552
00553 esmall[ecount] = elan_tportRxStart(elan_port, 0, 0, 0, 1,
00554 TAG_SMALL, sbuf[ecount],
00555 SMALL_MESSAGE_SIZE);
00556 recv_small_done[ecount] = 1;
00557 }
00558 else {
00559 ecount = (ecount + smallQSize - 1) % smallQSize;
00560 break;
00561 }
00562 }
00563 post_idx = ecount + 1;
00564
00565 emcount = 0;
00566 for(mcount = 0; mcount < midQSize; mcount ++){
00567 emcount = (mcount + post_m_idx) % midQSize;
00568 if(!recv_mid_done[emcount]) {
00569 mbuf[emcount] = (char *) CmiAlloc(MID_MESSAGE_SIZE);
00570
00571 emid[emcount] = elan_tportRxStart(elan_port, 0, 0, 0, -1,
00572 TAG_MID, mbuf[emcount],
00573 MID_MESSAGE_SIZE);
00574 recv_mid_done[emcount] = 1;
00575 }
00576 else {
00577 emcount = (emcount + midQSize - 1) % midQSize;
00578 break;
00579 }
00580 }
00581 post_m_idx = emcount + 1;
00582
00583 if(!recv_large_done) {
00584 elarge = elan_tportRxStart(elan_port, ELAN_TPORT_RXPROBE, 0, 0,
00585 -1, TAG_LARGE, NULL, 0);
00586 recv_large_done = 1;
00587 }
00588
00589 if(!step1 && elan_tportRxDone(elarge)) {
00590 elan_tportRxWait(elarge, NULL, NULL, &size );
00591
00592 lbuf = (char *) CmiAlloc(size);
00593 elarge = elan_tportRxStart(elan_port, 0, 0, 0, -1, TAG_LARGE,
00594 lbuf,size);
00595 step1 = 1;
00596 }
00597
00598 if(step1 && elan_tportRxDone(elarge)) {
00599 elan_tportRxWait(elarge, NULL, NULL, &size);
00600
00601 msg = lbuf;
00602 recv_large_done = 0;
00603 flg = 1;
00604
00605 CmiPushPE(CMI_DEST_RANK(msg), msg);
00606 #if CMK_BROADCAST_SPANNING_TREE
00607 if (CMI_BROADCAST_ROOT(msg))
00608 SendSpanningChildren(size, msg);
00609 #endif
00610 step1 = 0;
00611
00612 if(blockingReceiveFlag)
00613 nlarge_torecv --;
00614 }
00615
00616 emcount = 0;
00617 for(mcount = 0; mcount < midQSize; mcount ++){
00618 emcount = (mcount + event_m_idx) % midQSize;
00619 if(elan_tportRxDone(emid[emcount])) {
00620 elan_tportRxWait(emid[emcount], NULL, NULL, &size);
00621
00622 msg = mbuf[emcount];
00623 mbuf[emcount] = NULL;
00624
00625 recv_mid_done[emcount] = 0;
00626 flg = 1;
00627
00628 CmiPushPE(CMI_DEST_RANK(msg), msg);
00629
00630 #if CMK_BROADCAST_SPANNING_TREE
00631 if (CMI_BROADCAST_ROOT(msg))
00632 SendSpanningChildren(size, msg);
00633 #endif
00634 if(blockingReceiveFlag)
00635 nlarge_torecv --;
00636 }
00637 else {
00638 #if USE_SHM
00639 elan_deviceCheck(elan_base->state);
00640 #else
00641 ;
00642 #endif
00643 emcount = (emcount + midQSize - 1) % midQSize;
00644 break;
00645 }
00646 }
00647 event_m_idx = emcount + 1;
00648
00649 ecount = 0;
00650 for(rcount = 0; rcount < smallQSize; rcount ++){
00651 ecount = (rcount + event_idx) % smallQSize;
00652 if(elan_tportRxDone(esmall[ecount]) ||
00653 (retflag == 3 && nlarge_torecv == 0 && !flg)) {
00654 elan_tportRxWait(esmall[ecount], &src, &tag, &size );
00655
00656 msg = sbuf[ecount];
00657 sbuf[ecount] = NULL;
00658
00659 recv_small_done[ecount] = 0;
00660
00661 if(tag == TAG_SMALL) {
00662 flg = 1;
00663 CmiPushPE(CMI_DEST_RANK(msg), msg);
00664 #if CMK_BROADCAST_SPANNING_TREE
00665 if (CMI_BROADCAST_ROOT(msg))
00666 SendSpanningChildren(size, msg);
00667 #endif
00668 }
00669 else if(tag == TAG_LARGE_HEADER) {
00670
00671 nlarge_torecv ++;
00672 CmiFree(msg);
00673 }
00674 else if(tag == TAG_GET_BASED_SEND) {
00675 handleGetHeader(msg, src);
00676 }
00677
00678 if(retflag == 3)
00679 retflag = 1;
00680 }
00681 else {
00682 #if USE_SHM
00683 elan_deviceCheck(elan_base->state);
00684 #else
00685 ;
00686 #endif
00687 ecount = (ecount + smallQSize - 1) % smallQSize;
00688 break;
00689 }
00690 }
00691 event_idx = ecount + 1;
00692
00693 #if CMK_PERSISTENT_COMM
00694 PumpPersistent();
00695 #endif
00696
00697 if(!flg) {
00698 #ifndef CMK_OPTIMIZE
00699 #if ! CMK_TRACE_IN_CHARM
00700
00701 double pmp_end_time = CmiWallTimer();
00702 if(pmp_end_time > pmp_start_time + 50/1e6)
00703 traceUserBracketEvent(10, pmp_start_time, pmp_end_time);
00704 #endif
00705 #endif
00706 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00707 CmiHandleImmediate();
00708 #endif
00709 return recd;
00710 }
00711
00712 if (retflag) {
00713 #ifndef CMK_OPTIMIZE
00714 #if ! CMK_TRACE_IN_CHARM
00715 double pmp_end_time = CmiWallTimer();
00716 if(pmp_end_time > pmp_start_time + 50/1e6)
00717 traceUserBracketEvent(10, pmp_start_time, pmp_end_time);
00718 #endif
00719 #endif
00720 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00721 CmiHandleImmediate();
00722 #endif
00723 return flg;
00724 }
00725
00726 recd = 1;
00727 flg = 0;
00728 }
00729 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00730 CmiHandleImmediate();
00731 #endif
00732 return recd;
00733 }
00734
00735 void *remote_get(void * srcptr, void *destptr, int size, int srcPE){
00736 return (void *)elan_get(elan_base->state, srcptr, destptr, size, srcPE);
00737 }
00738
00739 int remote_get_done(void *e){
00740 ELAN_EVENT *evt = (ELAN_EVENT *)e;
00741
00742 int flag = elan_poll(evt, ELAN_POLL_EVENT);
00743 return flag;
00744 }
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757 void *CmiGetNonLocal(void)
00758 {
00759 register CmiState cs = CmiGetState();
00760 register void *msg = NULL;
00761 CmiIdleLock_checkMessage(&cs->idle);
00762
00763 msg = PCQueuePop(cs->recv);
00764
00765 if(msg) {
00766 return msg;
00767 }
00768
00769
00770 PumpMsgs(1);
00771
00772 CmiReleaseSentMessages();
00773 ElanSendQueuedMessages();
00774
00775 msg = PCQueuePop(cs->recv);
00776 return msg;
00777 }
00778
00779 void CmiPing() {
00780 CmiReleaseSentMessages();
00781 PumpMsgs(0);
00782 ElanSendQueuedMessages();
00783 }
00784
00785
00786 void enableBlockingReceives(){
00787 blockingReceiveFlag = 1;
00788 }
00789
00790 void disableBlockingReceives(){
00791 blockingReceiveFlag = 0;
00792 }
00793
00794 static int toggle = 0;
00795
00796
00797 void CmiNotifyIdle(void)
00798 {
00799 static int previousSleepTime = 0;
00800 CmiReleaseSentMessages();
00801 ElanSendQueuedMessages();
00802
00803 PumpMsgs(1);
00804 toggle = 0;
00805 }
00806
00807 void CmiNotifyStillIdle(void)
00808 {
00809 static int previousSleepTime = 0;
00810 CmiReleaseSentMessages();
00811 ElanSendQueuedMessages();
00812
00813 if(!PumpMsgs(1) && blockingReceiveFlag && toggle){
00814 if (!PCQueueEmpty(CmiGetState()->recv)) return;
00815 if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
00816 if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
00817 if (sent_msgs) return;
00818 if (cur_unsent) return;
00819 PumpMsgs(3);
00820 }
00821 toggle = 1;
00822 }
00823
00824
00825
00826 #if CMK_IMMEDIATE_MSG
00827 void CmiProbeImmediateMsg()
00828 {
00829 PumpMsgs(0);
00830 }
00831 #endif
00832
00833
00834 static void CmiSendSelf(char *msg)
00835 {
00836 #if CMK_IMMEDIATE_MSG
00837 if (CmiIsImmediate(msg)) {
00838
00839 CmiHandleImmediateMessage(msg);
00840 return;
00841 }
00842 #endif
00843 CQdCreate(CpvAccess(cQdState), 1);
00844 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00845 }
00846
00847 void CmiSyncSendFn(int destPE, int size, char *msg)
00848 {
00849 CmiState cs = CmiGetState();
00850
00851 char *dupmsg;
00852 dupmsg = (char *) CmiAlloc(size);
00853 memcpy(dupmsg, msg, size);
00854
00855
00856 CMI_SET_BROADCAST_ROOT(dupmsg, 0);
00857
00858 if (cs->pe==destPE)
00859 CmiSendSelf(dupmsg);
00860 else
00861 CmiAsyncSendFn(destPE, size, dupmsg);
00862 }
00863
00864 void ElanBasicSendFn(SMSG_LIST * ptr){
00865 int tag = 0, sync_mode = 0;
00866 int tiny_msg = 0;
00867
00868 ptr->status = BASIC_SEND;
00869
00870 if (ptr->size <= SMALL_MESSAGE_SIZE)
00871 tag = TAG_SMALL;
00872 else if (ptr->size < MID_MESSAGE_SIZE)
00873 tag = TAG_MID;
00874 else {
00875 if(!ptr->is_broadcast && enableGetBasedSend) {
00876 ElanGetBasedSend(ptr);
00877 return;
00878 }
00879
00880 tag = TAG_LARGE;
00881 }
00882
00883
00884
00885
00886 tiny_msg = 0;
00887
00888
00889
00890
00891
00892 if(ptr->size > SMALL_MESSAGE_SIZE && blockingReceiveFlag) {
00893 elan_tportTxWait(elan_tportTxStart(elan_port, 0, ptr->destpe,
00894 CmiMyPe(), TAG_LARGE_HEADER,
00895 &tiny_msg, sizeof(int)));
00896 }
00897
00898 ptr->e = elan_tportTxStart(elan_port, sync_mode, ptr->destpe, CmiMyPe(),
00899 tag, ptr->msg, ptr->size);
00900 ptr->sent = 1;
00901
00902 MsgQueueLen++;
00903 MsgQueueBytes += ptr->size;
00904
00905 outstandingMsgs[ptr->destpe] = 1;
00906 }
00907
00908 CmiCommHandle ElanSendFn(int destPE, int size, char *msg, int flag)
00909 {
00910 CmiState cs = CmiGetState();
00911 SMSG_LIST *msg_tmp;
00912 CmiUInt2 rank, node;
00913
00914 if(destPE == cs->pe) {
00915 char *dupmsg = (char *) CmiAlloc(size);
00916 memcpy(dupmsg, msg, size);
00917 CmiSendSelf(dupmsg);
00918 return 0;
00919 }
00920
00921 CQdCreate(CpvAccess(cQdState), 1);
00922 #if CMK_PERSISTENT_COMM
00923 if (phs) {
00924 CmiAssert(phsSize == 1);
00925 CmiSendPersistentMsg(*phs, destPE, size, msg);
00926 return NULL;
00927 }
00928 #endif
00929
00930 msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
00931 msg_tmp->msg = msg;
00932 msg_tmp->next = 0;
00933 msg_tmp->size = size;
00934 msg_tmp->sent = 0;
00935 msg_tmp->e = NULL;
00936 msg_tmp->destpe = destPE;
00937 msg_tmp->is_broadcast = flag;
00938
00939 if ((MsgQueueLen > request_max || MsgQueueBytes > request_bytes)
00940 && (!flag)) {
00941 CmiReleaseSentMessages();
00942 PumpMsgs(1);
00943 }
00944
00945 ElanSendQueuedMessages();
00946
00947 if(MsgQueueLen > request_max || MsgQueueBytes > request_bytes
00948 || outstandingMsgs[destPE]){
00949
00950 if(sent_msgs==0)
00951 sent_msgs = msg_tmp;
00952 else
00953 end_sent->next = msg_tmp;
00954 end_sent = msg_tmp;
00955
00956 if(cur_unsent == 0)
00957 cur_unsent = msg_tmp;
00958
00959 }
00960 else{
00961 ElanBasicSendFn(msg_tmp);
00962
00963 if(sent_msgs==0)
00964 sent_msgs = msg_tmp;
00965 else
00966 end_sent->next = msg_tmp;
00967 end_sent = msg_tmp;
00968
00969 return (CmiCommHandle) msg_tmp->e;
00970 }
00971 return NULL;
00972 }
00973
00974 void ElanSendQueuedMessages() {
00975 SMSG_LIST * ptr = cur_unsent, *new_unsent = NULL;
00976 while (MsgQueueLen <= request_max && MsgQueueBytes <= request_bytes
00977 && ptr != NULL) {
00978
00979 if(!outstandingMsgs[ptr->destpe] && !ptr->sent)
00980 ElanBasicSendFn(ptr);
00981 else if ((!ptr->sent) && (new_unsent == NULL))
00982 new_unsent = ptr;
00983
00984 ptr = ptr->next;
00985 }
00986
00987 if(new_unsent)
00988 cur_unsent = new_unsent;
00989 else
00990 cur_unsent = ptr;
00991 }
00992
00993 void ElanGetBasedSend(SMSG_LIST *msg_tmp){
00994
00995 GetHeader *gmsg = (GetHeader *) CmiAlloc(sizeof(GetHeader));
00996
00997 gmsg->src_addr = msg_tmp->msg;
00998 gmsg->size = msg_tmp->size;
00999 CMI_SET_BROADCAST_ROOT(gmsg, 0);
01000
01001 msg_tmp->sent = 1;
01002 msg_tmp->e = NULL;
01003 msg_tmp->status = GET_BASED_SEND;
01004 msg_tmp->done = 0;
01005
01006 msg_tmp->gmsg = (char *)gmsg;
01007 gmsg->flag_addr = (char *)&(msg_tmp->done);
01008
01009 msg_tmp->e = elan_tportTxStart(elan_port, 0, msg_tmp->destpe, CmiMyPe(),
01010 TAG_GET_BASED_SEND, gmsg, sizeof(GetHeader));
01011
01012 MsgQueueLen++;
01013 MsgQueueBytes += msg_tmp->size;
01014
01015 outstandingMsgs[msg_tmp->destpe] = 1;
01016 }
01017
01018 void handleGetHeader(char *msg, int src){
01019 GetHeader *gmsg = (GetHeader *) msg;
01020
01021 char *newmsg = CmiAlloc(gmsg->size);
01022
01023 SMSG_LIST *msg_tmp = (SMSG_LIST *)CmiAlloc(sizeof(SMSG_LIST));
01024 msg_tmp->msg = msg;
01025 msg_tmp->next = 0;
01026 msg_tmp->size = gmsg->size;
01027 msg_tmp->sent = 1;
01028 msg_tmp->destpe = src;
01029 msg_tmp->status = RECEIVE_GET;
01030 msg_tmp->done = 0;
01031 msg_tmp->flag_addr = (long *)gmsg->flag_addr;
01032 msg_tmp->newmsg = newmsg;
01033
01034 msg_tmp->e = elan_get(elan_base->state, gmsg->src_addr, msg_tmp->newmsg, gmsg->size, src);
01035
01036 if(sent_msgs==0) {
01037 sent_msgs = msg_tmp;
01038 end_sent = msg_tmp;
01039 }
01040 else {
01041 msg_tmp->next = sent_msgs;
01042 sent_msgs = msg_tmp;
01043 }
01044 }
01045
01046 long trueFlag = 1;
01047 void processGetEnv(SMSG_LIST *ptr){
01048
01049 if(ptr->status == BASIC_SEND)
01050 return;
01051
01052 if(ptr->status == GET_BASED_SEND) {
01053 if(ptr->gmsg != NULL) {
01054