00001
00007
00008 #include <stdio.h>
00009 #include <errno.h>
00010 #include "converse.h"
00011 #include <mpi.h>
00012 #if CMK_TIMER_USE_XT3_DCLOCK
00013 #include <catamount/dclock.h>
00014 #endif
00015
00016
00017 #ifdef AMPI
00018 # warning "We got the AMPI version of mpi.h, instead of the system version--"
00019 # warning " Try doing an 'rm charm/include/mpi.h' and building again."
00020 # error "Can't build Charm++ using AMPI version of mpi.h header"
00021 #endif
00022
00023
00024 #if defined(_WIN32) && ! defined(__CYGWIN__)
00025 #include <windows.h>
00026 #include <wincon.h>
00027 #include <sys/types.h>
00028 #include <sys/timeb.h>
/* Native Windows builds get a local sleep() implemented on top of the
 * Win32 Sleep() call; the argument is scaled from seconds by 1000. */
static void sleep(int secs) {
    int millis = 1000 * secs;
    Sleep(millis);
}
00032 #else
00033 #include <unistd.h>
00034 #endif
00035 #include <stdlib.h>
00036
00037 #include "machine.h"
00038 #include "pcqueue.h"
00039
00040
00041
/* Send-queue layout. When nonzero, ProcState carries a per-rank sendMsgBuf
 * queue; when zero, SMP builds use one shared sendMsgBuf queue. */
#define MULTI_SENDQUEUE 0

/* Static caps. When enabled, SendMsgBuf() stops after SEND_CAP sends and
 * PumpMsgs() stops after RECV_CAP receives in a single call. */
#define CMI_EXERT_SEND_CAP 0
#define CMI_EXERT_RECV_CAP 0

/* Dynamic caps. When enabled, SendMsgBuf() and PumpMsgs() start each call with
 * a cap of CMI_DYNAMIC_MAXCAPSIZE and lower it to the SEND/RECV capsize below
 * once the outstanding-send count exceeds CMI_DYNAMIC_OUTGOING_THRESHOLD. */
#define CMI_DYNAMIC_EXERT_CAP 0

/* Outstanding-message count above which the dynamic caps are tightened. */
static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
/* Initial (effectively unlimited) value of the dynamic caps. */
#define CMI_DYNAMIC_MAXCAPSIZE 1000
/* Tightened per-call send and receive caps. */
static int CMI_DYNAMIC_SEND_CAPSIZE=4;
static int CMI_DYNAMIC_RECV_CAPSIZE=3;

/* Current dynamic caps; reset at the top of SendMsgBuf() / PumpMsgs(). */
static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
/* Communicator handed to every MPI call in this file. */
static MPI_Comm charmComm;

#if CMI_EXERT_SEND_CAP
/* Maximum number of sends issued by one SendMsgBuf() call. */
static int SEND_CAP=3;
#endif

#if CMI_EXERT_RECV_CAP
/* Maximum number of receives handled by one PumpMsgs() call. */
static int RECV_CAP=2;
#endif
00068
00069
00070
/* Communication-thread tracing: only meaningful when tracing is compiled in.
 * Otherwise CMK_SMP_TRACE_COMMTHREAD is forced to 0 so later #if tests are
 * well defined. Enabling it also turns on the MPI user events. */
#if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
#define CMI_MPI_TRACE_MOREDETAILED 0
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#else
#undef CMK_SMP_TRACE_COMMTHREAD
#define CMK_SMP_TRACE_COMMTHREAD 0
#endif

/* Communication-overhead tracing switch (off here); same normalisation as above. */
#define CMK_TRACE_COMMOVERHEAD 0
#if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#else
#undef CMK_TRACE_COMMOVERHEAD
#define CMK_TRACE_COMMOVERHEAD 0
#endif

/* START_EVENT() records the current wall time in projTraceStart and
 * END_EVENT(x) emits user event x bracketed from that time to now.
 * Both expand to nothing when user events are disabled. */
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
CpvStaticDeclare(double, projTraceStart);
#define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
#define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
#else
#define START_EVENT()
#define END_EVENT(x)
#endif

/* Send/receive tracing helpers for the communication thread.
 * START_TRACE_SENDCOMM declares the local `isTraceEligible`, so it must appear
 * where a declaration is allowed and at most once per scope; the matching
 * END_TRACE_SENDCOMM reads that local.
 * NOTE(review): these macros carry their own trailing ';' or braces, and some
 * call sites in this file omit the ';' -- do not wrap them in do/while without
 * auditing every use.
 * NOTE(review): the RECVCOMM and CONDITIONAL macros use projTraceStart, which is
 * only declared under the condition of the previous #if -- confirm that
 * CMK_TRACE_IN_CHARM is never set together with CMK_SMP_TRACE_COMMTHREAD. */
#if CMK_SMP_TRACE_COMMTHREAD
#define START_TRACE_SENDCOMM(msg) \
    int isTraceEligible = traceBeginCommOp(msg); \
    if(isTraceEligible) traceSendMsgComm(msg);
#define END_TRACE_SENDCOMM(msg) if(isTraceEligible) traceEndCommOp(msg);
#define START_TRACE_RECVCOMM(msg) CpvAccess(projTraceStart) = CmiWallTimer();
#define END_TRACE_RECVCOMM(msg) \
    if(traceBeginCommOp(msg)){ \
        traceChangeLastTimestamp(CpvAccess(projTraceStart)); \
        traceSendMsgComm(msg); \
        traceEndCommOp(msg); \
    }
/* Emits user event x only if more than 5 microseconds elapsed since
 * projTraceStart was last set. */
#define CONDITIONAL_TRACE_USER_EVENT(x) \
    do{ \
        double etime = CmiWallTimer(); \
        if(etime - CpvAccess(projTraceStart) > 5*1e-6){ \
            traceUserBracketEvent(x, CpvAccess(projTraceStart), etime); \
        }\
    }while(0);
#else
#define START_TRACE_SENDCOMM(msg)
#define END_TRACE_SENDCOMM(msg)
#define START_TRACE_RECVCOMM(msg)
#define END_TRACE_RECVCOMM(msg)
#define CONDITIONAL_TRACE_USER_EVENT(x)
#endif
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
/* Master switch for the pre-posted receive path used by MPISendOneMsg() and
 * PumpMsgs(). Off by default. */
#define MPI_POST_RECV 0

#if MPI_POST_RECV
/* When nonzero, posted receives are bucketed by message size: each bucket has
 * its own tag offset and buffer size (see MPISendOneMsg / PumpMsgs). */
#define MPI_DYNAMIC_POST_RECV 0

/* NOTE(review): presumably the number of receives kept posted; its use is not
 * visible in this part of the file. */
static int MPI_POST_RECV_COUNT=10;

/* Messages with LOWERSIZE <= size < UPPERSIZE are sent with the posted-receive
 * tag instead of the ordinary TAG. */
static int MPI_POST_RECV_LOWERSIZE=8000;
static int MPI_POST_RECV_UPPERSIZE=64000;

/* Width in bytes of one size bucket in the dynamic scheme; used to derive the
 * tag offset on send and the buffer size on re-post. */
static int MPI_POST_RECV_INC = 1000;

/* NOTE(review): the three tunables below are not referenced in the visible
 * part of the file; their meaning should be confirmed where they are used. */
static int MPI_POST_RECV_MSG_INC = 400;

static int MPI_POST_RECV_MSG_CNT_THRESHOLD = 200;

static int MPI_POST_RECV_FREQ = 1000;

/* Buffer size used when re-posting a receive in the non-dynamic scheme. */
static int MPI_POST_RECV_SIZE;

/* One set of posted receives. PumpMsgs() and MachineExitForMPI() walk these
 * nodes via `next` until they return to their starting node, i.e. the list is
 * treated as circular. */
typedef struct mpiPostRecvList {
    int msgSizeIdx;               /* size-bucket index; added to POST_RECV_TAG and scaled by MPI_POST_RECV_INC */
    int bufCnt;                   /* number of entries in the two arrays below */
    MPI_Request *postedRecvReqs;  /* one outstanding MPI request per buffer */
    char **postedRecvBufs;        /* receive buffers, parallel to postedRecvReqs */
    struct mpiPostRecvList *next;
} MPIPostRecvList;
/* Head of the posted-receive list and the node PumpMsgs() tests next. */
CpvDeclare(MPIPostRecvList *, postRecvListHdr);
CpvDeclare(MPIPostRecvList *, curPostRecvPtr);
CpvDeclare(int, msgRecvCnt);

/* Counters of messages received via posted buffers and via probe-and-receive. */
CpvDeclare(unsigned long long, Cmi_posted_recv_total);
CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
CpvDeclare(MPI_Request*, CmiPostedRecvRequests);
CpvDeclare(char**,CmiPostedRecvBuffers);

#if MPI_DYNAMIC_POST_RECV
/* Message-size histogram state; PumpMsgs() feeds recordMsgHistogramInfo(). */
static int MSG_HISTOGRAM_BINSIZE;
static int MAX_HISTOGRAM_BUCKETS;
CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
static void recordMsgHistogramInfo(int size);
static void reportMsgHistogramInfo();
#endif

