00001 #include "converse.h"
00002
00003 #include "sockRoutines.h"
00004 #include "sockRoutines.C"
00005 #include "ccs-auth.h"
00006 #include "ccs-auth.C"
00007 #include "ccs-server.h"
00008 #include "ccs-server.C"
00009
00010 #include <stdio.h>
00011 #include <string.h>
00012 #include <ctype.h>
00013 #include <errno.h>
00014 #include <setjmp.h>
00015 #include <stdlib.h>
00016 #include <signal.h>
00017 #include <fcntl.h>
00018 #include <time.h>
00019 #include <assert.h>
00020 #include <math.h>
00021 #include <limits.h>
00022 #if CMK_BPROC
00023 #include <sys/bproc.h>
00024 #endif
00025 #if CMK_USE_POLL
00026 #include <poll.h>
00027 #endif
00028 #include <sys/stat.h>
00029
00030 #include <unordered_map>
00031 #include <map>
00032 #include <string>
00033 #include <vector>
00034 #include <queue>
00035 #include <utility>
00036 #include <algorithm>
00037
00038 #if defined(_WIN32)
00039
00040 #define getcwd _getcwd
00041 #define strdup _strdup
00042 #define unlink _unlink
00043 #define open _open
00044 #define fdopen _fdopen
00045 #define ftruncate _chsize
00046 #include <winbase.h>
00047 #include <direct.h>
00048 #include <io.h>
00049 #include <sys/timeb.h>
00050 #include <process.h>
00051 #define DIRSEP "\\"
00052 #define SIGBUS -1
00053 #define SIGKILL -1
00054 #define SIGQUIT -1
00055
00056 #else
00057 #include <pwd.h>
00058 #include <unistd.h>
00059 #define DIRSEP "/"
00060 #endif
00061
/* printf that is suppressed under ++quiet (arg_quiet). The argument must be a
   parenthesized printf argument list, e.g. PRINT(("x=%d\n", x)). */
#define PRINT(a) (arg_quiet ? 1 : printf a)

/* CMK_USE_SSH selects whether the remote-shell launch options are compiled in.
   SSH_CMD is the default remote shell when CMK_SSH_IS_A_COMMAND is set. */
#if CMK_SSH_NOT_NEEDED
#define CMK_USE_SSH 0

#else
#define CMK_USE_SSH 1
#if CMK_SSH_IS_A_COMMAND
#define SSH_CMD "ssh"
#endif

#endif

#include "daemon.h"

/* Debug-trace hook; currently expands to nothing. */
#define DEBUGF(x)

/* Fallback for platforms whose headers do not define MAXPATHLEN. */
#ifndef MAXPATHLEN
#define MAXPATHLEN 1024
#endif

/* NOTE(review): not referenced in this portion of the file; presumably a retry
   limit used further down. */
static const int MAX_NUM_RETRIES = 3;


#ifdef HSTART

/* NOTE(review): hierarchical-start state; its use is outside this portion. */
static int mynodes_start;

#endif

/* NOTE(review): timers whose use is outside this portion of the file. */
static double ftTimer;

static double start_timer;
00096
/* Current wall-clock time in seconds since the epoch, with sub-second
   precision (milliseconds on Windows, microseconds elsewhere).
   On POSIX, a failing gettimeofday() is fatal. */
static double GetClock(void)
{
#if defined(_WIN32)
  struct _timeb now;
  _ftime(&now);
  const double whole = now.time * 1.0;
  const double frac = now.millitm * 1.0E-3;
  return whole + frac;
#else
  struct timeval now;
  if (gettimeofday(&now, NULL) < 0) {
    perror("gettimeofday");
    exit(1);
  }
  const double whole = now.tv_sec * 1.0;
  const double frac = now.tv_usec * 1.0E-6;
  return whole + frac;
#endif
}
00113
/* Returns 1 if `path` can be opened for reading, 0 otherwise. */
static int probefile(const char *path)
{
  FILE *handle = fopen(path, "r");
  if (handle != NULL) {
    fclose(handle);
    return 1;
  }
  return 0;
}
00122
/* Returns the login name of the invoking user.
   Windows: GetUserName() into a static buffer that starts out as "dunno".
   POSIX: the passwd entry for getuid(). If that lookup fails, the fallback is
   a heap copy of the output of `id -u -n` when CMK_HAS_POPEN is set and the
   pipe opens; otherwise "unknown". An unreadable `id` output is fatal. */
static const char *mylogin(void)
{
#if defined(_WIN32)
  static char name[100] = "dunno";
  unsigned int len = sizeof(name);
  GetUserName(name, (LPDWORD) &len);
  return name;
#else
  struct passwd *self = getpwuid(getuid());
  if (self != 0)
    return self->pw_name;
#if CMK_HAS_POPEN
  FILE *proc = popen("id -u -n", "r");
  if (proc == NULL)
    return "unknown";
  char uname[64];
  if (fscanf(proc, "%63s", uname) != 1) {
    fprintf(stderr, "charmrun> fscanf() failed!\n");
    exit(1);
  }
  pclose(proc);
  return strdup(uname);
#else
  return "unknown";
#endif
#endif
}
00155
00156
00157
00158
00159
00160
00161
/* Singly-linked list of path substitutions applied by pathfix(): an occurrence
   of s1 is replaced by s2. pathfix_append() pushes new entries at the front. */
typedef struct s_pathfixlist {
  char *s1;                   /* substring to search for */
  char *s2;                   /* replacement text */
  struct s_pathfixlist *next;
} * pathfixlist;

/* Prepends the substitution s1 -> s2 to list l and returns the new head.
   The strings are referenced, not copied. Exits on allocation failure. */
static pathfixlist pathfix_append(char *s1, char *s2, pathfixlist l)
{
  pathfixlist pf = (pathfixlist) malloc(sizeof *pf);
  if (pf == NULL) {
    fprintf(stderr, "charmrun> malloc() failed in pathfix_append!\n");
    exit(1);
  }
  pf->s1 = s1;
  pf->s2 = s2;
  pf->next = l;
  return pf;
}

/* Applies the substitutions in `fixes` to `path` and returns a heap-allocated
   result (caller frees).
   Each pass walks the list in order and replaces the first occurrence of each
   s1 with its s2. Passes repeat until one leaves the path unchanged, so one
   fix's output can feed another.
   Entries with an empty s1 are ignored. Growing the path to MAXPATHLEN or more
   is fatal. Both rules prevent the endless rewriting that a self-referential
   fix would otherwise cause. */
static char *pathfix(const char *path, pathfixlist fixes)
{
  std::string buffer(path);
  bool changed = true;
  while (changed) {
    const std::string before = buffer;
    for (pathfixlist l = fixes; l; l = l->next) {
      if (l->s1[0] == '\0')
        continue; /* an empty pattern matches everywhere, forever */
      const size_t pos = buffer.find(l->s1);
      if (pos == std::string::npos)
        continue;
      buffer.replace(pos, strlen(l->s1), l->s2);
      if (buffer.size() >= (size_t) MAXPATHLEN) {
        fprintf(stderr, "charmrun> Path too long after applying pathfix substitutions to '%s'.\n", path);
        exit(1);
      }
    }
    /* Stop once a full pass is a no-op (also covers s1 == s2 fixes). */
    changed = (buffer != before);
  }
  return strdup(buffer.c_str());
}

/* pathfix(path, fixes) with `ext` appended when ext is non-NULL.
   Returns a heap-allocated string (caller frees). Exits on allocation failure. */
static char *pathextfix(const char *path, pathfixlist fixes, char *ext)
{
  char *newpath = pathfix(path, fixes);
  if (ext == NULL)
    return newpath;
  const size_t base_len = strlen(newpath);
  const size_t ext_len = strlen(ext);
  char *ret = (char *) malloc(base_len + ext_len + 1);
  if (ret == NULL) {
    fprintf(stderr, "charmrun> malloc() failed in pathextfix!\n");
    exit(1);
  }
  memcpy(ret, newpath, base_len);
  memcpy(ret + base_len, ext, ext_len + 1); /* includes the terminator */
  free(newpath);
  return ret;
}
00210
00211
00212
00213
00214
00215
00216
/* 1 for the two quote characters accepted in nodelist files (' and "), else 0. */
static int is_quote(char c)
{
  switch (c) {
  case '\'':
  case '"':
    return 1;
  default:
    return 0;
  }
}
00218
/* Strips a trailing line terminator from s in place.
   A final '\n' is removed first; then a '\r' (octal \15) that is now at the
   end is removed too. "x\n" and "x\r\n" both become "x". Characters in the
   middle of the string are never touched, and "" is handled safely. */
static void zap_newline(char *s)
{
  size_t len = strlen(s);
  if (len > 0 && s[len - 1] == '\n')
    s[--len] = '\0';
  if (len > 0 && s[len - 1] == '\15')
    s[--len] = '\0';
}
00229
00230
00231 static char *substr(const char *lo, const char *hi)
00232 {
00233 if (is_quote(*lo))
00234 lo++;
00235 if (is_quote(*(hi - 1)))
00236 hi--;
00237 int len = hi - lo;
00238 char *res = (char *) malloc(1 + len);
00239 memcpy(res, lo, len);
00240 res[len] = 0;
00241 return res;
00242 }
00243
/* Returns 1 iff the range [lo, hi) is exactly the string str, else 0. */
static int subeqs(const char *lo, const char *hi, const char *str)
{
  const int want = strlen(str);
  return (hi - lo == want) && memcmp(lo, str, want) == 0;
}
00253
00254
/* Advances past spaces and tabs; returns a pointer to the first other char. */
static const char *skipblanks(const char *p)
{
  for (; *p == ' ' || *p == '\t'; ++p) {
  }
  return p;
}
00261
00262
/* Advances past one nodelist token and returns a pointer just after it.
   A token that starts with ' or " runs through the matching closing quote;
   a missing closing quote is fatal. Any other token runs up to the next
   space, tab, or end of string. */
static const char *skipstuff(const char *p)
{
  if (*p != '\'' && *p != '"') {
    while (*p != '\0' && *p != ' ' && *p != '\t')
      ++p;
    return p;
  }
  const char quote = *p++;
  while (*p != '\0' && *p != quote)
    ++p;
  if (*p != quote) {
    fprintf(stderr, "ERROR> Unmatched quote in nodelist file.\n");
    exit(1);
  }
  return p + 1;
}
00283
/* Joins the C strings in vec, placing separator between consecutive elements.
   Returns a heap-allocated, NUL-terminated result (caller frees); an empty
   vector yields "". Exits on allocation failure. */
static char *cstring_join(const std::vector<const char *> & vec, const char *separator)
{
  const size_t separator_length = strlen(separator);
  size_t length = 0;
  for (const char *p : vec)
    length += strlen(p) + separator_length;

  char * const str = (char *)malloc(length + 1);
  if (str == NULL) {
    fprintf(stderr, "charmrun> malloc() failed in cstring_join!\n");
    exit(1);
  }

  /* Write through a moving cursor: repeated strcat would rescan the output
     on every append and make the join quadratic. */
  char *cursor = str;
  for (size_t i = 0; i < vec.size(); ++i) {
    if (i > 0) {
      memcpy(cursor, separator, separator_length);
      cursor += separator_length;
    }
    const size_t len = strlen(vec[i]);
    memcpy(cursor, vec[i], len);
    cursor += len;
  }
  *cursor = '\0'; /* also terminates the empty-vector case */
  return str;
}
00303
#if CMK_USE_SSH
/* Remote shell command: $CONV_RSH when set, otherwise the built-in SSH_CMD. */
static const char *getenv_ssh()
{
  const char *from_env = getenv("CONV_RSH");
  if (from_env != NULL)
    return from_env;
  return SSH_CMD;
}
#endif
00311
00312 #if !defined(_WIN32)
00313 static char *getenv_display()
00314 {
00315 static char result[100], ipBuf[200];
00316
00317 char *e = getenv("DISPLAY");
00318 if (e == 0)
00319 return NULL;
00320 char *p = strrchr(e, ':');
00321 if (p == 0)
00322 return NULL;
00323 if ((e[0] == ':') || (strncmp(e, "unix:", 5) == 0)) {
00324 sprintf(result, "%s:%s", skt_print_ip(ipBuf, skt_my_ip()), p + 1);
00325 } else
00326 strcpy(result, e);
00327 return result;
00328 }
/* Returns $DISPLAY exactly as set. Returns NULL if it is unset, contains no
   ':', or does not fit the result buffer. The copy lives in a static buffer
   overwritten by each call. */
static char *getenv_display_no_tamper()
{
  static char result[100];

  const char *e = getenv("DISPLAY");
  if (e == NULL || strrchr(e, ':') == NULL)
    return NULL;
  /* $DISPLAY is user-controlled; refuse rather than overflow or truncate. */
  const int n = snprintf(result, sizeof(result), "%s", e);
  if (n < 0 || (size_t) n >= sizeof(result)) {
    fprintf(stderr, "charmrun> Warning: DISPLAY value is too long; ignoring it.\n");
    return NULL;
  }
  return result;
}
00342
00343 #endif
00344
/* Port, textual address and socket descriptor for charmrun's own server
   socket. NOTE(review): initialized outside this portion of the file. */
static unsigned int server_port;
static char server_addr[1024];
static SOCKET server_fd;
00348
00349
00350
00351
00352
00353
/* One registered command-line parameter (see pparam_int/flag/real/str).
   Cells form a singly-linked list headed by ppdefs. */
typedef struct s_ppdef {
  union {
    int *i;            /* type 'i': integer */
    double *r;         /* type 'r': real */
    const char **s;    /* type 's': string */
    int *f;            /* type 'f': boolean flag */
  } where;             /* variable that receives the parsed value */
  const char *lname;   /* option name as typed after the two option chars */
  const char *doc;     /* help text printed by pparam_printdocs */
  char type;           /* 'i', 'r', 's' or 'f'; selects the union member */
  bool initFlag;       /* true until pparam_setdef assigns a value; a second
                          assignment is reported as a duplicate option */
  struct s_ppdef *next;
} * ppdef;
00367
/* Head of the registered-parameter list; pparam_cell pushes new cells here. */
static ppdef ppdefs;

/* Parser state shared by pparam_parsecmd and pparam_parseopt:
   pparam_pos   - index of the argument being examined,
   pparam_argv  - the NULL-terminated vector being edited in place,
   pparam_optc  - the option character (set by pparam_parsecmd),
   pparam_error - message describing the last parse error. */
static int pparam_pos;
static const char **pparam_argv;
static char pparam_optc = '-';
static char pparam_error[100];
00374
/* Result of pparam_find: the matching cell (nullptr if none), and whether it
   was named directly (enable = 1) or through the "no-" prefix (enable = 0). */
struct ppdeffind
{
  ppdef def;
  int enable;
};
00380
00381 static ppdeffind pparam_find(const char *lname)
00382 {
00383 ppdef def;
00384 for (def = ppdefs; def; def = def->next)
00385 {
00386 if (strcmp(def->lname, lname) == 0)
00387 return {def, 1};
00388
00389 static const char no_prefix[] = "no-";
00390 if (strncmp(no_prefix, lname, sizeof(no_prefix)-1) == 0)
00391 {
00392 if (strcmp(def->lname, lname + (sizeof(no_prefix)-1)) == 0)
00393 return {def, 0};
00394 }
00395 }
00396 return {nullptr, 1};
00397 }
00398
00399 static ppdef pparam_cell(const char *lname)
00400 {
00401 ppdeffind deffind = pparam_find(lname);
00402 if (deffind.def)
00403 return deffind.def;
00404
00405 auto def = (ppdef)malloc(sizeof(s_ppdef));
00406 def->lname = lname;
00407 def->type = 's';
00408 def->doc = "(undocumented)";
00409 def->next = ppdefs;
00410 def->initFlag = true;
00411 ppdefs = def;
00412 return def;
00413 }
00414
00415 static void pparam_int(int *where, int defValue, const char *arg, const char *doc)
00416 {
00417 ppdef def = pparam_cell(arg);
00418 def->type = 'i';
00419 def->where.i = where;
00420 *where = defValue;
00421 def->lname = arg;
00422 def->doc = doc;
00423 }
00424
00425 static void pparam_flag(int *where, int defValue, const char *arg, const char *doc)
00426 {
00427 ppdef def = pparam_cell(arg);
00428 def->type = 'f';
00429 def->where.f = where;
00430 *where = defValue;
00431 def->lname = arg;
00432 def->doc = doc;
00433 }
00434
00435 static void pparam_real(double *where, double defValue, const char *arg,
00436 const char *doc)
00437 {
00438 ppdef def = pparam_cell(arg);
00439 def->type = 'r';
00440 def->where.r = where;
00441 *where = defValue;
00442 def->lname = arg;
00443 def->doc = doc;
00444 }
00445
00446 static void pparam_str(const char **where, const char *defValue, const char *arg,
00447 const char *doc)
00448 {
00449 ppdef def = pparam_cell(arg);
00450 def->type = 's';
00451 def->where.s = where;
00452 *where = defValue;
00453 def->lname = arg;
00454 def->doc = doc;
00455 }
00456
00457 static int pparam_setdef(ppdef def, const char *value)
00458 {
00459 char *p;
00460 if (def->initFlag)
00461 def->initFlag = false;
00462 else {
00463 fprintf(stderr, "Option \'%s\' is used more than once. Please remove duplicate arguments for this option\n", def->lname);
00464 exit(1);
00465 }
00466
00467 switch (def->type) {
00468 case 'i':
00469 *def->where.i = strtol(value, &p, 10);
00470 if (*p)
00471 return -1;
00472 return 0;
00473 case 'r':
00474 *def->where.r = strtod(value, &p);
00475 if (*p)
00476 return -1;
00477 return 0;
00478 case 's': {
00479
00480 *def->where.s = (char *) calloc(strlen(value) + 1, sizeof(char));
00481 char *parsed_value = (char *) *def->where.s;
00482 for (int i = 0, j = 0; i < strlen(value); i++) {
00483 if (i + 1 < strlen(value)) {
00484 if (value[i] == '\\' && value[i + 1] == 'n') {
00485 parsed_value[j++] = '\n';
00486 i++;
00487 continue;
00488 }
00489 }
00490 parsed_value[j++] = value[i];
00491 }
00492 return 0;
00493 }
00494 case 'f':
00495 *def->where.f = strtol(value, &p, 10);
00496 if (*p)
00497 return -1;
00498 return 0;
00499 }
00500 return -1;
00501 }
00502
00503 static int pparam_set(char *lname, char *value)
00504 {
00505 ppdef def = pparam_cell(lname);
00506 return pparam_setdef(def, value);
00507 }
00508
00509 static const char *pparam_getdef(ppdef def)
00510 {
00511 static char result[100];
00512 switch (def->type) {
00513 case 'i':
00514 sprintf(result, "%d", *def->where.i);
00515 return result;
00516 case 'r':
00517 sprintf(result, "%f", *def->where.r);
00518 return result;
00519 case 's':
00520 return *def->where.s ? *def->where.s : "";
00521 case 'f':
00522 sprintf(result, *def->where.f ? "true" : "false");
00523 return result;
00524 }
00525 return NULL;
00526 }
00527
00528 static void pparam_printdocs()
00529 {
00530 int maxname = 0, maxdoc = 0;
00531 for (ppdef def = ppdefs; def; def = def->next) {
00532 int len;
00533 len = strlen(def->lname);
00534 if (len > maxname)
00535 maxname = len;
00536 len = strlen(def->doc);
00537 if (len > maxdoc)
00538 maxdoc = len;
00539 }
00540 fprintf(stderr, "\n");
00541 fprintf(stderr, "Charmrun Command-line Parameters:\n");
00542 fprintf(stderr, " (Boolean parameters may be prefixed with \"no-\" to negate their effect, for example \"++no-scalable-start\".)\n");
00543 for (ppdef def = ppdefs; def; def = def->next) {
00544 fprintf(stderr, " %c%c%-*s ", pparam_optc, pparam_optc, maxname,
00545 def->lname);
00546 fprintf(stderr, " %-*s [%s]\n", maxdoc, def->doc, pparam_getdef(def));
00547 }
00548 fprintf(stderr, "\n");
00549 }
00550
00551 static void pparam_delarg(int i)
00552 {
00553 for (int j = i; pparam_argv[j]; j++)
00554 pparam_argv[j] = pparam_argv[j + 1];
00555 }
00556
00557 static int pparam_countargs(const char **argv)
00558 {
00559 int argc;
00560 for (argc = 0; argv[argc]; argc++)
00561 ;
00562 return argc;
00563 }
00564
00565 static int pparam_parseopt()
00566 {
00567 const char *opt = pparam_argv[pparam_pos];
00568
00569 if ((opt[1] == '+') && (opt[2] == 0)) {
00570 pparam_delarg(pparam_pos);
00571 while (pparam_argv[pparam_pos])
00572 pparam_pos++;
00573 return 0;
00574 }
00575
00576 if (opt[1] == 0) {
00577 sprintf(pparam_error, "Illegal option +\n");
00578 return -1;
00579 }
00580
00581 ppdeffind deffind{};
00582 if (opt[1] == '+')
00583 deffind = pparam_find(opt + 2);
00584 else {
00585 char name[2];
00586 name[0] = opt[1];
00587 if (strlen(opt) <= 2 || !isalpha(opt[2])) {
00588 name[1] = 0;
00589 deffind = pparam_find(name);
00590 }
00591 }
00592 if (deffind.def == nullptr) {
00593 if (opt[1] == '+') {
00594 sprintf(pparam_error, "Option %s not recognized.", opt);
00595 return -1;
00596 } else {
00597
00598 pparam_pos++;
00599 return 0;
00600 }
00601 }
00602 auto def = deffind.def;
00603
00604 if ((def->type == 'f') && (opt[1] != '+') && (opt[2])) {
00605 sprintf(pparam_error, "Option %s should not include a value", opt);
00606 return -1;
00607 }
00608 if (def->type == 'f') {
00609 *def->where.f = deffind.enable;
00610 pparam_delarg(pparam_pos);
00611 return 0;
00612 }
00613
00614 if ((opt[1] == '+') || (opt[2] == 0)) {
00615 pparam_delarg(pparam_pos);
00616 opt = pparam_argv[pparam_pos];
00617 } else
00618 opt += 2;
00619 if ((opt == 0) || (opt[0] == 0)) {
00620 sprintf(pparam_error, "%s must be followed by a value.", opt);
00621 return -1;
00622 }
00623 int ok = pparam_setdef(def, opt);
00624 pparam_delarg(pparam_pos);
00625 if (ok < 0) {
00626 sprintf(pparam_error, "Illegal value for %s", opt);
00627 return -1;
00628 }
00629 return 0;
00630 }
00631
00632 static int pparam_parsecmd(char optchr, const char **argv)
00633 {
00634 pparam_error[0] = 0;
00635 pparam_argv = argv;
00636 pparam_optc = optchr;
00637 pparam_pos = 0;
00638 while (1) {
00639 const char *opt = pparam_argv[pparam_pos];
00640 if (opt == 0)
00641 break;
00642 if (opt[0] != optchr)
00643 pparam_pos++;
00644 else if (pparam_parseopt() < 0)
00645 return -1;
00646 }
00647 return 0;
00648 }
00649
00650 #ifdef HSTART
/* Deep-copies the NULL-terminated vector argv (each string is duplicated).
   The copy has one spare NULL slot after its terminator, so a caller can
   append one more argument and re-terminate; arg_init appends
   "++child-charmrun" this way. Returns NULL if argv is NULL or on allocation
   failure, freeing any partial copy. */
static char **dupargv(const char **argv)
{
  if (argv == NULL)
    return NULL;

  int argc = 0;
  while (argv[argc] != NULL)
    argc++;

  char **copy = (char **) malloc((argc + 2) * sizeof(char *));
  if (copy == NULL)
    return NULL;

  for (int i = 0; i < argc; i++) {
    copy[i] = strdup(argv[i]);
    if (copy[i] == NULL) {
      while (i-- > 0)
        free(copy[i]);
      free(copy);
      return NULL;
    }
  }
  copy[argc] = NULL;
  copy[argc + 1] = NULL; /* spare slot, kept initialized */
  return copy;
}
00674
00675 #endif
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
/* NOTE(review): not referenced in this portion; presumably a line-buffer size
   for the nodelist reader further down. */
#define MAX_LINE_LENGTH 1000

/* Node-program arguments left after arg_init removes charmrun's options. */
static const char **arg_argv;
static int arg_argc;

/* Values of the options registered in arg_init; the help text passed to
   pparam_* there documents each one. */
static int arg_requested_pes;
static int arg_requested_nodes;
static int arg_requested_numhosts;

static int arg_timeout;
static int arg_timelimit;
static int arg_verbose;
static const char *arg_nodelist;
static const char *arg_nodegroup;
static const char *arg_runscript;
static const char *arg_charmrunip;

static int arg_debug;
static int arg_debug_no_pause;
static int arg_debug_no_xrdb;
static int arg_charmdebug;
static const char *arg_debug_commands;


static int arg_quiet;
static int arg_local;
static int arg_batch_spawn;
static int arg_scalable_start;

#ifdef HSTART
static int arg_hierarchical_start;
static int arg_child_charmrun;
#endif
static int arg_help;
static int arg_ppn;
static int arg_usehostname;

#if CMK_SHRINK_EXPAND
/* Pointers to the original argv entries, saved by arg_init before parsing. */
static char **saved_argv;
static int saved_argc;
static int arg_realloc_pes;
static int arg_old_pes;
static int arg_shrinkexpand;
static int arg_charmrun_port;
/* NOTE(review): not set in this portion of the file. */
static const char *arg_shrinkexpand_basedir;
#endif

#if CMK_USE_SSH
static int arg_maxssh;
static const char *arg_shell;
static int arg_in_xterm;
static const char *arg_debugger;
static const char *arg_xterm;
static const char *arg_display;
static int arg_ssh_display;
static const char *arg_mylogin;
#endif
static int arg_mpiexec;
static int arg_mpiexec_no_n;
static int arg_no_va_rand;

/* Node-program path: absolute (_a) and as given on the command line (_r). */
static const char *arg_nodeprog_a;
static const char *arg_nodeprog_r;
/* Working directory at startup (set by arg_init).
   NOTE(review): arg_currdir_r is not set in this portion. */
static char *arg_currdir_a;
static char *arg_currdir_r;

static int arg_server;
static int arg_server_port = 0;
static const char *arg_server_auth = NULL;
/* Set by arg_init when the node-program arguments contain +replay-detail. */
static int replay_single = 0;

#if CMK_BPROC
static int arg_startpe;
static int arg_endpe;
static int arg_singlemaster;
static int arg_skipmaster;
#endif
00765
/* Requested amount per hardware unit, filled from the ++processPer* counts or
   the ++oneWthPer* flags. A unit counts as requested when its value is > 0. */
struct TopologyRequest
{
  int host, socket, core, pu;

  enum class Unit
  {
    Host,
    Socket,
    Core,
    PU,
    None,
  };

