00001
00002
00003
00004
00005
00006
00007
00178
00179
00180
00181
00182
00183
00184 #include <stdarg.h>
00185
/* Set to 1 to route the C library's printf through InternalPrintf
 * (debugging aid; normally disabled). */
#define CMK_USE_PRINTF_HACK 0
#if CMK_USE_PRINTF_HACK

static void InternalPrintf(const char *f, va_list l);

/**
 * Replacement for the C library printf that forwards its output through
 * InternalPrintf.
 *
 * NOTE(review): InternalPrintf does not report how many characters it wrote,
 * so the return value (10) is a placeholder, not the real character count.
 */
int printf(const char *fmt, ...) {
  va_list p;
  va_start(p, fmt);
  InternalPrintf(fmt, p);
  va_end(p);
  return 10;
}
#endif
00205
00206
00207 #include "converse.h"
00208 #include "memory-isomalloc.h"
00209
00210 #include <stdio.h>
00211 #include <stdlib.h>
00212 #include <ctype.h>
00213 #include <fcntl.h>
00214 #include <errno.h>
00215 #include <setjmp.h>
00216 #include <signal.h>
00217 #include <string.h>
00218
00219
00220 #include "machine.h"
00221
00222
00223 #include "pcqueue.h"
00224
00225 #include "machine-smp.h"
00226
#if CMK_USE_KQUEUE
#include <sys/event.h>
/* kqueue descriptor used by the CMK_PIPE_* macros; -1 until CMK_PIPE_DECL
 * first creates it. */
int _kq = -1;
#endif

#if CMK_USE_POLL
#include <poll.h>
#endif

#if CMK_USE_GM
#include "gm.h"
/* Myrinet GM port; machine_exit closes it and calls gm_finalize when non-NULL. */
struct gm_port *gmport = NULL;
int portFinish = 0;
#endif

#if CMK_USE_MX
#include "myriexpress.h"
/* Myrinet MX endpoint state. */
mx_endpoint_t endpoint;
mx_endpoint_addr_t endpoint_addr;
/* NOTE(review): magic filter value; presumably must match all peers — confirm. */
int MX_FILTER = 123456;
static uint64_t Cmi_nic_id=0;
#endif

#if CMK_USE_AMMASSO
#include "clustercore/ccil_api.h"
#endif

#if CMK_MULTICORE
int Cmi_commthread = 0;
#endif
00257
00258 #include "conv-ccs.h"
00259 #include "ccs-server.h"
00260 #include "sockRoutines.h"
00261
00262 #if defined(_WIN32) && ! defined(__CYGWIN__)
00263
00264 # include <windows.h>
00265 # include <wincon.h>
00266 # include <sys/types.h>
00267 # include <sys/timeb.h>
00268 # define fdopen _fdopen
00269 # define SIGBUS -1
00270 # define SIGKILL -1
00271 # define SIGQUIT -1
00272
00273
00274 #else
00275 # include <pwd.h>
00276 # include <unistd.h>
00277 # include <fcntl.h>
00278 # include <sys/file.h>
00279 #endif
00280
00281 #if CMK_PERSISTENT_COMM
00282 #include "persist_impl.h"
00283 #endif
00284
/* Size in bytes of the scratch buffer that InternalPrintf/InternalError
 * format their messages into. */
#define PRINTBUFSIZE 16384

#ifdef __ONESIDED_IMPL
#ifdef __ONESIDED_NO_HARDWARE
/* Handler indices for the software emulation of one-sided put/get that is
 * included from conv-onesided.c (presumably registered there — confirm). */
int putSrcHandler;
int putDestHandler;
int getSrcHandler;
int getDestHandler;
#include "conv-onesided.c"
#endif
#endif
00296
00297
00298
00299
00300
00301
static void CommunicationServer(int withDelayMs, int where);

void CmiHandleImmediate();
extern int CmemInsideMem();
extern void CmemCallWhenMemAvail();
static void ConverseRunPE(int everReturn);
void CmiYield(void);
void ConverseCommonExit(void);

/* Data port and socket of this node (presumably for node-to-node traffic).
 * NOTE(review): parse_netstart stores -1 into the unsigned dataport in
 * standalone mode, which wraps to UINT_MAX. */
static unsigned int dataport=0;
static int Cmi_mach_id=0;
static SOCKET dataskt;

extern void TokenUpdatePeriodic();
extern void getAvailSysMem();

/* Fan-out used for broadcasts (presumably of a spanning tree, per the name). */
#define BROADCAST_SPANNING_FACTOR 4
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
/* Both flags are read and written from signal handlers (KillOnAllSigs) as well
 * as from normal code; volatile sig_atomic_t is the only object type the C
 * standard guarantees can be accessed safely that way. */
static volatile sig_atomic_t machine_initiated_shutdown=0;
static volatile sig_atomic_t already_in_signal_handler=0;

static void CmiDestoryLocks();

#if CMK_USE_SYSVSHM
void tearDownSharedBuffers();
#endif