#endif
00200
00201
00202
00203
00204
00205 #define USE_ASYNC_RECV_FUNC 0
00206
00207 #ifdef USE_ASYNC_RECV_FUNC
00208 static int IRECV_MSG_THRESHOLD = 8000;
00209 typedef struct IRecvListEntry{
00210 MPI_Request req;
00211 char *msg;
00212 int size;
00213 struct IRecvListEntry *next;
00214 }*IRecvList;
00215
00216 static IRecvList freedIrecvList = NULL;
00217 static IRecvList waitIrecvListHead = NULL;
00218 static IRecvList waitIrecvListTail = NULL;
00219
00220 static IRecvList irecvListEntryAllocate(){
00221 IRecvList ret;
00222 if(freedIrecvList == NULL) {
00223 ret = (IRecvList)malloc(sizeof(struct IRecvListEntry));
00224 return ret;
00225 } else {
00226 ret = freedIrecvList;
00227 freedIrecvList = freedIrecvList->next;
00228 return ret;
00229 }
00230 }
00231 static void irecvListEntryFree(IRecvList used){
00232 used->next = freedIrecvList;
00233 freedIrecvList = used;
00234 }
00235
00236 #endif
00237
00238
00239
00240
void CmiSetupMachineRecvBuffers();

/* Optional message-size histogram when dynamic posted receives (which declare
 * the same names) are not compiled in. */
#define CAPTURE_MSG_HISTOGRAM 0
#if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
static int MSG_HISTOGRAM_BINSIZE=1000;
static int MAX_HISTOGRAM_BUCKETS=2000;
CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
static void recordMsgHistogramInfo(int size);
static void reportMsgHistogramInfo();
#endif

/* MPI tag used for ordinary Charm++ messages. With posted receives enabled,
 * POST_RECV_TAG (plus a bucket offset in the dynamic scheme) marks messages
 * aimed at pre-posted buffers. */
#define TAG 1375
#if MPI_POST_RECV
#define POST_RECV_TAG (TAG+1)
#define BARRIER_ZERO_TAG TAG
#else
#define BARRIER_ZERO_TAG (TAG-1)
#endif

/* Platform-dependent defaults.
 * NOTE(review): neither constant is used in the visible part of the file;
 * MAX_QLEN presumably seeds request_max -- confirm in the init code. */
#if CMK_BLUEGENEL
#define MAX_QLEN 8
#define NETWORK_PROGRESS_PERIOD_DEFAULT 16
#else
#define NETWORK_PROGRESS_PERIOD_DEFAULT 0
#define MAX_QLEN 200
#endif

/* Accessor for the magic byte in the message header; usable as an lvalue.
 * The argument is not parenthesised, so pass a simple expression. */
#define CMI_MAGIC(msg) ((CmiMsgHeaderBasic *)msg)->magic
/* Value stamped into every outgoing message and checked on receipt. */
#define CHARM_MAGIC_NUMBER 126

#if CMK_ERROR_CHECKING
extern unsigned char computeCheckSum(unsigned char *data, int len);
/* Run-time switch for the checksum macros below; off by default. */
static int checksum_flag = 0;
/* Zeroes the header checksum field, then stores the checksum computed over
 * the first len bytes of msg. */
#define CMI_SET_CHECKSUM(msg, len) \
    if (checksum_flag) { \
        ((CmiMsgHeaderBasic *)msg)->cksum = 0; \
        ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len); \
    }
/* Aborts unless the checksum over the whole message evaluates to 0.
 * NOTE(review): expands to nested unbraced ifs, so an `else` written after a
 * call site would bind to the macro's inner `if`. */
#define CMI_CHECK_CHECKSUM(msg, len) \
    if (checksum_flag) \
        if (computeCheckSum((unsigned char*)msg, len) != 0) \
            CmiAbort("Fatal error: checksum doesn't agree!\n");
#else
#define CMI_SET_CHECKSUM(msg, len)
#define CMI_CHECK_CHECKSUM(msg, len)
#endif
00292
00293
00294
#include <signal.h>
/* SIGINT handler that MachineExitForMPI() installs before MPI_Finalize(). */
void (*signal_int)(int);

static int _thread_provided = -1;
/* When nonzero, CmiNotifyIdleForMPI() falls back to a blocking receive if
 * PumpMsgs() found nothing. */
static int idleblock = 0;

/* One outgoing message and its MPI request. Entries are chained on sent_msgs
 * while the send is in flight. */
typedef struct msg_list {
    char *msg;               /* message buffer; CmiFree'd once the send completes */
    struct msg_list *next;
    int size, destpe, mode;  /* byte count, destination node, send mode */
    MPI_Request req;         /* request filled in by MPI_Isend */
} SMSG_LIST;

/* Head and tail of the in-flight send list. */
CpvStaticDeclare(SMSG_LIST *, sent_msgs);
CpvStaticDeclare(SMSG_LIST *, end_sent);

/* Number of entries currently on sent_msgs. */
CpvStaticDeclare(int, MsgQueueLen);
/* MPISendOneMsg() keeps releasing/pumping while MsgQueueLen exceeds this. */
static int request_max;

/* When nonzero (non-SMP), MachinePostNonLocalForMPI() drains all in-flight sends. */
static int no_outstanding_sends=0;

#if NODE_0_IS_CONVHOST
int inside_comm = 0;
#endif

/* Per-rank machine state; carries a private send queue only when
 * MULTI_SENDQUEUE is enabled. */
typedef struct ProcState {
#if MULTI_SENDQUEUE
    PCQueue sendMsgBuf;
#endif
    CmiNodeLock recvLock;
} ProcState;
static ProcState *procState;

#if CMK_SMP && !MULTI_SENDQUEUE
/* Single node-wide queue filled by EnqueueMsg() and drained by SendMsgBuf(). */
static PCQueue sendMsgBuf;
static CmiNodeLock sendMsgBufLock = NULL;
#endif

#if CMK_MEM_CHECKPOINT
#define FAIL_TAG 1200
int num_workpes, total_pes;
/* Maps a destination node number to the MPI rank used by MPISendOneMsg(). */
int *petorank = NULL;
int nextrank;
void mpi_end_spare();
#endif
00342
00343
00344
#if CMK_BLUEGENEL
/* Called at the top of PumpMsgs() and CmiReleaseSentMessages() on this platform. */
extern void MPID_Progress_test();
#endif
/* Internal helpers defined later in this file. */
static size_t CmiAllAsyncMsgsSent(void);
static void CmiReleaseSentMessages(void);
static int PumpMsgs(void);
static void PumpMsgsBlocking(void);

#if CMK_SMP
static int MsgQueueEmpty();
static int RecvQueueEmpty();
static int SendMsgBuf();
static void EnqueueMsg(void *m, int size, int node, int mode);
#endif

/* The Lrts* names are mapped onto this file's MPI implementations before the
 * common machine layer is included at the end of this section. */
static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
#define LrtsSendFunc MachineSpecificSendForMPI

static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
#define LrtsInit MachineInitForMPI

static void MachinePreCommonInitForMPI(int everReturn);
static void MachinePostCommonInitForMPI(int everReturn);
#define LrtsPreCommonInit MachinePreCommonInitForMPI
#define LrtsPostCommonInit MachinePostCommonInitForMPI

static void AdvanceCommunicationForMPI(int whenidle);
#define LrtsAdvanceCommunication AdvanceCommunicationForMPI

/* Declared static here; the definitions further down omit the keyword and
 * therefore inherit this internal linkage. */
static void DrainResourcesForMPI();
#define LrtsDrainResources DrainResourcesForMPI

static void MachineExitForMPI();
#define LrtsExit MachineExitForMPI

void CmiNotifyIdleForMPI(void);

static void MachinePostNonLocalForMPI();
#define LrtsPostNonLocal MachinePostNonLocalForMPI

#define CMK_HAS_SIZE_IN_MSGHDR 0
/* The common machine-layer code is textually included, so it sees the static
 * functions and Lrts* macros declared above. */
#include "machine-lrts.h"
#include "machine-common-core.c"
00403
00404
00405
#if CMK_SMP
/*
 * Queues an outgoing message for later transmission by SendMsgBuf().
 *
 * m    - message buffer
 * size - message size in bytes
 * node - destination node
 * mode - send mode, stored unchanged for MPISendOneMsg()
 *
 * A new SMSG_LIST record describing the message is pushed onto the calling
 * rank's queue (MULTI_SENDQUEUE) or onto the shared sendMsgBuf queue.
 */
static void EnqueueMsg(void *m, int size, int node, int mode) {
    /* Pick the target queue once so the push and the trace line below agree;
     * the shared sendMsgBuf does not exist when MULTI_SENDQUEUE is on. */
#if MULTI_SENDQUEUE
    PCQueue queue = procState[CmiMyRank()].sendMsgBuf;
#else
    PCQueue queue = sendMsgBuf;
#endif
    SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));

    /* Fail here rather than dereference a NULL record below. */
    _MEMCHECK(msg_tmp);

    MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
    msg_tmp->msg = (char *)m;
    msg_tmp->size = size;
    msg_tmp->destpe = node;
    msg_tmp->next = 0;
    msg_tmp->mode = mode;

    PCQueuePush(queue,(char *)msg_tmp);

    MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, queue, PCQueueLength(queue));
}
#endif
00428
00429
/*
 * Starts a non-blocking MPI send for one queued message and appends the record
 * to the in-flight list (sent_msgs / end_sent), incrementing MsgQueueLen.
 *
 * smsg - record describing the message (buffer, size, destination, mode); its
 *        req field receives the MPI request.
 *
 * Returns the address of smsg->req as the communication handle. Aborts via
 * CmiAbort if MPI_Isend fails.
 */
static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
    int node = smsg->destpe;
    int size = smsg->size;
    char *msg = smsg->msg;
    int mode = smsg->mode;
    int dstrank;

    MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
#if CMK_ERROR_CHECKING
    /* Stamp the header so the receiver can validate the message. */
    CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
    CMI_SET_CHECKSUM(msg, size);
#endif

