arch/net/machine.c

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * $Source: /cvsroot/charm/src/arch/net/machine.c,v $
00003  * $Author: gzheng $
00004  * $Date: 2008-06-25 08:01:11 $
00005  * $Revision: 2.347 $
00006  *****************************************************************************/
00007 
00178 /*****************************************************************************
00179  *
00180  * Include Files
00181  *
00182  ****************************************************************************/
00183 
00184 #include <stdarg.h> /*<- was <varargs.h>*/
00185 
00186 #define CMK_USE_PRINTF_HACK 0
00187 #if CMK_USE_PRINTF_HACK
00188 /*HACK: turn printf into CmiPrintf, by just defining our own
00189 external symbol "printf".  This may be more trouble than it's worth,
00190 since the only advantage is that it works properly with +syncprint.
00191 
00192 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
00193 because they don't call printf.  Has to be defined up here because we probably 
00194 haven't properly guessed this compiler's prototype for "printf".
00195 */
static void InternalPrintf(const char *f, va_list l);

/*
 * Replacement for the C library's printf (only compiled when
 * CMK_USE_PRINTF_HACK is non-zero): forwards the format string and its
 * arguments to InternalPrintf.
 *
 * Returns a fixed placeholder value (10); InternalPrintf returns void, so
 * the real number of characters written is not available here.
 */
int printf(const char *fmt, ...) {
        va_list p;
        va_start(p, fmt);
        InternalPrintf(fmt,p);
        va_end(p);
        return 10;
}
00204 #endif
00205 
00206 
00207 #include "converse.h"
00208 #include "memory-isomalloc.h"
00209 
00210 #include <stdio.h>
00211 #include <stdlib.h>
00212 #include <ctype.h>
00213 #include <fcntl.h>
00214 #include <errno.h>
00215 #include <setjmp.h>
00216 #include <signal.h>
00217 #include <string.h>
00218 
00219 /* define machine debug */
00220 #include "machine.h"
00221 
00222 /******************* Producer-Consumer Queues ************************/
00223 #include "pcqueue.h"
00224 
00225 #include "machine-smp.h"
00226 
00227 #if CMK_USE_KQUEUE
00228 #include <sys/event.h>
00229 int _kq = -1;
00230 #endif
00231 
00232 #if CMK_USE_POLL
00233 #include <poll.h>
00234 #endif
00235 
00236 #if CMK_USE_GM
00237 #include "gm.h"
00238 struct gm_port *gmport = NULL;
00239 int  portFinish = 0;
00240 #endif
00241 
00242 #if CMK_USE_MX
00243 #include "myriexpress.h"
00244 mx_endpoint_t      endpoint;
00245 mx_endpoint_addr_t endpoint_addr;
00246 int MX_FILTER   =  123456;
00247 static uint64_t Cmi_nic_id=0; /* Machine-specific identifier (MX-only) */
00248 #endif
00249 
00250 #if CMK_USE_AMMASSO
00251   #include "clustercore/ccil_api.h"
00252 #endif
00253 
00254 #if CMK_MULTICORE
00255 int Cmi_commthread = 0;
00256 #endif
00257 
00258 #include "conv-ccs.h"
00259 #include "ccs-server.h"
00260 #include "sockRoutines.h"
00261 
00262 #if defined(_WIN32) && ! defined(__CYGWIN__)
00263 /*For windows systems:*/
00264 #  include <windows.h>
00265 #  include <wincon.h>
00266 #  include <sys/types.h>
00267 #  include <sys/timeb.h>
00268 #  define fdopen _fdopen
00269 #  define SIGBUS -1  /*These signals don't exist in Win32*/
00270 #  define SIGKILL -1
00271 #  define SIGQUIT -1
00272 /*#  define SIGTERM -1*/       /* VC++ ver 8 now has SIGTERM */
00273 
00274 #else /*UNIX*/
00275 #  include <pwd.h>
00276 #  include <unistd.h>
00277 #  include <fcntl.h>
00278 #  include <sys/file.h>
00279 #endif
00280 
00281 #if CMK_PERSISTENT_COMM
00282 #include "persist_impl.h"
00283 #endif
00284 
00285 #define PRINTBUFSIZE 16384
00286 
00287 #ifdef __ONESIDED_IMPL
00288 #ifdef __ONESIDED_NO_HARDWARE
00289 int putSrcHandler;
00290 int putDestHandler;
00291 int getSrcHandler;
00292 int getDestHandler;
00293 #include "conv-onesided.c"
00294 #endif
00295 #endif
00296 
00297 /*
00298 0: from smp thread
00299 1: from interrupt
00300 2: from worker thread
00301 */
00302 static void CommunicationServer(int withDelayMs, int where);
00303 
00304 void CmiHandleImmediate();
00305 extern int CmemInsideMem();
00306 extern void CmemCallWhenMemAvail();
00307 static void ConverseRunPE(int everReturn);
00308 void CmiYield(void);
00309 void ConverseCommonExit(void);
00310 
00311 static unsigned int dataport=0;
00312 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
00313 static SOCKET       dataskt;
00314 
00315 extern void TokenUpdatePeriodic();
00316 extern void getAvailSysMem();
00317 
00318 #define BROADCAST_SPANNING_FACTOR               4
00319 
00320 /****************************************************************************
00321  *
00322  * Handling Errors
00323  *
00324  * Errors should be handled by printing a message on stderr and
00325  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
00326  * communication should be made.  The other processes will notice the
00327  * abnormal termination and will deal with it.
00328  *
00329  * Rationale: if an error triggers an attempt to send a message,
00330  * the attempt to send a message is likely to trigger another error,
00331  * leading to an infinite loop and a process that spins instead of
00332  * shutting down.
00333  *
00334  *****************************************************************************/
00335 
00336 static int machine_initiated_shutdown=0;
00337 static int already_in_signal_handler=0;
00338 
00339 static void CmiDestoryLocks();
00340 
00341 #if CMK_USE_SYSVSHM /* define teardown function before use */
00342 void tearDownSharedBuffers();
00343 #endif 
00344 
/*
 * Final exit path for this process.  Records that the shutdown was initiated
 * by the machine layer, calls CmiDestoryLocks(), releases the transport
 * resources that are compiled in (GM port, SysV shared buffers) and then
 * calls exit(status).  Does not return.
 */
