00001
00002
00003
00004
00005
00006
00007
00013
00014 #include <stdio.h>
00015 #include <errno.h>
00016 #include "converse.h"
00017 #include <mpi.h>
00018 #if CMK_TIMER_USE_XT3_DCLOCK
00019 #include <catamount/dclock.h>
00020 #endif
00021
00022 #ifdef AMPI
00023 # warning "We got the AMPI version of mpi.h, instead of the system version--"
00024 # warning " Try doing an 'rm charm/include/mpi.h' and building again."
00025 # error "Can't build Charm++ using AMPI version of mpi.h header"
00026 #endif
00027
00028
00029 #include <unistd.h>
00030 #include <stdlib.h>
00031
/* Send-queue layout switch: when 1, each rank owns its own outgoing queue
   (ProcState.sendMsgBuf); when 0, SMP builds share one node-wide sendMsgBuf. */
#define MULTI_SENDQUEUE 0

/* A POSIX-threads shared-variables build is treated as an SMP build. */
#if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
#define CMK_SMP 1
#endif
00037
00038 #include "machine.h"
00039
00040 #include "pcqueue.h"
00041
/* Return `node` with bit number `bitnumber` toggled.  Both arguments are
   parenthesized so that expressions such as FLIPBIT(a & b, k) or
   FLIPBIT(n, c ? 1 : 2) expand with the intended precedence. */
#define FLIPBIT(node,bitnumber) ((node) ^ (1 << (bitnumber)))

/* Platform defaults: BlueGene builds use a much smaller MAX_QLEN and a
   nonzero NETWORK_PROGRESS_PERIOD_DEFAULT. */
#if CMK_VERSION_BLUEGENE
#define MAX_QLEN 8
#define NETWORK_PROGRESS_PERIOD_DEFAULT 16
#else
#define NETWORK_PROGRESS_PERIOD_DEFAULT 0
#define MAX_QLEN 200
#endif

/* Optional user-event tracing around MPI calls: START_EVENT() records the
   current wall time in projTraceStart and END_EVENT(id) emits a bracketed
   user event `id` spanning from that time to now.  Both expand to nothing
   unless CMI_MPI_TRACE_USEREVENTS is set, CMK_OPTIMIZE is undefined and
   CMK_TRACE_IN_CHARM is off. */
#if CMI_MPI_TRACE_USEREVENTS && !defined(CMK_OPTIMIZE) && ! CMK_TRACE_IN_CHARM
CpvStaticDeclare(double, projTraceStart);
# define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
# define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
#else
# define START_EVENT()
# define END_EVENT(x)
#endif
00060
00061
00062
00063
00064
00065
00066
00067
/* Broadcast strategy: SMP builds turn the spanning tree off; non-SMP builds
   forward broadcasts along a spanning tree and leave the hypercube off. */
#if CMK_SMP
#define CMK_BROADCAST_SPANNING_TREE 0
#else
#define CMK_BROADCAST_SPANNING_TREE 1
#define CMK_BROADCAST_HYPERCUBE 0
#endif

/* NOTE(review): presumably the fan-out of the spanning-tree broadcast
   (used outside this view) -- confirm. */
#define BROADCAST_SPANNING_FACTOR 4

/* Lvalue accessors for fields of the basic message header.  `msg` is
   parenthesized so that arguments like `buf + off` are offset in bytes
   before the cast rather than in header-sized units after it.  The
   broadcast root and the hypercube cycle share the same `root` field. */
#define CMI_BROADCAST_ROOT(msg) (((CmiMsgHeaderBasic *)(msg))->root)
#define CMI_GET_CYCLE(msg) (((CmiMsgHeaderBasic *)(msg))->root)

#define CMI_DEST_RANK(msg) (((CmiMsgHeaderBasic *)(msg))->rank)
#define CMI_MAGIC(msg) (((CmiMsgHeaderBasic *)(msg))->magic)

/* Value stamped into every outgoing header and verified on receipt. */
#define CHARM_MAGIC_NUMBER 126

/* Optional header checksum, active only when checksum_flag is nonzero in
   non-optimized builds.  CMI_SET_CHECKSUM zeroes the cksum field and then
   stores computeCheckSum over the whole message; CMI_CHECK_CHECKSUM aborts
   if the recomputed sum is nonzero.  Both are single statements
   (do/while(0)) so they compose safely with if/else. */