#if MPI_POST_RECV
    /* Messages in the posted-receive size window use the posted-receive tag so
     * they match the receiver's pre-posted buffers.
     * NOTE(review): this path sends to `node` directly and never consults
     * petorank, unlike the path below -- confirm this is intended when
     * CMK_MEM_CHECKPOINT is enabled. */
    if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
#if MPI_DYNAMIC_POST_RECV
        /* Size bucket index (1-based) selects the tag. */
        int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
        START_TRACE_SENDCOMM(msg);
        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,charmComm,&(smsg->req)))
            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
        END_TRACE_SENDCOMM(msg);
#else
        START_TRACE_SENDCOMM(msg);
        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,charmComm,&(smsg->req)))
            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
        END_TRACE_SENDCOMM(msg);
#endif
    } else {
        START_TRACE_SENDCOMM(msg);
        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,charmComm,&(smsg->req)))
            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
        END_TRACE_SENDCOMM(msg);
    }
#else

    /* Translate the destination node to an MPI rank when the checkpoint
     * scheme remaps ranks; otherwise they are identical. */
#if CMK_MEM_CHECKPOINT
    dstrank = petorank[node];
#else
    dstrank=node;
#endif
    /* The trace macros carry their own terminators, hence no ';' here. */
    START_TRACE_SENDCOMM(msg)
    if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,charmComm,&(smsg->req)))
        CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
    END_TRACE_SENDCOMM(msg)
#endif

    MACHSTATE(3,"}MPI_Isend end");
    /* Append to the in-flight list. */
    CpvAccess(MsgQueueLen)++;
    if (CpvAccess(sent_msgs)==0)
        CpvAccess(sent_msgs) = smsg;
    else
        CpvAccess(end_sent)->next = smsg;
    CpvAccess(end_sent) = smsg;

#if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
    /* Throttle: for point-to-point sends, keep completing sends and receiving
     * until the in-flight count is back within request_max. */
    if (mode == P2P_SYNC || mode == P2P_ASYNC)
    {
        while (CpvAccess(MsgQueueLen) > request_max) {
            CmiReleaseSentMessages();
            PumpMsgs();
        }
    }
#endif

    return (CmiCommHandle) &(smsg->req);
}
00497
00498 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
00499
00500
00501 CmiState cs = CmiGetState();
00502 SMSG_LIST *msg_tmp;
00503 int rank;
00504
00505 CmiAssert(destNode != CmiMyNode());
00506 #if CMK_SMP
00507 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
00508 EnqueueMsg(msg, size, destNode, mode);
00509 return 0;
00510 }
00511 #endif
00512
00513
00514 msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
00515 msg_tmp->msg = msg;
00516 msg_tmp->destpe = destNode;
00517 msg_tmp->size = size;
00518 msg_tmp->next = 0;
00519 msg_tmp->mode = mode;
00520 return MPISendOneMsg(msg_tmp);
00521 }
00522
00523 static size_t CmiAllAsyncMsgsSent(void) {
00524 SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
00525 MPI_Status sts;
00526 int done;
00527
00528 while (msg_tmp!=0) {
00529 done = 0;
00530 if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00531 CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
00532 if (!done)
00533 return 0;
00534 msg_tmp = msg_tmp->next;
00535
00536 }
00537 return 1;
00538 }
00539
00540 int CmiAsyncMsgSent(CmiCommHandle c) {
00541
00542 SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
00543 int done;
00544 MPI_Status sts;
00545
00546 while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
00547 msg_tmp = msg_tmp->next;
00548 if (msg_tmp) {
00549 done = 0;
00550 if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
00551 CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
00552 return ((done)?1:0);
00553 } else {
00554 return 1;
00555 }
00556 }
00557
00558 void CmiReleaseCommHandle(CmiCommHandle c) {
00559 return;
00560 }
00561
00562
00563 static void CmiReleaseSentMessages(void) {
00564 SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
00565 SMSG_LIST *prev=0;
00566 SMSG_LIST *temp;
00567 int done;
00568 MPI_Status sts;
00569
00570 #if CMK_BLUEGENEL
00571 MPID_Progress_test();
00572 #endif
00573
00574 MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
00575 while (msg_tmp!=0) {
00576 done =0;
00577 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
00578 double startT = CmiWallTimer();
00579 #endif
00580 if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
00581 CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
00582 if (done) {
00583 MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
00584 CpvAccess(MsgQueueLen)--;
00585
00586 temp = msg_tmp->next;
00587 if (prev==0)
00588 CpvAccess(sent_msgs) = temp;
00589 else
00590 prev->next = temp;
00591 CmiFree(msg_tmp->msg);
00592
00593 free(msg_tmp);
00594 msg_tmp = temp;
00595 } else {
00596 prev = msg_tmp;
00597 msg_tmp = msg_tmp->next;
00598 }
00599 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
00600 {
00601 double endT = CmiWallTimer();
00602
00603 if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
00604 }
00605 #endif
00606 }
00607 CpvAccess(end_sent) = prev;
00608 MACHSTATE(2,"} CmiReleaseSentMessages end");
00609 }
00610
00611 static int PumpMsgs(void) {
00612 int nbytes, flg, res;
00613 char *msg;
00614 MPI_Status sts;
00615 int recd=0;
00616
00617 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
00618 int recvCnt=0;
00619 #endif
00620
00621 #if CMK_BLUEGENEL
00622 MPID_Progress_test();
00623 #endif
00624
00625 MACHSTATE(2,"PumpMsgs begin {");
00626
00627 #if CMI_DYNAMIC_EXERT_CAP
00628 dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
00629 #endif
00630
00631 while (1) {
00632 int doSyncRecv = 1;
00633 #if CMI_EXERT_RECV_CAP
00634 if (recvCnt==RECV_CAP) break;
00635 #elif CMI_DYNAMIC_EXERT_CAP
00636 if (recvCnt >= dynamicRecvCap) break;
00637 #endif
00638
00639 START_TRACE_RECVCOMM(NULL);
00640
00641
00642 #if MPI_POST_RECV
00643 MPIPostRecvList *postedOne = NULL;
00644 int completed_index = -1;
00645 flg = 0;
00646 #if MPI_DYNAMIC_POST_RECV
00647 MPIPostRecvList *oldPostRecvPtr = CpvAccess(curPostRecvPtr);
00648 if (oldPostRecvPtr) {
00649
00650 do {
00651
00652 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
00653 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
00654 CmiAbort("PumpMsgs: MPI_Testany failed!\n");
00655
00656 if (flg) {
00657 postedOne = cur;
00658 break;
00659 }
00660 CpvAccess(curPostRecvPtr) = CpvAccess(curPostRecvPtr)->next;
00661 } while (CpvAccess(curPostRecvPtr) != oldPostRecvPtr);
00662 }
00663 #else
00664 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
00665 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
00666 CmiAbort("PumpMsgs: MPI_Testany failed!\n");
00667 #endif
00668 CONDITIONAL_TRACE_USER_EVENT(60);
00669 if (flg) {
00670 if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
00671 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
00672
00673 recd = 1;
00674 #if !MPI_DYNAMIC_POST_RECV
00675 postedOne = CpvAccess(curPostRecvPtr);
00676 #endif
00677 msg = (postedOne->postedRecvBufs)[completed_index];
00678 (postedOne->postedRecvBufs)[completed_index] = NULL;
00679
00680 CpvAccess(Cmi_posted_recv_total)++;
00681 } else {
00682 START_EVENT();
00683 res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, charmComm, &flg, &sts);
00684 if (res != MPI_SUCCESS)
00685 CmiAbort("MPI_Iprobe failed\n");
00686 if (!flg) break;
00687
00688 CONDITIONAL_TRACE_USER_EVENT(70);
00689 recd = 1;
00690 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00691 msg = (char *) CmiAlloc(nbytes);
00692
00693 #if USE_ASYNC_RECV_FUNC
00694 if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
00695 #endif
00696 if(doSyncRecv){
00697 START_EVENT();
00698 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
00699 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00700 }
00701 #if USE_ASYNC_RECV_FUNC
00702 else {
00703 START_EVENT();
00704 IRecvList one = irecvListEntryAllocate();
00705 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
00706 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00707
00708 one->msg = msg;
00709 one->size = nbytes;
00710 one->next = NULL;
00711 waitIrecvListTail->next = one;
00712 waitIrecvListTail = one;
00713 CONDITIONAL_TRACE_USER_EVENT(50);
00714 }
00715 #endif
00716 CpvAccess(Cmi_unposted_recv_total)++;
00717 }
00718 #else
00719
00720 START_EVENT();
00721 res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, charmComm, &flg, &sts);
00722 if (res != MPI_SUCCESS)
00723 CmiAbort("MPI_Iprobe failed\n");
00724
00725 if (!flg) break;
00726 CONDITIONAL_TRACE_USER_EVENT(70);
00727
00728 recd = 1;
00729 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00730 msg = (char *) CmiAlloc(nbytes);
00731
00732 #if USE_ASYNC_RECV_FUNC
00733 if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
00734 #endif
00735 if(doSyncRecv){
00736 START_EVENT();
00737 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
00738 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
00739 }
00740 #if USE_ASYNC_RECV_FUNC
00741 else {
00742 START_EVENT();
00743 IRecvList one = irecvListEntryAllocate();
00744 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
00745 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00746 one->msg = msg;
00747 one->size = nbytes;
00748 one->next = NULL;
00749 waitIrecvListTail->next = one;
00750 waitIrecvListTail = one;
00751
00752 CONDITIONAL_TRACE_USER_EVENT(50);
00753 }
00754 #endif
00755
00756 #endif
00757
00758 MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
00759 CMI_CHECK_CHECKSUM(msg, nbytes);
00760 #if CMK_ERROR_CHECKING
00761 if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) {
00762 CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
00763 CmiFree(msg);
00764 CmiAbort("Abort!\n");
00765 continue;
00766 }
00767 #endif
00768
00769 if(doSyncRecv){
00770 END_TRACE_RECVCOMM(msg);
00771 handleOneRecvedMsg(nbytes, msg);
00772 }
00773
00774 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
00775 recordMsgHistogramInfo(nbytes);
00776 #endif
00777
00778 #if MPI_POST_RECV
00779 #if MPI_DYNAMIC_POST_RECV
00780 if (postedOne) {
00781
00782
00783 int postRecvBufSize = postedOne->msgSizeIdx*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
00784 int postRecvTag = POST_RECV_TAG + postedOne->msgSizeIdx;
00785
00786 (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(postRecvBufSize);
00787
00788
00789 START_EVENT();
00790
00791 if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
00792 postRecvBufSize,
00793 MPI_BYTE,
00794 MPI_ANY_SOURCE,
00795 postRecvTag,
00796 charmComm,
00797 &((postedOne->postedRecvReqs)[completed_index]) ))
00798 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00799 CONDITIONAL_TRACE_USER_EVENT(50);
00800 }
00801 #else
00802 if (postedOne) {
00803
00804 (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
00805
00806
00807 START_EVENT();
00808 if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
00809 MPI_POST_RECV_SIZE,
00810 MPI_BYTE,
00811 MPI_ANY_SOURCE,
00812 POST_RECV_TAG,
00813 charmComm,
00814 &((postedOne->postedRecvReqs)[completed_index]) ))
00815 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
00816 CONDITIONAL_TRACE_USER_EVENT(50);
00817 }
00818 #endif
00819 #endif
00820
00821 #if CMI_EXERT_RECV_CAP
00822 recvCnt++;
00823 #elif CMI_DYNAMIC_EXERT_CAP
00824 recvCnt++;
00825 #if CMK_SMP
00826
00827
00828
00829
00830
00831 if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
00832 || CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
00833 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
00834 }
00835 #else
00836
00837
00838
00839 if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
00840 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
00841 }
00842 #endif
00843
00844 #endif
00845
00846 }
00847
00848 #if USE_ASYNC_RECV_FUNC
00849
00850 {
00851 IRecvList irecvEnt;
00852 int irecvDone = 0;
00853 MPI_Status sts;
00854 while(waitIrecvListHead->next) {
00855 IRecvList irecvEnt = waitIrecvListHead->next;
00856
00857 START_EVENT();
00858
00859
00860 if(MPI_SUCCESS != MPI_Test(&(irecvEnt->req), &irecvDone, &sts))
00861 CmiAbort("PumpMsgs: MPI_Test failed!\n");
00862 if(!irecvDone) break;
00863
00864 END_TRACE_RECVCOMM((irecvEnt->msg));
00865
00866
00867 handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);
00868 waitIrecvListHead->next = irecvEnt->next;
00869 irecvListEntryFree(irecvEnt);
00870 recd = 1;
00871 }
00872 if(waitIrecvListHead->next == NULL)
00873 waitIrecvListTail = waitIrecvListHead;
00874 }
00875 #endif
00876
00877
00878 MACHSTATE(2,"} PumpMsgs end ");
00879 return recd;
00880 }
00881
00882
00883 static void PumpMsgsBlocking(void) {
00884 static int maxbytes = 20000000;
00885 static char *buf = NULL;
00886 int nbytes, flg;
00887 MPI_Status sts;
00888 char *msg;
00889 int recd=0;
00890
00891 if (!PCQueueEmpty(CmiGetState()->recv)) return;
00892 if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
00893 if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
00894 if (CpvAccess(sent_msgs)) return;
00895
00896 #if 0
00897 CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
00898 #endif
00899
00900 if (buf == NULL) {
00901 buf = (char *) CmiAlloc(maxbytes);
00902 _MEMCHECK(buf);
00903 }
00904
00905
00906 #if MPI_POST_RECV
00907 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
00908 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
00909 #endif
00910
00911 START_TRACE_RECVCOMM(NULL);
00912 if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, charmComm,&sts))
00913 CmiAbort("PumpMsgs: PMP_Recv failed!\n");
00914
00915 MPI_Get_count(&sts, MPI_BYTE, &nbytes);
00916 msg = (char *) CmiAlloc(nbytes);
00917 memcpy(msg, buf, nbytes);
00918 END_TRACE_RECVCOMM(msg);
00919
00920 #if CMK_SMP_TRACE_COMMTHREAD && CMI_MPI_TRACE_MOREDETAILED
00921 char tmp[32];
00922 sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
00923 traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
00924 #endif
00925
00926 handleOneRecvedMsg(nbytes, msg);
00927 }
00928
00929
#if CMK_SMP

