17 #include <sys/types.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/tcp.h>
30 //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems
32 // probably not that great to use threads anyway, since it eats one of your cores
33 // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes
35 // the signal handler now breaks threads... don't use them
38 pthread_t sender_thread;
39 pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER;
40 pthread_cond_t sender_cv;
41 pthread_cond_t sender_done_cv;
42 #endif //THREAD_SENDING
54 static sigjmp_buf env;
56 void sigchld_handler(int signal);
58 void Master_main(Options * o)
64 atexit(Master_cleanup);
68 if (pthread_create(&sender_thread, NULL, Master_sender, NULL) != 0)
70 error("Master_main", "Creating sender thread : %s", strerror(errno));
72 #endif //THREAD_SENDING
76 void Master_setup(Options * o)
78 int err = libssh2_init(0);
81 error("Master_setup", "Initialising libssh2 - error code %d", err);
84 signal(SIGCHLD, sigchld_handler);
86 master.barrier_number = -1;
87 master.last_number = -1;
88 master.nSlaves = o->nCPU;
89 master.running = master.nSlaves;
90 master.nRemote = 0; master.remote_err = NULL;
91 if (master.nSlaves == 0)
92 error("Master_setup", "No CPUs to start slaves with!");
94 master.outfile = NULL;
96 master.slave = (Slave*)(calloc(master.nSlaves, sizeof(Slave)));
98 if (master.slave == NULL)
99 error("Master_setup", "Allocating memory for %d slaves", master.nSlaves);
102 for (int i = 0; i < master.nSlaves; ++i)
108 master.slave[i].name = (char*)calloc(BUFSIZ, sizeof(char));
109 master.slave[i].addr = (char*)calloc(BUFSIZ, sizeof(char));
110 sprintf(master.slave[i].name, "local:%d", i);
111 sprintf(master.slave[i].addr, "local:%d", i);
112 FILE * f = fdopen(master.slave[i].in, "w"); setbuf(f, NULL);
113 fprintf(f, "name=%s\n", master.slave[i].name);
117 void Make_slave(int i)
120 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
122 error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno));
125 master.slave[i].pid = fork();
126 if (master.slave[i].pid == 0)
128 dup2(sv[0],fileno(stdin));
129 dup2(sv[0],fileno(stdout));
130 execlp(master.o->shell, master.o->shell, NULL);
132 master.slave[i].in = sv[1];
133 master.slave[i].out = sv[1];
134 master.slave[i].running = true;
137 void Master_input(char c)
139 if (master.buffer == NULL)
141 if (master.bufsiz < BUFSIZ)
142 master.bufsiz = BUFSIZ;
143 master.buffer = (char*)(calloc(master.bufsiz, sizeof(char)));
147 if (c == '\n' || c == EOF || c == '\0')
149 if (master.buflen == 0)
152 bool first_only = false;
154 master.buffer[master.buflen++] = '\0'; // end the string
158 char * start = strtok(master.buffer, "#");
159 start = strtok(NULL, "#");
162 if ( master.buffer[0] != '#')
164 log_print(3, "Master_input", "Created general task \"%s\"", master.buffer);
165 message = master.buffer;
166 master.buffer = NULL;
167 Task * t = Task_Append(master.task_pool, message, master.buflen, repetitions, master.outfile);
169 master.task_pool = t;
170 master.last_number = t->number;
174 char * cmd = strtok(master.buffer+1, " ");
175 if (strcmp(cmd, "ABSORB") == 0)
177 master.o->encrypt = true;
178 cmd = strtok(NULL, " ");
179 if (strcmp(cmd, "UNSECURE") == 0)
181 master.o->encrypt = false;
182 log_print(1, "Master_absorb", "Using unencrypted connections");
183 cmd = strtok(NULL, " ");
186 log_print(0, "Master_input", "No host specified for ABSORB directive");
187 char * np = strtok(NULL, " ");
189 Master_absorb(cmd, atoi(np));
191 Master_absorb(cmd, 0);
193 else if (strcmp(cmd, "OUTPUT") == 0)
195 cmd = strtok(NULL, " ");
198 log_print(3, "Master_input", "Detach output");
199 if (master.outfile != NULL)
200 master.outfile[0] = '\0';
204 log_print(3, "Master_input", "Output to %s",cmd);
205 if (master.outfile == NULL)
206 master.outfile = (char*)(calloc(BUFSIZ, sizeof(char)));
207 sprintf(master.outfile, "%s", cmd);
208 if (strstr(master.outfile, "%d") == NULL)
210 if (access(master.outfile, F_OK) == 0)
212 if (unlink(master.outfile) != 0)
213 error("Master_input", "Removing %s for output : %s", master.outfile, strerror(errno));
218 else if (strcmp(cmd, "EXIT") == 0)
220 log_print(2, "Master_input", "Received EXIT directive; quitting");
221 freopen("/dev/null", "r", stdin);
222 master.o->daemon = false;
225 else if (strcmp(cmd, "BARRIER") == 0)
228 log_print(3, "Master_input", "Received BARRIER directive; %d commands running", master.commands_active);
230 // check there actually are tasks
232 for (int i = 0; i < master.nSlaves && !tasks; ++i)
234 tasks = (master.slave[i].task != NULL || master.slave[i].task_pool != NULL);
239 master.barrier_number = master.last_number;
240 master.barrier_block = false;
242 cmd = strtok(NULL, " ");
243 if (cmd != NULL && strcmp(cmd, "BLOCK") == 0)
245 if (master.o->daemon == false)
246 log_print(1, "Master_input", "Not a daemon; BARRIER BLOCK functions as BARRIER");
247 master.barrier_block = true;
251 log_print(3, "Master_input", "No tasks; BARRIER has no effect");
256 log_print(1, "Master_input", "Unrecognised directive #%s#", cmd);
264 while (isspace(*start)) start++;
267 char * rep = strtok(master.buffer, " ");
268 rep = strtok(NULL, " ");
269 if (rep != NULL && *rep == '&')
271 first_only = !first_only;
272 rep = strtok(NULL, " ");
278 repetitions = atoi(rep);
279 if (repetitions == 0)
281 if (strcmp(rep, "0") == 0)
282 log_print(0,"Master_input", "Can't assign task with %s repetitions", rep);
284 log_print(0,"Master_input", "Require an integer (not \"%s\") number of repetitions (full directive is %s)", rep, master.buffer);
291 int err = regcomp(&r, master.buffer+1, REG_EXTENDED);
294 regerror(err, &r, master.buffer, master.bufsiz);
295 error("Master_input", "Error compiling regexec : %s", master.buffer);
298 int first_match = -1;
299 for (int i = 0; i < master.nSlaves; ++i)
301 if (first_only && first_match >= 0 && master.slave[i].task != NULL)
304 int err = regexec(&r, master.slave[i].name, 0, NULL, 0);
305 if (err == REG_NOMATCH)
312 regerror(err, &r, master.buffer, master.bufsiz);
313 log_print(0, "Master_input", "Error in regexec : %s", master.buffer+1);
317 if (first_match < 0) first_match = i;
320 if (master.slave[i].task == NULL)
328 message = strdup(start);
329 Task * t = Task_Append(master.slave[i].task_pool, message, strlen(message), repetitions, master.outfile);
331 master.slave[i].task_pool = t;
332 master.last_number = t->number;
334 log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[i].name);
338 if (first_only && first_match >= 0)
340 message = strdup(start);
341 Task * t = Task_Append(master.slave[first_match].task_pool, message, strlen(message), repetitions, master.outfile);
343 master.slave[first_match].task_pool = t;
344 master.last_number = t->number;
345 log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[first_match].name);
349 log_print(1, "Master_input", "No shells with names matching regex %s");
353 log_print(3, "Master_input", "Processed task %s", master.buffer);
357 if (master.buflen >= master.bufsiz)
360 master.buffer = (char*)(realloc(master.buffer, master.bufsiz*sizeof(char)));
362 master.buffer[master.buflen++] = c;
366 void Master_output(int i, char c)
368 log_print(10, "Master_output", "input %c from slave %d", c, i);
369 #ifdef THREAD_SENDING
370 pthread_mutex_lock(&sender_lock);
371 Task * t = master.slave[i].task;
374 pthread_cond_wait(&sender_done_cv, &sender_lock);
376 pthread_mutex_unlock(&sender_lock);
378 Task * t = master.slave[i].task;
379 #endif //THREAD_SENDING
382 log_print(3, "Master_output", "Echo %c back to slave %d", c, i);
383 write(master.slave[i].in, &c, sizeof(char));
384 //log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name);
385 //error(NULL, "Please refrain from echoing three bell characters in a row.");
390 t->output[t->outlen++] = c;
391 if (t->outlen >= t->outsiz)
394 t->output = (char*)(realloc(t->output, t->outsiz*sizeof(char)));
395 memset(t->output+(t->outlen), 0, sizeof(char) * t->outsiz - t->outlen);
398 if (c == EOF || (master.o->endlen > 0 && t->outlen >= master.o->endlen
399 && strcmp((t->output)+(t->outlen)-(master.o->endlen), master.o->end) == 0))
404 t->output[t->outlen - master.o->endlen] = '\0';
405 t->outlen -= master.o->endlen;
408 master.slave[i].task = NULL;
411 fprintf(stdout, "%d:\n", t->number);
414 else if (t->output[t->outlen-1] == '\f')
417 log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name);
418 static int bufsiz = BUFSIZ;
419 char * buffer = (char*)(calloc(bufsiz, sizeof(char)));
421 error("Master_output", "Creating name request buffer of size %d : %s", bufsiz, strerror(errno));
425 len = sprintf(buffer, "name=%s", master.slave[i].name);
429 buffer = (char*)(realloc(buffer, bufsiz * sizeof(char)));
431 error("Master_output", "Resizing name request buffer to size %d : %s", bufsiz, strerror(errno));
435 Task * t2 = Task_Append(master.slave[i].task_pool, buffer,len, 1, NULL);
436 if (t2->prev == NULL)
437 master.slave[i].task_pool = t2;
438 master.last_number = t2->number;
443 fprintf(stdout, "%d: %s", t->number, t->output);
444 if (t->outfile != NULL && t->outfile[0] != '\0')
446 log_print(3, "Master_output", "Writing result of task %d to file \"%s\"", t->number, t->outfile);
447 static char buf[BUFSIZ];
450 if (strstr(t->outfile, "%d") != NULL)
453 sprintf(buf, t->outfile, t->number);
456 error("Master_output", "Couldn't open file \"%s\" : %s", buf, strerror(errno));
460 f = fopen(t->outfile, "a");
462 error("Master_output", "Couldn't open file \"%s\" : %s", t->outfile, strerror(errno));
467 fprintf(f, "%s", t->output);
472 log_print(3, "Master_output", "Task %d finished; %d tasks active", t->number, master.commands_active-1);
475 if (t->repetitions == 0)
484 if (--master.commands_active == 0 && master.barrier_number >= 0)
486 master.barrier_number = -1;
487 if (master.barrier_block && master.o->daemon)
489 FILE * f = fopen(DAEMON_BARRIER_FIFO, "w");
497 Task * Master_tasker(int i)
499 Task ** tp = (master.slave[i].task_pool != NULL) ? &(master.slave[i].task_pool) : &(master.task_pool);
504 if (master.barrier_number >= 0 && t->number > master.barrier_number)
508 if (t->repetitions == 0)
513 master.slave[i].task = t;
519 #ifdef THREAD_SENDING
520 pthread_mutex_lock(&sender_lock);
521 while (send_task.task != NULL) // while the sender is still sending shit
523 pthread_cond_wait(&sender_done_cv, &sender_lock);
526 send_task.slave_fd = master.slave[i].out;
527 pthread_cond_signal(&sender_cv);
528 pthread_mutex_unlock(&sender_lock);
532 send_task.slave_fd = master.slave[i].out;
534 #endif //THREAD_SENDING
543 if (sigsetjmp(env,true) != 0) // completely necessary evil
545 //log_print(2, "Master_loop", "Restored from longjmp");
549 master.fd_max = fileno(stdin);
550 for (int i = 0; i < master.nSlaves; ++i)
552 if (master.slave[i].out > master.fd_max)
553 master.fd_max = master.slave[i].out;
565 if (!input && master.o->daemon)
568 int fd = open(DAEMON_FIFO, O_RDONLY | O_NONBLOCK);
572 error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
574 log_print(2, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
580 dup2(fd, fileno(stdin));
586 if (input) FD_SET(fileno(stdin), &readSet);
587 for (int i = 0; i < master.nSlaves; ++i)
589 if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet);
592 for (int i = 0; i < master.nRemote; ++i)
594 FD_SET(master.remote_err[i], &readSet);
597 select(master.fd_max+1, &readSet, NULL, NULL, NULL);
600 if (input && FD_ISSET(fileno(stdin), &readSet))
603 //log_print(10, "Master_loop", "Read from stdin");
604 input = (read(fileno(stdin), &c, sizeof(char)) != 0);
614 for (int i = 0; i < master.nSlaves; ++i)
616 if (!master.slave[i].running) continue;
618 if (FD_ISSET(master.slave[i].out, &readSet))
620 //log_print(10, "Master_loop", "Read from slave %d", i);
622 if (read(master.slave[i].out, &c, sizeof(char)) != 0)
628 if (master.slave[i].task == NULL)
630 if (Master_tasker(i) == NULL && !input && !master.o->daemon) // no more input; no tasks
632 quit = (--master.running == 0);
633 master.slave[i].running = false;
639 for (int i = 0; i < master.nRemote; ++i)
641 if (FD_ISSET(master.remote_err[i], &readSet))
643 int len = read(master.remote_err[i], buffer, sizeof(buffer));
645 fprintf(stderr, "%s", buffer);
656 if (master.o->prepend != NULL)
659 if (len == -1) len = strlen(master.o->prepend);
660 write(send_task.slave_fd, master.o->prepend, (len-1) * sizeof(char));
661 //log_print(0, "Sent prepend %s\n",master.o->prepend);
664 write(send_task.slave_fd, send_task.task->message, (send_task.task->bufsiz) * sizeof(char));
665 //log_print(0, "Sent message %s\n",send_task.task->message);
666 if (master.o->append != NULL)
669 if (len == -1) len = strlen(master.o->append);
670 write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char));
671 //log_print(0, "Sent append %s\n",master.o->append);
673 if (master.o->end != NULL)
675 static char * echo = ";echo -en \"";
677 if (len == -1) len = strlen(echo);
678 write(send_task.slave_fd, echo, len*sizeof(char));
679 write(send_task.slave_fd, master.o->end, (master.o->endlen) * sizeof(char));
680 write(send_task.slave_fd, "\"", 1*sizeof(char));
681 //log_print(0, "Sent end\n");
683 write(send_task.slave_fd, "\n", 1*sizeof(char));
684 master.commands_active++;
685 log_print(3, "Master_sender", "Sent task %d \"%s\" on socket %d - %d tasks active", send_task.task->number, send_task.task->message, send_task.slave_fd, master.commands_active);
688 #ifdef THREAD_SENDING
689 void * Master_sender(void * args)
694 pthread_mutex_lock(&sender_lock);
695 while (send_task.task == NULL)
697 pthread_cond_wait(&sender_cv, &sender_lock);
699 quit = (send_task.task == NULL);
706 //log_print(0, "Master_sender sent message\n");
709 pthread_cond_broadcast(&sender_done_cv);
711 send_task.task = NULL;
712 pthread_mutex_unlock(&sender_lock);
717 #endif //THREAD_SENDING
722 void Master_cleanup()
725 //log_print(2, "Master_cleanup", "Preparing to exit...");
726 #ifdef THREAD_SENDING
727 pthread_mutex_lock(&sender_lock);
728 send_task.task = NULL;
729 pthread_cond_broadcast(&sender_cv);
730 pthread_mutex_unlock(&sender_lock);
731 pthread_join(sender_thread, NULL);
732 #endif //THREAD_SENDING
734 if (master.task_pool != NULL) Task_Destroy(master.task_pool);
737 signal(SIGCHLD, SIG_IGN); // ignore child exits now
739 // tell all remote nodes to exit
740 for (int i = 0; i < master.nRemote; ++i)
742 FILE * f = fdopen(master.remote_err[i], "r+"); setbuf(f, NULL);
744 fprintf(f, "exit\n");
749 for (int i = 0; i < master.nSlaves; ++i)
752 static int exitlen = -1;
753 if (exitlen == -1) exitlen = strlen(SHELL_EXIT_MESSAGE);
754 write(master.slave[i].in, SHELL_EXIT_MESSAGE, exitlen *sizeof(char));
755 //usleep(0.5); //shouldn't matter too much
758 for (int i = 0; i < master.nSlaves; ++i)
760 if (master.slave[i].task_pool != NULL) Task_Destroy(master.slave[i].task_pool);
761 if (master.slave[i].pid <= 0)
763 Network_close(master.slave[i].in);
767 kill(master.slave[i].pid, 15); // be nice
768 if (kill(master.slave[i].pid, 0) == 0)
769 kill(master.slave[i].pid, 9); // shoot it down
770 close(master.slave[i].in);
771 close(master.slave[i].out);
773 free(master.slave[i].name);
774 free(master.slave[i].addr);
778 if (master.outfile != NULL)
779 free(master.outfile);
788 void Master_absorb(char * addr, int np)
792 char * user = strstr(addr, "@");
802 user = getpwuid(geteuid())->pw_name;
804 log_print(3, "Master_absorb", "User %s at address %s", user, addr);
806 // ssh to the host on port 22
807 ssh * s = ssh_new(user, addr, 22);
810 log_print(0, "Master_absorb", "Couldn't ssh to %s@%s", user, addr);
815 // work out the name to give to the shells
816 char * name = strtok(addr, ":");
817 name = strtok(NULL, ":");
819 name = addr; // default is host:X
821 *(name-1) = '\0'; // otherwise use name:X
823 // setup array of remote stderr file descriptors
824 if (master.nRemote++ == 0)
826 master.remote_err = (int*)(calloc(master.nRemote, sizeof(int)));
827 master.remote_reserved = master.nRemote;
829 else if (master.nRemote >= master.remote_reserved)
831 // resize dynamically
832 master.remote_reserved *= 2;
833 master.remote_err = (int*)(realloc(master.remote_err,master.remote_reserved * sizeof(int)));
839 if (master.o->encrypt)
842 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
843 error("Master_absorb", "Couldn't create socket for remote swarm");
846 ssh_exec_swarm(s, NULL, sv+1, np); // start swarm remotely forward it to the socket
847 ssh_thread_add(s); // add the ssh to the thread
851 sfd = Network_server_bind(0, &port); // dynamically bind to a port
852 ssh_exec_swarm(s, &port, NULL, np); // start swarm remotely, have it connect on the port
853 ssh_destroy(s); // don't need the ssh anymore
854 sfd = Network_server_listen(sfd, NULL); // accept connections and pray
856 master.remote_err[master.nRemote-1] = sfd;
857 if (sfd > master.fd_max)
865 int len = sprintf(buffer, "%s\n", name);
868 w += write(sfd, buffer+w, len-w);
874 len = read(sfd, buffer+len, sizeof(buffer));
877 while (buffer[len-1] != '\n');
878 buffer[len-1] = '\0';
879 newSlaves = atoi(buffer);
885 error("Master_absorb", "No slaves to absorb from %s", addr);
888 master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave)));
889 if (master.slave == NULL)
891 error("Master_absorb", "Resizing slave array from %d to %d slaves : %s",
892 master.nSlaves, master.nSlaves + newSlaves, strerror(errno));
896 for (int i = 0; i < newSlaves; ++i)
898 int ii = master.nSlaves + i;
900 if (master.o->encrypt)
903 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
904 error("Master_absorb", "Couldn't create socket for remote swarm");
907 LIBSSH2_LISTENER * listener = NULL;
908 // libssh2 can't finalise the connection when the port is dynamic (ie: port = 0)
909 while (listener == NULL)
911 port = 20000 + rand() % 30000; // so pick ports at random until binding succeeds
912 listener = ssh_get_listener(s, &port); // port forward to the socket
915 log_print(4,"Master_absorb", "Chose port %d", port);
916 int len = sprintf(buffer, "%d\n", port);
921 w += write(sfd, buffer+w, len-w);
923 usleep(200000); // give ssh_thread a chance to actually send the data
925 log_print(4, "Master_absorb", "Creating tunnel...");
926 ssh_add_tunnel(s, listener, sv[1]);
927 master.slave[ii].in = sv[0];
928 master.slave[ii].out = sv[0];
930 log_print(4, "Master_absorb", "Tunnel for slave %d using socket %d<->%d setup", ii, sv[0], sv[1]);
934 int tmp = Network_server_bind(0, &port); // bind to a port
936 master.slave[ii].in = Network_server_listen(tmp, NULL); // listen for connection
937 master.slave[ii].out = master.slave[ii].in;
944 if (master.slave[ii].out > master.fd_max)
945 master.fd_max = master.slave[ii].out;
948 sprintf(buffer, "%s:%d", name, i);
949 master.slave[ii].name = strdup(buffer);
950 master.slave[ii].addr = strdup(addr);
951 master.slave[ii].running = true;
952 master.slave[ii].pid = -1;
953 master.slave[ii].task = NULL;
954 master.slave[ii].task_pool = NULL;
958 master.nSlaves = master.nSlaves + newSlaves;
960 master.running += newSlaves;
962 log_print(2, "Master_absorb", "Successfully absorbed %d slaves from %s", newSlaves, addr);
966 void sigchld_handler(int signal)
969 if (signal != SIGCHLD)
970 error("sigchld_handler", "Got signal (%d) which isn't SIGCHLD (%d)", signal, SIGCHLD);
973 int p = waitpid(-1, &s, 0);
975 error("sigchld_handler", "waitpid : %s", strerror(errno));
979 for (i = 0; i < master.nSlaves; ++i)
981 if (master.slave[i].pid == p)
984 if (i >= master.nSlaves)
989 fprintf(stderr, "Unexpected exit of slave %s", master.slave[i].name);
992 int sig = WTERMSIG(s);
993 fprintf(stderr, " due to %s", strsignal(sig));
996 printf(" - committing suicide\n");
1002 fprintf(stderr, " return code %d.",s);
1004 fprintf(stderr, " Starting replacement.\n");
1008 if (master.o->end != NULL)
1010 //log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i);
1011 char buffer[BUFSIZ];
1012 sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, master.o->end);
1013 if (write(master.slave[i].in, buffer, strlen(buffer)) <= 0)
1014 error("sigchld_handler", "Couldn't restart slave %d; it is unresponsive", i);