#if !CMK_OPTIMIZE
static int checksum_flag = 0;
#define CMI_SET_CHECKSUM(msg, len) \
  do { \
    if (checksum_flag) { \
      ((CmiMsgHeaderBasic *)(msg))->cksum = 0; \
      ((CmiMsgHeaderBasic *)(msg))->cksum = computeCheckSum((unsigned char*)(msg), (len)); \
    } \
  } while (0)
#define CMI_CHECK_CHECKSUM(msg, len) \
  do { \
    if (checksum_flag && computeCheckSum((unsigned char*)(msg), (len)) != 0) \
      CmiAbort("Fatal error: checksum doesn't agree!\n"); \
  } while (0)
#else
#define CMI_SET_CHECKSUM(msg, len) do { } while (0)
#define CMI_CHECK_CHECKSUM(msg, len) do { } while (0)
#endif

/* Store the broadcast root / hypercube cycle only when the matching
   broadcast scheme is compiled in.  The expansions keep their trailing
   semicolon so existing call sites work unchanged. */
#if CMK_BROADCAST_SPANNING_TREE
# define CMI_SET_BROADCAST_ROOT(msg, root) CMI_BROADCAST_ROOT(msg) = (root);
#else
# define CMI_SET_BROADCAST_ROOT(msg, root)
#endif

#if CMK_BROADCAST_HYPERCUBE
# define CMI_SET_CYCLE(msg, cycle) CMI_GET_CYCLE(msg) = (cycle);
#else
# define CMI_SET_CYCLE(msg, cycle)
#endif
00113
00114
/* Defining MPI_POST_RECV at build time enables MPI_POST_RECV_COUNT
   pre-posted receive buffers of MPI_POST_RECV_SIZE bytes each. */
#ifdef MPI_POST_RECV
#define MPI_POST_RECV_COUNT 10
#undef MPI_POST_RECV
#endif
#if MPI_POST_RECV_COUNT > 0
#warning "Using MPI posted receives which have not yet been tested"
#ifndef MPI_POST_RECV_SIZE
#define MPI_POST_RECV_SIZE 200
#endif

CpvDeclare(unsigned long long, Cmi_posted_recv_total);
CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
CpvDeclare(MPI_Request*, CmiPostedRecvRequests);
CpvDeclare(char*,CmiPostedRecvBuffers);
#endif

/* MPI tag carried by ordinary Converse messages (probed by PumpMsgs). */
#define TAG 1375

#if MPI_POST_RECV_COUNT > 0
/* Messages no larger than MPI_POST_RECV_SIZE use their own tag so they
   land in the pre-posted buffers. */
#define POST_RECV_TAG (TAG+1)
#endif

/* Tag for CmiBarrierZero's one-byte tokens.  It must differ from TAG and
   POST_RECV_TAG: otherwise PumpMsgs' MPI_Iprobe(TAG) can pick up a token
   (and abort on its missing magic number), and node 0's barrier receive
   can swallow an ordinary Converse message. */
#define BARRIER_ZERO_TAG (TAG-1)
00148
00149
00150
00151
00152
00153
00154 int _Cmi_numpes;
00155 int _Cmi_mynode;
00156 int _Cmi_mynodesize;
00157 int _Cmi_numnodes;
00158 int _Cmi_numpes;
00159 static int Cmi_nodestart;
00160 CpvDeclare(void*, CmiLocalQueue);
00161
00162
00163
00164 CpvDeclare(unsigned , networkProgressCount);
00165 int networkProgressPeriod;
00166
00167 int idleblock = 0;
00168
00169 #define BLK_LEN 512
00170
00171 #if CMK_NODE_QUEUE_AVAILABLE
00172 #define DGRAM_NODEMESSAGE (0xFB)
00173
00174 #define NODE_BROADCAST_OTHERS (-1)
00175 #define NODE_BROADCAST_ALL (-2)
00176 #endif
00177
00178 #if 0
00179 static void **recdQueue_blk;
00180 static unsigned int recdQueue_blk_len;
00181 static unsigned int recdQueue_first;
00182 static unsigned int recdQueue_len;
00183 static void recdQueueInit(void);
00184 static void recdQueueAddToBack(void *element);
00185 static void *recdQueueRemoveFromFront(void);
00186 #endif
00187
/* Forward declarations of file-local functions. */
static void ConverseRunPE(int everReturn);
static void CommunicationServer(int sleepTime);
static void CommunicationServerThread(int sleepTime);