00345 static void machine_exit(int status)
00346 {
00347   MACHSTATE(3,"     machine_exit");
00348   machine_initiated_shutdown=1;
00349 
00350   CmiDestoryLocks();            /* destroy locks to prevent deadlocking */
00351 
00352 #if CMK_USE_GM
00353   if (gmport) { 
00354     gm_close(gmport); gmport = 0;
00355     gm_finalize();
00356   }
00357 #endif
00358 #if CMK_USE_SYSVSHM
00359         tearDownSharedBuffers();
00360 #endif
00361   exit(status);
00362 }
00363 
00364 static void charmrun_abort(const char*);
00365 
/* Reports msg through charmrun_abort(), then terminates this process via machine_exit(1). */
00366 static void KillEveryone(const char *msg)
00367 {
00368   charmrun_abort(msg);
00369   machine_exit(1);
00370 }
00371 
/*
 * Abort with a numeric error code: formats "[pe] Fatal error #n", hands the
 * text to charmrun_abort() and terminates through machine_exit(1).
 *
 * The 100-byte buffer is ample: the fixed text is 18 characters and each of
 * the two integers prints in at most 11 characters.
 */
static void KillEveryoneCode(int n)
{
  char _s[100];
  sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
  charmrun_abort(_s);
  machine_exit(1);
}
00380 
/*
 * Handler for fatal signals.  Translates sigNo into a readable name (plus a
 * hint for SEGV/FPE/BUS/ILL), prints the report and a stack trace, notifies
 * charmrun through charmrun_abort() and exits via machine_exit(1).
 *
 * If a shutdown is already in progress, or this handler is re-entered, the
 * process exits immediately instead of reporting again.
 *
 * When __FAULT__ is defined, KILL/QUIT/TERM are only reported with CmiPrintf
 * and the handler returns; every other signal is still fatal.
 */
00381 static void KillOnAllSigs(int sigNo)
00382 {
00383   const char *sig="unknown signal";
00384   const char *suggestion="";
00385   if (machine_initiated_shutdown ||
00386       already_in_signal_handler) 
00387         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
00388   already_in_signal_handler=1;
00389 
00390   CmiDestoryLocks();            /* destroy locks */
00391 
00392   if (sigNo==SIGSEGV) {
00393      sig="segmentation violation";
00394      suggestion="Try running with '++debug', or linking with '-memory paranoid'.\n";
00395   }
00396   if (sigNo==SIGFPE) {
00397      sig="floating point exception";
00398      suggestion="Check for integer or floating-point division by zero.\n";
00399   }
00400   if (sigNo==SIGBUS) {
00401      sig="bus error";
00402      suggestion="Check for misaligned reads or writes to memory.\n";
00403   }
00404   if (sigNo==SIGILL) {
00405      sig="illegal instruction";
00406      suggestion="Check for calls to uninitialized function pointers.\n";
00407   }
00408   if (sigNo==SIGKILL) sig="caught signal KILL";
00409   if (sigNo==SIGQUIT) sig="caught signal QUIT";
00410   if (sigNo==SIGTERM) sig="caught signal TERM";
00411   MACHSTATE1(5,"     Caught signal %s ",sig);
00412 /* The opening brace of the fatal path is chosen by the preprocessor:
   with __FAULT__ it is the else-branch of the "ignore" test, otherwise a plain block. */
00413 #ifdef __FAULT__
00414   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
00415                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
00416   }else{
00417 #else
00418         {
00419 #endif
00420    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
00421         "Signal: %s\n",CmiMyPe(),sig);
00422         if (0!=suggestion[0]) /* only print a suggestion when one was set above */
00423                 CmiError("Suggestion: %s",suggestion);
00424         CmiPrintStackTrace(1);
00425         charmrun_abort(sig);
00426         machine_exit(1);                
00427         }       
00428 }
00429 
/*
 * Exit-time sanity check: if the process is exiting without machine_exit()
 * having set machine_initiated_shutdown, CmiAbort() is called with an
 * explanatory message; otherwise "Program finished." is printed.
 * NOTE(review): presumably registered with atexit() elsewhere -- confirm.
 */
00430 static void machine_atexit_check(void)
00431 {
00432   if (!machine_initiated_shutdown)
00433     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
00434   printf("Program finished.\n");
00435 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
00436   fgetc(stdin);
00437 #endif
00438 }
00439 
00440 #if !defined(_WIN32) || defined(__CYGWIN__)
00441 static void HandleUserSignals(int signum)
00442 {
00443   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
00444   CcdRaiseCondition(condnum);
00445 }
00446 #endif
00447 
/* Prints msg followed by the current errno description (perror), then exits via machine_exit(1). */
00448 static void PerrorExit(const char *msg)
00449 {
00450   perror(msg);
00451   machine_exit(1);
00452 }
00453 
00454 /*****************************************************************************
00455  *
00456  *     Utility routines for network machine interface.
00457  *
00458  *****************************************************************************/
00459 
00460 /*
00461 Horrific #defines to hide the differences between select() and poll().
00462  */
00463 #if CMK_USE_POLL /*poll() version*/
/*
 * poll() flavour.
 *   CMK_PIPE_DECL      declares a 10-entry pollfd array, its fill count and the timeout.
 *   CMK_PIPE_ADD*      append one descriptor to the array (no bounds check).
 *   CMK_PIPE_CALL      runs poll() and then rewinds the count to 0.  It expands
 *                      to two statements, so it must be used as a full statement
 *                      (optionally as the right-hand side of an assignment).
 *   CMK_PIPE_CHECK*    ignore their argument and consume the next array slot,
 *                      so they must be invoked in exactly the order the
 *                      descriptors were added.
 * Macro arguments and the CHECK expansions are parenthesized so they stay
 * correct inside larger expressions.
 */