/**
 * Release machine-layer resources and terminate the process.
 * machine_initiated_shutdown is set first, so machine_atexit_check and
 * KillOnAllSigs recognize the exit as intentional.
 * @param status  exit status passed to exit().
 */
static void machine_exit(int status)
{
  MACHSTATE(3," machine_exit");
  machine_initiated_shutdown=1;

  CmiDestoryLocks();

#if CMK_USE_GM
  if (gmport) {
    gm_close(gmport); gmport = 0;
    gm_finalize();
  }
#endif
#if CMK_USE_SYSVSHM
  tearDownSharedBuffers();
#endif
  exit(status);
}
00363
static void charmrun_abort(const char*);

/* Report `msg` as a fatal error (to charmrun, or stderr when standalone) and
 * terminate this process with status 1. */
static void KillEveryone(const char *msg)
{
  charmrun_abort(msg);
  machine_exit(1);
}
00371
/**
 * Abort the run with a numeric error code, reported as
 * "[<pe>] Fatal error #<n>".
 * Uses a prototype-style definition (K&R definitions were removed in C23)
 * and formats with snprintf so the message can never overrun the buffer.
 * @param n  error code included in the message.
 */
static void KillEveryoneCode(int n)
{
  char msg[100];
  snprintf(msg, sizeof msg, "[%d] Fatal error #%d\n", CmiMyPe(), n);
  charmrun_abort(msg);
  machine_exit(1);
}
00380
00381 static void KillOnAllSigs(int sigNo)
00382 {
00383 const char *sig="unknown signal";
00384 const char *suggestion="";
00385 if (machine_initiated_shutdown ||
00386 already_in_signal_handler)
00387 machine_exit(1);
00388 already_in_signal_handler=1;
00389
00390 CmiDestoryLocks();
00391
00392 if (sigNo==SIGSEGV) {
00393 sig="segmentation violation";
00394 suggestion="Try running with '++debug', or linking with '-memory paranoid'.\n";
00395 }
00396 if (sigNo==SIGFPE) {
00397 sig="floating point exception";
00398 suggestion="Check for integer or floating-point division by zero.\n";
00399 }
00400 if (sigNo==SIGBUS) {
00401 sig="bus error";
00402 suggestion="Check for misaligned reads or writes to memory.\n";
00403 }
00404 if (sigNo==SIGILL) {
00405 sig="illegal instruction";
00406 suggestion="Check for calls to uninitialized function pointers.\n";
00407 }
00408 if (sigNo==SIGKILL) sig="caught signal KILL";
00409 if (sigNo==SIGQUIT) sig="caught signal QUIT";
00410 if (sigNo==SIGTERM) sig="caught signal TERM";
00411 MACHSTATE1(5," Caught signal %s ",sig);
00412
00413 #ifdef __FAULT__
00414 if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
00415 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
00416 }else{
00417 #else
00418 {
00419 #endif
00420 CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
00421 "Signal: %s\n",CmiMyPe(),sig);
00422 if (0!=suggestion[0])
00423 CmiError("Suggestion: %s",suggestion);
00424 CmiPrintStackTrace(1);
00425 charmrun_abort(sig);
00426 machine_exit(1);
00427 }
00428 }
00429
00430 static void machine_atexit_check(void)
00431 {
00432 if (!machine_initiated_shutdown)
00433 CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
00434 printf("Program finished.\n");
00435 #if 0
00436 fgetc(stdin);
00437 #endif
00438 }
00439
00440 #if !defined(_WIN32) || defined(__CYGWIN__)
00441 static void HandleUserSignals(int signum)
00442 {
00443 int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
00444 CcdRaiseCondition(condnum);
00445 }
00446 #endif
00447
/* Print `msg` with the current errno description (perror), then shut the
 * machine layer down with exit status 1. */
static void PerrorExit(const char *msg)
{
  perror(msg);
  machine_exit(1); /* does not return */
}
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
/*
 * Portable "wait for socket readiness" macros built on poll(), kqueue() or
 * select(), depending on the platform.  Usage pattern:
 *   CMK_PIPE_DECL(delayMs)             declare wait state; timeout in ms
 *   CMK_PIPE_ADDREAD/ADDWRITE(fd)      register interest (in a function taking
 *                                      CMK_PIPE_PARAM, called with CMK_PIPE_SUB)
 *   CMK_PIPE_CALL()                    wait until ready or timed out
 *   CMK_PIPE_CHECKREAD/CHECKWRITE(fd)  test readiness
 * For poll, the CHECK macros must be issued in the same order as the ADD
 * macros: both walk fds[] with *nFds, which CMK_PIPE_CALL resets to 0.
 *
 * Fixes:
 *  - Timeouts of 1000 ms or more are split into seconds plus a sub-second part.
 *    select() and kevent() reject tv_usec >= 1e6 and tv_nsec >= 1e9.
 *  - The kqueue macros clear the whole struct kevent, not pointer-size bytes.
 *    Previously a stale `filter` could make a CHECK macro report an fd ready
 *    after a timeout.
 *  - The CHECK macros are fully parenthesized.
 */