/* One outstanding non-blocking send.  Nodes are linked on sent_msgs;
   CmiReleaseSentMessages frees both `msg` and the node itself once `req`
   reports completion. */
typedef struct msg_list {
  char *msg;              /* message buffer handed to MPI */
  struct msg_list *next;  /* next node on sent_msgs */
  int size, destpe;       /* byte count, and the destination passed to MPI_Isend */
  MPI_Request req;        /* request polled with MPI_Test */
} SMSG_LIST;

/* Number of sends in flight: decremented by CmiReleaseSentMessages and
   compared against request_max by SendMsgBuf. */
int MsgQueueLen=0;
static int request_max;

/* Head and tail of the outstanding-send list; CmiReleaseSentMessages
   resets end_sent to the last node still pending. */
static SMSG_LIST *sent_msgs=0;
static SMSG_LIST *end_sent=0;

static int Cmi_dim;

/* When nonzero, CmiGetNonLocal (non-SMP) keeps reaping sends and pumping
   until MsgQueueLen drops to 0. */
static int no_outstanding_sends=0;

#if NODE_0_IS_CONVHOST
int inside_comm = 0;
#endif

void CmiAbort(const char *message);
static void PerrorExit(const char *msg);

void SendSpanningChildren(int size, char *msg);
void SendHypercube(int size, char *msg);
00218
/* Report a fatal system error: print `msg` followed by the description of
   the current errno (via perror) and terminate with exit status 1. */
static void PerrorExit(const char *msg)
{
  const char *context = msg;

  perror(context);
  exit(1);
}

extern unsigned char computeCheckSum(unsigned char *data, int len);
00226
00227
00228
00229 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
00230
00231
/* Optional lock around timer reads in SMP builds; the timer functions only
   use it when non-zero.  NOTE(review): nothing in the code shown here
   assigns it -- confirm whether it is ever initialised. */
static CmiNodeLock timerLock = 0;
/* Clock reading subtracted from every CmiTimer/CmiWallTimer/CmiCpuTimer result. */
static double starttimer = 0;
/* Nonzero when MPI reports MPI_WTIME_IS_GLOBAL (set by CmiTimerIsSynchronized). */
static int _is_global = 0;
00235
00236 int CmiTimerIsSynchronized()
00237 {
00238 int flag;
00239 void *v;
00240
00241
00242 if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
00243 printf("MPI_WTIME_IS_GLOBAL not valid!\n");
00244 if (flag) {
00245 _is_global = *(int*)v;
00246 if (_is_global && CmiMyPe() == 0)
00247 printf("Charm++> MPI timer is synchronized!\n");
00248 }
00249 return _is_global;
00250 }
00251
00252 void CmiTimerInit()
00253 {
00254 _is_global = CmiTimerIsSynchronized();
00255
00256 if (CmiMyRank() == 0) {
00257 if (_is_global) {
00258 double minTimer;
00259 #if CMK_TIMER_USE_XT3_DCLOCK
00260 starttimer = dclock();
00261 #else
00262 starttimer = MPI_Wtime();
00263 #endif
00264
00265 MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
00266 MPI_COMM_WORLD );
00267 starttimer = minTimer;
00268 }
00269 else {
00270
00271 CmiBarrier();
00272 CmiBarrier();
00273 CmiBarrier();
00274 #if CMK_TIMER_USE_XT3_DCLOCK
00275 starttimer = dclock();
00276 #else
00277 starttimer = MPI_Wtime();
00278 #endif
00279 }
00280
00281 }
00282 CmiNodeAllBarrier();
00283 }
00284
00285 double CmiTimer(void)
00286 {
00287 double t;
00288 #if CMK_SMP
00289 if (timerLock) CmiLock(timerLock);
00290 #endif
00291 #if CMK_TIMER_USE_XT3_DCLOCK
00292 t = dclock() - starttimer;
00293 #else
00294 t = MPI_Wtime() - starttimer;
00295 #endif
00296 #if CMK_SMP
00297 if (timerLock) CmiUnlock(timerLock);
00298 #endif
00299 return t;
00300 }
00301
00302 double CmiWallTimer(void)
00303 {
00304 double t;
00305 #if CMK_SMP
00306 if (timerLock) CmiLock(timerLock);
00307 #endif
00308 #if CMK_TIMER_USE_XT3_DCLOCK
00309 t = dclock() - starttimer;
00310 #else
00311 t = MPI_Wtime() - starttimer;
00312 #endif
00313 #if CMK_SMP
00314 if (timerLock) CmiUnlock(timerLock);
00315 #endif
00316 return t;
00317 }
00318
00319 double CmiCpuTimer(void)
00320 {
00321 double t;
00322 #if CMK_SMP
00323 if (timerLock) CmiLock(timerLock);
00324 #endif
00325 #if CMK_TIMER_USE_XT3_DCLOCK
00326 t = dclock() - starttimer;
00327 #else
00328 t = MPI_Wtime() - starttimer;
00329 #endif
00330 #if CMK_SMP
00331 if (timerLock) CmiUnlock(timerLock);
00332 #endif
00333 return t;
00334 }
00335
00336 #endif
00337
00338 int CmiBarrier()
00339 {
00340 if (CmiMyRank() == 0) {
00341
00342 START_EVENT();
00343
00344 if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
00345 CmiAbort("Timernit: MPI_Barrier failed!\n");
00346
00347 END_EVENT(10);
00348 }
00349 return 0;
00350 }
00351
00352
00353 int CmiBarrierZero()
00354 {
00355 int i;
00356 if (CmiMyRank() == 0) {
00357 char msg[1];
00358 MPI_Status sts;
00359 if (CmiMyNode() == 0) {
00360 for (i=0; i<CmiNumNodes()-1; i++) {
00361 START_EVENT();
00362
00363 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
00364 CmiPrintf("MPI_Recv failed!\n");
00365
00366 END_EVENT(30);
00367 }
00368 }
00369 else {
00370 START_EVENT();
00371
00372 if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
00373 printf("MPI_Send failed!\n");
00374
00375 END_EVENT(20);
00376 }
00377 }
00378 CmiNodeAllBarrier();
00379 return 0;
00380 }
00381
/* Per-rank machine-layer state; procState is indexed by rank within the
   node (see CmiPushPE and MsgQueueEmpty). */