/*
 * Transmits queued outgoing messages with MPISendOneMsg().
 *
 * With MULTI_SENDQUEUE, at most one message is taken from each of the
 * _Cmi_mynodesize+1 per-rank queues per call. Otherwise the shared sendMsgBuf
 * queue is popped until it yields NULL. When a send cap is compiled in, the
 * loop also stops once that many messages have been sent.
 *
 * Returns 1 if at least one message was sent, 0 otherwise.
 *
 * Note on structure: the opening of the loop differs per configuration
 * (`for` + `if` versus `while`), and the closing braces are shared/conditional
 * accordingly; keep the #if pairs in step when editing.
 */
static int SendMsgBuf() {
    SMSG_LIST *msg_tmp;
    char *msg;
    int node, rank, size;
    int i;
    int sent = 0;

#if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
    int sentCnt = 0;
#endif

#if CMI_DYNAMIC_EXERT_CAP
    /* Start each call uncapped; tightened below if sends pile up. */
    dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
#endif

    MACHSTATE(2,"SendMsgBuf begin {");
#if MULTI_SENDQUEUE
    for (i=0; i<_Cmi_mynodesize+1; i++) {
        if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
            msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
#else

    msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);

    while (NULL != msg_tmp) {
#endif
            MPISendOneMsg(msg_tmp);
            sent=1;

#if CMI_EXERT_SEND_CAP
            if (++sentCnt == SEND_CAP) break;
#elif CMI_DYNAMIC_EXERT_CAP
            if (++sentCnt >= dynamicSendCap) break;
            /* Too many sends in flight: tighten the cap for the rest of this call. */
            if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD)
                dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
#endif

#if ! MULTI_SENDQUEUE

            /* Fetch the next queued message for the while loop. */
            msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);

#endif
        }
#if MULTI_SENDQUEUE
    }
#endif
    MACHSTATE(2,"}SendMsgBuf end ");
    return sent;
}
00983
00984 static int MsgQueueEmpty() {
00985 int i;
00986 #if MULTI_SENDQUEUE
00987 for (i=0; i<_Cmi_mynodesize; i++)
00988 if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
00989 #else
00990 return PCQueueEmpty(sendMsgBuf);
00991 #endif
00992 return 1;
00993 }
00994
00995
00996 static int RecvQueueEmpty() {
00997 int i;
00998 for (i=0; i<_Cmi_mynodesize; i++) {
00999 CmiState cs=CmiGetStateN(i);
01000 if (!PCQueueEmpty(cs->recv)) return 0;
01001 }
01002 return 1;
01003 }
01004
01005
/* When nonzero, AdvanceCommunicationForMPI() accumulates the wall time spent
 * in its phases and MachineExitForMPI() prints the totals.
 * NOTE(review): the macro and the accumulators exist only inside this CMK_SMP
 * section, yet the non-SMP code paths also test the macro and reference the
 * accumulators -- enabling it for a non-SMP build needs these moved out. */
#define REPORT_COMM_METRICS 0
#if REPORT_COMM_METRICS
static double pumptime = 0.0;     /* time in PumpMsgs() */
static double releasetime = 0.0;  /* time in CmiReleaseSentMessages() */
static double sendtime = 0.0;     /* time in SendMsgBuf() */
#endif

#endif //end of CMK_SMP
01014
/*
 * One round of communication progress (the LrtsAdvanceCommunication hook).
 *
 * SMP builds: receive (PumpMsgs), reap completed sends
 * (CmiReleaseSentMessages), then transmit queued messages (SendMsgBuf).
 * Non-SMP builds: reap completed sends, then receive.
 *
 * whenidle is not used by this implementation. With REPORT_COMM_METRICS the
 * wall time of each phase is added to the corresponding accumulator.
 */
static void AdvanceCommunicationForMPI(int whenidle) {
#if REPORT_COMM_METRICS
    double t1, t2, t3, t4;
    t1 = CmiWallTimer();
#endif

#if CMK_SMP
    PumpMsgs();

#if REPORT_COMM_METRICS
    t2 = CmiWallTimer();
#endif

    CmiReleaseSentMessages();
#if REPORT_COMM_METRICS
    t3 = CmiWallTimer();
#endif

    SendMsgBuf();

#if REPORT_COMM_METRICS
    t4 = CmiWallTimer();
    pumptime += (t2-t1);
    releasetime += (t3-t2);
    sendtime += (t4-t3);
#endif

#else
    CmiReleaseSentMessages();

#if REPORT_COMM_METRICS
    t2 = CmiWallTimer();
#endif
    PumpMsgs();

#if REPORT_COMM_METRICS
    t3 = CmiWallTimer();
    pumptime += (t3-t2);
    releasetime += (t2-t1);
#endif

#endif
}
01058
01059
01060 static void MachinePostNonLocalForMPI() {
01061 #if !CMK_SMP
01062 if (no_outstanding_sends) {
01063 while (CpvAccess(MsgQueueLen)>0) {
01064 AdvanceCommunicationForMPI(0);
01065 }
01066 }
01067
01068
01069
01070
01071 #if 0
01072 if (!msg) {
01073 CmiReleaseSentMessages();
01074 if (PumpMsgs())
01075 return PCQueuePop(cs->recv);
01076 else
01077 return 0;
01078 }
01079 #endif
01080 #else
01081 if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
01082 CmiReleaseSentMessages();
01083
01084
01085
01086
01087 }
01088 #endif
01089 }
01090
01091
01092 void CmiNotifyIdleForMPI(void) {
01093 CmiReleaseSentMessages();
01094 if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
01095 }
01096
01097
01098
#if CMK_MACHINE_PROGRESS_DEFINED
/*
 * Explicit network-progress entry point.
 * SMP builds: only the thread whose rank equals the node size runs the
 * communication server. Non-SMP builds: poll for messages and, when immediate
 * messages are compiled in, process them.
 */