# define CMK_PIPE_DECL(delayMs) \
        struct pollfd fds[10]; \
        int nFds_sto=0; int *nFds=&nFds_sto; \
        int pollDelayMs=(delayMs);
# define CMK_PIPE_SUB fds,nFds
# define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0

# define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
# define CMK_PIPE_ADDREAD(rd_fd) \
        do {fds[*nFds].fd=(rd_fd); fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
        do {fds[*nFds].fd=(wr_fd); fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (fds[(*nFds)++].revents&POLLIN)
# define CMK_PIPE_CHECKWRITE(wr_fd) (fds[(*nFds)++].revents&POLLOUT)
00478 
00479 #elif CMK_USE_KQUEUE /* kqueue version */
00480 
/*
 * kqueue flavour.
 *   CMK_PIPE_DECL      lazily creates the shared queue _kq, declares one kevent
 *                      slot and converts the millisecond delay to a timespec.
 *                      The delay is split into seconds and nanoseconds so that
 *                      tv_nsec stays below one second for delays >= 1000 ms.
 *   CMK_PIPE_ADD*      register a descriptor, then clear the whole kevent slot
 *                      (sizeof(*ke): ke is a pointer, so sizeof(ke) would only
 *                      clear pointer-size bytes).
 *   CMK_PIPE_CALL      waits for at most one event.
 *   CMK_PIPE_CHECK*    test whether the returned event matches the descriptor.
 */
# define CMK_PIPE_DECL(delayMs) \
        if (_kq == -1) _kq = kqueue(); \
    struct kevent ke_sto; \
    struct kevent* ke = &ke_sto; \
    struct timespec tmo; \
    tmo.tv_sec = (delayMs)/1000; tmo.tv_nsec = ((delayMs)%1000)*1000000L;
# define CMK_PIPE_SUB ke
# define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)

# define CMK_PIPE_PARAM struct kevent* ke
# define CMK_PIPE_ADDREAD(rd_fd) \
        do { EV_SET(ke, (rd_fd), EVFILT_READ, EV_ADD, 0, 10, NULL); \
                kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
        do { EV_SET(ke, (wr_fd), EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
                kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == (rd_fd) && ke->filter == EVFILT_READ)
# define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == (wr_fd) && ke->filter == EVFILT_WRITE)
00499 
00500 #else /*select() version*/
00501 
/*
 * select() flavour.
 *   CMK_PIPE_DECL      declares and clears the read/write fd_sets and converts
 *                      the millisecond delay to a timeval.  The delay is split
 *                      into seconds and microseconds so that tv_usec stays
 *                      below one second for delays >= 1000 ms.
 *   CMK_PIPE_ADD*      add a descriptor to the read or write set.
 *   CMK_PIPE_CALL      runs select() over both sets.
 *   CMK_PIPE_CHECK*    test whether the descriptor is set afterwards.
 */
# define CMK_PIPE_DECL(delayMs) \
        fd_set rfds_sto,wfds_sto;\
        fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
        FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=(delayMs)/1000; tmo.tv_usec=1000*((delayMs)%1000);
# define CMK_PIPE_SUB rfds,wfds
# define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)

# define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
# define CMK_PIPE_ADDREAD(rd_fd) FD_SET((rd_fd),rfds)
# define CMK_PIPE_ADDWRITE(wr_fd) FD_SET((wr_fd),wfds)
# define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET((rd_fd),rfds)
# define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET((wr_fd),wfds)
00514 #endif
00515 
/*
 * Inspects errno after a failed socket wait.  On UNIX any value other than
 * EINTR is fatal (KillEveryone); on native Win32 the body is empty, so the
 * error is ignored.
 */
00516 static void CMK_PIPE_CHECKERR(void) {
00517 #if defined(_WIN32) && !defined(__CYGWIN__)
00518 /* Win32 socket seems to randomly return inexplicable errors
00519 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
00520         int err=WSAGetLastError();
00521         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errno,err);
00523 */
00524 #else /*UNIX machine*/
00525         if (errno!=EINTR)
00526                 KillEveryone("Socket error in CheckSocketsReady!\n");
00527 #endif
00528 }
00529 
00530 
00531 static void CmiStdoutFlush(void);
00532 static int  CmiStdoutNeedsService(void);
00533 static void CmiStdoutService(void);
00534 static void CmiStdoutAdd(CMK_PIPE_PARAM);
00535 static void CmiStdoutCheck(CMK_PIPE_PARAM);
00536 
00537 
/*
 * Returns the current time of day in seconds as a double.
 * Win32 builds read _ftime (millisecond field); all other builds read
 * gettimeofday (microsecond field).  A gettimeofday failure is reported with
 * perror and then KillEveryoneCode(9343112).
 */
double GetClock(void)
{
#if defined(_WIN32) && !defined(__CYGWIN__)
  struct _timeb now;
  _ftime(&now);
  return (now.time * 1.0 + now.millitm * 1.0E-3);
#else
  struct timeval now;
  if (gettimeofday(&now, NULL) < 0) {
    perror("gettimeofday");
    KillEveryoneCode(9343112);
  }
  return (now.tv_sec * 1.0 + now.tv_usec * 1.0E-6);
#endif
}
00551 
/*
 * Returns a fresh CmiAlloc'ed buffer holding a copy of the first len bytes
 * of msg.
 *
 * If the allocation fails the process prints "Out of memory" and exits
 * through machine_exit(1); previously execution continued into memcpy with a
 * NULL destination.
 */
char *CopyMsg(char *msg, int len)
{
  char *copy = (char *)CmiAlloc(len);
  if (!copy) {
      fprintf(stderr, "Out of memory\n");
      machine_exit(1);
  }
  memcpy(copy, msg, len);
  return copy;
}
00560 
00561 /***********************************************************************
00562  *
00563  * Abort function:
00564  *
00565  ************************************************************************/
00566 
00567 static int  Cmi_truecrash;
00568 static int already_aborting=0;
/*
 * Aborts the program with the given message.
 *
 * Prints the reason and a stack trace, flushes buffered stdout, and then
 * either deliberately crashes (when Cmi_truecrash is set) or notifies
 * charmrun through charmrun_abort() and exits via machine_exit(1).
 * A second call while an abort is already in progress exits immediately.
 */
00569 void CmiAbort(const char *message)
00570 {
00571   if (already_aborting) machine_exit(1); /* re-entered: do not report twice */
00572   already_aborting=1;
00573         {
00574 /*       char str[100];
00575          sprintf(str,"dead.%d",CmiMyNode());
00576          FILE *fp = fopen(str,"w");
00577          fprintf(fp,"%s",message);
00578          fclose(fp);*/
00579         }
00580   MACHSTATE1(5,"CmiAbort(%s)",message);
00581   
00582   /* CmiDestoryLocks();  */
00583 
00584   {
00585 /*    char str[22];
00586     snprintf(str,18,"dead.%d",CmiMyPe());
00587     FILE *fp = fopen(str,"w");
00588     fprintf(fp,"Abort:%s\n",message);
00589     fclose(fp);*/
00590   }
00591 
00592   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
00593         "Reason: %s\n",CmiMyPe(),message);
00594   CmiPrintStackTrace(0);
00595   
00596   /*Send off any remaining prints*/
00597   CmiStdoutFlush();
00598   
00599   if(Cmi_truecrash) {
00600     printf("CHARM++ FATAL ERROR: %s\n", message);
00601     *(int *)NULL = 0; /*Write to null, causing bus error*/
00602   } else {
00603     charmrun_abort(message);
00604     machine_exit(1);
00605   }
00606 }
00607 
00608 
00609 /******************************************************************************
00610  *
00611  * CmiEnableAsyncIO
00612  *
00613  * The net and tcp versions use a bunch of unix processes talking to each
00614  * other via file descriptors.  We need for a signal SIGIO to be generated
00615  * each time a message arrives, making it possible to write a signal
00616  * handler to handle the messages.  The vast majority of unixes can,
00617  * in fact, do this.  However, there isn't any standard for how this is
00618  * supposed to be done, so each version of UNIX has a different set of
00619  * calls to turn this signal on.  So, there is like one version here for
00620  * every major brand of UNIX.
00621  *
00622  *****************************************************************************/
00623 
#if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
#include <fcntl.h>
/*
 * Makes this process the owner of fd and turns on asynchronous-I/O
 * notification (FASYNC) for it.  Any fcntl failure is reported with CmiError
 * and terminates the process with exit(1).
 *
 * F_SETFL replaces the whole status-flag word, so the current flags are read
 * first and FASYNC is OR-ed in; this keeps flags such as O_NONBLOCK that were
 * set earlier on the same descriptor.
 */
void CmiEnableAsyncIO(int fd)
{
  int flags;
  if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
    CmiError("setting socket owner: %s\n", strerror(errno)) ;
    exit(1);
  }
  flags = fcntl(fd, F_GETFL, 0);
  if ( flags < 0 || fcntl(fd, F_SETFL, flags | FASYNC) < 0 ) {
    CmiError("setting socket async: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
/* Asynchronous I/O is not configured for this build: nothing to do. */
void CmiEnableAsyncIO(int fd) { }
#endif
00640 
00641 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
#if !defined(_WIN32) || defined(__CYGWIN__)
/*
 * Switches fd to non-blocking mode.  Any fcntl failure is reported with
 * CmiError and terminates the process with exit(1).
 *
 * F_SETFL replaces the whole status-flag word, so the current flags are read
 * first and O_NONBLOCK is OR-ed in; this keeps flags such as FASYNC that
 * were set earlier on the same descriptor.
 */
void CmiEnableNonblockingIO(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags<0 || fcntl(fd,F_SETFL,flags|O_NONBLOCK)<0) {
    CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
/* Native Win32 build: nothing to do here. */
void CmiEnableNonblockingIO(int fd) { }
#endif
00653 
00654 
00655 /******************************************************************************
00656 *
00657 * Configuration Data
00658 *
00659 * This data is all read in from the NETSTART variable (provided by the
00660 * charmrun) and from the command-line arguments.  Once read in, it is never
00661  * modified.
00662  *
00663  *****************************************************************************/
00664 
00665 int               _Cmi_mynode;    /* Which address space am I */
00666 int               _Cmi_mynodesize;/* Number of processors in my address space */
00667 int               _Cmi_numnodes;  /* Total number of address spaces */
00668 int               _Cmi_numpes;    /* Total number of processors */
00669 static int        Cmi_nodestart; /* First processor in this address space */
00670 static skt_ip_t   Cmi_self_IP;
00671 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
00672 static int        Cmi_charmrun_port;
00673 static int        Cmi_charmrun_pid;
00674 static int        Cmi_charmrun_fd=-1;
00675 
00676 static int    Cmi_netpoll;
00677 static int    Cmi_asyncio;
00678 static int    Cmi_idlepoll;
00679 static int    Cmi_syncprint;
00680 static int Cmi_print_stats = 0;
00681 
00682 static void parse_netstart(void)
00683 {
00684   char *ns;
00685   int nread;
00686   int port;
00687   ns = getenv("NETSTART");
00688   if (ns!=0) 
00689   {/*Read values set by Charmrun*/
00690         char Cmi_charmrun_name[1024];
00691         nread = sscanf(ns, "%d%s%d%d%d",
00692                  &_Cmi_mynode,
00693                  Cmi_charmrun_name, &Cmi_charmrun_port,
00694                  &Cmi_charmrun_pid, &port);
00695         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
00696 
00697         if (nread!=5) {
00698                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
00699                 exit(1);
00700         }
00701   } else 
00702   {/*No charmrun-- set flag values for standalone operation*/
00703         _Cmi_mynode=0;
00704         Cmi_charmrun_IP=_skt_invalid_ip;
00705         Cmi_charmrun_port=0;
00706         Cmi_charmrun_pid=0;
00707         dataport = -1;
00708   }
00709 #if CMK_USE_IBVERBS     
00710         char *cmi_num_nodes = getenv("CmiNumNodes");
00711         if(cmi_num_nodes != NULL){
00712                 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
00713         }
00714 #endif  
00715 }
00716 
/* Checks argv for the "+stats" flag and, if present, sets Cmi_print_stats to 1. */
00717 static void extract_common_args(char **argv)
00718 {
00719   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
00720     Cmi_print_stats = 1;
00721 }
00722 
00723 /* for SMP */
00724 #include "machine-smp.c"
00725 
00726 CsvDeclare(CmiNodeState, NodeState);
00727 
00728 /* Immediate message support */
00729 #define CMI_DEST_RANK(msg)      *(int *)(msg)
00730 #include "immediate.c"
00731 
00732 /******************************************************************************
00733  *
00734  * Packet Performance Logging
00735  *
00736  * This module is designed to give a detailed log of the packets and their
00737  * acknowledgements, for performance tuning.  It can be disabled.
00738  *
00739  *****************************************************************************/
00740 
00741 #define LOGGING 0
00742 
00743 #if LOGGING
00744 
00745 typedef struct logent {
00746   double time;
00747   int seqno;
00748   int srcpe;
00749   int dstpe;
00750   int kind;
00751 } *logent;
00752 
00753 
00754 logent log;
00755 int    log_pos;
00756 int    log_wrap;
00757 
/* Allocates the 50000-entry packet log and resets the write position and the wrap flag. */
00758 static void log_init(void)
00759 {
00760   log = (logent)malloc(50000 * sizeof(struct logent));
00761   _MEMCHECK(log);
00762   log_pos = 0;
00763   log_wrap = 0;
00764 }
00765 
00766 static void log_done(void)
00767 {
00768   char logname[100]; FILE *f; int i, size;
00769   sprintf(logname, "log.%d", _Cmi_mynode);
00770   f = fopen(logname, "w");
00771   if (f==0) KillEveryone("fopen problem");
00772   if (log_wrap) size = 50000; else size=log_pos;
00773   for (i=0; i<size; i++) {
00774     logent ent = log+i;
00775     fprintf(f, "%1.4f %d %c %d %d\n",
00776             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
00777   }
00778   fclose(f);
00779 }
00780 
00781 void printLog(void)
00782 {
00783   char logname[100]; FILE *f; int i, j, size;
00784   static int logged = 0;
00785   if (logged)
00786       return;
00787   logged = 1;
00788   CmiPrintf("Logging: %d\n", _Cmi_mynode);
00789   sprintf(logname, "log.%d", _Cmi_mynode);
00790   f = fopen(logname, "w");
00791   if (f==0) KillEveryone("fopen problem");
00792   for (i = 5000; i; i--)
00793   {
00794   /*for (i=0; i<size; i++) */
00795     j = log_pos - i;
00796     if (j < 0)
00797     {
00798         if (log_wrap)
00799             j = 5000 + j;
00800         else
00801             j = 0;
00802     };
00803     {
00804     logent ent = log+j;
00805     fprintf(f, "%1.4f %d %c %d %d\n",
00806             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
00807     }
00808   }
00809   fclose(f);
00810   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
00811 }
00812 
00813 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
00814 
00815 #endif
00816 
00817 #if !LOGGING
00818 
00819 #define log_init() /*empty*/
00820 #define log_done() /*empty*/
00821 #define printLog() /*empty*/
00822 #define LOG(t,s,k,d,q) /*empty*/
00823 
00824 #endif
00825 
00826 /******************************************************************************
00827  *
00828  * Node state
00829  *
00830  *****************************************************************************/
00831 
00832 
00833 static CmiNodeLock    Cmi_scanf_mutex;
00834 static double         Cmi_clock;
00835 static double         Cmi_check_delay = 3.0;
00836 
00837 /******************************************************************************
00838  *
00839  * OS Threads
00840  * SMP implementation moved to machine-smp.c
00841  *****************************************************************************/
00842 
00843 /************************ No kernel SMP threads ***************/
00844 #if CMK_SHARED_VARS_UNAVAILABLE
00845 
00846 static volatile int memflag=0;
/* Counter-based memory "lock" for the build without shared variables:
   memflag is incremented on lock and decremented on unlock;
   CommunicationInterrupt skips its work while it is non-zero. */
00847 void CmiMemLock() { memflag++; }
00848 void CmiMemUnlock() { memflag--; }
00849 
00850 static volatile int comm_flag=0;
00851 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
00852 #ifndef MACHLOCK_DEBUG
00853 #  define CmiCommLock() (comm_flag=1)
00854 #  define CmiCommUnlock() (comm_flag=0)
00855 #else /* Error-checking flag locks */
/* Error-checking version of the comm lock: asserts the flag is clear, then sets it. */
00856 void CmiCommLock(void) {
00857   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
00858   comm_flag=1;
00859 }
/* Error-checking version of the comm unlock: asserts the flag is set, then clears it. */
00860 void CmiCommUnlock(void) {
00861   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
00862   comm_flag=0;
00863 }
00864 #endif
00865 
00866 static struct CmiStateStruct Cmi_state;
00867 int _Cmi_mype;
00868 int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
00869 #define CmiGetState() (&Cmi_state)
00870 #define CmiGetStateN(n) (&Cmi_state)
00871 
/* Yield for the build without shared variables: implemented as sleep(0). */
00872 void CmiYield(void) { sleep(0); }
00873 
/*
 * SIGIO handler (installed by CmiStartThreads).  Runs the communication
 * server once with no delay, unless the interrupted code is inside a memory
 * lock, the comm lock, or immediate-message processing, in which case the
 * signal is skipped.  The signal number argument is unused.
 */
00874 static void CommunicationInterrupt(int ignored)
00875 {
00876   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
00877   if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0)) 
00878   { /* Already busy inside malloc, comm, or immediate messages */
00879     MACHSTATE(5,"--SKIPPING SIGIO--");
00880     return;
00881   }
00882   MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
00883   {
00884     /*Make sure any malloc's we do in here are NOT migratable:*/
00885     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
00886 /*    _Cmi_myrank=1; */
00887     CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
00888 /*    _Cmi_myrank=0; */
00889     CmiIsomallocBlockListActivate(oldList); /* restore the block list that was active before */
00890   }
00891   MACHSTATE(2,"--END SIGIO--")
00892 }
00893 
00894 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
00895 
/*
 * Startup for the build without shared variables (one processor per node).
 * Aborts if the configuration asks for more than one processor per node,
 * initializes the single CmiState, runs CommunicationServerInit(), and, when
 * asynchronous I/O is requested, installs CommunicationInterrupt as the
 * SIGIO handler and enables async notification on the sockets in use.
 * argv is not used here.
 */
00896 static void CmiStartThreads(char **argv)
00897 {
00898   MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
00899   MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
00900   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
00901     KillEveryone
00902       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
00903   
00904   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
00905   _Cmi_mype = Cmi_nodestart;
00906 
00907   /* Prepare Cpv's for immediate messages: */
00908   _Cmi_myrank=1;
00909   CommunicationServerInit();
00910   _Cmi_myrank=0;
00911   
00912 #if !CMK_ASYNC_NOT_NEEDED
00913   if (Cmi_asyncio)
00914   {
00915     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
00916     if (!Cmi_netpoll) { /* -1 marks a socket that is not open */
00917       if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
00918       if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
00919     }
00920 #if CMK_USE_GM || CMK_USE_MX
00921       /* charmrun is serviced in interrupt for gm */
00922     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
00923 #endif
00924   }
00925 #endif
00926 }
00927 
/* Build without shared variables: "destroying" the locks just clears the
   comm and memory flags.  (The misspelled name is the identifier used by
   callers and is kept as is.) */
00928 static void CmiDestoryLocks()
00929 {
00930   comm_flag = 0;
00931   memflag = 0;
00932 }
00933 
00934 #endif
00935 
00936 /*Network progress utility variables. Period controls the rate at
00937   which the network poll is called */
00938 
00939 CpvDeclare(unsigned , networkProgressCount);
00940 int networkProgressPeriod;
00941 
00942 CpvDeclare(void *, CmiLocalQueue);
00943 
00944 
00945 #ifndef CmiMyPe
/* Function form of CmiMyPe (used when no macro of that name exists):
   returns the pe field of the current CmiState. */
00946 int CmiMyPe(void) 
00947 { 
00948   return CmiGetState()->pe; 
00949 }
00950 #endif
00951 #ifndef CmiMyRank
/* Function form of CmiMyRank (used when no macro of that name exists):
   returns the rank field of the current CmiState. */
00952 int CmiMyRank(void)
00953 {
00954   return CmiGetState()->rank;
00955 }
00956 #endif
00957 
00958 CpvExtern(int,_charmEpoch);
00959 
00960 /*Add a message to this processor's receive queue 
00961   Must be called while holding comm. lock
00962 */
00963 
00964 extern double evacTime;
00965 
/*
 * Delivers msg to the receive queue of the processor whose state is
 * CmiGetStateN(pe).  With CMK_IMMEDIATE_MSG, immediate messages go to the
 * immediate-message queue instead and the function returns early.
 * After queuing, the target's idle lock is notified; in the POSIX-threads
 * SMP build that notification happens only when _Cmi_noprocforcommthread
 * is set.
 */
00966 static void CmiPushPE(int pe,void *msg)
00967 {
00968   CmiState cs=CmiGetStateN(pe);
00969         /*
00970                 FAULT_EVAC
00971         
00972         if(CpvAccess(_charmEpoch)&&!CmiNodeAlive(CmiMyPe())){
00973                 printf("[%d] Message after stop at %.6lf in %.6lf \n",CmiMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
00974         }*/
00975   MACHSTATE1(2,"Pushing message into %d's queue",pe);  
00976   MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
00977 
00978 #if CMK_IMMEDIATE_MSG
00979   if (CmiIsImmediate(msg)) {
00980     CmiPushImmediateMsg(msg);
00981     return;
00982   }
00983 #endif
00984   PCQueuePush(cs->recv,msg);
00985 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00986   if (_Cmi_noprocforcommthread) 
00987 #endif
00988   CmiIdleLock_addMessage(&cs->idle);
00989 }
00990 
00991 #if CMK_NODE_QUEUE_AVAILABLE
00992 /*Add a message to the node queue.  
00993   Must be called while holding comm. lock
00994 */
/*
 * Delivers msg to the node-level receive queue.  With CMK_IMMEDIATE_MSG,
 * immediate messages go to the immediate-message queue instead and the
 * function returns early.  After queuing, the idle lock of the state
 * returned by CmiGetStateN(0) is notified; in the POSIX-threads SMP build
 * that notification happens only when _Cmi_noprocforcommthread is set.
 */
00995 static void CmiPushNode(void *msg)
00996 {
00997   CmiState cs=CmiGetStateN(0);
00998   
00999   MACHSTATE(2,"Pushing message into node queue");
01000   MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
01001   
01002 #if CMK_IMMEDIATE_MSG
01003   if (CmiIsImmediate(msg)) {
01004     MACHSTATE(2,"Pushing Immediate message into queue");
01005     CmiPushImmediateMsg(msg);
01006     return;
01007   }
01008 #endif 
01009   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
01010   /*Silly: always try to wake up processor 0, so at least *somebody*
01011     will be awake to handle the message*/
01012 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
01013   if (_Cmi_noprocforcommthread) 
01014 #endif
01015   CmiIdleLock_addMessage(&cs->idle);
01016 }
01017 #endif
01018 
01019 /***************************************************************
01020  Communication with charmrun:
01021  We can send (ctrl_sendone) and receive (ctrl_getone)
01022  messages on a TCP socket connected to charmrun.
01023  This is used for printfs, CCS, etc; and also for
01024  killing ourselves if charmrun dies.
01025 */
01026 
01027 /*This flag prevents simultaneous outgoing
01028 messages on the charmrun socket.  It is protected
01029 by the commlock.*/
01030 static int Cmi_charmrun_fd_sendflag=0;
01031 
01032 /* ctrl_sendone */
/* Socket abort callback used while sending to charmrun: prints the error
   code and message to stderr and exits via machine_exit(1).  The return
   statement only satisfies the callback signature. */
01033 static int sendone_abort_fn(int code,const char *msg) {
01034         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
01035         machine_exit(1);
01036         return -1;
01037 }
01038 
/*
 * Sends one control message to charmrun without taking the comm lock.
 *
 *   type               message type string placed in the header
 *   data1, dataLen1    first payload piece  (skipped when dataLen1 <= 0)
 *   data2, dataLen2    second payload piece (skipped when dataLen2 <= 0)
 *
 * The header and the payload pieces are written with a single skt_sendV
 * call.  sendone_abort_fn is installed as the socket abort handler for the
 * duration of the send and the previous handler is restored afterwards.
 * Cmi_charmrun_fd_sendflag is set while the send is in progress.
 * Calling this without a charmrun connection goes to charmrun_abort().
 */
01039 static void ctrl_sendone_nolock(const char *type,
01040                                 const char *data1,int dataLen1,
01041                                 const char *data2,int dataLen2)
01042 {
01043   const void *bufs[3]; int lens[3]; int nBuffers=0; /* header + up to two payload pieces */
01044   ChMessageHeader hdr;
01045   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
01046   MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
01047   if (Cmi_charmrun_fd==-1) 
01048         charmrun_abort("ctrl_sendone called in standalone!\n");
01049   Cmi_charmrun_fd_sendflag=1;
01050   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
01051   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
01052   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
01053   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
01054   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
01055   Cmi_charmrun_fd_sendflag=0;
01056   skt_set_abort(oldAbort);
01057   MACHSTATE(2,"} ctrl_sendone_nolock");
01058 }
01059 
/* Same as ctrl_sendone_nolock, but wrapped in CmiCommLock()/CmiCommUnlock(). */
01060 static void ctrl_sendone_locking(const char *type,
01061                                 const char *data1,int dataLen1,
01062                                 const char *data2,int dataLen2)
01063 {
01064   CmiCommLock();
01065   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
01066   CmiCommUnlock();
01067 }
01068 
01069 #ifndef MEMORYUSAGE_OUTPUT
01070 #define MEMORYUSAGE_OUTPUT 0 
01071 #endif
01072 #if MEMORYUSAGE_OUTPUT 
01073 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
01074 static int memoryusage_counter;
01075 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
01076 #define memoryusage_output {\
01077   memoryusage_counter++;\
01078   if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
01079 #endif
01080 
01081 static double Cmi_check_last;
01082 
01083 /* if charmrun dies, we finish */
/*
 * Periodic liveness check.  When more than Cmi_check_delay seconds have
 * passed since the last check, sends a "ping" control message to charmrun,
 * unless the comm lock is held or another send to charmrun is in progress,
 * in which case the function returns without pinging.  Afterwards the
 * buffered stdout is flushed.  The argument is unused.
 *
 * With MEMORYUSAGE_OUTPUT enabled, a memory-usage line is printed on every
 * call and the body runs only on every MEMORYUSAGE_OUTPUT_FREQ-th call.
 * The opening brace of the body is selected by the preprocessor and closed
 * by the final brace before the end of the function.
 */
01084 static void pingCharmrun(void *ignored) 
01085 {
01086 #if MEMORYUSAGE_OUTPUT
01087   memoryusage_output;
01088   if(memoryusage_isOutput){
01089     memoryusage_counter = 0;
01090 #else
01091   {
01092 #endif 
01093 
01094   double clock=GetClock();
01095   if (clock > Cmi_check_last + Cmi_check_delay) {
01096     MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
01097     Cmi_check_last = clock; 
01098 #if CMK_USE_GM || CMK_USE_MX
01099     if (!Cmi_netpoll)  /* GM netpoll, charmrun service is done in interrupt */
01100 #endif
01101     CmiCommLockOrElse(return;); /*Already busy doing communication*/
01102     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
01103     CmiCommLock();
01104     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
01105     CmiCommUnlock();
01106   }
01107 #if 1
01108 #if CMK_USE_GM || CMK_USE_MX
01109   if (!Cmi_netpoll)
01110 #endif
01111   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
01112 #endif
01113   }
01114 }
01115 
01116 /* periodic charm ping, for gm and netpoll */
01117 static void pingCharmrunPeriodic(void *ignored)
01118 {
01119   pingCharmrun(ignored);
01120   CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
01121 }
01122 
/* Socket abort handler installed by charmrun_abort(): both arguments are
 * ignored and the process leaves through machine_exit(2).  The -1 return
 * only matters if machine_exit() ever returns. */
static int ignore_further_errors(int c, const char *msg)
{
  machine_exit(2);
  return -1;
}
01124 static void charmrun_abort(const char *s)
01125 {
01126   if (Cmi_charmrun_fd==-1) {/*Standalone*/
01127         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
01128         abort();
01129   } else {
01130         char msgBuf[80];
01131         skt_set_abort(ignore_further_errors);
01132         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
01133         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
01134   }
01135 }
01136 
01137 /* ctrl_getone */
01138 
01139 #ifdef __FAULT__
01140 #include "machine-recover.c"
01141 #endif
01142 
01143 static void node_addresses_store(ChMessage *msg);
01144 
01145 static void ctrl_getone(void)
01146 {
01147   ChMessage msg;
01148   MACHSTATE(2,"ctrl_getone")
01149   MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
01150   ChMessage_recv(Cmi_charmrun_fd,&msg);
01151   MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
01152 
01153   if (strcmp(msg.header.type,"die")==0) {
01154     MACHSTATE(2,"ctrl_getone bye bye")
01155     fprintf(stderr,"aborting: %s\n",msg.data);
01156     log_done();
01157     ConverseCommonExit();
01158     machine_exit(0);
01159   } 
01160 #if CMK_CCS_AVAILABLE
01161   else if (strcmp(msg.header.type, "req_fw")==0) {
01162     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
01163         /*Sadly, I *can't* do a:
01164       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
01165         here, because I can't send converse messages in the
01166         communication thread.  I *can* poke this message into 
01167         any convenient processor's queue, though:  (OSL, 9/14/2000)
01168         */
01169     int pe=0;/*<- node-local processor number. Any one will do.*/
01170     void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
01171     MACHSTATE(2,"Incoming CCS request");
01172     CmiPushPE(pe,cmsg);
01173   }
01174 #endif
01175 #ifdef __FAULT__        
01176   else if(strcmp(msg.header.type,"crashnode")==0) {
01177         crash_node_handle(&msg);
01178   }
01179   else if(strcmp(msg.header.type,"initnodetab")==0) {
01182         node_addresses_store(&msg);
01183         fprintf(stdout,"nodetable added %d\n",CmiMyPe());
01184   }
01185 #endif
01186   else {
01187   /* We do not use KillEveryOne here because it calls CmiMyPe(),
01188    * which is not available to the communication thread on an SMP version.
01189    */
01190     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
01191     machine_exit(1);
01192   }
01193   
01194   MACHSTATE(2,"ctrl_getone done")
01195   ChMessage_free(&msg);
01196 }
01197 
01198 #if CMK_CCS_AVAILABLE
01199 /*Deliver this reply data to this reply socket.
01200   The data is forwarded to CCS server via charmrun.*/
01201 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
01202 {
01203   MACHSTATE(2,"Outgoing CCS reply");
01204   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
01205                        repData,repLen);
01206   MACHSTATE(1,"Outgoing CCS reply away");
01207 }
01208 #endif
01209 
01210 /*****************************************************************************
01211  *
01212  * CmiPrintf, CmiError, CmiScanf
01213  *
01214  *****************************************************************************/
01215 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
01216 static void InternalPrintf(const char *f, va_list l)
01217 {
01218   ChMessage replymsg;
01219   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
01220   CmiStdoutFlush();
01221   vsprintf(buffer, f, l);
01222   if(Cmi_syncprint) {
01223           CmiCommLock();
01224           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
01225           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01226           ChMessage_free(&replymsg);
01227           CmiCommUnlock();
01228   } else {
01229           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
01230   }
01231   InternalWriteToTerminal(0,buffer,strlen(buffer));
01232   CmiTmpFree(buffer);
01233 }
01234 
01235 static void InternalError(const char *f, va_list l)
01236 {
01237   ChMessage replymsg;
01238   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
01239   CmiStdoutFlush();
01240   vsprintf(buffer, f, l);
01241   if(Cmi_syncprint) {
01242           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
01243           CmiCommLock();
01244           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01245           ChMessage_free(&replymsg);
01246           CmiCommUnlock();
01247   } else {
01248           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
01249   }
01250   InternalWriteToTerminal(1,buffer,strlen(buffer));
01251   CmiTmpFree(buffer);
01252 }
01253 
01254 static int InternalScanf(char *fmt, va_list l)
01255 {
01256   ChMessage replymsg;
01257   char *ptr[20];
01258   char *p; int nargs, i;
01259   nargs=0;
01260   p=fmt;
01261   while (*p) {
01262     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
01263     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
01264     if (p[0]=='%') { nargs++; p++; continue; }
01265     if (*p=='\n') *p=' '; p++;
01266   }
01267   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
01268   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
01269   CmiLock(Cmi_scanf_mutex);
01270   if (Cmi_charmrun_fd!=-1)
01271   {/*Send charmrun the format string*/
01272         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
01273         /*Wait for the reply (characters to scan) from charmrun*/
01274         CmiCommLock();
01275         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01276         i = sscanf((char*)replymsg.data, fmt,
01277                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
01278                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
01279                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
01280         ChMessage_free(&replymsg);
01281         CmiCommUnlock();
01282   } else
01283   {/*Just do the scanf normally*/
01284         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
01285                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
01286                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
01287   }
01288   CmiUnlock(Cmi_scanf_mutex);
01289   return i;
01290 }
01291 
01292 #if CMK_CMIPRINTF_IS_A_BUILTIN
01293 
01294 /*New stdarg.h declarations*/
01295 void CmiPrintf(const char *fmt, ...)
01296 {
01297   va_list p; va_start(p, fmt);
01298   if (Cmi_charmrun_fd!=-1)
01299     InternalPrintf(fmt, p);
01300   else
01301     vfprintf(stdout,fmt,p);
01302   va_end(p);
01303 }
01304 
01305 void CmiError(const char *fmt, ...)
01306 {
01307   va_list p; va_start (p, fmt);
01308   if (Cmi_charmrun_fd!=-1)
01309     InternalError(fmt, p);
01310   else
01311     vfprintf(stderr,fmt,p);
01312   va_end(p);
01313 }
01314 
/* scanf-style input: hands the format (with const cast away) and the
 * variadic arguments to InternalScanf() and returns its result. */
int CmiScanf(const char *fmt, ...)
{
  va_list args;
  int assigned;

  va_start(args, fmt);
  assigned = InternalScanf((char *)fmt, args);
  va_end(args);
  return assigned;
}
01322 
01323 #endif
01324 
01325 /***************************************************************************
01326  * Output redirection:
01327  *  When people don't use CkPrintf, like above, we'd still like to be able
01328  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
01329  * which lets us read the characters sent to stdout at our leisure.
01330  ***************************************************************************/
01331 
/* State for the stdout/stderr redirection.  Each two-element array has
 * one slot per redirected stream. */
static int readStdout[2];    /* fds we can read captured stdout or stderr from */
static int writeStdout[2];   /* the original stdout/stderr sockets */
static int serviceStdout[2]; /* (bool) normally zero; one if service needed */
#define readStdoutBufLen (16*1024)
/* Read buffer with one byte beyond readStdoutBufLen; protected by comm. lock */
static char readStdoutBuf[readStdoutBufLen+1];
static int servicingStdout;
01339 
01340 /*Initialization-- should only be called once per node*/
01341 static void CmiStdoutInit(void) {
01342         int i;
01343         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
01344 
01345 /*There's some way to do this same thing in windows, but I don't know how*/