typedef struct ProcState {
#if MULTI_SENDQUEUE
  PCQueue sendMsgBuf;     /* this rank's outgoing queue (MULTI_SENDQUEUE only) */
#endif
  CmiNodeLock recvLock;   /* held by CmiPushPE around pushes to the rank's recv queue (SMP) */
} ProcState;

static ProcState *procState;

#if CMK_SMP

#if !MULTI_SENDQUEUE
/* Single node-wide outgoing queue, drained by SendMsgBuf and checked by
   MsgQueueEmpty. */
static PCQueue sendMsgBuf;
/* NOTE(review): not referenced in the code shown here; presumably guards
   sendMsgBuf -- confirm. */
static CmiNodeLock sendMsgBufLock = NULL;
#endif

#endif
00399
00400
00401
00402
00403
00404
00405
00406
/* NOTE(review): not referenced in the code shown here; presumably consumed
   by machine-smp.c, which is #included immediately after -- confirm. */
static int Cmi_charmrun_fd = 0;
00408 #include "machine-smp.c"
00409
/* Node-wide shared state.  CmiPushNode and CmiGetNonLocalNodeQ use its
   NodeRecv queue under CmiNodeRecvLock. */
CsvDeclare(CmiNodeState, NodeState);
00411
00412 #include "immediate.c"
00413
#if ! CMK_SMP

/* Non-SMP build: a single static Cmi state answers every CmiGetState()
   and CmiGetStateN() lookup. */
static struct CmiStateStruct Cmi_state;
int _Cmi_mype;
int _Cmi_myrank;

/* Memory locking is a no-op in this build. */
void CmiMemLock() {}
void CmiMemUnlock() {}

#define CmiGetState() (&Cmi_state)
/* The rank argument is ignored: there is only one state. */
#define CmiGetStateN(n) (&Cmi_state)

/* Yield implemented as sleep(0). */
void CmiYield(void) { sleep(0); }

/* "Start" the only PE of this process.  Calls CmiStateInit(Cmi_nodestart,
   0, &Cmi_state) -- presumably (pe, rank, state) -- then records
   Cmi_nodestart and rank 0 in _Cmi_mype / _Cmi_myrank.  argv is unused.
   Kept in this order in case CmiStateInit reads the globals. */
