00001
00007
00008 #include <stdio.h>
00009 #include <errno.h>
00010 #include "converse.h"
00011 #include <mpi.h>
00012
00013 #ifdef AMPI
00014 # warning "We got the AMPI version of mpi.h, instead of the system version--"
00015 # warning " Try doing an 'rm charm/include/mpi.h' and building again."
00016 # error "Can't build Charm++ using AMPI version of mpi.h header"
00017 #endif
00018
00019
00020 #if defined(_WIN32)
00021 #ifndef WIN32_LEAN_AND_MEAN
00022 #define WIN32_LEAN_AND_MEAN
00023 #endif
00024 #include <windows.h>
00025 #include <wincon.h>
00026 #include <sys/types.h>
00027 #include <sys/timeb.h>
00028 static char* strsignal(int sig) {
00029 static char outbuf[32];
00030 sprintf(outbuf, "%d", sig);
00031 return outbuf;
00032 }
00033 #include <process.h>
00034 #define getpid _getpid
00035 #else
00036 #include <unistd.h>
00037 #endif
00038 #include <stdlib.h>
00039
00040 #include "machine.h"
00041 #include "pcqueue.h"
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
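/* Every Converse message carries an mpiMsgType in its CmiMsgHeaderBasic header:
 * REGULAR for ordinary messages, and the ONESIDED_* and POST_* values for the
 * zero-copy (one-sided) buffer exchange implemented in machine-onesided.C. */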
00053 #define CMI_MSGTYPE(msg) ((CmiMsgHeaderBasic *)msg)->mpiMsgType
00054 enum mpiMsgTypes { REGULAR, ONESIDED_BUFFER_SEND, ONESIDED_BUFFER_RECV, ONESIDED_BUFFER_DIRECT_RECV, ONESIDED_BUFFER_DIRECT_SEND, POST_DIRECT_RECV, POST_DIRECT_SEND};
00055
00056
00057
00058 #define MULTI_SENDQUEUE 0
00059
00060
00061 #define CMI_EXERT_SEND_CAP 0
00062 #define CMI_EXERT_RECV_CAP 0
00063
00064 #define CMI_DYNAMIC_EXERT_CAP 0
00065
00066
00067
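/* Tunables for the optional dynamic flow control (CMI_DYNAMIC_EXERT_CAP): when
 * the outgoing queue grows past CMI_DYNAMIC_OUTGOING_THRESHOLD, the per-iteration
 * send/recv caps are tightened from CMI_DYNAMIC_MAXCAPSIZE to the CAPSIZE values. */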
00068 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
00069 #define CMI_DYNAMIC_MAXCAPSIZE 1000
00070 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
00071 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
00072
00073 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
00074 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
00075 MPI_Comm charmComm;
00076 int tagUb;
00077
00078 #if CMI_EXERT_SEND_CAP
00079 static int SEND_CAP=3;
00080 #endif
00081
00082 #if CMI_EXERT_RECV_CAP
00083 static int RECV_CAP=2;
00084 #endif
00085
00086
00087
00088 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
00089 #define CMI_MPI_TRACE_MOREDETAILED 0
00090 #undef CMI_MACH_TRACE_USEREVENTS
00091 #define CMI_MACH_TRACE_USEREVENTS 1
00092 #else
00093 #undef CMK_SMP_TRACE_COMMTHREAD
00094 #define CMK_SMP_TRACE_COMMTHREAD 0
00095 #endif
00096
00097 #define CMK_TRACE_COMMOVERHEAD 0
00098 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
00099 #undef CMI_MACH_TRACE_USEREVENTS
00100 #define CMI_MACH_TRACE_USEREVENTS 1
00101 #else
00102 #undef CMK_TRACE_COMMOVERHEAD
00103 #define CMK_TRACE_COMMOVERHEAD 0
00104 #endif
00105
00106 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
00107 CpvStaticDeclare(double, projTraceStart);
00108 #define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
00109 #define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
00110 #else
00111 #define START_EVENT()
00112 #define END_EVENT(x)
00113 #endif
00114
00115 #if CMK_SMP_TRACE_COMMTHREAD
00116 #define START_TRACE_SENDCOMM(msg) \
00117 int isTraceEligible = traceBeginCommOp(msg); \
00118 if(isTraceEligible) traceSendMsgComm(msg);
00119 #define END_TRACE_SENDCOMM(msg) if(isTraceEligible) traceEndCommOp(msg);
00120
00121 #define CONDITIONAL_TRACE_USER_EVENT(x) \
00122 do{ \
00123 double etime = CmiWallTimer(); \
00124 if(etime - CpvAccess(projTraceStart) > 5*1e-6){ \
00125 traceUserBracketEvent(x, CpvAccess(projTraceStart), etime); \
00126 }\
00127 }while(0);
00128 #else
00129 #define START_TRACE_SENDCOMM(msg)
00130 #define END_TRACE_SENDCOMM(msg)
00131 #define CONDITIONAL_TRACE_USER_EVENT(x)
00132 #endif
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
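/* MPI_POST_RECV enables pre-posted MPI_Irecv buffers for mid-sized messages
 * (between MPI_POST_RECV_LOWERSIZE and MPI_POST_RECV_UPPERSIZE); senders tag such
 * messages with POST_RECV_TAG so they match the pre-posted receives instead of
 * being probed and received on demand. */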
00143 #define MPI_POST_RECV 0
00144
00145
00146
00147 #if MPI_POST_RECV
00148 #define MPI_DYNAMIC_POST_RECV 0
00149
00150
00151
00152
00153
00154 static int MPI_POST_RECV_COUNT=10;
00155
00156
00157 static int MPI_POST_RECV_LOWERSIZE=8000;
00158 static int MPI_POST_RECV_UPPERSIZE=64000;
00159
00160
00161 static int MPI_POST_RECV_INC = 1000;
00162
00163
00164 static int MPI_POST_RECV_MSG_INC = 400;
00165
00166
00167 static int MPI_POST_RECV_MSG_CNT_THRESHOLD = 200;
00168
00169
00170 static int MPI_POST_RECV_FREQ = 1000;
00171
00172 static int MPI_POST_RECV_SIZE;
00173
00174 typedef struct mpiPostRecvList {
00175
00176
00177
00178
00179 int msgSizeIdx;
00180 int bufCnt;
00181 MPI_Request *postedRecvReqs;
00182 char **postedRecvBufs;
00183 struct mpiPostRecvList *next;
00184 } MPIPostRecvList;
00185 CpvDeclare(MPIPostRecvList *, postRecvListHdr);
00186 CpvDeclare(MPIPostRecvList *, curPostRecvPtr);
00187 CpvDeclare(int, msgRecvCnt);
00188
00189 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
00190 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
00191 CpvDeclare(MPI_Request*, CmiPostedRecvRequests);
00192 CpvDeclare(char**,CmiPostedRecvBuffers);
00193
00194
00195
00196
00197
00198
00199
00200 #if MPI_DYNAMIC_POST_RECV
00201 static int MSG_HISTOGRAM_BINSIZE;
00202 static int MAX_HISTOGRAM_BUCKETS;
00203 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
00204 static void recordMsgHistogramInfo(int size);
00205 static void reportMsgHistogramInfo(void);
00206 #endif
00207
00208 #endif
00209
00210
00211 #define TAG 1375
00212 #if MPI_POST_RECV
00213 #define POST_RECV_TAG (TAG+1)
00214 #define BARRIER_ZERO_TAG TAG
00215 #else
00216 #define BARRIER_ZERO_TAG (TAG-1)
00217 #endif
00218
00219 #define USE_MPI_CTRLMSG_SCHEME 0
00220
00221
00222
00223
00224
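/* USE_ASYNC_RECV_FUNC switches receives of messages of at least IRECV_MSG_THRESHOLD
 * bytes to MPI_Irecv; completed requests are drained from the waitIrecvListHead
 * list at the end of PumpMsgs(). */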
00225 #define USE_ASYNC_RECV_FUNC 0
00226
00227 #if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
00228 static int IRECV_MSG_THRESHOLD = 8000;
00229 typedef struct IRecvListEntry{
00230 MPI_Request req;
00231 char *msg;
00232 int size;
00233 struct IRecvListEntry *next;
00234 }*IRecvList;
00235
00236 static IRecvList freedIrecvList = NULL;
00237 static IRecvList waitIrecvListHead = NULL;
00238 static IRecvList waitIrecvListTail = NULL;
00239
00240 static IRecvList irecvListEntryAllocate(void){
00241 IRecvList ret;
00242 if(freedIrecvList == NULL) {
00243 ret = (IRecvList)malloc(sizeof(struct IRecvListEntry));
00244 return ret;
00245 } else {
00246 ret = freedIrecvList;
00247 freedIrecvList = freedIrecvList->next;
00248 return ret;
00249 }
00250 }
00251 static void irecvListEntryFree(IRecvList used){
00252 used->next = freedIrecvList;
00253 freedIrecvList = used;
00254 }
00255
00256 #endif
00257
00258
00259
00260
00261 void CmiSetupMachineRecvBuffers(void);
00262
00263 #define CAPTURE_MSG_HISTOGRAM 0
00264 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
00265 static int MSG_HISTOGRAM_BINSIZE=1000;
00266 static int MAX_HISTOGRAM_BUCKETS=2000;
00267 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
00268 static void recordMsgHistogramInfo(int size);
00269 static void reportMsgHistogramInfo(void);
00270 #endif
00271
00272
00273
00274
00275 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
00276 #define MAX_QLEN 200
00277
00278
00279
00280
00281
00282 #define CMI_MAGIC(msg) ((CmiMsgHeaderBasic *)msg)->magic
00283 #define CHARM_MAGIC_NUMBER 126
00284
00285 #if CMK_ERROR_CHECKING
00286 unsigned char computeCheckSum(unsigned char *data, int len);
00287 static int checksum_flag = 0;
00288 #define CMI_SET_CHECKSUM(msg, len) \
00289 if (checksum_flag) { \
00290 ((CmiMsgHeaderBasic *)msg)->cksum = 0; \
00291 ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len); \
00292 }
00293 #define CMI_CHECK_CHECKSUM(msg, len) \
00294 if (checksum_flag) \
00295 if (computeCheckSum((unsigned char*)msg, len) != 0) \
00296 CmiAbort("Fatal error: checksum doesn't agree!\n");
00297 #else
00298 #define CMI_SET_CHECKSUM(msg, len)
00299 #define CMI_CHECK_CHECKSUM(msg, len)
00300 #endif
00301
00302
00303
00304 #include <signal.h>
00305 #if !defined(_WIN32)
00306 struct sigaction signal_int;
00307 #else
00308 void (*signal_int)(int);
00309 #endif
00310 static int _thread_provided = -1;
00311 static int idleblock = 0;
00312
00313 #if __FAULT__
00314 typedef struct crashedrank{
00315 int rank;
00316 struct crashedrank *next;
00317 } crashedRankList;
00318 CpvDeclare(crashedRankList *, crashedRankHdr);
00319 CpvDeclare(crashedRankList *, crashedRankPtr);
00320 int isRankDie(int rank);
00321 #endif
00322
00323 #if CMK_ONESIDED_IMPL
00324 int srcRank;
00325 #if CMK_SMP
00326
00327 static CmiNodeLock rdmaTagLock = 0;
00328 #endif
00329
00330 #define RDMA_BASE_TAG (TAG+2)
00331 #define RDMA_ACK_TAG (TAG-2)
00332 int rdmaTag=RDMA_BASE_TAG;
00333 #include "machine-rdma.h"
00334 #include "machine-onesided.h"
00335 #endif //end of CMK_ONESIDED_IMPL
00336
00337
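/* Each outgoing MPI_Isend is tracked in an SMSG_LIST entry on the sent_msgs/end_sent
 * list until MPI_Test reports completion, at which point ReleasePostedMessages()
 * frees the message (or, for one-sided buffers, fires the CmiInvokeNcpyAck callback). */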
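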
00338 typedef struct msg_list {
00339 char *msg;
00340 struct msg_list *next;
00341 int size, destpe, mode, type;
00342 MPI_Request req;
00343 #if CMK_ONESIDED_IMPL
00344 void *ref;
00345
00346
00347
00348 #endif
00349 #if __FAULT__
00350 int dstrank;
00351 #endif
00352 } SMSG_LIST;
00353
00354 CpvDeclare(SMSG_LIST *, sent_msgs);
00355 CpvDeclare(SMSG_LIST *, end_sent);
00356
00357 CpvDeclare(int, MsgQueueLen);
00358 static int request_max;
00359
00360 static int no_outstanding_sends=0;
00361
00362 #if NODE_0_IS_CONVHOST
00363 int inside_comm = 0;
00364 #endif
00365
00366 typedef struct ProcState {
00367 #if MULTI_SENDQUEUE
00368 PCQueue postMsgBuf;
00369 #endif
00370 CmiNodeLock recvLock;
00371 } ProcState;
00372 static ProcState *procState;
00373
00374 #if CMK_SMP && !MULTI_SENDQUEUE
00375 PCQueue postMsgBuf;
00376 static CmiNodeLock postMsgBufLock = NULL;
00377 #endif
00378
00379
00380 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
00381 #define FAIL_TAG 1200
00382 int num_workpes, total_pes;
00383 int *petorank = NULL;
00384 int nextrank;
00385 void mpi_end_spare(void);
00386 #endif
00387
00388
00389
00390
00391 static size_t CheckAllAsyncMsgsSent(void);
00392 static void ReleasePostedMessages(void);
00393 static int PumpMsgs(void);
00394 static void PumpMsgsBlocking(void);
00395
00396 #if CMK_SMP
00397 static int MsgQueueEmpty(void);
00398 static int RecvQueueEmpty(void);
00399 static int SendMsgBuf(void);
00400 static void EnqueueMsg(void *m, int size, int node, int mode, int type, void *ref);
00401 #endif
00402
00403
00404
00405
00406 void CmiNotifyIdleForMPI(void);
00407
00408
00409
00410
00417 #define CMK_HAS_SIZE_IN_MSGHDR 0
00418 #include "machine-lrts.h"
00419 #include "machine-common-core.C"
00420
00421 #if USE_MPI_CTRLMSG_SCHEME
00422 #include "machine-ctrlmsg.C"
00423 #endif
00424
00425 SMSG_LIST *allocateSmsgList(char *msg, int destNode, int size, int mode, int type, void *ref) {
00426 SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
00427 msg_tmp->msg = msg;
00428 msg_tmp->destpe = destNode;
00429 msg_tmp->size = size;
00430 msg_tmp->next = 0;
00431 msg_tmp->mode = mode;
00432 msg_tmp->type = type;
00433 #if CMK_ONESIDED_IMPL
00434 msg_tmp->ref = ref;
00435 #endif
00436 return msg_tmp;
00437 }
00438
00439
00440
00441 #if CMK_SMP
00442 static void EnqueueMsg(void *m, int size, int node, int mode, int type, void *ref) {
00443
00444 SMSG_LIST *msg_tmp = allocateSmsgList((char *)m, node, size, mode, type, ref);
00445 MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
/* all remaining fields, including the one-sided ref, were already set by allocateSmsgList() above */
00454
00455 #if MULTI_SENDQUEUE
00456 PCQueuePush(procState[CmiMyRank()].postMsgBuf,(char *)msg_tmp);
00457 #else
00458
00459 PCQueuePush(postMsgBuf,(char *)msg_tmp);
00460
00461 #endif
00462
00463 MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, postMsgBuf, PCQueueLength(postMsgBuf));
00464 }
00465 #endif
00466
00467
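/* Issue the MPI_Isend for one queued message and append it to the sent_msgs list.
 * Without send caps, a P2P send also throttles here until the number of
 * outstanding requests drops below request_max. */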
00468 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
00469 int node = smsg->destpe;
00470 int size = smsg->size;
00471 char *msg = smsg->msg;
00472 int mode = smsg->mode;
00473 int dstrank;
00474
00475 MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
00476 #if CMK_ERROR_CHECKING
00477 CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
00478 CMI_SET_CHECKSUM(msg, size);
00479 #endif
00480
00481 #if MPI_POST_RECV
00482 if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
00483 #if MPI_DYNAMIC_POST_RECV
00484 int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
00485 START_TRACE_SENDCOMM(msg);
00486 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,charmComm,&(smsg->req)))
00487 CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
00488 END_TRACE_SENDCOMM(msg);
00489 #else
00490 START_TRACE_SENDCOMM(msg);
00491 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,charmComm,&(smsg->req)))
00492 CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
00493 END_TRACE_SENDCOMM(msg);
00494 #endif
00495 } else {
00496 START_TRACE_SENDCOMM(msg);
00497 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,charmComm,&(smsg->req)))
00498 CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
00499 END_TRACE_SENDCOMM(msg);
00500 }
00501 #elif USE_MPI_CTRLMSG_SCHEME
00502 sendViaCtrlMsg(node, size, msg, smsg);
00503 #else
00504
00505
00506 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
00507 dstrank = petorank[node];
00508 smsg->dstrank = dstrank;
00509 #else
00510 dstrank=node;
00511 #endif
00512 START_TRACE_SENDCOMM(msg)
00513 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,charmComm,&(smsg->req)))
00514 CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
00515 END_TRACE_SENDCOMM(msg)
00516 #endif
00517
00518 MACHSTATE(3,"}MPI_Isend end");
00519 CpvAccess(MsgQueueLen)++;
00520 if (CpvAccess(sent_msgs)==0)
00521 CpvAccess(sent_msgs) = smsg;
00522 else {
00523 CpvAccess(end_sent)->next = smsg;
00524 }
00525 CpvAccess(end_sent) = smsg;
00526
00527 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
00528 if (mode == P2P_SYNC || mode == P2P_ASYNC)
00529 {
00530 while (CpvAccess(MsgQueueLen) > request_max) {
00531 ReleasePostedMessages();
00532 PumpMsgs();
00533 }
00534 }
00535 #endif
00536
00537 return (CmiCommHandle) &(smsg->req);
00538 }
00539
00540 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode) {
00541
00542
00543 CmiState cs = CmiGetState();
00544 SMSG_LIST *msg_tmp;
00545
00546 CmiAssert(destNode != CmiMyNodeGlobal());
00547
00548 CMI_MSGTYPE(msg) = REGULAR;
00549 #if CMK_SMP
00550 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
00551 EnqueueMsg(msg, size, destNode, mode, REGULAR, NULL);
00552 return 0;
00553 }
00554 #endif
00555
00556
00557 msg_tmp = allocateSmsgList(msg, destNode, size, mode, REGULAR, NULL);
00558 return MPISendOneMsg(msg_tmp);
00559 }
00560
00561 static size_t CheckAllAsyncMsgsSent(void) {
00562 SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
00563 MPI_Status sts;
00564 int done;
00565
00566 while (msg_tmp!=0) {
00567 done = 0;
00568 if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00569 CmiAbort("CheckAllAsyncMsgsSent: MPI_Test failed!\n");
00570 #if __FAULT__
00571 if(isRankDie(msg_tmp->dstrank)){
00572
00573
00574 done = 1;
00575 }
00576 #endif
00577 if (!done)
00578 return 0;
00579 msg_tmp = msg_tmp->next;
00580
00581 }
00582 return 1;
00583 }
00584
00585 int CheckAsyncMsgSent(CmiCommHandle c) {
00586
00587 SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
00588 int done;
00589 MPI_Status sts;
00590
00591 while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
00592 msg_tmp = msg_tmp->next;
00593 if (msg_tmp) {
00594 done = 0;
00595 if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00596 CmiAbort("CheckAsyncMsgSent: MPI_Test failed!\n");
00597 return ((done)?1:0);
00598 } else {
00599 return 1;
00600 }
00601 }
00602
00603 #if CMK_ONESIDED_IMPL
00604 #include "machine-onesided.C"
00605 #endif
00606
00607
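/* Walk the sent_msgs list, MPI_Test each outstanding send, and release the completed
 * ones: regular messages are CmiFree'd, while one-sided direct buffers trigger
 * CmiInvokeNcpyAck on the stored NcpyOperationInfo. */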
00608 static void ReleasePostedMessages(void) {
00609 SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
00610 SMSG_LIST *prev=0;
00611 SMSG_LIST *temp;
00612
00613 int done;
00614 MPI_Status sts;
00615
00616
00617 MACHSTATE1(2,"ReleasePostedMessages begin on %d {", CmiMyPe());
00618 while (msg_tmp!=0) {
00619 done =0;
00620 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
00621 double startT = CmiWallTimer();
00622 #endif
00623 if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
00624 CmiAbort("ReleasePostedMessages: MPI_Test failed!\n");
00625 #if __FAULT__
00626 if (isRankDie(msg_tmp->dstrank)){
00627 done = 1;
00628 }
00629 #endif
00630 if (done) {
00631 MACHSTATE2(3,"ReleasePostedMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
00632 CpvAccess(MsgQueueLen)--;
00633
00634 temp = msg_tmp->next;
00635 if (prev==0)
00636 CpvAccess(sent_msgs) = temp;
00637 else
00638 prev->next = temp;
00639 #if CMK_ONESIDED_IMPL
00640
00641 if(CpvAccess(end_sent) == msg_tmp) {
00642 CpvAccess(end_sent) = prev;
00643 }
00644 if(msg_tmp->type == ONESIDED_BUFFER_DIRECT_RECV) {
00645
00646 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)(msg_tmp->ref);
00647
00648
00649 ncpyOpInfo->ackMode = CMK_DEST_ACK;
00650
00651
00652
00653
00654 CmiInvokeNcpyAck(ncpyOpInfo);
00655
00656 } else if(msg_tmp->type == ONESIDED_BUFFER_DIRECT_SEND) {
00657
00658 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)(msg_tmp->ref);
00659
00660 ncpyOpInfo->ackMode = CMK_SRC_ACK;
00661
00662
00663 ncpyOpInfo->freeMe = CMK_FREE_NCPYOPINFO;
00664
00665 CmiInvokeNcpyAck(ncpyOpInfo);
00666 }
00667 else if(msg_tmp->type == POST_DIRECT_SEND || msg_tmp->type == POST_DIRECT_RECV) {
00668
00669
00670
00671 }
00672 else
00673 #endif
00674 {
00675 CmiFree(msg_tmp->msg);
00676 }
00677
00678 free(msg_tmp);
00679 msg_tmp = temp;
00680 } else {
00681 prev = msg_tmp;
00682 msg_tmp = msg_tmp->next;
00683 }
00684 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
00685 {
00686 double endT = CmiWallTimer();
00687
00688 if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
00689 }
00690 #endif
00691 }
00692 MACHSTATE(2,"} ReleasePostedMessages end");
00693 }
00694
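/* Poll for incoming messages (pre-posted receives, control messages, or
 * MPI_Iprobe plus MPI_Recv/MPI_Irecv, depending on the configured scheme), hand
 * each one to handleOneRecvedMsg, and honor the static/dynamic receive caps.
 * Returns nonzero if anything was received. */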
00695 static int PumpMsgs(void) {
00696 int nbytes, flg, res;
00697 char *msg;
00698 MPI_Status sts;
00699 int recd=0;
00700
00701 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
00702 int recvCnt=0;
00703 #endif
00704
00705 MACHSTATE(2,"PumpMsgs begin {");
00706
00707 #if CMI_DYNAMIC_EXERT_CAP
00708 dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
00709 #endif
00710
00711 while (1) {
00712 int doSyncRecv = 1;
00713 #if CMI_EXERT_RECV_CAP
00714 if (recvCnt==RECV_CAP) break;
00715 #elif CMI_DYNAMIC_EXERT_CAP
00716 if (recvCnt >= dynamicRecvCap) break;
00717 #endif
00718
00719 #if USE_MPI_CTRLMSG_SCHEME
00720 doSyncRecv = 0;
00721 nbytes = recvViaCtrlMsg();
00722 recd = 1;
00723 if(nbytes == -1) break;
00724 #elif MPI_POST_RECV
00725
00726 MPIPostRecvList *postedOne = NULL;
00727 int completed_index = -1;
00728 flg = 0;
00729 #if MPI_DYNAMIC_POST_RECV
00730 MPIPostRecvList *oldPostRecvPtr = CpvAccess(curPostRecvPtr);
00731 if (oldPostRecvPtr) {
00732
00733 do {
00734
00735 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
00736 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
00737 CmiAbort("PumpMsgs: MPI_Testany failed!\n");
00738
00739 if (flg) {
00740 postedOne = cur;
00741 break;
00742 }
00743 CpvAccess(curPostRecvPtr) = CpvAccess(curPostRecvPtr)->next;
00744 } while (CpvAccess(curPostRecvPtr) != oldPostRecvPtr);
00745 }
00746 #else
00747 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
00748 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
00749 CmiAbort("PumpMsgs: MPI_Testany failed!\n");
00750 #endif
00751 CONDITIONAL_TRACE_USER_EVENT(60);
00752 if (flg) {
00753 if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
00754 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
00755
00756 recd = 1;
00757 #if !MPI_DYNAMIC_POST_RECV
00758 postedOne = CpvAccess(curPostRecvPtr);
00759 #endif
00760 msg = (postedOne->postedRecvBufs)[completed_index];
00761 (postedOne->postedRecvBufs)[completed_index] = NULL;
00762
00763 CpvAccess(Cmi_posted_recv_total)++;
00764 } else {
00765 START_EVENT();
00766 res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, charmComm, &flg, &sts);
00767 if (res != MPI_SUCCESS)
00768 CmiAbort("MPI_Iprobe failed\n");
00769 if (!flg) break;
00770
00771 CONDITIONAL_TRACE_USER_EVENT(70);
00772 recd = 1;
00773 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00774 msg = (char *) CmiAlloc(nbytes);
00775
00776 #if USE_ASYNC_RECV_FUNC
00777 if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
00778 #endif
00779 if(doSyncRecv){
00780 START_EVENT();
00781 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
00782 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00783 }
00784 #if USE_ASYNC_RECV_FUNC
00785 else {
00786 START_EVENT();
00787 IRecvList one = irecvListEntryAllocate();
00788 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
00789 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00790
00791 one->msg = msg;
00792 one->size = nbytes;
00793 one->next = NULL;
00794 waitIrecvListTail->next = one;
00795 waitIrecvListTail = one;
00796 CONDITIONAL_TRACE_USER_EVENT(50);
00797 }
00798 #endif
00799 CpvAccess(Cmi_unposted_recv_total)++;
00800 }
00801 #else
00802
00803 START_EVENT();
00804 res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, charmComm, &flg, &sts);
00805 if (res != MPI_SUCCESS)
00806 CmiAbort("MPI_Iprobe failed\n");
00807
00808 if (!flg) break;
00809 CONDITIONAL_TRACE_USER_EVENT(70);
00810
00811 recd = 1;
00812 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00813 msg = (char *) CmiAlloc(nbytes);
00814
00815 #if USE_ASYNC_RECV_FUNC
00816 if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
00817 #endif
00818 if(doSyncRecv){
00819 START_EVENT();
00820 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
00821 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00822 }
00823 #if USE_ASYNC_RECV_FUNC
00824 else {
00825 IRecvList one = irecvListEntryAllocate();
00826 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
00827 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00828 one->msg = msg;
00829 one->size = nbytes;
00830 one->next = NULL;
00831 waitIrecvListTail->next = one;
00832 waitIrecvListTail = one;
00833
00834 CONDITIONAL_TRACE_USER_EVENT(50);
00835 }
00836 #endif
00837
00838 #endif
00839
00840 if(doSyncRecv){
00841 MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
00842 CMI_CHECK_CHECKSUM(msg, nbytes);
00843 #if CMK_ERROR_CHECKING
00844 if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) {
00845 CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
00846 CmiFree(msg);
00847 CmiAbort("Abort!\n");
00848 continue;
00849 }
00850 #endif
00851 if(CMI_MSGTYPE(msg) == REGULAR) {
00852 handleOneRecvedMsg(nbytes, msg);
00853 } else if(CMI_MSGTYPE(msg) == POST_DIRECT_RECV || CMI_MSGTYPE(msg) == POST_DIRECT_SEND) {
00854
00855 NcpyOperationInfo *ncpyOpInfoMsg = (NcpyOperationInfo *)msg;
00856 resetNcpyOpInfoPointers(ncpyOpInfoMsg);
00857
00858 int postMsgType, myPe, otherPe;
00859 const void *myBuffer;
00860 if(CMI_MSGTYPE(msg) == POST_DIRECT_RECV) {
00861
00862 postMsgType = ONESIDED_BUFFER_DIRECT_RECV;
00863 myPe = ncpyOpInfoMsg->destPe;
00864 otherPe = ncpyOpInfoMsg->srcPe;
00865 myBuffer = ncpyOpInfoMsg->destPtr;
00866 }
00867 else {
00868
00869 postMsgType = ONESIDED_BUFFER_DIRECT_SEND;
00870 myPe = ncpyOpInfoMsg->srcPe;
00871 otherPe = ncpyOpInfoMsg->destPe;
00872 myBuffer = ncpyOpInfoMsg->srcPtr;
00873 }
00874
00875 MPIPostOneBuffer(myBuffer,
00876 ncpyOpInfoMsg,
00877 ncpyOpInfoMsg->srcSize,
00878 otherPe,
00879 ncpyOpInfoMsg->tag,
00880 postMsgType);
00881
00882 } else {
00883 CmiAbort("Invalid Type of message\n");
00884 }
00885 }
00886
00887 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
00888 recordMsgHistogramInfo(nbytes);
00889 #endif
00890
00891 #if MPI_POST_RECV
00892 #if MPI_DYNAMIC_POST_RECV
00893 if (postedOne) {
00894
00895
00896 int postRecvBufSize = postedOne->msgSizeIdx*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
00897 int postRecvTag = POST_RECV_TAG + postedOne->msgSizeIdx;
00898
00899 (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(postRecvBufSize);
00900
00901
00902 START_EVENT();
00903
00904 if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
00905 postRecvBufSize,
00906 MPI_BYTE,
00907 MPI_ANY_SOURCE,
00908 postRecvTag,
00909 charmComm,
00910 &((postedOne->postedRecvReqs)[completed_index]) ))
00911 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00912 CONDITIONAL_TRACE_USER_EVENT(50);
00913 }
00914 #else
00915 if (postedOne) {
00916
00917 (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
00918
00919
00920 START_EVENT();
00921 if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
00922 MPI_POST_RECV_SIZE,
00923 MPI_BYTE,
00924 MPI_ANY_SOURCE,
00925 POST_RECV_TAG,
00926 charmComm,
00927 &((postedOne->postedRecvReqs)[completed_index]) ))
00928 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00929 CONDITIONAL_TRACE_USER_EVENT(50);
00930 }
00931 #endif
00932 #endif
00933
00934 #if CMI_EXERT_RECV_CAP
00935 recvCnt++;
00936 #elif CMI_DYNAMIC_EXERT_CAP
00937 recvCnt++;
00938 #if CMK_SMP
00939
00940
00941
00942
00943
00944 if (PCQueueLength(postMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
00945 || CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
00946 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
00947 }
00948 #else
00949
00950
00951
00952 if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
00953 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
00954 }
00955 #endif
00956
00957 #endif
00958
00959 }
00960
00961 #if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
00962
00963 {
00964
00966 int irecvDone = 0;
00967 MPI_Status sts;
00968 while(waitIrecvListHead->next) {
00969 IRecvList irecvEnt = waitIrecvListHead->next;
00970
00971
00972 if(MPI_SUCCESS != MPI_Test(&(irecvEnt->req), &irecvDone, &sts))
00973 CmiAbort("PumpMsgs: MPI_Test failed!\n");
00974 if(!irecvDone) break;
00975
00976
00977
00978 handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);
00979 waitIrecvListHead->next = irecvEnt->next;
00980 irecvListEntryFree(irecvEnt);
00981
00982 }
00983 if(waitIrecvListHead->next == NULL)
00984 waitIrecvListTail = waitIrecvListHead;
00985 }
00986 #endif
00987
00988
00989 MACHSTATE(2,"} PumpMsgs end ");
00990 return recd;
00991 }
00992
00993
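/* Blocking variant used from CmiNotifyIdleForMPI when +idleblocking is set: if all
 * local queues are empty and nothing is in flight, wait in MPI_Recv for the next
 * message instead of spinning. */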
00994 static void PumpMsgsBlocking(void) {
00995 static int maxbytes = 20000000;
00996 static char *buf = NULL;
00997 int nbytes, flg;
00998 MPI_Status sts;
00999 char *msg;
01000 int recd=0;
01001
01002 if (!PCQueueEmpty(CmiGetState()->recv)) return;
01003 if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
01004 if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
01005 if (CpvAccess(sent_msgs)) return;
01006
01007 #if 0
01008 CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
01009 #endif
01010
01011 if (buf == NULL) {
01012 buf = (char *) CmiAlloc(maxbytes);
01013 _MEMCHECK(buf);
01014 }
01015
01016
01017 #if MPI_POST_RECV
01018 #warning "Using MPI posted receives together with PumpMsgsBlocking() will break"
01019 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to cancel all posted receives, wait for any incoming message, and then re-post the receives.");
01020 #endif
01021
01022 if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, charmComm,&sts))
01023 CmiAbort("PumpMsgs: PMP_Recv failed!\n");
01024
01025 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
01026 msg = (char *) CmiAlloc(nbytes);
01027 memcpy(msg, buf, nbytes);
01028
01029 #if CMK_SMP_TRACE_COMMTHREAD && CMI_MPI_TRACE_MOREDETAILED
01030 char tmp[32];
01031 sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
01032 traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
01033 #endif
01034
01035 handleOneRecvedMsg(nbytes, msg);
01036 }
01037
01038
01039 #if CMK_SMP
01040
01041
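/* SMP comm thread: drain the postMsgBuf queue(s) filled by EnqueueMsg (one queue
 * per PE when MULTI_SENDQUEUE is enabled) and hand each entry to MPISendOneMsg or
 * to the one-sided buffer path, subject to the send caps. Returns nonzero if
 * anything was sent. */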
01042 static int SendMsgBuf(void) {
01043 SMSG_LIST *msg_tmp;
01044 char *msg;
01045 int node, rank, size;
01046 int i;
01047 int sent = 0;
01048
01049 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
01050 int sentCnt = 0;
01051 #endif
01052
01053 #if CMI_DYNAMIC_EXERT_CAP
01054 dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
01055 #endif
01056
01057 MACHSTATE(2,"SendMsgBuf begin {");
01058 #if MULTI_SENDQUEUE
01059 for (i=0; i<_Cmi_mynodesize+1; i++) {
01060 if (!PCQueueEmpty(procState[i].postMsgBuf)) {
01061 msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].postMsgBuf);
01062 #else
01063
01064
01065 msg_tmp = (SMSG_LIST *)PCQueuePop(postMsgBuf);
01066
01067 while (NULL != msg_tmp) {
01068 #endif
01069
01070 #if CMK_ONESIDED_IMPL
01071 if(msg_tmp->type == ONESIDED_BUFFER_DIRECT_RECV || msg_tmp->type == ONESIDED_BUFFER_DIRECT_SEND) {
01072 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)(msg_tmp->ref);
01073 MPISendOrRecvOneBuffer(msg_tmp, ncpyOpInfo->tag);
01074 }
01075 else
01076 #endif
01077 {
01078 MPISendOneMsg(msg_tmp);
01079 }
01080 sent=1;
01081
01082 #if CMI_EXERT_SEND_CAP
01083 if (++sentCnt == SEND_CAP) break;
01084 #elif CMI_DYNAMIC_EXERT_CAP
01085 if (++sentCnt >= dynamicSendCap) break;
01086 if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD)
01087 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
01088 #endif
01089
01090 #if ! MULTI_SENDQUEUE
01091
01092 msg_tmp = (SMSG_LIST *)PCQueuePop(postMsgBuf);
01093
01094 #endif
01095 }
01096 #if MULTI_SENDQUEUE
01097 }
01098 #endif
01099 MACHSTATE(2,"}SendMsgBuf end ");
01100 return sent;
01101 }
01102
01103 static int MsgQueueEmpty(void) {
01104 int i;
01105 #if MULTI_SENDQUEUE
01106 for (i=0; i<_Cmi_mynodesize; i++)
01107 if (!PCQueueEmpty(procState[i].postMsgBuf)) return 0;
01108 #else
01109 return PCQueueEmpty(postMsgBuf);
01110 #endif
01111 return 1;
01112 }
01113
01114
01115 static int RecvQueueEmpty(void) {
01116 int i;
01117 for (i=0; i<_Cmi_mynodesize; i++) {
01118 CmiState cs=CmiGetStateN(i);
01119 if (!PCQueueEmpty(cs->recv)) return 0;
01120 }
01121 return 1;
01122 }
01123
01124
01125 #define REPORT_COMM_METRICS 0
01126 #if REPORT_COMM_METRICS
01127 static double pumptime = 0.0;
01128 static double releasetime = 0.0;
01129 static double sendtime = 0.0;
01130 #endif
01131
01132 #endif //end of CMK_SMP
01133
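/* Main progress entry point: in SMP mode the comm thread pumps receives, releases
 * completed sends, and drains the send queue; in non-SMP mode the worker releases
 * sends and pumps receives inline. */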
01134 void LrtsAdvanceCommunication(int whenidle) {
01135 #if REPORT_COMM_METRICS
01136 double t1, t2, t3, t4;
01137 t1 = CmiWallTimer();
01138 #endif
01139
01140 #if CMK_SMP
01141 PumpMsgs();
01142
01143 #if REPORT_COMM_METRICS
01144 t2 = CmiWallTimer();
01145 #endif
01146
01147 ReleasePostedMessages();
01148 #if REPORT_COMM_METRICS
01149 t3 = CmiWallTimer();
01150 #endif
01151
01152 SendMsgBuf();
01153
01154 #if REPORT_COMM_METRICS
01155 t4 = CmiWallTimer();
01156 pumptime += (t2-t1);
01157 releasetime += (t3-t2);
01158 sendtime += (t4-t3);
01159 #endif
01160
01161 #else
01162 ReleasePostedMessages();
01163
01164 #if REPORT_COMM_METRICS
01165 t2 = CmiWallTimer();
01166 #endif
01167 PumpMsgs();
01168
01169 #if REPORT_COMM_METRICS
01170 t3 = CmiWallTimer();
01171 pumptime += (t3-t2);
01172 releasetime += (t2-t1);
01173 #endif
01174
01175 #endif
01176 }
01177
01178
01179 void LrtsPostNonLocal(void) {
01180 #if !CMK_SMP
01181 if (no_outstanding_sends) {
01182 while (CpvAccess(MsgQueueLen)>0) {
01183 LrtsAdvanceCommunication(0);
01184 }
01185 }
01186
01187
01188
01189
01190 #if 0
01191 if (!msg) {
01192 ReleasePostedMessages();
01193 if (PumpMsgs())
01194 return PCQueuePop(cs->recv);
01195 else
01196 return 0;
01197 }
01198 #endif
01199 #else
01200 if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
01201 ReleasePostedMessages();
01202
01203
01204
01205
01206 }
01207 #endif
01208 }
01209
01210
01211 void CmiNotifyIdleForMPI(void) {
01212 ReleasePostedMessages();
01213 if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
01214 }
01215
01216
01217
01218 #if CMK_MACHINE_PROGRESS_DEFINED
01219 void CommunicationServerThread(int);
01220
01221 void CmiMachineProgressImpl(void) {
01222 #if !CMK_SMP
01223 PumpMsgs();
01224 #if CMK_IMMEDIATE_MSG
01225 CmiHandleImmediate();
01226 #endif
01227 #else
01228
01229
01230 if (CmiMyRank() == CmiMyNodeSize())
01231 CommunicationServerThread(0);
01232 #endif
01233 }
01234 #endif
01235
01236
01237 void LrtsDrainResources(void) {
01238 #if !CMK_SMP
01239 while (!CheckAllAsyncMsgsSent()) {
01240 PumpMsgs();
01241 ReleasePostedMessages();
01242 }
01243 #else
01244 if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
01245 while (!MsgQueueEmpty() || !CheckAllAsyncMsgsSent()) {
01246 ReleasePostedMessages();
01247 SendMsgBuf();
01248 PumpMsgs();
01249 }
01250 }else if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
01251 while(!CheckAllAsyncMsgsSent()) {
01252 ReleasePostedMessages();
01253 }
01254 }
01255 #endif
01256 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
01257 if (CmiMyPe() == 0 && CmiMyPartition()==0)
01258 {
01259 mpi_end_spare();
01260 }
01261 #endif
01262 MACHSTATE(2, "Machine exit barrier begin {");
01263 START_EVENT();
01264 if (MPI_SUCCESS != MPI_Barrier(charmComm))
01265 CmiAbort("LrtsDrainResources: MPI_Barrier failed!\n");
01266 END_EVENT(10);
01267 MACHSTATE(2, "} Machine exit barrier end");
01268 }
01269
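/* Shut down the machine layer: report posted-receive statistics, cancel any
 * still-posted receives, restore the SIGINT handler, and finalize MPI and exit
 * unless Charm++ was built to interoperate with an outer MPI program. */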
01270 void LrtsExit(int exitcode) {
01271 int i;
01272 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
01273 int doPrint = 0;
01274 if (CmiMyNode()==0) doPrint = 1;
01275
01276 if (doPrint ) {
01277 #if MPI_POST_RECV
01278 CmiPrintf("node[%d]: %llu posted receives, %llu unposted receives\n", CmiMyNode(), CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
01279 #endif
01280 }
01281 #endif
01282
01283 #if MPI_POST_RECV
01284 {
01285 MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
01286 if (ptr) {
01287 do {
01288 for (i=0; i<ptr->bufCnt; i++) MPI_Cancel(ptr->postedRecvReqs+i);
01289 ptr = ptr->next;
01290 } while (ptr!=CpvAccess(postRecvListHdr));
01291 }
01292 }
01293 #endif
01294
01295 #if REPORT_COMM_METRICS
01296 #if CMK_SMP
01297 CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
01298 CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
01299 pumptime, releasetime, sendtime);
01300 #else
01301 CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
01302 CmiMyPe(), pumptime, releasetime, sendtime);
01303 #endif
01304 #endif
01305
01306 if(!CharmLibInterOperate || userDrivenMode) {
01307 #if ! CMK_AUTOBUILD
01308 #if !defined(_WIN32)
01309 sigaction(SIGINT, &signal_int, NULL);
01310 #else
01311 signal(SIGINT, signal_int);
01312 #endif
01313 MPI_Finalize();
01314 #endif
01315 exit(exitcode);
01316 }
01317 }
01318
01319 static int Cmi_truecrash;
01320
01321 static void KillOnAllSigs(int sigNo) {
01322 static int already_in_signal_handler = 0;
01324 if (already_in_signal_handler) return;
01325 already_in_signal_handler = 1;
01326
01327 CmiAbortHelper("Caught Signal", strsignal(sigNo), NULL, 1, 1);
01328 }
01329
01330
01331
01332
01333 static void registerMPITraceEvents(void) {
01334 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
01335 traceRegisterUserEvent("MPI_Barrier", 10);
01336 traceRegisterUserEvent("MPI_Send", 20);
01337 traceRegisterUserEvent("MPI_Recv", 30);
01338 traceRegisterUserEvent("MPI_Isend", 40);
01339 traceRegisterUserEvent("MPI_Irecv", 50);
01340 traceRegisterUserEvent("MPI_Test[any]", 60);
01341 traceRegisterUserEvent("MPI_Iprobe", 70);
01342 #endif
01343 }
01344
01345 static const char *thread_level_tostring(int thread_level) {
01346 #if CMK_MPI_INIT_THREAD
01347 switch (thread_level) {
01348 case MPI_THREAD_SINGLE:
01349 return "MPI_THREAD_SINGLE";
01350 case MPI_THREAD_FUNNELED:
01351 return "MPI_THREAD_FUNNELED";
01352 case MPI_THREAD_SERIALIZED:
01353 return "MPI_THREAD_SERIALIZED";
01354 case MPI_THREAD_MULTIPLE :
01355 return "MPI_THREAD_MULTIPLE";
01356 default: {
01357 char *str = (char *)malloc(16);
01358 snprintf(str, 16, "%d", thread_level);
01359 return str;
01360 }
01361 }
01362 return "unknown";
01363 #else
01364 char *str = (char *)malloc(16);
01365 snprintf(str, 16, "%d", thread_level);
01366 return str;
01367 #endif
01368 }
01369
01370 extern int quietMode;
01371
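/* Machine startup: initialize MPI with the thread level required by the SMP mode,
 * duplicate MPI_COMM_WORLD into charmComm, parse the machine-specific command-line
 * options, install signal handlers, and, in fault-tolerance builds, park spare
 * ranks until they are activated by mpi_restart_crashed. */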
01376 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
01377 int n,i;
01378 int ver, subver;
01379 int provided;
01380 int thread_level;
01381 int myNID;
01382 int largc=*argc;
01383 char** largv=*argv;
01384 int tagUbGetResult;
01385 void *tagUbVal;
01386
01387 if (CmiGetArgFlag(largv, "+comm_thread_only_recv")) {
01388 #if CMK_SMP
01389 Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
01390 #else
01391 CmiAbort("+comm_thread_only_recv option can only be used with SMP version of Charm++");
01392 #endif
01393 }
01394
01395 *argc = CmiGetArgc(largv);
01396
01397 if(!CharmLibInterOperate || userDrivenMode) {
01398 #if CMK_MPI_INIT_THREAD
01399 #if CMK_SMP
01400 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV)
01401 thread_level = MPI_THREAD_FUNNELED;
01402 else
01403 thread_level = MPI_THREAD_MULTIPLE;
01404 #else
01405 thread_level = MPI_THREAD_SINGLE;
01406 #endif
01407 MPI_Init_thread(argc, argv, thread_level, &provided);
01408 _thread_provided = provided;
01409 #else
01410 MPI_Init(argc, argv);
01411 thread_level = 0;
01412 _thread_provided = -1;
01413 #endif
01414 }
01415
01416 largc = *argc;
01417 largv = *argv;
01418 if(!CharmLibInterOperate || userDrivenMode) {
01419 MPI_Comm_dup(MPI_COMM_WORLD, &charmComm);
01420 }
01421 MPI_Comm_size(charmComm, numNodes);
01422 MPI_Comm_rank(charmComm, myNodeID);
01423
01424 #if CMK_ONESIDED_IMPL
01425
01426
01427 srcRank = *myNodeID;
01428 MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &tagUbVal, &tagUbGetResult);
01429 CmiAssert(tagUbGetResult);
01430 tagUb = *(int *)tagUbVal;
01431 #endif
01432
01433 MPI_Bcast(&_Cmi_mynodesize, 1, MPI_INT, 0, charmComm);
01434
01435 myNID = *myNodeID;
01436
01437 MPI_Get_version(&ver, &subver);
01438 if(!CharmLibInterOperate) {
01439 if ((myNID == 0) && (!quietMode)) {
01440 printf("Charm++> Running on MPI version: %d.%d\n", ver, subver);
01441 printf("Charm++> level of thread support used: %s (desired: %s)\n", thread_level_tostring(_thread_provided), thread_level_tostring(thread_level));
01442 }
01443 }
01444
01445 #if CMK_SMP
01446 if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV && _thread_provided != MPI_THREAD_MULTIPLE) {
01447 Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
01448 if ((myNID == 0) && (!quietMode)) {
01449 printf("Charm++> +comm_thread_only_recv disabled\n");
01450 }
01451 }
01452 #endif
01453
01454 {
01455 #if CMK_OPTIMIZE
01456 Cmi_truecrash = 0;
01457 #else
01458 Cmi_truecrash = 1;
01459 #endif
01460 int debug = CmiGetArgFlag(largv,"++debug");
01461 if (CmiGetArgFlagDesc(*argv,"+truecrash","Do not install signal handlers") || debug ||
01462 CmiNumNodes()<=32) Cmi_truecrash = 1;
01463 int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
01464 if (debug || debug_no_pause) {
01465 #if CMK_HAS_GETPID
01466 if (!quietMode) printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
01467 fflush(stdout);
01468 if (!debug_no_pause)
01469 sleep(15);
01470 #else
01471 if (!quietMode) printf("++debug ignored.\n");
01472 #endif
01473 }
01474 }
01475
01476
01477 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
01478 if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
01479 CmiAssert(num_workpes <= *numNodes);
01480 total_pes = *numNodes;
01481 *numNodes = num_workpes;
01482 }
01483 else
01484 total_pes = num_workpes = *numNodes;
01485 if (*myNodeID == 0)
01486 CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
01487 petorank = (int *)malloc(sizeof(int) * num_workpes);
01488 for (i=0; i<num_workpes; i++) petorank[i] = i;
01489 nextrank = num_workpes;
01490
01491 if (*myNodeID >= num_workpes) {
01492 if(CmiGetArgFlag(largv,"+isomalloc_sync")){
01493 MPI_Barrier(charmComm);
01494 MPI_Barrier(charmComm);
01495 MPI_Barrier(charmComm);
01496 MPI_Barrier(charmComm);
01497 }
01498 MPI_Status sts;
01499 int vals[2];
01500 MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
01501 int newpe = vals[0];
01502 CpvAccess(_curRestartPhase) = vals[1];
01503
01504 CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
01505
01506 if (newpe == -1) {
01507 MPI_Barrier(charmComm);
01508
01509 MPI_Finalize();
01510 exit(0);
01511 }
01512
01513
01514 MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
01515 nextrank = *myNodeID + 1;
01516 *myNodeID = newpe;
01517 myNID = newpe;
01518
01519
01520 char *phase_str;
01521 char **restart_argv;
01522 int i=0;
01523 while(largv[i]!= NULL) i++;
01524 restart_argv = (char **)malloc(sizeof(char *)*(i+3));
01525 i=0;
01526 while(largv[i]!= NULL){
01527 restart_argv[i] = largv[i];
01528 i++;
01529 }
01530 static char s_restartaftercrash[] = "+restartaftercrash";
01531 restart_argv[i] = s_restartaftercrash;
01532 phase_str = (char*)malloc(16);
01533 snprintf(phase_str, 16, "%d", CpvAccess(_curRestartPhase));
01534 restart_argv[i+1]=phase_str;
01535 restart_argv[i+2]=NULL;
01536 *argv = restart_argv;
01537 *argc = i+2;
01538 largc = *argc;
01539 largv = *argv;
01540 }
01541 #endif
01542
01543 idleblock = CmiGetArgFlag(largv, "+idleblocking");
01544 if (idleblock && _Cmi_mynode == 0 && !quietMode) {
01545 printf("Charm++: Running in idle blocking mode.\n");
01546 }
01547
01548
01549 if(!Cmi_truecrash) {
01550 #if !defined(_WIN32)
01551 struct sigaction sa;
01552 sa.sa_handler = KillOnAllSigs;
01553 sigemptyset(&sa.sa_mask);
01554 sa.sa_flags = SA_RESTART;
01555
01556 sigaction(SIGSEGV, &sa, NULL);
01557 sigaction(SIGFPE, &sa, NULL);
01558 sigaction(SIGILL, &sa, NULL);
01559 sigaction(SIGINT, &sa, &signal_int);
01560 sigaction(SIGTERM, &sa, NULL);
01561 sigaction(SIGABRT, &sa, NULL);
01562 #else
01563 signal(SIGSEGV, KillOnAllSigs);
01564 signal(SIGFPE, KillOnAllSigs);
01565 signal(SIGILL, KillOnAllSigs);
01566 signal_int = signal(SIGINT, KillOnAllSigs);
01567 signal(SIGTERM, KillOnAllSigs);
01568 signal(SIGABRT, KillOnAllSigs);
01569 #endif
01570 #if !defined(_WIN32)
01571 sigaction(SIGQUIT, &sa, NULL);
01572 sigaction(SIGBUS, &sa, NULL);
01573 #endif
01574 }
01575
01576 #if CMK_NO_OUTSTANDING_SENDS
01577 no_outstanding_sends=1;
01578 #endif
01579 if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
01580 no_outstanding_sends = 1;
01581 if ((myNID == 0) && (!quietMode))
01582 printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
01583 no_outstanding_sends?"":" not");
01584 }
01585
01586 request_max=MAX_QLEN;
01587 CmiGetArgInt(largv,"+requestmax",&request_max);
01588
01589
01590 #if MPI_POST_RECV
01591 CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
01592 CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
01593 CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
01594 CmiGetArgInt(largv, "+postRecvThreshold", &MPI_POST_RECV_MSG_CNT_THRESHOLD);
01595 CmiGetArgInt(largv, "+postRecvBucketSize", &MPI_POST_RECV_INC);
01596 CmiGetArgInt(largv, "+postRecvMsgInc", &MPI_POST_RECV_MSG_INC);
01597 CmiGetArgInt(largv, "+postRecvCheckFreq", &MPI_POST_RECV_FREQ);
01598 if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
01599 if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
01600 MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
01601 if ((myNID==0) && (!quietMode)) {
01602 printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes) with msg count threshold %d and msg histogram bucket size %d, #buf increment every %d msgs. The buffers are checked every %d msgs\n",
01603 MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE,
01604 MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
01605 }
01606 #endif
01607
01608 #if USE_MPI_CTRLMSG_SCHEME
01609 CmiGetArgInt(largv, "+ctrlMsgCnt", &MPI_CTRL_MSG_CNT);
01610 if((myNID == 0) && (!quietMode)){
01611 printf("Charm++: using the alternative ctrl msg scheme with %d pre-posted ctrl msgs\n", MPI_CTRL_MSG_CNT);
01612 }
01613 #endif
01614
01615 #if CMI_EXERT_SEND_CAP
01616 CmiGetArgInt(largv, "+dynCapSend", &SEND_CAP);
01617 if ((myNID==0) && (!quietMode)) {
01618 printf("Charm++: using static send cap %d\n", SEND_CAP);
01619 }
01620 #endif
01621 #if CMI_EXERT_RECV_CAP
01622 CmiGetArgInt(largv, "+dynCapRecv", &RECV_CAP);
01623 if ((myNID==0) && (!quietMode)) {
01624 printf("Charm++: using static recv cap %d\n", RECV_CAP);
01625 }
01626 #endif
01627 #if CMI_DYNAMIC_EXERT_CAP
01628 CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
01629 CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
01630 CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
01631 if ((myNID==0) && (!quietMode)) {
01632 printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
01633 CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
01634 }
01635 #endif
01636
01637 #if USE_ASYNC_RECV_FUNC
01638 CmiGetArgInt(largv, "+irecvMsgThreshold", &IRECV_MSG_THRESHOLD);
01639 if((myNID==0) && (!quietMode)) {
01640 printf("Charm++: for msg size larger than %d, MPI_Irecv is going to be used.\n", IRECV_MSG_THRESHOLD);
01641 }
01642 #endif
01643
01644
01645 if (CmiGetArgFlag(largv,"+checksum")) {
01646 #if CMK_ERROR_CHECKING
01647 checksum_flag = 1;
01648 if (myNID == 0) CmiPrintf("Charm++: checksum checking enabled!\n");
01649 #else
01650 if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized build!\n");
01651 #endif
01652 }
01653
01654 procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
01655 for (i=0; i<_Cmi_mynodesize+1; i++) {
01656 #if MULTI_SENDQUEUE
01657 procState[i].postMsgBuf = PCQueueCreate();
01658 #endif
01659 procState[i].recvLock = CmiCreateLock();
01660 }
01661 #if CMK_SMP
01662 #if !MULTI_SENDQUEUE
01663 postMsgBuf = PCQueueCreate();
01664 postMsgBufLock = CmiCreateLock();
01665 #endif
01666
01667 #if CMK_ONESIDED_IMPL && CMK_SMP
01668 rdmaTagLock = CmiCreateLock();
01669 #endif
01670 #endif
01671 }
01672
01673 INLINE_KEYWORD void LrtsNotifyIdle(void) {}
01674
01675 INLINE_KEYWORD void LrtsBeginIdle(void) {}
01676
01677 INLINE_KEYWORD void LrtsStillIdle(void) {}
01678
01679 void LrtsPreCommonInit(int everReturn) {
01680
01681 #if USE_MPI_CTRLMSG_SCHEME
01682 #if CMK_SMP
01683 if(CmiMyRank() == CmiMyNodeSize()) createCtrlMsgIrecvBufs();
01684 #else
01685 createCtrlMsgIrecvBufs();
01686 #endif
01687 #elif MPI_POST_RECV
01688 int doInit = 1;
01689 int i;
01690
01691 #if CMK_SMP
01692 if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
01693 #endif
01694
01695
01696
01697
01698
01699
01700
01701
01702 CpvInitialize(unsigned long long, Cmi_posted_recv_total);
01703 CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
01704 CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
01705 CpvInitialize(char **, CmiPostedRecvBuffers);
01706
01707 CpvAccess(CmiPostedRecvRequests) = NULL;
01708 CpvAccess(CmiPostedRecvBuffers) = NULL;
01709
01710 CpvInitialize(MPIPostRecvList *, postRecvListHdr);
01711 CpvInitialize(MPIPostRecvList *, curPostRecvPtr);
01712 CpvInitialize(int, msgRecvCnt);
01713
01714 CpvAccess(postRecvListHdr) = NULL;
01715 CpvAccess(curPostRecvPtr) = NULL;
01716 CpvAccess(msgRecvCnt) = 0;
01717
01718 #if MPI_DYNAMIC_POST_RECV
01719 CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
01720 #endif
01721
01722 if (doInit) {
01723 #if MPI_DYNAMIC_POST_RECV
01724 MSG_HISTOGRAM_BINSIZE = MPI_POST_RECV_INC;
01725
01726 MAX_HISTOGRAM_BUCKETS = (MPI_POST_RECV_UPPERSIZE - MPI_POST_RECV_LOWERSIZE)/MSG_HISTOGRAM_BINSIZE+2;
01727 CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
01728 memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
01729 #else
01730
01731
01732
01733 CpvAccess(postRecvListHdr) = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
01734
01735
01736 CpvAccess(postRecvListHdr)->msgSizeIdx = -1;
01737 CpvAccess(postRecvListHdr)->bufCnt = MPI_POST_RECV_COUNT;
01738 CpvAccess(postRecvListHdr)->postedRecvReqs = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
01739
01740 CpvAccess(postRecvListHdr)->postedRecvBufs = (char**)malloc(MPI_POST_RECV_COUNT*sizeof(char *));
01741 CpvAccess(postRecvListHdr)->next = CpvAccess(postRecvListHdr);
01742 CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr);
01743
01744
01745 for (i=0; i<MPI_POST_RECV_COUNT; i++) {
01746 char *tmpbuf = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
01747 CpvAccess(postRecvListHdr)->postedRecvBufs[i] = tmpbuf;
01748 if (MPI_SUCCESS != MPI_Irecv(tmpbuf,
01749 MPI_POST_RECV_SIZE,
01750 MPI_BYTE,
01751 MPI_ANY_SOURCE,
01752 POST_RECV_TAG,
01753 charmComm,
01754 CpvAccess(postRecvListHdr)->postedRecvReqs+i ))
01755 CmiAbort("MPI_Irecv failed\n");
01756 }
01757 #endif
01758 }
01759 #endif
01760
01761 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
01762 CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
01763 CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
01764 memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
01765 #endif
01766
01767 #if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
01768 #if CMK_SMP
01769
01770 if(CmiMyRank() == CmiMyNodeSize()) {
01771 waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
01772 waitIrecvListHead->next = NULL;
01773 }
01774 #else
01775 waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
01776 waitIrecvListHead->next = NULL;
01777 #endif
01778 #endif
01779 #if __FAULT__
01780 CpvInitialize(crashedRankList *, crashedRankHdr);
01781 CpvInitialize(crashedRankList *, crashedRankPtr);
01782 CpvAccess(crashedRankHdr) = NULL;
01783 CpvAccess(crashedRankPtr) = NULL;
01784 #endif
01785 }
01786
01787 void LrtsPostCommonInit(int everReturn) {
01788
01789
01790 CpvInitialize(SMSG_LIST *, sent_msgs);
01791 CpvInitialize(SMSG_LIST *, end_sent);
01792 CpvInitialize(int, MsgQueueLen);
01793 CpvAccess(sent_msgs) = NULL;
01794 CpvAccess(end_sent) = NULL;
01795 CpvAccess(MsgQueueLen) = 0;
01796
01797 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
01798 CpvInitialize(double, projTraceStart);
01799
01800 if (CmiMyPe() == 0) {
01801 registerMachineUserEventsFunction(®isterMPITraceEvents);
01802 }
01803 #endif
01804
01805 }
01806
01807
01808
01809
01810
01811
01812
01813
01814 void LrtsAbort(const char *message) {
01815 MPI_Abort(charmComm, 1);
01816 CMI_NORETURN_FUNCTION_END
01817 }
01818
01819
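/* Timer implementation based on MPI_Wtime. If MPI_WTIME_IS_GLOBAL reports a
 * synchronized clock, each process's rank-0 thread agrees on a common start time
 * via MPI_Allreduce(MPI_MIN); otherwise each process offsets from its own start. */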
01820 #if CMK_TIMER_USE_SPECIAL
01821
01822
01823 static CmiNodeLock timerLock = 0;
01824 static int _absoluteTime = 0;
01825 static double starttimer = 0;
01826 static int _is_global =0;
01827
01828 int CmiTimerIsSynchronized(void) {
01829 int flag;
01830 void *v;
01831
01832
01833 #if MPI_VERSION >= 2
01834 if (MPI_SUCCESS != MPI_Comm_get_attr(charmComm, MPI_WTIME_IS_GLOBAL, &v, &flag))
01835 #else
01836 if (MPI_SUCCESS != MPI_Attr_get(charmComm, MPI_WTIME_IS_GLOBAL, &v, &flag))
01837 #endif
01838 printf("MPI_WTIME_IS_GLOBAL not valid!\n");
01839 if (flag) {
01840 _is_global = *(int*)v;
01841 if (_is_global && CmiMyPe() == 0)
01842 printf("Charm++> MPI timer is synchronized\n");
01843 }
01844 return _is_global;
01845 }
01846
01847 int CmiTimerAbsolute(void) {
01848 return _absoluteTime;
01849 }
01850
01851 double CmiStartTimer(void) {
01852 return 0.0;
01853 }
01854
01855 double CmiInitTime(void) {
01856 return starttimer;
01857 }
01858
01859 void CmiTimerInit(char **argv) {
01860 _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
01861 if (_absoluteTime && CmiMyPe() == 0)
01862 printf("Charm++> absolute MPI timer is used\n");
01863
01864 #if ! CMK_MEM_CHECKPOINT && ! CMK_MESSAGE_LOGGING
01865 _is_global = CmiTimerIsSynchronized();
01866 #else
01867 _is_global = 0;
01868 #endif
01869
01870 if (_is_global) {
01871 if (CmiMyRank() == 0) {
01872 double minTimer;
01873 starttimer = MPI_Wtime();
01874
01875 MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
01876 charmComm );
01877 starttimer = minTimer;
01878 }
01879 } else {
01880 #if ! CMK_MEM_CHECKPOINT && ! CMK_MESSAGE_LOGGING
01881 CmiBarrier();
01882 CmiBarrier();
01883 CmiBarrier();
01884 #endif
01885 starttimer = MPI_Wtime();
01886 }
01887
01888 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
01889 if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
01890 timerLock = CmiCreateLock();
01891 #endif
01892 CmiNodeAllBarrier();
01893 }
01894
01901 double CmiTimer(void) {
01902 double t;
01903 #if 0 && CMK_SMP
01904 if (timerLock) CmiLock(timerLock);
01905 #endif
01906
01907 t = MPI_Wtime();
01908
01909 #if 0 && CMK_SMP
01910 if (timerLock) CmiUnlock(timerLock);
01911 #endif
01912
01913 return _absoluteTime?t: (t-starttimer);
01914 }
01915
01916 double CmiWallTimer(void) {
01917 double t;
01918 #if 0 && CMK_SMP
01919 if (timerLock) CmiLock(timerLock);
01920 #endif
01921
01922 t = MPI_Wtime();
01923
01924 #if 0 && CMK_SMP
01925 if (timerLock) CmiUnlock(timerLock);
01926 #endif
01927
01928 return _absoluteTime? t: (t-starttimer);
01929 }
01930
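/* Note: no portable per-process CPU timer is used here; CmiCpuTimer simply returns
 * elapsed wall-clock time from MPI_Wtime, offset by starttimer. */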
01931 double CmiCpuTimer(void) {
01932 double t;
01933 #if 0 && CMK_SMP
01934 if (timerLock) CmiLock(timerLock);
01935 #endif
01936 t = MPI_Wtime() - starttimer;
01937 #if 0 && CMK_SMP
01938 if (timerLock) CmiUnlock(timerLock);
01939 #endif
01940 return t;
01941 }
01942
01943 #endif
01944
01945 void LrtsBarrier(void)
01946 {
01947 if (MPI_SUCCESS != MPI_Barrier(charmComm))
01948 CmiAbort("Timernit: MPI_Barrier failed!\n");
01949 }
01950
01951
01952 int CmiBarrierZero(void) {
01953 int i;
01954 if (CmiMyRank() == 0)
01955 {
01956 char msg[1];
01957 MPI_Status sts;
01958 if (CmiMyNode() == 0) {
01959 for (i=0; i<CmiNumNodes()-1; i++) {
01960 START_EVENT();
01961 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, charmComm,&sts))
01962 CmiPrintf("MPI_Recv failed!\n");
01963
01964 END_EVENT(30);
01965 }
01966 } else {
01967 START_EVENT();
01968
01969 if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,charmComm))
01970 printf("MPI_Send failed!\n");
01971
01972 END_EVENT(20);
01973 }
01974 }
01975 CmiNodeAllBarrier();
01976 return 0;
01977 }
01978
01979
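/* Fault-tolerance support: spare MPI ranks block in LrtsInit until
 * mpi_restart_crashed assigns them a failed PE; petorank maps PEs to their current
 * MPI ranks and isRankDie() marks ranks that have been replaced. */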
01980 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
01981
01982 void mpi_restart_crashed(int pe, int rank)
01983 {
01984 int vals[2];
01985 vals[0] = CmiGetPeGlobal(pe,CmiMyPartition());
01986 vals[1] = CpvAccess(_curRestartPhase)+1;
01987 MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,charmComm);
01988 MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,charmComm);
01989 }
01990
01991
01992 void mpi_end_spare(void)
01993 {
01994 int i;
01995 for (i=nextrank; i<total_pes; i++) {
01996 int vals[2] = {-1,-1};
01997 MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,charmComm);
01998 }
01999 }
02000
02001 int find_spare_mpirank(int _pe,int partition)
02002 {
02003 if (nextrank == total_pes) {
02004 CmiAbort("Charm++> No spare processor available.");
02005 }
02006 int pe = CmiGetPeGlobal(_pe,partition);
02007 crashedRankList * crashedRank= (crashedRankList *)(malloc(sizeof(crashedRankList)));
02008 crashedRank->rank = petorank[pe];
02009 crashedRank->next=NULL;
02010 if(CpvAccess(crashedRankHdr)==NULL){
02011 CpvAccess(crashedRankHdr) = crashedRank;
02012 CpvAccess(crashedRankPtr) = CpvAccess(crashedRankHdr);
02013 }else{
02014 CpvAccess(crashedRankPtr)->next = crashedRank;
02015 CpvAccess(crashedRankPtr) = crashedRank;
02016 }
02017 petorank[pe] = nextrank;
02018 nextrank++;
02019 return nextrank-1;
02020 }
02021
02022 int isRankDie(int rank){
02023 crashedRankList * cur = CpvAccess(crashedRankHdr);
02024 crashedRankList * head = CpvAccess(crashedRankHdr);
02025 while(cur!=NULL){
02026 if(rank == cur->rank){
02027 CpvAccess(crashedRankHdr) = head;
02028 return 1;
02029 }
02030 cur = cur->next;
02031 }
02032 CpvAccess(crashedRankHdr) = head;
02033 return 0;
02034 }
02035
02036 void CkDieNow(void)
02037 {
02038 #if __FAULT__
02039 CmiPrintf("[%d] die now.\n", CmiMyPe());
02040
02041
02042 while (!CheckAllAsyncMsgsSent()) {
02043 PumpMsgs();
02044 ReleasePostedMessages();
02045 }
02046 MPI_Barrier(charmComm);
02047
02048 MPI_Finalize();
02049 exit(0);
02050 #endif
02051 }
02052
02053 #endif
02054
02055
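/* Dynamic posted-receive support: message sizes are recorded in a histogram and,
 * periodically (controlled by MPI_POST_RECV_FREQ), buildDynamicRecvBuffers()
 * rebuilds the set of pre-posted buffers to match the sizes actually observed. */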
02056 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
02057
02058
02059 #if MPI_DYNAMIC_POST_RECV
02060
02061 static void consumeAllMsgs(void)
02062 {
02063 MPIPostRecvList *ptr = CpvAccess(curPostRecvPtr);
02064 if (ptr) {
02065 do {
02066 int i;
02067 for (i=0; i<ptr->bufCnt; i++) {
02068 int done = 0;
02069 MPI_Status sts;
02070
02071
02072 if (ptr->postedRecvBufs[i] == NULL) continue;
02073
02074 if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
02075 CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
02076 if (done) {
02077 int nbytes;
02078 char *msg;
02079
02080 if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
02081 CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
02082
02083 msg = (ptr->postedRecvBufs)[i];
02084 (ptr->postedRecvBufs)[i] = NULL;
02085
02086 handleOneRecvedMsg(nbytes, msg);
02087 } else {
02088 if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))
02089 CmiAbort("consumeAllMsgs failed in MPI_Cancel!\n");
02090 }
02091 }
02092 ptr = ptr->next;
02093 } while (ptr != CpvAccess(curPostRecvPtr));
02094 }
02095 }
02096
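/* recordMsgHistogramInfo: bump the histogram bucket for this message size.  Bucket 0
 * counts messages no larger than MPI_POST_RECV_LOWERSIZE; the last bucket collects
 * everything beyond the covered range. */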
02097 static void recordMsgHistogramInfo(int size)
02098 {
02099 int idx = 0;
02100 size -= MPI_POST_RECV_LOWERSIZE;
02101 if (size > 0)
02102 idx = (size/MSG_HISTOGRAM_BINSIZE + 1);
02103
02104 if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
02105 CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
02106 }
02107
02108 #define POST_RECV_USE_STATIC_PARAM 0
02109 #define POST_RECV_REPORT_STS 0
02110
02111 #if POST_RECV_REPORT_STS
02112 static int buildDynCallCnt = 0;
02113 #endif
02114
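/* buildDynamicRecvBuffers: rebuild the posted-receive list from the message-size
 * histogram collected since the last rebuild.  Buckets whose count reaches a
 * threshold (a static parameter, or one derived from the average non-empty bucket
 * count) get pre-posted MPI_Irecv buffers; the histogram is cleared afterwards. */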
02115 static void buildDynamicRecvBuffers(void)
02116 {
02117 int i;
02118
02119 int local_MSG_CNT_THRESHOLD;
02120 int local_MSG_INC;
02121
02122 #if POST_RECV_REPORT_STS
02123 buildDynCallCnt++;
02124 #endif
02125
02126
02127 reportMsgHistogramInfo();
02128
02129 CpvAccess(msgRecvCnt) = 0;
02130
02131 consumeAllMsgs();
02132
02133 #if POST_RECV_USE_STATIC_PARAM
02134 local_MSG_CNT_THRESHOLD = MPI_POST_RECV_MSG_CNT_THRESHOLD;
02135 local_MSG_INC = MPI_POST_RECV_MSG_INC;
02136 #else
02137 {
02138 int total = 0;
02139 int count = 0;
02140 for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
02141 int tmp = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02142
02143 if (tmp > 0) {
02144 total += tmp;
02145 count++;
02146 }
02147 }
02148 if (count == 1) local_MSG_CNT_THRESHOLD = 1;
02149 else local_MSG_CNT_THRESHOLD = total / count / 3;
02150 local_MSG_INC = total/count;
02151 #if POST_RECV_REPORT_STS
02152 printf("sel_histo[%d]: criteria_threshold=%d, criteria_msginc=%d\n", CmiMyPe(), local_MSG_CNT_THRESHOLD, local_MSG_INC);
02153 #endif
02154 }
02155 #endif
02156
02157
02158
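/* Build the new circular list: keep old entries whose size index is still hot
 * (re-posting their receives), free entries that are no longer needed, and allocate
 * fresh entries with newly posted MPI_Irecv buffers for size indices that became hot. */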
02159 MPIPostRecvList *newHdr = NULL;
02160 MPIPostRecvList *newListPtr = newHdr;
02161 MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
02162 for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
02163 int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02164 if (count >= local_MSG_CNT_THRESHOLD) {
02165
02166 #if POST_RECV_REPORT_STS
02167
02168 int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
02169 int high = low + MSG_HISTOGRAM_BINSIZE;
02170 int reportCnt;
02171 if (count == local_MSG_CNT_THRESHOLD) reportCnt = 1;
02172 else reportCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
02173 printf("sel_histo[%d]-%d: msg size [%.2fK, %.2fK) with count=%d (%d)\n", CmiMyPe(), buildDynCallCnt, low/1000.0, high/1000.0, count, reportCnt);
02174 #endif
02175
02176 int notFound = 1;
02177 MPIPostRecvList *newEntry = NULL;
02178 while (ptr) {
02179 if (ptr->msgSizeIdx < i) {
02180
02181 MPIPostRecvList *nextptr = ptr->next;
02182
02183 free(ptr->postedRecvReqs);
02184 int j;
02185 for (j=0; j<ptr->bufCnt; j++) {
02186 if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
02187 }
02188 free(ptr->postedRecvBufs);
02189 ptr = nextptr;
02190 } else if (ptr->msgSizeIdx == i) {
02191 int newBufCnt, j;
02192 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
02193 newEntry = ptr;
02194
02195 if (count == local_MSG_CNT_THRESHOLD) newBufCnt = 1;
02196 else newBufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
02197 if (newBufCnt != ptr->bufCnt) {
02198
02199 free(ptr->postedRecvReqs);
02200 ptr->postedRecvReqs = (MPI_Request *)malloc(newBufCnt * sizeof(MPI_Request));
02201 for (j=0; j<ptr->bufCnt; j++) {
02202 if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
02203 }
02204 free(ptr->postedRecvBufs);
02205 ptr->postedRecvBufs = (char **)malloc(newBufCnt * sizeof(char *));
02206 }
02207
02208
02209 ptr->bufCnt = newBufCnt;
02210 for (j=0; j<ptr->bufCnt; j++) {
02211 ptr->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
02212 if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
02213 MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
02214 charmComm, ptr->postedRecvReqs+j))
02215 CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
02216 }
02217
02218
02219 ptr = ptr->next;
02220
02221 if (ptr == CpvAccess(postRecvListHdr)) ptr = NULL;
02222 notFound = 0;
02223 break;
02224 } else {
02225
02226 break;
02227 }
02228 if (ptr == CpvAccess(postRecvListHdr)) {
02229 ptr = NULL;
02230 break;
02231 }
02232 }
02233
02234 if (notFound) {
02235
02236 int j;
02237 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
02238 newEntry = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
02239 MPIPostRecvList *one = newEntry;
02240 one->msgSizeIdx = i;
02241 if (count == local_MSG_CNT_THRESHOLD) one->bufCnt = 1;
02242 else one->bufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
02243 one->postedRecvReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*one->bufCnt);
02244 one->postedRecvBufs = (char **)malloc(one->bufCnt * sizeof(char *));
02245 for (j=0; j<one->bufCnt; j++) {
02246 one->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
02247 if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
02248 MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
02249 charmComm, one->postedRecvReqs+j))
02250 CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
02251 }
02252 }
02253
02254
02255 CmiAssert(newEntry != NULL);
02256 if (newHdr == NULL) {
02257 newHdr = newEntry;
02258 newListPtr = newEntry;
02259 newHdr->next = newHdr;
02260 } else {
02261 newListPtr->next = newEntry;
02262 newListPtr = newEntry;
02263 newListPtr->next = newHdr;
02264 }
02265 }
02266 }
02267
02268
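/* Free whatever remains of the old list; its pending requests were already
 * canceled by consumeAllMsgs() above. */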
02269 while (ptr) {
02270
02271 MPIPostRecvList *nextptr = ptr->next;
02272
02273 free(ptr->postedRecvReqs);
02274 int j;
02275 for (j=0; j<ptr->bufCnt; j++) {
02276 if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
02277 }
02278 free(ptr->postedRecvBufs);
02279 ptr = nextptr;
02280 if (ptr == CpvAccess(postRecvListHdr)) break;
02281 }
02282
02283 CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr) = newHdr;
02284 memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
02285 }
02286
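/* examineMsgHistogramInfo: record the size of every received message and, once
 * MPI_POST_RECV_FREQ messages have been seen, rebuild the posted-receive buffers.
 * A sketch of the intended call site (hypothetical; the real caller sits in the
 * message-pump path after a receive completes):
 *     if (MPI_SUCCESS == MPI_Get_count(&sts, MPI_BYTE, &nbytes))
 *         examineMsgHistogramInfo(nbytes);
 */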
02287 static void examineMsgHistogramInfo(int size)
02288 {
02289 int total = CpvAccess(msgRecvCnt)++;
02290 if (total < MPI_POST_RECV_FREQ) {
02291 recordMsgHistogramInfo(size);
02292 } else {
02293 buildDynamicRecvBuffers();
02294 }
02295 }
02296 #else
02297
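/* recordMsgHistogramInfo (capture-only build): bucket message sizes directly by
 * MSG_HISTOGRAM_BINSIZE. */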
02298 static void recordMsgHistogramInfo(int size)
02299 {
02300 int idx = size/MSG_HISTOGRAM_BINSIZE;
02301 if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
02302 CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
02303 }
02304 #endif
02305
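/* reportMsgHistogramInfo: print this node's message-size histogram, one line per
 * non-empty bucket. */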
02306 void reportMsgHistogramInfo(void)
02307 {
02308 #if MPI_DYNAMIC_POST_RECV
02309 int i, count;
02310 count = CpvAccess(MSG_HISTOGRAM_ARRAY)[0];
02311 if (count > 0) {
02312 printf("msg_histo[%d]: %d for msg [0, %.2fK)\n", CmiMyNode(), count, MPI_POST_RECV_LOWERSIZE/1000.0);
02313 }
02314 for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
02315 int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02316 if (count > 0) {
02317 int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
02318 int high = low + MSG_HISTOGRAM_BINSIZE;
02319 printf("msg_histo[%d]: %d for msg [%.2fK, %.2fK)\n", CmiMyNode(), count, low/1000.0, high/1000.0);
02320 }
02321 }
02322 count = CpvAccess(MSG_HISTOGRAM_ARRAY)[MAX_HISTOGRAM_BUCKETS-1];
02323 if (count > 0) {
02324 printf("msg_histo[%d]: %d for msg [%.2fK, +inf)\n", CmiMyNode(), count, MPI_POST_RECV_UPPERSIZE/1000.0);
02325 }
02326 #else
02327 int i;
02328 for (i=0; i<MAX_HISTOGRAM_BUCKETS; i++) {
02329 int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02330 if (count > 0) {
02331 int low = i*MSG_HISTOGRAM_BINSIZE;
02332 int high = low + MSG_HISTOGRAM_BINSIZE;
02333 printf("msg_histo[%d]: %d for msg [%dK, %dK)\n", CmiMyNode(), count, low/1000, high/1000);
02334 }
02335 }
02336 #endif
02337 }
02338 #endif
02339
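/* CmiSetupMachineRecvBuffersUser: user-callable hook that, when MPI_DYNAMIC_POST_RECV
 * is enabled, forces an immediate rebuild of the dynamically posted receive buffers. */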
02340 void CmiSetupMachineRecvBuffersUser(void)
02341 {
02342 #if MPI_DYNAMIC_POST_RECV
02343 buildDynamicRecvBuffers();
02344 #endif
02345 }
02346
02347
02348