void CmiMachineProgressImpl() {
#if CMK_SMP
    if (CmiMyRank() == CmiMyNodeSize()) {
        CommunicationServerThread(0);
    }
#else
    PumpMsgs();
#if CMK_IMMEDIATE_MSG
    CmiHandleImmediate();
#endif
#endif
}
#endif
01114
01115
01116 void DrainResourcesForMPI() {
01117 #if !CMK_SMP
01118 while (!CmiAllAsyncMsgsSent()) {
01119 PumpMsgs();
01120 CmiReleaseSentMessages();
01121 }
01122 #else
01123 if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
01124 while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
01125 CmiReleaseSentMessages();
01126 SendMsgBuf();
01127 PumpMsgs();
01128 }
01129 }else if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
01130 while(!CmiAllAsyncMsgsSent()) {
01131 CmiReleaseSentMessages();
01132 }
01133 }
01134 #endif
01135 #if CMK_MEM_CHECKPOINT
01136 if (CmiMyPe() == 0) mpi_end_spare();
01137 #endif
01138 MACHSTATE(2, "Machine exit barrier begin {");
01139 START_EVENT();
01140 if (MPI_SUCCESS != MPI_Barrier(charmComm))
01141 CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
01142 END_EVENT(10);
01143 MACHSTATE(2, "} Machine exit barrier end");
01144 }
01145
01146 void MachineExitForMPI() {
01147 int i;
01148 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
01149 int doPrint = 0;
01150 if (CmiMyNode()==0) doPrint = 1;
01151
01152 if (doPrint ) {
01153 #if MPI_POST_RECV
01154 CmiPrintf("node[%d]: %llu posted receives, %llu unposted receives\n", CmiMyNode(), CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
01155 #endif
01156 }
01157 #endif
01158
01159 #if MPI_POST_RECV
01160 {
01161 MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
01162 if (ptr) {
01163 do {
01164 for (i=0; i<ptr->bufCnt; i++) MPI_Cancel(ptr->postedRecvReqs+i);
01165 ptr = ptr->next;
01166 } while (ptr!=CpvAccess(postRecvListHdr));
01167 }
01168 }
01169 #endif
01170
01171 #if REPORT_COMM_METRICS
01172 #if CMK_SMP
01173 CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
01174 CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
01175 pumptime, releasetime, sendtime);
01176 #else
01177 CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
01178 CmiMyPe(), pumptime, releasetime, sendtime);
01179 #endif
01180 #endif
01181
01182 if(!CharmLibInterOperate) {
01183 #if ! CMK_AUTOBUILD
01184 signal(SIGINT, signal_int);
01185 MPI_Finalize();
01186 #endif
01187 exit(0);
01188 }
01189 }
01190
01191 static int machine_exit_idx;
01192 static void machine_exit(char *m) {
01193 EmergencyExit();
01194
01195 fflush(stdout);
01196 CmiNodeBarrier();
01197 if (CmiMyRank() == 0) {
01198 MPI_Barrier(charmComm);
01199
01200 MPI_Abort(charmComm, 1);
01201 } else {
01202 while (1) CmiYield();
01203 }
01204 }
01205
01206 static void KillOnAllSigs(int sigNo) {
01207 static int already_in_signal_handler = 0;
01208 char *m;
01209 if (already_in_signal_handler) return;
01210 already_in_signal_handler = 1;
01211 #if CMK_CCS_AVAILABLE
01212 if (CpvAccess(cmiArgDebugFlag)) {
01213 CpdNotify(CPD_SIGNAL, sigNo);
01214 CpdFreeze();
01215 }
01216 #endif
01217 CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
01218 "Signal: %d\n",CmiMyPe(),sigNo);
01219 CmiPrintStackTrace(1);
01220
01221 m = CmiAlloc(CmiMsgHeaderSizeBytes);
01222 CmiSetHandler(m, machine_exit_idx);
01223 CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
01224 machine_exit(m);
01225 }
01226
01227
01228
01229
/*
 * Registers the user-event names used to trace MPI calls.
 * Compiles to an empty function unless MPI user-event tracing is enabled.
 */
static void registerMPITraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    static const struct {
        const char *label;
        int id;
    } mpiEvents[] = {
        { "MPI_Barrier",   10 },
        { "MPI_Send",      20 },
        { "MPI_Recv",      30 },
        { "MPI_Isend",     40 },
        { "MPI_Irecv",     50 },
        { "MPI_Test[any]", 60 },
        { "MPI_Iprobe",    70 }
    };
    unsigned int k;

    /* Registration order matches the table order. */
    for (k = 0; k < sizeof(mpiEvents) / sizeof(mpiEvents[0]); k++) {
        traceRegisterUserEvent(mpiEvents[k].label, mpiEvents[k].id);
    }
#endif
}
01241
#if MACHINE_DEBUG_LOG
/* Per-node debug log stream; stays NULL until a log file is opened. */
FILE *debugLog = NULL;
#endif

/*
 * Returns a printable name for an MPI thread-support level.
 *
 * When CMK_MPI_INIT_THREAD is enabled the four standard levels map to their
 * symbolic names (string literals). Any other value, or any value at all
 * when CMK_MPI_INIT_THREAD is disabled, is rendered as a decimal number in
 * a freshly malloc'ed buffer. The buffer is sized for the widest possible
 * int, so the formatting can no longer run past the allocation. If the
 * allocation fails the literal "unknown" is returned.
 *
 * Because literals and heap strings share the return path, callers cannot
 * tell whether the result may be freed; it is meant for one-off startup
 * messages only.
 */