static void CmiStartThreads(char **argv)
{
  CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
  _Cmi_mype = Cmi_nodestart;
  _Cmi_myrank = 0;
}
#endif
00435
00436
00437 static void CmiPushPE(int pe,void *msg)
00438 {
00439 CmiState cs = CmiGetStateN(pe);
00440 MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
00441 #if CMK_IMMEDIATE_MSG
00442 if (CmiIsImmediate(msg)) {
00443
00444
00445
00446
00447
00449 CMI_DEST_RANK(msg) = pe;
00450 CmiPushImmediateMsg(msg);
00451 return;
00452 }
00453 #endif
00454
00455 #if CMK_SMP
00456 CmiLock(procState[pe].recvLock);
00457 #endif
00458 PCQueuePush(cs->recv,msg);
00459 #if CMK_SMP
00460 CmiUnlock(procState[pe].recvLock);
00461 #endif
00462 CmiIdleLock_addMessage(&cs->idle);
00463 MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
00464 }
00465
#if CMK_NODE_QUEUE_AVAILABLE
/* Deliver `msg` to the node-wide receive queue.
   With CMK_IMMEDIATE_MSG, an immediate message gets destination rank 0 and
   goes to the immediate-message queue instead.  Otherwise it is pushed onto
   NodeState.NodeRecv under CmiNodeRecvLock, and rank 0's idle lock is
   notified. */
static void CmiPushNode(void *msg)
{
  CmiState rankZero;

  MACHSTATE(3,"Pushing message into NodeRecv queue");
#if CMK_IMMEDIATE_MSG
  if (CmiIsImmediate(msg)) {
    CMI_DEST_RANK(msg) = 0;
    CmiPushImmediateMsg(msg);
    return;
  }
#endif
  CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
  PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
  CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);

  rankZero = CmiGetStateN(0);
  CmiIdleLock_addMessage(&rankZero->idle);
}
#endif
00487
00488 #ifndef CmiMyPe
00489 int CmiMyPe(void)
00490 {
00491 return CmiGetState()->pe;
00492 }
00493 #endif
00494
00495 #ifndef CmiMyRank
00496 int CmiMyRank(void)
00497 {
00498 return CmiGetState()->rank;
00499 }
00500 #endif
00501
00502 #ifndef CmiNodeFirst
00503 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
00504 int CmiNodeSize(int node) { return _Cmi_mynodesize; }
00505 #endif
00506
00507 #ifndef CmiNodeOf
00508 int CmiNodeOf(int pe) { return (pe/_Cmi_mynodesize); }
00509 int CmiRankOf(int pe) { return pe%_Cmi_mynodesize; }
00510 #endif
00511
00512 static size_t CmiAllAsyncMsgsSent(void)
00513 {
00514 SMSG_LIST *msg_tmp = sent_msgs;
00515 MPI_Status sts;
00516 int done;
00517
00518 while(msg_tmp!=0) {
00519 done = 0;
00520 if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00521 CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
00522 if(!done)
00523 return 0;
00524 msg_tmp = msg_tmp->next;
00525
00526 }
00527 return 1;
00528 }
00529
00530 int CmiAsyncMsgSent(CmiCommHandle c) {
00531
00532 SMSG_LIST *msg_tmp = sent_msgs;
00533 int done;
00534 MPI_Status sts;
00535
00536 while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
00537 msg_tmp = msg_tmp->next;
00538 if(msg_tmp) {
00539 done = 0;
00540 if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00541 CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
00542 return ((done)?1:0);
00543 } else {
00544 return 1;
00545 }
00546 }
00547
00548 void CmiReleaseCommHandle(CmiCommHandle c)
00549 {
00550 return;
00551 }
00552
#if CMK_VERSION_BLUEGENE
/* MPI progress hook (presumably from the BlueGene MPICH implementation);
   called at the start of CmiReleaseSentMessages and PumpMsgs. */
