00001
00172
00173
00174
00175
00176
00177
00178 #define _GNU_SOURCE 1
00179 #include <stdarg.h>
00180
00181 #define CMK_USE_PRINTF_HACK 0
00182 #if CMK_USE_PRINTF_HACK
00183
00184
00185
00186
00187
00188
00189
00190
00191 static void InternalPrintf(const char *f, va_list l);
00192 int printf(const char *fmt, ...) {
00193 int nChar;
00194 va_list p; va_start(p, fmt);
00195 InternalPrintf(fmt,p);
00196 va_end(p);
00197 return 10;
00198 }
00199 #endif
00200
00201
00202 #include "converse.h"
00203 #include "memory-isomalloc.h"
00204
00205 #include <stdio.h>
00206 #include <stdlib.h>
00207 #include <ctype.h>
00208 #include <fcntl.h>
00209 #include <errno.h>
00210 #include <setjmp.h>
00211 #include <signal.h>
00212 #include <string.h>
00213
00214
00215 #include "machine.h"
00216
00217
00218 #include "pcqueue.h"
00219
00220 #include "machine-smp.h"
00221
00222 #if CMK_USE_KQUEUE
00223 #include <sys/event.h>
00224 int _kq = -1;
00225 #endif
00226
00227 #if CMK_USE_POLL
00228 #include <poll.h>
00229 #endif
00230
00231 #if CMK_USE_GM
00232 #include "gm.h"
00233 struct gm_port *gmport = NULL;
00234 int portFinish = 0;
00235 #endif
00236
00237 #if CMK_USE_MX
00238 #include "myriexpress.h"
00239 mx_endpoint_t endpoint;
00240 mx_endpoint_addr_t endpoint_addr;
00241 int MX_FILTER = 123456;
00242 static uint64_t Cmi_nic_id=0;
00243 #endif
00244
00245 #if CMK_USE_AMMASSO
00246 #include "clustercore/ccil_api.h"
00247 #endif
00248
00249 #if CMK_MULTICORE
00250 int Cmi_commthread = 0;
00251 #endif
00252
00253 #include "conv-ccs.h"
00254 #include "ccs-server.h"
00255 #include "sockRoutines.h"
00256
00257 #if defined(_WIN32) && ! defined(__CYGWIN__)
00258
00259 # include <windows.h>
00260 # include <wincon.h>
00261 # include <sys/types.h>
00262 # include <sys/timeb.h>
00263 # define fdopen _fdopen
00264 # define SIGBUS -1
00265 # define SIGKILL -1
00266 # define SIGQUIT -1
00267
00268
00269 #else
00270 # include <pwd.h>
00271 # include <unistd.h>
00272 # include <fcntl.h>
00273 # include <sys/file.h>
00274 #endif
00275
00276 #if CMK_PERSISTENT_COMM
00277 #include "persist_impl.h"
00278 #endif
00279
00280 #define PRINTBUFSIZE 16384
00281
00282 #ifdef __ONESIDED_IMPL
00283 #ifdef __ONESIDED_NO_HARDWARE
00284 int putSrcHandler;
00285 int putDestHandler;
00286 int getSrcHandler;
00287 int getDestHandler;
00288 #include "conv-onesided.c"
00289 #endif
00290 #endif
00291
00292
00293 static void CommunicationServer(int withDelayMs, int where);
00294
00295 void CmiHandleImmediate();
00296 extern int CmemInsideMem();
00297 extern void CmemCallWhenMemAvail();
00298 static void ConverseRunPE(int everReturn);
00299 void CmiYield(void);
00300 void ConverseCommonExit(void);
00301
00302 static unsigned int dataport=0;
00303 static int Cmi_mach_id=0;
00304 static SOCKET dataskt;
00305
00306 extern void TokenUpdatePeriodic();
00307 extern void getAvailSysMem();
00308
00309 #define BROADCAST_SPANNING_FACTOR 4
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 static int machine_initiated_shutdown=0;
00328 static int already_in_signal_handler=0;
00329
00330 static void CmiDestoryLocks();
00331
00332 void CmiMachineExit();
00333
00334 #if CMK_USE_SYSVSHM
00335 void tearDownSharedBuffers();
00336 #endif
00337
00338 static void machine_exit(int status)
00339 {
00340 MACHSTATE(3," machine_exit");
00341 machine_initiated_shutdown=1;
00342
00343 CmiDestoryLocks();
00344 EmergencyExit();
00345
00346 #if CMK_USE_GM
00347 if (gmport) {
00348 gm_close(gmport); gmport = 0;
00349 gm_finalize();
00350 }
00351 #endif
00352 #if CMK_USE_SYSVSHM
00353 tearDownSharedBuffers();
00354 #endif
00355 CmiMachineExit();
00356 exit(status);
00357 }
00358
00359 static void charmrun_abort(const char*);
00360
00361 static void KillEveryone(const char *msg)
00362 {
00363 charmrun_abort(msg);
00364 machine_exit(1);
00365 }
00366
00367 static void KillEveryoneCode(n)
00368 int n;
00369 {
00370 char _s[100];
00371 sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
00372 charmrun_abort(_s);
00373 machine_exit(1);
00374 }
00375
00376 CpvExtern(int, freezeModeFlag);
00377
00378 static void KillOnAllSigs(int sigNo)
00379 {
00380 const char *sig="unknown signal";
00381 const char *suggestion="";
00382 if (machine_initiated_shutdown ||
00383 already_in_signal_handler)
00384 machine_exit(1);
00385 already_in_signal_handler=1;
00386
00387 #if CMK_CCS_AVAILABLE
00388 if (CpvAccess(cmiArgDebugFlag)) {
00389 int reply = 0;
00390 CpdNotify(CPD_SIGNAL,sigNo);
00391 #if ! CMK_BIGSIM_CHARM
00392 CcsSendReplyNoError(4,&reply);
00393 CpvAccess(freezeModeFlag) = 1;
00394 CpdFreezeModeScheduler();
00395 #else
00396 CpdFreeze();
00397 #endif
00398 }
00399 #endif
00400
00401 CmiDestoryLocks();
00402
00403 if (sigNo==SIGSEGV) {
00404 sig="segmentation violation";
00405 suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).\n";
00406 }
00407 if (sigNo==SIGFPE) {
00408 sig="floating point exception";
00409 suggestion="Check for integer or floating-point division by zero.\n";
00410 }
00411 if (sigNo==SIGBUS) {
00412 sig="bus error";
00413 suggestion="Check for misaligned reads or writes to memory.\n";
00414 }
00415 if (sigNo==SIGILL) {
00416 sig="illegal instruction";
00417 suggestion="Check for calls to uninitialized function pointers.\n";
00418 }
00419 if (sigNo==SIGKILL) sig="caught signal KILL";
00420 if (sigNo==SIGQUIT) sig="caught signal QUIT";
00421 if (sigNo==SIGTERM) sig="caught signal TERM";
00422 MACHSTATE1(5," Caught signal %s ",sig);
00423
00424 #ifdef __FAULT__
00425 if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
00426 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
00427 }else{
00428 #else
00429 {
00430 #endif
00431 CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
00432 "Signal: %s\n",CmiMyPe(),sig);
00433 if (0!=suggestion[0])
00434 CmiError("Suggestion: %s",suggestion);
00435 CmiPrintStackTrace(1);
00436 charmrun_abort(sig);
00437 machine_exit(1);
00438 }
00439 }
00440
00441 static void machine_atexit_check(void)
00442 {
00443 if (!machine_initiated_shutdown)
00444 CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
00445 printf("Program finished.\n");
00446 #if 0
00447 fgetc(stdin);
00448 #endif
00449 }
00450
00451 #if !defined(_WIN32) || defined(__CYGWIN__)
00452 static void HandleUserSignals(int signum)
00453 {
00454 int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
00455 CcdRaiseCondition(condnum);
00456 }
00457 #endif
00458
00459 static void PerrorExit(const char *msg)
00460 {
00461 perror(msg);
00462 machine_exit(1);
00463 }
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474 #if CMK_USE_POLL
00475 # define CMK_PIPE_DECL(delayMs) \
00476 struct pollfd fds[10]; \
00477 int nFds_sto=0; int *nFds=&nFds_sto; \
00478 int pollDelayMs=delayMs;
00479 # define CMK_PIPE_SUB fds,nFds
00480 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
00481
00482 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
00483 # define CMK_PIPE_ADDREAD(rd_fd) \
00484 do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
00485 # define CMK_PIPE_ADDWRITE(wr_fd) \
00486 do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
00487 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
00488 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
00489
00490 #elif CMK_USE_KQUEUE
00491
00492 # define CMK_PIPE_DECL(delayMs) \
00493 if (_kq == -1) _kq = kqueue(); \
00494 struct kevent ke_sto; \
00495 struct kevent* ke = &ke_sto; \
00496 struct timespec tmo; \
00497 tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
00498 # define CMK_PIPE_SUB ke
00499 # define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)
00500
00501 # define CMK_PIPE_PARAM struct kevent* ke
00502 # define CMK_PIPE_ADDREAD(rd_fd) \
00503 do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
00504 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
00505 # define CMK_PIPE_ADDWRITE(wr_fd) \
00506 do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
00507 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
00508 # define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
00509 # define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)
00510
00511 #else
00512
00513 # define CMK_PIPE_DECL(delayMs) \
00514 fd_set rfds_sto,wfds_sto;\
00515 fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
00516 FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
00517 # define CMK_PIPE_SUB rfds,wfds
00518 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
00519
00520 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
00521 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
00522 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
00523 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
00524 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
00525 #endif
00526
00527 static void CMK_PIPE_CHECKERR(void) {
00528 #if defined(_WIN32) && !defined(__CYGWIN__)
00529
00530
00531
00532
00533
00534
00535 #else
00536 if (errno!=EINTR)
00537 KillEveryone("Socket error in CheckSocketsReady!\n");
00538 #endif
00539 }
00540
00541
00542 static void CmiStdoutFlush(void);
00543 static int CmiStdoutNeedsService(void);
00544 static void CmiStdoutService(void);
00545 static void CmiStdoutAdd(CMK_PIPE_PARAM);
00546 static void CmiStdoutCheck(CMK_PIPE_PARAM);
00547
00548
00549 double GetClock(void)
00550 {
00551 #if defined(_WIN32) && !defined(__CYGWIN__)
00552 struct _timeb tv;
00553 _ftime(&tv);
00554 return (tv.time * 1.0 + tv.millitm * 1.0E-3);
00555 #else
00556 struct timeval tv; int ok;
00557 ok = gettimeofday(&tv, NULL);
00558 if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
00559 return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
00560 #endif
00561 }
00562
00563 char *CopyMsg(char *msg, int len)
00564 {
00565 char *copy = (char *)CmiAlloc(len);
00566 if (!copy)
00567 fprintf(stderr, "Out of memory\n");
00568 memcpy(copy, msg, len);
00569 return copy;
00570 }
00571
00572
00573
00574
00575
00576
00577
00578 static int Cmi_truecrash;
00579 static int already_aborting=0;
00580 void CmiAbort(const char *message)
00581 {
00582 if (already_aborting) machine_exit(1);
00583 already_aborting=1;
00584 {
00585
00586
00587
00588
00589
00590 }
00591 MACHSTATE1(5,"CmiAbort(%s)",message);
00592
00593 #if CMK_CCS_AVAILABLE
00594
00595 if (CpvAccess(cmiArgDebugFlag)) {
00596 CpdNotify(CPD_ABORT, message);
00597 CpdFreeze();
00598 }
00599 #endif
00600
00601
00602
00603 {
00604
00605
00606
00607
00608
00609 }
00610
00611 CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
00612 "Reason: %s\n",CmiMyPe(),message);
00613 CmiPrintStackTrace(0);
00614
00615
00616 CmiStdoutFlush();
00617
00618 if(Cmi_truecrash) {
00619 printf("CHARM++ FATAL ERROR: %s\n", message);
00620 *(int *)NULL = 0;
00621 } else {
00622 charmrun_abort(message);
00623 machine_exit(1);
00624 }
00625 }
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
00644 #include <fcntl.h>
00645 void CmiEnableAsyncIO(int fd)
00646 {
00647 if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
00648 CmiError("setting socket owner: %s\n", strerror(errno)) ;
00649 exit(1);
00650 }
00651 if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
00652 CmiError("setting socket async: %s\n", strerror(errno)) ;
00653 exit(1);
00654 }
00655 }
00656 #else
00657 void CmiEnableAsyncIO(int fd) { }
00658 #endif
00659
00660
00661 #if !defined(_WIN32) || defined(__CYGWIN__)
00662 void CmiEnableNonblockingIO(int fd) {
00663 int on=1;
00664 if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
00665 CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
00666 exit(1);
00667 }
00668 }
00669 #else
00670 void CmiEnableNonblockingIO(int fd) { }
00671 #endif
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684 int _Cmi_mynode;
00685 int _Cmi_mynodesize;
00686 int _Cmi_numnodes;
00687 int _Cmi_numpes;
00688 static int Cmi_nodestart;
00689 static skt_ip_t Cmi_self_IP;
00690 static skt_ip_t Cmi_charmrun_IP;
00691 static int Cmi_charmrun_port;
00692 static int Cmi_charmrun_pid;
00693 static int Cmi_charmrun_fd=-1;
00694
00695 static int Cmi_net_magic;
00696
00697 static int Cmi_netpoll;
00698 static int Cmi_asyncio;
00699 static int Cmi_idlepoll;
00700 static int Cmi_syncprint;
00701 static int Cmi_print_stats = 0;
00702
00703 #if ! CMK_SMP && ! defined(_WIN32)
00704
00705 static void parse_forks(void) {
00706 char *forkstr;
00707 int nread;
00708 int forks;
00709 int i,pid;
00710 forkstr=getenv("CmiMyForks");
00711 if(forkstr!=0) {
00712 nread = sscanf(forkstr,"%d",&forks);
00713 for(i=1;i<=forks;i++) {
00714 pid=fork();
00715 if(pid<0) CmiAbort("Fork returned an error");
00716 if(pid==0) {
00717
00718 _Cmi_mynode+=i;
00719 _Cmi_mype+=i;
00720 break;
00721 }
00722 }
00723 }
00724 }
00725 #endif
00726 static void parse_magic(void)
00727 {
00728 char* nm;
00729 int nread;
00730 nm = getenv("NETMAGIC");
00731 if (nm!=0)
00732 {
00733 nread = sscanf(nm, "%d",&Cmi_net_magic);
00734 }
00735 }
00736 static void parse_netstart(void)
00737 {
00738 char *ns;
00739 int nread;
00740 int port;
00741 ns = getenv("NETSTART");
00742 if (ns!=0)
00743 {
00744 char Cmi_charmrun_name[1024];
00745 nread = sscanf(ns, "%d%s%d%d%d",
00746 &_Cmi_mynode,
00747 Cmi_charmrun_name, &Cmi_charmrun_port,
00748 &Cmi_charmrun_pid, &port);
00749 Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
00750
00751 if (nread!=5) {
00752 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
00753 exit(1);
00754 }
00755 } else
00756 {
00757 _Cmi_mynode=0;
00758 Cmi_charmrun_IP=_skt_invalid_ip;
00759 Cmi_charmrun_port=0;
00760 Cmi_charmrun_pid=0;
00761 dataport = -1;
00762 }
00763 #if CMK_USE_IBVERBS | CMK_USE_IBUD
00764 char *cmi_num_nodes = getenv("CmiNumNodes");
00765 if(cmi_num_nodes != NULL){
00766 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
00767 }
00768 #endif
00769 }
00770
00771 static void extract_common_args(char **argv)
00772 {
00773 if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
00774 Cmi_print_stats = 1;
00775 }
00776
00777
00778 #include "machine-smp.c"
00779
00780 CsvDeclare(CmiNodeState, NodeState);
00781
00782
00783 #define CMI_DEST_RANK(msg) *(int *)(msg)
00784 #include "immediate.c"
00785
00786 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
00787 #include "machine-commthd-util.c"
00788 #endif
00789
00790
00791
00792
00793
00794
00795
00796
00797
00798
00799 #define LOGGING 0
00800
00801 #if LOGGING
00802
00803 typedef struct logent {
00804 double time;
00805 int seqno;
00806 int srcpe;
00807 int dstpe;
00808 int kind;
00809 } *logent;
00810
00811
00812 logent log;
00813 int log_pos;
00814 int log_wrap;
00815
00816 static void log_init(void)
00817 {
00818 log = (logent)malloc(50000 * sizeof(struct logent));
00819 _MEMCHECK(log);
00820 log_pos = 0;
00821 log_wrap = 0;
00822 }
00823
00824 static void log_done(void)
00825 {
00826 char logname[100]; FILE *f; int i, size;
00827 sprintf(logname, "log.%d", _Cmi_mynode);
00828 f = fopen(logname, "w");
00829 if (f==0) KillEveryone("fopen problem");
00830 if (log_wrap) size = 50000; else size=log_pos;
00831 for (i=0; i<size; i++) {
00832 logent ent = log+i;
00833 fprintf(f, "%1.4f %d %c %d %d\n",
00834 ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
00835 }
00836 fclose(f);
00837 }
00838
00839 void printLog(void)
00840 {
00841 char logname[100]; FILE *f; int i, j, size;
00842 static int logged = 0;
00843 if (logged)
00844 return;
00845 logged = 1;
00846 CmiPrintf("Logging: %d\n", _Cmi_mynode);
00847 sprintf(logname, "log.%d", _Cmi_mynode);
00848 f = fopen(logname, "w");
00849 if (f==0) KillEveryone("fopen problem");
00850 for (i = 5000; i; i--)
00851 {
00852
00853 j = log_pos - i;
00854 if (j < 0)
00855 {
00856 if (log_wrap)
00857 j = 5000 + j;
00858 else
00859 j = 0;
00860 };
00861 {
00862 logent ent = log+j;
00863 fprintf(f, "%1.4f %d %c %d %d\n",
00864 ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
00865 }
00866 }
00867 fclose(f);
00868 CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
00869 }
00870
00871 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
00872
00873 #endif
00874
00875 #if !LOGGING
00876
00877 #define log_init()
00878 #define log_done()
00879 #define printLog()
00880 #define LOG(t,s,k,d,q)
00881
00882 #endif
00883
00884
00885
00886
00887
00888
00889
00890
00891 static CmiNodeLock Cmi_scanf_mutex;
00892 static double Cmi_clock;
00893 static double Cmi_check_delay = 3.0;
00894
00895
00896
00897
00898
00899
00900
00901
00902 #if CMK_SHARED_VARS_UNAVAILABLE
00903
00904 static volatile int memflag=0;
00905 void CmiMemLock() { memflag++; }
00906 void CmiMemUnlock() { memflag--; }
00907
00908 static volatile int comm_flag=0;
00909 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
00910 #ifndef MACHLOCK_DEBUG
00911 # define CmiCommLock() (comm_flag=1)
00912 # define CmiCommUnlock() (comm_flag=0)
00913 #else
00914 void CmiCommLock(void) {
00915 MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
00916 comm_flag=1;
00917 }
00918 void CmiCommUnlock(void) {
00919 MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
00920 comm_flag=0;
00921 }
00922 #endif
00923
00924 static struct CmiStateStruct Cmi_state;
00925 int _Cmi_mype;
00926 int _Cmi_myrank=0;
00927 #define CmiGetState() (&Cmi_state)
00928 #define CmiGetStateN(n) (&Cmi_state)
00929
00930 void CmiYield(void) { sleep(0); }
00931
00932 static void CommunicationInterrupt(int ignored)
00933 {
00934 MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
00935 if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0))
00936 {
00937 MACHSTATE(5,"--SKIPPING SIGIO--");
00938 return;
00939 }
00940 MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
00941 {
00942
00943 CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
00944
00945 CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);
00946
00947 CmiIsomallocBlockListActivate(oldList);
00948 }
00949 MACHSTATE(2,"--END SIGIO--")
00950 }
00951
00952 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
00953
00954 static void CmiStartThreads(char **argv)
00955 {
00956 MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
00957 MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
00958 if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
00959 KillEveryone
00960 ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
00961
00962 CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
00963 _Cmi_mype = Cmi_nodestart;
00964
00965
00966 _Cmi_myrank=1;
00967 CommunicationServerInit();
00968 _Cmi_myrank=0;
00969
00970 #if !CMK_ASYNC_NOT_NEEDED
00971 if (Cmi_asyncio)
00972 {
00973 CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
00974 if (!Cmi_netpoll) {
00975 if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
00976 if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
00977 }
00978 #if CMK_USE_GM || CMK_USE_MX
00979
00980 if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
00981 #endif
00982 }
00983 #endif
00984 }
00985
00986 static void CmiDestoryLocks()
00987 {
00988 comm_flag = 0;
00989 memflag = 0;
00990 }
00991
00992 #endif
00993
00994
00995
00996
00997 CpvDeclare(unsigned , networkProgressCount);
00998 int networkProgressPeriod;
00999
01000 CpvDeclare(void *, CmiLocalQueue);
01001
01002
01003 #ifndef CmiMyPe
01004 int CmiMyPe()
01005 {
01006 return CmiGetState()->pe;
01007 }
01008 #endif
01009 #ifndef CmiMyRank
01010 int CmiMyRank()
01011 {
01012 return CmiGetState()->rank;
01013 }
01014 #endif
01015
01016 CpvExtern(int,_charmEpoch);
01017
01018
01019
01020
01021
01022 extern double evacTime;
01023
01024 void CmiPushPE(int pe,void *msg)
01025 {
01026 CmiState cs=CmiGetStateN(pe);
01027
01028
01029
01030
01031
01032
01033 MACHSTATE1(2,"Pushing message into %d's queue",pe);
01034 MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
01035
01036 #if CMK_IMMEDIATE_MSG
01037 if (CmiIsImmediate(msg)) {
01038 CmiPushImmediateMsg(msg);
01039 return;
01040 }
01041 #endif
01042 #if !CMK_SMP_MULTIQ
01043 PCQueuePush(cs->recv,msg);
01044 #else
01045 PCQueuePush(cs->recv[CmiGetState()->myGrpIdx], msg);
01046 #endif
01047
01048 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
01049 if (_Cmi_noprocforcommthread)
01050 #endif
01051 CmiIdleLock_addMessage(&cs->idle);
01052 }
01053
01054 #if CMK_NODE_QUEUE_AVAILABLE
01055
01056
01057
01058 static void CmiPushNode(void *msg)
01059 {
01060 CmiState cs=CmiGetStateN(0);
01061
01062 MACHSTATE(2,"Pushing message into node queue");
01063 MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
01064
01065 #if CMK_IMMEDIATE_MSG
01066 if (CmiIsImmediate(msg)) {
01067 MACHSTATE(2,"Pushing Immediate message into queue");
01068 CmiPushImmediateMsg(msg);
01069 return;
01070 }
01071 #endif
01072
01073
01074
01075
01076
01077 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
01078 CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
01079 #endif
01080 PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
01081 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
01082 CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
01083 #endif
01084
01085
01086
01087 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
01088 if (_Cmi_noprocforcommthread)
01089 #endif
01090 CmiIdleLock_addMessage(&cs->idle);
01091 }
01092 #endif
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105 static int Cmi_charmrun_fd_sendflag=0;
01106
01107
01108 static int sendone_abort_fn(int code,const char *msg) {
01109 fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
01110 machine_exit(1);
01111 return -1;
01112 }
01113
01114 static void ctrl_sendone_nolock(const char *type,
01115 const char *data1,int dataLen1,
01116 const char *data2,int dataLen2)
01117 {
01118 const void *bufs[3]; int lens[3]; int nBuffers=0;
01119 ChMessageHeader hdr;
01120 skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
01121 MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
01122 if (Cmi_charmrun_fd==-1)
01123 charmrun_abort("ctrl_sendone called in standalone!\n");
01124 Cmi_charmrun_fd_sendflag=1;
01125 ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
01126 bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
01127 if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
01128 if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
01129 skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
01130 Cmi_charmrun_fd_sendflag=0;
01131 skt_set_abort(oldAbort);
01132 MACHSTATE(2,"} ctrl_sendone_nolock");
01133 }
01134
01135 static void ctrl_sendone_locking(const char *type,
01136 const char *data1,int dataLen1,
01137 const char *data2,int dataLen2)
01138 {
01139 CmiCommLock();
01140 ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
01141 CmiCommUnlock();
01142 }
01143
01144 #ifndef MEMORYUSAGE_OUTPUT
01145 #define MEMORYUSAGE_OUTPUT 0
01146 #endif
01147 #if MEMORYUSAGE_OUTPUT
01148 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
01149 static int memoryusage_counter;
01150 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
01151 #define memoryusage_output {\
01152 memoryusage_counter++;\
01153 if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
01154 #endif
01155
01156 static double Cmi_check_last;
01157
01158
01159 static void pingCharmrun(void *ignored)
01160 {
01161 #if MEMORYUSAGE_OUTPUT
01162 memoryusage_output;
01163 if(memoryusage_isOutput){
01164 memoryusage_counter = 0;
01165 #else
01166 {
01167 #endif
01168
01169 double clock=GetClock();
01170 if (clock > Cmi_check_last + Cmi_check_delay) {
01171 MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
01172 Cmi_check_last = clock;
01173 #if CMK_USE_GM || CMK_USE_MX
01174 if (!Cmi_netpoll)
01175 #endif
01176 CmiCommLockOrElse(return;);
01177 if (Cmi_charmrun_fd_sendflag) return;
01178 CmiCommLock();
01179 ctrl_sendone_nolock("ping",NULL,0,NULL,0);
01180 CmiCommUnlock();
01181 }
01182 #if 1
01183 #if CMK_USE_GM || CMK_USE_MX
01184 if (!Cmi_netpoll)
01185 #endif
01186 CmiStdoutFlush();
01187 #endif
01188 }
01189 }
01190
01191
01192 static void pingCharmrunPeriodic(void *ignored)
01193 {
01194 pingCharmrun(ignored);
01195 CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
01196 }
01197
01198 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
01199 static void charmrun_abort(const char *s)
01200 {
01201 if (Cmi_charmrun_fd==-1) {
01202 fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
01203 CmiPrintStackTrace(0);
01204 abort();
01205 } else {
01206 char msgBuf[80];
01207 skt_set_abort(ignore_further_errors);
01208 sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
01209 ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
01210 }
01211 }
01212
01213
01214
01215 #ifdef __FAULT__
01216 #include "machine-recover.c"
01217 #endif
01218
01219 static void node_addresses_store(ChMessage *msg);
01220
01221 static int barrierReceived = 0;
01222
01223 static void ctrl_getone(void)
01224 {
01225 ChMessage msg;
01226 MACHSTATE(2,"ctrl_getone")
01227 MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
01228 ChMessage_recv(Cmi_charmrun_fd,&msg);
01229 MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
01230
01231 if (strcmp(msg.header.type,"die")==0) {
01232 MACHSTATE(2,"ctrl_getone bye bye")
01233 fprintf(stderr,"aborting: %s\n",msg.data);
01234 log_done();
01235 ConverseCommonExit();
01236 machine_exit(0);
01237 }
01238 #if CMK_CCS_AVAILABLE
01239 else if (strcmp(msg.header.type, "req_fw")==0) {
01240 CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
01241
01242
01243
01244
01245
01246
01247 int pe=0;
01248 void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
01249 MACHSTATE(2,"Incoming CCS request");
01250 if (cmsg!=NULL) CmiPushPE(pe,cmsg);
01251 }
01252 #endif
01253 #ifdef __FAULT__
01254 else if(strcmp(msg.header.type,"crashnode")==0) {
01255 crash_node_handle(&msg);
01256 }
01257 else if(strcmp(msg.header.type,"initnodetab")==0) {
01260 node_addresses_store(&msg);
01261
01262 }
01263 #endif
01264 else if(strcmp(msg.header.type,"barrier")==0) {
01265 barrierReceived = 1;
01266 }
01267 else if(strcmp(msg.header.type,"barrier0")==0) {
01268 barrierReceived = 2;
01269 }
01270 else {
01271
01272
01273
01274
01275 charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
01276 machine_exit(1);
01277 }
01278
01279 MACHSTATE(2,"ctrl_getone done")
01280 ChMessage_free(&msg);
01281 }
01282
01283 #if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
01284
01285
01286 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
01287 {
01288 MACHSTATE(2,"Outgoing CCS reply");
01289 ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
01290 repData,repLen);
01291 MACHSTATE(1,"Outgoing CCS reply away");
01292 }
01293 #endif
01294
01295
01296
01297
01298
01299
01300 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
01301 static void InternalPrintf(const char *f, va_list l)
01302 {
01303 ChMessage replymsg;
01304 char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
01305 CmiStdoutFlush();
01306 vsprintf(buffer, f, l);
01307 if(Cmi_syncprint) {
01308 CmiCommLock();
01309 ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
01310 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01311 ChMessage_free(&replymsg);
01312 CmiCommUnlock();
01313 } else {
01314 ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
01315 }
01316 InternalWriteToTerminal(0,buffer,strlen(buffer));
01317 CmiTmpFree(buffer);
01318 }
01319
01320 static void InternalError(const char *f, va_list l)
01321 {
01322 ChMessage replymsg;
01323 char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
01324 CmiStdoutFlush();
01325 vsprintf(buffer, f, l);
01326 if(Cmi_syncprint) {
01327 ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
01328 CmiCommLock();
01329 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01330 ChMessage_free(&replymsg);
01331 CmiCommUnlock();
01332 } else {
01333 ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
01334 }
01335 InternalWriteToTerminal(1,buffer,strlen(buffer));
01336 CmiTmpFree(buffer);
01337 }
01338
01339 static int InternalScanf(char *fmt, va_list l)
01340 {
01341 ChMessage replymsg;
01342 char *ptr[20];
01343 char *p; int nargs, i;
01344 nargs=0;
01345 p=fmt;
01346 while (*p) {
01347 if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
01348 if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
01349 if (p[0]=='%') { nargs++; p++; continue; }
01350 if (*p=='\n') *p=' '; p++;
01351 }
01352 if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
01353 for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
01354 CmiLock(Cmi_scanf_mutex);
01355 if (Cmi_charmrun_fd!=-1)
01356 {
01357 ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
01358
01359 CmiCommLock();
01360 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
01361 i = sscanf((char*)replymsg.data, fmt,
01362 ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
01363 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
01364 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
01365 ChMessage_free(&replymsg);
01366 CmiCommUnlock();
01367 } else
01368 {
01369 i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
01370 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
01371 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
01372 }
01373 CmiUnlock(Cmi_scanf_mutex);
01374 return i;
01375 }
01376
01377 #if CMK_CMIPRINTF_IS_A_BUILTIN
01378
01379
01380 void CmiPrintf(const char *fmt, ...)
01381 {
01382 CpdSystemEnter();
01383 {
01384 va_list p; va_start(p, fmt);
01385 if (Cmi_charmrun_fd!=-1)
01386 InternalPrintf(fmt, p);
01387 else
01388 vfprintf(stdout,fmt,p);
01389 va_end(p);
01390 }
01391 CpdSystemExit();
01392 }
01393
01394 void CmiError(const char *fmt, ...)
01395 {
01396 CpdSystemEnter();
01397 {
01398 va_list p; va_start (p, fmt);
01399 if (Cmi_charmrun_fd!=-1)
01400 InternalError(fmt, p);
01401 else
01402 vfprintf(stderr,fmt,p);
01403 va_end(p);
01404 }
01405 CpdSystemExit();
01406 }
01407
01408 int CmiScanf(const char *fmt, ...)
01409 {
01410 int i;
01411 CpdSystemEnter();
01412 {
01413 va_list p; va_start(p, fmt);
01414 i = InternalScanf((char *)fmt, p);
01415 va_end(p);
01416 }
01417 CpdSystemExit();
01418 return i;
01419 }
01420
01421 #endif
01422
01423
01424
01425
01426
01427
01428
01429
01430
01431 static int readStdout[2];
01432 static int writeStdout[2];
01433 static int serviceStdout[2];
01434 #define readStdoutBufLen (16*1024)
01435 static char readStdoutBuf[readStdoutBufLen+1];
01436 static int servicingStdout;
01437
01438
01439 static void CmiStdoutInit(void) {
01440 int i;
01441 if (Cmi_charmrun_fd==-1) return;
01442
01443
01444 #if !defined(_WIN32) || defined(__CYGWIN__)
01445
01446 setbuf(stdout,NULL); setbuf(stderr,NULL);
01447
01448
01449 for (i=0;i<2;i++) {
01450 int pair[2];
01451 int srcFd=1+i;
01452
01453
01454 writeStdout[i]=dup(srcFd);
01455 #if 0
01456
01457 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
01458 #else
01459
01460 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair))
01461 {perror("building stdio redirection socketpair"); exit(1);}
01462 #endif
01463 readStdout[i]=pair[0];
01464 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
01465
01466 #if 0
01467 CmiEnableNonblockingIO(srcFd);
01468 #endif
01469 #if CMK_SHARED_VARS_UNAVAILABLE
01470 if (Cmi_asyncio)
01471 {
01472
01473 CmiEnableAsyncIO(readStdout[i]);
01474 }
01475 #endif
01476 }
01477 #else
01478
01479 # ifndef read
01480 # define read(x,y,z) 0
01481 # endif
01482 # ifndef write
01483 # define write(x,y,z)
01484 # endif
01485 #endif
01486 }
01487
01488
01489 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
01490 {
01491 write(writeStdout[isStdErr],str,len);
01492 }
01493
01494
01495
01496
01497
01498 static void CmiStdoutServiceOne(int i) {
01499 int nBytes;
01500 const static char *cmdName[2]={"print","printerr"};
01501 servicingStdout=1;
01502 while(1) {
01503 const char *tooMuchWarn=NULL; int tooMuchLen=0;
01504 if (!skt_select1(readStdout[i],0)) break;
01505 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
01506 if (nBytes<=0) break;
01507
01508
01509 readStdoutBuf[nBytes]=0;
01510 nBytes++;
01511
01512 if (nBytes>=readStdoutBufLen-100)
01513 {
01514
01515
01516 tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
01517 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
01518 nBytes--;
01519 tooMuchLen=strlen(tooMuchWarn)+1;
01520 }
01521 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
01522 tooMuchWarn,tooMuchLen);
01523
01524 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
01525 }
01526 servicingStdout=0;
01527 serviceStdout[i]=0;
01528 }
01529
01530
01531
01532
01533
01534 static void CmiStdoutServiceAll(void) {
01535 int i;
01536 for (i=0;i<2;i++) {
01537 if (readStdout[i]==0) continue;
01538 CmiStdoutServiceOne(i);
01539 }
01540 }
01541
01542
01543
01544
01545 static void CmiStdoutService(void) {
01546 CmiStdoutServiceAll();
01547 }
01548
01549
01550
01551
01552 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
01553 int i;
01554 for (i=0;i<2;i++) {
01555 if (readStdout[i]==0) continue;
01556 CMK_PIPE_ADDREAD(readStdout[i]);
01557 }
01558 }
01559 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
01560 int i;
01561 for (i=0;i<2;i++) {
01562 if (readStdout[i]==0) continue;
01563 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
01564 }
01565 }
01566 static int CmiStdoutNeedsService(void) {
01567 return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
01568 }
01569
01570
01571 static void CmiStdoutFlush(void) {
01572 if (servicingStdout) return;
01573 CmiCommLockOrElse( return; )
01574 CmiCommLock();
01575 CmiStdoutServiceAll();
01576 CmiCommUnlock();
01577 }
01578
01579
01580
01581
01582
01583
01584 #include "machine-dgram.c"
01585
01586
01587 #ifndef CmiNodeFirst
01588 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
01589 int CmiNodeSize(int node) { return nodes[node].nodesize; }
01590 #endif
01591
01592 #ifndef CmiNodeOf
01593 int CmiNodeOf(int pe) { return (nodes_by_pe[pe] - nodes); }
01594 int CmiRankOf(int pe) { return pe - (nodes_by_pe[pe]->nodestart); }
01595 #endif
01596
01597
01598
01599
01600
01601
01602
01603
01604
01605
01606
01607
01608
01609
01610
01611
01612
01613
01614
01615
01616
01617
01618
01619
01620
01621
01622
01623
01624 #if CMK_USE_IBVERBS
01625 void copyInfiAddr(ChInfiAddr *qpList);
01626 #endif
01627
01628 #if CMK_IBVERBS_FAST_START
01629 static void send_partial_init()
01630 {
01631 ChMessageInt_t nodeNo = ChMessageInt_new(_Cmi_mynode);
01632 ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);
01633 }
01634 #endif
01635
01636
01637
01638
01639 static void node_addresses_obtain(char **argv)
01640 {
01641 ChMessage nodetabmsg;
01642 MACHSTATE(3,"node_addresses_obtain { ");
01643 if (Cmi_charmrun_fd==-1)
01644 {
01645 int npes=1;
01646 ChSingleNodeinfo *fakeTab;
01647 ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
01648 fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
01649 CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
01650 #if CMK_SHARED_VARS_UNAVAILABLE
01651 if (npes!=1) {
01652 fprintf(stderr,
01653 "To use multiple processors, you must run this program as:\n"
01654 " > charmrun +p%d %s <args>\n"
01655 "or build the %s-smp version of Charm++.\n",
01656 npes,argv[0],CMK_MACHINE_NAME);
01657 exit(1);
01658 }
01659 #else
01660
01661 if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) ||
01662 CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
01663 npes = _Cmi_mynodesize;
01664 #endif
01665
01666
01667
01668
01669
01670 fakeTab->nodeNo=ChMessageInt_new(1);
01671 fakeTab->info.nPE=ChMessageInt_new(npes);
01672 fakeTab->info.dataport=ChMessageInt_new(0);
01673 fakeTab->info.IP=_skt_invalid_ip;
01674 }
01675 else
01676 {
01677 ChSingleNodeinfo me;
01678
01679 me.nodeNo=ChMessageInt_new(_Cmi_mynode);
01680
01681 #if CMK_USE_IBVERBS
01682 {
01683 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
01684 me.info.qpList = malloc(qpListSize);
01685 copyInfiAddr(me.info.qpList);
01686 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
01687 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
01688 free(me.info.qpList);
01689 }
01690 #else
01691
01692
01693
01694
01695 me.info.nPE=ChMessageInt_new(0);
01696
01697 me.info.IP=skt_innode_my_ip();
01698 me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
01699 #ifdef CMK_USE_MX
01700 me.info.nic_id=ChMessageLong_new(Cmi_nic_id);
01701 #endif
01702 #if CMK_USE_IBUD
01703 me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
01704 me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
01705 me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
01706 MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",me.info.qp.lid,me.info.qp.qpn,me.info.qp.psn);
01707 #endif
01708 me.info.dataport=ChMessageInt_new(dataport);
01709
01710
01711
01712
01713 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
01714 MACHSTATE1(5,"send initnode - dataport:%d", dataport);
01715 #endif //CMK_USE_IBVERBS
01716
01717 MACHSTATE(3,"initnode sent");
01718
01719
01720
01721 if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
01722 CmiAbort("Timeout waiting for nodetab!\n");
01723 }
01724 MACHSTATE(2,"recv initnode {");
01725 ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
01726 MACHSTATE(2,"} recv initnode");
01727 }
01728
01729
01730 node_addresses_store(&nodetabmsg);
01731 ChMessage_free(&nodetabmsg);
01732
01733 MACHSTATE(3,"} node_addresses_obtain ");
01734 }
01735
01736 #if CMK_NODE_QUEUE_AVAILABLE
01737
01738
01739
01740
01741
01742
01743
01744
01745
01746
01747
01748 void DeliverOutgoingNodeMessage(OutgoingMsg ogm)
01749 {
01750 int i, rank, dst; OtherNode node;
01751
01752 dst = ogm->dst;
01753 switch (dst) {
01754 case NODE_BROADCAST_ALL:
01755 CmiPushNode(CopyMsg(ogm->data,ogm->size));
01756
01757 case NODE_BROADCAST_OTHERS:
01758 #if CMK_BROADCAST_SPANNING_TREE
01759 SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
01760 #elif CMK_BROADCAST_HYPERCUBE
01761 SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
01762 #else
01763 for (i=0; i<_Cmi_numnodes; i++)
01764 if (i!=_Cmi_mynode)
01765 DeliverViaNetwork(ogm, nodes + i, DGRAM_NODEMESSAGE, DGRAM_ROOTPE_MASK, 1);
01766 #endif
01767 GarbageCollectMsg(ogm);
01768 break;
01769 default:
01770 node = nodes+dst;
01771 rank=DGRAM_NODEMESSAGE;
01772 if (dst != _Cmi_mynode) {
01773 DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
01774 GarbageCollectMsg(ogm);
01775 } else {
01776 if (ogm->freemode == 'A') {
01777 CmiPushNode(CopyMsg(ogm->data,ogm->size));
01778 ogm->freemode = 'X';
01779 } else {
01780 CmiPushNode(ogm->data);
01781 FreeOutgoingMsg(ogm);
01782 }
01783 }
01784 }
01785 }
01786
01787 #else
01788
01789 #define DeliverOutgoingNodeMessage(msg) DeliverOutgoingMessage(msg)
01790
01791 #endif
01792
01793 #if CMK_C_INLINE
01794 inline static
01795 #endif
01796 void DeliverViaNetworkOrPxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot,int copy){
01797 #if CMK_USE_SYSVSHM
01798 {
01799 #if SYSVSHM_STATS
01800 double _startValidTime = CmiWallTimer();
01801 #endif
01802 int ret=CmiValidSysvshm(ogm,node);
01803 #if SYSVSHM_STATS
01804 sysvshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
01805 #endif
01806 MACHSTATE4(3,"Msg ogm %p size %d dst %d useSysvShm %d",ogm,ogm->size,ogm->dst,ret);
01807 if(ret){
01808 CmiSendMessageSysvshm(ogm,node,rank,broot);
01809 }else{
01810 DeliverViaNetwork(ogm, node, rank, broot,copy);
01811 }
01812 }
01813 #elif CMK_USE_PXSHM
01814 {
01815 #if PXSHM_STATS
01816 double _startValidTime = CmiWallTimer();
01817 #endif
01818 int ret=CmiValidPxshm(ogm,node);
01819 #if PXSHM_STATS
01820 pxshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
01821 #endif
01822 MACHSTATE4(3,"Msg ogm %p size %d dst %d usePxShm %d",ogm,ogm->size,ogm->dst,ret);
01823 if(ret){
01824 CmiSendMessagePxshm(ogm,node,rank,broot);
01825 }else{
01826 DeliverViaNetwork(ogm, node, rank, broot,copy);
01827 }
01828 }
01829 #else
01830 DeliverViaNetwork(ogm, node, rank, broot, copy);
01831 #endif
01832
01833 }
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847 int DeliverOutgoingMessage(OutgoingMsg ogm)
01848 {
01849 int i, rank, dst; OtherNode node;
01850
01851 int network = 1;
01852
01853
01854 dst = ogm->dst;
01855
01856 switch (dst) {
01857 case PE_BROADCAST_ALL:
01858 #if !CMK_SMP_NOT_RELAX_LOCK
01859 CmiCommLock();
01860 #endif
01861 for (rank = 0; rank<_Cmi_mynodesize; rank++) {
01862 CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
01863 }
01864 #if CMK_BROADCAST_SPANNING_TREE
01865 SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
01866 #elif CMK_BROADCAST_HYPERCUBE
01867 SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
01868 #else
01869 for (i=0; i<_Cmi_numnodes; i++)
01870 if (i!=_Cmi_mynode){
01871
01872 if(CmiNodeAlive(i)){
01873 DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
01874 }
01875 }
01876 #endif
01877 GarbageCollectMsg(ogm);
01878 #if !CMK_SMP_NOT_RELAX_LOCK
01879 CmiCommUnlock();
01880 #endif
01881 break;
01882 case PE_BROADCAST_OTHERS:
01883 #if !CMK_SMP_NOT_RELAX_LOCK
01884 CmiCommLock();
01885 #endif
01886 for (rank = 0; rank<_Cmi_mynodesize; rank++)
01887 if (rank + Cmi_nodestart != ogm->src) {
01888 CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
01889 }
01890 #if CMK_BROADCAST_SPANNING_TREE
01891 SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
01892 #elif CMK_BROADCAST_HYPERCUBE
01893 SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
01894 #else
01895 for (i = 0; i<_Cmi_numnodes; i++)
01896 if (i!=_Cmi_mynode){
01897
01898 if(CmiNodeAlive(i)){
01899 DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
01900 }
01901 }
01902 #endif
01903 GarbageCollectMsg(ogm);
01904 #if !CMK_SMP_NOT_RELAX_LOCK
01905 CmiCommUnlock();
01906 #endif
01907 break;
01908 default:
01909 #ifndef CMK_OPTIMIZE
01910 if (dst<0 || dst>=CmiNumPes())
01911 CmiAbort("Send to out-of-bounds processor!");
01912 #endif
01913 node = nodes_by_pe[dst];
01914 rank = dst - node->nodestart;
01915 if (node->nodestart != Cmi_nodestart) {
01916 #if !CMK_SMP_NOT_RELAX_LOCK
01917 CmiCommLock();
01918 #endif
01919 DeliverViaNetworkOrPxshm(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
01920 GarbageCollectMsg(ogm);
01921 #if !CMK_SMP_NOT_RELAX_LOCK
01922 CmiCommUnlock();
01923 #endif
01924 } else {
01925 network = 0;
01926 if (ogm->freemode == 'A') {
01927 CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
01928 ogm->freemode = 'X';
01929 } else {
01930 CmiPushPE(rank, ogm->data);
01931 FreeOutgoingMsg(ogm);
01932 }
01933 }
01934 }
01935 #if CMK_MULTICORE
01936 network = 0;
01937 #endif
01938 return network;
01939 }
01940
01941
01942
01943
01944
01945
01946
01947
01948
01949
01950
01951
01952
01953
01954
01955 #if CMK_NODE_QUEUE_AVAILABLE
01956 char *CmiGetNonLocalNodeQ(void)
01957 {
01958 char *result = 0;
01959 if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
01960 CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
01961 result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
01962 CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
01963 }
01964 return result;
01965 }
01966 #endif
01967
01968 void *CmiGetNonLocal(void)
01969 {
01970 #if CMK_SMP_MULTIQ
01971 int i;
01972 #endif
01973
01974 CmiState cs = CmiGetState();
01975 CmiIdleLock_checkMessage(&cs->idle);
01976
01977 #if !CMK_SMP_MULTIQ
01978 return (void *) PCQueuePop(cs->recv);
01979 #else
01980 void *retVal = NULL;
01981 for(i=cs->curPolledIdx; i<MULTIQ_GRPSIZE; i++){
01982 retVal = (void *)PCQueuePop(cs->recv[i]);
01983 if(retVal!=NULL) {
01984 cs->curPolledIdx = i+1;
01985 return retVal;
01986 }
01987 }
01988 cs->curPolledIdx=0;
01989 return NULL;
01990 #endif
01991 }
01992
01993
01997 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
01998 OutgoingMsg ogm;
01999 MallocOutgoingMsg(ogm);
02000 MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
02001 ogm->size = size;
02002 ogm->data = data;
02003 ogm->src = cs->pe;
02004 ogm->dst = pe;
02005 ogm->freemode = freemode;
02006 ogm->refcount = 0;
02007 return (CmiCommHandle)ogm;
02008 }
02009
02010 #if CMK_NODE_QUEUE_AVAILABLE
02011
02012
02013
02014
02015
02016
02017
02018
02019
02020
02021
02022
02023
02024 CmiCommHandle CmiGeneralNodeSend(int node, int size, int freemode, char *data)
02025 {
02026 CmiState cs = CmiGetState(); OutgoingMsg ogm;
02027 MACHSTATE(1,"CmiGeneralNodeSend {");
02028
02029 if (freemode == 'S') {
02030 char *copy = (char *)CmiAlloc(size);
02031 if (!copy)
02032 fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
02033 memcpy(copy, data, size);
02034 data = copy; freemode = 'F';
02035 }
02036
02037 #if CMK_IMMEDIATE_MSG
02038
02039 if (node == CmiMyNode() && CmiIsImmediate(data)) {
02040 CmiPushImmediateMsg(data);
02041
02042 if (!_immRunning) CmiHandleImmediate();
02043 return;
02044 }
02045 #endif
02046
02047 CmiMsgHeaderSetLength(data, size);
02048 ogm=PrepareOutgoing(cs,node,size,freemode,data);
02049 CmiCommLock();
02050 DeliverOutgoingNodeMessage(ogm);
02051 CmiCommUnlock();
02052
02053 CommunicationServer(0, COMM_SERVER_FROM_WORKER);
02054 MACHSTATE(1,"} CmiGeneralNodeSend");
02055 return (CmiCommHandle)ogm;
02056 }
02057
02058 #endif
02059
02060
02061
02062
02063
02064
02065
02066
02067
02068
02069
02070
02071
02072
02073 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
02074 {
02075 int sendonnetwork;
02076 CmiState cs = CmiGetState(); OutgoingMsg ogm;
02077 MACHSTATE(1,"CmiGeneralSend {");
02078
02079 if (freemode == 'S') {
02080 #if CMK_USE_GM
02081 if (pe != cs->pe) {
02082 freemode = 'G';
02083 }
02084 else
02085 #endif
02086 {
02087 char *copy = (char *)CmiAlloc(size);
02088 if (!copy)
02089 fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
02090 memcpy(copy, data, size);
02091 data = copy; freemode = 'F';
02092 }
02093 }
02094
02095 if (pe == cs->pe) {
02096 #if ! CMK_SMP
02097 if (!_immRunning)
02098
02099 #endif
02100 {
02101 #if CMK_IMMEDIATE_MSG
02102
02103
02104 if (CmiIsImmediate(data)) {
02105 CmiPushImmediateMsg(data);
02106 CmiHandleImmediate();
02107 return 0;
02108 }
02109 #endif
02110 CdsFifo_Enqueue(cs->localqueue, data);
02111 if (freemode == 'A') {
02112 MallocOutgoingMsg(ogm);
02113 ogm->freemode = 'X';
02114 return ogm;
02115 } else return 0;
02116 }
02117 }
02118
02119 #if CMK_PERSISTENT_COMM
02120 if (phs) {
02121 CmiAssert(phsSize == 1);
02122 CmiSendPersistentMsg(*phs, pe, size, data);
02123 return NULL;
02124 }
02125 #endif
02126
02127 CmiMsgHeaderSetLength(data, size);
02128 ogm=PrepareOutgoing(cs,pe,size,freemode,data);
02129
02130 #if CMK_SMP_NOT_RELAX_LOCK
02131 CmiCommLock();
02132 #endif
02133
02134 sendonnetwork = DeliverOutgoingMessage(ogm);
02135
02136 #if CMK_SMP_NOT_RELAX_LOCK
02137 CmiCommUnlock();
02138 #endif
02139
02140
02141 #if CMK_USE_SYSVSHM
02142 CommunicationServerSysvshm();
02143 #elif CMK_USE_PXSHM
02144 CommunicationServerPxshm();
02145 #endif
02146 #if !CMK_SHARED_VARS_UNAVAILABLE
02147 #if !CMK_SMP_NOT_SKIP_COMMSERVER
02148 if (sendonnetwork!=0)
02149 #endif
02150 #endif
02151 CommunicationServer(0, COMM_SERVER_FROM_WORKER);
02152 MACHSTATE(1,"} CmiGeneralSend");
02153 return (CmiCommHandle)ogm;
02154 }
02155
02156
02157 void CmiSyncSendFn(int p, int s, char *m)
02158 {
02159 CQdCreate(CpvAccess(cQdState), 1);
02160 CmiGeneralSend(p,s,'S',m);
02161 }
02162
02163 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
02164 {
02165 CQdCreate(CpvAccess(cQdState), 1);
02166 return CmiGeneralSend(p,s,'A',m);
02167 }
02168
02169 void CmiFreeSendFn(int p, int s, char *m)
02170 {
02171 CQdCreate(CpvAccess(cQdState), 1);
02172 CmiGeneralSend(p,s,'F',m);
02173 }
02174
02175 void CmiSyncBroadcastFn(int s, char *m)
02176 {
02177 CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
02178 CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m);
02179 }
02180
02181 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
02182 {
02183 CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
02184 return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m);
02185 }
02186
02187 void CmiFreeBroadcastFn(int s, char *m)
02188 {
02189 CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
02190 CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m);
02191 }
02192
02193 void CmiSyncBroadcastAllFn(int s, char *m)
02194 {
02195 CQdCreate(CpvAccess(cQdState), CmiNumPes());
02196 CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m);
02197 }
02198
02199 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
02200 {
02201 CQdCreate(CpvAccess(cQdState), CmiNumPes());
02202 return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m);
02203 }
02204
02205 void CmiFreeBroadcastAllFn(int s, char *m)
02206 {
02207 CQdCreate(CpvAccess(cQdState), CmiNumPes());
02208 CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m);
02209 }
02210
02211 #if CMK_NODE_QUEUE_AVAILABLE
02212
02213 void CmiSyncNodeSendFn(int p, int s, char *m)
02214 {
02215 CQdCreate(CpvAccess(cQdState), 1);
02216 CmiGeneralNodeSend(p,s,'S',m);
02217 }
02218
02219 CmiCommHandle CmiAsyncNodeSendFn(int p, int s, char *m)
02220 {
02221 CQdCreate(CpvAccess(cQdState), 1);
02222 return CmiGeneralNodeSend(p,s,'A',m);
02223 }
02224
02225 void CmiFreeNodeSendFn(int p, int s, char *m)
02226 {
02227 CQdCreate(CpvAccess(cQdState), 1);
02228 CmiGeneralNodeSend(p,s,'F',m);
02229 }
02230
02231 void CmiSyncNodeBroadcastFn(int s, char *m)
02232 {
02233 CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
02234 CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'S',m);
02235 }
02236
02237 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
02238 {
02239 CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
02240 return CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'A',m);
02241 }
02242
02243 void CmiFreeNodeBroadcastFn(int s, char *m)
02244 {
02245 CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
02246 CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'F',m);
02247 }
02248
02249 void CmiSyncNodeBroadcastAllFn(int s, char *m)
02250 {
02251 CQdCreate(CpvAccess(cQdState), CmiNumNodes());
02252 CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'S',m);
02253 }
02254
02255 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
02256 {
02257 CQdCreate(CpvAccess(cQdState), CmiNumNodes());
02258 return CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'A',m);
02259 }
02260
02261 void CmiFreeNodeBroadcastAllFn(int s, char *m)
02262 {
02263 CQdCreate(CpvAccess(cQdState), CmiNumNodes());
02264 CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'F',m);
02265 }
02266 #endif
02267
02268
02269
02270
02271
02272
02273
02274 int CmiAsyncMsgSent(CmiCommHandle handle)
02275 {
02276 return (((OutgoingMsg)handle)->freemode == 'X');
02277 }
02278
02279 void CmiReleaseCommHandle(CmiCommHandle handle)
02280 {
02281 FreeOutgoingMsg(((OutgoingMsg)handle));
02282 }
02283
02284 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
02285
02286
02287
02288
02289
02290
02291
02292 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
02293 {
02294 int i;
02295 for(i=0;i<npes;i++) {
02296 CmiReference(msg);
02297 CmiSyncSendAndFree(pes[i], len, msg);
02298 }
02299 }
02300
02301 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
02302 {
02303 CmiError("ListSend not implemented.");
02304 return (CmiCommHandle) 0;
02305 }
02306
02307
02308
02309
02310
02311
02312 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
02313 {
02314 int i;
02315 for(i=0;i<npes;i++) {
02316 CmiReference(msg);
02317 CmiSyncSendAndFree(pes[i], len, msg);
02318 }
02319 CmiFree(msg);
02320 }
02321
02322 #endif
02323
02324 #if CMK_BROADCAST_SPANNING_TREE
02325
02326
02327
02328
02329
02330 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int noderank)
02331 {
02332 CmiState cs = CmiGetState();
02333 int i;
02334
02335 if (root) startpe = _Cmi_mynode;
02336 else ogm = NULL;
02337
02338 CmiAssert(startpe>=0 && startpe<_Cmi_numnodes);
02339
02340 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
02341 int p = _Cmi_mynode-startpe;
02342 if (p<0) p+=_Cmi_numnodes;
02343 p = BROADCAST_SPANNING_FACTOR*p + i;
02344 if (p > _Cmi_numnodes - 1) break;
02345 p += startpe;
02346 p = p%_Cmi_numnodes;
02347 CmiAssert(p!=_Cmi_mynode);
02348
02349 if (!root && !ogm) ogm=PrepareOutgoing(cs, PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
02350
02351 DeliverViaNetworkOrPxshm(ogm, nodes+p, noderank, startpe, 1);
02352 }
02353 if (!root && ogm) GarbageCollectMsg(ogm);
02354 }
02355 #endif
02356
02357 #if CMK_BROADCAST_HYPERCUBE
02358 int log_of_2 (int i) {
02359 int m;
02360 for (m=0; i>(1<<m); ++m);
02361 return m;
02362 }
02363
02364
02365
02366
02367 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int srcpe, int noderank)
02368 {
02369 CmiState cs = CmiGetState();
02370 int i, k, npes, tmp;
02371 int *dest_pes;
02372
02373 if (root) {
02374 msg = ogm->data;
02375 srcpe = CmiMyNode();
02376 }
02377 else ogm = NULL;
02378
02379 tmp = srcpe ^ CmiMyNode();
02380 k = log_of_2(CmiNumNodes()) + 2;
02381 if (tmp) {
02382 do {--k;} while (!(tmp>>k));
02383 }
02384
02385 MACHSTATE2(3,"Broadcast SendHypercube ogm %p size %d",ogm,size);
02386
02387 dest_pes = CmiTmpAlloc(sizeof(int)*(k+1));
02388 k--;
02389 npes = HypercubeGetBcastDestinations(CmiMyNode(), CmiNumNodes(), k, dest_pes);
02390
02391 for (i = 0; i < npes; i++) {
02392 int p = dest_pes[i];
02393
02394 if (!root && ! ogm)
02395 ogm=PrepareOutgoing(cs ,PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
02396 DeliverViaNetworkOrPxshm(ogm, nodes + p, noderank, CmiMyNode(), 1);
02397 }
02398 if (!root && ogm) GarbageCollectMsg(ogm);
02399 CmiTmpFree(dest_pes);
02400 }
02401 #endif
02402
02403
02404
02405
02406
02407
02408
02409
02410
02411
02412
02413
02414 void CmiMachineProgressImpl()
02415 {
02416 #if CMK_USE_SYSVSHM
02417 CommunicationServerSysvshm();
02418 #elif CMK_USE_PXSHM
02419 CommunicationServerPxshm();
02420 #endif
02421 CommunicationServer(0, COMM_SERVER_FROM_SMP);
02422 }
02423
02424
02425
02426
02427
02428
02429
02430 #if CMK_BARRIER_USE_COMMON_CODE
02431
02432
02433
02434 int CmiBarrier()
02435 {
02436 int len, size, i;
02437 int status;
02438 int numnodes = CmiNumNodes();
02439 static int barrier_phase = 0;
02440
02441 if (Cmi_charmrun_fd == -1) return 0;
02442 if (numnodes == 1) {
02443 CmiNodeAllBarrier();
02444 return 0;
02445 }
02446
02447 if (CmiMyRank() == 0) {
02448 ctrl_sendone_locking("barrier",NULL,0,NULL,0);
02449 while (barrierReceived != 1) {
02450 CmiCommLock();
02451 ctrl_getone();
02452 CmiCommUnlock();
02453 }
02454 barrierReceived = 0;
02455 barrier_phase ++;
02456 }
02457
02458 CmiNodeAllBarrier();
02459
02460 return 0;
02461 }
02462
02463
02464 int CmiBarrierZero()
02465 {
02466 int i;
02467 int numnodes = CmiNumNodes();
02468 ChMessage msg;
02469
02470 if (Cmi_charmrun_fd == -1) return 0;
02471 if (numnodes == 1) {
02472 CmiNodeAllBarrier();
02473 return 0;
02474 }
02475
02476 if (CmiMyRank() == 0) {
02477 char str[64];
02478 sprintf(str, "%d", CmiMyNode());
02479 ctrl_sendone_locking("barrier0",str,strlen(str)+1,NULL,0);
02480 if (CmiMyNode() == 0) {
02481 while (barrierReceived != 2) {
02482 CmiCommLock();
02483 ctrl_getone();
02484 CmiCommUnlock();
02485 }
02486 barrierReceived = 0;
02487 }
02488 }
02489
02490 CmiNodeAllBarrier();
02491 return 0;
02492 }
02493
02494 #endif
02495
02496
02497
02498
02499
02500
02501 extern void CthInit(char **argv);
02502 extern void ConverseCommonInit(char **);
02503
02504 static char **Cmi_argv;
02505 static char **Cmi_argvcopy;
02506 static CmiStartFn Cmi_startfn;
02507 static int Cmi_usrsched;
02508
02509 static void ConverseRunPE(int everReturn)
02510 {
02511 CmiIdleState *s=CmiNotifyGetState();
02512 CmiState cs;
02513 char** CmiMyArgv;
02514 CmiNodeAllBarrier();
02515 cs = CmiGetState();
02516 CpvInitialize(void *,CmiLocalQueue);
02517 CpvAccess(CmiLocalQueue) = cs->localqueue;
02518
02519
02520 if (CmiMyRank())
02521 CmiMyArgv = CmiCopyArgs(Cmi_argvcopy);
02522 else
02523 CmiMyArgv = Cmi_argv;
02524 CthInit(CmiMyArgv);
02525
02526 #if CMK_USE_GM
02527 CmiCheckGmStatus();
02528 #endif
02529
02530 ConverseCommonInit(CmiMyArgv);
02531
02532
02533
02534
02535 CpvInitialize(int , networkProgressCount);
02536 CpvAccess(networkProgressCount) = 0;
02537
02538
02539 if (CmiMyPe() == 0) {
02540 if (Cmi_netpoll == 1) {
02541 CmiPrintf("Charm++> scheduler running in netpoll mode.\n");
02542 }
02543 #if CMK_SHARED_VARS_UNAVAILABLE
02544 else {
02545 if (CmiMemoryIs(CMI_MEMORY_IS_OS))
02546 CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
02547 }
02548 #endif
02549 }
02550
02551 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
02552 CmiInitNotifyCommThdScheme();
02553 #endif
02554
02555 #if MEMORYUSAGE_OUTPUT
02556 memoryusage_counter = 0;
02557 #endif
02558 #if CMK_USE_GM || CMK_USE_MX
02559 if (Cmi_charmrun_fd != -1)
02560 #endif
02561 {
02562 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
02563 (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
02564 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
02565 (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
02566 #if CMK_USE_SYSVSHM
02567 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdleSysvshm, NULL);
02568 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdleSysvshm, NULL);
02569 #elif CMK_USE_PXSHM
02570
02571 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdlePxshm, NULL);
02572 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdlePxshm, NULL);
02573 #endif
02574 }
02575
02576 #if CMK_SHARED_VARS_UNAVAILABLE
02577 if (Cmi_netpoll)
02578 CcdCallOnConditionKeep(CcdPERIODIC,
02579 (CcdVoidFn) CommunicationPeriodic, NULL);
02580 else
02581 CcdCallOnConditionKeep(CcdPERIODIC_10ms,
02582 (CcdVoidFn) CommunicationPeriodic, NULL);
02583 #endif
02584
02585 if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
02586 CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
02587 #if CMK_SHARED_VARS_UNAVAILABLE
02588 if (!Cmi_asyncio) {
02589
02590 CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
02591 }
02592 else {
02593
02594 struct itimerval i;
02595 CmiSignal(SIGALRM, 0, 0, pingCharmrun);
02596 #if MEMORYUSAGE_OUTPUT
02597 i.it_interval.tv_sec = 0;
02598 i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
02599 i.it_value.tv_sec = 0;
02600 i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
02601 #else
02602 i.it_interval.tv_sec = 1;
02603 i.it_interval.tv_usec = 0;
02604 i.it_value.tv_sec = 1;
02605 i.it_value.tv_usec = 0;
02606 #endif
02607 setitimer(ITIMER_REAL, &i, NULL);
02608 }
02609
02610 #if ! CMK_USE_GM && ! CMK_USE_MX && ! CMK_USE_TCP && ! CMK_USE_IBVERBS
02611
02612
02613 CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
02614 #endif
02615 #endif
02616
02617
02618 Cmi_clock=GetClock();
02619 }
02620
02621 #ifdef IGET_FLOWCONTROL
02622
02623 getAvailSysMem();
02624
02625 CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000);
02626 CcdCallOnConditionKeep(CcdPERIODIC_10s,
02627 (CcdVoidFn) TokenUpdatePeriodic, NULL);
02628 #endif
02629
02630 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
02631 srand((int)(1024.0*CmiWallTimer()));
02632 if (CmiMyPe()==0)
02633 CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
02634 CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
02635 #endif
02636
02637 #ifdef __ONESIDED_IMPL
02638 #ifdef __ONESIDED_NO_HARDWARE
02639 putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
02640 putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
02641 getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
02642 getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
02643 #endif
02644 #ifdef __ONESIDED_GM_HARDWARE
02645 getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
02646 getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
02647 #endif
02648 #endif
02649
02650
02651 if (CmiMyRank() == CmiMyNodeSize()) {
02652 if(!everReturn) Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
02653 if (Cmi_charmrun_fd!=-1)
02654 while (1) CommunicationServer(5, COMM_SERVER_FROM_SMP);
02655 }
02656 else{
02657 if (!everReturn) {
02658 Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
02659
02660
02661 _immediateReady = 1;
02662 if (Cmi_usrsched==0) CsdScheduler(-1);
02663 ConverseExit();
02664 }else{
02665 _immediateReady = 1;
02666 }
02667 }
02668 }
02669
02670 void ConverseExit(void)
02671 {
02672 MACHSTATE(2,"ConverseExit {");
02673 machine_initiated_shutdown=1;
02674 if (CmiMyRank()==0) {
02675 if(Cmi_print_stats)
02676 printNetStatistics();
02677 log_done();
02678 }
02679 #if CMK_USE_SYSVSHM
02680 CmiExitSysvshm();
02681 #elif CMK_USE_PXSHM
02682 CmiExitPxshm();
02683 #endif
02684 ConverseCommonExit();
02685 CmiNodeBarrier();
02686 if (CmiMyRank()==0) CmiStdoutFlush();
02687 if (Cmi_charmrun_fd==-1) {
02688 if (CmiMyRank() == 0) exit(0);
02689 else while (1) CmiYield();
02690 }
02691 else {
02692 ctrl_sendone_locking("ending",NULL,0,NULL,0);
02693 #if CMK_SHARED_VARS_UNAVAILABLE
02694 Cmi_check_delay = 1.0;
02695 while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
02696 #elif CMK_MULTICORE
02697 if (!Cmi_commthread && CmiMyRank()==0) {
02698 Cmi_check_delay = 1.0;
02699 while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
02700 }
02701 #endif
02702 }
02703 MACHSTATE(2,"} ConverseExit");
02704
02705
02706 while (1) CmiYield();
02707 }
02708
02709 static void set_signals(void)
02710 {
02711 if(!Cmi_truecrash) {
02712 signal(SIGSEGV, KillOnAllSigs);
02713 signal(SIGFPE, KillOnAllSigs);
02714 signal(SIGILL, KillOnAllSigs);
02715 signal(SIGINT, KillOnAllSigs);
02716 signal(SIGTERM, KillOnAllSigs);
02717 signal(SIGABRT, KillOnAllSigs);
02718 # if !defined(_WIN32) || defined(__CYGWIN__)
02719 signal(SIGQUIT, KillOnAllSigs);
02720 signal(SIGBUS, KillOnAllSigs);
02721 # if CMK_HANDLE_SIGUSR
02722 signal(SIGUSR1, HandleUserSignals);
02723 signal(SIGUSR2, HandleUserSignals);
02724 # endif
02725 # endif
02726 }
02727 }
02728
02729
02730
02731
02732 static void obtain_idleFn(void) {sleep(0);}
02733
02734 static int net_default_skt_abort(int code,const char *msg)
02735 {
02736 fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
02737 machine_exit(1);
02738 return -1;
02739 }
02740
02741 #if MACHINE_DEBUG_LOG
02742 FILE *debugLog = NULL;
02743 #endif
02744
02745 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
02746 {
02747 #if MACHINE_DEBUG
02748 debugLog=NULL;
02749 #endif
02750 #if CMK_USE_HP_MAIN_FIX
02751 #if FOR_CPLUS
02752 _main(argc,argv);
02753 #endif
02754 #endif
02755 Cmi_startfn = fn; Cmi_usrsched = usc;
02756 Cmi_netpoll = 0;
02757 #if CMK_NETPOLL
02758 Cmi_netpoll = 1;
02759 #endif
02760 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
02761 Cmi_idlepoll = 0;
02762 #else
02763 Cmi_idlepoll = 1;
02764 #endif
02765 Cmi_truecrash = 0;
02766 if (CmiGetArgFlagDesc(argv,"+truecrash","Do not install signal handlers") ||
02767 CmiGetArgFlagDesc(argv,"++debug",NULL )) Cmi_truecrash = 1;
02768
02769 if (CmiGetArgFlagDesc(argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
02770 if (CmiGetArgFlagDesc(argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
02771
02772 if (CmiGetArgFlagDesc(argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
02773
02774 if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
02775 Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
02776
02777 Cmi_asyncio = 1;
02778 #if CMK_ASYNC_NOT_NEEDED
02779 Cmi_asyncio = 0;
02780 #endif
02781 if (CmiGetArgFlagDesc(argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
02782 if (CmiGetArgFlagDesc(argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
02783 #if CMK_MULTICORE
02784 if (CmiGetArgFlagDesc(argv,"+commthread","Use communication thread")) {
02785 Cmi_commthread = 1;
02786 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
02787 _Cmi_noprocforcommthread = 1;
02788 #endif
02789 if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
02790 }
02791 #endif
02792
02793 skt_init();
02794
02795
02796 skt_set_abort(net_default_skt_abort);
02797 atexit(machine_atexit_check);
02798 parse_netstart();
02799 parse_magic();
02800 #if ! CMK_SMP && ! defined(_WIN32)
02801
02802 parse_forks();
02803 #endif
02804 extract_args(argv);
02805 log_init();
02806 Cmi_scanf_mutex = CmiCreateLock();
02807
02808 #if MACHINE_DEBUG_LOG
02809 {
02810 char ln[200];
02811 sprintf(ln,"debugLog.%d",_Cmi_mynode);
02812 debugLog=fopen(ln,"w");
02813 }
02814 #endif
02815
02816
02817 MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
02818
02819 skt_set_idle(obtain_idleFn);
02820 if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
02821 set_signals();
02822 #if CMK_USE_TCP
02823 dataskt=skt_server(&dataport);
02824 #elif !CMK_USE_GM && !CMK_USE_MX
02825 dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
02826 #else
02827
02828 dataskt=-1;
02829 #endif
02830 MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
02831 Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
02832 MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
02833 skt_tcp_no_nagle(Cmi_charmrun_fd);
02834 CmiStdoutInit();
02835 } else {
02836 printf("Charm++: standalone mode (not using charmrun)\n");
02837 dataskt=-1;
02838 Cmi_charmrun_fd=-1;
02839 }
02840
02841 CmiMachineInit(argv);
02842
02843 node_addresses_obtain(argv);
02844 MACHSTATE(5,"node_addresses_obtain done");
02845
02846 CmiCommunicationInit(argv);
02847
02848 #if CMK_USE_SYSVSHM
02849 CmiInitSysvshm(argv);
02850 #elif CMK_USE_PXSHM
02851 CmiInitPxshm(argv);
02852 #endif
02853
02854 skt_set_idle(CmiYield);
02855 Cmi_check_delay = 1.0+0.25*_Cmi_numnodes;
02856
02857 if (Cmi_charmrun_fd==-1)
02858 Cmi_check_delay=1.0e30;
02859
02860 CsvInitialize(CmiNodeState, NodeState);
02861 CmiNodeStateInit(&CsvAccess(NodeState));
02862
02863 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
02864 CsvInitialize(PCQueue, notifyCommThdMsgBuffer);
02865 #endif
02866
02867
02868
02869 networkProgressPeriod = 0;
02870 CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
02871
02872 Cmi_argvcopy = CmiCopyArgs(argv);
02873 Cmi_argv = argv;
02874
02875 CmiStartThreads(argv);
02876
02877 #if CMK_USE_AMMASSO
02878 CmiAmmassoOpenQueuePairs();
02879 #endif
02880
02881 ConverseRunPE(everReturn);
02882 }
02883
02884 #if CMK_PERSISTENT_COMM
02885
02886 int persistentSendMsgHandlerIdx;
02887
02888 static void sendPerMsgHandler(char *msg)
02889 {
02890 int msgSize;
02891 void *destAddr, *destSizeAddr;
02892 int ep;
02893
02894 msgSize = CmiMsgHeaderGetLength(msg);
02895 msgSize -= (2*sizeof(void *)+sizeof(int));
02896 ep = *(int*)(msg+msgSize);
02897 destAddr = *(void **)(msg + msgSize + sizeof(int));
02898 destSizeAddr = *(void **)(msg + msgSize + sizeof(int) + sizeof(void*));
02899
02900 CmiSetHandler(msg, ep);
02901 *((int *)destSizeAddr) = msgSize;
02902 memcpy(destAddr, msg, msgSize);
02903 }
02904
02905 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
02906 {
02907 CmiAssert(h!=NULL);
02908 PersistentSendsTable *slot = (PersistentSendsTable *)h;
02909 CmiAssert(slot->used == 1);
02910 CmiAssert(slot->destPE == destPE);
02911 if (size > slot->sizeMax) {
02912 CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
02913 CmiAbort("Abort: Invalid size\n");
02914 }
02915
02916
02917
02918 if (slot->destAddress[0]) {
02919 int oldep = CmiGetHandler(m);
02920 int newsize = size + sizeof(void *)*2 + sizeof(int);
02921 char *newmsg = (char*)CmiAlloc(newsize);
02922 memcpy(newmsg, m, size);
02923 memcpy(newmsg+size, &oldep, sizeof(int));
02924 memcpy(newmsg+size+sizeof(int), &slot->destAddress[0], sizeof(void *));
02925 memcpy(newmsg+size+sizeof(int)+sizeof(void*), &slot->destSizeAddress[0], sizeof(void *));
02926 CmiFree(m);
02927 CmiMsgHeaderSetLength(newmsg, newsize);
02928 CmiSetHandler(newmsg, persistentSendMsgHandlerIdx);
02929 phs = NULL; phsSize = 0;
02930 CmiSyncSendAndFree(slot->destPE, newsize, newmsg);
02931 }
02932 else {
02933 #if 1
02934
02935 if (slot->messageBuf != NULL) {
02936 CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
02937 CmiAbort("");
02938 }
02939 slot->messageBuf = m;
02940 slot->messageSize = size;
02941 #else
02942
02943 PersistentHandle *phs_tmp = phs;
02944 int phsSize_tmp = phsSize;
02945 phs = NULL; phsSize = 0;
02946 CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
02947 CmiSyncSendAndFree(slot->destPE, size, m);
02948 phs = phs_tmp; phsSize = phsSize_tmp;
02949 #endif
02950 }
02951 }
02952
02953 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
02954 {
02955 char *dupmsg = (char *) CmiAlloc(size);
02956 memcpy(dupmsg, msg, size);
02957
02958
02959 if (CmiMyPe()==destPE) {
02960 CQdCreate(CpvAccess(cQdState), 1);
02961 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
02962 }
02963 else
02964 CmiSendPersistentMsg(h, destPE, size, dupmsg);
02965 }
02966
02967
02968 int PumpPersistent()
02969 {
02970 PersistentReceivesTable *slot = persistentReceivesTableHead;
02971 int status = 0;
02972 while (slot) {
02973 unsigned int size = *(slot->recvSizePtr[0]);
02974 if (size > 0)
02975 {
02976 char *msg = slot->messagePtr[0];
02977
02978
02979 #if 0
02980 void *dupmsg;
02981 dupmsg = CmiAlloc(size);
02982
02983 _MEMCHECK(dupmsg);
02984 memcpy(dupmsg, msg, size);
02985 msg = dupmsg;
02986 #else
02987
02988
02989
02990 CmiReference(msg);
02991 #endif
02992
02993 CmiPushPE(CMI_DEST_RANK(msg), msg);
02994 #if CMK_BROADCAST_SPANNING_TREE
02995 if (CMI_BROADCAST_ROOT(msg))
02996 SendSpanningChildren(size, msg);
02997 #endif
02998 *(slot->recvSizePtr[0]) = 0;
02999 status = 1;
03000 }
03001 slot = slot->next;
03002 }
03003 return status;
03004 }
03005
03006 void *PerAlloc(int size)
03007 {
03008 return CmiAlloc(size);
03009 }
03010
03011 void PerFree(char *msg)
03012 {
03013 CmiFree(msg);
03014 }
03015
03016 void persist_machine_init()
03017 {
03018 persistentSendMsgHandlerIdx =
03019 CmiRegisterHandler((CmiHandler)sendPerMsgHandler);
03020 }
03021
03022 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
03023 {
03024 int i;
03025 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
03026 char *buf = PerAlloc(maxBytes+sizeof(int)*2);
03027 _MEMCHECK(buf);
03028 memset(buf, 0, maxBytes+sizeof(int)*2);
03029 slot->messagePtr[i] = buf;
03030 slot->recvSizePtr[i] = (unsigned int*)CmiAlloc(sizeof(unsigned int));
03031 *(slot->recvSizePtr[0]) = 0;
03032 }
03033 slot->sizeMax = maxBytes;
03034 }
03035
03036 #endif
03037
03038
03039 #if CMK_CELL
03040
03041 #include "spert_ppu.h"
03042
03043 void machine_OffloadAPIProgress() {
03044 CmiCommLock();
03045 OffloadAPIProgress();
03046 CmiCommUnlock();
03047 }
03048 #endif
03049
03050
03051