static char *thread_level_tostring(int thread_level) {
    /* sign + at most 3 decimal digits per byte + terminating NUL */
    enum { LEVEL_STR_LEN = sizeof(int) * 3 + 2 };
    char *str;

#if CMK_MPI_INIT_THREAD
    switch (thread_level) {
    case MPI_THREAD_SINGLE:
        return "MPI_THREAD_SINGLE";
    case MPI_THREAD_FUNNELED:
        return "MPI_THREAD_FUNNELED";
    case MPI_THREAD_SERIALIZED:
        return "MPI_THREAD_SERIALIZED";
    case MPI_THREAD_MULTIPLE :
        return "MPI_THREAD_MULTIPLE";
    default:
        break;   /* fall through to the numeric rendering below */
    }
#endif

    str = (char *)malloc(LEVEL_STR_LEN);
    if (str == NULL) {
        return "unknown";
    }
    snprintf(str, LEVEL_STR_LEN, "%d", thread_level);
    return str;
}
01270
01275 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
01276 int n,i;
01277 int ver, subver;
01278 int provided;
01279 int thread_level;
01280 int myNID;
01281 int largc=*argc;
01282 char** largv=*argv;
01283
01284 #if MACHINE_DEBUG
01285 debugLog=NULL;
01286 #endif
01287 #if CMK_USE_HP_MAIN_FIX
01288 #if FOR_CPLUS
01289 _main(largc,largv);
01290 #endif
01291 #endif
01292
01293 if (CmiGetArgFlag(largv, "+comm_thread_only_recv")) {
01294 #if CMK_SMP
01295 Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
01296 #else
01297 CmiAbort("+comm_thread_only_recv option can only be used with SMP version of Charm++");
01298 #endif
01299 }
01300
01301 if(!CharmLibInterOperate) {
01302 #if CMK_MPI_INIT_THREAD
01303 #if CMK_SMP
01304 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV)
01305 thread_level = MPI_THREAD_FUNNELED;
01306 else
01307 thread_level = MPI_THREAD_MULTIPLE;
01308 #else
01309 thread_level = MPI_THREAD_SINGLE;
01310 #endif
01311 MPI_Init_thread(argc, argv, thread_level, &provided);
01312 _thread_provided = provided;
01313 #else
01314 MPI_Init(argc, argv);
01315 thread_level = 0;
01316 _thread_provided = -1;
01317 #endif
01318 }
01319
01320 largc = *argc;
01321 largv = *argv;
01322 if(!CharmLibInterOperate) {
01323 MPI_Comm_dup(MPI_COMM_WORLD,&charmComm);
01324 MPI_Comm_size(charmComm, numNodes);
01325 MPI_Comm_rank(charmComm, myNodeID);
01326 }
01327
01328 myNID = *myNodeID;
01329
01330 MPI_Get_version(&ver, &subver);
01331 if(!CharmLibInterOperate) {
01332 if (myNID == 0) {
01333 printf("Charm++> Running on MPI version: %d.%d\n", ver, subver);
01334 printf("Charm++> level of thread support used: %s (desired: %s)\n", thread_level_tostring(_thread_provided), thread_level_tostring(thread_level));
01335 }
01336 }
01337
01338 #if CMK_SMP
01339 if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV && _thread_provided != MPI_THREAD_MULTIPLE) {
01340 Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
01341 if (myNID == 0) {
01342 printf("Charm++> +comm_thread_only_recv disabled\n");
01343 }
01344 }
01345 #endif
01346
01347 {
01348 int debug = CmiGetArgFlag(largv,"++debug");
01349 int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
01350 if (debug || debug_no_pause) {
01351 #if CMK_HAS_GETPID
01352 printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
01353 fflush(stdout);
01354 if (!debug_no_pause)
01355 sleep(15);
01356 #else
01357 printf("++debug ignored.\n");
01358 #endif
01359 }
01360 }
01361
01362
01363 #if CMK_MEM_CHECKPOINT
01364 if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
01365 CmiAssert(num_workpes <= *numNodes);
01366 total_pes = *numNodes;
01367 *numNodes = num_workpes;
01368 }
01369 else
01370 total_pes = num_workpes = *numNodes;
01371 if (*myNodeID == 0)
01372 CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
01373 petorank = (int *)malloc(sizeof(int) * num_workpes);
01374 for (i=0; i<num_workpes; i++) petorank[i] = i;
01375 nextrank = num_workpes;
01376
01377 if (*myNodeID >= num_workpes) {
01378 MPI_Status sts;
01379 int vals[2];
01380 MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
01381 int newpe = vals[0];
01382 CpvAccess(_curRestartPhase) = vals[1];
01383
01384 if (newpe == -1) {
01385 MPI_Barrier(charmComm);
01386 MPI_Finalize();
01387 exit(0);
01388 }
01389
01390 CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
01391
01392 MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
01393 nextrank = *myNodeID + 1;
01394 *myNodeID = newpe;
01395 myNID = newpe;
01396
01397
01398 char *phase_str;
01399 char **restart_argv;
01400 int i=0;
01401 while(largv[i]!= NULL) i++;
01402 restart_argv = (char **)malloc(sizeof(char *)*(i+3));
01403 i=0;
01404 while(largv[i]!= NULL){
01405 restart_argv[i] = largv[i];
01406 i++;
01407 }
01408 restart_argv[i] = "+restartaftercrash";
01409 phase_str = (char*)malloc(10);
01410 sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
01411 restart_argv[i+1]=phase_str;
01412 restart_argv[i+2]=NULL;
01413 *argv = restart_argv;
01414 *argc = i+2;
01415 largc = *argc;
01416 largv = *argv;
01417 }
01418 #endif
01419
01420 idleblock = CmiGetArgFlag(largv, "+idleblocking");
01421 if (idleblock && _Cmi_mynode == 0) {
01422 printf("Charm++: Running in idle blocking mode.\n");
01423 }
01424
01425 #if CMK_CHARMDEBUG
01426
01427 signal(SIGSEGV, KillOnAllSigs);
01428 signal(SIGFPE, KillOnAllSigs);
01429 signal(SIGILL, KillOnAllSigs);
01430 signal_int = signal(SIGINT, KillOnAllSigs);
01431 signal(SIGTERM, KillOnAllSigs);
01432 signal(SIGABRT, KillOnAllSigs);
01433 # if !defined(_WIN32) || defined(__CYGWIN__)
01434 signal(SIGQUIT, KillOnAllSigs);
01435 signal(SIGBUS, KillOnAllSigs);
01436 # endif
01437 #endif
01438
01439 #if CMK_NO_OUTSTANDING_SENDS
01440 no_outstanding_sends=1;
01441 #endif
01442 if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
01443 no_outstanding_sends = 1;
01444 if (myNID == 0)
01445 printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
01446 no_outstanding_sends?"":" not");
01447 }
01448
01449 request_max=MAX_QLEN;
01450 CmiGetArgInt(largv,"+requestmax",&request_max);
01451
01452
01453 #if MPI_POST_RECV
01454 CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
01455 CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
01456 CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
01457 CmiGetArgInt(largv, "+postRecvThreshold", &MPI_POST_RECV_MSG_CNT_THRESHOLD);
01458 CmiGetArgInt(largv, "+postRecvBucketSize", &MPI_POST_RECV_INC);
01459 CmiGetArgInt(largv, "+postRecvMsgInc", &MPI_POST_RECV_MSG_INC);
01460 CmiGetArgInt(largv, "+postRecvCheckFreq", &MPI_POST_RECV_FREQ);
01461 if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
01462 if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
01463 MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
01464 if (myNID==0) {
01465 printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes) with msg count threshold %d and msg histogram bucket size %d, #buf increment every %d msgs. The buffers are checked every %d msgs\n",
01466 MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE,
01467 MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
01468 }
01469 #endif
01470
01471 #if CMI_EXERT_SEND_CAP
01472 CmiGetArgInt(largv, "+dynCapSend", &SEND_CAP);
01473 if (myNID==0) {
01474 printf("Charm++: using static send cap %d\n", SEND_CAP);
01475 }
01476 #endif
01477 #if CMI_EXERT_RECV_CAP
01478 CmiGetArgInt(largv, "+dynCapRecv", &RECV_CAP);
01479 if (myNID==0) {
01480 printf("Charm++: using static recv cap %d\n", RECV_CAP);
01481 }
01482 #endif
01483 #if CMI_DYNAMIC_EXERT_CAP
01484 CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
01485 CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
01486 CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
01487 if (myNID==0) {
01488 printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
01489 CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
01490 }
01491 #endif
01492
01493 #if USE_ASYNC_RECV_FUNC
01494 CmiGetArgInt(largv, "+irecvMsgThreshold", &IRECV_MSG_THRESHOLD);
01495 if(myNID==0) {
01496 printf("Charm++: for msg size larger than %d, MPI_Irecv is going to be used.\n", IRECV_MSG_THRESHOLD);
01497 }
01498 #endif
01499
01500
01501 if (CmiGetArgFlag(largv,"+checksum")) {
01502 #if CMK_ERROR_CHECKING
01503 checksum_flag = 1;
01504 if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
01505 #else
01506 if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
01507 #endif
01508 }
01509
01510 procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
01511 for (i=0; i<_Cmi_mynodesize+1; i++) {
01512 #if MULTI_SENDQUEUE
01513 procState[i].sendMsgBuf = PCQueueCreate();
01514 #endif
01515 procState[i].recvLock = CmiCreateLock();
01516 }
01517 #if CMK_SMP
01518 #if !MULTI_SENDQUEUE
01519 sendMsgBuf = PCQueueCreate();
01520 sendMsgBufLock = CmiCreateLock();
01521 #endif
01522 #endif
01523 }
01524
/*
 * MPI-layer setup that runs before the common Converse initialization.
 *
 * With MPI_POST_RECV it initializes the per-PE post-recv variables and,
 * on the rank selected below, either sizes the message histogram (dynamic
 * mode) or posts MPI_POST_RECV_COUNT receives of MPI_POST_RECV_SIZE bytes
 * (static mode). It also sets up the optional message histogram and the
 * list head used for asynchronous receives.
 *
 * everReturn is not used here.
 */
static void MachinePreCommonInitForMPI(int everReturn) {

#if MPI_POST_RECV
    int doInit = 1;

#if CMK_SMP
    /* Only the rank numbered CmiMyNodeSize() performs the posting. */
    if (CmiMyRank() != CmiMyNodeSize()) {
        doInit = 0;
    }
#endif

    CpvInitialize(unsigned long long, Cmi_posted_recv_total);
    CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
    CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
    CpvInitialize(char **, CmiPostedRecvBuffers);

    CpvAccess(CmiPostedRecvRequests) = NULL;
    CpvAccess(CmiPostedRecvBuffers) = NULL;

    CpvInitialize(MPIPostRecvList *, postRecvListHdr);
    CpvInitialize(MPIPostRecvList *, curPostRecvPtr);
    CpvInitialize(int, msgRecvCnt);

    CpvAccess(postRecvListHdr) = NULL;
    CpvAccess(curPostRecvPtr) = NULL;
    CpvAccess(msgRecvCnt) = 0;

#if MPI_DYNAMIC_POST_RECV
    CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
#endif

    if (doInit) {
#if MPI_DYNAMIC_POST_RECV
        /* One bucket per bin between the bounds, plus one below and one above. */
        MSG_HISTOGRAM_BINSIZE = MPI_POST_RECV_INC;
        MAX_HISTOGRAM_BUCKETS = (MPI_POST_RECV_UPPERSIZE - MPI_POST_RECV_LOWERSIZE)/MSG_HISTOGRAM_BINSIZE+2;
        CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
        memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
#else
        int slot;

        /* Single list entry that links to itself, forming a one-node ring. */
        CpvAccess(postRecvListHdr) = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
        CpvAccess(postRecvListHdr)->msgSizeIdx = -1;
        CpvAccess(postRecvListHdr)->bufCnt = MPI_POST_RECV_COUNT;
        CpvAccess(postRecvListHdr)->postedRecvReqs = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
        CpvAccess(postRecvListHdr)->postedRecvBufs = (char**)malloc(MPI_POST_RECV_COUNT*sizeof(char *));
        CpvAccess(postRecvListHdr)->next = CpvAccess(postRecvListHdr);
        CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr);

        for (slot = 0; slot < MPI_POST_RECV_COUNT; slot++) {
            MPIPostRecvList *hdr = CpvAccess(postRecvListHdr);
            char *recvBuf = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
            int rc;

            hdr->postedRecvBufs[slot] = recvBuf;
            rc = MPI_Irecv(recvBuf,
                           MPI_POST_RECV_SIZE,
                           MPI_BYTE,
                           MPI_ANY_SOURCE,
                           POST_RECV_TAG,
                           charmComm,
                           &hdr->postedRecvReqs[slot]);
            if (rc != MPI_SUCCESS) {
                CmiAbort("MPI_Irecv failed\n");
            }
        }
#endif
    }