  /* Number of units with a positive request (0..4). */
  int active() const
  {
    const int parts[] = {host, socket, core, pu};
    int count = 0;
    for (int value : parts)
      count += (value > 0);
    return count;
  }

  /* The largest requested unit (Host, then Socket, Core, PU), or None. */
  Unit unit() const
  {
    if (host > 0)
      return Unit::Host;
    if (socket > 0)
      return Unit::Socket;
    if (core > 0)
      return Unit::Core;
    if (pu > 0)
      return Unit::PU;
    return Unit::None;
  }
};
00798
/* Provisioning requests, filled in by arg_init:
   proc_per       - ++processPer(Host|Socket|Core|PU) counts,
   onewth_per     - ++oneWthPer(Host|Socket|Core|PU) flags,
   auto_provision - ++auto-provision / ++autoProvision. */
TopologyRequest proc_per;
TopologyRequest onewth_per;
int auto_provision;
00802
00803 static void arg_init(int argc, const char **argv)
00804 {
00805 static char buf[1024];
00806
00807 int local_def = 0;
00808 #if CMK_CHARMRUN_LOCAL
00809 local_def = 1;
00810 #endif
00811
00812 pparam_int(&arg_requested_pes, 0, "p", "Number of PEs to create");
00813 pparam_int(&arg_requested_numhosts, 0, "numHosts", "Number of hosts to use from nodelist file");
00814
00815 pparam_int(&arg_requested_nodes, 0, "n", "Number of processes to create");
00816 pparam_int(&arg_requested_nodes, 0, "np", "Number of processes to create");
00817
00818 pparam_int(&arg_timeout, 60, "timeout",
00819 "Seconds to wait per host connection");
00820 pparam_int(&arg_timelimit, -1, "timelimit",
00821 "Seconds to wait for program to complete");
00822 pparam_flag(&arg_verbose, 0, "verbose", "Print diagnostic messages");
00823 pparam_flag(&arg_quiet, 0, "quiet", "Omit non-error runtime messages");
00824 pparam_str(&arg_nodelist, 0, "nodelist", "File containing list of physical nodes");
00825 pparam_str(&arg_nodegroup, "main", "nodegroup",
00826 "Which group of physical nodes to use");
00827
00828 #if CMK_CCS_AVAILABLE
00829 pparam_flag(&arg_server, 0, "server", "Enable client-server (CCS) mode");
00830 pparam_int(&arg_server_port, 0, "server-port",
00831 "Port to listen for CCS requests");
00832 pparam_str(&arg_server_auth, 0, "server-auth", "CCS Authentication file");
00833 #endif
00834 pparam_flag(&arg_local, local_def, "local",
00835 "Start node programs locally without daemon");
00836 pparam_int(&arg_batch_spawn, 0, "batch", "Launch connections to this many "
00837 "node programs at a time, avoiding "
00838 "overloading charmrun PE");
00839 #ifndef _WIN32
00840 pparam_flag(&arg_scalable_start, 1, "scalable-start", "Enable scalable start");
00841 #endif
00842 #ifdef HSTART
00843 pparam_flag(&arg_hierarchical_start, 0, "hierarchical-start",
00844 "hierarchical start");
00845 pparam_flag(&arg_child_charmrun, 0, "child-charmrun", "child charmrun");
00846 #endif
00847 #if CMK_SHRINK_EXPAND
00848 pparam_int(&arg_realloc_pes, 1, "newp", "New number of processes to create");
00849 pparam_int(&arg_old_pes, 1, "oldp", "Old number of processes to create");
00850 pparam_flag(&arg_shrinkexpand, 0, "shrinkexpand", "Enable shrink/expand support");
00851 pparam_int(&arg_charmrun_port, 0, "charmrun_port", "Make charmrun listen on this port");
00852 #endif
00853 pparam_flag(&arg_usehostname, 0, "usehostname",
00854 "Send nodes our symbolic hostname instead of IP address");
00855 pparam_str(&arg_charmrunip, 0, "useip",
00856 "Use IP address provided for charmrun IP");
00857 pparam_flag(&arg_mpiexec, 0, "mpiexec", "Use mpiexec to start jobs");
00858 pparam_flag(&arg_mpiexec_no_n, 0, "mpiexec-no-n", "Use mpiexec to start jobs without -n procs");
00859 #if CMK_USE_SSH
00860 pparam_flag(&arg_debug, 0, "debug",
00861 "Run each node under gdb in an xterm window");
00862 pparam_flag(&arg_debug_no_pause, 0, "debug-no-pause",
00863 "Like debug, except doesn't pause at beginning");
00864 pparam_str(&arg_debug_commands, 0, "debug-commands",
00865 "Commands to be run inside gdb at startup");
00866 pparam_flag(&arg_debug_no_xrdb, 0, "no-xrdb", "Don't check xrdb");
00867
00868
00869
00870
00871
00872
00873 #if !defined(_WIN32)
00874 pparam_flag(&arg_charmdebug, 0, "charmdebug",
00875 "Used only when charmrun is started by charmdebug");
00876 #endif
00877
00878 pparam_int(&arg_maxssh, 16, "maxssh",
00879 "Maximum number of ssh's to run at a time");
00880 pparam_str(&arg_shell, 0, "remote-shell",
00881 "Which remote shell to use (default $CONV_RSH or " SSH_CMD ")");
00882 pparam_str(&arg_debugger, 0, "debugger", "Which debugger to use");
00883 pparam_str(&arg_display, 0, "display", "X Display for xterm");
00884 pparam_flag(&arg_ssh_display, 0, "ssh-display",
00885 "Use own X Display for each ssh session");
00886 pparam_flag(&arg_in_xterm, 0, "in-xterm", "Run each node in an xterm window");
00887 pparam_str(&arg_xterm, 0, "xterm", "Which xterm to use");
00888 #endif
00889 #ifdef CMK_BPROC
00890
00891 pparam_int(&arg_startpe, 0, "startpe", "First PE to start job(SCYLD)");
00892 pparam_int(&arg_endpe, 1000000, "endpe", "Last PE to start job(SCYLD)");
00893 pparam_flag(&arg_singlemaster, 0, "singlemaster",
00894 "Only assign one process to master node(SCYLD)");
00895 pparam_flag(&arg_skipmaster, 0, "skipmaster",
00896 "Do not assign any process to master node(SCYLD)");
00897 if (arg_skipmaster && arg_singlemaster) {
00898 PRINT(("Charmrun> 'singlemaster' is ignored due to 'skipmaster'. \n"));
00899 arg_singlemaster = 0;
00900 }
00901 pparam_flag(&arg_debug, 0, "debug", "Turn on more verbose debug prints");
00902 #endif
00903 pparam_str(&arg_runscript, 0, "runscript", "Script to run node-program with");
00904 pparam_flag(&arg_help, 0, "help", "Print help messages");
00905 pparam_int(&arg_ppn, 0, "ppn", "Number of PEs per Charm++ node (=OS process)");
00906 pparam_flag(&arg_no_va_rand, 0, "no-va-randomization",
00907 "Disables randomization of the virtual address space");
00908
00909
00910 pparam_int(&proc_per.host, 0,
00911 "processPerHost", "assign N processes per host");
00912 pparam_int(&proc_per.socket, 0,
00913 "processPerSocket", "assign N processes per socket");
00914 pparam_int(&proc_per.core, 0,
00915 "processPerCore", "assign N processes per core");
00916 pparam_int(&proc_per.pu, 0,
00917 "processPerPU", "assign N processes per PU");
00918
00919
00920 pparam_flag(&onewth_per.host, 0,
00921 "oneWthPerHost", "assign one worker thread per host");
00922 pparam_flag(&onewth_per.socket, 0,
00923 "oneWthPerSocket", "assign one worker thread per socket");
00924 pparam_flag(&onewth_per.core, 0,
00925 "oneWthPerCore", "assign one worker thread per core");
00926 pparam_flag(&onewth_per.pu, 0,
00927 "oneWthPerPU", "assign one worker thread per PU");
00928
00929 pparam_flag(&auto_provision, 0, "auto-provision", "fully utilize available resources");
00930 pparam_flag(&auto_provision, 0, "autoProvision", "fully utilize available resources");
00931
00932 #ifdef HSTART
00933 arg_argv = (const char **)dupargv(argv);
00934 #endif
00935
00936 #if CMK_SHRINK_EXPAND
00937
00938 saved_argc = argc;
00939 saved_argv = (char **) malloc(sizeof(char *) * (saved_argc));
00940 for (int i = 0; i < saved_argc; i++) {
00941
00942 saved_argv[i] = (char *) argv[i];
00943 }
00944 #endif
00945
00946 if (pparam_parsecmd('+', argv) < 0) {
00947 fprintf(stderr, "ERROR> syntax: %s\n", pparam_error);
00948 pparam_printdocs();
00949 exit(1);
00950 }
00951
00952 if (arg_help) {
00953 pparam_printdocs();
00954
00955 }
00956
00957 if ( arg_mpiexec_no_n ) arg_mpiexec = arg_mpiexec_no_n;
00958
00959 #if CMK_SHRINK_EXPAND
00960 if (arg_shrinkexpand) {
00961 arg_requested_pes = arg_realloc_pes;
00962 printf("\n \nCharmrun> %d Reallocated pes\n \n", arg_requested_pes);
00963 }
00964 #endif
00965
00966 #ifdef HSTART
00967 if (!arg_hierarchical_start || arg_child_charmrun)
00968 #endif
00969 arg_argv =
00970 (argv) + 1;
00971 arg_argc = pparam_countargs(arg_argv);
00972 if (arg_argc < 1) {
00973 fprintf(stderr, "ERROR> You must specify a node-program.\n");
00974 pparam_printdocs();
00975 exit(1);
00976 }
00977
00978 #ifdef HSTART
00979 if (!arg_hierarchical_start || arg_child_charmrun) {
00980
00981 arg_argv++;
00982 arg_argc--;
00983 } else {
00984
00985 arg_argv++;
00986 arg_argc--;
00987
00988 arg_argv[arg_argc++] = "++child-charmrun";
00989 arg_argv[arg_argc] = NULL;
00990 }
00991 #else
00992 arg_argv++;
00993 arg_argc--;
00994 #endif
00995
00996 if (arg_server_port || arg_server_auth)
00997 arg_server = 1;
00998
00999 if (arg_verbose) arg_quiet = 0;
01000
01001 if (arg_debug || arg_debug_no_pause || arg_in_xterm) {
01002 fprintf(stderr, "Charmrun> scalable start disabled under ++debug and ++in-xterm:\n"
01003 "NOTE: will make an SSH connection per process launched,"
01004 " instead of per physical node.\n");
01005 arg_scalable_start = 0;
01006 arg_quiet = 0;
01007 arg_verbose = 1;
01008 if (arg_debug || arg_debug_no_pause)
01009 {
01010
01011 arg_argv[arg_argc++] = "++debug";
01012 }
01013 }
01014
01015 if (arg_quiet) arg_argv[arg_argc++] = "++quiet";
01016
01017
01018
01019 for (int i = 0; argv[i]; i++) {
01020 if (0 == strcmp(argv[i], "+replay-detail")) {
01021 replay_single = 1;
01022 arg_requested_pes = 1;
01023 }
01024 }
01025
01026 #ifdef CMK_BPROC
01027 if (arg_local) {
01028 fprintf(stderr,
01029 "Warning> ++local cannot be used in bproc version, ignored!\n");
01030 arg_local = 0;
01031 }
01032 #endif
01033
01034 #if CMK_USE_SSH
01035
01036 if (!arg_shell) {
01037 if (arg_mpiexec)
01038 arg_shell = "mpiexec";
01039 else
01040 arg_shell = getenv_ssh();
01041 }
01042
01043 #if !defined(_WIN32)
01044
01045 if (!arg_display)
01046 arg_display = getenv_display_no_tamper();
01047 #endif
01048
01049 if ((arg_debug || arg_debug_no_pause || arg_in_xterm) && (arg_display == 0)) {
01050 fprintf(stderr, "ERROR> DISPLAY must be set to use debugging mode\n");
01051 exit(1);
01052 }
01053 if (arg_debug || arg_debug_no_pause)
01054 arg_timeout = 8 * 60 * 60;
01055
01056
01057 if (!arg_debugger)
01058 #ifdef __APPLE__
01059 arg_debugger = "lldb";
01060 #else
01061 arg_debugger = "gdb";
01062 #endif
01063
01064 if (!arg_xterm)
01065 arg_xterm = "xterm";
01066
01067 arg_mylogin = mylogin();
01068 #endif
01069
01070
01071 if (getcwd(buf, 1023) == NULL) {
01072 fprintf(stderr, "charmrun> getcwd() failed!\n");
01073 exit(1);
01074 }
01075 arg_currdir_a = strdup(buf);
01076
01077
01078 arg_nodeprog_r = argv[1];
01079
01080 if (arg_nodeprog_r[0] == '-' || arg_nodeprog_r[0] == '+') {
01081
01082
01083
01084 fprintf(stderr, "Charmrun does not recognize the flag '%s'.\n", arg_nodeprog_r);
01085 if (arg_nodeprog_r[0] == '+')
01086 fprintf(stderr, "Charm++'s flags need to be placed *after* the program name.\n");
01087 pparam_printdocs();
01088 exit(1);
01089 }
01090
01091 #if defined(_WIN32)
01092 if (argv[1][1] == ':' ||
01093 argv[1][0] == '\\' && argv[1][1] == '\\') {
01094 #else
01095 if (argv[1][0] == '/') {
01096 #endif
01097
01098 arg_nodeprog_a = argv[1];
01099 } else {
01100 sprintf(buf, "%s%s%s", arg_currdir_a, DIRSEP, arg_nodeprog_r);
01101 arg_nodeprog_a = strdup(buf);
01102 }
01103 if (arg_scalable_start) {
01104 PRINT(("Charmrun> scalable start enabled. \n"));
01105 }
01106
01107 #ifdef HSTART
01108 if (arg_hierarchical_start) {
01109 PRINT(("Charmrun> Hierarchical scalable start enabled. \n"));
01110 if (arg_debug || arg_debug_no_pause) {
01111 fprintf(stderr, "Charmrun> Error: ++hierarchical-start does not support "
01112 "debugging mode. \n");
01113 exit(1);
01114 }
01115 if (arg_verbose) {
01116 fprintf(stderr, "Charmrun> Warning: you have enabled verbose output with "
01117 "Hierarchical startup, you may get inconsistent verbose "
01118 "outputs. \n++hierarchial-start does not support verbose "
01119 "mode. \n");
01120 }
01121
01122 } else if (arg_child_charmrun) {
01123 fprintf(
01124 stderr,
01125 "Charmrun> Error: ++child-charmrun is not a user-specified flag. \n");
01126 exit(1);
01127 }
01128 #endif
01129
01130 const int proc_active = proc_per.active();
01131 const int onewth_active = onewth_per.active();
01132 if (proc_active || onewth_active || auto_provision)
01133 {
01134 if (arg_requested_pes != 0)
01135 {
01136 fprintf(stderr, "Charmrun> Error: +p cannot be used with ++(process|oneWth)Per* or ++auto-provision.\n");
01137 exit(1);
01138 }
01139
01140 if (proc_active && arg_requested_nodes > 0)
01141 {
01142 fprintf(stderr, "Charmrun> Error: +n/++np cannot be used with ++processPer* or ++auto-provision.\n");
01143 exit(1);
01144 }
01145
01146 if (proc_active && arg_mpiexec)
01147 {
01148 fprintf(stderr, "Charmrun> Error: ++mpiexec and ++processPer* cannot be used together.\n");
01149 exit(1);
01150 }
01151
01152 if (proc_active + (auto_provision > 0) > 1)
01153 {
01154 fprintf(stderr, "Charmrun> Error: Only one of ++processPer(Host|Socket|Core|PU) or ++auto-provision is allowed.\n");
01155 exit(1);
01156 }
01157
01158 #if CMK_SMP
01159 if (onewth_active + (arg_ppn > 0) + (auto_provision > 0) > 1)
01160 {
01161 fprintf(stderr, "Charmrun> Error: Only one of ++oneWthPer(Host|Socket|Core|PU), ++ppn, or ++auto-provision is allowed.\n");
01162 exit(1);
01163 }
01164
01165 using Unit = typename TopologyRequest::Unit;
01166
01167 const Unit proc_unit = proc_per.unit();
01168 const Unit onewth_unit = onewth_per.unit();
01169
01170 if ((onewth_unit == Unit::Host && (proc_unit == Unit::Socket || proc_unit == Unit::Core || proc_unit == Unit::PU)) ||
01171 (onewth_unit == Unit::Socket && (proc_unit == Unit::Core || proc_unit == Unit::PU)) ||
01172 (onewth_unit == Unit::Core && proc_unit == Unit::PU))
01173 {
01174 fprintf(stderr, "Charmrun> Error: Cannot request processes on a smaller unit than that requested for worker threads.\n");
01175 exit(1);
01176 }
01177
01178 if ((onewth_unit == Unit::Host && proc_unit == Unit::Host && proc_per.host > 1) ||
01179 (onewth_unit == Unit::Socket && proc_unit == Unit::Socket && proc_per.socket > 1) ||
01180 (onewth_unit == Unit::Core && proc_unit == Unit::Core && proc_per.core > 1) ||
01181 (onewth_unit == Unit::PU && proc_unit == Unit::PU && proc_per.pu > 1))
01182 {
01183 fprintf(stderr, "Charmrun> Error: Cannot request more processes than worker threads per unit.\n");
01184 exit(1);
01185 }
01186 #endif
01187 }
01188 else
01189 {
01190 #if CMK_SMP
01191 if (arg_requested_pes > 0 && arg_requested_nodes > 0 && arg_ppn > 0 && arg_ppn * arg_requested_nodes != arg_requested_pes)
01192 {
01193 fprintf(stderr, "Charmrun> Error: ++np times ++ppn does not equal +p.\n");
01194 exit(1);
01195 }
01196
01197 if (arg_requested_pes > 0 && arg_ppn > 0 && arg_requested_pes % arg_ppn != 0)
01198 {
01199 if (arg_ppn > arg_requested_pes)
01200 {
01201 arg_ppn = arg_requested_pes;
01202 fprintf(stderr, "Charmrun> Warning: forced ++ppn = +p = %d\n", arg_ppn);
01203 }
01204 else
01205 {
01206 fprintf(stderr, "Charmrun> Error: ++ppn (number of PEs per node) does not divide +p (number of PEs).\n");
01207 exit(1);
01208 }
01209 }
01210 #else
01211 if (arg_requested_pes > 0 && arg_requested_nodes > 0 && arg_requested_pes != arg_requested_nodes)
01212 {
01213 fprintf(stderr, "Charmrun> Error: +p and ++np do not agree.\n");
01214 exit(1);
01215 }
01216 #endif
01217 }
01218
01219 #if !CMK_SMP
01220 if (arg_ppn > 1 || onewth_active)
01221 {
01222 fprintf(stderr, "Charmrun> Error: ++oneWthPer(Host|Socket|Core|PU) and ++ppn are only available in SMP mode.\n");
01223 exit(1);
01224 }
01225 #endif
01226
01227 if (auto_provision)
01228 {
01229 #if CMK_SMP
01230 proc_per.socket = 1;
01231 onewth_per.pu = 1;
01232 #else
01233 proc_per.core = 1;
01234 #endif
01235 }
01236 else if (arg_requested_pes <= 0 && arg_requested_nodes <= 0 && arg_ppn <= 0 && !proc_active && !onewth_active)
01237 {
01238 PRINT(("Charmrun> No provisioning arguments specified. Running with a single PE.\n"
01239 " Use ++auto-provision to fully subscribe resources or +p1 to silence this message.\n"));
01240 }
01241 }
01242
01243
01244
01245
01246
01247
01248
01249 static int portOk = 1;
01250 static const char *nodetab_tempName = NULL;
01251 static char *nodetab_file_find()
01252 {
01253 char buffer[MAXPATHLEN];
01254
01255
01256 if (arg_nodelist) {
01257 const char *path = arg_nodelist;
01258 if (probefile(path))
01259 return strdup(path);
01260 fprintf(stderr, "ERROR> No such nodelist file %s\n", path);
01261 exit(1);
01262 }
01263
01264 if (getenv("NODELIST")) {
01265 char *path = getenv("NODELIST");
01266 if (path && probefile(path))
01267 return strdup(path);
01268
01269 fprintf(stderr, "ERROR> Cannot find nodelist file %s\n", path);
01270 exit(1);
01271 }
01272
01273 if (probefile("./nodelist"))
01274 return strdup("./nodelist");
01275 #if defined(_WIN32)
01276 tmpnam(buffer);
01277 nodetab_tempName = strdup(buffer);
01278 #else
01279 if (getenv("HOME")) {
01280 sprintf(buffer, "%s/.nodelist", getenv("HOME"));
01281 }
01282 #endif
01283 if (!probefile(buffer)) {
01284
01285 FILE *f = fopen(buffer, "w");
01286 if (f == NULL) {
01287 fprintf(stderr, "ERROR> Cannot create a 'nodelist' file.\n");
01288 exit(1);
01289 }
01290 fprintf(f, "group main\nhost localhost\n");
01291 fclose(f);
01292 }
01293 return strdup(buffer);
01294 }
01295
/*
 * One entry per distinct host: each host in the nodelist, or the single
 * loopback host for ++local / ++mpiexec runs. Defaults come from
 * command-line arguments. nodetab_args() overrides them with group-level and
 * host-level nodelist options.
 */
struct nodetab_host
{
  /* Look up a host name's IP address; exits on failure (see definition). */
  static skt_ip_t resolve(const char *name);

  /* "speed" nodelist option. */
  double speed = 1.0;

  /* Host name. The placeholder default is replaced when a real host entry
     is created (nodelist or local run). */
  const char *name = "SET_H->NAME";
#if CMK_USE_SSH
  /* Remote-launch settings. Defaults are the matching ++ arguments; the
     shell/debugger/xterm/login/passwd/setup options override them. */
  const char *shell = arg_shell;
  const char *debugger = arg_debugger;
  const char *xterm = arg_xterm;
  const char *login = arg_mylogin;
  const char *passwd = "*";
  const char *setup = "*";
#endif
  /* "ext" option value, or nullptr when not given. */
  char *ext = nullptr;
  /* Accumulated "pathfix" option pairs. */
  pathfixlist pathfixes = nullptr;

  /* Address from resolve(); _skt_invalid_ip until set. Under ++mpiexec it is
     overwritten with the IP the process itself reported. */
  skt_ip_t ip = _skt_invalid_ip;
  /* "cpus" option value. */
  int cpus = 1;
  /* "nice" option value; -100 presumably means "not set" — TODO confirm. */
  int nice = -100;


  /* Processes on this host; sent to each process as nProcessesInPhysNode.
     Populated outside this section. */
  int processes = 0;

  /* Index of this host in host_table (order of first appearance). */
  int hostno = 0;

#ifdef __FAULT__
  bool crashed = false;
#endif
};
01327
01328 skt_ip_t nodetab_host::resolve(const char *name)
01329 {
01330 skt_ip_t ip = skt_innode_lookup_ip(name);
01331 if (skt_ip_match(ip, _skt_invalid_ip)) {
01332 #ifdef CMK_BPROC
01333
01334 if (!(1 <= arg_requested_pes && atoi(name) == -1))
01335 #endif
01336 {
01337 fprintf(stderr, "ERROR> Cannot obtain IP address of %s\n", name);
01338 exit(1);
01339 }
01340 }
01341
01342 return ip;
01343 }
01344
/* One entry per compute process tracked by this charmrun. */
struct nodetab_process
{
#if CMK_USE_IBVERBS
  /* Queue-pair list received in "qplist" (malloc'd by req_handle_qplist). */
  ChInfiAddr *qpList = nullptr;
  ChInfiAddr *qpData = nullptr;
#endif
#if CMK_USE_IBUD
  /* Queue-pair address received in "qplist". */
  ChInfiAddr qp;
#endif

  /* Host this process runs on. No default initializer; must be set. */
  nodetab_host * host;
  int rank = 0;

  int ssh_pid = 0;
  /* Socket used for request/reply traffic with this process (req_reply,
     "die", "initnodetab", ...). */
  SOCKET req_client = -1;

  /* Data port from the process's reported info (set in nodeinfo_populate). */
  int dataport = -1;
#ifdef HSTART
  SOCKET charmrun_fds = -1;
#endif

  /* Node info reported by the process (nodeinfo_add), completed by
     nodeinfo_populate, and sent to every process in "initnodetab". */
  ChNodeinfo info;

  /* Hardware counts from the process's ChSingleNodeinfo. These have no
     default initializer and are indeterminate until nodeinfo_add runs. */
  int num_pus;
  int num_cores;
  int num_sockets;

  int forkstart = 0;

  /* Number of PEs in this process (sent as nPE). */
  int PEs = 0;

  /* This process's node number. */
  int nodeno = 0;

  /* Orders by host number, then by node number. */
  friend bool operator< (const nodetab_process &, const nodetab_process &);
};
01380
01381 bool operator< (const nodetab_process & a, const nodetab_process & b)
01382 {
01383 const int a_hostno = a.host->hostno, b_hostno = b.host->hostno;
01384 return a_hostno < b_hostno || (a_hostno == b_hostno && a.nodeno < b.nodeno);
01385 }
01386
/* Distinct hosts, indexed by nodetab_host::hostno. */
static std::vector<nodetab_host *> host_table;
#ifdef HSTART
/* HSTART: the hosts this charmrun instance is responsible for (populated
   outside this section). */
static std::vector<nodetab_host *> my_host_table;
#else
/* Without hierarchical start this charmrun is responsible for every host. */
# define my_host_table host_table
#endif
/* Processes tracked by this charmrun; request handlers iterate over it. */
static std::vector<nodetab_process> my_process_table;
/* PE number -> process hosting that PE (used to route CCS requests). */
static std::vector<nodetab_process *> pe_to_process_map;
01395
01396 static const char *nodetab_args(const char *args, nodetab_host *h)
01397 {
01398 while (*args != 0)
01399 {
01400 const char *b1 = skipblanks(args), *e1 = skipstuff(b1);
01401 const char *b2 = skipblanks(e1), *e2 = skipstuff(b2);
01402
01403 while (*b1 == '+')
01404 b1++;
01405
01406 if (subeqs(b1, e1, "speed"))
01407 h->speed = atof(b2);
01408 else if (subeqs(b1, e1, "cpus"))
01409 h->cpus = atol(b2);
01410 else if (subeqs(b1, e1, "pathfix"))
01411 {
01412 const char *b3 = skipblanks(e2), *e3 = skipstuff(b3);
01413 args = skipblanks(e3);
01414 h->pathfixes =
01415 pathfix_append(substr(b2, e2), substr(b3, e3), h->pathfixes);
01416 e2 = e3;
01417 }
01418 else if (subeqs(b1, e1, "ext"))
01419 h->ext = substr(b2, e2);
01420 else if (subeqs(b1, e1, "nice"))
01421 h->nice = atoi(b2);
01422 #if CMK_USE_SSH
01423 else if (subeqs(b1, e1, "login"))
01424 h->login = substr(b2, e2);
01425 else if (subeqs(b1, e1, "passwd"))
01426 h->passwd = substr(b2, e2);
01427 else if (subeqs(b1, e1, "setup"))
01428 h->setup = strdup(b2);
01429 else if (subeqs(b1, e1, "shell"))
01430 h->shell = substr(b2, e2);
01431 else if (subeqs(b1, e1, "debugger"))
01432 h->debugger = substr(b2, e2);
01433 else if (subeqs(b1, e1, "xterm"))
01434 h->xterm = substr(b2, e2);
01435 #endif
01436 else
01437 return args;
01438
01439 args = skipblanks(e2);
01440 }
01441
01442 return args;
01443 }
01444
01445
01446 static void nodetab_init_for_local()
01447 {
01448
01449 static const char hostname[] = "127.0.0.1";
01450 nodetab_host * h = new nodetab_host{};
01451 h->name = hostname;
01452 h->ip = nodetab_host::resolve(hostname);
01453 host_table.push_back(h);
01454 }
01455
#ifdef HSTART

/* Hierarchical-start fan-out parameters, given N = nodetab_rank0_size:
   branchfactor = ceil(sqrt(N)) and nodes_per_child = round(N / branchfactor). */
static int branchfactor;
static int nodes_per_child;
/* NOTE(review): this does not compile as written. `TODO;` is a placeholder,
   and nodetab_rank0_size is not declared in this part of the file, so the
   HSTART path appears unfinished. */
static void nodetab_init_hierarchical_start(void)
{
  TODO;
  branchfactor = ceil(sqrt(nodetab_rank0_size));
  nodes_per_child = round(nodetab_rank0_size * 1.0 / branchfactor);
}
#endif
01469
01470 static void nodetab_init_with_nodelist()
01471 {
01472
01473
01474 char *nodesfile = nodetab_file_find();
01475 if (arg_verbose)
01476 printf("Charmrun> using %s as nodesfile\n", nodesfile);
01477
01478 FILE *f;
01479 if (!(f = fopen(nodesfile, "r"))) {
01480 fprintf(stderr, "ERROR> Cannot read %s: %s\n", nodesfile, strerror(errno));
01481 exit(1);
01482 }
01483 free(nodesfile);
01484
01485 nodetab_host global, group;
01486 int rightgroup = (strcmp(arg_nodegroup, "main") == 0);
01487
01488
01489
01490 char *prevHostName = NULL;
01491 char input_line[MAX_LINE_LENGTH];
01492 std::unordered_map<std::string, nodetab_host *> temp_hosts;
01493 int lineNo = 1;
01494 int hostno = 0;
01495
01496 while (fgets(input_line, sizeof(input_line) - 1, f) != 0) {
01497 if (input_line[0] == '#')
01498 continue;
01499 zap_newline(input_line);
01500 if (!nodetab_args(input_line, &global)) {
01501
01502 nodetab_args(input_line, &group);
01503 } else {
01504 const char *b1 = skipblanks(input_line), *e1 = skipstuff(b1);
01505 const char *b2 = skipblanks(e1), *e2 = skipstuff(b2);
01506 const char *b3 = skipblanks(e2);
01507 if (subeqs(b1, e1, "host")) {
01508 if (rightgroup) {
01509
01510
01511 if (prevHostName && strcmp(b2, prevHostName) &&
01512 (!strcmp(b2, "localhost") ||
01513 !strcmp(prevHostName, "localhost"))) {
01514 fprintf(stderr, "ERROR> Mixing localhost with other hostnames will "
01515 "lead to connection failures.\n");
01516 fprintf(stderr, "ERROR> The problematic line in group %s is: %s\n",
01517 arg_nodegroup, input_line);
01518 exit(1);
01519 }
01520
01521 const std::string hostname = substr(b2, e2);
01522 auto host_iter = temp_hosts.find(hostname);
01523 if (host_iter != temp_hosts.end())
01524 {
01525 nodetab_host *host = (*host_iter).second;
01526 nodetab_args(b3, host);
01527 }
01528 else
01529 {
01530 nodetab_host *host = new nodetab_host{group};
01531 host->name = strdup(hostname.c_str());
01532 host->ip = nodetab_host::resolve(hostname.c_str());
01533 host->hostno = hostno++;
01534 temp_hosts.insert({hostname, host});
01535 nodetab_args(b3, host);
01536 }
01537
01538 free(prevHostName);
01539 prevHostName = strdup(b2);
01540 }
01541 } else if (subeqs(b1, e1, "group")) {
01542 group = global;
01543 nodetab_args(b3, &group);
01544 rightgroup = subeqs(b2, e2, arg_nodegroup);
01545 } else if (b1 != b3) {
01546 fprintf(stderr, "ERROR> unrecognized command in nodesfile:\n");
01547 fprintf(stderr, "ERROR> %s\n", input_line);
01548 exit(1);
01549 }
01550 }
01551 lineNo++;
01552 }
01553 fclose(f);
01554 free(prevHostName);
01555 if (nodetab_tempName != NULL)
01556 unlink(nodetab_tempName);
01557
01558 const size_t temp_hosts_size = temp_hosts.size();
01559 if (temp_hosts_size == 0) {
01560 fprintf(stderr, "ERROR> No hosts in group %s\n", arg_nodegroup);
01561 exit(1);
01562 }
01563
01564 host_table.resize(temp_hosts_size);
01565 for (const auto & h_pair : temp_hosts)
01566 {
01567 nodetab_host * h = h_pair.second;
01568 host_table[h->hostno] = h;
01569 }
01570 }
01571
01572 static void nodetab_init()
01573 {
01574
01575 if (arg_local || arg_mpiexec)
01576 nodetab_init_for_local();
01577 else
01578 nodetab_init_with_nodelist();
01579 }
01580
01581
01582
01583
01584
01585
01586
01587
01588
01589
01590
01591 static void nodeinfo_add(const ChSingleNodeinfo *in, nodetab_process & p)
01592 {
01593 const int node = ChMessageInt(in->nodeNo);
01594 if (node != p.nodeno)
01595 fprintf(stderr, "Charmrun> Warning: Process #%d received ChSingleNodeInfo #%d\n", p.nodeno, node);
01596
01597 p.info = in->info;
01598 p.num_pus = ChMessageInt(in->num_pus);
01599 p.num_cores = ChMessageInt(in->num_cores);
01600 p.num_sockets = ChMessageInt(in->num_sockets);
01601 }
01602
01603 static void nodeinfo_populate(nodetab_process & p)
01604 {
01605 ChNodeinfo & i = p.info;
01606 const int node = p.nodeno;
01607
01608 i.nodeno = ChMessageInt_new(node);
01609 i.nPE = ChMessageInt_new(p.PEs);
01610 i.nProcessesInPhysNode = ChMessageInt_new(p.host->processes);
01611
01612 if (arg_mpiexec)
01613 p.host->ip = i.IP;
01614 else
01615 i.IP = p.host->ip;
01616
01617 #if !CMK_USE_IBVERBS
01618 unsigned int dataport = ChMessageInt(i.dataport);
01619 if (0 == dataport) {
01620 fprintf(stderr, "Node %d could not initialize network!\n", node);
01621 exit(1);
01622 }
01623 p.dataport = dataport;
01624 if (arg_verbose) {
01625 char ips[200];
01626 skt_print_ip(ips, i.IP);
01627 printf("Charmrun> client %d connected (IP=%s data_port=%d)\n", node, ips,
01628 dataport);
01629 }
01630 #endif
01631 }
01632
01633
01634
01635
01636
01637
01638
01639
01640
01641
01642
01643
01644
01645
01646
/* stdin text read but not yet handed out to a client gets/scanf request.
   Heap-allocated (see input_init); grown by input_extend. */
static char *input_buffer;

/* Read one more line from stdin (at most sizeof(line) - 1 characters) and
   append it to input_buffer. Exits on end-of-file or allocation failure. */
static void input_extend()
{
  char line[1024];
  const size_t len = input_buffer ? strlen(input_buffer) : 0;
  fflush(stdout);
  if (fgets(line, sizeof(line), stdin) == NULL) {
    fprintf(stderr, "end-of-file on stdin\n");
    exit(1);
  }
  char *new_input_buffer = (char *) realloc(input_buffer, len + strlen(line) + 1);
  if (new_input_buffer == NULL) {
    /* realloc failure leaves the old block allocated. */
    free(input_buffer);
    fprintf(stderr, "Charmrun: Realloc failed\n");
    exit(1);
  }
  input_buffer = new_input_buffer;

  strcpy(input_buffer + len, line);
}
01670
01671 static void input_init() { input_buffer = strdup(""); }
01672
01673 static char *input_extract(int nchars)
01674 {
01675 char *res = substr(input_buffer, input_buffer + nchars);
01676 char *tmp =
01677 substr(input_buffer + nchars, input_buffer + strlen(input_buffer));
01678 free(input_buffer);
01679 input_buffer = tmp;
01680 return res;
01681 }
01682
01683 static char *input_gets()
01684 {
01685 char *p;
01686 while (1) {
01687 p = strchr(input_buffer, '\n');
01688 if (p)
01689 break;
01690 input_extend();
01691 }
01692 int len = p - input_buffer;
01693 char *res = input_extract(len + 1);
01694 res[len] = 0;
01695 return res;
01696 }
01697
01698
01699 static char *input_scanf_chars(char *fmt)
01700 {
01701 char buf[8192];
01702 static int fd;
01703 static FILE *file;
01704 fflush(stdout);
01705 if (file == 0) {
01706 #if CMK_USE_MKSTEMP
01707 char tmp[128];
01708 strcpy(tmp, "/tmp/fnordXXXXXX");
01709 if (mkstemp(tmp) == -1) {
01710 fprintf(stderr, "charmrun> mkstemp() failed!\n");
01711 exit(1);
01712 }
01713 #else
01714 char *tmp = tmpnam(NULL);
01715 #endif
01716 unlink(tmp);
01717 fd = open(tmp, O_RDWR | O_CREAT | O_TRUNC, 0664);
01718 if (fd < 0) {
01719 fprintf(stderr, "cannot open temp file /tmp/fnord");
01720 exit(1);
01721 }
01722 file = fdopen(fd, "r+");
01723 unlink(tmp);
01724 }
01725 int pos;
01726 while (1) {
01727 int len = strlen(input_buffer);
01728 rewind(file);
01729 fwrite(input_buffer, len, 1, file);
01730 fflush(file);
01731 rewind(file);
01732 if (ftruncate(fd, len)) {
01733 fprintf(stderr, "charmrun> ftruncate() failed!\n");
01734 exit(1);
01735 }
01736 if (fscanf(file, fmt, buf, buf, buf, buf, buf, buf, buf, buf, buf, buf, buf,
01737 buf, buf, buf, buf, buf, buf, buf) <= 0) {
01738 fprintf(stderr, "charmrun> fscanf() failed!\n");
01739 exit(1);
01740 }
01741 pos = ftell(file);
01742 if (pos < len)
01743 break;
01744 input_extend();
01745 }
01746 return input_extract(pos);
01747 }
01748
01749
01750
01751
01752
01753
01754
01755 #if CMK_CCS_AVAILABLE
01756
01757
01758
01759
01760
01761 static void req_ccs_connect(void)
01762 {
01763 struct {
01764 ChMessageHeader ch;
01765 CcsImplHeader hdr;
01766 } h;
01767 void *reqData;
01768 if (0 == CcsServer_recvRequest(&h.hdr, &reqData))
01769 return;
01770 int pe = ChMessageInt(h.hdr.pe);
01771 int reqBytes = ChMessageInt(h.hdr.len);
01772
01773 if (pe == -1) {
01774
01775 pe = 0;
01776 }
01777 const int pe_count = pe_to_process_map.size();
01778 if ((pe <= -pe_count || pe >= pe_count) && 0 == replay_single) {
01779
01780
01781
01782 #if !CMK_BIGSIM_CHARM
01783 if (pe == -pe_count)
01784 fprintf(stderr, "Invalid processor index in CCS request: are you trying "
01785 "to do a broadcast instead?");
01786 else
01787 fprintf(stderr, "Invalid processor index in CCS request.");
01788 CcsServer_sendReply(&h.hdr, 0, 0);
01789 free(reqData);
01790 return;
01791 #endif
01792 } else if (pe < -1) {
01793
01794
01795
01796 reqBytes -= pe * sizeof(ChMessageInt_t);
01797 pe = ChMessageInt(*(ChMessageInt_t *) reqData);
01798 }
01799
01800 if (!check_stdio_header(&h.hdr)) {
01801
01802 #define LOOPBACK 0
01803 #if LOOPBACK
01804
01805 CcsServer_sendReply(&h.hdr, 0, 0);
01806 #else
01807 int destpe = pe;
01808 #if CMK_BIGSIM_CHARM
01809 destpe = destpe % pe_count;
01810 #endif
01811 if (replay_single)
01812 destpe = 0;
01813
01814 ChMessageHeader_new("req_fw", sizeof(h.hdr) + reqBytes, &h.ch);
01815
01816 const void *bufs[3];
01817 int lens[3];
01818 bufs[0] = &h;
01819 lens[0] = sizeof(h);
01820 bufs[1] = reqData;
01821 lens[1] = reqBytes;
01822 const SOCKET ctrlfd = pe_to_process_map[pe]->req_client;
01823 skt_sendV(ctrlfd, 2, bufs, lens);
01824
01825 #endif
01826 }
01827 free(reqData);
01828 }
01829
01830
01831
01832
01833
01834 static int req_ccs_reply_fw(ChMessage *msg, SOCKET srcFd)
01835 {
01836 int len = msg->len;
01837
01838
01839 CcsImplHeader hdr;
01840 skt_recvN(srcFd, &hdr, sizeof(hdr));
01841 len -= sizeof(hdr);
01842
01843 #define m (4 * 1024)
01844 if (len < m || hdr.attr.auth) {
01845
01846 void *data = malloc(len);
01847 skt_recvN(srcFd, data, len);
01848 CcsServer_sendReply(&hdr, len, data);
01849 free(data);
01850 } else {
01851
01852 ChMessageInt_t outLen;
01853 int destFd;
01854 skt_abortFn old = skt_set_abort(reply_abortFn);
01855 int destErrs = 0;
01856
01857 destFd = ChMessageInt(hdr.replyFd);
01858 outLen = ChMessageInt_new(len);
01859 skt_sendN(destFd, &outLen, sizeof(outLen));
01860 while (len > 0) {
01861 char buf[m];
01862 int r = m;
01863 if (r > len)
01864 r = len;
01865 skt_recvN(srcFd, buf, r);
01866 if (0 == destErrs)
01867
01868 destErrs |= skt_sendN(destFd, buf, r);
01869 len -= m;
01870 #undef m
01871 }
01872 skt_close(destFd);
01873
01874 skt_set_abort(old);
01875 }
01876 return 0;
01877 }
01878
01879 #else
01880 static int req_ccs_reply_fw(ChMessage *msg, SOCKET srcFd) {}
01881 #endif
01882
01883
01884
01885
01886
01887
01888
01889
01890
01891
01894
01895
/* Number of "ending" requests received so far (see req_handle_ending). */
static int req_ending = 0;

/* Debugger bookkeeping. NOTE(review): these are used outside this section;
   the meaning of each field is not visible here. */
static int gdb_info_pid = 0;
static int gdb_info_std[3];
static FILE *gdb_stream = NULL;

/* Status codes returned by req_reply and the request handlers. */
#define REQ_OK 0
/* NOTE(review): unparenthesized negative literal. It is fine in the current
   uses (returns and comparisons), but `a-REQ_FAILED` would tokenize as
   `a--1`. */
#define REQ_FAILED -1
01905
01906 #ifdef HSTART
01907 static int req_reply(SOCKET fd, const char *type, const char *data, int dataLen);
01908 static int req_reply_child(SOCKET fd, const char *type, const char *data, int dataLen)
01909 {
01910
01911 int status = req_reply(fd, type, data, dataLen);
01912 if (status != REQ_OK)
01913 return status;
01914 SOCKET clientFd;
01915 skt_recvN(fd, (char *) &clientFd, sizeof(SOCKET));
01916 skt_sendN(fd, (const char *) &clientFd, sizeof(fd));
01917 return status;
01918 }
01919 #endif
01920
01923 static int req_reply(SOCKET fd, const char *type, const char *data, int dataLen)
01924 {
01925 ChMessageHeader msg;
01926 if (fd == INVALID_SOCKET)
01927 return REQ_FAILED;
01928 ChMessageHeader_new(type, dataLen, &msg);
01929 skt_sendN(fd, (const char *) &msg, sizeof(msg));
01930 skt_sendN(fd, data, dataLen);
01931 return REQ_OK;
01932 }
01933
01934 static void kill_all_compute_nodes(const char *msg, size_t msgSize)
01935 {
01936 ChMessageHeader hdr;
01937 ChMessageHeader_new("die", msgSize, &hdr);
01938 for (const nodetab_process & p : my_process_table)
01939 {
01940 skt_sendN(p.req_client, (const char *) &hdr, sizeof(hdr));
01941 skt_sendN(p.req_client, msg, msgSize);
01942 }
01943 }
01944
01945 static void kill_all_compute_nodes(const char *msg)
01946 {
01947 return kill_all_compute_nodes(msg, strlen(msg)+1);
01948 }
01949
/* Overload for a message whose size is a compile-time constant; exactly
   msgSize bytes are sent.
   NOTE(review): `const char msg[msgSize]` decays to `const char *`, so
   msgSize cannot be deduced from the argument. Callers must name it
   explicitly (kill_all_compute_nodes<N>(buf)). A plain string literal
   resolves to the `const char *` overload instead. */
template <size_t msgSize>
static inline void kill_all_compute_nodes(const char msg[msgSize])
{
  return kill_all_compute_nodes(msg, msgSize);
}
01955
01956
01957
01958
01959
01960
01961
01962 static int req_handle_initnode(ChMessage *msg, nodetab_process & p)
01963 {
01964 if (msg->len != sizeof(ChSingleNodeinfo)) {
01965 fprintf(stderr, "Charmrun: Bad initnode data length. Aborting\n");
01966 fprintf(stderr, "Charmrun: possibly because: %s.\n", msg->data);
01967 exit(1);
01968 }
01969
01970 nodeinfo_add((ChSingleNodeinfo *) msg->data, p);
01971 return REQ_OK;
01972 }
01973
01974 #if CMK_USE_IBVERBS || CMK_USE_IBUD
01975 static int req_handle_qplist(ChMessage *msg, nodetab_process & p)
01976 {
01977 #if CMK_USE_IBVERBS
01978 const int my_process_count = my_process_table.size();
01979 int qpListSize = (my_process_count-1) * sizeof(ChInfiAddr);
01980
01981 if (msg->len != qpListSize)
01982 {
01983 fprintf(stderr, "Charmrun: Bad qplist data length. Aborting.\n");
01984 exit(1);
01985 }
01986
01987 p.qpList = (ChInfiAddr *) malloc(qpListSize);
01988 memcpy(p.qpList, msg->data, qpListSize);
01989 #elif CMK_USE_IBUD
01990 if (msg->len != sizeof(ChInfiAddr))
01991 {
01992 fprintf(stderr, "Charmrun: Bad qplist data length. Aborting.\n");
01993 exit(1);
01994 }
01995
01996 p.qp = *(ChInfiAddr *)msg->data;
01997 printf("Charmrun> client %d lid=%d qpn=%i psn=%i\n", node,
01998 ChMessageInt(p.qp.lid), ChMessageInt(p.qp.qpn),
01999 ChMessageInt(p.qp.psn));
02000 #endif
02001 return REQ_OK;
02002 }
02003 #endif
02004
02010 static void req_send_initnodetab_internal(const nodetab_process & destination, int count, int msgSize)
02011 {
02012 ChMessageHeader hdr;
02013 ChMessageInt_t nNodes = ChMessageInt_new(count);
02014 ChMessageInt_t nodeno = ChMessageInt_new(destination.nodeno);
02015 ChMessageHeader_new("initnodetab", msgSize, &hdr);
02016 const SOCKET fd = destination.req_client;
02017 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
02018 skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
02019 skt_sendN(fd, (const char *) &nodeno, sizeof(nodeno));
02020 for (const nodetab_process & p : my_process_table)
02021 skt_sendN(fd, (const char *) &p.info, sizeof(ChNodeinfo));
02022 }
02023
02024 static void req_send_initnodetab(const nodetab_process & destination)
02025 {
02026 const int my_process_count = my_process_table.size();
02027 int msgSize = sizeof(ChMessageInt_t) * ChInitNodetabFields +
02028 sizeof(ChNodeinfo) * my_process_count;
02029 req_send_initnodetab_internal(destination, my_process_count, msgSize);
02030 }
02031
02032 #ifdef HSTART
02033
02034 static int req_send_initnodetab1(SOCKET fd)
02035 {
02036 const int my_process_count = my_process_table.size();
02037 ChMessageHeader hdr;
02038 ChMessageInt_t nNodes = ChMessageInt_new(my_process_count);
02039 ChMessageHeader_new("initnttab", sizeof(ChMessageInt_t) +
02040 sizeof(ChNodeinfo) * my_process_count,
02041 &hdr);
02042 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
02043 skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
02044 for (const nodetab_process & p : my_process_table)
02045 skt_sendN(fd, (const char *) p.info, sizeof(ChNodeinfo));
02046
02047 return REQ_OK;
02048 }
02049
02050
02051
/* HSTART: socket to the parent charmrun; -1 until connected. */
static int parent_charmrun_fd = -1;
/*
 * HSTART: send child charmrun `client` its share of the rank-0 node table as
 * an "initnodetab" message. The message is the share size, the total size,
 * then entries rank0_start .. rank0_finish-1. The last child
 * (client == branchfactor - 1) takes everything that remains.
 * NOTE(review): this does not compile as written, because `client` and
 * nodetab_rank0_table are not declared in scope. Also, rank0_start and
 * rank0_finish mix table *values* with a table *size* and are then used as
 * table offsets. Confirm the intended indexing before enabling HSTART.
 */
static int req_handle_initnodedistribution(ChMessage *msg, const nodetab_process & p)
{
  const int nodetab_rank0_size = nodetab_rank0_table.size();
  int nodes_to_fork =
      nodes_per_child;
  int rank0_start = nodetab_rank0_table[client * nodes_per_child];
  int rank0_finish;
  if (client == branchfactor - 1) {
    nodes_to_fork = nodetab_rank0_table.size() - client * nodes_per_child;
    rank0_finish = nodetab_rank0_size;
  } else
    rank0_finish =
        nodetab_rank0_table[client * nodes_per_child + nodes_to_fork];

  ChMessageInt_t *nodemsg = (ChMessageInt_t *) malloc(
      (rank0_finish - rank0_start) * sizeof(ChMessageInt_t));
  for (int k = 0; k < rank0_finish - rank0_start; k++)
    nodemsg[k] = ChMessageInt_new(nodetab_rank0_table[rank0_start + k]);
  ChMessageHeader hdr;
  ChMessageInt_t nNodes = ChMessageInt_new(rank0_finish - rank0_start);
  ChMessageInt_t nTotalNodes = ChMessageInt_new(nodetab_rank0_size);
  ChMessageHeader_new("initnodetab",
                      sizeof(ChMessageInt_t) * 2 +
                          sizeof(ChMessageInt_t) * (rank0_finish - rank0_start),
                      &hdr);
  const SOCKET fd = p.charmrun_fds;
  skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
  skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
  skt_sendN(fd, (const char *) &nTotalNodes, sizeof(nTotalNodes));
  skt_sendN(fd, (const char *) nodemsg,
            (rank0_finish - rank0_start) * sizeof(ChMessageInt_t));
  free(nodemsg);
  return REQ_OK;
}
02087
/* HSTART: node info collected by this (child) charmrun for its parent. */
static std::vector<ChSingleNodeinfo> myNodesInfo;
/*
 * HSTART: send myNodesInfo to the parent charmrun as an "initnodetab"
 * message: a count, then the ChSingleNodeinfo records.
 * NOTE(review): the header and count use nodetab_rank0_size, but the
 * payload is myNodesInfo.size() records. The two must match or the parent
 * will misparse the stream. nodetab_rank0_table is also not declared in
 * this part of the file.
 */
static int send_myNodeInfo_to_parent()
{
  const int nodetab_rank0_size = nodetab_rank0_table.size();
  ChMessageHeader hdr;
  ChMessageInt_t nNodes = ChMessageInt_new(nodetab_rank0_size);
  ChMessageHeader_new("initnodetab",
                      sizeof(ChMessageInt_t) +
                          sizeof(ChSingleNodeinfo) * nodetab_rank0_size,
                      &hdr);
  skt_sendN(parent_charmrun_fd, (const char *) &hdr, sizeof(hdr));
  skt_sendN(parent_charmrun_fd, (const char *) &nNodes, sizeof(nNodes));
  skt_sendN(parent_charmrun_fd, (const char *) myNodesInfo.data(),
            sizeof(ChSingleNodeinfo) * myNodesInfo.size());

  return REQ_OK;
}
02105 static void forward_nodetab_to_children()
02106 {
02107
02108
02109 if (!skt_select1(parent_charmrun_fd, 1200 * 1000)) {
02110 exit(0);
02111 }
02112 ChMessage msg;
02113 ChMessage_recv(parent_charmrun_fd, &msg);
02114
02115 ChMessageInt_t *nodelistmsg = (ChMessageInt_t *) msg.data;
02116 int nodetab_Nodes = ChMessageInt(nodelistmsg[0]);
02117 for (const nodetab_process & p : my_process_table)
02118 {
02119 SOCKET fd = p.req_client;
02120 ChMessageHeader hdr;
02121 ChMessageInt_t nNodes = ChMessageInt_new(nodetab_Nodes);
02122 ChMessageHeader_new("initnodetab", sizeof(ChMessageInt_t) +
02123 sizeof(ChNodeinfo) * nodetab_Nodes,
02124 &hdr);
02125 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
02126 skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
02127 skt_sendN(fd, (const char *) (nodelistmsg + 1),
02128 sizeof(ChNodeinfo) * nodetab_Nodes);
02129 }
02130 }
02131
02132
02133 static void receive_nodeset_from_child(ChMessage *msg, SOCKET fd)
02134 {
02135 ChMessageInt_t *n32 = (ChMessageInt_t *) msg->data;
02136 int numOfNodes = ChMessageInt(n32[0]);
02137 ChSingleNodeinfo *childNodeInfo = (ChSingleNodeinfo *) (n32 + 1);
02138 for (int k = 0; k < numOfNodes; k++)
02139 nodeinfo_add(childNodeInfo + k, my_process_table[childNodeInfo[k].nodeNo]);
02140 }
02141
/* HSTART: record fd as the socket of the child charmrun whose first node is
   the one given in msg->data.
   NOTE(review): this does not compile as written. `TODO;` is a placeholder
   and `charmrun_fds` is not declared at file scope here; it exists only as
   a nodetab_process member. */
static void set_sockets_list(ChMessage *msg, SOCKET fd)
{
  ChMessageInt_t *n32 = (ChMessageInt_t *) msg->data;
  int node_start = ChMessageInt(n32[0]);
  TODO;
  charmrun_fds[node_start / nodes_per_child] = fd;
}
02149 #endif
02150
/* Warn once per run (via perror) when a stdout call reports failure.
   err is the printf/fflush return value; negative means an error. */
static void checkPrintfError(int err)
{
  static int warned = 0;
  if (err >= 0 || warned)
    return;
  warned = 1;
  perror("charmrun WARNING> error in printf");
}
02161
02162 static int req_handle_print(ChMessage *msg, SOCKET fd)
02163 {
02164 checkPrintfError(printf("%s", msg->data));
02165 checkPrintfError(fflush(stdout));
02166 write_stdio_duplicate(msg->data);
02167 return REQ_OK;
02168 }
02169
02170 static int req_handle_printerr(ChMessage *msg, SOCKET fd)
02171 {
02172 fprintf(stderr, "%s", msg->data);
02173 fflush(stderr);
02174 write_stdio_duplicate(msg->data);
02175 return REQ_OK;
02176 }
02177
02178 static int req_handle_printsyn(ChMessage *msg, SOCKET fd)
02179 {
02180 checkPrintfError(printf("%s", msg->data));
02181 checkPrintfError(fflush(stdout));
02182 write_stdio_duplicate(msg->data);
02183 #ifdef HSTART
02184 if (arg_hierarchical_start)
02185 req_reply_child(fd, "printdone", "", 1);
02186 else
02187 #endif
02188 req_reply(fd, "printdone", "", 1);
02189 return REQ_OK;
02190 }
02191
02192 static int req_handle_printerrsyn(ChMessage *msg, SOCKET fd)
02193 {
02194 fprintf(stderr, "%s", msg->data);
02195 fflush(stderr);
02196 write_stdio_duplicate(msg->data);
02197 #ifdef HSTART
02198 if (arg_hierarchical_start)
02199 req_reply_child(fd, "printdone", "", 1);
02200 else
02201 #endif
02202 req_reply(fd, "printdone", "", 1);
02203 return REQ_OK;
02204 }
02205
/* Exit status charmrun will return: the most recent non-zero code reported
   in an "ending" message (0 if none). */
static int _exitcode = 0;

/*
 * "ending": a process has finished. msg->data, when present, holds its exit
 * code as text.
 *
 * Once the expected number of processes have ended, charmrun closes every
 * request socket and exits with _exitcode. The expected number is
 * my_process_table.size(), or arg_requested_pes in _FAULT_MLOG_ /
 * _FAULT_CAUSAL_ builds. With CMK_SHRINK_EXPAND it first broadcasts
 * "realloc_ack".
 *
 * NOTE(review): under CMK_SHRINK_EXPAND there is no count check, so the
 * *first* "ending" message triggers the ack broadcast and exit. Confirm
 * this is intended.
 */
static int req_handle_ending(ChMessage *msg, SOCKET fd)
{
  req_ending++;

  if (msg->data) {
    int exitcode = atoi(msg->data);
    if (exitcode)
      _exitcode = exitcode;
  }

#if CMK_SHRINK_EXPAND

#elif (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
  if (req_ending == my_process_table.size())
#else
  if (req_ending == arg_requested_pes)
#endif
  {
#if CMK_SHRINK_EXPAND
    ChMessage ackmsg;
    ChMessage_new("realloc_ack", 0, &ackmsg);
    for (const nodetab_process & p : my_process_table)
      ChMessage_send(p.req_client, &ackmsg);
#endif

    for (const nodetab_process & p : my_process_table)
      skt_close(p.req_client);
    if (arg_verbose)
      printf("Charmrun> Graceful exit with exit code %d.\n", _exitcode);
    exit(_exitcode);
  }
  return REQ_OK;
}
02241
02242 static int req_handle_barrier(ChMessage *msg, SOCKET fd)
02243 {
02244 static int barrier_count = 0;
02245 static int barrier_phase = 0;
02246 barrier_count++;
02247 #ifdef HSTART
02248 if (barrier_count == arg_requested_pes)
02249 #else
02250 if (barrier_count == my_process_table.size())
02251 #endif
02252 {
02253 barrier_count = 0;
02254 barrier_phase++;
02255 for (const nodetab_process & p : my_process_table)
02256 if (REQ_OK != req_reply(p.req_client, "barrier", "", 1)) {
02257 fprintf(stderr, "req_handle_barrier socket error: %d\n", p.nodeno);
02258 abort();
02259 }
02260 }
02261 return REQ_OK;
02262 }
02263
02264 static int req_handle_barrier0(ChMessage *msg, SOCKET fd)
02265 {
02266 static int count = 0;
02267 static SOCKET fd0;
02268 int pe = atoi(msg->data);
02269 if (pe == 0)
02270 fd0 = fd;
02271 count++;
02272 #ifdef HSTART
02273 if (count == arg_requested_pes)
02274 #else
02275 if (count == my_host_table.size())
02276 #endif
02277 {
02278 req_reply(fd0, "barrier0", "", 1);
02279 count = 0;
02280 }
02281 return REQ_OK;
02282 }
02283
02284 static void req_handle_abort(ChMessage *msg, SOCKET fd)
02285 {
02286
02287 if (msg->len == 0)
02288 fprintf(stderr, "Aborting!\n");
02289 else
02290 fprintf(stderr, "%s\n", msg->data);
02291 exit(1);
02292 }
02293
02294 static int req_handle_scanf(ChMessage *msg, SOCKET fd)
02295 {
02296 char *fmt = msg->data;
02297 fmt[msg->len - 1] = 0;
02298 char *res = input_scanf_chars(fmt);
02299 char *p = res;
02300 while (*p) {
02301 if (*p == '\n')
02302 *p = ' ';
02303 p++;
02304 }
02305 #ifdef HSTART
02306 if (arg_hierarchical_start)
02307 req_reply_child(fd, "scanf-data", res, strlen(res) + 1);
02308 else
02309 #endif
02310 req_reply(fd, "scanf-data", res, strlen(res) + 1);
02311 free(res);
02312 return REQ_OK;
02313 }
02314
#if CMK_SHRINK_EXPAND
/* Handle a "realloc" (shrink/expand) request from a node program.
 *
 * msg->data holds one ChMessageInt_t: the new PE count. The saved charmrun
 * command line (saved_argv) is rebuilt with ++newp <new>, ++oldp <old>,
 * ++shrinkexpand, ++charmrun_port <server_port> and +restart <dir>; a flag
 * that is already present has its value slot overwritten in place instead
 * of being appended again. Every local process is then sent "realloc_ack",
 * the listening sockets are closed and charmrun re-executes itself with
 * execv(). On success this never returns; if execv() fails the process
 * exits with status 1.
 */
static int req_handle_realloc(ChMessage *msg, SOCKET fd)
{
  printf("Charmrun> Realloc request received\n");

  /* Index of each flag on the saved command line (-1 when absent). When a
   * flag is repeated, the last occurrence is the one updated. */
  int restart_idx = -1, newp_idx = -1, oldp_idx = -1, shrink_expand_idx = -1, charmrun_idx = -1;
  for (int i = 0; i < saved_argc; ++i) {
    if (strcmp(saved_argv[i], "+restart") == 0)
      restart_idx = i;
    if (strcmp(saved_argv[i], "++newp") == 0)
      newp_idx = i;
    if (strcmp(saved_argv[i], "++oldp") == 0)
      oldp_idx = i;
    if (strcmp(saved_argv[i], "++shrinkexpand") == 0)
      shrink_expand_idx = i;
    if (strcmp(saved_argv[i], "++charmrun_port") == 0)
      charmrun_idx = i;
  }

  /* Slots appended after saved_argv: two per missing flag/value pair, one
   * for a missing ++shrinkexpand, plus the NULL terminator. Derived from the
   * flags that are actually missing so a repeated flag cannot under-size
   * the array. */
  int additional_args = 1;
  if (newp_idx == -1)
    additional_args += 2;
  if (oldp_idx == -1)
    additional_args += 2;
  if (shrink_expand_idx == -1)
    additional_args += 1;
  if (charmrun_idx == -1)
    additional_args += 2;
  if (restart_idx == -1)
    additional_args += 2;

#if defined __APPLE__
  const char *dir = "/tmp";
#else
  const char *dir = "/dev/shm";
#endif
  /* +shrinkexpand_basedir <dir> overrides the default; it is ignored when it
   * is the last argument and therefore has no value. */
  for (int i = 0; i + 1 < saved_argc; ++i) {
    if (strcmp(saved_argv[i], "+shrinkexpand_basedir") == 0) {
      dir = saved_argv[i + 1];
      break;
    }
  }

  const char **ret = (const char **) malloc(sizeof(char *) * (saved_argc + additional_args));
  if (ret == NULL) {
    fprintf(stderr, "Charmrun> Realloc: out of memory building argument list\n");
    exit(1);
  }

  int newP = ChMessageInt(*(ChMessageInt_t *)msg->data);
  int oldP = arg_requested_pes;
  printf("Charmrun> newp = %d oldP = %d \n \n \n", newP, oldP);

  for (int i = 0; i < saved_argc; i++)
    ret[i] = saved_argv[i];

  int index = 0;

  /* Buffers are large enough for any int, sign and terminator included. */
  char sp_buffer[50];
  snprintf(sp_buffer, sizeof sp_buffer, "%d", newP);

  char sp_buffer1[50];
  snprintf(sp_buffer1, sizeof sp_buffer1, "%d", oldP);

  char sp_buffer2[50];
  snprintf(sp_buffer2, sizeof sp_buffer2, "%d", (int) server_port);

  if (newp_idx == -1) {
    ret[saved_argc + index++] = "++newp";
    ret[saved_argc + index++] = sp_buffer;
  } else
    ret[newp_idx + 1] = sp_buffer;

  if (oldp_idx == -1) {
    ret[saved_argc + index++] = "++oldp";
    ret[saved_argc + index++] = sp_buffer1;
  } else
    ret[oldp_idx + 1] = sp_buffer1;

  if (shrink_expand_idx == -1)
    ret[saved_argc + index++] = "++shrinkexpand";

  if (charmrun_idx == -1) {
    ret[saved_argc + index++] = "++charmrun_port";
    ret[saved_argc + index++] = sp_buffer2;
  } else
    ret[charmrun_idx + 1] = sp_buffer2;

  if (restart_idx == -1) {
    ret[saved_argc + index++] = "+restart";
    ret[saved_argc + index++] = dir;
  } else
    ret[restart_idx + 1] = dir;
  ret[saved_argc + index++] = NULL;

  ChMessage ackmsg;
  ChMessage_new("realloc_ack", 0, &ackmsg);
  for (const nodetab_process & p : my_process_table)
    ChMessage_send(p.req_client, &ackmsg);

  skt_close(server_fd);
  skt_close(CcsServer_fd());

  /* Buffered output would be discarded when the process image is replaced. */
  fflush(stdout);
  execv(ret[0], (char **)ret);

  /* Only reached if execv() failed. */
  perror("Charmrun> execv");
  printf("Should not be here\n");
  exit(1);
}
#endif
02438
02439 #ifdef __FAULT__
/* Fault-tolerance helpers; these static functions are defined elsewhere in
 * this file. */
static void restart_node(nodetab_process &);
static void reconnect_crashed_client(nodetab_process &);
static void announce_crash(const nodetab_process &);

/* Process whose crash is currently being recovered; reset to nullptr once
 * req_handle_crashack has collected all acknowledgements. */
static const nodetab_process * _last_crash;
#ifdef HSTART
/* Child charmrun that forwarded the crashed node's initnode (root side). */
static nodetab_process * _crash_charmrun_process;
/* PE ids sent to the parent charmrun after a forwarded initnode message. */
static int crashed_pe_id;
static int restarted_pe_id;
#endif
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
/* last_crashed_fd: failed reads on this socket are ignored by
 * req_handler_dispatch; reset to -1 when recovery completes. */
static int numCrashes = 0;
static SOCKET last_crashed_fd = -1;
#endif
02454
02460 static int req_handle_crashack(ChMessage *msg, SOCKET fd)
02461 {
02462 static int count = 0;
02463 count++;
02464 #ifdef HSTART
02465 if (arg_hierarchical_start) {
02466 if (count == nodetab_rank0_table.size() - 1) {
02467
02468
02469 PRINT(("Charmrun> continue node: %d\n", _last_crash->nodeno));
02470 req_send_initnodetab1(_crash_charmrun_process->req_client);
02471 _last_crash = nullptr;
02472 count = 0;
02473 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02474 last_crashed_fd = -1;
02475 #endif
02476 }
02477 }
02478
02479 else
02480
02481 #endif
02482 if (count == my_process_table.size() - 1) {
02483
02484
02485 PRINT(("Charmrun> continue node: %d\n", _last_crash->nodeno));
02486 req_send_initnodetab(*_last_crash);
02487 _last_crash = nullptr;
02488 count = 0;
02489 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02490 last_crashed_fd = -1;
02491 #endif
02492 }
02493 return 0;
02494 }
02495
02496 #ifdef HSTART
02497
02498 static int set_crashed_socket_id(ChMessage *msg, SOCKET fd)
02499 {
02500 ChSingleNodeinfo *nodeInfo = (ChSingleNodeinfo *) msg->data;
02501 int nt = ChMessageInt(nodeInfo->nodeNo) - mynodes_start;
02502 nodeInfo->nodeNo = ChMessageInt_new(nt);
02503
02504
02505 my_process_table[nt].req_client = fd;
02506 TODO;
02507 }
02508
02509
02510
02511 static int req_handle_crash(ChMessage *msg, nodetab_process & p)
02512 {
02513 const SOCKET fd = p.req_client;
02514
02515 ChMessageInt_t oldpe, newpe;
02516 skt_recvN(fd, (const char *) &oldpe, sizeof(oldpe));
02517 skt_recvN(fd, (const char *) &newpe, sizeof(newpe));
02518 *nodetab_table[ChMessageInt(oldpe)] = *nodetab_table[ChMessageInt(newpe)];
02519
02520 int status = req_handle_initnode(msg, p);
02521 _crash_charmrun_process = &p;
02522
02523 fprintf(stderr, "Root charmrun : Socket %d failed: %s\n", fd,
02524 _crash_charmrun_process->host->name);
02525 fflush(stderr);
02526 ChSingleNodeinfo *nodeInfo = (ChSingleNodeinfo *) msg->data;
02527 int crashed_node = ChMessageInt(nodeInfo->nodeNo);
02528 _last_crash = crashed_node;
02529 switch (status) {
02530 case REQ_OK:
02531 break;
02532 case REQ_FAILED:
02533 return REQ_FAILED;
02534 }
02535
02536
02537 for (const nodetab_process & p2 : my_process_table)
02538 req_send_initnodetab(p2);
02539
02540
02541 announce_crash(p);
02542 }
02543
02544 #endif
02545 #endif
02546
02547 #ifdef __FAULT__
02548 static void error_in_req_serve_client(nodetab_process & p)
02549 {
02550 const SOCKET fd = p.req_client;
02551 fprintf(stderr, "Socket %d failed \n", fd);
02552
02553 fflush(stdout);
02554 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
02555 skt_close(fd);
02556 #endif
02557
02560
02561 restart_node(p);
02562
02563 fprintf(stderr, "charmrun says process %d failed (on host %s)\n", p.nodeno, p.host->name);
02568 reconnect_crashed_client(p);
02569 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02570 skt_close(fd);
02571 #endif
02572 }
02573 #endif
02574
/* Dispatch one control-socket request from node process p.
 *
 * msg arrives with only its header read (req_serve_client reads the header
 * and then calls this function). Except for "reply_fw", the message body is
 * read here with ChMessageData_recv() before the command is matched.
 * Returns the handler's status; "abort" always yields REQ_FAILED, which
 * makes req_serve_client abort charmrun. An unrecognized command aborts
 * charmrun when built without __FAULT__ and is ignored (REQ_OK) with it.
 */
static int req_handler_dispatch(ChMessage *msg, nodetab_process & p)
{
const int replyFd = p.req_client;
char *cmd = msg->header.type;
DEBUGF(("Got request '%s'\n", cmd, replyFd));
#if CMK_CCS_AVAILABLE
/* Handed off before the body is read. NOTE(review): presumably
 * req_ccs_reply_fw reads the body itself -- confirm. */
if (strcmp(cmd, "reply_fw") == 0)
return req_ccs_reply_fw(msg, replyFd);
#endif


/* Read the message body for every other request type. */
int recv_status = ChMessageData_recv(replyFd, msg);
/* Under __FAULT__ a failed body read is handled as a node crash. With
 * HSTART, the `if (!arg_hierarchical_start)` below guards the whole
 * following if-statement, so the check is skipped in hierarchical mode. */
#ifdef __FAULT__
#ifdef HSTART
if (!arg_hierarchical_start)
#endif
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
if (recv_status < 0) {
/* Failures on last_crashed_fd are ignored. */
if (replyFd == last_crashed_fd) {
return REQ_OK;
}
DEBUGF(("recv_status %d on socket %d \n", recv_status, replyFd));
/* NOTE(review): after recovery control falls through and the unread
 * request is still dispatched below. */
error_in_req_serve_client(p);
}
#else
if (recv_status < 0) {
error_in_req_serve_client(p);
return REQ_OK;
}
#endif
#endif

/* Route by command name. */
if (strcmp(cmd, "ping") == 0)
return REQ_OK;
else if (strcmp(cmd, "print") == 0)
return req_handle_print(msg, replyFd);
else if (strcmp(cmd, "printerr") == 0)
return req_handle_printerr(msg, replyFd);
else if (strcmp(cmd, "printsyn") == 0)
return req_handle_printsyn(msg, replyFd);
else if (strcmp(cmd, "printerrsyn") == 0)
return req_handle_printerrsyn(msg, replyFd);
else if (strcmp(cmd, "scanf") == 0)
return req_handle_scanf(msg, replyFd);
else if (strcmp(cmd, "barrier") == 0)
return req_handle_barrier(msg, replyFd);
else if (strcmp(cmd, "barrier0") == 0)
return req_handle_barrier0(msg, replyFd);
else if (strcmp(cmd, "ending") == 0)
return req_handle_ending(msg, replyFd);
else if (strcmp(cmd, "abort") == 0) {
req_handle_abort(msg, replyFd);
return REQ_FAILED;
}
#ifdef __FAULT__
else if (strcmp(cmd, "crash_ack") == 0)
return req_handle_crashack(msg, replyFd);
#ifdef HSTART
/* In the root charmrun an "initnode" after startup comes from a child
 * charmrun relaying a restarted node. */
else if (strcmp(cmd, "initnode") == 0)
return req_handle_crash(msg, p);
#endif
#endif
#if CMK_SHRINK_EXPAND
else if (strcmp(cmd, "realloc") == 0)
return req_handle_realloc(msg, replyFd);
#endif
else {
/* Unknown command: fatal without fault tolerance, ignored with it. */
#ifndef __FAULT__
fprintf(stderr, "Charmrun> Bad control socket request '%s'\n", cmd);
abort();
return REQ_OK;
#endif
}
return REQ_OK;
}
02650
02651 static void req_serve_client(nodetab_process & p)
02652 {
02653 DEBUGF(("Getting message from client...\n"));
02654
02655 ChMessage msg;
02656 int recv_status = ChMessageHeader_recv(p.req_client, &msg);
02657 #ifdef __FAULT__
02658 #ifdef HSTART
02659 if (!arg_hierarchical_start && recv_status < 0)
02660 error_in_req_serve_client(p);
02661 #else
02662 if (recv_status < 0) {
02663 error_in_req_serve_client(p);
02664 return;
02665 }
02666 #endif
02667 #endif
02668
02669 DEBUGF(("Message is '%s'\n", msg.header.type));
02670 int status = req_handler_dispatch(&msg, p);
02671 switch (status) {
02672 case REQ_OK:
02673 break;
02674 case REQ_FAILED:
02675 fprintf(stderr, "Charmrun> Error processing control socket request %s\n",
02676 msg.header.type);
02677 abort();
02678 break;
02679 }
02680 ChMessage_free(&msg);
02681 }
02682
02683 #ifdef HSTART
02684 static void req_forward_root(nodetab_process & p)
02685 {
02686 const SOCKET fd = p.req_client;
02687 ChMessage msg;
02688 int recv_status = ChMessage_recv(fd, &msg);
02689
02690 char *cmd = msg.header.type;
02691
02692 #ifdef __FAULT__
02693 if (recv_status < 0) {
02694 error_in_req_serve_client(p);
02695 return;
02696 }
02697
02698
02699 if (strcmp(cmd, "initnode") == 0) {
02700 set_crashed_socket_id(&msg, fd);
02701 }
02702 #endif
02703
02704 int status = REQ_OK;
02705 if (strcmp(cmd, "ping") != 0) {
02706 status = req_reply(parent_charmrun_fd, cmd, msg.data,
02707 ChMessageInt(msg.header.len));
02708
02709 if (strcmp(cmd, "scanf") == 0 || strcmp(cmd, "printsyn") == 0 ||
02710 strcmp(cmd, "printerrsyn") == 0)
02711 skt_sendN(parent_charmrun_fd, (const char *) &fd, sizeof(fd));
02712
02713 #ifdef __FAULT__
02714 if (strcmp(cmd, "initnode") == 0) {
02715 ChMessageInt_t oldpe = ChMessageInt_new(crashed_pe_id);
02716 ChMessageInt_t newpe = ChMessageInt_new(restarted_pe_id);
02717 skt_sendN(parent_charmrun_fd, (const char *) &oldpe, sizeof(oldpe));
02718 skt_sendN(parent_charmrun_fd, (const char *) &newpe, sizeof(newpe));
02719 }
02720 #endif
02721 }
02722
02723 switch (status) {
02724 case REQ_OK:
02725 break;
02726 case REQ_FAILED:
02727 abort();
02728 break;
02729 }
02730 ChMessage_free(&msg);
02731 }
02732
02733 static void req_forward_client()
02734 {
02735 ChMessage msg;
02736 int recv_status = ChMessage_recv(parent_charmrun_fd, &msg);
02737 if (recv_status < 0) {
02738
02739 for (const nodetab_process & p : my_process_table)
02740 skt_close(p.req_client);
02741 exit(0);
02742 }
02743
02744 char *cmd = msg.header.type;
02745
02746 if (strcmp(cmd, "barrier") == 0) {
02747 for (const nodetab_process & p : my_process_table)
02748 if (REQ_OK != req_reply(p.req_client, cmd, msg.data,
02749 ChMessageInt(msg.header.len))) {
02750 abort();
02751 }
02752 return;
02753 }
02754 #ifdef __FAULT__
02755 if (strcmp(cmd, "initnodetab") == 0) {
02756 if (_last_crash == nullptr)
02757 current_restart_phase++;
02758
02759 for (const nodetab_process & p : my_process_table)
02760 if (_last_crash == nullptr)
02761 if (REQ_OK != req_reply(p.req_client, cmd, msg.data,
02762 ChMessageInt(msg.header.len))) {
02763 abort();
02764 }
02765 return;
02766 }
02767
02768 if (strcmp(cmd, "crashnode") == 0) {
02769 for (const nodetab_process & p : my_process_table)
02770 if (_last_crash == nullptr)
02771 if (REQ_OK != req_reply(p.req_client, cmd, msg.data,
02772 ChMessageInt(msg.header.len))) {
02773 abort();
02774 }
02775 return;
02776 }
02777 if (strcmp(cmd, "initnttab") == 0) {
02778 _last_crash = nullptr;
02779 if (REQ_OK != req_reply(_last_crash->req_client, "initnodetab",
02780 msg.data, ChMessageInt(msg.header.len))) {
02781 abort();
02782 }
02783 return;
02784 }
02785
02786 #endif
02787
02788 SOCKET fd;
02789
02790
02791 if (strcmp(cmd, "req_fw") == 0) {
02792 CcsImplHeader *hdr = (CcsImplHeader *) msg.data;
02793 int pe = ChMessageInt(hdr->pe);
02794 fd = nodetab_table[pe]->ctrlfd;
02795 } else if (strcmp(cmd, "barrier0") == 0) {
02796 fd = nodetab_table[0]->ctrlfd;
02797 } else
02798 skt_recvN(parent_charmrun_fd, (char *) &fd, sizeof(SOCKET));
02799
02800 int status = req_reply(fd, cmd, msg.data, ChMessageInt(msg.header.len));
02801
02802 switch (status) {
02803 case REQ_OK:
02804 break;
02805 case REQ_FAILED:
02806 abort();
02807 break;
02808 }
02809 ChMessage_free(&msg);
02810 }
02811
02812 #endif
02813
02814 static int ignore_socket_errors(SOCKET skt, int c, const char *m)
02815 {
02816
02817 #ifndef __FAULT__
02818 exit(2);
02819 #endif
02820 return -1;
02821 }
02822
02823 static nodetab_process & get_process_for_socket(std::vector<nodetab_process> & process_table, SOCKET req_client)
02824 {
02825 nodetab_process * ptr = nullptr;
02826 for (nodetab_process & p : process_table)
02827 {
02828 if (p.req_client == req_client)
02829 {
02830 ptr = &p;
02831 break;
02832 }
02833 }
02834 if (ptr == nullptr)
02835 {
02836 fprintf(stderr, "Charmrun> get_process_for_socket: unknown socket\n");
02837 exit(1);
02838 }
02839
02840 nodetab_process & p = *ptr;
02841 return p;
02842 }
02843
02844
02845
02846
02847 static int socket_error_in_poll(SOCKET skt, int code, const char *msg)
02848 {
02849
02850
02851 skt_set_abort(ignore_socket_errors);
02852
02853 {
02854 const nodetab_process & p = get_process_for_socket(my_process_table, skt);
02855 fprintf(stderr, "Charmrun> error on request socket to node %d '%s'--\n"
02856 "%s\n",
02857 p.nodeno, p.host->name, msg);
02858 }
02859
02860 #ifndef __FAULT__
02861 for (const nodetab_process & p : my_process_table)
02862 skt_close(p.req_client);
02863 exit(1);
02864 #endif
02865 ftTimer = GetClock();
02866 return -1;
02867 }
02868
#if CMK_USE_POLL
/* poll()-based descriptor-set helpers used by req_poll().
 *
 * CMK_PIPE_DECL keeps a function-local static pollfd array that is grown on
 * demand so it can always hold maxn entries, even if a later call asks for
 * more than the first one did. The ADD macros append entries; CMK_PIPE_CALL
 * runs poll() and resets the counter, after which each CHECK macro consumes
 * the next result slot. The CHECK macros therefore ignore their argument,
 * and descriptors must be checked in exactly the order they were added.
 */
#define CMK_PIPE_DECL(maxn, delayMs)                                        \
  static struct pollfd *fds = NULL;                                         \
  static int fds_capacity = 0;                                              \
  int nFds_sto = 0;                                                         \
  int *nFds = &nFds_sto;                                                    \
  int pollDelayMs = delayMs;                                                \
  if ((int) (maxn) > fds_capacity) {                                        \
    struct pollfd *fds_grown =                                              \
        (struct pollfd *) realloc(fds, (maxn) * sizeof(struct pollfd));     \
    if (fds_grown == NULL) {                                                \
      fprintf(stderr, "Charmrun> Out of memory allocating poll set\n");     \
      exit(1);                                                              \
    }                                                                       \
    fds = fds_grown;                                                        \
    fds_capacity = (int) (maxn);                                            \
  }
#define CMK_PIPE_SUB fds, nFds
#define CMK_PIPE_CALL() \
  poll(fds, *nFds, pollDelayMs); \
  *nFds = 0

#define CMK_PIPE_PARAM struct pollfd *fds, int *nFds
#define CMK_PIPE_ADDREAD(rd_fd) \
  do { \
    fds[*nFds].fd = rd_fd; \
    fds[*nFds].events = POLLIN; \
    (*nFds)++; \
  } while (0)
#define CMK_PIPE_ADDWRITE(wr_fd) \
  do { \
    fds[*nFds].fd = wr_fd; \
    fds[*nFds].events = POLLOUT; \
    (*nFds)++; \
  } while (0)
#define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents &POLLIN
#define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents &POLLOUT

#else

/* select()-based fallback. Only the read set is passed to select(); the
 * delay is given in milliseconds. */
#define CMK_PIPE_DECL(maxn, delayMs) \
  fd_set rfds_sto, wfds_sto; \
  int nFds = 0; \
  fd_set *rfds = &rfds_sto, *wfds = &wfds_sto; \
  struct timeval tmo; \
  FD_ZERO(rfds); \
  FD_ZERO(wfds); \
  tmo.tv_sec = delayMs / 1000; \
  tmo.tv_usec = 1000 * (delayMs % 1000);
#define CMK_PIPE_SUB rfds, wfds
#define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, 0, 0, &tmo)

#define CMK_PIPE_PARAM fd_set *rfds, fd_set *wfds
#define CMK_PIPE_ADDREAD(rd_fd) \
  { \
    assert(nFds < FD_SETSIZE); \
    FD_SET(rd_fd, rfds); \
    nFds++; \
  }
#define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd, wfds)
#define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd, rfds)
#define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd, wfds)
#endif
02923
02924
02925
02926
02927
02928 static void req_poll()
02929 {
02930 CMK_PIPE_DECL(my_process_table.size() + 5, 1000);
02931 for (const nodetab_process & p : my_process_table)
02932 CMK_PIPE_ADDREAD(p.req_client);
02933 if (CcsServer_fd() != INVALID_SOCKET)
02934 CMK_PIPE_ADDREAD(CcsServer_fd());
02935 if (arg_charmdebug) {
02936 CMK_PIPE_ADDREAD(0);
02937 CMK_PIPE_ADDREAD(gdb_info_std[1]);
02938 CMK_PIPE_ADDREAD(gdb_info_std[2]);
02939 }
02940
02941 skt_set_abort(socket_error_in_poll);
02942
02943 DEBUGF(("Req_poll: Calling select...\n"));
02944 int status = CMK_PIPE_CALL();
02945 DEBUGF(("Req_poll: Select returned %d...\n", status));
02946
02947 if (status == 0)
02948 return;
02949
02950 if (status < 0) {
02951 if (errno == EINTR || errno == EAGAIN)
02952 return;
02953 fflush(stdout);
02954 fflush(stderr);
02955 socket_error_in_poll(-1, 1359, "Node program terminated unexpectedly!\n");
02956 }
02957 for (nodetab_process & p : my_process_table)
02958 {
02959 const SOCKET req_client = p.req_client;
02960 if (CMK_PIPE_CHECKREAD(req_client)) {
02961 int readcount = 10;
02962
02963 do {
02964 req_serve_client(p);
02965 readcount--;
02966 } while (1 == skt_select1(req_client, 0) && readcount > 0);
02967 }
02968 }
02969
02970 if (CcsServer_fd() != INVALID_SOCKET)
02971 if (CMK_PIPE_CHECKREAD(CcsServer_fd())) {
02972 DEBUGF(("Activity on CCS server port...\n"));
02973 req_ccs_connect();
02974 }
02975
02976 if (arg_charmdebug) {
02977 char buf[2048];
02978 if (CMK_PIPE_CHECKREAD(0)) {
02979 int indata = read(0, buf, 5);
02980 buf[indata] = 0;
02981 if (indata < 5)
02982 fprintf(stderr, "Error reading command (%s)\n", buf);
02983 if (strncmp(buf, "info:", 5) == 0) {
02984
02985 char c;
02986 int num = 0;
02987
02988 while (read(0, &c, 1) != -1) {
02989 buf[num++] = c;
02990 if (c == '\n' || num >= 2045) {
02991 if (write(gdb_info_std[0], buf, num) != num) {
02992 fprintf(stderr, "charmrun> writing info command to gdb failed!\n");
02993 exit(1);
02994 }
02995 if (c == '\n')
02996 break;
02997 }
02998 }
02999 }
03000
03001 }
03002
03003
03004
03005
03006
03007
03008 if (CMK_PIPE_CHECKREAD(gdb_info_std[2])) {
03009 int indata = read(gdb_info_std[2], buf, 100);
03010
03011 if (indata > 0) {
03012 buf[indata] = 0;
03013
03014
03015
03016 fflush(gdb_stream);
03017 }
03018 } else if (CMK_PIPE_CHECKREAD(gdb_info_std[1])) {
03019 int indata = read(gdb_info_std[1], buf, 100);
03020
03021 if (indata > 0) {
03022 buf[indata] = 0;
03023
03024
03025 fprintf(gdb_stream, "%s", buf);
03026 fflush(gdb_stream);
03027 }
03028 }
03029 }
03030 }
03031
03032 #ifdef HSTART
03033 static void req_poll_hierarchical()
03034 {
03035 skt_set_abort(socket_error_in_poll);
03036
03037 struct timeval tmo;
03038 tmo.tv_sec = 1;
03039 tmo.tv_usec = 0;
03040 fd_set rfds;
03041 FD_ZERO(&rfds);
03042 for (const nodetab_process & p : my_process_table)
03043 FD_SET(p.req_client, &rfds);
03044 if (CcsServer_fd() != INVALID_SOCKET)
03045 FD_SET(CcsServer_fd(), &rfds);
03046 if (arg_charmdebug) {
03047 FD_SET(0, &rfds);
03048 FD_SET(gdb_info_std[1], &rfds);
03049 FD_SET(gdb_info_std[2], &rfds);
03050 }
03051
03052 if (arg_child_charmrun)
03053 FD_SET(parent_charmrun_fd, &rfds);
03054 DEBUGF(("Req_poll: Calling select...\n"));
03055 int status = select(FD_SETSIZE, &rfds, 0, 0,
03056 &tmo);
03057
03058
03059
03060 DEBUGF(("Req_poll: Select returned %d...\n", status));
03061
03062 if (status == 0)
03063 return;
03064 if (status < 0) {
03065 fflush(stdout);
03066 fflush(stderr);
03067 socket_error_in_poll(req_clients[0], 1359, "Node program terminated unexpectedly!\n");
03068 }
03069 for (nodetab_process & p : my_process_table)
03070 {
03071 const SOCKET req_client = p.req_client;
03072 if (FD_ISSET(req_client, &rfds)) {
03073 int readcount = 10;
03074
03075 do {
03076 if (arg_child_charmrun)
03077 req_forward_root(p);
03078 else
03079 req_serve_client(p);
03080 readcount--;
03081 } while (1 == skt_select1(req_client, 0) && readcount > 0);
03082 }
03083
03084 if (arg_child_charmrun)
03085
03086 if (FD_ISSET(parent_charmrun_fd, &rfds)) {
03087 int readcount = 10;
03088 do {
03089 req_forward_client();
03090 readcount--;
03091 } while (1 == skt_select1(parent_charmrun_fd, 0) && readcount > 0);
03092 }
03093
03094
03095 if (CcsServer_fd() != INVALID_SOCKET)
03096 if (FD_ISSET(CcsServer_fd(), &rfds)) {
03097 DEBUGF(("Activity on CCS server port...\n"));
03098 req_ccs_connect();
03099 }
03100
03101 if (arg_charmdebug) {
03102 char buf[2048];
03103 if (FD_ISSET(0, &rfds)) {
03104 int indata = read(0, buf, 5);
03105 buf[indata] = 0;
03106 if (indata < 5)
03107 fprintf(stderr, "Error reading command (%s)\n", buf);
03108 if (strncmp(buf, "info:", 5) == 0) {
03109
03110 char c;
03111 int num = 0;
03112
03113 while (read(0, &c, 1) != -1) {
03114 buf[num++] = c;
03115 if (c == '\n' || num >= 2045) {
03116 if (write(gdb_info_std[0], buf, num) != num) {
03117 fprintf(stderr, "charmrun> writing info command to gdb failed!\n");
03118 exit(1);
03119 }
03120 if (c == '\n')
03121 break;
03122 }
03123 }
03124 }
03125
03126 }
03127
03128
03129
03130
03131
03132
03133 if (FD_ISSET(gdb_info_std[2], &rfds)) {
03134 int indata = read(gdb_info_std[2], buf, 100);
03135
03136 if (indata > 0) {
03137 buf[indata] = 0;
03138
03139
03140
03141 fflush(gdb_stream);
03142 }
03143 } else if (FD_ISSET(gdb_info_std[1], &rfds)) {
03144 int indata = read(gdb_info_std[1], buf, 100);
03145
03146 if (indata > 0) {
03147 buf[indata] = 0;
03148
03149
03150 fprintf(gdb_stream, "%s", buf);
03151 fflush(gdb_stream);
03152 }
03153 }
03154 }
03155 }
03156 #endif
03157
#ifdef HSTART
/* Hierarchical-start state. The parent_charmrun_* values and the data port
 * and socket are assigned outside this section. */
static skt_ip_t parent_charmrun_IP;
static int parent_charmrun_port;
static int parent_charmrun_pid;
static unsigned int dataport;
static SOCKET dataskt;
/* Startup phase; the root charmrun's connection code (req_set_client_connect,
 * req_one_client_partinit) treats phase 1 specially. */
static int charmrun_phase = 0;
#endif
03166
03167 static int client_connect_problem(const nodetab_process & p, const char *msg)
03168 {
03169 fprintf(stderr, "Charmrun> error attaching to node '%s':\n%s\n", p.host->name, msg);
03170 exit(1);
03171 return -1;
03172 }
03173
03174 static int client_connect_problem_skt(SOCKET skt, int code, const char *msg)
03175 {
03176 const nodetab_process & p = get_process_for_socket(my_process_table, skt);
03177 return client_connect_problem(p, msg);
03178 }
03179
03181 static SOCKET errorcheck_one_client_connect(void)
03182 {
03183 static int numClientsConnected = 0;
03184 unsigned int clientPort;
03185 skt_ip_t clientIP;
03186 if (arg_verbose)
03187 printf("Charmrun> Waiting for %d-th client to connect.\n", numClientsConnected);
03188
03189 if (0 == skt_select1(server_fd, arg_timeout * 1000))
03190 {
03191 fprintf(stderr, "Charmrun> Timeout waiting for node-program to connect\n");
03192 exit(1);
03193 }
03194
03195 const SOCKET req_client = skt_accept(server_fd, &clientIP, &clientPort);
03196
03197
03198
03199
03200 if (req_client == SOCKET_ERROR)
03201 {
03202 fprintf(stderr, "Charmrun> Failure in node accept\n");
03203 exit(1);
03204 }
03205 if (req_client < 0)
03206 {
03207 fprintf(stderr, "Charmrun> Warning: errorcheck_one_client_connect: socket < 0\n");
03208 }
03209
03210 skt_tcp_no_nagle(req_client);
03211
03212 ++numClientsConnected;
03213
03214 return req_client;
03215 }
03216
03217 static nodetab_process & get_process_for_nodeno(std::vector<nodetab_process> & process_table, int nodeno)
03218 {
03219 nodetab_process * ptr = nullptr;
03220 for (nodetab_process & p : process_table)
03221 {
03222 if (p.nodeno == nodeno)
03223 {
03224 ptr = &p;
03225 break;
03226 }
03227 }
03228 if (ptr == nullptr)
03229 {
03230 fprintf(stderr, "Charmrun> get_process_for_nodeno: unknown nodeno %d\n", nodeno);
03231 exit(1);
03232 }
03233
03234 nodetab_process & p = *ptr;
03235 assert(p.nodeno == nodeno);
03236 return p;
03237 }
03238
03239 #if CMK_C_INLINE
03240 inline static
03241 #endif
03242 void
03243 read_initnode_one_client(nodetab_process & p)
03244 {
03245 if (!skt_select1(p.req_client, arg_timeout * 1000))
03246 client_connect_problem(p, "Timeout on IP request");
03247
03248 ChMessage msg;
03249 ChMessage_recv(p.req_client, &msg);
03250 req_handle_initnode(&msg, p);
03251 ChMessage_free(&msg);
03252 }
03253
03254 #if CMK_IBVERBS_FAST_START
03255 static void req_one_client_partinit_skt(std::vector<nodetab_process> & process_table, const SOCKET req_client)
03256 {
03257 if (!skt_select1(req_client, arg_timeout * 1000))
03258 {
03259 fprintf(stderr, "Charmrun> Timeout on partial init request, socket %d\n", req_client);
03260 exit(1);
03261 }
03262
03263 ChMessage partStartMsg;
03264 ChMessage_recv(req_client, &partStartMsg);
03265 assert(strncmp(partStartMsg.header.type, "partinit", 8) == 0);
03266 int nodeNo = ChMessageInt(*(ChMessageInt_t *) partStartMsg.data);
03267 ChMessage_free(&partStartMsg);
03268
03269 nodetab_process & p = get_process_for_nodeno(process_table, nodeNo);
03270 p.req_client = req_client;
03271 }
03272
03273 static void req_one_client_partinit(std::vector<nodetab_process> & process_table, int index)
03274 {
03275 # ifdef HSTART
03276 if (arg_hierarchical_start && !arg_child_charmrun && charmrun_phase == 1)
03277 {
03278 nodetab_process & p = process_table[index];
03279 req_one_client_partinit_skt(process_table, p.req_client);
03280 }
03281 else
03282 # endif
03283 {
03284 const SOCKET req_client = errorcheck_one_client_connect();
03285 req_one_client_partinit_skt(process_table, req_client);
03286 }
03287 }
03288 #endif
03289
03290 #ifdef HSTART
03291
03292 static void add_singlenodeinfo_to_mynodeinfo(ChMessage *msg, SOCKET ctrlfd)
03293 {
03294
03295 ChSingleNodeinfo *nodeInfo = (ChSingleNodeinfo *) msg->data;
03296
03297
03298 ChMessageInt_t nodeNo = ChMessageInt_new(
03299 nodetab_rank0_table[ChMessageInt(nodeInfo->nodeNo) - mynodes_start]);
03300 myNodesInfo.push_back({nodeNo, nodeInfo->info});
03301
03302
03303 int nt = ChMessageInt(nodeInfo->nodeNo) - mynodes_start;
03304 nodeInfo->nodeNo = ChMessageInt_new(nt);
03305 my_process_table[nt].req_client = ctrlfd;
03306 }
03307 #endif
03308
03309 static void req_set_client_connect(std::vector<nodetab_process> & process_table, int count)
03310 {
03311 int curclientend, curclientstart = 0;
03312
03313 std::queue<SOCKET> open_sockets;
03314
03315 ChMessage msg;
03316 #if CMK_USE_IBVERBS && !CMK_IBVERBS_FAST_START
03317 # ifdef HSTART
03318 if (!(arg_hierarchical_start && !arg_child_charmrun && charmrun_phase == 1))
03319 # endif
03320 {
03321 for (int i = 0; i < count; i++)
03322 open_sockets.push(errorcheck_one_client_connect());
03323 }
03324 curclientend = count;
03325 #else
03326 curclientend = 0;
03327 #endif
03328
03329 int finished = 0;
03330 while (finished < count)
03331 {
03332
03333 #if !CMK_USE_IBVERBS || CMK_IBVERBS_FAST_START
03334 while (curclientstart == curclientend || skt_select1(server_fd, 1) != 0) {
03335 # ifdef HSTART
03336 if (!(arg_hierarchical_start && !arg_child_charmrun && charmrun_phase == 1))
03337 # endif
03338 open_sockets.push(errorcheck_one_client_connect());
03339
03340 curclientend++;
03341 }
03342 #endif
03343
03344 while (!open_sockets.empty())
03345 {
03346 const SOCKET req_client = open_sockets.front();
03347 open_sockets.pop();
03348
03349 if (skt_select1(req_client, 1) != 0)
03350 {
03351 ChMessage_recv(req_client, &msg);
03352
03353 int nodeNo = ChMessageInt(((ChSingleNodeinfo *)msg.data)->nodeNo);
03354 nodetab_process & p = get_process_for_nodeno(process_table, nodeNo);
03355 p.req_client = req_client;
03356
03357 #ifdef HSTART
03358 if (arg_hierarchical_start)
03359 {
03360 if (!arg_child_charmrun) {
03361 if (charmrun_phase == 1)
03362 receive_nodeset_from_child(&msg, req_client);
03363 else
03364 set_sockets_list(&msg, req_client);
03365
03366 } else
03367 add_singlenodeinfo_to_mynodeinfo(&msg, req_client);
03368 }
03369 else
03370 #endif
03371 req_handle_initnode(&msg, p);
03372
03373 ++finished;
03374 }
03375 else
03376 {
03377 open_sockets.push(req_client);
03378 }
03379 }
03380 }
03381
03382 ChMessage_free(&msg);
03383 }
03384
03385 static void send_clients_nodeinfo()
03386 {
03387 const int my_process_count = my_process_table.size();
03388 int msgSize = sizeof(ChMessageInt_t) * ChInitNodetabFields +
03389 sizeof(ChNodeinfo) * my_process_count;
03390
03391 for (const nodetab_process & p : my_process_table)
03392 {
03393 const SOCKET fd = p.req_client;
03394 req_send_initnodetab_internal(p, my_process_count, msgSize);
03395 }
03396 }
03397
03398 #if CMK_USE_IBVERBS || CMK_USE_IBUD
03399 static void receive_qplist()
03400 {
03401 #if CMK_USE_IBVERBS && !CMK_IBVERBS_FAST_START
03402
03403 if (my_process_table.size() > 1)
03404 {
03405 ChMessage msg;
03406 for (const nodetab_process & p : my_process_table)
03407 {
03408 ChMessage_recv(p.req_client, &msg);
03409 ChMessage_free(&msg);
03410 }
03411 for (const nodetab_process & p : my_process_table)
03412 req_reply(p.req_client, "barrier", "", 1);
03413 }
03414 #endif
03415
03416 for (nodetab_process & p : my_process_table)
03417 {
03418 const SOCKET fd = p.req_client;
03419 if (!skt_select1(p.req_client, arg_timeout * 1000))
03420 client_connect_problem(p, "Timeout on IP request");
03421
03422 ChMessage msg;
03423 ChMessage_recv(p.req_client, &msg);
03424
03425 req_handle_qplist(&msg, p);
03426
03427 ChMessage_free(&msg);
03428 }
03429 }
03430 #endif
03431
03432 #if CMK_USE_IBVERBS
03433
03434
03435
03436
03437 static void exchange_qpdata_clients()
03438 {
03439 const int my_process_count = my_process_table.size();
03440
03441 for (nodetab_process & p : my_process_table)
03442 p.qpData =
03443 (ChInfiAddr *) malloc(sizeof(ChInfiAddr) * my_process_count);
03444
03445 for (nodetab_process & p1 : my_process_table)
03446 {
03447 const int proc = p1.nodeno;
03448 int count = 0;
03449 for (nodetab_process & p2 : my_process_table)
03450 {
03451 if (&p1 != &p2)
03452 {
03453 ChInfiAddr & ia = p2.qpData[proc] = p1.qpList[count];
03454 ia.nodeno = ChMessageInt_new(proc);
03455
03456
03457
03458
03459 count++;
03460 }
03461 }
03462 free(p1.qpList);
03463 p1.qpList = nullptr;
03464 }
03465 }
03466 #endif
03467
03468 #if CMK_USE_IBVERBS || CMK_USE_IBUD
03469 static void send_clients_qpdata()
03470 {
03471 const int my_process_count = my_process_table.size();
03472 int qpDataSize = sizeof(ChInfiAddr) * my_process_count;
03473
03474 for (const nodetab_process & p : my_process_table)
03475 {
03476 const SOCKET fd = p.req_client;
03477 ChMessageHeader hdr;
03478 ChMessageHeader_new("qpdata", qpDataSize, &hdr);
03479 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
03480 #if CMK_USE_IBVERBS
03481 skt_sendN(fd, (const char *) p.qpData, qpDataSize);
03482 #elif CMK_USE_IBUD
03483 for (const nodetab_process & p2 : my_process_table)
03484 skt_sendN(fd, (const char *) &p2.qp, sizeof(ChInfiAddr));
03485 #endif
03486 }
03487 }
03488 #endif
03489
/* Scratch storage shared by the timing macros below. */
static struct timeval tim;
/* getthetime(x): store the current wall-clock time into x as seconds with a
 * microsecond fraction. getthetime1(x): whole seconds only.
 * Wrapped in do { } while (0) so each expands to a single statement and is
 * safe in an unbraced if/else. */
#define getthetime(x)                             \
  do {                                            \
    gettimeofday(&tim, NULL);                     \
    x = tim.tv_sec + (tim.tv_usec / 1000000.0);   \
  } while (0)
#define getthetime1(x)                            \
  do {                                            \
    gettimeofday(&tim, NULL);                     \
    x = tim.tv_sec;                               \
  } while (0)
03497
/* Forward declarations; these static functions are defined later in this file. */
static void req_add_phase2_processes(std::vector<nodetab_process> &);
static void req_all_clients_connected();
03500
03501
03502 static void req_client_connect_table(std::vector<nodetab_process> & process_table)
03503 {
03504 #if CMK_IBVERBS_FAST_START
03505 for (int c = 0, c_end = process_table.size(); c < c_end; ++c)
03506 req_one_client_partinit(process_table, c);
03507 for (nodetab_process & p : process_table)
03508 read_initnode_one_client(p);
03509 #else
03510 req_set_client_connect(process_table, process_table.size());
03511 #endif
03512 }
03513
03514 static int get_old_style_process_count()
03515 {
03516 const int p = arg_requested_pes;
03517 const int np = arg_requested_nodes;
03518 const int ppn = arg_ppn;
03519
03520 const bool p_active = (p > 0);
03521 const bool np_active = (np > 0);
03522 const bool ppn_active = (ppn > 0);
03523
03524 if (np_active)
03525 return np;
03526 else if (p_active)
03527 return ppn_active ? (p + ppn - 1) / ppn : p;
03528 else
03529 return 1;
03530 }
03531
/* Processes per host as computed by req_construct_phase2_processes(). */
static int calculated_processes_per_host;
03533
03534 static void req_construct_phase2_processes(std::vector<nodetab_process> & phase2_processes)
03535 {
03536 const int active_host_count = my_process_table.size();
03537
03538 int total_processes;
03539
03540 if (proc_per.active())
03541 {
03542 const nodetab_process & p0 = my_process_table[0];
03543
03544 for (nodetab_process & p : my_process_table)
03545 {
03546 if (p.num_pus != p0.num_pus ||
03547 p.num_cores != p0.num_cores ||
03548 p.num_sockets != p0.num_sockets)
03549 {
03550 fprintf(stderr, "Charmrun> Error: Detected system topology is heterogeneous, please use old-style launch options.\n");
03551 exit(1);
03552 }
03553 }
03554
03555 using Unit = typename TopologyRequest::Unit;
03556
03557 int num_processes;
03558 const Unit proc_unit = proc_per.unit();
03559 switch (proc_unit)
03560 {
03561 case Unit::Host:
03562 num_processes = proc_per.host;
03563 break;
03564 case Unit::Socket:
03565 num_processes = proc_per.socket * p0.num_sockets;
03566 break;
03567 case Unit::Core:
03568 num_processes = proc_per.core * p0.num_cores;
03569 break;
03570 case Unit::PU:
03571 num_processes = proc_per.pu * p0.num_pus;
03572 break;
03573 default:
03574 num_processes = 1;
03575 break;
03576 }
03577
03578 calculated_processes_per_host = num_processes;
03579 total_processes = arg_requested_nodes <= 0 ? num_processes * active_host_count : arg_requested_nodes;
03580 }
03581 else
03582 {
03583 total_processes = get_old_style_process_count();
03584 calculated_processes_per_host = (total_processes + active_host_count - 1) / active_host_count;
03585 }
03586
03587 const int num_new_processes = total_processes - active_host_count;
03588 const int new_processes_per_host = (num_new_processes + active_host_count - 1) / active_host_count;
03589
03590 for (nodetab_process & p : my_process_table)
03591 {
03592 p.forkstart = active_host_count + p.nodeno * new_processes_per_host;
03593 p.host->processes = 1;
03594 }
03595
03596 for (int i = 0; i < num_new_processes; ++i)
03597 {
03598 nodetab_process & src = my_process_table[i % active_host_count];
03599 phase2_processes.push_back(src);
03600
03601 nodetab_process & p = phase2_processes.back();
03602 p.nodeno = src.forkstart + (src.host->processes++ - 1);
03603 }
03604 }
03605
static void start_nodes_local(std::vector<nodetab_process> &);
static void start_nodes_ssh(std::vector<nodetab_process> &);
static void finish_nodes(std::vector<nodetab_process> &);

/*
 * Connect the already-launched initial processes, launch and connect the
 * additional ("phase 2") processes, then finish startup with
 * req_all_clients_connected().
 *
 * Phase-2 processes are started in one of three ways:
 *   - mpiexec: all processes already exist; process_table is extended in
 *     place and connected in one pass;
 *   - non-scalable start: charmrun launches them itself (ssh or local);
 *   - scalable start: each connected process is sent a "nodefork" message
 *     asking it to fork() its host's remaining processes.
 */
static void req_client_connect(std::vector<nodetab_process> & process_table)
{
  skt_set_abort(client_connect_problem_skt);

  if (arg_mpiexec)
  {
    // Append the phase-2 entries to process_table itself, then connect all.
    req_construct_phase2_processes(process_table);
    req_client_connect_table(process_table);
    req_all_clients_connected();
    return;
  }

  // Phase 1: connect the initial process on each host.
  req_client_connect_table(process_table);

  std::vector<nodetab_process> phase2_processes;
  req_construct_phase2_processes(phase2_processes);

  if (phase2_processes.size() > 0)
  {
    if (!arg_scalable_start)
    {
      if (!arg_local)
      {
#if CMK_SHRINK_EXPAND
        // On shrink/expand restarts, only launch when the PE count grows.
        if (!arg_shrinkexpand || (arg_requested_pes > arg_old_pes))
#endif
        {
          assert(!arg_mpiexec);
          start_nodes_ssh(phase2_processes);
        }
#if !CMK_SSH_KILL
        finish_nodes(phase2_processes);
#endif
      }
      else
      {
        start_nodes_local(phase2_processes);
      }
    }
    else
    {
      // Scalable start: ask each host's first process to fork the rest.
      // Payload: [0] number of forks, [1] first node number (forkstart).
      ChMessageHeader hdr;
      ChMessageInt_t mydata[ChInitNodeforkFields];
      ChMessageHeader_new("nodefork", sizeof(mydata), &hdr);
      for (const nodetab_process & p : process_table)
      {
        int numforks = p.host->processes - 1;
        if (numforks <= 0)
          continue;

        if (arg_verbose)
          printf("Charmrun> Instructing host \"%s\" to fork() x %d\n", p.host->name, numforks);

        mydata[0] = ChMessageInt_new(numforks);
        mydata[1] = ChMessageInt_new(p.forkstart);
        skt_sendN(p.req_client, (const char *) &hdr, sizeof(hdr));
        skt_sendN(p.req_client, (const char *) mydata, sizeof(mydata));
      }
    }

    // Phase 2: accept the connections of the newly started processes.
    req_client_connect_table(phase2_processes);
  }

  req_add_phase2_processes(phase2_processes);
  req_all_clients_connected();
}
03677
03678 static void req_add_phase2_processes(std::vector<nodetab_process> & phase2_processes)
03679 {
03680
03681 my_process_table.insert(my_process_table.end(), phase2_processes.begin(), phase2_processes.end());
03682 }
03683
/*
 * Final startup step once every process has connected: decide how many PEs
 * each process gets, renumber processes, build pe_to_process_map, and send
 * the node information out (to the parent charmrun in hierarchical start,
 * otherwise to the clients).
 *
 * Exits if portOk is 0, or (SMP) if the one-thread-per-unit request cannot
 * be evenly divided among calculated_processes_per_host processes.
 */
static void req_all_clients_connected()
{
  if (portOk == 0)
    exit(1);
  if (arg_verbose)
    printf("Charmrun> All clients connected.\n");

  // PEs per process; stays 1 in non-SMP builds.
  int ppn = 1;
#if CMK_SMP
  if (onewth_per.active())
  {
    using Unit = typename TopologyRequest::Unit;

    // Topology of the first process's host stands in for all hosts.
    const nodetab_process & p0 = my_process_table[0];

    int threads_per_host;
    const Unit onewth_unit = onewth_per.unit();
    switch (onewth_unit)
    {
      case Unit::Socket:
        threads_per_host = p0.num_sockets;
        break;
      case Unit::Core:
        threads_per_host = p0.num_cores;
        break;
      case Unit::PU:
        threads_per_host = p0.num_pus;
        break;

      default:
        threads_per_host = 1;
        break;
    }

    // When worker threads plus one extra per process would exceed the host's
    // PUs, take one thread per process away (presumably to leave room for a
    // per-process communication thread -- confirm).
    if (threads_per_host > calculated_processes_per_host && threads_per_host + calculated_processes_per_host > p0.num_pus)
      threads_per_host -= calculated_processes_per_host;

    if (threads_per_host == 0)
      threads_per_host = 1;

    // Threads must split evenly across the processes on each host.
    if (threads_per_host < calculated_processes_per_host || threads_per_host % calculated_processes_per_host != 0)
    {
      fprintf(stderr, "Charmrun> Error: Invalid request for %d PEs among %d processes per host.\n",
              threads_per_host, calculated_processes_per_host);
      kill_all_compute_nodes("Invalid provisioning request");
      exit(1);
    }

    ppn = threads_per_host / calculated_processes_per_host;
  }
  else
  {
    if (arg_ppn > 1)
      ppn = arg_ppn;
    else if (arg_requested_pes > 0 && arg_requested_nodes > 0)
      ppn = arg_requested_pes / arg_requested_nodes;
  }
#endif

  // Order processes by nodetab_process's operator< (defined elsewhere),
  // keeping the relative order of equal entries.
  std::stable_sort(my_process_table.begin(), my_process_table.end());

  int newno = 0;
  for (nodetab_process & p : my_process_table)
  {
    // Node numbers become the process's position after sorting.
    p.nodeno = newno++;

    p.PEs = ppn;

    // One map entry per PE, pointing into my_process_table; these pointers
    // are only valid while my_process_table is not reallocated.
    for (int j = 0; j < ppn; ++j)
      pe_to_process_map.push_back(&p);
  }

  for (nodetab_process & p : my_process_table)
    nodeinfo_populate(p);

#ifdef HSTART
  if (arg_hierarchical_start) {
    // Hierarchical start: report upward, then distribute to child charmruns.
    send_myNodeInfo_to_parent();

    forward_nodetab_to_children();
  }

  else
#endif
  {
    send_clients_nodeinfo();
#if CMK_USE_IBVERBS || CMK_USE_IBUD
    receive_qplist();
#if CMK_USE_IBVERBS
    exchange_qpdata_clients();
#endif
    send_clients_qpdata();
#endif
  }

  if (arg_verbose)
    printf("Charmrun> IP tables sent.\n");
}
03792
03793
03794
#ifdef HSTART
/*
 * Hierarchical start, top-level charmrun: accept connections for every
 * entry of my_process_table (presumably the child charmruns -- confirm),
 * then exchange node information.
 *
 * Without IB verbs/UD this runs two connection rounds: after handing out
 * the initial node distribution it sets charmrun_phase = 1 and accepts a
 * second set of connections before sending the node info.
 */
static void req_charmrun_connect(void)
{
  skt_set_abort(client_connect_problem_skt);

#if CMK_IBVERBS_FAST_START
  for (int c = 0, c_end = my_process_table.size(); c < c_end; ++c)
    req_one_client_partinit(my_process_table, c);
  for (nodetab_process & p : my_process_table)
    read_initnode_one_client(p);
#else

  req_set_client_connect(my_process_table, my_process_table.size());

#endif

  if (portOk == 0)
    exit(1);
  if (arg_verbose)
    printf("Charmrun> All clients connected.\n");

#if CMK_USE_IBVERBS || CMK_USE_IBUD
  send_clients_nodeinfo();
  receive_qplist();
#if CMK_USE_IBVERBS
  exchange_qpdata_clients();
#endif
  send_clients_qpdata();
#else
  // The loop body is the single req_handle_initnodedistribution() call.
  for (const nodetab_process & p : my_process_table)

    req_handle_initnodedistribution(NULL, p);

  // Second round: wait for the next set of connections.
  charmrun_phase = 1;

  skt_set_abort(client_connect_problem_skt);

  req_set_client_connect(my_process_table, my_process_table.size());

  send_clients_nodeinfo();

#endif
  if (arg_verbose)
    printf("Charmrun> IP tables sent.\n");

}

#endif
03856
03857 #ifndef CMK_BPROC
03858
03859 static void start_one_node_ssh(nodetab_process & p);
03860 static void finish_set_nodes(std::vector<nodetab_process> &, int start, int stop);
03861
03862 static void start_nodes_batch_and_connect(std::vector<nodetab_process> & process_table)
03863 {
03864 int batch = arg_batch_spawn;
03865 const int process_count = process_table.size();
03866 int clientstart = 0;
03867 do
03868 {
03869 int clientend = clientstart + batch;
03870 if (clientend > process_count)
03871 clientend = process_count;
03872
03873 for (int c = clientstart; c < clientend; ++c)
03874 start_one_node_ssh(process_table[c]);
03875
03876 #if CMK_USE_SSH
03877
03878 if (!arg_ssh_display)
03879 #endif
03880 finish_set_nodes(process_table, clientstart, clientend);
03881
03882
03883
03884 #if CMK_IBVERBS_FAST_START
03885 for (int c = clientstart; c < clientend; ++c)
03886 req_one_client_partinit(process_table, c);
03887 #else
03888 req_set_client_connect(process_table, clientend-clientstart);
03889 #endif
03890
03891 clientstart = clientend;
03892 }
03893 while (clientstart < process_count);
03894
03895 #if CMK_IBVERBS_FAST_START
03896 for (nodetab_process & p : process_table)
03897 read_initnode_one_client(p);
03898 #endif
03899 }
03900
/*
 * Batched counterpart of req_client_connect(). Launch and connect the
 * initial processes in batches, then the phase-2 processes: either in
 * batches via ssh, or (scalable start) by sending "nodefork" requests of at
 * most arg_batch_spawn forks each, connecting each chunk before the next.
 * Finishes with req_all_clients_connected().
 */
static void batch_launch_sequence(std::vector<nodetab_process> & process_table)
{
  skt_set_abort(client_connect_problem_skt);

  start_nodes_batch_and_connect(process_table);

  std::vector<nodetab_process> phase2_processes;
  req_construct_phase2_processes(phase2_processes);
  if (phase2_processes.size() > 0)
  {
    if (!arg_scalable_start)
    {
      start_nodes_batch_and_connect(phase2_processes);
    }
    else
    {
      // Running index into phase2_processes (used only by fast start).
      int total = 0;
      ChMessageHeader hdr;
      ChMessageInt_t mydata[ChInitNodeforkFields];
      ChMessageHeader_new("nodefork", sizeof(mydata), &hdr);
      for (const nodetab_process & p : process_table)
      {
        int numforks = p.host->processes - 1;
        if (numforks <= 0)
          continue;

        // Split this host's forks into chunks of at most arg_batch_spawn.
        for (int c = 0; c < numforks; c += arg_batch_spawn)
        {
          const int count = std::min(numforks - c, arg_batch_spawn);

          if (arg_verbose)
            printf("Charmrun> Instructing host \"%s\" to fork() x %d\n", p.host->name, count);

          // Payload: [0] forks in this chunk, [1] first node number.
          mydata[0] = ChMessageInt_new(count);
          mydata[1] = ChMessageInt_new(p.forkstart + c);
          skt_sendN(p.req_client, (const char *) &hdr, sizeof(hdr));
          skt_sendN(p.req_client, (const char *) mydata, sizeof(mydata));

#if CMK_IBVERBS_FAST_START
          for (int f = 0; f < count; ++f)
            req_one_client_partinit(phase2_processes, total++);
#else
          req_set_client_connect(phase2_processes, count);
#endif
        }
      }

#if CMK_IBVERBS_FAST_START
      for (nodetab_process & p : phase2_processes)
        read_initnode_one_client(p);
#endif
    }
  }

  req_add_phase2_processes(phase2_processes);
  req_all_clients_connected();
}

#endif
03963
03964
03965 static void req_start_server(void)
03966 {
03967 skt_ip_t ip = skt_innode_my_ip();
03968 server_port = 0;
03969 #if CMK_SHRINK_EXPAND
03970 if (arg_shrinkexpand) {
03971 char *ns = getenv("NETSTART");
03972 if (ns != 0) {
03973 int node_num, old_charmrun_pid, port;
03974 char old_charmrun_name[1024 * 1000];
03975 int nread = sscanf(ns, "%d%s%d%d%d", &node_num, old_charmrun_name,
03976 &server_port, &old_charmrun_pid, &port);
03977 if (nread != 5) {
03978 fprintf(stderr, "Error parsing NETSTART '%s'\n", ns);
03979 exit(1);
03980 }
03981 }
03982 }
03983 #endif
03984 if (arg_local)
03985
03986 strcpy(server_addr, "127.0.0.1");
03987 else if (arg_charmrunip != NULL)
03988
03989 strcpy(server_addr, arg_charmrunip);
03990 else if ((arg_charmrunip = getenv("CHARMRUN_IP")) != NULL)
03991
03992 strcpy(server_addr, arg_charmrunip);
03993 else if (skt_ip_match(ip, _skt_invalid_ip)) {
03994 fprintf(stderr, "Charmrun> Warning-- cannot find IP address for your hostname. "
03995 "Using loopback.\n");
03996 strcpy(server_addr, "127.0.0.1");
03997 } else if (arg_usehostname || skt_ip_match(ip, skt_lookup_ip("127.0.0.1")))
03998
03999 gethostname(server_addr, sizeof(server_addr));
04000 else
04001 skt_print_ip(server_addr, ip);
04002
04003 #if CMK_SHRINK_EXPAND
04004 server_port = arg_charmrun_port;
04005 #else
04006 server_port = 0;
04007 #endif
04008 server_fd = skt_server(&server_port);
04009
04010 if (arg_verbose) {
04011 printf("Charmrun> Charmrun = %s, port = %d\n", server_addr, server_port);
04012 }
04013
04014 #if CMK_CCS_AVAILABLE
04015 #ifdef HSTART
04016 if (!arg_hierarchical_start ||
04017 (arg_hierarchical_start && !arg_child_charmrun))
04018 #endif
04019 if (arg_server == 1)
04020 CcsServer_new(NULL, &arg_server_port, arg_server_auth);
04021 #endif
04022 }
04023
04024 #ifdef HSTART
04025
04026 static void parse_netstart(void)
04027 {
04028 char *ns = getenv("NETSTART");
04029 if (ns != 0) {
04030 int port;
04031 char parent_charmrun_name[1024 * 1000];
04032 int nread = sscanf(ns, "%d%s%d%d%d", &mynodes_start, parent_charmrun_name,
04033 &parent_charmrun_port, &parent_charmrun_pid, &port);
04034 parent_charmrun_IP = skt_lookup_ip(parent_charmrun_name);
04035
04036 if (nread != 5) {
04037 fprintf(stderr, "Error parsing NETSTART '%s'\n", ns);
04038 exit(1);
04039 }
04040 }
04041 #if CMK_USE_IBVERBS | CMK_USE_IBUD
04042 char *cmi_num_nodes = getenv("CmiNumNodes");
04043 if (cmi_num_nodes != NULL) {
04044 sscanf(cmi_num_nodes, "%d", &_Cmi_numnodes);
04045 }
04046 #endif
04047 }
04048
04049 static int hstart_total_hosts;
04050
04051 static void my_nodetab_store(ChMessage *msg)
04052 {
04053 ChMessageInt_t * nodelistmsg = (ChMessageInt_t *) msg->data;
04054 const int hstart_hosts_size = ChMessageInt(nodelistmsg[0]);
04055 hstart_total_hosts = ChMessageInt(nodelistmsg[1]);
04056 my_host_table.reserve(hstart_hosts_size);
04057 ChMessageInt_t * hstart_hosts = nodelistmsg + 2;
04058 for (int k = 0; k < hstart_hosts_size; k++)
04059 my_host_table.push(host_table[ChMessageInt(hstart_hosts[k])]);
04060 }
04061
04062
04063
/*
 * Child charmrun: ask the parent for this child's node list and store it
 * via my_nodetab_store().
 *
 * Without IB verbs an "initnodetab" request carrying mynodes_start is sent
 * first. Under CMK_USE_IBVERBS the request code is compiled out (#if 0), so
 * this only waits for the parent's message.
 * If nothing arrives within the skt_select1() timeout (1200 * 1000;
 * presumably milliseconds, i.e. 20 minutes -- confirm), the process exits
 * with status 0.
 * NOTE(review): exiting with success on a timeout may hide failures.
 */
static void nodelist_obtain(void)
{
#if CMK_USE_IBVERBS
# if 0
  {
    int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
    me.info.qpList = malloc(qpListSize);
    copyInfiAddr(me.info.qpList);
    MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
    ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
    free(me.info.qpList);
  }
# endif
#else
  ChMessageHeader hdr;
  ChMessageInt_t node_start = ChMessageInt_new(mynodes_start);
  ChMessageHeader_new("initnodetab", sizeof(ChMessageInt_t), &hdr);
  skt_sendN(parent_charmrun_fd, (const char *) &hdr, sizeof(hdr));
  skt_sendN(parent_charmrun_fd, (const char *) &node_start, sizeof(node_start));
#endif // CMK_USE_IBVERBS

  ChMessage nodelistmsg;

  // Wait for the parent's reply before blocking in ChMessage_recv().
  if (!skt_select1(parent_charmrun_fd, 1200 * 1000)) {
    exit(0);
  }
  ChMessage_recv(parent_charmrun_fd, &nodelistmsg);

  my_nodetab_store(&nodelistmsg);
  ChMessage_free(&nodelistmsg);
}
04097
04098 static void init_mynodes(void)
04099 {
04100 parse_netstart();
04101 if (!skt_ip_match(parent_charmrun_IP, _skt_invalid_ip)) {
04102 dataskt = skt_server(&dataport);
04103 parent_charmrun_fd =
04104 skt_connect(parent_charmrun_IP, parent_charmrun_port, 1800);
04105 } else {
04106 parent_charmrun_fd = -1;
04107 }
04108
04109 nodelist_obtain();
04110 }
04111 #endif
04112
04113
04114
04115
04116
04117
/* Forward declarations for launch helpers defined later in this file. */
static void start_nodes_daemon(std::vector<nodetab_process> &);
static void start_nodes_mpiexec();
#ifdef HSTART
static void start_next_level_charmruns(void);
#endif
#if CMK_BPROC
static void nodetab_init_for_scyld(void);
static void start_nodes_scyld(void);
#endif
static void kill_nodes(void);
static void open_gdb_info(void);
static void read_global_segments_size(void);

/* Idle callback installed with skt_set_idle() in main(); sleep(0) requests
   no delay, so waiting socket calls keep polling without pausing. */
static void fast_idleFn(void) { sleep(0); }

/* The envp argument of main(), saved for use elsewhere in this file. */
static char **main_envp;
04134
04135 int main(int argc, const char **argv, char **envp)
04136 {
04137 srand(time(0));
04138 skt_init();
04139 skt_set_idle(fast_idleFn);
04140
04141
04142 main_envp = envp;
04143
04144 arg_init(argc, argv);
04145 if (arg_verbose)
04146 printf("Charmrun> charmrun started...\n");
04147
04148 start_timer = GetClock();
04149 #if CMK_BPROC
04150
04151 if (arg_nodelist)
04152 nodetab_init();
04153 else
04154 nodetab_init_for_scyld();
04155 #else
04156
04157 nodetab_init();
04158 #endif
04159
04160 if (arg_requested_numhosts > 0)
04161 {
04162 if (arg_requested_numhosts > host_table.size())
04163 {
04164 fprintf(stderr, "Charmrun> Error: ++numHosts exceeds available host pool.\n");
04165 exit(1);
04166 }
04167 else
04168 host_table.resize(arg_requested_numhosts);
04169 }
04170
04171 if (arg_verbose)
04172 {
04173 char ips[200];
04174 for (const nodetab_host * h : host_table)
04175 {
04176 skt_print_ip(ips, h->ip);
04177 printf("Charmrun> added host \"%s\", IP:%s\n", h->name, ips);
04178 }
04179 }
04180
04181 #ifdef HSTART
04182 if (arg_hierarchical_start)
04183 nodetab_init_hierarchical_start();
04184 #endif
04185
04186
04187 req_start_server();
04188
04189
04190 input_init();
04191
04192 #ifdef HSTART
04193
04194 if (arg_child_charmrun) {
04195 init_mynodes();
04196 }
04197 else
04198 my_host_table = host_table;
04199 #endif
04200
04201 const int my_host_count = my_host_table.size();
04202 const int my_initial_process_count = proc_per.active()
04203 ? (arg_requested_nodes > 0 ? std::min(my_host_count, arg_requested_nodes) : my_host_count)
04204 : std::min(my_host_count, get_old_style_process_count());
04205 my_process_table.resize(my_initial_process_count);
04206 for (int i = 0; i < my_initial_process_count; ++i)
04207 {
04208 nodetab_host * h = my_host_table[i];
04209 nodetab_process & p = my_process_table[i];
04210 p.host = h;
04211 p.nodeno = h->hostno;
04212 }
04213
04214
04215 if (0 != getenv("CONV_DAEMON"))
04216 start_nodes_daemon(my_process_table);
04217 else
04218 #if CMK_BPROC
04219 start_nodes_scyld();
04220 #else
04221 #if CMK_USE_IBVERBS
04222 PRINT(("Charmrun> IBVERBS version of charmrun\n"));
04223 #endif
04224
04225 #ifdef HSTART
04226
04227 if (arg_hierarchical_start) {
04228 if (!arg_local) {
04229 if (!arg_child_charmrun) {
04230 start_next_level_charmruns();
04231 } else {
04232 if (!arg_batch_spawn)
04233 start_nodes_ssh(my_process_table);
04234 else
04235 batch_launch_sequence(my_process_table);
04236 }
04237 } else
04238 start_nodes_local(my_process_table);
04239 }
04240
04241
04242 else
04243
04244 #endif
04245 {
04246 if (!arg_local) {
04247 if (!arg_batch_spawn) {
04248 #if CMK_SHRINK_EXPAND
04249
04250
04251 if (!arg_shrinkexpand || (arg_requested_pes > arg_old_pes))
04252 #endif
04253 {
04254 if (arg_mpiexec)
04255 start_nodes_mpiexec();
04256 else
04257 start_nodes_ssh(my_process_table);
04258 }
04259 } else
04260 batch_launch_sequence(my_process_table);
04261 } else
04262 start_nodes_local(my_process_table);
04263 }
04264 #endif
04265
04266 if (arg_charmdebug) {
04267 #if defined(_WIN32) || CMK_BPROC
04268
04269 fprintf(stderr,
04270 "Charmdebug is supported currently only with the ssh subsystem\n");
04271 abort();
04272 #else
04273
04274 PRINT(("opening connection with node 0 for info gdb\n"));
04275 read_global_segments_size();
04276 open_gdb_info();
04277 gdb_stream = fdopen(dup(2), "a");
04278 dup2(1, 2);
04279 #endif
04280 }
04281
04282 if (arg_verbose)
04283 printf("Charmrun> node programs all started\n");
04284
04285
04286 #ifdef HSTART
04287
04288 if (arg_hierarchical_start) {
04289 #if !CMK_SSH_KILL
04290 if (!arg_batch_spawn || (!arg_child_charmrun))
04291 finish_nodes(my_process_table);
04292 #endif
04293
04294 if (!arg_child_charmrun)
04295 req_charmrun_connect();
04296 else if (!arg_batch_spawn)
04297 req_client_connect(my_process_table);
04298 }
04299
04300 else
04301 #endif
04302 {
04303 #if !CMK_SSH_KILL
04304 if (!arg_batch_spawn)
04305 finish_nodes(my_process_table);
04306 #endif
04307 if (!arg_batch_spawn)
04308 req_client_connect(my_process_table);
04309 }
04310 #if CMK_SSH_KILL
04311 kill_nodes();
04312 #endif
04313 if (arg_verbose)
04314 printf("Charmrun> node programs all connected\n");
04315
04316 PRINT(("Charmrun> started all node programs in %.3f seconds.\n",
04317 GetClock() - start_timer));
04318
04319
04320 #ifdef HSTART
04321 if (arg_hierarchical_start)
04322 while (1)
04323 req_poll_hierarchical();
04324 else
04325 #endif
04326 {
04327 if (arg_timelimit == -1)
04328 {
04329 while (1)
04330 req_poll();
04331 }
04332 else
04333 {
04334 time_t start = time(NULL);
04335 while (1)
04336 {
04337 req_poll();
04338 time_t end = time(NULL);
04339 double elapsed = difftime(end, start);
04340 if (elapsed >= arg_timelimit)
04341 {
04342 fprintf(stderr, "Charmrun> Error: Time limit reached\n");
04343
04344 kill_all_compute_nodes("Time limit reached");
04345 for (const nodetab_process & p : my_process_table)
04346 skt_close(p.req_client);
04347 exit(1);
04348 }
04349 }
04350 }
04351 }
04352 }
04353
04354
04355
04356
04357
04358
04359 static char *create_netstart(int node)
04360 {
04361 static char dest[1536];
04362 int port = 0;
04363 if (arg_mpiexec)
04364 sprintf(dest, "$CmiMyNode %s %d %d %d", server_addr, server_port,
04365 getpid() & 0x7FFF, port);
04366 else
04367 sprintf(dest, "%d %s %d %d %d", node, server_addr, server_port,
04368 getpid() & 0x7FFF, port);
04369 return dest;
04370 }
04371
04372
04373
04374
04375
04376
04377
04378 static void start_nodes_daemon(std::vector<nodetab_process> & process_table)
04379 {
04380
04381
04382 char argBuffer[5000] = { '\0' };
04383 for (int i = 0; arg_argv[i]; i++) {
04384 if (arg_verbose)
04385 printf("Charmrun> packing arg: %s\n", arg_argv[i]);
04386 strcat(argBuffer, " ");
04387 strcat(argBuffer, arg_argv[i]);
04388 }
04389
04390 taskStruct task;
04391 task.magic = ChMessageInt_new(DAEMON_MAGIC);
04392
04393
04394
04395 for (const nodetab_process & p : process_table)
04396 {
04397 const nodetab_host * h = p.host;
04398
04399 char *arg_currdir_r = pathfix(arg_currdir_a, h->pathfixes);
04400 strcpy(task.cwd, arg_currdir_r);
04401 free(arg_currdir_r);
04402 char *arg_nodeprog_r = pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);
04403 strcpy(task.pgm, arg_nodeprog_r);
04404
04405 if (arg_verbose)
04406 printf("Charmrun> Starting node program %d on '%s' as %s.\n", p.nodeno,
04407 h->name, arg_nodeprog_r);
04408 free(arg_nodeprog_r);
04409 sprintf(task.env, "NETSTART=%s", create_netstart(p.nodeno));
04410
04411 char nodeArgBuffer[5120];
04412 char *argBuf;
04413 if (h->nice != -100) {
04414 if (arg_verbose)
04415 printf("Charmrun> +nice %d\n", h->nice);
04416 sprintf(nodeArgBuffer, "%s +nice %d", argBuffer, h->nice);
04417 argBuf = nodeArgBuffer;
04418 } else
04419 argBuf = argBuffer;
04420 task.argLength = ChMessageInt_new(strlen(argBuf));
04421
04422
04423 char statusCode = 'N';
04424 int fd = skt_connect(h->ip, DAEMON_IP_PORT, 30);
04425 if (fd !=
04426 INVALID_SOCKET) {
04427 skt_sendN(fd, (const char *) &task, sizeof(task));
04428 skt_sendN(fd, (const char *) argBuf, strlen(argBuf));
04429 skt_recvN(fd, &statusCode, sizeof(char));
04430 }
04431 if (statusCode != 'G') {
04432 fprintf(stderr, "Error '%c' starting remote node program on %s--\n%s\n",
04433 statusCode, h->name, daemon_status2msg(statusCode));
04434 exit(1);
04435 } else if (arg_verbose)
04436 printf("Charmrun> Node program %d started.\n", p.nodeno);
04437 }
04438 }
04439
#if defined(_WIN32)

/* Windows build: "ssh" launches are routed to the daemon launcher, and the
   remaining ssh/mpiexec hooks are no-ops. */

static void start_nodes_ssh(std::vector<nodetab_process> & process_table) { start_nodes_daemon(process_table); }
static void finish_nodes(std::vector<nodetab_process> & process_table) {}
static void start_one_node_ssh(nodetab_process & p) {}
static void start_nodes_mpiexec() {}

static void finish_set_nodes(std::vector<nodetab_process> & process_table, int start, int stop) {}
04450
/*
 * Append a Windows environment block (oldEnv: NUL-separated strings ending
 * with an empty string) right after the NUL-terminated string already in
 * dest, terminate the combined block with an extra NUL, and release oldEnv
 * with FreeEnvironmentStrings().
 *
 * NOTE(review): no capacity is passed in, so dest must be large enough for
 * the whole inherited environment. The caller in this file uses a fixed
 * 10000-byte buffer -- confirm that suffices, or pass a size.
 */
static void envCat(char *dest, LPTSTR oldEnv)
{
  char *src = oldEnv;
  // Skip the existing string and its terminator.
  dest += strlen(dest);
  dest++;
  while ((*src) != '\0') {
    int adv = strlen(src) + 1;
    strcpy(dest, src);
    dest += adv;
    src += adv;
  }
  *dest = '\0';
  FreeEnvironmentStrings(oldEnv);
}
04465
04466
04467
04468 static void start_nodes_local(std::vector<nodetab_process> & process_table)
04469 {
04470 char cmdLine[10000];
04471
04472
04473
04474
04475
04476 strcpy(cmdLine, pparam_argv[1]);
04477 const char **param = pparam_argv + 2;
04478 while (*param) {
04479 strcat(cmdLine, " ");
04480 strcat(cmdLine, *param);
04481 param++;
04482 }
04483
04484 PROCESS_INFORMATION pi;
04485 char environment[10000];
04486 for (nodetab_process & p : process_table)
04487 {
04488 STARTUPINFO si = {0};
04489
04490 sprintf(environment, "NETSTART=%s", create_netstart(p.nodeno));
04491
04492 envCat(environment, GetEnvironmentStrings());
04493
04494
04495
04496 si.cb = sizeof(si);
04497 if (arg_verbose)
04498 printf("Charmrun> start %d node program on localhost.\n", p.nodeno);
04499
04500 int ret;
04501 ret = CreateProcess(NULL,
04502 cmdLine,
04503 NULL,
04504 NULL,
04505 FALSE,
04506 #if CMK_CHARMPY
04507
04508 CREATE_NEW_PROCESS_GROUP | (p.nodeno == 0 ? 0 : DETACHED_PROCESS),
04509 #elif 1
04510 CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS,
04511 #else
04512 CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE,
04513 #endif
04514
04515 environment,
04516 ".",
04517 &si,
04518 &pi);
04519
04520 if (ret == 0) {
04521
04522
04523
04524
04525
04526
04527
04528
04529
04530
04531 int error = GetLastError();
04532 fprintf(stderr, "startProcess failed to start process \"%s\" with status: %d\n",
04533 pparam_argv[1], error);
04534 exit(1);
04535 }
04536 }
04537 }
04538
04539 #elif CMK_BPROC
04540
04541 static int bproc_nodeisup(int node)
04542 {
04543 int status = 0;
04544 #if CMK_BPROC_VERSION < 4
04545 if (bproc_nodestatus(node) == bproc_node_up)
04546 status = 1;
04547 if (arg_verbose)
04548 printf("Charmrun> node %d status: %s\n", node, status ? "up" : "down");
04549 #else
04550 char nodestatus[128];
04551 if (node == -1) {
04552 strcpy(nodestatus, "up");
04553 status = 1;
04554 }
04555 if (bproc_nodestatus(node, nodestatus, 128)) {
04556 if (strcmp(nodestatus, "up") == 0)
04557 status = 1;
04558 }
04559 if (arg_verbose)
04560 printf("Charmrun> node %d status: %s\n", node, nodestatus);
04561 #endif
04562 return status;
04563 }
04564
04565
04566
04567
04568 static void nodetab_init_for_scyld()
04569 {
04570 int maxNodes = bproc_numnodes() + 1;
04571 if (arg_endpe < maxNodes)
04572 maxNodes = arg_endpe + 1;
04573
04574
04575 int hostno = 0;
04576 for (int i = -1; i < maxNodes; i++) {
04577 char hostname[256];
04578 if (!bproc_nodeisup(i))
04579 continue;
04580 if (i != -1 && i < arg_startpe)
04581 continue;
04582 if (i == -1 && arg_skipmaster)
04583 continue;
04584 sprintf(hostname, "%d", i);
04585 nodetab_host * h = new nodetab_host{};
04586 h->name = strdup(hostname);
04587 h->ip = nodetab_host::resolve(hostname);
04588 h->hostno = hostno++;
04589 host_table.push_back(h);
04590 }
04591
04592 const int hosts_size = host_table.size();
04593 if (hosts_size == 0) {
04594 fprintf(stderr, "Charmrun> no slave node available!\n");
04595 exit(1);
04596 }
04597 if (arg_verbose)
04598 printf("Charmrun> There are %d slave nodes available.\n",
04599 hosts_size - (arg_skipmaster ? 0 : 1));
04600 }
04601
04602 static void start_nodes_scyld(void)
04603 {
04604 char *envp[2] = { (char *) malloc(256), NULL };
04605 for (const nodetab_process & p : my_process_table)
04606 {
04607 const nodetab_host * h = p.host;
04608 const int bproc_nodeno = atoi(h->name);
04609
04610 if (arg_verbose)
04611 printf("Charmrun> start node program on slave node: %d.\n", bproc_nodeno);
04612 sprintf(envp[0], "NETSTART=%s", create_netstart(p.nodeno));
04613
04614 int pid = fork();
04615 if (pid < 0)
04616 exit(1);
04617 if (pid == 0) {
04618 int fd, fd1 = dup(1);
04619 if (!(arg_debug || arg_debug_no_pause)) {
04620 if (fd = open("/dev/null", O_RDWR)) {
04621 dup2(fd, 0);
04622 dup2(fd, 1);
04623 dup2(fd, 2);
04624 }
04625 }
04626 if (bproc_nodeno == -1) {
04627 int status = execve(pparam_argv[1], pparam_argv + 1, envp);
04628 dup2(fd1, 1);
04629 fprintf(stderr, "execve failed to start process \"%s\" with status: %d\n",
04630 pparam_argv[1], status);
04631 } else {
04632 int status = bproc_execmove(bproc_nodeno, pparam_argv[1], pparam_argv + 1, envp);
04633 dup2(fd1, 1);
04634 fprintf(stderr, "bproc_execmove failed to start remote process \"%s\" with "
04635 "status: %d\n",
04636 pparam_argv[1], status);
04637 }
04638 kill(getppid(), 9);
04639 exit(1);
04640 }
04641 }
04642 free(envp[0]);
04643 }
/* No-op under BProc. */
static void finish_nodes(std::vector<nodetab_process> & process_table) {}
04645
#else

/* Default (non-Windows, non-BProc) launch path: remote-shell (ssh) based
   process startup follows. */

#include <sys/wait.h>

/* The process environment, edited in place by removeEnv(). */
extern char **environ;
04658 static void removeEnv(const char *doomedEnv)
04659 {
04660 char **oe, **ie;
04661 oe = ie = environ;
04662 while (*ie != NULL) {
04663 if (0 != strncmp(*ie, doomedEnv, strlen(doomedEnv)))
04664 *oe++ = *ie;
04665 ie++;
04666 }
04667 *oe = NULL;
04668 }
04669
04670 static int ssh_fork(const nodetab_process & p, const char *startScript)
04671 {
04672 const nodetab_host * h = p.host;
04673
04674 std::vector<const char *> sshargv;
04675
04676 const char *s = h->shell;
04677 const char *e = skipstuff(s);
04678 while (*s) {
04679 sshargv.push_back(substr(s, e));
04680 s = skipblanks(e);
04681 e = skipstuff(s);
04682 }
04683
04684 sshargv.push_back(h->name);
04685 if (arg_ssh_display)
04686 sshargv.push_back("-X");
04687 sshargv.push_back("-l");
04688 sshargv.push_back(h->login);
04689 sshargv.push_back("-o");
04690 sshargv.push_back("KbdInteractiveAuthentication=no");
04691 sshargv.push_back("-o");
04692 sshargv.push_back("PasswordAuthentication=no");
04693 sshargv.push_back("-o");
04694 sshargv.push_back("NoHostAuthenticationForLocalhost=yes");
04695 sshargv.push_back("/bin/bash -f");
04696 sshargv.push_back((const char *) NULL);
04697
04698 if (arg_verbose) {
04699 std::string cmd_str = sshargv[0];
04700 for (int n = 1; n < sshargv.size()-1; ++n)
04701 cmd_str += " " + std::string(sshargv[n]);
04702 printf("Charmrun> Starting %s\n", cmd_str.c_str());
04703 }
04704
04705 int pid = fork();
04706 if (pid < 0) {
04707 perror("ERROR> starting remote shell");
04708 exit(1);
04709 }
04710 if (pid == 0) {
04711 int fdScript = open(startScript, O_RDONLY);
04712 unlink(startScript);
04713 dup2(fdScript, 0);
04714
04715 for (int i = 3; i < 1024; i++)
04716 close(i);
04717 execvp(sshargv[0], const_cast<char **>(&sshargv[0]));
04718 fprintf(stderr, "Charmrun> Couldn't find remote shell program '%s'!\n",
04719 sshargv[0]);
04720 exit(1);
04721 }
04722 if (arg_verbose)
04723 printf("Charmrun> remote shell (%s:%d) started\n", h->name, p.nodeno);
04724 return pid;
04725 }
04726
/* Write each string of the NULL-terminated argv to f, each preceded by a
   single space. */
static void fprint_arg(FILE *f, const char **argv)
{
  for (const char **arg = argv; *arg != NULL; ++arg)
    fprintf(f, " %s", *arg);
}
/* Emit shell lines that run the start script's Find function for `program`
   and store the path it leaves in $loc into the variable named `dest`. */
static void ssh_Find(FILE *f, const char *program, const char *dest)
{
  fprintf(f, "Find %s\n%s=$loc\n", program, dest);
}
/* Write to f the /bin/sh start script for process p.
   The script:
     - defines Echo/Exit/Find helper functions;
     - sources ~/.charmrunrc if present;
     - exports the startup environment (DISPLAY, NETMAGIC, CmiMyNode, NETSTART,
       CmiMyNodeSize, CmiMyForks, the optional CmiProcessPer* / CmiOneWthPer*
       variables, CmiNumNodes and some platform-specific variables);
     - checks on the remote side that the node program is executable and the
       working directory is reachable;
     - runs the host's setup command unless it is "*";
     - starts the node program: directly, in an xterm, or under
       gdb/idb/lldb/dbx.
   argv is the NULL-terminated argument list handed to the node program.
   Progress is also printed to charmrun's own stdout when arg_verbose is set. */
static void ssh_script(FILE *f, const nodetab_process & p, const char **argv)
{
  const nodetab_host * h = p.host;
  const char *dbg = h->debugger;
  const char *host = h->name;
  const int nodeno = p.nodeno;

  /* Under mpiexec the script gets a shebang line, presumably because the
     launcher executes the file directly. */
  if (arg_mpiexec)
    fprintf(f, "#!/bin/sh\n");

  /* Echo: prefix every message with "<host>.<node>". */
  fprintf(f,
          "Echo() {\n"
          " echo 'Charmrun remote shell(%s.%d)>' $*\n"
          "}\n",
          host, nodeno);
  /* Exit: report a nonzero code, then either kill the shell after 5 seconds
     (CMK_SSH_KILL builds) or exit with the given code. */
  fprintf(f,
          "Exit() {\n"
          " if [ $1 -ne 0 ]\n"
          " then\n"
          " Echo Exiting with error code $1\n"
          " fi\n"
#if CMK_SSH_KILL
          " sleep 5\n"
          " kill -9 $$\n"
#else
          " exit $1\n"
#endif
          "}\n");
  /* Find: scan every $PATH directory for $1 and leave the last match in
     $loc.  If nothing matches, the script fails with Exit 1. */
  fprintf(f,
          "Find() {\n"
          " loc=''\n"
          " for dir in `echo $PATH | sed -e 's/:/ /g'`\n"
          " do\n"
          " test -f \"$dir/$1\" && loc=\"$dir/$1\"\n"
          " done\n"
          " if [ \"x$loc\" = x ]\n"
          " then\n"
          " Echo $1 not found in your PATH \"($PATH)\"--\n"
          " Echo set your path in your ~/.charmrunrc\n"
          " Exit 1\n"
          " fi\n"
          "}\n");

  if (arg_verbose)
    fprintf(f, "Echo 'remote responding...'\n");

  /* Users can customise the remote environment (e.g. PATH) in ~/.charmrunrc. */
  fprintf(f, "test -f \"$HOME/.charmrunrc\" && . \"$HOME/.charmrunrc\"\n");

  /* DISPLAY is exported explicitly only when ssh X11 forwarding was not
     requested. */
  if (arg_display && !arg_ssh_display)
    fprintf(f, "DISPLAY='%s';export DISPLAY\n", arg_display);

  /* NETMAGIC is the low 15 bits of the (parent) charmrun's pid. */
#ifdef HSTART
  if (arg_child_charmrun)
    fprintf(f, "NETMAGIC=\"%d\";export NETMAGIC\n",
            parent_charmrun_pid & 0x7FFF);
  else
#endif
    fprintf(f, "NETMAGIC=\"%d\";export NETMAGIC\n", getpid() & 0x7FFF);

  /* CmiMyNode: under mpiexec, the first launcher-specific rank variable that
     is set; otherwise the fixed node number (offset by mynodes_start for an
     HSTART child charmrun). */
  if (arg_mpiexec) {
    fprintf(f, "CmiMyNode=$OMPI_COMM_WORLD_RANK\n");
    fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$MPIRUN_RANK\n");
    fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$PMI_RANK\n");
    fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$PMI_ID\n");
    fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$MP_CHILD\n");
    fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$SLURM_PROCID\n");
    fprintf(f, "test -z \"$CmiMyNode\" && (Echo Could not detect rank from "
               "environment ; Exit 1)\n");
    fprintf(f, "export CmiMyNode\n");
  }
#ifdef HSTART
  else if (arg_hierarchical_start && arg_child_charmrun)
    fprintf(f, "CmiMyNode='%d'; export CmiMyNode\n", mynodes_start + nodeno);
#endif
  else
    fprintf(f, "CmiMyNode='%d'; export CmiMyNode\n", nodeno);

  /* NOTE(review): netstart is never freed here; presumably create_netstart
     returns a static buffer -- confirm. */
  char *netstart;
#ifdef HSTART
  if (arg_hierarchical_start && arg_child_charmrun)
    netstart = create_netstart(mynodes_start + nodeno);
  else
#endif
    netstart = create_netstart(nodeno);
  fprintf(f, "NETSTART=\"%s\";export NETSTART\n", netstart);

  fprintf(f, "CmiMyNodeSize='%d'; export CmiMyNodeSize\n", h->cpus);

  fprintf(f, "CmiMyForks='%d'; export CmiMyForks\n", 0);

  /* Export the requested processes-per-<unit> layout, if any. */
  using Unit = typename TopologyRequest::Unit;
  switch (proc_per.unit())
  {
    case Unit::Host:
      fprintf(f, "CmiProcessPerHost='%d'; export CmiProcessPerHost\n", proc_per.host);
      break;
    case Unit::Socket:
      fprintf(f, "CmiProcessPerSocket='%d'; export CmiProcessPerSocket\n", proc_per.socket);
      break;
    case Unit::Core:
      fprintf(f, "CmiProcessPerCore='%d'; export CmiProcessPerCore\n", proc_per.core);
      break;
    case Unit::PU:
      fprintf(f, "CmiProcessPerPU='%d'; export CmiProcessPerPU\n", proc_per.pu);
      break;
    default:
      break;
  }
#if CMK_SMP
  /* SMP builds: one worker thread per <unit> when requested. */
  switch (onewth_per.unit())
  {
    case Unit::Host:
      fprintf(f, "CmiOneWthPerHost='%d'; export CmiOneWthPerHost\n", 1);
      break;
    case Unit::Socket:
      fprintf(f, "CmiOneWthPerSocket='%d'; export CmiOneWthPerSocket\n", 1);
      break;
    case Unit::Core:
      fprintf(f, "CmiOneWthPerCore='%d'; export CmiOneWthPerCore\n", 1);
      break;
    case Unit::PU:
      fprintf(f, "CmiOneWthPerPU='%d'; export CmiOneWthPerPU\n", 1);
      break;
    default:
      break;
  }
#endif

  /* CmiNumNodes: under mpiexec, the first launcher-specific size variable
     that is set; otherwise hstart_total_hosts (HSTART child) or the size of
     my_process_table. */
  if (arg_mpiexec) {
    fprintf(f, "CmiNumNodes=$OMPI_COMM_WORLD_SIZE\n");
    fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$MPIRUN_NPROCS\n");
    fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$PMI_SIZE\n");
    fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$MP_PROCS\n");
    fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$SLURM_NTASKS\n");
    fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$SLURM_NPROCS\n");
    fprintf(f, "test -z \"$CmiNumNodes\" && (Echo Could not detect node count "
               "from environment ; Exit 1)\n");
    fprintf(f, "export CmiNumNodes\n");
  }
#ifdef HSTART
  else if (arg_hierarchical_start && arg_child_charmrun)
    fprintf(f, "CmiNumNodes='%d'; export CmiNumNodes\n", hstart_total_hosts);
#endif

  else
    fprintf(f, "CmiNumNodes='%d'; export CmiNumNodes\n", (int)my_process_table.size());

  /* Platform/runtime-specific environment tweaks. */
#ifdef CMK_GFORTRAN
  fprintf(f, "GFORTRAN_UNBUFFERED_ALL=YES; export GFORTRAN_UNBUFFERED_ALL\n");
#endif
#if CMK_USE_MX
  fprintf(f, "MX_MONOTHREAD=1; export MX_MONOTHREAD\n");

#endif
#if CMK_AIX && CMK_SMP
  fprintf(f, "MALLOCMULTIHEAP=1; export MALLOCMULTIHEAP\n");
#endif

  if (arg_verbose) {
    printf("Charmrun> Sending \"%s\" to client %d.\n", netstart, nodeno);
  }
  /* Append common system directories so the tools used below can be found. */
  fprintf(f,
          "PATH=\"$PATH:/bin:/usr/bin:/usr/X/bin:/usr/X11/bin:/usr/local/bin:"
          "/usr/X11R6/bin:/usr/openwin/bin\"\n");

  /* Program path as seen on host h (h->pathfixes applied, plus h->ext).
     NOTE(review): this local shadows the file-scope arg_nodeprog_r and,
     unlike arg_currdir_r, is never freed -- confirm whether pathextfix
     allocates. */
  char *arg_nodeprog_r = pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);

  /* Working directory as seen on host h; freed at the end of this function. */
  char *arg_currdir_r = pathfix(arg_currdir_a, h->pathfixes);

  if (arg_verbose) {
    printf("Charmrun> find the node program \"%s\" at \"%s\" for %d.\n",
           arg_nodeprog_r, arg_currdir_r, nodeno);
  }
  /* Debug and xterm modes need an xterm (and, unless disabled, xrdb). */
  if (arg_debug || arg_debug_no_pause || arg_in_xterm) {
    ssh_Find(f, h->xterm, "F_XTERM");
    if (!arg_ssh_display && !arg_debug_no_xrdb)
      ssh_Find(f, "xrdb", "F_XRDB");
    if (arg_verbose)
      fprintf(f, "Echo 'using xterm' $F_XTERM\n");
  }

  if (arg_debug || arg_debug_no_pause) {
    ssh_Find(f, dbg, "F_DBG");
    if (arg_verbose)
      fprintf(f, "Echo 'using debugger' $F_DBG\n");
  }

  /* Use xrdb to verify that the X server accepts connections before
     opening any xterm. */
  if (!arg_ssh_display && !arg_debug_no_xrdb &&
      (arg_debug || arg_debug_no_pause || arg_in_xterm)) {

    fprintf(f, "$F_XRDB -query > /dev/null\n");
    fprintf(f, "if test $? != 0\nthen\n");
    fprintf(f, " Echo 'Cannot contact X Server '$DISPLAY'. You probably'\n");
    fprintf(f, " Echo 'need to run xhost to authorize connections.'\n");
    fprintf(f, " Echo '(See manual for xhost for security issues)'\n");
    fprintf(f, " Echo 'Or try ++batch 1 ++ssh-display to rely on SSH X11 "
               "forwarding'\n");
    fprintf(f, " Exit 1\n");
    fprintf(f, "fi\n");
  }

  fprintf(f, "if test ! -x \"%s\"\nthen\n", arg_nodeprog_r);
  fprintf(f, " Echo 'Cannot locate this node-program: %s'\n", arg_nodeprog_r);
  fprintf(f, " Exit 1\n");
  fprintf(f, "fi\n");

  fprintf(f, "cd \"%s\"\n", arg_currdir_r);
  fprintf(f, "if test $? = 1\nthen\n");
  fprintf(f, " Echo 'Cannot propagate this current directory:'\n");
  fprintf(f, " Echo '%s'\n", arg_currdir_r);
  fprintf(f, " Exit 1\n");
  fprintf(f, "fi\n");

  /* A setup string of "*" means the host has no setup command. */
  if (strcmp(h->setup, "*")) {
    fprintf(f, "%s\n", h->setup);
    fprintf(f, "if test $? = 1\nthen\n");
    fprintf(f, " Echo 'this initialization command failed:'\n");
    fprintf(f, " Echo '\"%s\"'\n", h->setup);
    fprintf(f, " Echo 'edit your nodes file to fix it.'\n");
    fprintf(f, " Exit 1\n");
    fprintf(f, "fi\n");
  }

  /* /tmp/charmrun_err.$$ collects launch diagnostics (see the 127 check). */
  fprintf(f, "rm -f /tmp/charmrun_err.$$\n");
  if (arg_verbose)
    fprintf(f, "Echo 'starting node-program...'\n");

  /* Open the subshell that runs the node program; it is closed below. */
  fprintf(f, "(");

  if (arg_debug || arg_debug_no_pause) {
    if (strcmp(dbg, "gdb") == 0 || strcmp(dbg, "idb") == 0) {
      /* gdb/idb: write a command file (which deletes itself), then open
         the debugger on it inside an xterm. */
      fprintf(f, "cat > /tmp/charmrun_gdb.$$ << END_OF_SCRIPT\n");
      if (strcmp(dbg, "idb") == 0) {
        fprintf(f, "set \\$cmdset=\"gdb\"\n");
      }
      fprintf(f, "shell /bin/rm -f /tmp/charmrun_gdb.$$\n");
      fprintf(f, "handle SIGPIPE nostop noprint\n");
      fprintf(f, "handle SIGWINCH nostop noprint\n");
      fprintf(f, "handle SIGWAITING nostop noprint\n");
      if (arg_debug_commands)
        fprintf(f, "%s\n", arg_debug_commands);
      fprintf(f, "set args");
      fprint_arg(f, argv);
      fprintf(f, "\n");
      if (arg_debug_no_pause)
        fprintf(f, "run\n");
      fprintf(f, "END_OF_SCRIPT\n");
      if (arg_runscript)
        fprintf(f, "\"%s\" ", arg_runscript);
      fprintf(f, "$F_XTERM");
      fprintf(f, " -title 'Node %d (%s)' ", nodeno, h->name);
      if (strcmp(dbg, "idb") == 0)
        fprintf(f, " -e $F_DBG \"%s\" -c /tmp/charmrun_gdb.$$ \n", arg_nodeprog_r);
      else
        fprintf(f, " -e $F_DBG \"%s\" -x /tmp/charmrun_gdb.$$ \n", arg_nodeprog_r);
    } else if (strcmp(dbg, "lldb") == 0) {
      /* lldb: same scheme, with a self-deleting lldb command file. */
      fprintf(f, "cat > /tmp/charmrun_lldb.$$ << END_OF_SCRIPT\n");
      fprintf(f, "platform shell -- /bin/rm -f /tmp/charmrun_lldb.$$\n");



      fprintf(f, "process launch -X true -s --");
      fprint_arg(f, argv);
      fprintf(f, "\n");
      fprintf(f, "process handle -s false -n false SIGPIPE SIGWINCH\n");
      if (arg_debug_commands)
        fprintf(f, "%s\n", arg_debug_commands);
      if (arg_debug_no_pause)
        fprintf(f, "continue\n");
      else
        fprintf(f, "# Use \"continue\" or \"c\" to begin execution.\n");
      fprintf(f, "END_OF_SCRIPT\n");
      if (arg_runscript)
        fprintf(f, "\"%s\" ", arg_runscript);
      fprintf(f, "$F_XTERM");
      fprintf(f, " -title 'Node %d (%s)' ", nodeno, h->name);
      fprintf(f, " -e $F_DBG \"%s\" -s /tmp/charmrun_lldb.$$ \n", arg_nodeprog_r);
    } else if (strcmp(dbg, "dbx") == 0) {
      /* dbx: program arguments go on the command line (runargs or after the
         program) rather than in the command file. */
      fprintf(f, "cat > /tmp/charmrun_dbx.$$ << END_OF_SCRIPT\n");
      fprintf(f, "sh /bin/rm -f /tmp/charmrun_dbx.$$\n");
      fprintf(f, "dbxenv suppress_startup_message 5.0\n");
      fprintf(f, "ignore SIGPOLL\n");
      fprintf(f, "ignore SIGPIPE\n");
      fprintf(f, "ignore SIGWINCH\n");
      fprintf(f, "ignore SIGWAITING\n");
      if (arg_debug_commands)
        fprintf(f, "%s\n", arg_debug_commands);
      fprintf(f, "END_OF_SCRIPT\n");
      if (arg_runscript)
        fprintf(f, "\"%s\" ", arg_runscript);
      fprintf(f, "$F_XTERM");
      fprintf(f, " -title 'Node %d (%s)' ", nodeno, h->name);
      fprintf(f, " -e $F_DBG %s ", arg_debug_no_pause ? "-r" : "");
      if (arg_debug) {
        fprintf(f, "-c \'runargs ");
        fprint_arg(f, argv);
        fprintf(f, "\' ");
      }
      fprintf(f, "-s/tmp/charmrun_dbx.$$ %s", arg_nodeprog_r);
      if (arg_debug_no_pause)
        fprint_arg(f, argv);
      fprintf(f, "\n");
    } else {
      /* NOTE(review): only a message is printed.  The script still gets the
         surrounding "(" ... ")" with nothing inside, which /bin/sh is likely
         to reject as a syntax error. */
      fprintf(stderr, "Unknown debugger: %s.\n Exiting.\n", h->debugger);
    }
  } else if (arg_in_xterm) {
    /* In-xterm mode: a self-deleting wrapper runs the program, reports its
       exit code and waits for Enter so the xterm stays open. */
    if (arg_verbose)
      printf("Charmrun> node %d: xterm is %s\n", nodeno, h->xterm);
    fprintf(f, "cat > /tmp/charmrun_inx.$$ << END_OF_SCRIPT\n");
    fprintf(f, "#!/bin/sh\n");
    fprintf(f, "/bin/rm -f /tmp/charmrun_inx.$$\n");
    fprintf(f, "%s", arg_nodeprog_r);
    fprint_arg(f, argv);
    fprintf(f, "\n");
    fprintf(f, "echo 'program exited with code '\\$?\n");
    fprintf(f, "read eoln\n");
    fprintf(f, "END_OF_SCRIPT\n");
    fprintf(f, "chmod 700 /tmp/charmrun_inx.$$\n");
    if (arg_runscript)
      fprintf(f, "\"%s\" ", arg_runscript);
    fprintf(f, "$F_XTERM -title 'Node %d (%s)' ", nodeno, h->name);
    fprintf(f, " -sl 5000");
    fprintf(f, " -e /tmp/charmrun_inx.$$\n");
  } else {
    /* Plain launch.  The program may be wrapped by arg_runscript and by
       `setarch -R`, and gets "+nice N" when h->nice is not -100 (-100 appears
       to mean "unset"). */
    if (arg_runscript)
      fprintf(f, "\"%s\" ", arg_runscript);
    if (arg_no_va_rand) {
      if (arg_verbose)
        printf("Charmrun> setarch -R is used.\n");
      fprintf(f, "setarch `uname -m` -R ");
    }
    fprintf(f, "\"%s\" ", arg_nodeprog_r);
    fprint_arg(f, argv);
    if (h->nice != -100) {
      if (arg_verbose)
        printf("Charmrun> nice -n %d\n", h->nice);
      fprintf(f, " +nice %d ", h->nice);
    }
    fprintf(f, "\nres=$?\n");




    /* Exit status 127 is the shell's "command not found" status.  Re-run the
       program and ldd it so the diagnostics land in /tmp/charmrun_err.$$. */
    fprintf(f, "if [ $res -eq 127 ]\n"
               "then\n"
               " ( \n"
               " \"%s\" \n"
               " ldd \"%s\"\n"
               " ) > /tmp/charmrun_err.$$ 2>&1 \n"
               "fi\n",
            arg_nodeprog_r, arg_nodeprog_r);
  }





  /* Close the subshell and discard its stdio.  It runs in the background
     unless launched under mpiexec. */
  fprintf(f, ")");
  fprintf(f, " < /dev/null 1> /dev/null 2> /dev/null");
  if (!arg_mpiexec)
    fprintf(f, " &");
  fprintf(f, "\n");

  if (arg_verbose)
    fprintf(f, "Echo 'remote shell phase successful.'\n");
  /* After a short pause, print any collected launch errors and fail. */
  fprintf(f,
          "sleep 1\n"
          "if [ -r /tmp/charmrun_err.$$ ]\n"
          "then\n"
          " cat /tmp/charmrun_err.$$ \n"
          " rm -f /tmp/charmrun_err.$$ \n"
          " Exit 1\n"
          "fi\n");
  fprintf(f, "Exit 0\n");
  free(arg_currdir_r);
}
05121
05122
05123
05124 static void read_global_segments_size()
05125 {
05126 nodetab_host const * h = host_table[0];
05127
05128
05129 arg_nodeprog_r =
05130 pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);
05131
05132 std::vector<const char *> sshargv;
05133 sshargv.push_back(h->shell);
05134 sshargv.push_back(h->name);
05135 sshargv.push_back("-l");
05136 sshargv.push_back(h->login);
05137 char *tmp = (char *) malloc(sizeof(char) * 9 + strlen(arg_nodeprog_r));
05138 sprintf(tmp, "size -A %s", arg_nodeprog_r);
05139 sshargv.push_back(tmp);
05140 sshargv.push_back((const char *) NULL);
05141
05142 int childPid = fork();
05143 if (childPid < 0) {
05144 perror("ERROR> getting the size of the global variables segments");
05145 exit(1);
05146 } else if (childPid == 0) {
05147
05148 dup2(2, 1);
05149
05150
05151 execvp(sshargv[0], const_cast<char **>(&sshargv[0]));
05152 fprintf(stderr, "Charmrun> Couldn't find remote shell program '%s'!\n",
05153 sshargv[0]);
05154 exit(1);
05155 } else {
05156
05157 free(tmp);
05158 waitpid(childPid, NULL, 0);
05159 }
05160 }
05161
05162
05163 static void open_gdb_info()
05164 {
05165 nodetab_host const * h = host_table[0];
05166
05167
05168 arg_nodeprog_r =
05169 pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);
05170
05171 std::vector<const char *> sshargv;
05172 sshargv.push_back(h->shell);
05173 sshargv.push_back(h->name);
05174 sshargv.push_back("-l");
05175 sshargv.push_back(h->login);
05176 char *tmp = (char *) malloc(sizeof(char) * 8 + strlen(arg_nodeprog_r));
05177 sprintf(tmp, "gdb -q %s", arg_nodeprog_r);
05178 sshargv.push_back(tmp);
05179 sshargv.push_back((const char *) NULL);
05180
05181 int fdin[2];
05182 int fdout[2];
05183 int fderr[2];
05184 if (pipe(fdin) == -1) {
05185 fprintf(stderr, "charmrun> pipe() failed!\n");
05186 exit(1);
05187 }
05188 if (pipe(fdout) == -1) {
05189 fprintf(stderr, "charmrun> pipe() failed!\n");
05190 exit(1);
05191 }
05192 if (pipe(fderr) == -1) {
05193 fprintf(stderr, "charmrun> pipe() failed!\n");
05194 exit(1);
05195 }
05196
05197 gdb_info_pid = fork();
05198 if (gdb_info_pid < 0) {
05199 perror("ERROR> starting info gdb");
05200 exit(1);
05201 } else if (gdb_info_pid == 0) {
05202
05203 close(fdin[1]);
05204 close(fdout[0]);
05205 close(fderr[0]);
05206 PRINT(("executing: \"%s\" \"%s\" \"%s\" \"%s\" \"%s\"\n", sshargv[0],
05207 sshargv[1], sshargv[2], sshargv[3], sshargv[4]));
05208 dup2(fdin[0], 0);
05209 dup2(fdout[1], 1);
05210 dup2(fderr[1], 2);
05211 for (int i = 3; i < 1024; i++)
05212 close(i);
05213 execvp(sshargv[0], const_cast<char **>(&sshargv[0]));
05214 fprintf(stderr, "Charmrun> Couldn't find remote shell program '%s'!\n",
05215 sshargv[0]);
05216 exit(1);
05217 }
05218
05219 free(tmp);
05220 gdb_info_std[0] = fdin[1];
05221 gdb_info_std[1] = fdout[0];
05222 gdb_info_std[2] = fderr[0];
05223 close(fdin[0]);
05224 close(fdout[1]);
05225 close(fderr[1]);
05226 }
#ifdef HSTART
/* Hierarchical start: rewrite arg_nodeprog_a to the "charmrun" binary in the
   node program's directory, then write a start script for each entry of
   my_process_table and launch it through ssh_fork.
   NOTE(review): this function is an unfinished port and does not compile as
   written:
     - `TODO;` is a placeholder statement;
     - the loop variable is declared as a pointer but used with `.`
       (p.procno, p.ssh_pid).
   Fix both before enabling HSTART. */