#if CMK_USE_POLL
# define CMK_PIPE_DECL(delayMs) \
  struct pollfd fds[10]; \
  int nFds_sto=0; int *nFds=&nFds_sto; \
  int pollDelayMs=(delayMs);
# define CMK_PIPE_SUB fds,nFds
# define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0

# define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
# define CMK_PIPE_ADDREAD(rd_fd) \
  do {fds[*nFds].fd=(rd_fd); fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
  do {fds[*nFds].fd=(wr_fd); fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (fds[(*nFds)++].revents&POLLIN)
# define CMK_PIPE_CHECKWRITE(wr_fd) (fds[(*nFds)++].revents&POLLOUT)

#elif CMK_USE_KQUEUE

# define CMK_PIPE_DECL(delayMs) \
  if (_kq == -1) _kq = kqueue(); \
  struct kevent ke_sto; \
  struct kevent* ke = &ke_sto; \
  struct timespec tmo; \
  const long cmk_pipe_delay_ms = (long)(delayMs); \
  tmo.tv_sec = cmk_pipe_delay_ms/1000; \
  tmo.tv_nsec = (cmk_pipe_delay_ms%1000)*1000000L;
# define CMK_PIPE_SUB ke
# define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)

# define CMK_PIPE_PARAM struct kevent* ke
# define CMK_PIPE_ADDREAD(rd_fd) \
  do { EV_SET(ke, (rd_fd), EVFILT_READ, EV_ADD, 0, 10, NULL); \
       kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
  do { EV_SET(ke, (wr_fd), EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
       kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == (rd_fd) && ke->filter == EVFILT_READ)
# define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == (wr_fd) && ke->filter == EVFILT_WRITE)

#else

# define CMK_PIPE_DECL(delayMs) \
  fd_set rfds_sto,wfds_sto;\
  fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
  const long cmk_pipe_delay_ms = (long)(delayMs); \
  FD_ZERO(rfds); FD_ZERO(wfds); \
  tmo.tv_sec=cmk_pipe_delay_ms/1000; tmo.tv_usec=(cmk_pipe_delay_ms%1000)*1000;
# define CMK_PIPE_SUB rfds,wfds
# define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)

# define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
# define CMK_PIPE_ADDREAD(rd_fd) FD_SET((rd_fd),rfds)
# define CMK_PIPE_ADDWRITE(wr_fd) FD_SET((wr_fd),wfds)
# define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET((rd_fd),rfds)
# define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET((wr_fd),wfds)
#endif
00515
/* Presumably called after a failed CMK_PIPE_CALL(). An interrupted wait
 * (EINTR) is harmless; any other error kills the whole job. On native
 * Windows this does nothing. */
static void CMK_PIPE_CHECKERR(void) {
#if !defined(_WIN32) || defined(__CYGWIN__)
  if (errno == EINTR)
    return;
  KillEveryone("Socket error in CheckSocketsReady!\n");
#endif
}


static void CmiStdoutFlush(void);
static int CmiStdoutNeedsService(void);
static void CmiStdoutService(void);
static void CmiStdoutAdd(CMK_PIPE_PARAM);
static void CmiStdoutCheck(CMK_PIPE_PARAM);
00536
00537
/* Current wall-clock time in seconds, with a fractional part
 * (millisecond resolution on native Windows, microsecond elsewhere). */
double GetClock(void)
{
#if defined(_WIN32) && !defined(__CYGWIN__)
  struct _timeb now;
  double whole;
  _ftime(&now);
  whole = (double)now.time;
  return whole + now.millitm * 1.0E-3;
#else
  struct timeval now;
  double whole;
  if (gettimeofday(&now, NULL) < 0) {
    perror("gettimeofday");
    KillEveryoneCode(9343112);
  }
  whole = (double)now.tv_sec;
  return whole + now.tv_usec * 1.0E-6;
#endif
}
00551
/**
 * Return a newly CmiAlloc'd copy of the first `len` bytes of `msg`.
 * On allocation failure the job is aborted. The old code printed a warning
 * and then memcpy'd into a NULL pointer (undefined behaviour).
 */
char *CopyMsg(char *msg, int len)
{
  char *copy = (char *)CmiAlloc(len);
  if (!copy) {
    fprintf(stderr, "Out of memory\n");
    CmiAbort("CopyMsg: out of memory");
  }
  memcpy(copy, msg, len);
  return copy;
}
00560
00561
00562
00563
00564
00565
00566
/* When nonzero, CmiAbort deliberately faults instead of notifying charmrun,
 * so a debugger or core dump captures the failing state. It is presumably
 * set from a command-line option parsed elsewhere — confirm. */
static int Cmi_truecrash;
/* Set on the first CmiAbort call; a nested call exits immediately. */
static int already_aborting=0;

/**
 * Report a fatal error with a stack trace and terminate the job.
 * Never returns.
 * @param message  human-readable reason, also forwarded to charmrun.
 */
void CmiAbort(const char *message)
{
  if (already_aborting) machine_exit(1);
  already_aborting=1;
  MACHSTATE1(5,"CmiAbort(%s)",message);

  CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
           "Reason: %s\n",CmiMyPe(),message);
  CmiPrintStackTrace(0);

  CmiStdoutFlush();

  if(Cmi_truecrash) {
    printf("CHARM++ FATAL ERROR: %s\n", message);
    /* A crash does not flush stdio buffers. Without this, a buffered stdout
     * (pipe or file) would lose the line above. */
    fflush(stdout);
    /* Deliberate fault so a debugger stops here. The volatile access keeps the
     * compiler from discarding the (undefined) null store. */
    *(volatile int *)NULL = 0;
    /* Fallback on platforms where the store does not trap:
     * CmiAbort must never return. */
    abort();
  } else {
    charmrun_abort(message);
    machine_exit(1);
  }
}
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
#if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
#include <fcntl.h>
/**
 * Make this process the owner of `fd` and enable asynchronous (signal-driven)
 * I/O notification on it.
 * F_SETFL replaces the whole file-status word, so FASYNC is OR-ed into the
 * current flags rather than overwriting them (which used to clear O_NONBLOCK).
 * Exits the process on failure.
 */
void CmiEnableAsyncIO(int fd)
{
  int flags;
  if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
    CmiError("setting socket owner: %s\n", strerror(errno)) ;
    exit(1);
  }
  flags = fcntl(fd, F_GETFL, 0);
  if ( flags < 0 || fcntl(fd, F_SETFL, flags | FASYNC) < 0 ) {
    CmiError("setting socket async: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
/* Asynchronous I/O notification is not configured for this platform: no-op. */
void CmiEnableAsyncIO(int fd) { }
#endif
00640
00641
#if !defined(_WIN32) || defined(__CYGWIN__)
/**
 * Put `fd` into non-blocking mode.
 * The current file-status flags are read and O_NONBLOCK is OR-ed in. A bare
 * F_SETFL would silently clear others, e.g. FASYNC set by CmiEnableAsyncIO.
 * The old code also passed a stray extra argument to fcntl.
 * Exits the process on failure.
 */
void CmiEnableNonblockingIO(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
void CmiEnableNonblockingIO(int fd) { }
#endif
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
/* Node/PE counts and indices of this process within the job. */
int _Cmi_mynode;
int _Cmi_mynodesize;
int _Cmi_numnodes;
int _Cmi_numpes;
static int Cmi_nodestart;
static skt_ip_t Cmi_self_IP;
/* charmrun's address, port and pid, filled in by parse_netstart from $NETSTART. */
static skt_ip_t Cmi_charmrun_IP;
static int Cmi_charmrun_port;
static int Cmi_charmrun_pid;
/* Control socket to charmrun; -1 means running standalone (no charmrun). */
static int Cmi_charmrun_fd=-1;

static int Cmi_netpoll;
/* When set, CmiStartThreads enables SIGIO-driven servicing of the sockets. */
static int Cmi_asyncio;
static int Cmi_idlepoll;
/* When set, print requests wait for charmrun's reply (see InternalPrintf). */
static int Cmi_syncprint;
/* Set by the +stats flag in extract_common_args. */
static int Cmi_print_stats = 0;
00681
00682 static void parse_netstart(void)
00683 {
00684 char *ns;
00685 int nread;
00686 int port;
00687 ns = getenv("NETSTART");
00688 if (ns!=0)
00689 {
00690 char Cmi_charmrun_name[1024];
00691 nread = sscanf(ns, "%d%s%d%d%d",
00692 &_Cmi_mynode,
00693 Cmi_charmrun_name, &Cmi_charmrun_port,
00694 &Cmi_charmrun_pid, &port);
00695 Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
00696
00697 if (nread!=5) {
00698 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
00699 exit(1);
00700 }
00701 } else
00702 {
00703 _Cmi_mynode=0;
00704 Cmi_charmrun_IP=_skt_invalid_ip;
00705 Cmi_charmrun_port=0;
00706 Cmi_charmrun_pid=0;
00707 dataport = -1;
00708 }
00709 #if CMK_USE_IBVERBS
00710 char *cmi_num_nodes = getenv("CmiNumNodes");
00711 if(cmi_num_nodes != NULL){
00712 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
00713 }
00714 #endif
00715 }
00716
00717 static void extract_common_args(char **argv)
00718 {
00719 if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
00720 Cmi_print_stats = 1;
00721 }
00722
00723
00724 #include "machine-smp.c"
00725
CsvDeclare(CmiNodeState, NodeState);

/* Accesses the destination rank stored in the first int of a message
 * (presumably consumed by immediate.c, which is included right after this).
 * NOTE(review): the expansion has no outer parentheses, so e.g.
 * CMI_DEST_RANK(m)++ would increment the pointer m, not the rank. */
#define CMI_DEST_RANK(msg) *(int *)(msg)
00730 #include "immediate.c"
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740
/* Set to 1 to record a per-node in-memory message log (debugging aid). */
#define LOGGING 0

#if LOGGING

/* Capacity of the circular log buffer, and how many of the most recent
 * entries printLog() dumps. */
#define LOG_CAPACITY 50000
#define LOG_PRINT_TAIL 5000

typedef struct logent {
  double time;
  int seqno;
  int srcpe;
  int dstpe;
  int kind;
} *logent;

/* NOTE(review): `log` is also the name of the <math.h> function; kept so any
 * other LOGGING-only code referring to it still builds. */
logent log;
int log_pos;   /* next slot LOG() writes */
int log_wrap;  /* nonzero once the buffer has wrapped around */

static void log_init(void)
{
  log = (logent)malloc(LOG_CAPACITY * sizeof(struct logent));
  _MEMCHECK(log);
  log_pos = 0;
  log_wrap = 0;
}

/* Write every recorded entry to the file "log.<node>". */
static void log_done(void)
{
  char logname[100]; FILE *f; int i, size;
  sprintf(logname, "log.%d", _Cmi_mynode);
  f = fopen(logname, "w");
  if (f==0) KillEveryone("fopen problem");
  size = log_wrap ? LOG_CAPACITY : log_pos;
  for (i=0; i<size; i++) {
    logent ent = log+i;
    fprintf(f, "%1.4f %d %c %d %d\n",
            ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
  }
  fclose(f);
}

/* Dump (only once) the last LOG_PRINT_TAIL entries, oldest first, to
 * "log.<node>".
 * Fixes: a negative index now wraps modulo LOG_CAPACITY (the old code added
 * 5000, the tail length, and read the wrong slots). Before the buffer has
 * wrapped, slots that were never written are skipped instead of printing
 * entry 0 over and over. */
void printLog(void)
{
  char logname[100]; FILE *f; int i, j;
  static int logged = 0;
  if (logged)
    return;
  logged = 1;
  CmiPrintf("Logging: %d\n", _Cmi_mynode);
  sprintf(logname, "log.%d", _Cmi_mynode);
  f = fopen(logname, "w");
  if (f==0) KillEveryone("fopen problem");
  for (i = LOG_PRINT_TAIL; i; i--)
  {
    j = log_pos - i;
    if (j < 0)
    {
      if (!log_wrap)
        continue;
      j += LOG_CAPACITY;
    }
    {
      logent ent = log+j;
      fprintf(f, "%1.4f %d %c %d %d\n",
              ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
    }
  }
  fclose(f);
  CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
}

#define LOG(t,s,k,d,q) { if (log_pos==LOG_CAPACITY) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}

#endif

#if !LOGGING

#define log_init()
#define log_done()
#define printLog()
#define LOG(t,s,k,d,q)

#endif
00825
00826
00827
00828
00829
00830
00831
00832
/* Serializes CmiScanf callers (locked in InternalScanf). */
static CmiNodeLock Cmi_scanf_mutex;
static double Cmi_clock;
/* Minimum interval, in GetClock() seconds, between pings to charmrun
 * (see pingCharmrun). */
static double Cmi_check_delay = 3.0;
00836
00837
00838
00839
00840
00841
00842
00843
00844 #if CMK_SHARED_VARS_UNAVAILABLE
00845
/* Nesting depth of CmiMemLock() in this non-SMP build. While it is nonzero,
 * CommunicationInterrupt skips servicing the network from the SIGIO handler. */
static volatile int memflag=0;
void CmiMemLock() { ++memflag; }
void CmiMemUnlock() { --memflag; }
00849
/* Nonzero while the single thread holds the communication "lock".
 * CommunicationInterrupt skips its work while this is set. */
static volatile int comm_flag=0;
/* Runs `dothis` (e.g. `return;`) if the comm lock is already held.
 * NOTE(review): expands to a bare `if`, so an `else` following the call site
 * would bind to it. Use it only as a standalone statement. */
#define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
#ifndef MACHLOCK_DEBUG
# define CmiCommLock() (comm_flag=1)
# define CmiCommUnlock() (comm_flag=0)
#else
/* Debug variants that check lock/unlock pairing with MACHLOCK_ASSERT. */
void CmiCommLock(void) {
  MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
  comm_flag=1;
}
void CmiCommUnlock(void) {
  MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
  comm_flag=0;
}
#endif
00865
/* Non-SMP build: CmiStartThreads insists on one PE per node, so every state
 * lookup, whatever PE is requested, resolves to the single Cmi_state. */
static struct CmiStateStruct Cmi_state;
int _Cmi_mype;
int _Cmi_myrank=0;
#define CmiGetState() (&Cmi_state)
#define CmiGetStateN(n) (&Cmi_state)

/* NOTE(review): POSIX sleep(0) returns immediately and is not guaranteed to
 * yield the CPU to another process. */
void CmiYield(void) { sleep(0); }
00873
/* SIGIO handler for the non-SMP build (installed by CmiStartThreads).
 * Runs the communication server unless the interrupted code holds CmiMemLock
 * (memflag), holds the comm lock (comm_flag), or is inside immediate-message
 * handling. In those cases the signal is dropped.
 * @param ignored  signal number (unused). */
static void CommunicationInterrupt(int ignored)
{
  MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
  if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0))
  {
    MACHSTATE(5,"--SKIPPING SIGIO--");
    return;
  }
  MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
  {
    /* Deactivate the current isomalloc block list while the server runs and
     * restore it afterwards. Presumably this keeps allocations made here from
     * being attributed to the interrupted thread — confirm. */
    CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);

    CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);

    CmiIsomallocBlockListActivate(oldList);
  }
  MACHSTATE(2,"--END SIGIO--")
}
00893
00894 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
00895
00896 static void CmiStartThreads(char **argv)
00897 {
00898 MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
00899 MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
00900 if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
00901 KillEveryone
00902 ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
00903
00904 CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
00905 _Cmi_mype = Cmi_nodestart;
00906
00907
00908 _Cmi_myrank=1;
00909 CommunicationServerInit();
00910 _Cmi_myrank=0;
00911
00912 #if !CMK_ASYNC_NOT_NEEDED
00913 if (Cmi_asyncio)
00914 {
00915 CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
00916 if (!Cmi_netpoll) {
00917 if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
00918 if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
00919 }
00920 #if CMK_USE_GM || CMK_USE_MX
00921
00922 if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
00923 #endif
00924 }
00925 #endif
00926 }
00927
00928 static void CmiDestoryLocks()
00929 {
00930 comm_flag = 0;
00931 memflag = 0;
00932 }
00933
00934 #endif
00935
00936
00937
00938
/* Counter and period for periodic network progress (presumably polling the
 * network every networkProgressPeriod events — confirm against the user). */
CpvDeclare(unsigned , networkProgressCount);
int networkProgressPeriod;

/* Per-PE local message queue. */
CpvDeclare(void *, CmiLocalQueue);


/* Function fallbacks, used only when CmiMyPe/CmiMyRank are not already macros. */
#ifndef CmiMyPe
int CmiMyPe(void)
{
  return CmiGetState()->pe;
}
#endif
#ifndef CmiMyRank
int CmiMyRank(void)
{
  return CmiGetState()->rank;
}
#endif

CpvExtern(int,_charmEpoch);

extern double evacTime;
00965
00966 static void CmiPushPE(int pe,void *msg)
00967 {
00968 CmiState cs=CmiGetStateN(pe);
00969
00970
00971
00972
00973
00974
00975 MACHSTATE1(2,"Pushing message into %d's queue",pe);
00976 MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
00977
00978 #if CMK_IMMEDIATE_MSG
00979 if (CmiIsImmediate(msg)) {
00980 CmiPushImmediateMsg(msg);
00981 return;
00982 }
00983 #endif
00984 PCQueuePush(cs->recv,msg);
00985 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00986 if (_Cmi_noprocforcommthread)
00987 #endif
00988 CmiIdleLock_addMessage(&cs->idle);
00989 }
00990
#if CMK_NODE_QUEUE_AVAILABLE

/* Deliver an incoming message to the node-wide receive queue.
 * Immediate messages go directly to CmiPushImmediateMsg. Otherwise the message
 * is queued on NodeState.NodeRecv and the idle lock of PE state 0 is notified
 * (in POSIX-threads SMP builds only when _Cmi_noprocforcommthread is set).
 * The caller must hold the comm lock. */
static void CmiPushNode(void *msg)
{
  CmiState first = CmiGetStateN(0);

  MACHSTATE(2,"Pushing message into node queue");
  MACHLOCK_ASSERT(comm_flag,"CmiPushNode")

#if CMK_IMMEDIATE_MSG
  if (CmiIsImmediate(msg)) {
    MACHSTATE(2,"Pushing Immediate message into queue");
    CmiPushImmediateMsg(msg);
    return;
  }
#endif
  PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);

#if CMK_SHARED_VARS_POSIX_THREADS_SMP
  if (_Cmi_noprocforcommthread)
#endif
  CmiIdleLock_addMessage(&first->idle);
}
#endif
01018
01019
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029
/* Nonzero while ctrl_sendone_nolock is writing to the charmrun socket.
 * pingCharmrun does not send a ping while it is set. */
static int Cmi_charmrun_fd_sendflag=0;

/* Socket-error callback active during ctrl_sendone_nolock: report the error
 * and exit with status 1. */
static int sendone_abort_fn(int code,const char *msg)
{
  fprintf(stderr, "Socket error %d in ctrl_sendone! %s\n", code, msg);
  machine_exit(1);
  return -1; /* not reached */
}
01038
01039 static void ctrl_sendone_nolock(const char *type,
01040 const char *data1,int dataLen1,
01041 const char *data2,int dataLen2)
01042 {
01043 const void *bufs[3]; int lens[3]; int nBuffers=0;
01044 ChMessageHeader hdr;
01045 skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
01046 MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
01047 if (Cmi_charmrun_fd==-1)
01048 charmrun_abort("ctrl_sendone called in standalone!\n");
01049 Cmi_charmrun_fd_sendflag=1;
01050 ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
01051 bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
01052 if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
01053 if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
01054 skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
01055 Cmi_charmrun_fd_sendflag=0;
01056 skt_set_abort(oldAbort);
01057 MACHSTATE(2,"} ctrl_sendone_nolock");
01058 }
01059
/* Same as ctrl_sendone_nolock, but holds the comm lock around the send. */
static void ctrl_sendone_locking(const char *type,
                                 const char *data1, int dataLen1,
                                 const char *data2, int dataLen2)
{
  CmiCommLock();
  ctrl_sendone_nolock(type, data1, dataLen1, data2, dataLen2);
  CmiCommUnlock();
}
01068
/* Optional memory-usage trace: when enabled, pingCharmrun prints
 * "-- <pe> <time> <usage> --" on PE 0 each time it runs. */
#ifndef MEMORYUSAGE_OUTPUT
#define MEMORYUSAGE_OUTPUT 0
#endif
#if MEMORYUSAGE_OUTPUT
#define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
static int memoryusage_counter;
/* True on every MEMORYUSAGE_OUTPUT_FREQ-th increment of the counter. */
#define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
#define memoryusage_output {\
  memoryusage_counter++;\
  if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
#endif

/* GetClock() time (seconds) at which pingCharmrun last decided to ping. */
static double Cmi_check_last;
01082
01083
/* Keep-alive for charmrun.
 * When more than Cmi_check_delay seconds have passed since Cmi_check_last, it
 * records the new time and sends a "ping" control message. It returns early
 * instead (also skipping the stdout flush) if the comm lock is already held or
 * a control send is in progress. It then flushes buffered stdout.
 * With CMK_USE_GM/CMK_USE_MX, the lock check and the flush are done only when
 * Cmi_netpoll is off.
 * With MEMORYUSAGE_OUTPUT, every call prints a memory-usage line on PE 0, but
 * the ping/flush part runs only on every MEMORYUSAGE_OUTPUT_FREQ-th call. */
static void pingCharmrun(void *ignored)
{
#if MEMORYUSAGE_OUTPUT
  memoryusage_output;
  if(memoryusage_isOutput){
    memoryusage_counter = 0;
#else
  {
#endif

    double clock=GetClock();
    if (clock > Cmi_check_last + Cmi_check_delay) {
      MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
      Cmi_check_last = clock;
#if CMK_USE_GM || CMK_USE_MX
      if (!Cmi_netpoll)
#endif
        CmiCommLockOrElse(return;);
      /* Don't interleave a ping with a control message that is mid-send. */
      if (Cmi_charmrun_fd_sendflag) return;
      CmiCommLock();
      ctrl_sendone_nolock("ping",NULL,0,NULL,0);
      CmiCommUnlock();
    }
#if 1
#if CMK_USE_GM || CMK_USE_MX
    if (!Cmi_netpoll)
#endif
      CmiStdoutFlush();
#endif
  }
}
01115
01116
01117 static void pingCharmrunPeriodic(void *ignored)
01118 {
01119 pingCharmrun(ignored);
01120 CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
01121 }
01122
01123 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
01124 static void charmrun_abort(const char *s)
01125 {
01126 if (Cmi_charmrun_fd==-1) {
01127 fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
01128 abort();
01129 } else {
01130 char msgBuf[80];
01131 skt_set_abort(ignore_further_errors);
01132 sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
01133 ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
01134 }
01135 }
01136
01137
01138
01139 #ifdef __FAULT__
01140 #include "machine-recover.c"
01141 #endif
01142
01143 static void node_addresses_store(ChMessage *msg);
01144
/* Receive one control message from charmrun and act on it. The caller must
 * hold the comm lock (asserted).
 *   "die"         : print the attached text, run ConverseCommonExit, exit(0).
 *   "req_fw"      : (CCS builds) convert the forwarded CCS request with
 *                   CcsImpl_ccs2converse and push it with CmiPushPE(0, ...).
 *   "crashnode"   : (__FAULT__ builds) passed to crash_node_handle.
 *   "initnodetab" : (__FAULT__ builds) passed to node_addresses_store.
 * Any other type is fatal: charmrun_abort, then exit(1). */
static void ctrl_getone(void)
{
  ChMessage msg;
  MACHSTATE(2,"ctrl_getone")
  MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
  ChMessage_recv(Cmi_charmrun_fd,&msg);
  MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);

  if (strcmp(msg.header.type,"die")==0) {
    MACHSTATE(2,"ctrl_getone bye bye")
    fprintf(stderr,"aborting: %s\n",msg.data);
    log_done();
    ConverseCommonExit();
    machine_exit(0);
  }
#if CMK_CCS_AVAILABLE
  else if (strcmp(msg.header.type, "req_fw")==0) {
    CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
    /* The request payload follows the CcsImplHeader in msg.data. */
    int pe=0;
    void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
    MACHSTATE(2,"Incoming CCS request");
    CmiPushPE(pe,cmsg);
  }
#endif
#ifdef __FAULT__
  else if(strcmp(msg.header.type,"crashnode")==0) {
    crash_node_handle(&msg);
  }
  else if(strcmp(msg.header.type,"initnodetab")==0) {
    node_addresses_store(&msg);
    fprintf(stdout,"nodetable added %d\n",CmiMyPe());
  }
#endif
  else {
    charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
    machine_exit(1);
  }

  MACHSTATE(2,"ctrl_getone done")
  ChMessage_free(&msg);
}
01197
#if CMK_CCS_AVAILABLE

/* Forward a CCS reply to charmrun as a "reply_fw" control message: the
 * request header followed by `repLen` bytes of reply data.
 * Takes the comm lock. */
void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
{
  const char *headerBytes = (const char *)hdr;

  MACHSTATE(2,"Outgoing CCS reply");
  ctrl_sendone_locking("reply_fw", headerBytes, sizeof(CcsImplHeader),
                       repData, repLen);
  MACHSTATE(1,"Outgoing CCS reply away");
}
#endif
01209
01210
01211
01212
01213
01214
01215 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
01216 static void InternalPrintf(const char *f, va_list l)
01217 {
01218 ChMessage replymsg;
01219 char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
01220 CmiStdoutFlush();
01221 vsprintf(buffer, f, l);
01222 if(Cmi_syncprint) {
01223 CmiCommLock();
01224 ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
01225 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01226 ChMessage_free(&replymsg);
01227 CmiCommUnlock();
01228 } else {
01229 ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
01230 }
01231 InternalWriteToTerminal(0,buffer,strlen(buffer));
01232 CmiTmpFree(buffer);
01233 }
01234
01235 static void InternalError(const char *f, va_list l)
01236 {
01237 ChMessage replymsg;
01238 char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
01239 CmiStdoutFlush();
01240 vsprintf(buffer, f, l);
01241 if(Cmi_syncprint) {
01242 ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
01243 CmiCommLock();
01244 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01245 ChMessage_free(&replymsg);
01246 CmiCommUnlock();
01247 } else {
01248 ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
01249 }
01250 InternalWriteToTerminal(1,buffer,strlen(buffer));
01251 CmiTmpFree(buffer);
01252 }
01253
01254 static int InternalScanf(char *fmt, va_list l)
01255 {
01256 ChMessage replymsg;
01257 char *ptr[20];
01258 char *p; int nargs, i;
01259 nargs=0;
01260 p=fmt;
01261 while (*p) {
01262 if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
01263 if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
01264 if (p[0]=='%') { nargs++; p++; continue; }
01265 if (*p=='\n') *p=' '; p++;
01266 }
01267 if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
01268 for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
01269 CmiLock(Cmi_scanf_mutex);
01270 if (Cmi_charmrun_fd!=-1)
01271 {
01272 ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
01273
01274 CmiCommLock();
01275 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01276 i = sscanf((char*)replymsg.data, fmt,
01277 ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
01278 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
01279 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
01280 ChMessage_free(&replymsg);
01281 CmiCommUnlock();
01282 } else
01283 {
01284 i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
01285 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
01286 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
01287 }
01288 CmiUnlock(Cmi_scanf_mutex);
01289 return i;
01290 }
01291
#if CMK_CMIPRINTF_IS_A_BUILTIN

/* printf-style output: written straight to stdout when standalone, otherwise
 * routed through charmrun by InternalPrintf. */
void CmiPrintf(const char *fmt, ...)
{
  va_list args;
  va_start(args, fmt);
  if (Cmi_charmrun_fd == -1)
    vfprintf(stdout, fmt, args);
  else
    InternalPrintf(fmt, args);
  va_end(args);
}

/* Error-stream counterpart of CmiPrintf (stderr when standalone). */
void CmiError(const char *fmt, ...)
{
  va_list args;
  va_start(args, fmt);
  if (Cmi_charmrun_fd == -1)
    vfprintf(stderr, fmt, args);
  else
    InternalError(fmt, args);
  va_end(args);
}

/* scanf-style input; see InternalScanf. Returns its result. */
int CmiScanf(const char *fmt, ...)
{
  va_list args;
  int nmatched;
  va_start(args, fmt);
  nmatched = InternalScanf((char *)fmt, args);
  va_end(args);
  return nmatched;
}

#endif
01324
01325
01326
01327
01328
01329
01330
01331
01332
/* State for servicing this process's standard output (CmiStdoutInit and the
 * CmiStdout* functions).
 * NOTE(review): the int[2] arrays look like pipe() descriptor pairs — confirm. */
static int readStdout[2];
static int writeStdout[2];
static int serviceStdout[2];
#define readStdoutBufLen (16*1024)
/* One byte larger than readStdoutBufLen, presumably for a terminating NUL. */
static char readStdoutBuf[readStdoutBufLen+1];
static int servicingStdout;
01339
01340
01341 static void CmiStdoutInit(void) {
01342 int i;
01343 if (Cmi_charmrun_fd==-1) return;
01344
01345