extern void MPID_Progress_test();
#endif
00556
00557 void CmiReleaseSentMessages(void)
00558 {
00559 SMSG_LIST *msg_tmp=sent_msgs;
00560 SMSG_LIST *prev=0;
00561 SMSG_LIST *temp;
00562 int done;
00563 MPI_Status sts;
00564
00565
00566 #if CMK_VERSION_BLUEGENE
00567 MPID_Progress_test();
00568 #endif
00569
00570 MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
00571 while(msg_tmp!=0) {
00572 done =0;
00573 if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
00574 CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
00575 if(done) {
00576 MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
00577 MsgQueueLen--;
00578
00579 temp = msg_tmp->next;
00580 if(prev==0)
00581 sent_msgs = temp;
00582 else
00583 prev->next = temp;
00584 CmiFree(msg_tmp->msg);
00585 CmiFree(msg_tmp);
00586 msg_tmp = temp;
00587 } else {
00588 prev = msg_tmp;
00589 msg_tmp = msg_tmp->next;
00590 }
00591 }
00592 end_sent = prev;
00593 MACHSTATE(2,"} CmiReleaseSentMessages end");
00594 }
00595
00596 int PumpMsgs(void)
00597 {
00598 int nbytes, flg, res;
00599 char *msg;
00600 MPI_Status sts;
00601 int recd=0;
00602
00603 #if CMK_VERSION_BLUEGENE
00604 MPID_Progress_test();
00605 #endif
00606
00607 MACHSTATE(2,"PumpMsgs begin {");
00608
00609 while(1) {
00610
00611 #if MPI_POST_RECV_COUNT > 0
00612 int completed_index=-1;
00613 if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
00614 CmiAbort("PumpMsgs: MPI_Testany failed!\n");
00615 if(flg){
00616 if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
00617 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
00618
00619 recd = 1;
00620 msg = (char *) CmiAlloc(nbytes);
00621 memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
00622
00623
00624 START_EVENT();
00625
00626 if (MPI_SUCCESS != MPI_Irecv( &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]) ,
00627 MPI_POST_RECV_SIZE,
00628 MPI_BYTE,
00629 MPI_ANY_SOURCE,
00630 POST_RECV_TAG,
00631 MPI_COMM_WORLD,
00632 &(CpvAccess(CmiPostedRecvRequests)[completed_index]) ))
00633 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00634
00635 END_EVENT(50);
00636
00637 CpvAccess(Cmi_posted_recv_total)++;
00638 }
00639 else {
00640 res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
00641 if(res != MPI_SUCCESS)
00642 CmiAbort("MPI_Iprobe failed\n");
00643 if(!flg) break;
00644 recd = 1;
00645 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00646 msg = (char *) CmiAlloc(nbytes);
00647
00648 START_EVENT();
00649
00650 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
00651 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00652
00653 END_EVENT(30);
00654
00655 CpvAccess(Cmi_unposted_recv_total)++;
00656 }
00657 #else
00658
00659 res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
00660 if(res != MPI_SUCCESS)
00661 CmiAbort("MPI_Iprobe failed\n");
00662
00663 if(!flg) break;
00664 recd = 1;
00665 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00666 msg = (char *) CmiAlloc(nbytes);
00667
00668 START_EVENT();
00669
00670 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
00671 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00672
00673 END_EVENT(30);
00674 #endif
00675
00676 MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
00677 CMI_CHECK_CHECKSUM(msg, nbytes);
00678 if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) {
00679 CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
00680 CmiFree(msg);
00681 CmiAbort("Abort!\n");
00682 continue;
00683 }
00684 #if CMK_NODE_QUEUE_AVAILABLE
00685 if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
00686 CmiPushNode(msg);
00687 else
00688 #endif
00689 CmiPushPE(CMI_DEST_RANK(msg), msg);
00690
00691 #if CMK_BROADCAST_SPANNING_TREE
00692 if (CMI_BROADCAST_ROOT(msg))
00693 SendSpanningChildren(nbytes, msg);
00694 #elif CMK_BROADCAST_HYPERCUBE
00695 if (CMI_GET_CYCLE(msg))
00696 SendHypercube(nbytes, msg);
00697 #endif
00698 }
00699 #if CMK_IMMEDIATE_MSG && !CMK_SMP
00700 CmiHandleImmediate();
00701 #endif
00702 MACHSTATE(2,"} PumpMsgs end ");
00703 return recd;
00704 }
00705
00706
00707 static void PumpMsgsBlocking(void)
00708 {
00709 static int maxbytes = 20000000;
00710 static char *buf = NULL;
00711 int nbytes, flg;
00712 MPI_Status sts;
00713 char *msg;
00714 int recd=0;
00715
00716 if (!PCQueueEmpty(CmiGetState()->recv)) return;
00717 if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
00718 if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
00719 if (sent_msgs) return;
00720
00721 #if 0
00722 CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
00723 #endif
00724
00725 if (buf == NULL) {
00726 buf = (char *) CmiAlloc(maxbytes);
00727 _MEMCHECK(buf);
00728 }
00729
00730
00731 #if MPI_POST_RECV_COUNT > 0
00732 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
00733 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
00734 #endif
00735
00736 START_EVENT();
00737
00738 if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
00739 CmiAbort("PumpMsgs: PMP_Recv failed!\n");
00740
00741 END_EVENT(30);
00742
00743 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00744 msg = (char *) CmiAlloc(nbytes);
00745 memcpy(msg, buf, nbytes);
00746
00747 #if CMK_NODE_QUEUE_AVAILABLE
00748 if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
00749 CmiPushNode(msg);
00750 else
00751 #endif
00752 CmiPushPE(CMI_DEST_RANK(msg), msg);
00753
00754 #if CMK_BROADCAST_SPANNING_TREE
00755 if (CMI_BROADCAST_ROOT(msg))
00756 SendSpanningChildren(nbytes, msg);
00757 #elif CMK_BROADCAST_HYPERCUBE
00758 if (CMI_GET_CYCLE(msg))
00759 SendHypercube(nbytes, msg);
00760 #endif
00761 }
00762
00763
00764
00765 #if CMK_SMP
00766
/* CommunicationServer begins shutdown when this equals CmiMyNodeSize();
   presumably incremented by each exiting rank outside this view -- confirm. */