static void start_next_level_charmruns()
{

  /* NOTE(review): strrchr returns NULL when arg_nodeprog_a has no '/', and
     the pointer subtraction below would then be invalid. */
  const char *nodeprog_name = strrchr(arg_nodeprog_a, '/');
  static char buf[1024];
  sprintf(buf, "%.*s%s%s", (int)(nodeprog_name-arg_nodeprog_a), arg_nodeprog_a, DIRSEP, "charmrun");
  arg_nodeprog_a = strdup(buf);

  /* nextIndex is unused; client advances by nodes_per_child per launch. */
  int nextIndex = 0;
  int client = 0;

  for (nodetab_process * p : my_process_table)
  {
    TODO;

    /* Start script goes in /tmp, falling back to the current directory. */
    FILE *f;
    char startScript[200];
    sprintf(startScript, "/tmp/charmrun.%d.%d", getpid(), p.procno);
    f = fopen(startScript, "w");
    if (f == NULL) {

      sprintf(startScript, "charmrun.%d.%d", getpid(), p.procno);
      f = fopen(startScript, "w");
      if (f == NULL) {
        fprintf(stderr, "Charmrun> Can not write file %s!\n", startScript);
        exit(1);
      }
    }
    ssh_script(f, p, arg_argv);
    fclose(f);

    p.ssh_pid = ssh_fork(p, startScript);
    client += nodes_per_child;
  }
}
#endif
05264
05265
05266 static void start_one_node_ssh(nodetab_process & p)
05267 {
05268 const nodetab_host * h = p.host;
05269
05270 char startScript[200];
05271 sprintf(startScript, "/tmp/charmrun.%d.%d", getpid(), p.nodeno);
05272 FILE *f = fopen(startScript, "w");
05273 if (f == NULL) {
05274
05275 sprintf(startScript, "charmrun.%d.%d", getpid(), p.nodeno);
05276 f = fopen(startScript, "w");
05277 if (f == NULL) {
05278 fprintf(stderr, "Charmrun> Can not write file %s!\n", startScript);
05279 exit(1);
05280 }
05281 }
05282 ssh_script(f, p, arg_argv);
05283 fclose(f);
05284
05285 p.ssh_pid = ssh_fork(p, startScript);
05286 }
05287
05288 static void start_nodes_ssh(std::vector<nodetab_process> & process_table)
05289 {
05290 for (nodetab_process & p : process_table)
05291 {
05292 start_one_node_ssh(p);
05293 }
05294 }
05295
05296
05297 static int ssh_fork_one(nodetab_process & p, const char *startScript)
05298 {
05299 nodetab_host const * h = p.host;
05300
05301 std::vector<const char *> sshargv;
05302
05303 const char *s = h->shell;
05304 const char *e = skipstuff(s);
05305 while (*s) {
05306 sshargv.push_back(substr(s, e));
05307 s = skipblanks(e);
05308 e = skipstuff(s);
05309 }
05310
05311 const int processes = get_old_style_process_count();
05312
05313 char npes[24];
05314 if ( ! arg_mpiexec_no_n ) {
05315 sshargv.push_back("-n");
05316 sprintf(npes, "%d", processes);
05317 sshargv.push_back(npes);
05318 }
05319 sshargv.push_back((char *) startScript);
05320 sshargv.push_back((const char *) NULL);
05321 if (arg_verbose)
05322 printf("Charmrun> Starting %s %s \n", h->shell, startScript);
05323
05324 int pid = fork();
05325 if (pid < 0) {
05326 perror("ERROR> starting mpiexec");
05327 exit(1);
05328 }
05329 if (pid == 0) {
05330
05331
05332 for (int i = 3; i < 1024; i++)
05333 close(i);
05334 execvp(sshargv[0], const_cast<char *const *>(&sshargv[0]));
05335 fprintf(stderr, "Charmrun> Couldn't find mpiexec program '%s'!\n",
05336 sshargv[0]);
05337 exit(1);
05338 }
05339 if (arg_verbose)
05340 printf("Charmrun> mpiexec started\n");
05341 return pid;
05342 }
05343
05344 static void start_nodes_mpiexec()
05345 {
05346 char startScript[200];
05347 sprintf(startScript, "./charmrun.%d", getpid());
05348 FILE *f = fopen(startScript, "w");
05349 chmod(startScript, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IROTH);
05350 if (f == NULL) {
05351
05352 sprintf(startScript, "./charmrun.%d", getpid());
05353 f = fopen(startScript, "w");
05354 if (f == NULL) {
05355 fprintf(stderr, "Charmrun> Can not write file %s!\n", startScript);
05356 exit(1);
05357 }
05358 }
05359
05360 nodetab_process & p = my_process_table[0];
05361
05362 ssh_script(f, p, arg_argv);
05363 fclose(f);
05364
05365 ssh_fork_one(p, startScript);
05366
05367 }
05368
05369 static void finish_set_nodes(std::vector<nodetab_process> & process_table, int start, int stop)
05370 {
05371 std::vector<int> num_retries(stop - start, 0);
05372 int done = 0;
05373 while (!done) {
05374 done = 1;
05375 for (int i = start; i < stop; i++) {
05376 nodetab_process & p = process_table[i];
05377 const nodetab_host * h = p.host;
05378 if (p.ssh_pid != 0) {
05379 done = 0;
05380 int status = 0;
05381 waitpid(p.ssh_pid, &status, 0);
05382 if (WIFEXITED(status)) {
05383 if (!WEXITSTATUS(status)) {
05384 p.ssh_pid = 0;
05385 } else {
05386 fprintf(stderr,
05387 "Charmrun> Error %d returned from remote shell (%s:%d)\n",
05388 WEXITSTATUS(status), h->name, p.nodeno);
05389
05390 if (WEXITSTATUS(status) != 255)
05391 exit(1);
05392
05393 if (++num_retries[i - start] <= MAX_NUM_RETRIES) {
05394 fprintf(stderr, "Charmrun> Reconnection attempt %d of %d\n",
05395 num_retries[i - start], MAX_NUM_RETRIES);
05396 start_one_node_ssh(p);
05397 } else {
05398 fprintf(
05399 stderr,
05400 "Charmrun> Too many reconnection attempts; bailing out\n");
05401 exit(1);
05402 }
05403 }
05404 }
05405 }
05406 }
05407 }
05408 }
05409
05410 static void finish_nodes(std::vector<nodetab_process> & process_table)
05411 {
05412 #ifdef HSTART
05413 if (arg_hierarchical_start && !arg_child_charmrun)
05414 finish_set_nodes(process_table, 0, branchfactor);
05415 else
05416 #endif
05417 finish_set_nodes(process_table, 0, process_table.size());
05418 }
05419
05420 static void kill_nodes()
05421 {
05422
05423 for (nodetab_process & p : my_process_table)
05424 {
05425 const nodetab_host * h = p.host;
05426 int status = 0;
05427 if (arg_verbose)
05428 printf("Charmrun> waiting for remote shell (%s:%d), pid %d\n", h->name,
05429 p.nodeno, p.ssh_pid);
05430 kill(p.ssh_pid, 9);
05431 waitpid(p.ssh_pid, &status, 0);
05432 p.ssh_pid = 0;
05433 }
05434 }
05435
05436
05437
05438 static char *find_abs_path(const char *target)
05439 {
05440 char *thepath=getenv("PATH");
05441 char *path=strdup(thepath);
05442 char *subpath=strtok(path,":");
05443 char *abspath=(char*) malloc(PATH_MAX + strlen(target) + 2);
05444 while(subpath!=NULL) {
05445 strcpy(abspath,subpath);
05446 strcat(abspath,"/");
05447 strcat(abspath,target);
05448 if(probefile(abspath)){
05449 free(path);
05450 return abspath;
05451 }
05452 subpath=strtok(NULL,":");
05453 }
05454 free(abspath);
05455 free(path);
05456 return NULL;
05457 }
05458
05459
05460
05461 static void start_nodes_local(std::vector<nodetab_process> & process_table)
05462 {
05463 char ** env = main_envp;
05464
05465
05466 int envc;
05467 for (envc = 0; env[envc]; envc++)
05468 ;
05469 int extra = 0;
05470 #if CMK_AIX && CMK_SMP
05471 ++extra;
05472 #endif
05473 const int proc_active = proc_per.active();
05474 extra += proc_active;
05475 #if CMK_SMP
05476 const int onewth_active = onewth_per.active();
05477 extra += onewth_active;
05478 #endif
05479
05480 char **envp = (char **) malloc((envc + 2 + extra + 1) * sizeof(void *));
05481 for (int i = 0; i < envc; i++)
05482 envp[i] = env[i];
05483 envp[envc] = (char *) malloc(256);
05484 envp[envc + 1] = (char *) malloc(256);
05485 int n = 2;
05486 #if CMK_AIX && CMK_SMP
05487 envp[envc + n] = (char *) malloc(256);
05488 sprintf(envp[envc + n], "MALLOCMULTIHEAP=1");
05489 ++n;
05490 #endif
05491
05492 using Unit = typename TopologyRequest::Unit;
05493 if (proc_active)
05494 {
05495 envp[envc + n] = (char *) malloc(256);
05496 switch (proc_per.unit())
05497 {
05498 case Unit::Host:
05499 sprintf(envp[envc + n], "CmiProcessPerHost=%d", proc_per.host);
05500 break;
05501 case Unit::Socket:
05502 sprintf(envp[envc + n], "CmiProcessPerSocket=%d", proc_per.socket);
05503 break;
05504 case Unit::Core:
05505 sprintf(envp[envc + n], "CmiProcessPerCore=%d", proc_per.core);
05506 break;
05507 case Unit::PU:
05508 sprintf(envp[envc + n], "CmiProcessPerPU=%d", proc_per.pu);
05509 break;
05510 default:
05511 break;
05512 }
05513 ++n;
05514 }
05515 #if CMK_SMP
05516 if (onewth_active)
05517 {
05518 envp[envc + n] = (char *) malloc(256);
05519 switch (onewth_per.unit())
05520 {
05521 case Unit::Host:
05522 sprintf(envp[envc + n], "CmiOneWthPerHost=%d", 1);
05523 break;
05524 case Unit::Socket:
05525 sprintf(envp[envc + n], "CmiOneWthPerSocket=%d", 1);
05526 break;
05527 case Unit::Core:
05528 sprintf(envp[envc + n], "CmiOneWthPerCore=%d", 1);
05529 break;
05530 case Unit::PU:
05531 sprintf(envp[envc + n], "CmiOneWthPerPU=%d", 1);
05532 break;
05533 default:
05534 break;
05535 }
05536 ++n;
05537 }
05538 #endif
05539 envp[envc + n] = 0;
05540
05541
05542 char **dparamp;
05543 std::vector<char *> dparamv;
05544 if (arg_debug || arg_debug_no_pause || arg_in_xterm)
05545 {
05546 char *abs_xterm=find_abs_path(arg_xterm);
05547 if(!abs_xterm)
05548 {
05549 fprintf(stderr, "Charmrun> cannot find xterm for debugging, please add it to your path\n");
05550 exit(1);
05551 }
05552
05553 dparamv.push_back(strdup(abs_xterm));
05554 dparamv.push_back(strdup("-title"));
05555 dparamv.push_back(strdup(pparam_argv[1]));
05556 dparamv.push_back(strdup("-e"));
05557
05558 std::vector<const char *> cparamv;
05559 if (arg_debug || arg_debug_no_pause)
05560 {
05561 const bool isLLDB = strcmp(arg_debugger, "lldb") == 0;
05562 const char *commandflag = isLLDB ? "-o" : "-ex";
05563 const char *argsflag = isLLDB ? "--" : "--args";
05564
05565 cparamv.push_back(arg_debugger);
05566
05567 if (arg_debug_no_pause)
05568 {
05569 cparamv.push_back(commandflag);
05570 cparamv.push_back("r");
05571 }
05572
05573 cparamv.push_back(argsflag);
05574 }
05575
05576 for (int i = 1; pparam_argv[i] != nullptr; ++i)
05577 cparamv.push_back(pparam_argv[i]);
05578
05579 if (!(arg_debug || arg_debug_no_pause))
05580 cparamv.push_back("; echo \"program exited with code $?\" ; read eoln");
05581
05582 dparamv.push_back(cstring_join(cparamv, " "));
05583
05584 if (arg_verbose)
05585 {
05586 printf("Charmrun> xterm args:");
05587 for (const char *p : dparamv)
05588 printf(" %s", p);
05589 printf("\n");
05590 }
05591
05592
05593 dparamv.push_back(nullptr);
05594
05595 dparamp = dparamv.data();
05596 }
05597 else
05598 {
05599 dparamp = (char **)(pparam_argv+1);
05600 }
05601
05602 for (const nodetab_process & p : process_table)
05603 {
05604 if (arg_verbose)
05605 printf("Charmrun> start %d node program on localhost.\n", p.nodeno);
05606 sprintf(envp[envc], "NETSTART=%s", create_netstart(p.nodeno));
05607 sprintf(envp[envc + 1], "CmiNumNodes=%d", 0);
05608
05609 int pid = fork();
05610 if (pid < 0) {
05611 fprintf(stderr, "fork failed: %s\n", strerror(errno));
05612 exit(1);
05613 }
05614 if (pid == 0) {
05615 int fd, fd2 = dup(2);
05616 #if CMK_CHARMPY
05617
05618 if ((p.nodeno != 0) && (-1 != (fd = open("/dev/null", O_RDWR)))) {
05619 #else
05620 if (-1 != (fd = open("/dev/null", O_RDWR))) {
05621 #endif
05622 dup2(fd, 0);
05623 dup2(fd, 1);
05624 dup2(fd, 2);
05625 }
05626 int status = execve(dparamp[0],
05627 const_cast<char *const *>(dparamp), envp);
05628 dup2(fd2, 2);
05629 fprintf(stderr, "execve failed to start process \"%s\": %s\n",
05630 dparamp[0], strerror(errno));
05631 kill(getppid(), 9);
05632 exit(1);
05633 }
05634 }
05635 for (char *p : dparamv)
05636 free(p);
05637 for (int i = envc, i_end = envc + n; i < i_end; ++i)
05638 free(envp[i]);
05639 free(envp);
05640 }
05641
05642 #ifdef __FAULT__
05643
/* Restart phase counter.  restart_node pre-increments it for each relaunch
   and passes the new value after "+restartaftercrash". */
static int current_restart_phase = 1;
05645
05649 static void restart_node(nodetab_process & p)
05650 {
05651 char startScript[200];
05653 sprintf(startScript, "/tmp/charmrun.%d.%d", getpid(), p.nodeno);
05654 FILE *f = fopen(startScript, "w");
05655
05659 int i = 0;
05660 while (arg_argv[i] != NULL) {
05661 i++;
05662 }
05663 const char **restart_argv = (const char **) malloc(sizeof(char *) * (i + 4));
05664 i = 0;
05665 while (arg_argv[i] != NULL) {
05666 restart_argv[i] = arg_argv[i];
05667 i++;
05668 }
05669 restart_argv[i] = "+restartaftercrash";
05670
05671 char phase_str[10];
05672 sprintf(phase_str, "%d", ++current_restart_phase);
05673 restart_argv[i + 1] = phase_str;
05674 restart_argv[i + 2] = "+restartisomalloc";
05675 restart_argv[i + 3] = NULL;
05676
05679 static int next_replacement_host = 0;
05680 p.host->crashed = true;
05681 const int host_count = host_table.size();
05682 #if 0
05683
05684 int hosts_checked = 0;
05685 while (host_table[next_replacement_host]->crashed)
05686 {
05687 ++next_replacement_host;
05688 next_replacement_host %= host_count;
05689 if (++hosts_checked == host_count)
05690 {
05691 fprintf(stderr, "Charmrun> All hosts crashed, aborting.\n");
05692 exit(1);
05693 }
05694 }
05695 #endif
05696 p.host = host_table[next_replacement_host];
05697 ++next_replacement_host;
05698 next_replacement_host %= host_count;
05699
05700 ssh_script(f, p, restart_argv);
05701 fclose(f);
05703 int restart_ssh_pid = ssh_fork(p, startScript);
05705 int status = 0;
05706 if (arg_debug_no_pause || arg_debug)
05707 ;
05708 else {
05709 do {
05710 waitpid(restart_ssh_pid, &status, 0);
05711 } while (!WIFEXITED(status));
05712 if (WEXITSTATUS(status) != 0) {
05713 fprintf(stderr,
05714 "Charmrun> Error %d returned from new attempted remote shell \n",
05715 WEXITSTATUS(status));
05716 exit(1);
05717 }
05718 }
05719 PRINT(("Charmrun finished launching new process in %fs\n",
05720 GetClock() - ftTimer));
05721 }
05722
05729 static void reconnect_crashed_client(nodetab_process & crashed)
05730 {
05731 if (0 == skt_select1(server_fd, arg_timeout * 1000)) {
05732 client_connect_problem(crashed, "Timeout waiting for restarted node-program to connect");
05733 }
05734
05735 skt_ip_t clientIP;
05736 unsigned int clientPort;
05737
05738 const SOCKET req_client = skt_accept(server_fd, &clientIP, &clientPort);
05739
05740 if (req_client == SOCKET_ERROR) {
05741 client_connect_problem(crashed, "Failure in restarted node accept");
05742 exit(1);
05743 } else {
05744 skt_tcp_no_nagle(req_client);
05745
05746 ChMessage msg;
05747 if (!skt_select1(req_client, arg_timeout * 1000)) {
05748 client_connect_problem(crashed, "Timeout on IP request for restarted processor");
05749 }
05750
05751 #ifdef HSTART
05752 if (arg_hierarchical_start) {
05753 req_forward_root(crashed);
05754 if (_last_crash != nullptr) {
05755 fprintf(stderr, "ERROR> Charmrun detected multiple crashes.\n");
05756 exit(1);
05757 }
05758
05759 _last_crash = &crashed;
05760 return;
05761 }
05762 #endif
05763 ChMessage_recv(req_client, &msg);
05764 if (msg.len != sizeof(ChSingleNodeinfo)) {
05765 fprintf(stderr, "Charmrun: Bad initnode data length. Aborting\n");
05766 fprintf(stderr, "Charmrun: possibly because: %s.\n", msg.data);
05767 }
05768 fprintf(stderr, "crashed_node %d reconnected fd %d \n",
05769 crashed.nodeno, req_client);
05770
05773 ChSingleNodeinfo *in = (ChSingleNodeinfo *) msg.data;
05774
05775 crashed.req_client = req_client;
05776
05777 nodeinfo_add(in, crashed);
05778
05779 nodeinfo_populate(crashed);
05780
05781 for (const nodetab_process & p : my_process_table)
05782 if (&p != &crashed)
05783 req_send_initnodetab(p);
05784
05785
05786 announce_crash(crashed);
05787 if (_last_crash != nullptr) {
05788 fprintf(stderr, "ERROR> Charmrun detected multiple crashes.\n");
05789 exit(1);
05790 }
05791 _last_crash = &crashed;
05792
05793
05794
05795
05796
05797 ChMessage_free(&msg);
05798 }
05799 }
05800
05806 static void announce_crash(const nodetab_process & crashed)
05807 {
05808 ChMessageHeader hdr;
05809 ChMessageInt_t crashNo = ChMessageInt_new(crashed.nodeno);
05810 ChMessageHeader_new("crashnode", sizeof(ChMessageInt_t), &hdr);
05811 for (const nodetab_process & p : my_process_table)
05812 {
05813 if (&p != &crashed)
05814 {
05815 skt_sendN(p.req_client, (const char *) &hdr, sizeof(hdr));
05816 skt_sendN(p.req_client, (const char *) &crashNo,
05817 sizeof(ChMessageInt_t));
05818 }
05819 }
05820 }
05821
05822 #endif
05823
05824 #endif