From 063fe056d81e8afd218f6a40ee62aa3463df3e9a Mon Sep 17 00:00:00 2001 From: Sam Moore Date: Mon, 18 Feb 2013 01:21:58 +0800 Subject: [PATCH 1/1] Initial commit (sort of) I screwed up the previous repository --- doc/swarm.1 | 272 ++++++++++++++ doc/swarm.html | 49 +++ src/Makefile | 27 ++ src/daemon.c | 114 ++++++ src/daemon.h | 15 + src/log.c | 67 ++++ src/log.h | 15 + src/main.c | 61 ++++ src/master.c | 958 +++++++++++++++++++++++++++++++++++++++++++++++++ src/master.h | 52 +++ src/network.c | 140 ++++++++ src/network.h | 26 ++ src/options.c | 282 +++++++++++++++ src/options.h | 50 +++ src/slave.c | 142 ++++++++ src/slave.h | 32 ++ src/task.c | 98 +++++ src/task.h | 34 ++ 18 files changed, 2434 insertions(+) create mode 100644 doc/swarm.1 create mode 100644 doc/swarm.html create mode 100644 src/Makefile create mode 100644 src/daemon.c create mode 100644 src/daemon.h create mode 100644 src/log.c create mode 100644 src/log.h create mode 100644 src/main.c create mode 100644 src/master.c create mode 100644 src/master.h create mode 100644 src/network.c create mode 100644 src/network.h create mode 100644 src/options.c create mode 100644 src/options.h create mode 100644 src/slave.c create mode 100644 src/slave.h create mode 100644 src/task.c create mode 100644 src/task.h diff --git a/doc/swarm.1 b/doc/swarm.1 new file mode 100644 index 0000000..b4b6d11 --- /dev/null +++ b/doc/swarm.1 @@ -0,0 +1,272 @@ +.TH swarm 1 "14 Feb 2013" "version 1.0" +.SH NAME +swarm - Tool for parallelising shell scripts +.SH SYNOPSIS +swarm [options] [file] +.SH DESCRIPTION +.B Swarm +allows one to pass commands to simultaneously running instances of a shell such as +.BR bash "(1)." +.BR Swarm " can work with local shells and can also start shells on remote hosts." +.PP +The purpose of +.BR swarm " is to speed up the execution of shell scripts which involve a large number of time consuming tasks, where the order in which the tasks are completed is unimportant. 
+ +.SH OPTIONS +.TP +.BI -n " processes" +Set the number of shells for +.B swarm +to spawn. +By default, one shell is spawned for each CPU found by a call to +.BR sysconf "(3)." +.TP +.BI -s " shell" +Set the type of shell which will be spawned. The default is +.BR bash "(1)." +.TP +.BI -c " command" +Treat command as a single line of input. This option is not very useful unless there is already an instance of +.B swarm +invoked with the +.B --daemon +option running in the current directory. +.TP +.BI -o " file" +.B Swarm +will send output to +.I file +instead of stdout. + + +.TP +.BI -l " [file][:level]" +Set the log file and optionally level. +If no file is given, +.BR swarm " logs to stderr. + +The levels are: 0 (errors), 1 (warnings), 2 (notices), 3 (info). +Each level will include messages of a lower level. + +By default +.BR swarm " logs to stderr, at level 2. + +Any errors occurring in locally running shells will also be sent to the log file. + +.BR Note: " If swarm runs as a daemon, stderr (and hence all log messages) is detached +.IR unless " the +.BR -l " option is used. + +.BR Note: " Remote shells will log messages and stderr to ~/.swarm.slave.log + +.TP +.BI -p " port" +Sets the port number to use for connections to remote hosts. + +By default, swarm does not use a fixed port; the port numbers for connections are chosen dynamically. Initially swarm uses ssh (port 22) to start remote swarm slave instances, passing a port to connect to as an argument. Subsequent ports for shell processes are sent to the slave swarm instance over this connection, which connects to the ports and starts the shell processes. +.TP +.BI --daemon +Run +.B swarm +as a daemon in the current directory. +.B Swarm +will background itself by using +.BR fork "(2), and attach stdin to a " fifo "(7). +The daemon will continue to run regardless of any EOF sent to the input +.BR fifo "(7) until it receives an EXIT directive." 
+ +Unless the +.BR -o " option is also used, the daemon's stdout will be redirected to /dev/null". + +Unless the +.BR -l " option is also used, the daemon's stderr (and all log messages) will be redirected to /dev/null". + +Subsequent instances of +.BR swarm " invoked with the " -c " option or a script file in the same directory as the daemon will not start their own shells, and will send the command to the daemon's input +.BR fifo "(7). Certain directives will affect the behaviour of the 'wrapping' swarm. The daemon +.BR does " " not " send any output to the wrapping instance of " swarm ". + +Subsequent instances of +.BR swarm " invoked without the " -c " option or a script file in the same directory will report an error. + +It is of course possible to write commands directly to the daemon's input fifo using +.BR echo "(1) or any other program. Using +.BR swarm " is preferred, because it will check input for directives that will affect its behaviour. + +.BR Note: " The 'daemon' is not really an official Unix style daemon, but 'daemon' sounds much cooler than 'background process'. + + +.SH ARGUMENTS +If the +.BR -c " option was not supplied, a single argument remaining after option processing is assumed to name a file containing valid shell commands and +.BR swarm " directives. If the +.BR -c " option was also supplied, or there was more than a single additional argument, " swarm " will report an error and exit. + +.SH INPUT +.B Swarm +treats each line of input as a separate command to send to a shell. +By default, each command is sent to the first available shell that isn't currently executing a command. + + +.SH OUTPUT +.B Swarm +will output the results of completed shell commands in the order of completion. +.B swarm +puts a prefix '%d: ' in front of the output to indicate the order in which commands were received. +.PP +It is also possible to direct the output of commands to specified files using the OUTPUT directive. 
This is probably more useful for scripting purposes. + +.SH DIRECTIVES +A directive is any line that starts and ends with a '#' character. +.PP +If a line starts with a '#' but does not contain a second '#', it is treated as a comment. +.PP +If a line contains characters +.I after +the second '#' character, it is treated as a SHELL SPECIFIC COMMAND, +.I not +a DIRECTIVE. +.PP +.B Swarm +understands several directives which can be used to control its behaviour at runtime. Any unrecognised directive is treated as a comment. + +.TP +.B EXIT +Tell +.B swarm +to quit +.TP +.BI OUTPUT " [file]" +If +.I file +is supplied, the output of all commands received before the next OUTPUT directive will be sent to the file. +The file is created if it didn't exist, and overwritten if it did exist. + +If +.I file +contains a '%d' format string, the output of commands will be sent to individual files, where the filenames are created from a call to +.B sprintf(3) +using the commands number as a format argument, and +.I file +as the format string. + +If +.I file +is not supplied, the output of commands received before the next OUTPUT directive will not be saved to any file. + +.TP +BARRIER +.BR Swarm " will continue to read input and create tasks, but will not actually send any commands until all currently executing commands are finished. + +.TP +BARRIER BLOCK +This directive is intended for use with the +.BR -c " option. If a shell script starts a swarm daemon, running swarm -c '#BARRIER BLOCK#' allows the script to block until the swarm daemon finishes all tasks." + +.BR WARNING: " Sending BARRIER BLOCK to a swarm daemon +.I without +using a wrapping instance of +.BR swarm " will cause the swarm daemon to hang after it has completed all tasks. + +For a non-daemon instance of swarm, this directive is essentially identical to BARRIER. + +.TP +.BI ABSORB " host[:name] [processes] +.B Swarm +will start and connect to shells on the remote host and name them accordingly. 
An exec'd +.BR ssh "(1) is used to start the remote shells." + +The default number of +.I processes +is equal to the number of CPUs on the remote host. +The default +.I name +is the same as +.I host. + +.BR Swarm " will start instances of " ssh "(1) and use remote port forwarding to secure the connections. + +.TP +.BI ABSORB UNSECURE host[:name] [processes] +.B There are security issues associated with the use of this directive + +This directive is the same as ABSORB, except no +.BR ssh "(1) instances will be spawned for remote port forwarding; all data will be sent unencrypted." + +Obviously using unencrypted connections is faster, but dangerous. + + + +.SH SHELL SPECIFIC COMMANDS +By default, any command is sent to the first available shell. +.PP +A line containing two '#' characters followed by a command will be sent to any shells with names matching a POSIX regex between the '#' characters. +.PP +To send the command only to the first available shell with name matching the regex, the regex should be followed with ' &'. +.PP +Shells are normally named according to the host on which they are running, and the order they were spawned. The format is 'host:X' where X is an integer greater or equal to zero. Shells running locally are called 'local:X'. Using the ABSORB directive, it is possible to give remote shells a name that is not the same as the hostname of the remote host running the shell. +.PP +.TP +To print the names of all shells, run: +#.*# echo $name +.TP +To print the name of the first available shell, run: +#.* &# echo $name + +.TP +To run a command only on the shell called 'local:0': +#local:0# command + +.SH EXITED SHELLS / SIGNAL HANDLING +.BR Swarm " detects when locally running shell exits (using a handler for " SIGCHLD "). If the shell exits normally, regardless of error code, +.BR swarm " will create a new shell to replace it. 
If the shell exits because of a signal that caused it to terminate, +.BR swarm " will send " SIGTERM " to itself, which will cause it (and as a result all other shells) to exit. + +If a remote shell exits, for whatever reason, +.BR swarm " will create a new shell to replace it. " Swarm " has no knowledge of and will not react to signals sent to remote shells on the remote host. + +If +.BR swarm " itself exits for whatever reason, it will terminate all local and remote shells. + +.SH SECURITY ISSUES +.B Never allow ssh access from across the public internet to a host with swarm installed. +.PP +When the ABSORB UNSECURE directive is read, +.B swarm +will use +.BR ssh "(1) +to start slave processes on the remote host. +.PP +These will then open +.I unencrypted +connections to the master. Because commands are sent as plain text over the network, +the remote hosts become vulnerable to "man in the middle" attacks. + +Never use ssh keys with empty passphrases, and restrict access to hosts with +.BR swarm " installed. + +.SH BUGS +.B Swarm +is very recently developed and therefore probably has a shit load of bugs. Probably related to buffering. +.PP +Do not place whitespace after a directive, or it will be treated as a shell specific command. +.PP +The stderr of remote shells will always be lost to the void. +.PP +Using things that aren't shells with the +.BR -s " option won't work, because " swarm " automatically adds extra commands for the shells. +.PP +Writing three bell characters in a row will cause +.BR swarm " to break, because it looks for three bell characters to signal the end of output from each command." +.PP +.BR Swarm " does try to be nice and tell shells to 'exit', but usually this doesn't work and it sends them a " kill "(2)." +.PP +There are probably bugs with parsing, like assuming you never use ':' as part of a filename or a name for a shell, etc. +Just don't do it and it won't break. 
+.PP +Report any other bugs to matches@ucc.asn.au +.SH AUTHOR +Sam Moore (matches@ucc.asn.au) +.SH SEE ALSO +.BR bash "(1), " ssh "(1)." + diff --git a/doc/swarm.html b/doc/swarm.html new file mode 100644 index 0000000..02d4fa9 --- /dev/null +++ b/doc/swarm.html @@ -0,0 +1,49 @@ + + +swarm + + + +

