X-Git-Url: https://git.ucc.asn.au/?a=blobdiff_plain;f=src%2Fmaster.c;h=f5e17eb00ce4efee6ba189f8c5f7962ef6e953c5;hb=HEAD;hp=e61f5faec3df2619296559953d8b58f9641fb8fe;hpb=063fe056d81e8afd218f6a40ee62aa3463df3e9a;p=matches%2Fswarm.git diff --git a/src/master.c b/src/master.c index e61f5fa..f5e17eb 100644 --- a/src/master.c +++ b/src/master.c @@ -2,6 +2,7 @@ #include "master.h" #include "log.h" +#include #include #include "daemon.h" #include @@ -12,7 +13,10 @@ #include #include #include "slave.h" +#include #include +#include +#include #include #include @@ -22,12 +26,15 @@ #include #include #include +#include "ssh.h" //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems // probably not that great to use threads anyway, since it eats one of your cores // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes +// the signal handler now breaks threads... don't use them + #ifdef THREAD_SENDING pthread_t sender_thread; pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER; @@ -69,12 +76,19 @@ void Master_main(Options * o) void Master_setup(Options * o) { + int err = libssh2_init(0); + if (err != 0) + { + error("Master_setup", "Initialising libssh2 - error code %d", err); + } + signal(SIGCHLD, sigchld_handler); master.o = o; master.barrier_number = -1; master.last_number = -1; master.nSlaves = o->nCPU; master.running = master.nSlaves; + master.nRemote = 0; master.remote_err = NULL; if (master.nSlaves == 0) error("Master_setup", "No CPUs to start slaves with!"); @@ -88,7 +102,19 @@ void Master_setup(Options * o) // One slave per CPU for (int i = 0; i < master.nSlaves; ++i) { - Make_slave(i); + + + int sv[2]; + // Possibly the best function ever + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) + { + error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno)); + } + master.slave[i].in = sv[1]; + master.slave[i].out = sv[1]; + master.slave[i].socket_child_end = sv[0]; + + Master_shell(i); @@ -101,25 +127,17 @@ void Master_setup(Options * o) } } -void Make_slave(int i) -{ - int sv[2]; - if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) - { - error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno)); - } - +void Master_shell(int i) +{ master.slave[i].pid = fork(); if (master.slave[i].pid == 0) { - dup2(sv[0],fileno(stdin)); - dup2(sv[0],fileno(stdout)); + dup2(master.slave[i].socket_child_end,fileno(stdin)); + dup2(master.slave[i].socket_child_end,fileno(stdout)); execlp(master.o->shell, master.o->shell, NULL); } - master.slave[i].in = sv[1]; - master.slave[i].out = sv[1]; + master.slave[i].running = true; - master.slave[i].ssh_pid = 0; } void Master_input(char c) @@ -174,9 +192,9 @@ void Master_input(char c) log_print(0, "Master_input", "No host specified for ABSORB directive"); char * np = strtok(NULL, " "); if (np != NULL) - Master_absorb(cmd, options.port, atoi(np)); + Master_absorb(cmd, atoi(np)); else - Master_absorb(cmd, options.port, 0); + Master_absorb(cmd, 0); } else if (strcmp(cmd, "OUTPUT") == 0) { @@ -367,8 +385,11 @@ void Master_output(int i, char c) #endif //THREAD_SENDING if (t == NULL) { - log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name); - error(NULL, "Please refrain from echoing three bell characters in a row."); + log_print(3, "Master_output", "Echo %c back to slave %d", c, i); + write(master.slave[i].in, &c, sizeof(char)); + //log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name); + //error(NULL, "Please refrain from echoing three bell characters in a row."); + return; } @@ -380,14 +401,19 @@ void Master_output(int i, char c) memset(t->output+(t->outlen), 0, sizeof(char) * t->outsiz - t->outlen); } - if (c == EOF || (master.o->endlen > 0 && t->outlen >= master.o->endlen - && strcmp((t->output)+(t->outlen)-(master.o->endlen), master.o->end) == 0)) + if (c == EOF + #ifdef SHELL_OUTPUT_FINISHED + || (t->outlen >= SHELL_OUTPUT_FINISHED_LENGTH + && strcmp((t->output)+(t->outlen)-(SHELL_OUTPUT_FINISHED_LENGTH), SHELL_OUTPUT_FINISHED) == 0)) + #else + ) // this is totally readable + #endif //SHELL_OUTPUT_FINISHED { if (c != EOF) { - t->output[t->outlen - master.o->endlen] = '\0'; - t->outlen -= master.o->endlen; + t->output[t->outlen - SHELL_OUTPUT_FINISHED_LENGTH] = '\0'; + t->outlen -= SHELL_OUTPUT_FINISHED_LENGTH; } master.slave[i].task = NULL; @@ -395,8 +421,10 @@ void Master_output(int i, char c) { fprintf(stdout, "%d:\n", t->number); } + /* else if (t->output[t->outlen-1] == '\f') { + log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name); static int bufsiz = BUFSIZ; char * buffer = (char*)(calloc(bufsiz, sizeof(char))); @@ -420,6 +448,7 @@ void Master_output(int i, char c) master.slave[i].task_pool = t2; master.last_number = t2->number; } + */ else { fprintf(stdout, "%d: %s", t->number, t->output); @@ -522,9 +551,9 @@ Task * Master_tasker(int i) void Master_loop() { - if (sigsetjmp(env,true) != 0) + if (sigsetjmp(env,true) != 0) // completely necessary evil { - log_print(2, "Master_loop", "Restored from longjmp"); + //log_print(2, "Master_loop", "Restored from longjmp"); } fd_set readSet; //fd_set writeSet; @@ -539,6 +568,7 @@ void Master_loop() bool quit = false; bool input = true; + char buffer[BUFSIZ]; while (!quit) { @@ -552,7 +582,7 @@ void Master_loop() if (errno != ENXIO) error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno)); else - log_print(2, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno)); + log_print(LOGWARN, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno)); } else { @@ -569,6 +599,11 @@ void Master_loop() { if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet); } + + for (int i = 0; i < master.nRemote; ++i) + { + FD_SET(master.remote_err[i], &readSet); + } select(master.fd_max+1, &readSet, NULL, NULL, NULL); @@ -611,6 +646,16 @@ void Master_loop() } } + + for (int i = 0; i < master.nRemote; ++i) + { + if (FD_ISSET(master.remote_err[i], &readSet)) + { + int len = read(master.remote_err[i], buffer, sizeof(buffer)); + buffer[len] = '\0'; + fprintf(stderr, "%s", buffer); + } + } } @@ -636,19 +681,20 @@ void Master_send() write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char)); //log_print(0, "Sent append %s\n",master.o->append); } - if (master.o->end != NULL) + #ifdef SHELL_OUTPUT_FINISHED { static char * echo = ";echo -en \""; static int len = -1; if (len == -1) len = strlen(echo); write(send_task.slave_fd, echo, len*sizeof(char)); - write(send_task.slave_fd, master.o->end, (master.o->endlen) * sizeof(char)); - write(send_task.slave_fd, "\"", 1*sizeof(char)); + write(send_task.slave_fd, SHELL_OUTPUT_FINISHED, SHELL_OUTPUT_FINISHED_LENGTH * sizeof(char)); + write(send_task.slave_fd, "\"", sizeof(char)); //log_print(0, "Sent end\n"); } - write(send_task.slave_fd, "\n", 1*sizeof(char)); + #endif //SHELL_OUTPUT_FINISHED + write(send_task.slave_fd, "\n", sizeof(char)); master.commands_active++; - log_print(3, "Master_sender", "Sent task %d \"%s\" - %d tasks active", send_task.task->number, send_task.task->message, master.commands_active); + log_print(3, "Master_sender", "Sent task %d \"%s\" on socket %d - %d tasks active", send_task.task->number, send_task.task->message, send_task.slave_fd, master.commands_active); } #ifdef THREAD_SENDING @@ -702,12 +748,22 @@ void Master_cleanup() signal(SIGCHLD, SIG_IGN); // ignore child exits now + // tell all remote nodes to exit + for (int i = 0; i < master.nRemote; ++i) + { + FILE * f = fdopen(master.remote_err[i], "r+"); setbuf(f, NULL); + + fprintf(f, SHELL_EXIT_COMMAND); + + fclose(f); + } + for (int i = 0; i < master.nSlaves; ++i) { static int exitlen = -1; - if (exitlen == -1) exitlen = strlen(SHELL_EXIT_MESSAGE); - write(master.slave[i].in, SHELL_EXIT_MESSAGE, exitlen *sizeof(char)); + if (exitlen == -1) exitlen = strlen(SHELL_EXIT_COMMAND); + write(master.slave[i].in, SHELL_EXIT_COMMAND, exitlen *sizeof(char)); //usleep(0.5); //shouldn't matter too much } @@ -717,13 +773,6 @@ void Master_cleanup() if (master.slave[i].pid <= 0) { Network_close(master.slave[i].in); - if (master.slave[i].ssh_pid > 0) - { - log_print(2, "Master_cleanup", "Killing ssh instance %d", master.slave[i].ssh_pid); - kill(master.slave[i].ssh_pid, 15); - if (kill(master.slave[i].ssh_pid, 0) == 0) - kill(master.slave[i].ssh_pid, 9); - } } else { @@ -740,83 +789,113 @@ void Master_cleanup() free(master.buffer); if (master.outfile != NULL) free(master.outfile); -} -void * start_server(void * args) -{ + libssh2_exit(); - *(int*)(args) = Network_server(*(int*)args); - log_print(2, "start_server", "started network server"); - return NULL; } -int Secure_connection(char * addr, int port); -void Master_absorb(char * addr, int port, int np) + + +void Master_absorb(char * addr, int np) { + int port = 0; + + char * user = strstr(addr, "@"); + if (user != NULL) + { + *(user-1) = '\0'; + char * t = user; + user = addr; + addr = t; + } + else + { + user = getpwuid(geteuid())->pw_name; + } + log_print(3, "Master_absorb", "User %s at address %s", user, addr); + + // ssh to the host on port 22 + ssh * s = ssh_new(user, addr, 22); + if (s == NULL) + { + log_print(0, "Master_absorb", "Couldn't ssh to %s@%s", user, addr); + return; + } + + + // work out the name to give to the shells char * name = strtok(addr, ":"); name = strtok(NULL, ":"); if (name == NULL) + name = addr; // default is host:X + else + *(name-1) = '\0'; // otherwise use name:X + + // setup array of remote stderr file descriptors + if (master.nRemote++ == 0) { - name = addr; + master.remote_err = (int*)(calloc(master.nRemote, sizeof(int))); + master.remote_reserved = master.nRemote; } - else + else if (master.nRemote >= master.remote_reserved) { - *(name-1) = '\0'; + // resize dynamically + master.remote_reserved *= 2; + master.remote_err = (int*)(realloc(master.remote_err,master.remote_reserved * sizeof(int))); } - //log_print(0, "name is %s\n", name); - int first_ssh = 0; + int sfd = -1; if (master.o->encrypt) - first_ssh = Secure_connection(addr, port); + { + int sv[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) + error("Master_absorb", "Couldn't create socket for remote swarm"); + sfd = sv[0]; - //pthread_t ss; - //int net_fd = port; - //pthread_create(&ss, NULL, start_server, (void*)(&net_fd)); - - char buffer[BUFSIZ]; - if (fork() == 0) + ssh_exec_swarm(s, NULL, sv+1, np); // start swarm remotely forward it to the socket + ssh_thread_add(s); // add the ssh to the thread + } + else { - // The alternative to this kind of terrible hack is OpenMPI's "opal" - // This involves >1000 lines of operating system independent ways to get an IP address from a local interface - // Which will then be completely useless if there is any sort of NAT involved - - //freopen("/dev/null", "r", stdin); - //freopen("/dev/null", "w", stdout); - //freopen("/dev/null", "w", stderr); + sfd = Network_server_bind(0, &port); // dynamically bind to a port + ssh_exec_swarm(s, &port, NULL, np); // start swarm remotely, have it connect on the port + ssh_destroy(s); // don't need the ssh anymore + sfd = Network_server_listen(sfd, NULL); // accept connections and pray + } + master.remote_err[master.nRemote-1] = sfd; + if (sfd > master.fd_max) + master.fd_max = sfd; - char * cmd = buffer+sprintf(buffer, "swarm -p "); - if (master.o->encrypt) - cmd = cmd+sprintf(cmd, "%d -e", port+1000); - else - cmd = cmd+sprintf(cmd, "%d -u", port); + + char buffer[BUFSIZ]; + + int newSlaves = 0; - if (np > 0) - cmd = cmd+sprintf(cmd, " -n %d", np); - sprintf(cmd, " -m $(echo $SSH_CONNECTION | awk \'{print $1}\')"); - log_print(3, "Master_absorb", "Execing %s", buffer); - execlp("ssh", "ssh", "-f", addr, buffer, NULL); + int len = sprintf(buffer, "%s\n", name); + int w = 0; + while (w < len) + w += write(sfd, buffer+w, len-w); + + + len = 0; + do + { + len = read(sfd, buffer+len, sizeof(buffer)); + buffer[len] = '\0'; } - log_print(3, "Master_absorb", "Listening on port %d", port); - int net_fd = Network_server(port); - log_print(3, "Master_absorb", "Created network server on port %d", port); + while (buffer[len-1] != '\n'); + buffer[len-1] = '\0'; - - char * s = buffer; - while (read(net_fd, s, sizeof(char)) != 0) + while (newSlaves == 0 && strcmp(buffer, "0") != 0) { - if (*s == '\n') - { - *s = '\0'; - break; - } - ++s; + newSlaves = atoi(buffer); } - int newSlaves = atoi(buffer); - log_print(3, "Master_absorb", "Absorbing %d slaves from %s\n", newSlaves, addr); + + if (newSlaves == 0) { error("Master_absorb", "No slaves to absorb from %s", addr); @@ -825,62 +904,70 @@ void Master_absorb(char * addr, int port, int np) master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave))); if (master.slave == NULL) { - error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", master.nSlaves, master.nSlaves + newSlaves, strerror(errno)); + error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", + master.nSlaves, master.nSlaves + newSlaves, strerror(errno)); } - - if (master.o->encrypt) - { - for (int i = 0; i < newSlaves-1; ++i) - master.slave[master.nSlaves+i].ssh_pid = Secure_connection(addr, port+i+1); - - } - master.slave[master.nSlaves+newSlaves-1].ssh_pid = first_ssh; + for (int i = 0; i < newSlaves; ++i) + { + int ii = master.nSlaves + i; + if (master.o->encrypt) + { + int sv[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) + error("Master_absorb", "Couldn't create socket for remote swarm"); + + LIBSSH2_LISTENER * listener = NULL; + // libssh2 can't finalise the connection when the port is dynamic (ie: port = 0) + while (listener == NULL) + { + port = 20000 + rand() % 30000; // so pick ports at random until binding succeeds + listener = ssh_get_listener(s, &port); // port forward to the socket + } - - - char c = '\a'; - log_print(3, "Master_absorb", "Writing bell to slave"); - write(net_fd, &c, sizeof(char)); - + log_print(4,"Master_absorb", "Chose port %d", port); + int len = sprintf(buffer, "%d\n", port); + + int w = 0; + while (w < len) + { + w += write(sfd, buffer+w, len-w); + } + usleep(200000); // give ssh_thread a chance to actually send the data + log_print(4, "Master_absorb", "Creating tunnel..."); + ssh_add_tunnel(s, listener, sv[1]); + master.slave[ii].in = sv[0]; + master.slave[ii].out = sv[0]; - for (int i = 0; i < newSlaves; ++i) - { - log_print(3, NULL, "Absorbing slave %d...", i); - int ii = master.nSlaves + i; - if (i == newSlaves-1) - { - master.slave[ii].out = net_fd; + log_print(4, "Master_absorb", "Tunnel for slave %d using socket %d<->%d setup", ii, sv[0], sv[1]); } else { - log_print(3, "Master_absorb", "Creating server %d at time %d", i, time(NULL)); - write(net_fd, &c, sizeof(char)); - master.slave[ii].out = Network_server(port + i + 1); + int tmp = Network_server_bind(0, &port); // bind to a port + + master.slave[ii].in = Network_server_listen(tmp, NULL); // listen for connection + master.slave[ii].out = master.slave[ii].in; } - master.slave[ii].in = master.slave[ii].out; + + + if (master.slave[ii].out > master.fd_max) master.fd_max = master.slave[ii].out; + char buffer[BUFSIZ]; sprintf(buffer, "%s:%d", name, i); master.slave[ii].name = strdup(buffer); master.slave[ii].addr = strdup(addr); master.slave[ii].running = true; master.slave[ii].pid = -1; master.slave[ii].task = NULL; - master.slave[ii].task_pool = NULL; - - FILE * f = fdopen(master.slave[ii].in, "w"); setbuf(f, NULL); - fprintf(f, "name=%s\n", master.slave[ii].name); - - log_print(3, NULL, "Done absorbing slave %d...", i); - + master.slave[ii].task_pool = NULL; } @@ -892,21 +979,6 @@ void Master_absorb(char * addr, int port, int np) } -int Secure_connection(char * addr, int port) -{ - int result = fork(); - if (result == 0) - { - char buffer[BUFSIZ]; - sprintf(buffer, "%d:localhost:%d", port+1000, port); - freopen("/dev/null", "r", stdin); - freopen("/dev/null", "w", stdout); - freopen("/dev/null", "w", stderr); - execl("/usr/bin/ssh", "/usr/bin/ssh", "-N", "-R", buffer, addr, NULL); - } - return result; -} - void sigchld_handler(int signal) { @@ -918,13 +990,6 @@ void sigchld_handler(int signal) if (p == -1) error("sigchld_handler", "waitpid : %s", strerror(errno)); - if (WIFSIGNALED(s)) - { - int sig = WTERMSIG(s); - log_print(2, "sigchld_handler", "A child [%d] was terminated with signal %d; terminating self with same signal", p, sig); - kill(getpid(), sig); - return; - } int i = 0; for (i = 0; i < master.nSlaves; ++i) @@ -937,19 +1002,21 @@ void sigchld_handler(int signal) return; } - log_print(1, "sigchld_handler", "Slave %d [%d] exited with code %d; restarting it",i, p, s); - Make_slave(i); + sigchld_respond(s, "local", i); + + Master_shell(i); - if (master.o->end != NULL) + #ifdef SHELL_OUTPUT_FINISHED { - log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i); + //log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i); char buffer[BUFSIZ]; - sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, master.o->end); - if (write(master.slave[i].in, buffer, strlen(buffer)) <= 0) + int len = sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, SHELL_OUTPUT_FINISHED); + if (write(master.slave[i].in, buffer, len) <= 0) error("sigchld_handler", "Couldn't restart slave %d; it is unresponsive", i); } + #endif //SHELL_OUTPUT_FINISHED siglongjmp(env,1);