static int inexit = 0;
/* NOTE(review): not referenced in the code shown here; presumably guards
   inexit -- confirm. */
static CmiNodeLock exitLock = 0;
00769
00770 static int MsgQueueEmpty()
00771 {
00772 int i;
00773 #if MULTI_SENDQUEUE
00774 for (i=0; i<_Cmi_mynodesize; i++)
00775 if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
00776 #else
00777 return PCQueueEmpty(sendMsgBuf);
00778 #endif
00779 return 1;
00780 }
00781
/* Forward declaration: SendMsgBuf is defined further down and called by
   CommunicationServer. */
static int SendMsgBuf();
00783
00784
00785 static int RecvQueueEmpty()
00786 {
00787 int i;
00788 for (i=0; i<_Cmi_mynodesize; i++) {
00789 CmiState cs=CmiGetStateN(i);
00790 if (!PCQueueEmpty(cs->recv)) return 0;
00791 }
00792 return 1;
00793 }
00794
00798 static void CommunicationServer(int sleepTime)
00799 {
00800 int static count=0;
00801
00802
00803
00804
00805 PumpMsgs();
00806 CmiReleaseSentMessages();
00807 SendMsgBuf();
00808
00809
00810
00811 if (inexit == CmiMyNodeSize()) {
00812 MACHSTATE(2, "CommunicationServer exiting {");
00813 #if 0
00814 while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
00815 #endif
00816 while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
00817 CmiReleaseSentMessages();
00818 SendMsgBuf();
00819 PumpMsgs();
00820 }
00821 MACHSTATE(2, "CommunicationServer barrier begin {");
00822
00823 START_EVENT();
00824
00825 if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
00826 CmiAbort("ConverseExit: MPI_Barrier failed!\n");
00827
00828 END_EVENT(10);
00829
00830 MACHSTATE(2, "} CommunicationServer barrier end");
00831 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
00832 if (CmiMyNode() == 0){
00833 CmiPrintf("End of program\n");
00834 }
00835 #endif
00836 MACHSTATE(2, "} CommunicationServer EXIT");
00837 #if ! CMK_MPI_VMI
00838 MPI_Finalize();
00839 #endif
00840 exit(0);
00841 }
00842 }
00843
00844 #endif
00845
/* Progress entry point for the communication thread.  In SMP builds it
   runs one CommunicationServer step (non-SMP builds ignore sleepTime).
   With CMK_IMMEDIATE_MSG it then handles pending immediate messages. */
static void CommunicationServerThread(int sleepTime)
{
#if CMK_SMP
  CommunicationServer(sleepTime);
#else
  (void)sleepTime;
#endif

#if CMK_IMMEDIATE_MSG
  CmiHandleImmediate();
#endif
}
00855
#if CMK_NODE_QUEUE_AVAILABLE
/* Pop one message from the node-wide NodeRecv queue under CmiNodeRecvLock,
   after updating the caller's idle lock.  Returns what PCQueuePop yields
   (presumably 0 when the queue is empty). */