What is it?

+

Swarm is a program for POSIX compliant systems which allows for parallelising the running of shell commands.

+

By "POSIX compliant", I really mean debian and ubuntu, because that's what I used to write it.

+ +

How is it useful?

+

Shell scripting is a powerful tool for automating tasks. In some cases, the order in which a sequence of tasks is completed may be unimportant. +However, as yet there is no simple way to parallelise shell scripts.

+

Swarm runs several instances of a shell, and constructs a pool of tasks for each shell. Multiple tasks can be run simultaneously in swarm.

+ +

Yes, it is possible to use job control in most shells to achieve this. However, swarm is simpler to use, and can also run commands in a distributed cluster environment, rather than being restricted to a single machine.

+ +

Swarm keeps distinct shell instances running simultaneously. Shell specific commands allow swarm to be used to control which hosts in a cluster will execute commands. This is useful for use with decentralised services (for example, you could make the webserver run a specific command, and the fileserver run another).

+ +

Swarm can be run in the background as a daemon, with subsequent invocations of swarm sending commands to the daemon. This means that parallel and non-parallel sections can be easily separated in a normal shell script.

+ +

Security

+ +

Swarm uses ssh to encrypt connections to remote hosts. It is also possible to make unencrypted connections. These are obviously a bad idea unless speed is important, and no hosts in the cluster connect via a public network.

+ +

Swarm does not run any daemons to listen for connections. Swarm must use ssh to start remote (slave) instances of swarm. So for someone to trick swarm into accepting a dodgy connection, they'd have to trick ssh into accepting a dodgy connection and then start swarm first.

+ + +

Obviously, not actually being a computer scientist, I can make no guarantees about the security of swarm.

+ +

You'll need to make an ssh key and put the public key on all hosts in a cluster running swarm. Don't make a key with an empty passphrase. Instead use ssh-agent.

+ +

Where do I get it?

+ +

http://matches.ucc.asn.au/swarm.git

+ +

How do I use it?

+ +

Once you've got it, make install it, then run man swarm and RTFM. Or click here.

+

Alternately, UCC::Progcomp 2013 uses swarm in this script. So read that script and work it out.

+ +

Whom do I sue?

+ +

Swarm might break your computers. There. I warned you, so you can't sue me.

+

(Honestly I don't think it will break things, as long as you don't run it unencrypted across a public network).

+ + + + diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..a9f6729 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,27 @@ +# Makefile for swarm + +CXX = gcc +LIBRARIES = -lm -lpthread #-lGL -lglut -lGLU -lpthread +FLAGS = --std=c99 -D_POSIX_C_SOURCE=200112L -Wall -pedantic -g +PREPROCESSOR_FLAGS = +LINK_OBJ = options.o log.o task.o network.o master.o daemon.o slave.o main.o + + +BIN = swarm + +$(BIN) : $(LINK_OBJ) + $(CXX) -o $(BIN) $(LINK_OBJ) $(PREPROCESSOR_FLAGS) $(LIBRARIES) + +%.o : %.c + $(CXX) $(FLAGS) $(PREPROCESSOR_FLAGS) -c $< + +clean : + $(RM) $(BIN) $(OBJ) $(LINK_OBJ) + +clean_full: #cleans up all backup files + $(RM) $(BIN) $(OBJ) $(LINK_OBJ) + $(RM) *.*~ + $(RM) *~ + $(RM) *.dat + $(RM) *.out + $(RM) *.err diff --git a/src/daemon.c b/src/daemon.c new file mode 100644 index 0000000..75beac3 --- /dev/null +++ b/src/daemon.c @@ -0,0 +1,114 @@ +#include "daemon.h" +#include "options.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +void Daemon_wrapper(Options * o) +{ + int bufsiz = BUFSIZ; + int buflen = 0; + char * buffer = NULL; + + bool fill_buffer = false; + + char c; char last_c = '\n'; + int fd = open(DAEMON_FIFO, O_WRONLY); + while (read(fileno(stdin), &c, sizeof(char)) != 0) + { + if (fill_buffer) + { + buffer[buflen++] = c; + if (c == '\n') + { + + //fprintf(stderr, "Buffer is %s", buffer); + if (write(fd, buffer, buflen*sizeof(char)) <= 0) + error("Daemon_wrapper", "Couldn't write to daemon fifo %s", DAEMON_FIFO); + + if (strcmp(buffer, "#BARRIER BLOCK#\n") == 0) + { + //fprintf(stderr, "Barrier block received"); + FILE * f = fopen(DAEMON_BARRIER_FIFO, "r"); + if (f == NULL) + error("Daemon_wrapper", "Couldn't open daemon barrier fifo %s", DAEMON_BARRIER_FIFO); + for (int i = 0; ; ++i) + { + int c = fgetc(f); + if (c == EOF) + { + buffer[i] = '\0'; + break; + } + + buffer[i] = (char)c; + } + fclose(f); + if (strcmp(buffer, "\a") != 0) + 
error("Daemon_wrapper", "Expected to read a single bell from %s; got %s", DAEMON_BARRIER_FIFO, buffer); + } + else if (strcmp(buffer, "#EXIT#\n") == 0) + { + free(buffer); + exit(EXIT_SUCCESS); + } + + buflen = 0; + fill_buffer = NULL; + } + if (buflen >= bufsiz) + { + bufsiz *= 2; + buffer = (char*)(realloc(buffer, bufsiz*sizeof(char))); + if (buffer == NULL) + error("Daemon_wrapper", "Couldn't resize buffer to size %d : %s", bufsiz, strerror(errno)); + + } + last_c = c; + continue; + } + + if (c == '#' && last_c == '\n') + { + fill_buffer = true; + if (buffer == NULL) + buffer = (char*)(calloc(bufsiz, sizeof(char))); + if (buffer == NULL) + error("Daemon_wrapper", "Couldn't allocate buffer of size %d : %s", bufsiz, strerror(errno)); + + buflen = 0; + buffer[buflen++] = c; + } + else + write(fd, &c, sizeof(char)); + + last_c = c; + } + + if (fill_buffer) + { + buffer[buflen++] = '\n'; + if (write(fd, buffer, buflen*sizeof(char)) <= 0) + error("Daemon_wrapper", "Couldn't write to daemon fifo %s (final write)", DAEMON_FIFO); + } + else + { + c = '\n'; + if (write(fd, &c, sizeof(char)) <= 0) + error("Daemon_wrapper", "Couldn't write to daemon fifo %s (final write)", DAEMON_FIFO); + } + + if (buffer != NULL) + free(buffer); + exit(EXIT_SUCCESS); +} diff --git a/src/daemon.h b/src/daemon.h new file mode 100644 index 0000000..3b7b713 --- /dev/null +++ b/src/daemon.h @@ -0,0 +1,15 @@ +#ifndef _DAEMON_H +#define _DAEMON_H + +#define DAEMON_PID_FILE ".swarm.daemon.pid" +#define DAEMON_FIFO ".swarm.daemon.in" +#define DAEMON_BARRIER_FIFO ".swarm.daemon.barrier" + +#include "log.h" + +extern void Daemon_wrapper(); + + +#endif //DAEMON_H + +//EOF diff --git a/src/log.c b/src/log.c new file mode 100644 index 0000000..867ab64 --- /dev/null +++ b/src/log.c @@ -0,0 +1,67 @@ +#include "log.h" +#include "options.h" +#include + +static int last_len = 0; + +void log_print(int level, char * funct, char * fmt, ...) 
+{ + if (level > options.verbosity) + return; + + + + char severity[BUFSIZ]; + switch (level) + { + case 0: + sprintf(severity, "Error"); + break; + case 1: + sprintf(severity, "Warning"); + break; + case 2: + sprintf(severity, "Notice"); + break; + case 3: + sprintf(severity, "Info"); + break; + default: + sprintf(severity, "DEBUG"); + break; + } + + if (funct != NULL) + last_len = fprintf(stderr, "%s [%d] : %s in %s - ", options.program, getpid(), severity, funct); + else + { + for (int i = 0; i < last_len; ++i); + fprintf(stderr, " "); + } + va_list va; + va_start(va, fmt); + vfprintf(stderr, fmt, va); + va_end(va); + fprintf(stderr, "\n"); +} + +void error(char * funct, char * fmt, ...) +{ + if (funct != NULL) + last_len = fprintf(stderr, "%s [%d] : Fatal error in %s - ", options.program, getpid(), funct); + else + { + for (int i = 0; i < last_len; ++i) + fprintf(stderr, " "); + fprintf(stderr, "Fatal - "); + } + va_list va; + va_start(va, fmt); + vfprintf(stderr, fmt, va); + va_end(va); + fprintf(stderr, "\n"); + + exit(EXIT_FAILURE); +} + + diff --git a/src/log.h b/src/log.h new file mode 100644 index 0000000..24e5947 --- /dev/null +++ b/src/log.h @@ -0,0 +1,15 @@ +#ifndef _LOG_H +#define _LOG_H + +#include +#include +#include + +#include + +extern void log_print(int level, char * funct, char * fmt,...); +extern void error(char * funct, char * fmt, ...); + +#endif //_LOG_H + +//EOF diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..a7666dc --- /dev/null +++ b/src/main.c @@ -0,0 +1,61 @@ +#include +#include + +#include "options.h" +#include "master.h" +#include "slave.h" +#include "daemon.h" +#include + + +Options options; + +void Cleanup(); + +void Signal_handler(int signal); + + +int main(int argc, char ** argv) +{ + atexit(Cleanup); + Initialise(argc, argv, &options); + + if (signal(SIGTERM, Signal_handler) == SIG_ERR) + error("main", "Setting signal handler"); + if (signal(SIGINT, Signal_handler) == SIG_ERR) + error("main", "Setting 
signal handler"); + if (signal(SIGHUP, Signal_handler) == SIG_ERR) + error("main", "Setting signal handler"); + if (signal(SIGPIPE, Signal_handler) == SIG_ERR) + error("main", "Setting signal handler"); + //if (signal(SIGSEGV, Signal_handler) == SIG_ERR) + // error("main", "Setting signal handler"); + + if (options.master_addr == NULL) + { + if (options.daemon_wrapper) + Daemon_wrapper(&options); + else + Master_main(&options); + } + else + Slave_main(&options); + + exit(EXIT_SUCCESS); + return 0; +} + +void Signal_handler(int sig) +{ + + signal(sig, SIG_IGN); + signal(SIGCHLD, SIG_IGN); + log_print(2, "Signal_handler", "Got signal %d; exiting", sig); + exit(EXIT_SUCCESS); +} + + +void Cleanup() +{ + +} diff --git a/src/master.c b/src/master.c new file mode 100644 index 0000000..e61f5fa --- /dev/null +++ b/src/master.c @@ -0,0 +1,958 @@ +#define _BSD_SOURCE +#include "master.h" +#include "log.h" + +#include +#include "daemon.h" +#include +#include +#include +#include +#include +#include +#include +#include "slave.h" +#include + +#include +#include +#include +#include "network.h" +#include +#include +#include +#include + +//#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems + +// probably not that great to use threads anyway, since it eats one of your cores +// It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes + +#ifdef THREAD_SENDING +pthread_t sender_thread; +pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t sender_cv; +pthread_cond_t sender_done_cv; +#endif //THREAD_SENDING + + + +struct +{ + int slave_fd; + Task * task; +} send_task; + +static Master master; + +static sigjmp_buf env; + +void sigchld_handler(int signal); + +void Master_main(Options * o) +{ + setbuf(stdin, NULL); + setbuf(stdout, NULL); + setbuf(stderr, NULL); + + atexit(Master_cleanup); + Master_setup(o); + + #ifdef THREAD_SENDING + if (pthread_create(&sender_thread, NULL, Master_sender, NULL) != 0) + { + 
error("Master_main", "Creating sender thread : %s", strerror(errno)); + } + #endif //THREAD_SENDING + Master_loop(); +} + +void Master_setup(Options * o) +{ + signal(SIGCHLD, sigchld_handler); + master.o = o; + master.barrier_number = -1; + master.last_number = -1; + master.nSlaves = o->nCPU; + master.running = master.nSlaves; + if (master.nSlaves == 0) + error("Master_setup", "No CPUs to start slaves with!"); + + master.outfile = NULL; + + master.slave = (Slave*)(calloc(master.nSlaves, sizeof(Slave))); + + if (master.slave == NULL) + error("Master_setup", "Allocating memory for %d slaves", master.nSlaves); + + // One slave per CPU + for (int i = 0; i < master.nSlaves; ++i) + { + Make_slave(i); + + + + master.slave[i].name = (char*)calloc(BUFSIZ, sizeof(char)); + master.slave[i].addr = (char*)calloc(BUFSIZ, sizeof(char)); + sprintf(master.slave[i].name, "local:%d", i); + sprintf(master.slave[i].addr, "local:%d", i); + FILE * f = fdopen(master.slave[i].in, "w"); setbuf(f, NULL); + fprintf(f, "name=%s\n", master.slave[i].name); + } +} + +void Make_slave(int i) +{ + int sv[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) + { + error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno)); + } + + master.slave[i].pid = fork(); + if (master.slave[i].pid == 0) + { + dup2(sv[0],fileno(stdin)); + dup2(sv[0],fileno(stdout)); + execlp(master.o->shell, master.o->shell, NULL); + } + master.slave[i].in = sv[1]; + master.slave[i].out = sv[1]; + master.slave[i].running = true; + master.slave[i].ssh_pid = 0; +} + +void Master_input(char c) +{ + if (master.buffer == NULL) + { + if (master.bufsiz < BUFSIZ) + master.bufsiz = BUFSIZ; + master.buffer = (char*)(calloc(master.bufsiz, sizeof(char))); + master.buflen = 0; + } + + if (c == '\n' || c == EOF || c == '\0') + { + if (master.buflen == 0) + return; + int repetitions = 1; + bool first_only = false; + + master.buffer[master.buflen++] = '\0'; // end the string + + char * message; + + char * start = 
strtok(master.buffer, "#"); + start = strtok(NULL, "#"); + if (start == NULL) + { + if ( master.buffer[0] != '#') + { + log_print(3, "Master_input", "Created general task \"%s\"", master.buffer); + message = master.buffer; + master.buffer = NULL; + Task * t = Task_Append(master.task_pool, message, master.buflen, repetitions, master.outfile); + if (t->prev == NULL) + master.task_pool = t; + master.last_number = t->number; + } + else + { + char * cmd = strtok(master.buffer+1, " "); + if (strcmp(cmd, "ABSORB") == 0) + { + master.o->encrypt = true; + cmd = strtok(NULL, " "); + if (strcmp(cmd, "UNSECURE") == 0) + { + master.o->encrypt = false; + log_print(1, "Master_absorb", "Using unencrypted connections"); + cmd = strtok(NULL, " "); + } + if (cmd == NULL) + log_print(0, "Master_input", "No host specified for ABSORB directive"); + char * np = strtok(NULL, " "); + if (np != NULL) + Master_absorb(cmd, options.port, atoi(np)); + else + Master_absorb(cmd, options.port, 0); + } + else if (strcmp(cmd, "OUTPUT") == 0) + { + cmd = strtok(NULL, " "); + if (cmd == NULL) + { + log_print(3, "Master_input", "Detach output"); + if (master.outfile != NULL) + master.outfile[0] = '\0'; + } + else + { + log_print(3, "Master_input", "Output to %s",cmd); + if (master.outfile == NULL) + master.outfile = (char*)(calloc(BUFSIZ, sizeof(char))); + sprintf(master.outfile, "%s", cmd); + if (strstr(master.outfile, "%d") == NULL) + { + if (access(master.outfile, F_OK) == 0) + { + if (unlink(master.outfile) != 0) + error("Master_input", "Removing %s for output : %s", master.outfile, strerror(errno)); + } + } + } + } + else if (strcmp(cmd, "EXIT") == 0) + { + log_print(2, "Master_input", "Received EXIT directive; quitting"); + freopen("/dev/null", "r", stdin); + master.o->daemon = false; + return; + } + else if (strcmp(cmd, "BARRIER") == 0) + { + + log_print(3, "Master_input", "Received BARRIER directive; %d commands running", master.commands_active); + + // check there actually are tasks + bool 
tasks = false; + for (int i = 0; i < master.nSlaves && !tasks; ++i) + { + tasks = (master.slave[i].task != NULL || master.slave[i].task_pool != NULL); + } + if (tasks) + { + + master.barrier_number = master.last_number; + master.barrier_block = false; + + cmd = strtok(NULL, " "); + if (cmd != NULL && strcmp(cmd, "BLOCK") == 0) + { + if (master.o->daemon == false) + log_print(1, "Master_input", "Not a daemon; BARRIER BLOCK functions as BARRIER"); + master.barrier_block = true; + } + } + else + log_print(3, "Master_input", "No tasks; BARRIER has no effect"); + + } + else + { + log_print(1, "Master_input", "Unrecognised directive #%s#", cmd); + } + } + master.buflen = 0; + return; + } + + *(start-1) = '\0'; + while (isspace(*start)) start++; + + + char * rep = strtok(master.buffer, " "); + rep = strtok(NULL, " "); + if (rep != NULL && *rep == '&') + { + first_only = !first_only; + rep = strtok(NULL, " "); + } + + if (rep != NULL) + { + *(rep-1) = '\0'; + repetitions = atoi(rep); + if (repetitions == 0) + { + if (strcmp(rep, "0") == 0) + log_print(0,"Master_input", "Can't assign task with %s repetitions", rep); + else + log_print(0,"Master_input", "Require an integer (not \"%s\") number of repetitions (full directive is %s)", rep, master.buffer); + } + } + + + regex_t r; + + int err = regcomp(&r, master.buffer+1, REG_EXTENDED); + if (err != 0) + { + regerror(err, &r, master.buffer, master.bufsiz); + error("Master_input", "Error compiling regexec : %s", master.buffer); + } + + int first_match = -1; + for (int i = 0; i < master.nSlaves; ++i) + { + if (first_only && first_match >= 0 && master.slave[i].task != NULL) + continue; + + int err = regexec(&r, master.slave[i].name, 0, NULL, 0); + if (err == REG_NOMATCH) + { + + continue; + } + else if (err != 0) + { + regerror(err, &r, master.buffer, master.bufsiz); + log_print(0, "Master_input", "Error in regexec : %s", master.buffer+1); + continue; + } + + if (first_match < 0) first_match = i; + if (first_only) + { + if 
(master.slave[i].task == NULL) + { + first_match = i; + break; + } + continue; + } + + message = strdup(start); + Task * t = Task_Append(master.slave[i].task_pool, message, strlen(message), repetitions, master.outfile); + if (t->prev == NULL) + master.slave[i].task_pool = t; + master.last_number = t->number; + + log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[i].name); + + } + + if (first_only && first_match >= 0) + { + message = strdup(start); + Task * t = Task_Append(master.slave[first_match].task_pool, message, strlen(message), repetitions, master.outfile); + if (t->prev == NULL) + master.slave[first_match].task_pool = t; + master.last_number = t->number; + log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[first_match].name); + } + + if (first_match < 0) + log_print(1, "Master_input", "No shells with names matching regex %s"); + + regfree(&r); + master.buflen = 0; + log_print(3, "Master_input", "Processed task %s", master.buffer); + } + else + { + if (master.buflen >= master.bufsiz) + { + master.bufsiz *= 2; + master.buffer = (char*)(realloc(master.buffer, master.bufsiz*sizeof(char))); + } + master.buffer[master.buflen++] = c; + } +} + +void Master_output(int i, char c) +{ + log_print(10, "Master_output", "input %c from slave %d", c, i); + #ifdef THREAD_SENDING + pthread_mutex_lock(&sender_lock); + Task * t = master.slave[i].task; + while (t == NULL) + { + pthread_cond_wait(&sender_done_cv, &sender_lock); + } + pthread_mutex_unlock(&sender_lock); + #else + Task * t = master.slave[i].task; + #endif //THREAD_SENDING + if (t == NULL) + { + log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name); + error(NULL, "Please refrain from echoing three bell characters in a row."); + } + + + t->output[t->outlen++] = c; + if (t->outlen >= t->outsiz) + { + t->outsiz *= 2; + t->output = (char*)(realloc(t->output, t->outsiz*sizeof(char))); + memset(t->output+(t->outlen), 0, 
sizeof(char) * t->outsiz - t->outlen); + + } + if (c == EOF || (master.o->endlen > 0 && t->outlen >= master.o->endlen + && strcmp((t->output)+(t->outlen)-(master.o->endlen), master.o->end) == 0)) + { + + if (c != EOF) + { + t->output[t->outlen - master.o->endlen] = '\0'; + t->outlen -= master.o->endlen; + } + + master.slave[i].task = NULL; + if (t->outlen <= 0) + { + fprintf(stdout, "%d:\n", t->number); + } + else if (t->output[t->outlen-1] == '\f') + { + log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name); + static int bufsiz = BUFSIZ; + char * buffer = (char*)(calloc(bufsiz, sizeof(char))); + if (buffer == NULL) + error("Master_output", "Creating name request buffer of size %d : %s", bufsiz, strerror(errno)); + int len = 0; + while (true) + { + len = sprintf(buffer, "name=%s", master.slave[i].name); + if (len < bufsiz) + break; + bufsiz *= 2; + buffer = (char*)(realloc(buffer, bufsiz * sizeof(char))); + if (buffer == NULL) + error("Master_output", "Resizing name request buffer to size %d : %s", bufsiz, strerror(errno)); + } + + + Task * t2 = Task_Append(master.slave[i].task_pool, buffer,len, 1, NULL); + if (t2->prev == NULL) + master.slave[i].task_pool = t2; + master.last_number = t2->number; + } + else + { + fprintf(stdout, "%d: %s", t->number, t->output); + if (t->outfile != NULL && t->outfile[0] != '\0') + { + log_print(3, "Master_output", "Writing result of task %d to file \"%s\"", t->number, t->outfile); + static char buf[BUFSIZ]; + + FILE * f = NULL; + if (strstr(t->outfile, "%d") != NULL) + { + + sprintf(buf, t->outfile, t->number); + f = fopen(buf, "w"); + if (f == NULL) + error("Master_output", "Couldn't open file \"%s\" : %s", buf, strerror(errno)); + } + else + { + f = fopen(t->outfile, "a"); + if (f == NULL) + error("Master_output", "Couldn't open file \"%s\" : %s", t->outfile, strerror(errno)); + + } + + + fprintf(f, "%s", t->output); + fclose(f); + } + } + + log_print(3, "Master_output", "Task %d finished; %d tasks 
active", t->number, master.commands_active-1); + + + if (t->repetitions == 0) + { + free(t->message); + free(t->output); + free(t->outfile); + free(t); + } + + + if (--master.commands_active == 0 && master.barrier_number >= 0) + { + master.barrier_number = -1; + if (master.barrier_block && master.o->daemon) + { + FILE * f = fopen(DAEMON_BARRIER_FIFO, "w"); + fprintf(f, "\a"); + fclose(f); + } + } + } +} + +Task * Master_tasker(int i) +{ + Task ** tp = (master.slave[i].task_pool != NULL) ? &(master.slave[i].task_pool) : &(master.task_pool); + + if (*tp != NULL) + { + Task * t = *tp; + if (master.barrier_number >= 0 && t->number > master.barrier_number) + return NULL; + + t->repetitions--; + if (t->repetitions == 0) + { + (*tp) = t->next; + Task_Extract(t); + } + master.slave[i].task = t; + + + + + + #ifdef THREAD_SENDING + pthread_mutex_lock(&sender_lock); + while (send_task.task != NULL) // while the sender is still sending shit + { + pthread_cond_wait(&sender_done_cv, &sender_lock); + } + send_task.task = t; + send_task.slave_fd = master.slave[i].out; + pthread_cond_signal(&sender_cv); + pthread_mutex_unlock(&sender_lock); + + #else + send_task.task = t; + send_task.slave_fd = master.slave[i].out; + Master_send(); + #endif //THREAD_SENDING + + } + return *tp; +} + +void Master_loop() +{ + + if (sigsetjmp(env,true) != 0) + { + log_print(2, "Master_loop", "Restored from longjmp"); + } + fd_set readSet; + //fd_set writeSet; + master.fd_max = fileno(stdin); + for (int i = 0; i < master.nSlaves; ++i) + { + if (master.slave[i].out > master.fd_max) + master.fd_max = master.slave[i].out; + } + + + + bool quit = false; + bool input = true; + + while (!quit) + { + + if (!input && master.o->daemon) + { + + int fd = open(DAEMON_FIFO, O_RDONLY | O_NONBLOCK); + if (fd == -1) + { + if (errno != ENXIO) + error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno)); + else + log_print(2, "Master_loop", "Daemon couldn't reopen fifo %s : %s", 
DAEMON_FIFO, strerror(errno)); + } + else + { + + input = true; + dup2(fd, fileno(stdin)); + } + + } + + FD_ZERO(&readSet); + if (input) FD_SET(fileno(stdin), &readSet); + for (int i = 0; i < master.nSlaves; ++i) + { + if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet); + } + + select(master.fd_max+1, &readSet, NULL, NULL, NULL); + + + if (input && FD_ISSET(fileno(stdin), &readSet)) + { + char c; + //log_print(10, "Master_loop", "Read from stdin"); + input = (read(fileno(stdin), &c, sizeof(char)) != 0); + if (!input) + { + c = '\n'; + } + Master_input(c); + } + + + + for (int i = 0; i < master.nSlaves; ++i) + { + if (!master.slave[i].running) continue; + + if (FD_ISSET(master.slave[i].out, &readSet)) + { + //log_print(10, "Master_loop", "Read from slave %d", i); + char c; + if (read(master.slave[i].out, &c, sizeof(char)) != 0) + { + Master_output(i, c); + } + } + + if (master.slave[i].task == NULL) + { + if (Master_tasker(i) == NULL && !input && !master.o->daemon) // no more input; no tasks + { + quit = (--master.running == 0); + master.slave[i].running = false; + } + + } + } + + } + +} + +void Master_send() +{ + + if (master.o->prepend != NULL) + { + static int len = -1; + if (len == -1) len = strlen(master.o->prepend); + write(send_task.slave_fd, master.o->prepend, (len-1) * sizeof(char)); + //log_print(0, "Sent prepend %s\n",master.o->prepend); + } + + write(send_task.slave_fd, send_task.task->message, (send_task.task->bufsiz) * sizeof(char)); + //log_print(0, "Sent message %s\n",send_task.task->message); + if (master.o->append != NULL) + { + static int len = -1; + if (len == -1) len = strlen(master.o->append); + write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char)); + //log_print(0, "Sent append %s\n",master.o->append); + } + if (master.o->end != NULL) + { + static char * echo = ";echo -en \""; + static int len = -1; + if (len == -1) len = strlen(echo); + write(send_task.slave_fd, echo, len*sizeof(char)); + 
#ifdef THREAD_SENDING
/**
 * Body of the sender thread (only built with THREAD_SENDING).
 *
 * Waits on sender_cv until Master_tasker() has stored a task in send_task,
 * transmits it with Master_send(), clears send_task.task and broadcasts
 * sender_done_cv so that a producer waiting for the slot can continue.
 *
 * Shutdown protocol: Master_cleanup() sets send_task.task to NULL and
 * broadcasts sender_cv; waking up with no task queued makes the thread exit.
 * NOTE(review): a spurious wake-up is indistinguishable from that request,
 * and a shutdown broadcast sent while the thread is not waiting is missed;
 * a dedicated quit flag shared with Master_cleanup() would close both gaps.
 *
 * @param args unused
 * @return always NULL
 */
void * Master_sender(void * args)
{
	(void)args;

	bool quit = false;
	while (!quit)
	{
		pthread_mutex_lock(&sender_lock);

		// Only sleep when nothing is queued: a task stored before we took the
		// lock must be sent, not discarded.
		if (send_task.task == NULL)
		{
			pthread_cond_wait(&sender_cv, &sender_lock);
			quit = (send_task.task == NULL);
		}

		if (!quit)
		{
			Master_send();
			send_task.task = NULL; // slot is free again
			pthread_cond_broadcast(&sender_done_cv);
		}

		pthread_mutex_unlock(&sender_lock);
	}

	return NULL;
}
#endif //THREAD_SENDING
/**
 * Thread-style entry point that opens a network server.
 *
 * @param args pointer to an int; on entry it holds the port to serve on,
 *             on return it holds the value returned by Network_server()
 * @return always NULL
 */
void * start_server(void * args)
{
	int * slot = (int*)args;
	int port = *slot;

	*slot = Network_server(port);
	log_print(2, "start_server", "started network server");
	return NULL;
}
%d", port); + int net_fd = Network_server(port); + log_print(3, "Master_absorb", "Created network server on port %d", port); + + + char * s = buffer; + while (read(net_fd, s, sizeof(char)) != 0) + { + if (*s == '\n') + { + *s = '\0'; + break; + } + ++s; + } + + int newSlaves = atoi(buffer); + log_print(3, "Master_absorb", "Absorbing %d slaves from %s\n", newSlaves, addr); + if (newSlaves == 0) + { + error("Master_absorb", "No slaves to absorb from %s", addr); + } + + master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave))); + if (master.slave == NULL) + { + error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", master.nSlaves, master.nSlaves + newSlaves, strerror(errno)); + } + + + if (master.o->encrypt) + { + for (int i = 0; i < newSlaves-1; ++i) + master.slave[master.nSlaves+i].ssh_pid = Secure_connection(addr, port+i+1); + + } + master.slave[master.nSlaves+newSlaves-1].ssh_pid = first_ssh; + + + + + + + char c = '\a'; + log_print(3, "Master_absorb", "Writing bell to slave"); + write(net_fd, &c, sizeof(char)); + + + + for (int i = 0; i < newSlaves; ++i) + { + log_print(3, NULL, "Absorbing slave %d...", i); + int ii = master.nSlaves + i; + if (i == newSlaves-1) + { + master.slave[ii].out = net_fd; + } + else + { + log_print(3, "Master_absorb", "Creating server %d at time %d", i, time(NULL)); + write(net_fd, &c, sizeof(char)); + master.slave[ii].out = Network_server(port + i + 1); + } + master.slave[ii].in = master.slave[ii].out; + + + if (master.slave[ii].out > master.fd_max) + master.fd_max = master.slave[ii].out; + + sprintf(buffer, "%s:%d", name, i); + master.slave[ii].name = strdup(buffer); + master.slave[ii].addr = strdup(addr); + master.slave[ii].running = true; + master.slave[ii].pid = -1; + master.slave[ii].task = NULL; + master.slave[ii].task_pool = NULL; + + FILE * f = fdopen(master.slave[ii].in, "w"); setbuf(f, NULL); + fprintf(f, "name=%s\n", master.slave[ii].name); + + log_print(3, NULL, "Done 
/**
 * Start an ssh reverse tunnel to a remote host in a child process.
 *
 * The child runs `/usr/bin/ssh -N -R <port+1000>:localhost:<port> <addr>`
 * with its standard streams redirected to /dev/null.
 *
 * @param addr host argument handed to ssh
 * @param port local port the tunnel leads to; the remote side is port+1000
 * @return pid of the ssh child in the parent, or -1 if fork() failed
 */
int Secure_connection(char * addr, int port)
{
	int result = fork();
	if (result == 0)
	{
		char forward[64];
		snprintf(forward, sizeof forward, "%d:localhost:%d", port+1000, port);

		freopen("/dev/null", "r", stdin);
		freopen("/dev/null", "w", stdout);
		freopen("/dev/null", "w", stderr);

		execl("/usr/bin/ssh", "/usr/bin/ssh", "-N", "-R", forward, addr, (char*)NULL);

		// Only reached when exec failed.  The child must not return into the
		// caller's code, where it would carry on as a duplicate master.
		_exit(127);
	}
	return result;
}
"slave.h" + + +extern void Master_main(Options * o); +extern void Master_loop(); +extern void * Master_sender(void * args); +extern void Master_setup(Options * o); +extern void Master_cleanup(); +extern void Master_send(); +extern void Master_absorb(char * addr, int port, int np); + +extern void Make_slave(int i); +extern void sigchld_handler(int signal); + + + +typedef struct +{ + Slave * slave; + int nSlaves; + int running; + + Task * task_pool; // tasks that aren't for a single specific slave + + int last_number; //number of last task + + char * buffer; // buffer for storing input + int buflen; // length of buffer string + int bufsiz; // actual size of buffer + + char * outfile; + + int fd_max; + + int barrier_number; + bool barrier_block; + + int commands_active; + + Options * o; +} Master; + + + + +#endif //_MASTER_H diff --git a/src/network.c b/src/network.c new file mode 100644 index 0000000..5354ac3 --- /dev/null +++ b/src/network.c @@ -0,0 +1,140 @@ +#include "network.h" +#include +#include +#include +#include "log.h" + +#define h_addr h_addr_list[0] + +int Network_server(int port) {return Network_server_r(NULL, port);} + +int Network_server_r(char * addr, int port) +{ + int sfd = socket(PF_INET, SOCK_STREAM, 0); + if (sfd < 0) + { + error("Network_server", "Creating socket on port %d : %s", port, strerror(errno)); + } + + struct sockaddr_in name; + + name.sin_family = AF_INET; + name.sin_addr.s_addr = htonl(INADDR_ANY); + name.sin_port = htons(port); + + if (bind( sfd, (struct sockaddr *) &name, sizeof(name) ) < 0) + { + error("Network_server", "Binding socket on port %d : %s", port, strerror(errno)); + } + if (listen(sfd, 1) < 0) + { + error("Network_server", "Listening on port %d : %s", port, strerror(errno)); + } + + int psd; + if (addr == NULL) + psd = accept(sfd, 0, 0); + else + { + struct sockaddr_in client; + struct hostent *hp; + + client.sin_family = AF_INET; + hp = gethostbyname(addr); + bcopy ( hp->h_addr, &(client.sin_addr.s_addr), 
/**
 * Open a TCP connection to addr:port, waiting at most `timeout` seconds.
 *
 * The socket is switched to non-blocking mode for the connect so that the
 * wait can be bounded with select(2), then switched back to blocking mode
 * before it is returned.
 *
 * @param addr    host name or dotted address, resolved with gethostbyname()
 * @param port    TCP port in host byte order
 * @param timeout seconds to wait for the connection; negative waits forever
 * @return the connected socket descriptor.  Failures are reported through
 *         error(); the `return -1` statements are only reached if error()
 *         returns to its caller.
 */
int Network_client(const char * addr, int port, int timeout)
{
	int sfd = socket(PF_INET, SOCK_STREAM, 0);
	if (sfd < 0)
	{
		error("Network_client", "Creating socket for address %s:%d : %s", addr, port, strerror(errno));
		return -1;
	}

	int flags = fcntl(sfd, F_GETFL, 0);
	fcntl(sfd, F_SETFL, flags | O_NONBLOCK);

	struct hostent * hp = gethostbyname(addr);
	if (hp == NULL || hp->h_addrtype != AF_INET || hp->h_length != (int)sizeof(struct in_addr))
	{
		error("Network_client", "Couldn't resolve an IPv4 address for host %s", addr);
		close(sfd);
		return -1;
	}

	struct sockaddr_in server;
	memset(&server, 0, sizeof server);
	server.sin_family = AF_INET;
	memcpy(&(server.sin_addr.s_addr), hp->h_addr_list[0], sizeof(struct in_addr));
	server.sin_port = htons(port);

	int res = connect(sfd, (struct sockaddr *) &server, sizeof(server));

	if (res < 0 && errno == EINPROGRESS)
	{
		// Connection is being established in the background; wait until the
		// socket becomes writable or the timeout expires.
		fd_set writeSet;
		FD_ZERO(&writeSet);
		FD_SET(sfd, &writeSet);

		struct timeval tv;
		tv.tv_sec = timeout;
		tv.tv_usec = 0;
		struct timeval * tp = (timeout < 0) ? NULL : &tv;

		int err = select(sfd+1, NULL, &writeSet, NULL, tp);

		if (err == 0)
		{
			error("Network_client", "Timed out trying to connect to %s:%d after %d seconds", addr, port, timeout);
		}
		else if (err < 0)
		{
			error("Network_client", "Connecting to %s:%d - Error in select(2) call : %s", addr, port, strerror(errno));
		}
		else if (FD_ISSET(sfd, &writeSet))
		{
			// Writable only means the attempt finished; SO_ERROR says how
			int so_error = 0;
			socklen_t len = sizeof so_error;
			if (getsockopt(sfd, SOL_SOCKET, SO_ERROR, &so_error, &len) != 0)
				so_error = errno;
			if (so_error != 0)
			{
				error("Network_client", "Connecting to %s:%d : %s", addr, port, strerror(so_error));
			}
		}
		else
		{
			error("Network_client", "select(2) returned %d but the socket is not writable!?", err);
		}
	}
	else if (res < 0)
	{
		// An immediate success (res == 0) is not an error and skips this branch
		error("Network_client", "Connecting to %s:%d : %s", addr, port, strerror(errno));
	}

	// Hand the socket back in blocking mode
	flags = fcntl(sfd, F_GETFL, 0);
	fcntl(sfd, F_SETFL, flags & ~O_NONBLOCK);

	return sfd;
}
+ remove(DAEMON_PID_FILE); + remove(DAEMON_FIFO); + remove(DAEMON_BARRIER_FIFO); +} + +void close_err() +{ + fclose(stderr); +} + +void close_out() +{ + fclose(stdout); +} + +void Initialise(int argc, char ** argv, Options * o) +{ + srand(time(NULL)); + o->program = argv[0]; + o->master_addr = NULL; + o->shell = "bash"; // choosing other shells seems to not work... for some reason + o->logfile = NULL; + o->outfile = NULL; + o->verbosity = 2; + o->port = 4000 + rand() % 1000; + o->slavefile = "slaves.swarm"; + o->dummy_shell = false; + o->append = NULL; + o->prepend = NULL; + o->end = "\a\a\a"; + o->nCPU = sysconf( _SC_NPROCESSORS_ONLN ); + o->daemon = false; + o->encrypt = true; + o->interactive = true; + + + o->master_pid = getpid(); + + ParseArguments(argc, argv, o); + + o->endlen = (o->end != NULL) ? strlen(o->end) : 0; + + if (!o->daemon) + { + FILE * f = fopen(DAEMON_PID_FILE, "r"); + if (f != NULL) + { + int daemon_pid; + fscanf(f, "%d", &daemon_pid); + fclose(f); + if (kill(daemon_pid, 0) != 0) + { + error("Initialise", "There was a daemon [%d] running here, but it's gone for some reason.", daemon_pid); + } + o->daemon_wrapper = true; + + } + } + else + o->daemon_wrapper = false; + + if (o->logfile != NULL) + { + if (o->logfile[0] == '+') + freopen(o->logfile+1, "a", stderr); + else + freopen(o->logfile, "w", stderr); + + setbuf(stderr, NULL); + atexit(close_err); + } + if (o->outfile != NULL) + { + if (o->outfile[0] == '+') + freopen(o->outfile+1, "a", stdout); + else + freopen(o->outfile, "w", stdout); + atexit(close_out); + } + + if (o->verbosity >= 3) + { + char buffer[BUFSIZ]; getcwd(buffer, BUFSIZ); + char * type = (o->daemon) ? "daemon" : (o->daemon_wrapper) ? "wrapper" : (o->master_addr != NULL) ? 
"slave" : "interactive"; + log_print(3, "Initialise", "Directory %s; type of instance: %s", buffer, type); + } + if (o->daemon_wrapper && o->interactive) + { + log_print(1, "Initialise", "There is a daemonised swarm [%d] running in this directory."); + log_print(1, "Initialise", "You can only pass commands to the daemon by invoking %s -c [command]", options.program); + log_print(1, "Initialise", "Running `swarm -c \"#EXIT#\"' will quit the daemon, unless it is waiting on a BARRIER"); + error("Initialise", "Can't run an interactive wrapper to a daemon"); + } + +} + +void ParseArguments(int argc, char ** argv, Options * o) +{ + + for (int i = 1; i < argc; ++i) + { + if (argv[i][0] == '-' && argv[i][2] == '\0') + { + + + + + + if (argv[i][1] == 'p') + { + if (i >= argc-1) + error("ParseArguments", "No argument following %s switch", argv[i]); + o->port = atoi(argv[++i]); + } + else if (argv[i][1] == 'n') + { + if (i >= argc-1) + error("ParseArguments", "No argument following %s switch", argv[i]); + o->nCPU = atoi(argv[++i]); + } + else if (argv[i][1] == 'm') + { + if (i >= argc-1) + error("ParseArguments", "No argument following %s switch", argv[i]); + o->master_addr = argv[++i]; + } + else if (argv[i][1] == 'c') + { + + if (i >= argc-1) + error("ParseArguments", "No argument following %s switch", argv[i]); + + if (!o->interactive) + error("ParseArguments", "Can't use %s switch in combination with a script", argv[i]); + + // insert terrible hack here + o->interactive = false; + FILE * f = fopen(COMMAND_FILE, "a"); + fprintf(f, "%s\n", argv[++i]); + fclose(f); + dup2(open(COMMAND_FILE, O_RDONLY), fileno(stdin)); + atexit(remove_command); + } + else if (argv[i][1] == 's') + { + if (i >= argc-1) + error("ParseArguments", "No argument following %s switch", argv[i]); + + o->shell = argv[++i]; // obviously this breaks things + } + else if (argv[i][1] == 'l') + { + if (i >= argc-1) + error("ParseArguments", "No argument following %s switch", argv[i]); + + char * l = 
argv[++i]; + while (*l != '\0') if (*(l++) == ':') break; + if (*l != '\0') + { + o->verbosity = atoi(l); + *(l-1) = 0; + } + if (argv[i][0] != '\0') + o->logfile = argv[i]; + + } + else if (argv[i][1] == 'o') + { + if (i >= argc-1) + error("ParseArguments", "No argument following %s switch", argv[i]); + + o->outfile = argv[++i]; + } + else if (argv[i][1] == 'e') + o->encrypt = true; + else if (argv[i][1] == 'u') + o->encrypt = false; + else + { + fprintf(stderr, "%s : Unrecognised switch \"%s\"\n", argv[0], argv[i]); + exit(EXIT_FAILURE); + } + } + else if (strcmp(argv[i], "--daemon") == 0) + { + FILE * f = fopen(DAEMON_PID_FILE, "r"); + if (f != NULL) + { + int daemon_pid; fscanf(f, "%d\n", &daemon_pid); + fclose(f); + + if (kill(daemon_pid, 0) != 0) + { + if (errno == ESRCH) + { + log_print(0, "ParseArguments", "It looks like a daemon [%d] failed to exit cleanly. Starting a new daemon.", daemon_pid); + remove_daemon(); + } + else + { + error("ParseArguments", "Couldn't determine whether a daemon [%d] was already running : %s", daemon_pid, strerror(errno)); + } + } + else + error("ParseArguments", "A daemon is already running!"); + } + + int pid = fork(); + if (pid != 0) + { + //setbuf(stdout, NULL); + //fprintf(stdout, "%d\n", pid); + exit(EXIT_SUCCESS); // fork off into daemon + } + atexit(remove_daemon); + + f = fopen(DAEMON_PID_FILE, "w"); + if (f == NULL) + error("ParseArguments", "Couldn't open %s : %s", DAEMON_PID_FILE, strerror(errno)); + fprintf(f, "%d", getpid()); fclose(f); + + fprintf(stdout, "%d\n", getpid()); + freopen("/dev/null", "w", stdout); + freopen("/dev/null", "w", stderr); + + mkfifo(DAEMON_FIFO, 0600); + mkfifo(DAEMON_BARRIER_FIFO, 0600); + freopen(DAEMON_FIFO, "r", stdin); + + o->daemon = true; + + + } + else if (o->interactive) + { + o->interactive = false; + dup2(open(argv[i], O_RDONLY), fileno(stdin)); // replace stdin + } + else + { + fprintf(stderr, "%s : Usage %s [options] [script]\n", argv[0], argv[0]); + fprintf(stderr, "%s : 
/**
 * Return a freshly allocated copy of a NUL-terminated string.
 *
 * @param str string to copy; NULL is accepted and yields NULL
 * @return heap copy owned by the caller (release with free()), or NULL when
 *         str is NULL or the allocation fails
 */
char * strdup(const char * str)
{
	if (str == NULL)
		return NULL;

	size_t n = strlen(str) + 1; // include the terminator
	char * copy = malloc(n);
	if (copy != NULL)
	{
		memcpy(copy, str, n);
	}
	return copy;
}
else + net_fd = Network_client(o->master_addr, o->port,100); + + FILE * f = fdopen(net_fd, "w"); setbuf(f, NULL); + fprintf(f, "%d\n", o->nCPU); + + log_print(2, "Slave_main", "Waiting on bell from master"); + char c; + if (read(net_fd, &c, sizeof(char)) == 0 || c != '\a') + error("Slave_main", "Didn't get bell from master"); + + + + + log_print(2, "Slave_main", "Got bell from master"); + running = o->nCPU; + for (int i = 0; i < o->nCPU; ++i) + { + int new_fd = net_fd; + if (i != o->nCPU-1) + { + + + if (read(net_fd, &c, sizeof(char)) == 0 || c != '\a') + error("Slave_main", "Didn't get bell from master authorising connection of slave %d", i); + sleep(1); + + log_print(3, "Slave_main", "Connecting slave %d to port %d at time %d", i, o->port+i+1, time(NULL)); + if (o->encrypt) + new_fd = Network_client("localhost", o->port+i+1, 100); + else + new_fd = Network_client(o->master_addr, o->port+i+1, 100); + + + + } + + slave[i].in = new_fd; slave[i].out = new_fd; + + slave[i].pid = fork(); + if (slave[i].pid == 0) + { + dup2(slave[i].in, fileno(stdin)); + dup2(slave[i].out, fileno(stdout)); + execlp(o->shell, o->shell, NULL); + } + } + + Slave_loop(o); + + free(slave); + exit(EXIT_SUCCESS); +} + +void Slave_loop(Options * o) +{ + + int p = -1; int s = 0; + + while (running > 0) + { + p = waitpid(-1, &s, 0); + if (p == -1) + { + log_print(0, "Slave_loop", "waitpid : %s", strerror(errno)); + continue; + } + if (s != SHELL_EXIT_CODE) + { + // there was an error + + int i = 0; + for (i = 0; i < o->nCPU; ++i) + { + if (slave[i].pid == p) break; + } + if (i >= o->nCPU) + error("Slave_loop", "No child matches pid %d", p); + + log_print(0, "Slave_loop", "Child [%d] exits with status %d; restarting", p, s); + slave[i].pid = fork(); + if (slave[i].pid == 0) + { + dup2(slave[i].in, fileno(stdin)); + dup2(slave[i].out, fileno(stdout)); + execlp(o->shell, o->shell, NULL); + } + + char buffer[] = "\f\a\a\a"; + if (write(slave[i].in, buffer, strlen(buffer)) <= 0) + log_print(0, 
"Slave_loop", "Slave %d input closed", i); + } + else + --running; + } +} diff --git a/src/slave.h b/src/slave.h new file mode 100644 index 0000000..40b16ad --- /dev/null +++ b/src/slave.h @@ -0,0 +1,32 @@ +#ifndef _SLAVE_H +#define _SLAVE_H + +#define SLAVE_LOGFILE ".swarm.slave.log" + +#include "options.h" +#include +#include "task.h" // avoid compiler errors +typedef struct +{ + int in; + int out; + int pid; // pid of the process running the slave; == 0 for network slaves + char * name; + char * addr; + Task * task; + Task * task_pool; // tasks specific to the slave + + + int ssh_pid; + + bool running; +} Slave; + +extern void Slave_main(Options * o); + +extern void Slave_loop(Options * o); + + +#endif //_SLAVE_H + +//EOF diff --git a/src/task.c b/src/task.c new file mode 100644 index 0000000..2f3cded --- /dev/null +++ b/src/task.c @@ -0,0 +1,98 @@ +#include "task.h" +#include "log.h" + +static int number = 0; // number of created tasks + +Task * Task_Append(Task * p, char * message, int bufsiz, int repetitions, char * outfile) +{ + Task * t = (Task*)(calloc(1, sizeof(Task))); + t->message = message; + t->bufsiz = bufsiz; + t->outsiz = bufsiz; + t->output = (char*)(calloc(t->outsiz, sizeof(char))); + t->outlen = 0; + t->next = NULL; + t->prev = NULL; + t->number = number; + t->repetitions = repetitions; + if (outfile == NULL || outfile[0] == '\0') + t->outfile = NULL; + else + t->outfile = strdup(outfile); + + + if (p != NULL) + { + t->next = p->next; + p->next = t; + t->prev = p; + if (t->next != NULL) + t->next->prev = t; + } + + ++number; + return t; +} + +Task * Task_Prepend(Task * n, char * message, int bufsiz, int repetitions, char * outfile) +{ + Task * t = (Task*)(calloc(1, sizeof(Task))); + t->message = message; + t->bufsiz = bufsiz; + t->outsiz = bufsiz; + t->output = (char*)(calloc(t->outsiz, sizeof(char))); + t->outlen = 0; + t->next = NULL; + t->prev = NULL; + t->number = number; + t->repetitions = repetitions; + if (outfile == NULL || outfile[0] == 
'\0') + t->outfile = NULL; + else + t->outfile = strdup(outfile); + + if (n != NULL) + { + t->prev = n->prev; + n->prev = t; + t->next = n; + if (t->prev != NULL) + t->prev->next = t; + } + ++number; + return t; +} + +void Task_Extract(Task * t) +{ + Task * n = t->next; + if (n != NULL) + n->prev = t->prev; + Task * p = t->prev; + if (p != NULL) + p->next = t->next; + +} + +void Task_Destroy(Task * t) +{ + Task * n = t->next; + if (n != NULL) + Task_Destroy(n); + free(t->message); + free(t->output); + free(t); +} + +void Task_DebugPrint(Task * t) +{ + if (t == NULL) + log_print(3, "Task_DebugPrint", "no tasks"); + + int count = 0; + while (t != NULL) + { + log_print(3, "Task_DebugPrint", "%d %p <- %p -> %p: %s", count,(void*)(t->prev), (void*)t, (void*)(t->next), t->message); + t = t->next; + } +} diff --git a/src/task.h b/src/task.h new file mode 100644 index 0000000..caf5e7f --- /dev/null +++ b/src/task.h @@ -0,0 +1,34 @@ +#ifndef _TASK_H +#define _TASK_H + +#include "options.h" + + +typedef struct t +{ + char * message; // string + char * output; + int number; // identify the task by the order it was created in + int bufsiz; + int outsiz; + int outlen; + int repetitions; // number of times task must be run + + char * outfile; // name of file to send output to (can be NULL) + + struct t * next; + struct t * prev; +} Task; + + +extern Task * Task_Append(Task * p, char * message, int bufsiz, int repetitions, char * outfile); +extern Task * Task_Prepend(Task * n, char * message, int bufsiz, int repetitions, char * outfile); +extern void Task_Extract(Task * t); +extern void Task_Destroy(Task * t); + +extern void Task_DebugPrint(Task * t); + + +#endif //_TASK_H + +//EOF -- 2.20.1