#endif

#if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
    CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
    CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
    memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
#endif

#if USE_ASYNC_RECV_FUNC
    /* In SMP builds only the rank numbered CmiMyNodeSize() owns the list. */
#if CMK_SMP
    if (CmiMyRank() == CmiMyNodeSize())
#endif
    {
        waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
        waitIrecvListHead->next = NULL;
    }
#endif
}
01620
01621 static void MachinePostCommonInitForMPI(int everReturn) {
01622
01623 CmiIdleState *s=CmiNotifyGetState();
01624
01625 CpvInitialize(SMSG_LIST *, sent_msgs);
01626 CpvInitialize(SMSG_LIST *, end_sent);
01627 CpvInitialize(int, MsgQueueLen);
01628 CpvAccess(sent_msgs) = NULL;
01629 CpvAccess(end_sent) = NULL;
01630 CpvAccess(MsgQueueLen) = 0;
01631
01632 machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
01633
01634 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
01635 CpvInitialize(double, projTraceStart);
01636
01637 if (CmiMyPe() == 0) {
01638 registerMachineUserEventsFunction(®isterMPITraceEvents);
01639 }
01640 #endif
01641
01642 #if CMK_SMP
01643 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
01644 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
01645 if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV)
01646 CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
01647 #else
01648 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
01649 #endif
01650
01651 #if MACHINE_DEBUG_LOG
01652 if (CmiMyRank() == 0) {
01653 char ln[200];
01654 sprintf(ln,"debugLog.%d",CmiMyNode());
01655 debugLog=fopen(ln,"w");
01656 }
01657 #endif
01658 }
01659
01660
01661
01662
01663
01664
01665
01666
01667 void LrtsAbort(const char *message) {
01668 char *m;
01669
01670 #if CMK_CCS_AVAILABLE
01671 if (CpvAccess(cmiArgDebugFlag)) {
01672 CpdNotify(CPD_ABORT, message);
01673 CpdFreeze();
01674 }
01675 #endif
01676 CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
01677 "Reason: %s\n",CmiMyPe(),message);
01678
01679 CmiPrintStackTrace(0);
01680 m = CmiAlloc(CmiMsgHeaderSizeBytes);
01681 CmiSetHandler(m, machine_exit_idx);
01682 CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
01683 machine_exit(m);
01684
01685 MPI_Abort(charmComm, 1);
01686 }
01687
01688
01689 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
01690
01691
01692 static CmiNodeLock timerLock = 0;
01693 static int _absoluteTime = 0;
01694 static double starttimer = 0;
01695 static int _is_global = 0;
01696
01697 int CmiTimerIsSynchronized() {
01698 int flag;
01699 void *v;
01700
01701
01702 if (MPI_SUCCESS != MPI_Attr_get(charmComm, MPI_WTIME_IS_GLOBAL, &v, &flag))
01703 printf("MPI_WTIME_IS_GLOBAL not valid!\n");
01704 if (flag) {
01705 _is_global = *(int*)v;
01706 if (_is_global && CmiMyPe() == 0)
01707 printf("Charm++> MPI timer is synchronized\n");
01708 }
01709 return _is_global;
01710 }
01711
01712 int CmiTimerAbsolute() {
01713 return _absoluteTime;
01714 }
01715
/* Always reports 0.0 as the timer start value. */
double CmiStartTimer() {
    const double timerOrigin = 0.0;
    return timerOrigin;
}
01719
01720 double CmiInitTime() {
01721 return starttimer;
01722 }
01723
01724 void CmiTimerInit(char **argv) {
01725 _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
01726 if (_absoluteTime && CmiMyPe() == 0)
01727 printf("Charm++> absolute MPI timer is used\n");
01728
01729 #if ! CMK_MEM_CHECKPOINT
01730 _is_global = CmiTimerIsSynchronized();
01731 #else
01732 _is_global = 0;
01733 #endif
01734
01735 if (_is_global) {
01736 if (CmiMyRank() == 0) {
01737 double minTimer;
01738 #if CMK_TIMER_USE_XT3_DCLOCK
01739 starttimer = dclock();
01740 #else
01741 starttimer = MPI_Wtime();
01742 #endif
01743
01744 MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
01745 charmComm );
01746 starttimer = minTimer;
01747 }
01748 } else {
01749 #if ! CMK_MEM_CHECKPOINT
01750 CmiBarrier();
01751 CmiBarrier();
01752 CmiBarrier();
01753 #endif
01754 #if CMK_TIMER_USE_XT3_DCLOCK
01755 starttimer = dclock();
01756 #else
01757 starttimer = MPI_Wtime();
01758 #endif
01759 }
01760
01761 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
01762 if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
01763 timerLock = CmiCreateLock();
01764 #endif
01765 CmiNodeAllBarrier();
01766 }
01767
01774 double CmiTimer(void) {
01775 double t;
01776 #if 0 && CMK_SMP
01777 if (timerLock) CmiLock(timerLock);
01778 #endif
01779
01780 #if CMK_TIMER_USE_XT3_DCLOCK
01781 t = dclock();
01782 #else
01783 t = MPI_Wtime();
01784 #endif
01785
01786 #if 0 && CMK_SMP
01787 if (timerLock) CmiUnlock(timerLock);
01788 #endif
01789
01790 return _absoluteTime?t: (t-starttimer);
01791 }
01792
01793 double CmiWallTimer(void) {
01794 double t;
01795 #if 0 && CMK_SMP
01796 if (timerLock) CmiLock(timerLock);
01797 #endif
01798
01799 #if CMK_TIMER_USE_XT3_DCLOCK
01800 t = dclock();
01801 #else
01802 t = MPI_Wtime();
01803 #endif
01804
01805 #if 0 && CMK_SMP
01806 if (timerLock) CmiUnlock(timerLock);
01807 #endif
01808
01809 return _absoluteTime? t: (t-starttimer);
01810 }
01811
01812 double CmiCpuTimer(void) {
01813 double t;
01814 #if 0 && CMK_SMP
01815 if (timerLock) CmiLock(timerLock);
01816 #endif
01817 #if CMK_TIMER_USE_XT3_DCLOCK
01818 t = dclock() - starttimer;
01819 #else
01820 t = MPI_Wtime() - starttimer;
01821 #endif
01822 #if 0 && CMK_SMP
01823 if (timerLock) CmiUnlock(timerLock);
01824 #endif
01825 return t;
01826 }
01827
01828 #endif
01829
01830
01831
01832 int CmiBarrier() {
01833 #if CMK_SMP
01834
01835 CmiNodeAllBarrier();
01836 if (CmiMyRank() == CmiMyNodeSize())
01837 #else
01838 if (CmiMyRank() == 0)
01839 #endif
01840 {
01846
01847
01848 if (MPI_SUCCESS != MPI_Barrier(charmComm))
01849 CmiAbort("Timernit: MPI_Barrier failed!\n");
01850
01851
01852 }
01853 CmiNodeAllBarrier();
01854 return 0;
01855 }
01856
01857
01858 int CmiBarrierZero() {
01859 int i;
01860 #if CMK_SMP
01861 if (CmiMyRank() == CmiMyNodeSize())
01862 #else
01863 if (CmiMyRank() == 0)
01864 #endif
01865 {
01866 char msg[1];
01867 MPI_Status sts;
01868 if (CmiMyNode() == 0) {
01869 for (i=0; i<CmiNumNodes()-1; i++) {
01870 START_EVENT();
01871
01872 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, charmComm,&sts))
01873 CmiPrintf("MPI_Recv failed!\n");
01874
01875 END_EVENT(30);
01876 }
01877 } else {
01878 START_EVENT();
01879
01880 if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,charmComm))
01881 printf("MPI_Send failed!\n");
01882
01883 END_EVENT(20);
01884 }
01885 }
01886 CmiNodeAllBarrier();
01887 return 0;
01888 }
01889
01890
01891 #if CMK_MEM_CHECKPOINT
01892
01893 void mpi_restart_crashed(int pe, int rank)
01894 {
01895 int vals[2];
01896 vals[0] = pe;
01897 vals[1] = CpvAccess(_curRestartPhase)+1;
01898 MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,charmComm);
01899 MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,charmComm);
01900 }
01901
01902
01903 void mpi_end_spare()
01904 {
01905 int i;
01906 for (i=nextrank; i<total_pes; i++) {
01907 int vals[2] = {-1,-1};
01908 MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,charmComm);
01909 }
01910 }
01911
01912 int find_spare_mpirank(int pe)
01913 {
01914 if (nextrank == total_pes) {
01915 CmiAbort("Charm++> No spare processor available.");
01916 }
01917 petorank[pe] = nextrank;
01918 nextrank++;
01919 return nextrank-1;
01920 }
01921
01922 void CkDieNow()
01923 {
01924 CmiPrintf("[%d] die now.\n", CmiMyPe());
01925
01926
01927 while (!CmiAllAsyncMsgsSent()) {
01928 PumpMsgs();
01929 CmiReleaseSentMessages();
01930 }
01931 MPI_Barrier(charmComm);
01932 MPI_Finalize();
01933 exit(0);
01934 }
01935
01936 #endif
01937
01938
01939 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
01940
01941
01942 #if MPI_DYNAMIC_POST_RECV
01943
01944 static void consumeAllMsgs()
01945 {
01946 MPIPostRecvList *ptr = CpvAccess(curPostRecvPtr);
01947 if (ptr) {
01948 do {
01949 int i;
01950 for (i=0; i<ptr->bufCnt; i++) {
01951 int done = 0;
01952 MPI_Status sts;
01953
01954
01955 if (ptr->postedRecvBufs[i] == NULL) continue;
01956
01957 START_TRACE_RECVCOMM(NULL);
01958 if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
01959 CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
01960 if (done) {
01961 int nbytes;
01962 char *msg;
01963
01964 if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
01965 CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
01966
01967 msg = (ptr->postedRecvBufs)[i];
01968 (ptr->postedRecvBufs)[i] = NULL;
01969
01970 END_TRACE_RECVCOMM(msg);
01971 handleOneRecvedMsg(nbytes, msg);
01972 } else {
01973 if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))
01974 CmiAbort("consumeAllMsgs failed in MPI_Cancel!\n");
01975 }
01976 }
01977 ptr = ptr->next;
01978 } while (ptr != CpvAccess(curPostRecvPtr));
01979 }
01980 }
01981
01982 static void recordMsgHistogramInfo(int size)
01983 {
01984 int idx = 0;
01985 size -= MPI_POST_RECV_LOWERSIZE;
01986 if (size > 0)
01987 idx = (size/MSG_HISTOGRAM_BINSIZE + 1);
01988
01989 if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
01990 CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
01991 }
01992
01993 #define POST_RECV_USE_STATIC_PARAM 0
01994 #define POST_RECV_REPORT_STS 0
01995
01996 #if POST_RECV_REPORT_STS
01997 static int buildDynCallCnt = 0;
01998 #endif
01999
02000 static void buildDynamicRecvBuffers()
02001 {
02002 int i;
02003
02004 int local_MSG_CNT_THRESHOLD;
02005 int local_MSG_INC;
02006
02007 #if POST_RECV_REPORT_STS
02008 buildDynCallCnt++;
02009 #endif
02010
02011
02012 reportMsgHistogramInfo();
02013
02014 CpvAccess(msgRecvCnt) = 0;
02015
02016 consumeAllMsgs();
02017
02018 #if POST_RECV_USE_STATIC_PARAM
02019 local_MSG_CNT_THRESHOLD = MPI_POST_RECV_MSG_CNT_THRESHOLD;
02020 local_MSG_INC = MPI_POST_RECV_MSG_INC;
02021 #else
02022 {
02023 int total = 0;
02024 int count = 0;
02025 for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
02026 int tmp = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02027
02028 if (tmp > 0) {
02029 total += tmp;
02030 count++;
02031 }
02032 }
02033 if (count == 1) local_MSG_CNT_THRESHOLD = 1;
02034 else local_MSG_CNT_THRESHOLD = total / count /3;
02035 local_MSG_INC = total/count;
02036 #if POST_RECV_REPORT_STS
02037 printf("sel_histo[%d]: critia_threshold=%d, critia_msginc=%d\n", CmiMyPe(), local_MSG_CNT_THRESHOLD, local_MSG_INC);
02038 #endif
02039 }
02040 #endif
02041
02042
02043
02044 MPIPostRecvList *newHdr = NULL;
02045 MPIPostRecvList *newListPtr = newHdr;
02046 MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
02047 for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
02048 int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02049 if (count >= local_MSG_CNT_THRESHOLD) {
02050
02051 #if POST_RECV_REPORT_STS
02052
02053 int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
02054 int high = low + MSG_HISTOGRAM_BINSIZE;
02055 int reportCnt;
02056 if (count == local_MSG_CNT_THRESHOLD) reportCnt = 1;
02057 else reportCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
02058 printf("sel_histo[%d]-%d: msg size [%.2f, %.2f) with count=%d (%d)\n", CmiMyPe(), buildDynCallCnt, low/1000.0, high/1000.0, count, reportCnt);
02059 #endif
02060
02061 int notFound = 1;
02062 MPIPostRecvList *newEntry = NULL;
02063 while (ptr) {
02064 if (ptr->msgSizeIdx < i) {
02065
02066 MPIPostRecvList *nextptr = ptr->next;
02067
02068 free(ptr->postedRecvReqs);
02069 int j;
02070 for (j=0; j<ptr->bufCnt; j++) {
02071 if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
02072 }
02073 free(ptr->postedRecvBufs);
02074 ptr = nextptr;
02075 } else if (ptr->msgSizeIdx == i) {
02076 int newBufCnt, j;
02077 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
02078 newEntry = ptr;
02079
02080 if (count == local_MSG_CNT_THRESHOLD) newBufCnt = 1;
02081 else newBufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
02082 if (newBufCnt != ptr->bufCnt) {
02083
02084 free(ptr->postedRecvReqs);
02085 ptr->postedRecvReqs = (MPI_Request *)malloc(newBufCnt * sizeof(MPI_Request));
02086 for (j=0; j<ptr->bufCnt; j++) {
02087 if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
02088 }
02089 free(ptr->postedRecvBufs);
02090 ptr->postedRecvBufs = (char **)malloc(newBufCnt * sizeof(char *));
02091 }
02092
02093
02094 ptr->bufCnt = newBufCnt;
02095 for (j=0; j<ptr->bufCnt; j++) {
02096 ptr->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
02097 if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
02098 MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
02099 charmComm, ptr->postedRecvReqs+j))
02100 CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
02101 }
02102
02103
02104 ptr = ptr->next;
02105
02106 if (ptr == CpvAccess(postRecvListHdr)) ptr = NULL;
02107 notFound = 0;
02108 break;
02109 } else {
02110
02111 break;
02112 }
02113 if (ptr == CpvAccess(postRecvListHdr)) {
02114 ptr = NULL;
02115 break;
02116 }
02117 }
02118
02119 if (notFound) {
02120
02121 int j;
02122 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
02123 newEntry = malloc(sizeof(MPIPostRecvList));
02124 MPIPostRecvList *one = newEntry;
02125 one->msgSizeIdx = i;
02126 if (count == local_MSG_CNT_THRESHOLD) one->bufCnt = 1;
02127 else one->bufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
02128 one->postedRecvReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*one->bufCnt);
02129 one->postedRecvBufs = (char **)malloc(one->bufCnt * sizeof(char *));
02130 for (j=0; j<one->bufCnt; j++) {
02131 one->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
02132 if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
02133 MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
02134 charmComm, one->postedRecvReqs+j))
02135 CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
02136 }
02137 }
02138
02139
02140 CmiAssert(newEntry != NULL);
02141 if (newHdr == NULL) {
02142 newHdr = newEntry;
02143 newListPtr = newEntry;
02144 newHdr->next = newHdr;
02145 } else {
02146 newListPtr->next = newEntry;
02147 newListPtr = newEntry;
02148 newListPtr->next = newHdr;
02149 }
02150 }
02151 }
02152
02153
02154 while (ptr) {
02155
02156 MPIPostRecvList *nextptr = ptr->next;
02157
02158 free(ptr->postedRecvReqs);
02159 int j;
02160 for (j=0; j<ptr->bufCnt; j++) {
02161 if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
02162 }
02163 free(ptr->postedRecvBufs);
02164 ptr = nextptr;
02165 if (ptr == CpvAccess(postRecvListHdr)) break;
02166 }
02167
02168 CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr) = newHdr;
02169 memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
02170 }
02171
02172 static void examineMsgHistogramInfo(int size)
02173 {
02174 int total = CpvAccess(msgRecvCnt)++;
02175 if (total < MPI_POST_RECV_FREQ) {
02176 recordMsgHistogramInfo(size);
02177 } else {
02178 buildDynamicRecvBuffers();
02179 }
02180 }
02181 #else
02182
02183 static void recordMsgHistogramInfo(int size)
02184 {
02185 int idx = size/MSG_HISTOGRAM_BINSIZE;
02186 if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
02187 CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
02188 }
02189 #endif
02190
02191 static void reportMsgHistogramInfo()
02192 {
02193 #if MPI_DYNAMIC_POST_RECV
02194 int i, count;
02195 count = CpvAccess(MSG_HISTOGRAM_ARRAY)[0];
02196 if (count > 0) {
02197 printf("msg_histo[%d]: %d for msg [0, %.2fK)\n", CmiMyNode(), count, MPI_POST_RECV_LOWERSIZE/1000.0);
02198 }
02199 for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
02200 int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02201 if (count > 0) {
02202 int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
02203 int high = low + MSG_HISTOGRAM_BINSIZE;
02204 printf("msg_histo[%d]: %d for msg [%.2fK, %.2fK)\n", CmiMyNode(), count, low/1000.0, high/1000.0);
02205 }
02206 }
02207 count = CpvAccess(MSG_HISTOGRAM_ARRAY)[MAX_HISTOGRAM_BUCKETS-1];
02208 if (count > 0) {
02209 printf("msg_histo[%d]: %d for msg [%.2fK, +inf)\n", CmiMyNode(), count, MPI_POST_RECV_UPPERSIZE/1000.0);
02210 }
02211 #else
02212 int i;
02213 for (i=0; i<MAX_HISTOGRAM_BUCKETS; i++) {
02214 int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
02215 if (count > 0) {
02216 int low = i*MSG_HISTOGRAM_BINSIZE;
02217 int high = low + MSG_HISTOGRAM_BINSIZE;
02218 printf("msg_histo[%d]: %d for msg [%dK, %dK)\n", CmiMyNode(), count, low/1000, high/1000);
02219 }
02220 }
02221 #endif
02222 }
02223 #endif
02224
/*
 * User-callable hook that rebuilds the pre-posted receive buffers.
 * Does nothing unless MPI_DYNAMIC_POST_RECV is enabled.
 */
void CmiSetupMachineRecvBuffersUser()
{
#if !MPI_DYNAMIC_POST_RECV
    return;
#else
    buildDynamicRecvBuffers();
#endif
}
02231
02232
02233