char *CmiGetNonLocalNodeQ(void)
{
  CmiState self = CmiGetState();
  char *popped;

  CmiIdleLock_checkMessage(&self->idle);

  MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
  CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
  popped = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
  CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
  MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());

  return popped;
}
#endif
00872
00873 void *CmiGetNonLocal(void)
00874 {
00875 static int count=0;
00876 CmiState cs = CmiGetState();
00877 void *msg;
00878 CmiIdleLock_checkMessage(&cs->idle);
00879
00880
00881
00882 #if ! CMK_SMP
00883 CmiReleaseSentMessages();
00884 PumpMsgs();
00885 #endif
00886
00887
00888 msg = PCQueuePop(cs->recv);
00889
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899 #if ! CMK_SMP
00900 if (no_outstanding_sends) {
00901 while (MsgQueueLen>0) {
00902 CmiReleaseSentMessages();
00903 PumpMsgs();
00904 }
00905 }
00906
00907 if(!msg) {
00908 CmiReleaseSentMessages();
00909 if (PumpMsgs())
00910 return PCQueuePop(cs->recv);
00911 else
00912 return 0;
00913 }
00914 #endif
00915 return msg;
00916 }
00917
00918
00919 void CmiNotifyIdle(void)
00920 {
00921 CmiReleaseSentMessages();
00922 if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
00923 }
00924
00925
00926
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937
00938
00939
00940
00941
00942
00943
00944
00945
00946
#if CMK_MACHINE_PROGRESS_DEFINED
/* Explicit progress call.  Non-SMP builds poll MPI (and handle immediate
   messages when enabled); SMP builds do nothing here. */
void CmiMachineProgressImpl()
{
#if !CMK_SMP
  PumpMsgs();
#if CMK_IMMEDIATE_MSG
  CmiHandleImmediate();
#endif
#endif
}
#endif
00962
00963
00964
/* Forward declaration (defined elsewhere); CmiSyncSendFn uses it for
   destinations other than the calling PE. */
CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
00966
00967 static void CmiSendSelf(char *msg)
00968 {
00969 #if CMK_IMMEDIATE_MSG
00970 if (CmiIsImmediate(msg)) {
00971
00972 CmiPushImmediateMsg(msg);
00973 CmiHandleImmediate();
00974 return;
00975 }
00976 #endif
00977 CQdCreate(CpvAccess(cQdState), 1);
00978 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00979 }
00980
00981 void CmiSyncSendFn(int destPE, int size, char *msg)
00982 {
00983 CmiState cs = CmiGetState();
00984 char *dupmsg = (char *) CmiAlloc(size);
00985 memcpy(dupmsg, msg, size);
00986
00987 CMI_SET_BROADCAST_ROOT(dupmsg, 0);
00988
00989 if (cs->pe==destPE) {
00990 CmiSendSelf(dupmsg);
00991 }
00992 else
00993 CmiAsyncSendFn_(destPE, size, dupmsg);
00994 }
00995
00996 #if CMK_SMP
00997
00998
00999 static int SendMsgBuf()
01000 {
01001 SMSG_LIST *msg_tmp;
01002 char *msg;
01003 int node, rank, size;
01004 int i;
01005 int sent = 0;
01006
01007 MACHSTATE(2,"SendMsgBuf begin {");
01008 #if MULTI_SENDQUEUE
01009 for (i=0; i<_Cmi_mynodesize=1; i++)
01010 {
01011 if (!PCQueueEmpty(procState[i].sendMsgBuf))
01012 {
01013 msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
01014 #else
01015
01016
01017 msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
01018
01019 while (NULL != msg_tmp)
01020 {
01021 #endif
01022 node = msg_tmp->destpe;
01023 size = msg_tmp->size;
01024 msg = msg_tmp->msg;
01025 msg_tmp->next = 0;
01026 while (MsgQueueLen > request_max) {
01027 CmiReleaseSentMessages();
01028 PumpMsgs();
01029 }
01030 MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
01031 CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
01032 CMI_SET_CHECKSUM(msg, size);
01033
01034 #if MPI_POST_RECV_COUNT > 0
01035 if(size <= MPI_POST_RECV_SIZE){
01036
01037 START_EVENT();
01038 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
01039 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
01040
01041 STOP_EVENT(40);
01042 }
01043 else {
01044 START_EVENT();
01045 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
01046 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
01047 STOP_EVENT(40);
01048 }
01049 #else
01050