From 4e2127d6576cea3f54c619d0bb20a22006567206 Mon Sep 17 00:00:00 2001 From: Sam Moore Date: Sat, 23 Feb 2013 16:31:49 +0800 Subject: [PATCH] Things seem to work... So I'll commit before I break everything! --- src/Makefile | 4 +- src/debug.c | 10 + src/log.c | 18 +- src/log.h | 2 + src/main.c | 3 + src/master.c | 312 ++++++++++++++--------- src/master.h | 6 +- src/network.c | 41 ++- src/network.h | 7 +- src/options.c | 18 +- src/options.h | 3 +- src/options.o | Bin 0 -> 31248 bytes src/slave.c | 210 +++++++++------ src/slave.h | 2 +- src/ssh.c | 688 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/ssh.h | 57 +++++ 16 files changed, 1157 insertions(+), 224 deletions(-) create mode 100644 src/debug.c create mode 100644 src/options.o create mode 100644 src/ssh.c create mode 100644 src/ssh.h diff --git a/src/Makefile b/src/Makefile index a9f6729..da2cf0d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,10 +1,10 @@ # Makefile for swarm CXX = gcc -LIBRARIES = -lm -lpthread #-lGL -lglut -lGLU -lpthread +LIBRARIES = /usr/local/lib/libssh2.a -lm -lpthread -lssl -lcrypto -lz #-lGL -lglut -lGLU -lpthread FLAGS = --std=c99 -D_POSIX_C_SOURCE=200112L -Wall -pedantic -g PREPROCESSOR_FLAGS = -LINK_OBJ = options.o log.o task.o network.o master.o daemon.o slave.o main.o +LINK_OBJ = options.o log.o task.o network.o ssh.o master.o daemon.o slave.o main.o BIN = swarm diff --git a/src/debug.c b/src/debug.c new file mode 100644 index 0000000..83682bd --- /dev/null +++ b/src/debug.c @@ -0,0 +1,10 @@ +#include + +void libssh2_debug_fuck(LIBSSH2_SESSION * session, int level, char * fmt, ...) +{ + va_list va; + va_start(va, fmt); + vfprintf(stderr, fmt, va); + va_end(va); + fprintf(stderr, "\n"); +} diff --git a/src/log.c b/src/log.c index 867ab64..3a8c55c 100644 --- a/src/log.c +++ b/src/log.c @@ -14,17 +14,17 @@ void log_print(int level, char * funct, char * fmt, ...) char severity[BUFSIZ]; switch (level) { - case 0: - sprintf(severity, "Error"); + case LOGERR: + sprintf(severity, "ERROR"); break; - case 1: - sprintf(severity, "Warning"); + case LOGWARN: + sprintf(severity, "WARNING"); break; - case 2: - sprintf(severity, "Notice"); + case LOGNOTE: + sprintf(severity, "NOTICE"); break; - case 3: - sprintf(severity, "Info"); + case LOGINFO: + sprintf(severity, "INFO"); break; default: sprintf(severity, "DEBUG"); @@ -32,7 +32,7 @@ void log_print(int level, char * funct, char * fmt, ...) } if (funct != NULL) - last_len = fprintf(stderr, "%s [%d] : %s in %s - ", options.program, getpid(), severity, funct); + last_len = fprintf(stderr, "%s [%d] : %s : %s - ", options.program, getpid(), severity, funct); else { for (int i = 0; i < last_len; ++i); diff --git a/src/log.h b/src/log.h index 24e5947..75e2780 100644 --- a/src/log.h +++ b/src/log.h @@ -7,6 +7,8 @@ #include +enum {LOGERR=0, LOGWARN=1, LOGNOTE=2, LOGINFO=3,LOGDEBUG=4}; + extern void log_print(int level, char * funct, char * fmt,...); extern void error(char * funct, char * fmt, ...); diff --git a/src/main.c b/src/main.c index a7666dc..baab2b7 100644 --- a/src/main.c +++ b/src/main.c @@ -39,7 +39,10 @@ int main(int argc, char ** argv) Master_main(&options); } else + { + fprintf(stderr, "%p %s", options.master_addr, options.master_addr); Slave_main(&options); + } exit(EXIT_SUCCESS); return 0; diff --git a/src/master.c b/src/master.c index e61f5fa..2734d07 100644 --- a/src/master.c +++ b/src/master.c @@ -12,7 +12,10 @@ #include #include #include "slave.h" +#include #include +#include +#include #include #include @@ -22,12 +25,15 @@ #include #include #include +#include "ssh.h" //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems // probably not that great to use threads anyway, since it eats one of your cores // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes +// the signal handler now breaks threads... don't use them + #ifdef THREAD_SENDING pthread_t sender_thread; pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER; @@ -69,12 +75,19 @@ void Master_main(Options * o) void Master_setup(Options * o) { + int err = libssh2_init(0); + if (err != 0) + { + error("Master_setup", "Initialising libssh2 - error code %d", err); + } + signal(SIGCHLD, sigchld_handler); master.o = o; master.barrier_number = -1; master.last_number = -1; master.nSlaves = o->nCPU; master.running = master.nSlaves; + master.nRemote = 0; master.remote_err = NULL; if (master.nSlaves == 0) error("Master_setup", "No CPUs to start slaves with!"); @@ -119,7 +132,6 @@ void Make_slave(int i) master.slave[i].in = sv[1]; master.slave[i].out = sv[1]; master.slave[i].running = true; - master.slave[i].ssh_pid = 0; } void Master_input(char c) @@ -174,9 +186,9 @@ void Master_input(char c) log_print(0, "Master_input", "No host specified for ABSORB directive"); char * np = strtok(NULL, " "); if (np != NULL) - Master_absorb(cmd, options.port, atoi(np)); + Master_absorb(cmd, atoi(np)); else - Master_absorb(cmd, options.port, 0); + Master_absorb(cmd, 0); } else if (strcmp(cmd, "OUTPUT") == 0) { @@ -367,8 +379,11 @@ void Master_output(int i, char c) #endif //THREAD_SENDING if (t == NULL) { - log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name); - error(NULL, "Please refrain from echoing three bell characters in a row."); + log_print(3, "Master_output", "Echo %c back to slave %d", c, i); + write(master.slave[i].in, &c, sizeof(char)); + //log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name); + //error(NULL, "Please refrain from echoing three bell characters in a row."); + return; } @@ -395,8 +410,10 @@ void Master_output(int i, char c) { fprintf(stdout, "%d:\n", t->number); } + /* else if (t->output[t->outlen-1] == '\f') { + log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name); static int bufsiz = BUFSIZ; char * buffer = (char*)(calloc(bufsiz, sizeof(char))); @@ -420,6 +437,7 @@ void Master_output(int i, char c) master.slave[i].task_pool = t2; master.last_number = t2->number; } + */ else { fprintf(stdout, "%d: %s", t->number, t->output); @@ -522,9 +540,9 @@ Task * Master_tasker(int i) void Master_loop() { - if (sigsetjmp(env,true) != 0) + if (sigsetjmp(env,true) != 0) // completely necessary evil { - log_print(2, "Master_loop", "Restored from longjmp"); + //log_print(2, "Master_loop", "Restored from longjmp"); } fd_set readSet; //fd_set writeSet; @@ -539,6 +557,7 @@ void Master_loop() bool quit = false; bool input = true; + char buffer[BUFSIZ]; while (!quit) { @@ -569,6 +588,11 @@ void Master_loop() { if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet); } + + for (int i = 0; i < master.nRemote; ++i) + { + FD_SET(master.remote_err[i], &readSet); + } select(master.fd_max+1, &readSet, NULL, NULL, NULL); @@ -611,6 +635,16 @@ void Master_loop() } } + + for (int i = 0; i < master.nRemote; ++i) + { + if (FD_ISSET(master.remote_err[i], &readSet)) + { + int len = read(master.remote_err[i], buffer, sizeof(buffer)); + buffer[len] = '\0'; + fprintf(stderr, "%s", buffer); + } + } } @@ -648,7 +682,7 @@ void Master_send() } write(send_task.slave_fd, "\n", 1*sizeof(char)); master.commands_active++; - log_print(3, "Master_sender", "Sent task %d \"%s\" - %d tasks active", send_task.task->number, send_task.task->message, master.commands_active); + log_print(3, "Master_sender", "Sent task %d \"%s\" on socket %d - %d tasks active", send_task.task->number, send_task.task->message, send_task.slave_fd, master.commands_active); } #ifdef THREAD_SENDING @@ -702,6 +736,16 @@ void Master_cleanup() signal(SIGCHLD, SIG_IGN); // ignore child exits now + // tell all remote nodes to exit + for (int i = 0; i < master.nRemote; ++i) + { + FILE * f = fdopen(master.remote_err[i], "r+"); setbuf(f, NULL); + + fprintf(f, "exit\n"); + + fclose(f); + } + for (int i = 0; i < master.nSlaves; ++i) { @@ -717,13 +761,6 @@ void Master_cleanup() if (master.slave[i].pid <= 0) { Network_close(master.slave[i].in); - if (master.slave[i].ssh_pid > 0) - { - log_print(2, "Master_cleanup", "Killing ssh instance %d", master.slave[i].ssh_pid); - kill(master.slave[i].ssh_pid, 15); - if (kill(master.slave[i].ssh_pid, 0) == 0) - kill(master.slave[i].ssh_pid, 9); - } } else { @@ -740,83 +777,109 @@ void Master_cleanup() free(master.buffer); if (master.outfile != NULL) free(master.outfile); -} -void * start_server(void * args) -{ + libssh2_exit(); - *(int*)(args) = Network_server(*(int*)args); - log_print(2, "start_server", "started network server"); - return NULL; } -int Secure_connection(char * addr, int port); -void Master_absorb(char * addr, int port, int np) + + +void Master_absorb(char * addr, int np) { + int port = 0; + + char * user = strstr(addr, "@"); + if (user != NULL) + { + *(user-1) = '\0'; + char * t = user; + user = addr; + addr = t; + } + else + { + user = getpwuid(geteuid())->pw_name; + } + log_print(3, "Master_absorb", "User %s at address %s", user, addr); + + // ssh to the host on port 22 + ssh * s = ssh_new(user, addr, 22); + if (s == NULL) + { + log_print(0, "Master_absorb", "Couldn't ssh to %s@%s", user, addr); + return; + } + + + // work out the name to give to the shells char * name = strtok(addr, ":"); name = strtok(NULL, ":"); if (name == NULL) + name = addr; // default is host:X + else + *(name-1) = '\0'; // otherwise use name:X + + // setup array of remote stderr file descriptors + if (master.nRemote++ == 0) { - name = addr; + master.remote_err = (int*)(calloc(master.nRemote, sizeof(int))); + master.remote_reserved = master.nRemote; } - else + else if (master.nRemote >= master.remote_reserved) { - *(name-1) = '\0'; + // resize dynamically + master.remote_reserved *= 2; + master.remote_err = (int*)(realloc(master.remote_err,master.remote_reserved * sizeof(int))); } - //log_print(0, "name is %s\n", name); - int first_ssh = 0; + int sfd = -1; if (master.o->encrypt) - first_ssh = Secure_connection(addr, port); - - //pthread_t ss; - //int net_fd = port; - //pthread_create(&ss, NULL, start_server, (void*)(&net_fd)); - - char buffer[BUFSIZ]; - if (fork() == 0) { - // The alternative to this kind of terrible hack is OpenMPI's "opal" - // This involves >1000 lines of operating system independent ways to get an IP address from a local interface - // Which will then be completely useless if there is any sort of NAT involved - - //freopen("/dev/null", "r", stdin); - //freopen("/dev/null", "w", stdout); - //freopen("/dev/null", "w", stderr); + int sv[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) + error("Master_absorb", "Couldn't create socket for remote swarm"); + sfd = sv[0]; - char * cmd = buffer+sprintf(buffer, "swarm -p "); - if (master.o->encrypt) - cmd = cmd+sprintf(cmd, "%d -e", port+1000); - else - cmd = cmd+sprintf(cmd, "%d -u", port); - - if (np > 0) - cmd = cmd+sprintf(cmd, " -n %d", np); - sprintf(cmd, " -m $(echo $SSH_CONNECTION | awk \'{print $1}\')"); - log_print(3, "Master_absorb", "Execing %s", buffer); - execlp("ssh", "ssh", "-f", addr, buffer, NULL); + ssh_exec_swarm(s, NULL, sv+1, np); // start swarm remotely forward it to the socket + ssh_thread_add(s); // add the ssh to the thread } - log_print(3, "Master_absorb", "Listening on port %d", port); - int net_fd = Network_server(port); - log_print(3, "Master_absorb", "Created network server on port %d", port); + else + { + sfd = Network_server_bind(0, &port); // dynamically bind to a port + ssh_exec_swarm(s, &port, NULL, np); // start swarm remotely, have it connect on the port + ssh_destroy(s); // don't need the ssh anymore + sfd = Network_server_listen(sfd, NULL); // accept connections and pray + } + master.remote_err[master.nRemote-1] = sfd; + if (sfd > master.fd_max) + master.fd_max = sfd; + + char buffer[BUFSIZ]; + + int newSlaves = 0; + + int len = sprintf(buffer, "%s\n", name); + int w = 0; + while (w < len) + w += write(sfd, buffer+w, len-w); - char * s = buffer; - while (read(net_fd, s, sizeof(char)) != 0) + + len = 0; + do { - if (*s == '\n') - { - *s = '\0'; - break; - } - ++s; + len = read(sfd, buffer+len, sizeof(buffer)); + buffer[len] = '\0'; } + while (buffer[len-1] != '\n'); + buffer[len-1] = '\0'; + newSlaves = atoi(buffer); + + - int newSlaves = atoi(buffer); - log_print(3, "Master_absorb", "Absorbing %d slaves from %s\n", newSlaves, addr); if (newSlaves == 0) { error("Master_absorb", "No slaves to absorb from %s", addr); @@ -825,62 +888,70 @@ void Master_absorb(char * addr, int port, int np) master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave))); if (master.slave == NULL) { - error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", master.nSlaves, master.nSlaves + newSlaves, strerror(errno)); + error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", + master.nSlaves, master.nSlaves + newSlaves, strerror(errno)); } - - if (master.o->encrypt) - { - for (int i = 0; i < newSlaves-1; ++i) - master.slave[master.nSlaves+i].ssh_pid = Secure_connection(addr, port+i+1); - - } - master.slave[master.nSlaves+newSlaves-1].ssh_pid = first_ssh; + for (int i = 0; i < newSlaves; ++i) + { + int ii = master.nSlaves + i; + if (master.o->encrypt) + { + int sv[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) + error("Master_absorb", "Couldn't create socket for remote swarm"); + + LIBSSH2_LISTENER * listener = NULL; + // libssh2 can't finalise the connection when the port is dynamic (ie: port = 0) + while (listener == NULL) + { + port = 20000 + rand() % 30000; // so pick ports at random until binding succeeds + listener = ssh_get_listener(s, &port); // port forward to the socket + } - - - char c = '\a'; - log_print(3, "Master_absorb", "Writing bell to slave"); - write(net_fd, &c, sizeof(char)); - + log_print(4,"Master_absorb", "Chose port %d", port); + int len = sprintf(buffer, "%d\n", port); + + int w = 0; + while (w < len) + { + w += write(sfd, buffer+w, len-w); + } + usleep(200000); // give ssh_thread a chance to actually send the data + log_print(4, "Master_absorb", "Creating tunnel..."); + ssh_add_tunnel(s, listener, sv[1]); + master.slave[ii].in = sv[0]; + master.slave[ii].out = sv[0]; - for (int i = 0; i < newSlaves; ++i) - { - log_print(3, NULL, "Absorbing slave %d...", i); - int ii = master.nSlaves + i; - if (i == newSlaves-1) - { - master.slave[ii].out = net_fd; + log_print(4, "Master_absorb", "Tunnel for slave %d using socket %d<->%d setup", ii, sv[0], sv[1]); } else { - log_print(3, "Master_absorb", "Creating server %d at time %d", i, time(NULL)); - write(net_fd, &c, sizeof(char)); - master.slave[ii].out = Network_server(port + i + 1); + int tmp = Network_server_bind(0, &port); // bind to a port + + master.slave[ii].in = Network_server_listen(tmp, NULL); // listen for connection + master.slave[ii].out = master.slave[ii].in; } - master.slave[ii].in = master.slave[ii].out; + + + if (master.slave[ii].out > master.fd_max) master.fd_max = master.slave[ii].out; + char buffer[BUFSIZ]; sprintf(buffer, "%s:%d", name, i); master.slave[ii].name = strdup(buffer); master.slave[ii].addr = strdup(addr); master.slave[ii].running = true; master.slave[ii].pid = -1; master.slave[ii].task = NULL; - master.slave[ii].task_pool = NULL; - - FILE * f = fdopen(master.slave[ii].in, "w"); setbuf(f, NULL); - fprintf(f, "name=%s\n", master.slave[ii].name); - - log_print(3, NULL, "Done absorbing slave %d...", i); - + master.slave[ii].task_pool = NULL; } @@ -892,21 +963,6 @@ void Master_absorb(char * addr, int port, int np) } -int Secure_connection(char * addr, int port) -{ - int result = fork(); - if (result == 0) - { - char buffer[BUFSIZ]; - sprintf(buffer, "%d:localhost:%d", port+1000, port); - freopen("/dev/null", "r", stdin); - freopen("/dev/null", "w", stdout); - freopen("/dev/null", "w", stderr); - execl("/usr/bin/ssh", "/usr/bin/ssh", "-N", "-R", buffer, addr, NULL); - } - return result; -} - void sigchld_handler(int signal) { @@ -918,13 +974,6 @@ void sigchld_handler(int signal) if (p == -1) error("sigchld_handler", "waitpid : %s", strerror(errno)); - if (WIFSIGNALED(s)) - { - int sig = WTERMSIG(s); - log_print(2, "sigchld_handler", "A child [%d] was terminated with signal %d; terminating self with same signal", p, sig); - kill(getpid(), sig); - return; - } int i = 0; for (i = 0; i < master.nSlaves; ++i) @@ -937,13 +986,28 @@ void sigchld_handler(int signal) return; } - log_print(1, "sigchld_handler", "Slave %d [%d] exited with code %d; restarting it",i, p, s); + fprintf(stderr, "Unexpected exit of slave %s", master.slave[i].name); + if (WIFSIGNALED(s)) + { + int sig = WTERMSIG(s); + fprintf(stderr, " due to %s", strsignal(sig)); + if (sig == SIGKILL) + { + printf(" - committing suicide\n"); + kill(getpid(), sig); + } + } + else + { + fprintf(stderr, " return code %d.",s); + } + fprintf(stderr, " Starting replacement.\n"); Make_slave(i); if (master.o->end != NULL) { - log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i); + //log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i); char buffer[BUFSIZ]; sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, master.o->end); if (write(master.slave[i].in, buffer, strlen(buffer)) <= 0) diff --git a/src/master.h b/src/master.h index 1a2d5e9..3595e23 100644 --- a/src/master.h +++ b/src/master.h @@ -13,7 +13,7 @@ extern void * Master_sender(void * args); extern void Master_setup(Options * o); extern void Master_cleanup(); extern void Master_send(); -extern void Master_absorb(char * addr, int port, int np); +extern void Master_absorb(char * addr, int np); extern void Make_slave(int i); extern void sigchld_handler(int signal); @@ -43,6 +43,10 @@ typedef struct int commands_active; + int * remote_err; // sockets used as stderr for remote shells + int nRemote; // number of remote shells + int remote_reserved; // number of sockets reserved + Options * o; } Master; diff --git a/src/network.c b/src/network.c index 5354ac3..bcd06d4 100644 --- a/src/network.c +++ b/src/network.c @@ -6,9 +6,21 @@ #define h_addr h_addr_list[0] -int Network_server(int port) {return Network_server_r(NULL, port);} -int Network_server_r(char * addr, int port) + + + +int Network_get_port(int sfd) +{ + static struct sockaddr_in sin; + static socklen_t len = sizeof(struct sockaddr_in); + + if (getsockname(sfd, (struct sockaddr *)&sin, &len) != 0) + error("Network_port", "getsockname : %s", strerror(errno)); + return ntohs(sin.sin_port); +} + +int Network_server_bind(int port, int * bound) { int sfd = socket(PF_INET, SOCK_STREAM, 0); if (sfd < 0) @@ -26,6 +38,15 @@ int Network_server_r(char * addr, int port) { error("Network_server", "Binding socket on port %d : %s", port, strerror(errno)); } + + if (bound != NULL) + *bound = Network_get_port(sfd); + return sfd; +} + +int Network_server_listen(int sfd, char * addr) +{ + int port = Network_get_port(sfd); if (listen(sfd, 1) < 0) { error("Network_server", "Listening on port %d : %s", port, strerror(errno)); @@ -52,12 +73,18 @@ int Network_server_r(char * addr, int port) assert(sfd >= 0); return sfd; - +} + +int Network_server(char * addr, int port) +{ + return Network_server_listen(Network_server_bind(port, &port), addr); } int Network_client(const char * addr, int port, int timeout) { int sfd = socket(PF_INET, SOCK_STREAM, 0); + + //log_print(2, "Network_client", "Created socket"); long arg = fcntl(sfd, F_GETFL, NULL); arg |= O_NONBLOCK; fcntl(sfd, F_SETFL, arg); @@ -75,6 +102,7 @@ int Network_client(const char * addr, int port, int timeout) bcopy ( hp->h_addr, &(server.sin_addr.s_addr), hp->h_length); server.sin_port = htons(port); + int res = connect(sfd, (struct sockaddr *) &server, sizeof(server)); @@ -91,9 +119,9 @@ int Network_client(const char * addr, int port, int timeout) struct timeval * tp; tp = (timeout < 0) ? NULL : &tv; - + int err = select(sfd+1, NULL, &writeSet, NULL, tp); - + if (err == 0) { error("Network_client", "Timed out trying to connect to %s:%d after %d seconds", addr, port, timeout); @@ -126,7 +154,8 @@ int Network_client(const char * addr, int port, int timeout) arg &= (~O_NONBLOCK); fcntl(sfd, F_SETFL, arg); - + + return sfd; } diff --git a/src/network.h b/src/network.h index b00e595..07c9459 100644 --- a/src/network.h +++ b/src/network.h @@ -15,10 +15,13 @@ #include #include -extern int Network_server(int port); -extern int Network_server_r(char * addr, int port); +extern int Network_get_port(int socket); // get port used by socket +extern int Network_server(char * addr, int port); extern int Network_client(const char * addr, int port, int timeout); +extern int Network_server_bind(int port, int * bound); +extern int Network_server_listen(int sfd, char * addr); + extern void Network_close(int sfd); #endif //_NETWORK_H diff --git a/src/options.c b/src/options.c index 6ef6119..ddd2ddc 100644 --- a/src/options.c +++ b/src/options.c @@ -35,6 +35,8 @@ void close_out() fclose(stdout); } +char name[BUFSIZ]; + void Initialise(int argc, char ** argv, Options * o) { srand(time(NULL)); @@ -44,9 +46,7 @@ void Initialise(int argc, char ** argv, Options * o) o->logfile = NULL; o->outfile = NULL; o->verbosity = 2; - o->port = 4000 + rand() % 1000; - o->slavefile = "slaves.swarm"; - o->dummy_shell = false; + o->port = 0; o->append = NULL; o->prepend = NULL; o->end = "\a\a\a"; @@ -54,7 +54,9 @@ void Initialise(int argc, char ** argv, Options * o) o->daemon = false; o->encrypt = true; o->interactive = true; - + + gethostname(name, sizeof(name)); + o->name = strdup(name); o->master_pid = getpid(); @@ -140,11 +142,17 @@ void ParseArguments(int argc, char ** argv, Options * o) error("ParseArguments", "No argument following %s switch", argv[i]); o->nCPU = atoi(argv[++i]); } - else if (argv[i][1] == 'm') + else if (argv[i][1] == 'r') { if (i >= argc-1) error("ParseArguments", "No argument following %s switch", argv[i]); o->master_addr = argv[++i]; + char * p = strstr(o->master_addr, ":"); + if (p != NULL) + { + *(p-1) = '\0'; + o->port = atoi(p); + } } else if (argv[i][1] == 'c') { diff --git a/src/options.h b/src/options.h index de365eb..24f35eb 100644 --- a/src/options.h +++ b/src/options.h @@ -14,14 +14,13 @@ typedef struct { char * program; + char * name; char * shell; char * master_addr; char * logfile; char * outfile; int verbosity; int port; - char * slavefile; - bool dummy_shell; char * prepend; char * append; char * end; diff --git a/src/options.o b/src/options.o new file mode 100644 index 0000000000000000000000000000000000000000..88fbc135e94589c536e11a32b04ebd734b15761b GIT binary patch literal 31248 zcmcJ23w+eovHx$gNm%8PV4|WREH4Qv32*U9mSoAUK}wws34S`e-N|F zp33F3CD|*I@$?m81y6dfQE_na+9N)vWu>2d@%Nv9?nyt|ugEQ1fwX!03MW@6HSS|G z`m=4YR!`psCEZsFS6IQ5&Rp%S9;FQ6g1yzHxhymZ=4DE;70vFgo?c*bvyC#(-slGS%_)Zf+y(3-Z;T>PvM?d#W$DWzdjqhvkI2s6f@tUU~YIKRoFL4pqW@ zdO?dW>AL|v(YJw|_;S`k$}N4>9?ypU)Lh+PEuAmmBE@!=Lv z`dQB>`#ik|ZHJ4vP??Zk;A15vk5ymVd(^hRzhwPwPJxWIO$24Ab_=_SI+!(~0>>zB@c+uBivPJu6^2QJHQ!e~z4ur_$v1COZ*uWlgrx5HuP!s^P+BR^uhe)SN4lx9;F^pqC>99rS9tUmWa&+VRF2kaiZ`(aO?g=0FcWWy}5O%{22`wO5L zX4HoA!HaEJySwL-KKFK*ndi9=uA4GI$>5Ws7fc!`sw1#@b9%w-Lyrs@)F#zv?Eg=r z!9y2*L8FP14Yz)toWz?brJ z<%U@&*IvYQRPgj6HvQCKb0Ymamd;sI4n@Bpe3Lza|Gjet>xCM~w6FBgr-k{4qqAhg zzJFH;LA1F051$C__TxM}x8&RXo?dr9Y=J$)8HCFNhix}yxYSs@%(@igXS#wYx?~j5 z^j|-9V2^bCIu8vSSZKl0p`tJTZ+33Qp*vwMmR9Q4BC6jIf0x@1t@;AFo{XGzsBxg2 z103?Or>~Uu$jrepyOQt3)l{vw11#aA%g>KflNk>lAN`=@w%sK+_orX2*p+^^WJf>j z^&7^)fle*<9H+(!#l0UU{(hMFkAez5pESP%ChZ?&b^u1xq0jw}GM8D{<|1UE)qBj| za|RUWY-pg`*qO3DJ6yxH^rTDghN_gi1x)eWds5BUbA8?5QzlPJVIk zd(Mz{n44y<_0-&My@-6Fn{s7&L6K_?!=#aoMj2IE9I|Bbj%y$`vRxMJzo-m@48eP; z?GQQ#8C-wiOtFv(LV$@iP{tpHta*IRtcZs@KnsNRzl?67a~@U8nGJmn2fgWI;F4_Q zWaq?9yf~ic-d=M34WHm~`{|Ztd>ht##OX<2eGqgW^z3yX_#9I9xZiLR)MK3Tfqm{N3=+$#Os) zk*;{GvO6k~XiSJK4<{1QNWxkkPIg#Bh77S1*5o$O8|;WABF@#}q%-U!ZO&zr+b(k^ zdSbC?Y=skx&vGvBNjambDM{ywcr4;z~o;9=iR zvZ|zT#t#8eD)twRw50%4+KW!G&BxjknLXKdqhcLYI*Qp+Vozaq6tSl=>m+tGvlYaS zVRkyPr!hN^*s;u36YFHwL+p5FeZ)>+wuRV9wx5Fi1rx6z_H^4;#dZ)|!9D6G_AF*o z#8%o?s}8OqcBX9^u=a`9kh6>~R>y-RW%wJ@? zLGiaUzlixw%-1u&nfV6hA7;MMHci#v%DhimZ=cxDyx;bkYX51Qh37P@=rJ`JM-^>H zmkd?VljI%dc93%%RJNN&yVNRnTnQD59T*nFFj}x*!!afi#rRO@I~pwm^nGAPplnG; z@rVLAJ1d>Vr4%yEjpP8LvJ;3aC9aIPQN)!K=Ope#;wp$ciR_$C+{whvCTabt)J5_cMLEyRt*?i5VCl(=!kT}GUfxXXzfPh5z&3B+9i zlMS51)3A8b2f&Oh9CFc@gUgbkoK1}7q zRX#%HB`Pmf`AC(Ypz<=6m#h3lm7k>YlT|)S<)^6pRF#ib`52X-rt+~WAE$Ds%EzmG zg32eVe3Hs1tNe78Pf>Y=%BQOQ43(d$^0QPvP34s;uTuGRmCsQ5OqI`4`D~TXQTbez zpRMwFDnCc%=c@cXm7j0q7pUtCRX$(k{WJ`_i0mfv6p^QiJVRs;k-bEoCGs4R=ZU;P z6(nP*Nq>sq; zL~bDRO(HiExrxX|A~zHH7LjigxrNBBL~bK;JCW}Y`7V*~5xIlN_lev|oB72DJC32X^5h8yg@@FC+5IIWZ zFGM~h@)41biTstw--vudQ+pMPNP*8_ zTUV{wDsZ1F+_-(`I_9ycaO1-}*PRr%Y!wBt4;60QymQ^jQpIy1H>%yTxpC_+-1zLy zb*D&?QxULvsDxR zV>F&8J|AasSJw{O)eGV5OkG=xhS)bgz&%}yJ+*J#q&m75J8Iu}`@l|&idwd-ux7!= zmH|%qG1MB>ZP{)`8|@pnuCoS+yo4g)r_Jc6f{oP!C61y*r)AqlN*-P}&`qddKPm8x z!kcs3pUI?4^LX@4D#o{)))Ff^&E_pbeE4|FTUG@vj5r-G;6z$x zQ`%LfQmEu0`>@i;LZj@23TS!EaLX#IwB`?^ZeYKvpPEQ^M{=ek>NA}tdL21!WCGn>BbaFV6asJtW;l@WUy7G`dYo^2OR5iafax!k(qj{ z9s0UFiZIOOU>90+e)RZlWe)fru0wW77PYR$a4p}DiCbw*=7Yo@J;^;FHSoLd!* zwRZNjMJy%B%s|QI(Nr==#IpDiP-QEcl1#O=McOMn@YxPNyiCF+SOsu24wtZCf)V;s z*X+QWt?^iUEZh~L3b5KiHA?mp+oDY~7TtB%T{9NF3;)e>y#B7E%yGfHkluI$GRM*f zAKdb;qy9a|^HxEjt@uO83ku*ysG!i^f?4q<$0d#j?=F~F_AXp(d1;H|LdW=mGs_Dm zmpd+a{$su_d%-c?ae>nC4r)l@W%f#GMT~X!6ih28;o|oky`bCC>zMCY=D2XXLLGM& zJKl0^daz(hS;52+m(!JfRl!7?eOQZQKjMxD9lcu~w6Anj+DElGehpChy9HBD>;-Mp z%AP2wEVD0t);_gGRV}ys9S_*2Q_XTm542HGT!}3?E_$|j{1!*m7RO!o8b`|BZ|NFO ztb;0hfxXMVb=`IL=hW3>aP^YaQZT*j?t=5n!2rh-+a1%lJD%H7P<4`He(#qyJ@4r4 zDLA97;H+}&)Ekcd;Nkr*ZtpF)@FeWa+%o&e|8b{%wNrKTqGJ2)wry*k0E-< zAMiFcs_Vx3P-BCu1w$v2NGM6D5yIyeC>mJWSmW_GHF_^`*Mb&>87=y~!KEPPao0B2 zdmHOWBUn-64g{L~flyOpJvC$033(emZm6h)8{7>|{-xS*ZB)?P;BIOT29(GGSHP_< zd|q#;Dc}v&G;v8oQ)5#QY^w>?)Hl^EQbOENz_rjFTG-rJ69k>02OO|E5lux@$9iJP z=!zIT62|A8*h_EI;t)DP^*9LrY{;-PLkpYy!BDMxamI!KwB;uA(d*C^?Hzay+iHc{ zJHsoIRwUM%SPKudQRm{hA#c5J*33}T!iBSDDoESuYx28lp$7rCABKf2@IZqNUUFP> zKyhpwj8@Ry5>&Q!g%c|yiKMBm)*T4?n`?qi{*X85_Pe?{fv;e#s%{~R(nB;Me z_(kreflQ|YOB?E&ntT~P=y$=O)HlIO(he_eVsT^J!;xhjHNIvXP}Lk&fqrC^C>Lku z8@d>)nn4$;t@FDmi2Xr`3RN{3{Yr!v$eAJEfCG})70D; z#AO}dFDY3U#DkvlN+pE08vY!OG~LZzCg5>%3{{$tu@B< z%Z$t{?n*Y2)v(hHQB{=z+HI=)vj@~MmwF7nst$NlU6l-XRn^8@)qB%qm5&A;-YTxB zQtxxCk_p&1aCp6S7dN|`v$HkfWKo>m}@yZ7|WT zRguKgdC+QWAf@l3 zrUh_bY7F|DXbY&RUla=Rs+QBG%yOVMU0O8wLjh(WT5!%~2EDl+*C@Rjz)?XRL6J|u zS0S+d@s3RP<8Ij*UqN#nI;xg!t1Fy@*XyBhTN{m7z~^l=wv1eFvGOY|MM))XYt`c|(gFscwwBCIeLIBk@&^V|?(WhJ}P6}|ihEsLs=*TAWE|~G6 z-yN!L&hBfj+QnWt5HE#e&%!3~1MJm$qUD_L_;ipt3h}KQY!3;1sHS2ycgI#31*_iJ zd6%SNiOaKe@R(5y_lY@K9qe@yZc; zJIMHfwh}lX)GYEf!C)wLe5Up{^Sy!E^x-rC=Sf&{!4~Q$Y;}uW^~PezeZh?w1|OCN z)l3I`kOY$gkM!Zrq^dedr_GxVDh|kxRj{WpF)h43CN;`1v*wZ}f34bTu zr>i==YU)`oxD@Z-22r49p6Zx+c*1z%LvBMe+!f{Qc{x*23H4j4=wn9Dse z_>2w@6)+j_5l!YOhLc1s>~PZ@bY*-_ooT6Q&MebbT}{oo&Wh&cJ+V|zRXu!wmx$9U zFl|PU0_RNSRY5;1w|&KL+18ZWMh_d}xCtIM;)_N7a#1~Zca13hYC+BL;zc09W#{t) z#YsqYc_>?cDVMAFY`WZ+Ex+ntl;6SSPvHv#UEjW=5H&af+Ro+QMT2ztrM-6hT8C-` zWe&hUOmWP#Ou?7P2*fIrYrEjI1vHf)+M%x!k#EVpM5c zWO>huP&C#a2N`u8UcNjLS*1DH4WI_<;nGAnwjzRBvKTx!j%ay&+onFgh3_t{%2+%V zsf0Cr8hot^y9!l`bcEXBPBMbV|Gy_EsDRwx6Ud_%$xo*Sx0mNt0#032(~Ggl(DW9AIRhuzX$%NgZ?&MgQR#>Grk%@coh0 zi8VeMi@9YzDhdT_VKWlLdJi};x2#RVF;cbzWF&_5>L+4uS<{JRaw*#iC4=?fxt6Nv z`q+Pb$0+-cYlzf|HLv3Kzws~Z$L}%+Yv0H1Yd>TAIFz#euLFkdYdhgmrD6uyG$>dS8YD-#LhCP|4^jl}4X(vLs3mCMU>-)CJ0}{k4_M7fz@F)SZUsAi z)td_fXIC?B01UWT)L6uB4ZdjGt3F5qjttlzXo{4z*in>EI> zye^x>g;k8tW?r35;2OhC%W{z-)p_tbz|kK4PG9S}IFEb_;3KUMKX(FE$w>s?>!Tk1 z?p~dB0PBE%gW1!S2T$g~ugrsg4RCB%zjIgTBf!3yNB$O;*WUoBGZ$d@=8?ZY501ao z8mwPC^We|q!C%UQzm^AoGY|fD9{fli{Np@$5rpPo{W?AmJ~9t}N*;V%9{lt?_?da| z>3Q%udGLjK@TNR?Fb}>g557DPZoCf?k0*%L4=r+@J>s)de!tzWo<>^PXOjFPPd)Q7 zo(iX;aFR~Ksl3ffu1(@c6V?j&`A;{vYEMLz2)s$dM-^eXmx!iNf9vWtd`KGVPT>8te)ty(sdGNukFE^i zZ<(M%Jc^G5TRT9Pl5OV~NST*rU;sQYoM|b|A$)oqi{mE%>L*uNmo14WR$5&v+oN!! z(i(=JbG0hH-MmY~r*jb3nDC6HuQiVp_gIYY&+r$Bko0<|<#BJtq`uSyD&@zi(yadR z%0y2*#5*R9HxO%`&p5U_8vbcLNygF7=0~J!4EZsJ{MC8nuQTLNGvseJ zaJ*&E?QS!096ODlfd)d-Po3k-QvPpyHQdM-Bb$ws@24gKoZD6Ic-LmqEcw4GlwaC2VVXz0Os*8TmF zA#d9GQv)~kY!iBPfA<;krk-CJ@_Nm|F1%&PoAwtLVkynjDL{3RgF<9YP#FmPN;wf;Rq z&pe*zF9=-g|Ba!?wDb2u{ulwnE6Z&nU(XY`4eujZR z%D5hP2hZDW0J=*@w0@wO~ zYUnZT|D%DQ1@`KAIBtkCU+s5hyGt0C{kl}(y4`j|kJ;`GLVh0m`DTG@J$K~M^CKaz z*Pkr{r{2IH0x|SBjsCtaWw@4f*N7YxzkAKEuGL8hZ2|i+W}l@~0c}^aJF9@wwK}gQ2bc-)rbG$MHQO?_j?j z7P!{)*F1W>_`n4cRyX7ARmQb{wEouxuJs=<^q6*bp#n%+PdD3{VjNj>-@Mkq&2jHD za6EhJak+^XfonaR^60tGz~?}H?fBa7 zzudsh`-{;+Pk{SdWyqWJ@jL@J zkFejaHuOv~#%sNSml^o=LXXzJQQ%tt9flrL{||&bt{s?uY{<_AUdPX)LLOrp)3!YF zyA0gi2cHvq^m_QRz_mTUHuRYG+lEv77+0qL;|$!~XQvssxu4E8@EM>>_jeKFI-a|E zy!?i|IW8dspJwQ}Qs}vv^;~Djn|f|Ba8u7+25zn=1tYS4INNB~Y2fn=oR52IALOHo zQowFjI8Oa}TwkwI=m*6A$mef;&!zEuSQ$P8!DNGfyfiCbkEh1<`{Svql=fxRr4x=R zCau2#7e7e)d5*?kWWRW+I3xc!uV4Ckj+XcG`D2NYe}((2pSNrID@#c*BIM7;jzH=p zl4*B2%O?fi!}hEZ_zafs75Lli_}c}(g9lLG_h>tlEWcUE|Be0gsK9Sw`=1v0L>|Xq z3S6H9-WK?+JT89`cqRK;1p<_5J7@7mRl@CSd@A!}1^!p=*EE6qxjp@SOzVkqyNiT; znB(>mf#1sWQNPdAdOl-0{X9+Mm$5xx6MEic{kIBy4a?sr@Za&c{7m2vu{``q9hG#u zAMm;MHG$*ahUp!F-_QC#7Px-yP{@;0>%W=xmkazmj878yCdOwA{D+L!3VaLWL4m)( z@!TfxBkYGBfmcv>t*Zn+isSa{0^i5+d8@!rY$3N$ZN!$Mn^Lk&>_|vR^ zlaT)duUA_H{xsv;1wM@V=LO!ycJ3GW9UQOk2|UW<^|8Q<*#AX5|Fr%0@^~FDa2M-2 zP2g?ppNRr5=D5Yb+-mx*lEuzn=A6E$|C@zV!k%I)MIL z1-f1RT;NP0{|4)sA@JiluFe*C6OZo&0>6j-QzP&YuMb{!(lfb{v>tVCNKj!#c zCh$g<4-5QJUdL7lJjV8P3H%Z6S5n~bv;MULpTKdjPT;j1KN|#o6Z@0?aBnt#YT2HR zLVh%l%WVR$VE+38f1kqAx<}xDWJu!+tv=@D~~XNZ`YHUjLWC``LaQ@56dre!=S@ z{VCtGKq|*fcd)7Zm;7@S8ohk4(#%BurAzoL{5%_#+)~Xix z-K?ih;D2I!8U_9p?r*cePvG%2-&dk5@$ar-Y8CPi@wj|R;Pq_Jmj!+%$LAV>7xFw> zCvg4#;Hv^(#{M_olcI@#P*z&jw}t#qcpSec@N3!r{}lK?d48GiPf`DR*7LBC{}cD? zae*IXeuu!r?9XQeem2|lqQEa=`Ckfr8T;)w0zaMS-5&(r#^bB)((^)nWDfoNK*%p+ zUcb-P@}IJw_4{0npTqOIko~0bTK4Bqfj9B|Jwf0b*`KEf{3TQf$tm!~?4KzDck#NQ zpSNjy?qht8kiU`V-T4Cl9s8k1;QBepMFM}E=iS8u$G=X9iT*ftcKx}K`x_GSuaUUb zF7VIU&Mtvp!}HF3e~nEpVYw@X{3sr;>jWNP|D*+e6YJOKP3_NDSkG-jKFINQr@)`& z{@y3>3z&aE;CFJIKO*pZ+0WYqej|_9E`i^}@%F61b$q`f@Vl6QUEq&%9KI#+-E8MO z0$;}Kt@+*`$K@sV!-qnC9mk2@f3%<1vb_H3fyPhg@fyPOOye&zKSJR8yndp<_51ZP z0{=SqYofpx@wlHM@U?8u41u4^ew!!oS{|?Y0$;)WLV=HBe>DhP?<+xp_wamKCh!k= zyD=E90{3#e*9-g`9Jk*R_(-0Q-xav} z7y;tzZh=3?@|y)-&g1ov#5rys6Zklehkk(%<@nqq@XOi$7a7NC%dUYd%X&@V99GtE z4Lv;MO8#vF$050m4qNE@8tDxu7RWebJ+e1^5}63T>oCB!O(;H zuVMX*4IK63Z=W%>3%rBlYQ4}ul?VTNf%h~1LxJn(Li#;`_K%j|D%$-c?>o;6J^K0h zF9g1x(v87ejejL6SzKCy)5v( z*f~hA3tYcn`-{L|V88!e;P3PLGlr?Q^8=QjB=CE={xrt*cO5_Zx6 literal 0 HcmV?d00001 diff --git a/src/slave.c b/src/slave.c index 3327759..65e11c8 100644 --- a/src/slave.c +++ b/src/slave.c @@ -1,7 +1,7 @@ -#define _XOPEN_SOURCE +#define _XOPEN_SOURCE 700 #define _GNU_SOURCE -//#define _SIMPLE_SLAVE + #include "slave.h" #include @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -26,117 +27,182 @@ Slave * slave; +char name[BUFSIZ]; + +void Slave_shell(int i, char * shell); +void Slave_cleanup(); -int running; void Slave_main(Options * o) { + - if (fork() != 0) - exit(EXIT_SUCCESS); + setbuf(stdin, NULL); setbuf(stdout, NULL); setbuf(stderr, NULL); + + dup2(fileno(stdout), fileno(stderr)); // yes, this works, apparently + slave = (Slave*)(calloc(o->nCPU, sizeof(slave))); + atexit(Slave_cleanup); - o->verbosity = 100; - freopen(SLAVE_LOGFILE, "w", stderr); - setbuf(stderr, NULL); - slave = (Slave*)(calloc(o->nCPU, sizeof(Slave))); - int net_fd = -1; - if (o->encrypt) - net_fd = Network_client("localhost", o->port,100); + if (strcmp(o->master_addr, "-") != 0) + { + if (fork() != 0) + exit(EXIT_SUCCESS); + + //log_print(2, "Slave_main", "Using unsecured networking; connect to %s:%d", o->master_addr, o->port); + //log_print(2, "Slave_main", "Connecting to %s:%d", o->master_addr, o->port); + int net_fd = Network_client(o->master_addr, o->port, 100); + dup2(net_fd, fileno(stdin)); + dup2(net_fd, fileno(stdout)); + dup2(net_fd, fileno(stderr)); + + } else - net_fd = Network_client(o->master_addr, o->port,100); + { + o->master_addr = "localhost"; + //log_print(2, "Slave_main", "Using port forwarding; connect to %s", o->master_addr); + } - FILE * f = fdopen(net_fd, "w"); setbuf(f, NULL); - fprintf(f, "%d\n", o->nCPU); + char buffer[BUFSIZ]; - log_print(2, "Slave_main", "Waiting on bell from master"); - char c; - if (read(net_fd, &c, sizeof(char)) == 0 || c != '\a') - error("Slave_main", "Didn't get bell from master"); - + fgets(name, sizeof(name), stdin); + name[strlen(name)-1] = '\0'; + //log_print(2, "Slave_main", "Got name %s", name); + fprintf(stdout, "%d\n", o->nCPU); + //log_print(2, "Slave_main", "Wrote nCPU %d", o->nCPU); - log_print(2, "Slave_main", "Got bell from master"); - running = o->nCPU; + int port = 0; for (int i = 0; i < o->nCPU; ++i) { - int new_fd = net_fd; - if (i != o->nCPU-1) - { + //log_print(2, "Slave_main", "Waiting for port number..."); + fgets(buffer, sizeof(buffer), stdin); + + buffer[strlen(buffer)-1] = '\0'; + sscanf(buffer, "%d", &port); + //log_print(2, "Slave_main", "Port number %d", port); + slave[i].in = Network_client(o->master_addr, port,20); + //log_print(2, "Slave_main", "Connected to %s:%d\n", o->master_addr, port); + slave[i].out = slave[i].in; + + Slave_shell(i, o->shell); + } + - - if (read(net_fd, &c, sizeof(char)) == 0 || c != '\a') - error("Slave_main", "Didn't get bell from master authorising connection of slave %d", i); - sleep(1); + Slave_loop(o); - log_print(3, "Slave_main", "Connecting slave %d to port %d at time %d", i, o->port+i+1, time(NULL)); - if (o->encrypt) - new_fd = Network_client("localhost", o->port+i+1, 100); - else - new_fd = Network_client(o->master_addr, o->port+i+1, 100); + exit(EXIT_SUCCESS); +} - - - } +void Slave_shell(int i, char * shell) +{ + slave[i].pid = fork(); - slave[i].in = new_fd; slave[i].out = new_fd; - slave[i].pid = fork(); - if (slave[i].pid == 0) - { - dup2(slave[i].in, fileno(stdin)); - dup2(slave[i].out, fileno(stdout)); - execlp(o->shell, o->shell, NULL); - } + + if (slave[i].pid == 0) + { + dup2(slave[i].in, fileno(stdin)); + dup2(slave[i].out, fileno(stdout)); + //dup2(error_socket[1], fileno(stderr)); + + execlp(shell, shell, NULL); } - - Slave_loop(o); - free(slave); - exit(EXIT_SUCCESS); + // if the input is a network socket, this message gets sent to the master + // which will then echo it back to the socket and hence the shell + FILE * f = fdopen(slave[i].in, "w"); setbuf(f, NULL); + fprintf(f, "name=\"%s:%d\"\n", name,i); } void Slave_loop(Options * o) { - + fd_set readSet; + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100000; + int p = -1; int s = 0; - - while (running > 0) + char buffer[BUFSIZ]; + while (true) { + FD_ZERO(&readSet); + FD_SET(fileno(stdin), &readSet); p = waitpid(-1, &s, 0); if (p == -1) { - log_print(0, "Slave_loop", "waitpid : %s", strerror(errno)); + //log_print(0, "Slave_loop", "waitpid : %s", strerror(errno)); continue; } - if (s != SHELL_EXIT_CODE) - { - // there was an error - int i = 0; - for (i = 0; i < o->nCPU; ++i) + //log_print(3, "Slave_loop", "Detected child %d exiting...", p); + + // check for an exit command from the master + select(fileno(stdin) + 1, &readSet, NULL, NULL, &tv); + + if (FD_ISSET(fileno(stdin), &readSet)) + { + fgets(buffer, sizeof(buffer), stdin); + if (strcmp(buffer, "exit\n") == 0) { - if (slave[i].pid == p) break; + log_print(2, "Slave_loop", "Received notification of exit.\n"); + exit(EXIT_SUCCESS); } - if (i >= o->nCPU) - error("Slave_loop", "No child matches pid %d", p); + } + + int i = 0; + for (i = 0; i < o->nCPU; ++i) + { + if (slave[i].pid == p) break; + } + if (i >= o->nCPU) + error("Slave_loop", "No child matches pid %d", p); + - log_print(0, "Slave_loop", "Child [%d] exits with status %d; restarting", p, s); - slave[i].pid = fork(); - if (slave[i].pid == 0) + + fprintf(stderr,"Unexpected exit of slave %s:%d", name, i); + if (WIFSIGNALED(s)) + { + int sig = WTERMSIG(s); + fprintf(stderr," due to %s", strsignal(sig)); + if (sig == SIGKILL) { - dup2(slave[i].in, fileno(stdin)); - dup2(slave[i].out, fileno(stdout)); - execlp(o->shell, o->shell, NULL); + fprintf(stderr," - %s committing suicide\n", name); + kill(getpid(), sig); } - - char buffer[] = "\f\a\a\a"; - if (write(slave[i].in, buffer, strlen(buffer)) <= 0) - log_print(0, "Slave_loop", "Slave %d input closed", i); } else - --running; + { + fprintf(stderr," return code %d.", s); + } + + + // cancel any tasks at the master for this slave + static int len = -1; + if (len < 0) + len = strlen(o->end); + write(slave[i].out, o->end, len); + + Slave_shell(i, o->shell); + + } } + +void Slave_cleanup() +{ + for (int i = 0; i < options.nCPU; ++i) + { + kill(slave[i].pid, SIGTERM); + } + sleep(1); + for (int i = 0; i < options.nCPU; ++i) + { + kill(slave[i].pid, SIGKILL); + } + free(slave); +} + + diff --git a/src/slave.h b/src/slave.h index 40b16ad..bf4db8f 100644 --- a/src/slave.h +++ b/src/slave.h @@ -17,7 +17,7 @@ typedef struct Task * task_pool; // tasks specific to the slave - int ssh_pid; + bool running; } Slave; diff --git a/src/ssh.c b/src/ssh.c new file mode 100644 index 0000000..0e1e197 --- /dev/null +++ b/src/ssh.c @@ -0,0 +1,688 @@ +#include "ssh.h" +#include "network.h" +#include "log.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +enum { + AUTH_NONE = 0, + AUTH_PASSWORD, + AUTH_PUBLICKEY +}; + +static bool ssh_fingerprint_ok(char * f); + +static void ssh_get_passwd(char * buffer, int len); + +static bool ssh_agent_auth(ssh * s); + +static bool ssh_publickey_auth(ssh * s, char * dir, int nAttempts); + +static bool ssh_thread_running = false; +static int ssh_array_reserved = 0; +static int ssh_array_used = 0; +static ssh ** ssh_array = NULL; +static int ssh_thread_maxfd = 0; + +static int waitsocket(int socket_fd, LIBSSH2_SESSION *session) +{ + struct timeval timeout; + int rc; + fd_set fd; + fd_set *writefd = NULL; + fd_set *readfd = NULL; + int dir; + + timeout.tv_sec = 10; + timeout.tv_usec = 0; + + FD_ZERO(&fd); + + FD_SET(socket_fd, &fd); + + /* now make sure we wait in the correct direction */ + dir = libssh2_session_block_directions(session); + + + if(dir & LIBSSH2_SESSION_BLOCK_INBOUND) + readfd = &fd; + + if(dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) + writefd = &fd; + + rc = select(socket_fd + 1, readfd, writefd, NULL, &timeout); + + return rc; +} + +ssh * ssh_new(char * username, char * addr, int port) +{ + ssh * s = (ssh*)(calloc(1, sizeof(ssh))); + s->user = username; + s->addr = addr; + + s->socket = Network_client(addr, port,100); + s->session = libssh2_session_init(); + if (s->session == NULL) + { + free(s); + log_print(2,"ssh_new", "libssh2_session_init returned NULL"); + return NULL; + } + + + + int err = libssh2_session_handshake(s->session, s->socket); + if (err != 0) + { + free(s); + log_print(2,"ssh_new", "libssh2_session_handshake fails - error code %d", err); + return NULL; + } + s->fingerprint = (char*)(libssh2_hostkey_hash(s->session, LIBSSH2_HOSTKEY_HASH_SHA1)); + if (!ssh_fingerprint_ok(s->fingerprint)) + { + free(s); + log_print(2,"ssh_new", "Fingerprint of host \"%s\" was not OK", addr); + return NULL; + } + + char * userauthlist = libssh2_userauth_list(s->session, username, strlen(username)); + + int auth = AUTH_NONE; + if (strstr(userauthlist, "password")) + auth |= AUTH_PASSWORD; + if (strstr(userauthlist, "publickey")) + auth |= AUTH_PUBLICKEY; + + bool ok = false; + + if (auth & AUTH_PUBLICKEY) + { + // first try connecting with agent + ok = ssh_agent_auth(s); + + + + if (!ok) + { + log_print(3, "ssh_new", "Agent authentication failed, looking at public keys"); + + if (SSH_DIR[0] == '~' && SSH_DIR[1] == '/') + { + char ssh_dir[BUFSIZ]; + sprintf(ssh_dir, "%s/%s",getenv("HOME"),SSH_DIR+2); + ok = ssh_publickey_auth(s, ssh_dir,3); + } + else + ok = ssh_publickey_auth(s, SSH_DIR,3); + } + + } + + if (auth & AUTH_PASSWORD && !ok) + { + log_print(3, "ssh_new", "public keys failed, try password"); + for (int i = 0; i < 3 && !ok; ++i) + { + printf("Password for %s@%s:", username, addr); + char password[BUFSIZ]; + ssh_get_passwd(password, BUFSIZ); + + if (libssh2_userauth_password(s->session, username, password) == 0) + { + ok = true; + } + } + if (!ok) + log_print(3, "ssh_new", "Failed to authenticate by password."); + } + + if (!ok) + { + free(s); + log_print(2, "ssh_new", "All attempts at authenticating failed."); + return NULL; + } + log_print(3, "ssh_new", "Authenticated!"); + + s->reserved_tunnels = 1; + s->tunnel = (ssh_tunnel*)(calloc(s->reserved_tunnels, sizeof(ssh_tunnel))); + s->nTunnels = 0; + libssh2_session_set_blocking(s->session, 0); + return s; +} + +void ssh_destroy(ssh * s) +{ + ssh_thread_del(s); + + for (int i = 0; i < s->nTunnels; ++i) + { + int err; + char buffer[BUFSIZ]; + do + { + err = libssh2_channel_read(s->tunnel[i].channel, buffer, sizeof(buffer)); + write(s->tunnel[i].forward_sock, buffer, err); + } + while (err > 0); + + while ((err = libssh2_channel_close(s->tunnel[i].channel)) == LIBSSH2_ERROR_EAGAIN) + waitsocket(s->socket, s->session); + + libssh2_channel_free(s->tunnel[i].channel); + close(s->tunnel[i].forward_sock); + } + + + libssh2_session_disconnect(s->session, "goodbye"); + libssh2_session_free(s->session); + + free(s->tunnel); +} + +bool ssh_fingerprint_ok(char * f) +{ + //TODO: Check fingerprint + log_print(1, "ssh_fingerprint_ok", "Unimplemented!"); + return true; +} + +void ssh_get_passwd(char * buffer, int len) +{ + struct termios oflags, nflags; + + tcgetattr(fileno(stdin), &oflags); + nflags = oflags; + nflags.c_lflag &= ~ECHO; + nflags.c_lflag |= ECHONL; + + if (tcsetattr(fileno(stdin), TCSANOW, &nflags) != 0) + { + error("ssh_get_passwd", "tcsetattr : %s", strerror(errno)); + } + + fgets(buffer, len * sizeof(char), stdin); + buffer[strlen(buffer) - 1] = '\0'; + + if (tcsetattr(fileno(stdin), TCSANOW, &oflags) != 0) + { + error("ssh_get_passwd", "tcsetattr : %s", strerror(errno)); + } +} + +bool ssh_publickey_auth(ssh * s, char * dir, int nAttempts) +{ + + + DIR * d = opendir(dir); + struct dirent * dp; + if (d == NULL) + { + log_print(0, "ssh_publickey_auth", "Couldn't open directory %s : %s", dir, strerror(errno)); + return false; + } + + while ((dp = readdir(d)) != NULL) + { + + // skip public keys + if (strstr(dp->d_name, ".pub") != NULL) + continue; + + // assume file is a private key + // find corresponding public key + char pub[BUFSIZ]; char priv[BUFSIZ]; + if (dir[strlen(dir)-1] == '/') + { + sprintf(pub, "%s%s.pub", dir,dp->d_name); + sprintf(priv, "%s%s", dir, dp->d_name); + } + else + { + sprintf(pub, "%s/%s.pub", dir,dp->d_name); + sprintf(priv, "%s/%s", dir, dp->d_name); + } + + struct stat t; + if (stat(priv, &t) != 0) + { + log_print(3,"ssh_publickey_auth", "Can't stat file %s : %s", priv, strerror(errno)); + continue; + } + + if (!S_ISREG(t.st_mode)) + { + log_print(3, "ssh_publickey_auth", "%s doesn't appear to be a regular file", priv); + continue; + } + + if (stat(pub, &t) != 0) + { + log_print(3,"ssh_publickey_auth", "Can't stat file %s : %s", pub, strerror(errno)); + continue; + } + + if (!S_ISREG(t.st_mode)) + { + log_print(3, "ssh_publickey_auth", "%s doesn't appear to be a regular file", pub); + continue; + } + + + + + + + //libssh2_trace(s->session, LIBSSH2_TRACE_AUTH | LIBSSH2_TRACE_PUBLICKEY); + int err = libssh2_userauth_publickey_fromfile(s->session, s->user, pub, priv,""); + if (err == 0) + { + log_print(1, "ssh_publickey_auth", "Shouldn't use keys with no passphrase"); + } + else if (err == LIBSSH2_ERROR_PUBLICKEY_UNVERIFIED) + { + + char passphrase[BUFSIZ]; + for (int i = 0; i < nAttempts; ++i) + { + printf("Passphrase for key %s:", priv); + ssh_get_passwd(passphrase, BUFSIZ); + err = libssh2_userauth_publickey_fromfile(s->session, s->user, pub, priv,passphrase); + if (err != LIBSSH2_ERROR_PUBLICKEY_UNVERIFIED) break; + } + } + if (err == 0) + { + closedir(d); + return true; + } + } + closedir(d); + + + return false; + + + + +} + +bool ssh_agent_auth(ssh * s) +{ + LIBSSH2_AGENT * agent = libssh2_agent_init(s->session); + if (agent == NULL) + { + log_print(0, "ssh_agent_auth", "Couldn't initialise agent support."); + return false; + } + + if (libssh2_agent_connect(agent) != 0) + { + log_print(0, "ssh_agent_auth", "Failed to connect to ssh-agent."); + return false; + } + + if (libssh2_agent_list_identities(agent) != 0) + { + log_print(0, "ssh_agent_auth", "Failure requesting identities to ssh-agent."); + return false; + } + + struct libssh2_agent_publickey * identity = NULL; + struct libssh2_agent_publickey * prev_identity = NULL; + + while (true) + { + int err = libssh2_agent_get_identity(agent, &identity, prev_identity); + if (err == 1) + { + log_print(0, "ssh_agent_auth", "Couldn't continue authentication."); + return false; + } + if (err < 0) + { + log_print(0, "ssh_agent_auth", "Failure obtaining identity from ssh-agent support."); + return false; + } + + if (libssh2_agent_userauth(agent, s->user, identity) == 0) + { + log_print(3, "ssh_agent_auth", "Authentication with username %s and public key %s succeeded!", s->user, identity->comment); + return true; + } + else + { + log_print(3, "ssh_agent_auth", "Authentication with username %s and public key %s failed.", s->user, identity->comment); + } + prev_identity = identity; + } + + return false; + +} + +LIBSSH2_LISTENER * ssh_get_listener(ssh * s, int * port) +{ + pthread_mutex_lock(&ssh_thread_mutex); + libssh2_session_set_blocking(s->session, 1); + //libssh2_trace(s->session, ~0); + LIBSSH2_LISTENER * l = libssh2_channel_forward_listen_ex(s->session, "localhost", *port, port,1); + if (l == NULL) + { + char * error; + libssh2_session_last_error(s->session, &error, NULL, 0); + log_print(0, "ssh_get_listener", "Error: %s", error); + } + libssh2_session_set_blocking(s->session, 0); + pthread_mutex_unlock(&ssh_thread_mutex); + return l; +} + +void ssh_add_tunnel(ssh * s, LIBSSH2_LISTENER * listener, int socket) +{ + pthread_mutex_lock(&ssh_thread_mutex); + //log_print(3, "ssh_add_tunnel", "accepting connection..."); + libssh2_session_set_blocking(s->session , 1); + //libssh2_trace(s->session, ~0); + LIBSSH2_CHANNEL * channel = libssh2_channel_forward_accept(listener); + if (channel == NULL) + { + char * error; + libssh2_session_last_error(s->session, &error, NULL, 0); + log_print(0, "ssh_add_tunnel", "Error: %s", error); + } + libssh2_session_set_blocking(s->session , 0); + //log_print(3, "ssh_add_tunnel", "accepted remote connection..."); + + ssh_tunnel * t = s->tunnel+(s->nTunnels++); + t->forward_sock = socket; + t->channel = channel; + + if (socket > ssh_thread_maxfd) + ssh_thread_maxfd = socket; + + if (s->nTunnels >= s->reserved_tunnels) + { + s->reserved_tunnels *= 2; + s->tunnel = (ssh_tunnel*)(realloc(s->tunnel, s->reserved_tunnels * sizeof(ssh_tunnel))); + } + + pthread_mutex_unlock(&ssh_thread_mutex); +} + +void ssh_exec_swarm(ssh * s, int * port, int * socket, int np) +{ + + // secure things + LIBSSH2_CHANNEL * channel = NULL; + while ((channel = libssh2_channel_open_session(s->session)) == NULL + && libssh2_session_last_error(s->session, NULL, NULL, 0) == LIBSSH2_ERROR_EAGAIN) + { + waitsocket(s->socket, s->session); + } + + if (channel == NULL) + { + error("ssh_exec_swarm", "Couldn't create channel with ssh session"); + } + + + char buffer[BUFSIZ]; + + // connect secure + if (port == NULL && socket != NULL) + { + sprintf(buffer, "%s -r -", options.program); + if (np != 0) + sprintf(buffer, " -n %d", np); + } + else if (port != NULL && socket == NULL) + { + sprintf(buffer, "%s -r $(echo $SSH_CONNECTION | awk \'{print $1}\'):%d", options.program, *port); + if (np != 0) + sprintf(buffer, " -n %d", np); + + } + else + error("ssh_exec_swarm", "Exactly *one* of the port or socket pointers must not be NULL"); + + int err; + while ((err = libssh2_channel_exec(channel, buffer)) == LIBSSH2_ERROR_EAGAIN) + { + waitsocket(s->socket, s->session); + } + + if (socket != NULL) + { + + pthread_mutex_lock(&ssh_thread_mutex); + + ssh_tunnel * t = s->tunnel+(s->nTunnels++); + + t->forward_sock = *socket; + t->channel = channel; + + if (*socket > ssh_thread_maxfd) + ssh_thread_maxfd = *socket; + + + if (s->nTunnels >= s->reserved_tunnels) + { + s->reserved_tunnels *= 2; + s->tunnel = (ssh_tunnel*)(realloc(s->tunnel, s->reserved_tunnels * sizeof(ssh_tunnel))); + } + + pthread_mutex_unlock(&ssh_thread_mutex); + } + else + { + + // read everything and close the channel + while (true) + { + while ((err = libssh2_channel_read(channel, buffer, sizeof(buffer))) > 0); + if (err == LIBSSH2_ERROR_EAGAIN) + { + waitsocket(s->socket, s->session); + } + else + { + break; + } + } + + while ((err = libssh2_channel_close(channel)) == LIBSSH2_ERROR_EAGAIN) + { + waitsocket(s->socket, s->session); + } + libssh2_channel_free(channel); + } + + + + +} + + + + +pthread_mutex_t ssh_thread_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_t ssh_pthread; + +void * ssh_thread(void * args) +{ + + + fd_set readSet; + char buffer[BUFSIZ]; + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100000; + + while (true) + { + //log_print(1, "ssh_thread", "loop - %d ssh's", ssh_array_used); + FD_ZERO(&readSet); + pthread_mutex_lock(&ssh_thread_mutex); + + if (!ssh_thread_running) break; + + for (int i = 0; i < ssh_array_used; ++i) + { + ssh * s = ssh_array[i]; + if (s == NULL) continue; + for (int j = 0; j < s->nTunnels; ++j) + { + FD_SET(s->tunnel[j].forward_sock, &readSet); + } + } + + pthread_mutex_unlock(&ssh_thread_mutex); + select(ssh_thread_maxfd+1, &readSet, NULL, NULL, &tv); + pthread_mutex_lock(&ssh_thread_mutex); + + for (int i = 0; i < ssh_array_used; ++i) + { + ssh * s = ssh_array[i]; + //log_print(2, "ssh_thread", "array[%d] = %p", i, s); + if (s == NULL) continue; + for (int j = 0; j < s->nTunnels; ++j) + { + //log_print(2, "ssh_thread", "Tunnel number %d, socket %d", j, s->tunnel[j].forward_sock); + if (FD_ISSET(s->tunnel[j].forward_sock, &readSet)) + { + //log_print(2, "ssh_thread", "reading from socket %d", s->tunnel[j].forward_sock); + int len = read(s->tunnel[j].forward_sock, buffer, sizeof(buffer)); + + if (len <= 0) + continue; + buffer[len] = '\0'; + int written = 0; int w = 0; + do + { + //log_print(2, "ssh_thread", "writing %s to channel", buffer); + w = libssh2_channel_write(s->tunnel[j].channel, buffer+written, len-written); + assert(w >= 0); + written += w; + } + while (w > 0 && written < len); + } + while (true) + { + //log_print(2, "ssh_thread", "Try to read from channel %p", s->tunnel[j].channel); + int len = libssh2_channel_read(s->tunnel[j].channel, buffer, sizeof(buffer)); + //log_print(2, "ssh_thread", "Read %s from channel", buffer); + if (len == LIBSSH2_ERROR_EAGAIN) break; + assert(len >= 0); + + int written = 0; int w = 0; + while (written < len) + { + //log_print(2, "ssh_thread", "Wrote %s to socket %d", buffer+written, s->tunnel[j].forward_sock); + w = write(s->tunnel[j].forward_sock, buffer+written, len-written); + written += w; + } + if (libssh2_channel_eof(s->tunnel[j].channel)) + { + //log_print(1, "ssh_thread", "Got to eof in channel %p", s->tunnel[j].channel); + } + } + } + } + pthread_mutex_unlock(&ssh_thread_mutex); + } + + return NULL; +} + +void ssh_thread_add(ssh * s) +{ + pthread_mutex_lock(&ssh_thread_mutex); + + ssh_array_used++; + + bool found = false; + for (int i = 0; (i < ssh_array_reserved && !found); ++i) + { + if (ssh_array[i] == NULL) + { + ssh_array[i] = s; + found = true; + break; + } + } + + + if (!found) + { + int old = ssh_array_reserved; + ssh_array_reserved = (ssh_array_reserved + 1) * 2; + if (ssh_array == NULL) + ssh_array = (ssh**)(calloc(ssh_array_reserved, sizeof(ssh*))); + else + { + ssh_array = (ssh**)(realloc(ssh_array, ssh_array_reserved * sizeof(ssh*))); + for (int i = old+1; i < ssh_array_reserved; ++i) + ssh_array[i] = NULL; + + } + ssh_array[old] = s; + } + + for (int i = 0; i < s->nTunnels; ++i) + { + if (s->tunnel[i].forward_sock > ssh_thread_maxfd) + ssh_thread_maxfd = s->tunnel[i].forward_sock; + } + + if (!ssh_thread_running) + { + ssh_thread_running = true; + sigset_t set; + int err; + sigfillset(&set); + err = pthread_sigmask(SIG_SETMASK, &set, NULL); + if (err != 0) + error("ssh_thread_add", "pthread_sigmask : %s", strerror(errno)); + err = pthread_create(&ssh_pthread, NULL, ssh_thread, NULL); + if (err != 0) + error("ssh_thread_add", "pthread_create : %s", strerror(errno)); + sigemptyset(&set); + err = pthread_sigmask(SIG_SETMASK, &set, NULL); + if (err != 0) + error("ssh_thread_add", "pthread_sigmask : %s", strerror(errno)); + } + + + + pthread_mutex_unlock(&ssh_thread_mutex); +} + +void ssh_thread_del(ssh * s) +{ + pthread_mutex_lock(&ssh_thread_mutex); + + for (int i = 0; i < ssh_array_reserved; ++i) + { + if (ssh_array[i] == s) + { + ssh_array[i] = NULL; + ssh_thread_running = !(--ssh_array_used == 0); + break; + } + } + + pthread_mutex_unlock(&ssh_thread_mutex); +} diff --git a/src/ssh.h b/src/ssh.h new file mode 100644 index 0000000..6d12e99 --- /dev/null +++ b/src/ssh.h @@ -0,0 +1,57 @@ +#ifndef _SSH_H +#define _SSH_H + +#include "network.h" +#include "master.h" +#include "options.h" + +#define SSH_DIR "~/.ssh/" +#define SSH_KNOWN_HOSTS "~/.ssh/known_hosts" + +#include +#include +#include +#include + +typedef struct +{ + int forward_sock; + LIBSSH2_CHANNEL * channel; + int port; + +} ssh_tunnel; + +typedef struct +{ + ssh_tunnel * tunnel; + int nTunnels; + int reserved_tunnels; + + int socket; + LIBSSH2_SESSION *session; + char * fingerprint; + + char * user; + char * addr; +} ssh; + +extern ssh * ssh_new(char * username, char * addr, int port); +extern void ssh_destroy(ssh * s); + +extern void ssh_exec_swarm(ssh * s, int * port, int * socket, int np); +extern LIBSSH2_LISTENER * ssh_get_listener(ssh * s, int * port); +extern void ssh_add_tunnel(ssh * s, LIBSSH2_LISTENER * listener, int socket); + +extern void * ssh_thread(void * args); +extern void ssh_thread_add(ssh * s); +extern void ssh_thread_del(ssh * s); + + +extern pthread_mutex_t ssh_thread_mutex; + +extern pthread_t ssh_pthread; + + +#endif //_SSH_H + +//EOF -- 2.20.1