18 #include <sys/types.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/tcp.h>
31 //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems
33 // probably not that great to use threads anyway, since it eats one of your cores
34 // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes
36 // the signal handler now breaks threads... don't use them
// Sender-thread state, only used when THREAD_SENDING is defined (the opening
// #ifdef is not visible in this excerpt). Master_tasker fills send_task while
// holding sender_lock and signals sender_cv; Master_sender waits on sender_cv,
// clears send_task afterwards and broadcasts sender_done_cv so that code
// waiting for the sender to become free can continue.
39 pthread_t sender_thread;
40 pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER;
// NOTE(review): unlike sender_lock, these condition variables are not
// statically initialised (no PTHREAD_COND_INITIALIZER); confirm that
// pthread_cond_init is called elsewhere before first use.
41 pthread_cond_t sender_cv;
42 pthread_cond_t sender_done_cv;
43 #endif //THREAD_SENDING
// Jump buffer saved by sigsetjmp at the top of the master loop; the code that
// siglongjmps back to it is not visible in this excerpt.
55 static sigjmp_buf env;
// SIGCHLD handler; installed by Master_setup.
57 void sigchld_handler(int signal);
// Entry point for the master process. Visible steps: register Master_cleanup
// to run at process exit and, when THREAD_SENDING is defined, start the
// Master_sender thread. Other statements of this function are elided from
// this excerpt.
59 void Master_main(Options * o)
65 atexit(Master_cleanup);
// NOTE(review): pthread_create returns its error number instead of setting
// errno, so strerror(errno) below may not describe the actual failure.
69 if (pthread_create(&sender_thread, NULL, Master_sender, NULL) != 0)
71 error("Master_main", "Creating sender thread : %s", strerror(errno));
73 #endif //THREAD_SENDING
// Initialise global master state from the options: libssh2, the SIGCHLD
// handler, bookkeeping counters, and one local Slave per o->nCPU, each
// connected to the master through an AF_UNIX socketpair.
77 void Master_setup(Options * o)
79 int err = libssh2_init(0);
82 error("Master_setup", "Initialising libssh2 - error code %d", err);
// Exiting child shells are handled by sigchld_handler.
85 signal(SIGCHLD, sigchld_handler);
// -1 means "no barrier pending" / "no task numbered yet"; the rest of the
// file only honours a barrier when barrier_number >= 0.
87 master.barrier_number = -1;
88 master.last_number = -1;
89 master.nSlaves = o->nCPU;
90 master.running = master.nSlaves;
91 master.nRemote = 0; master.remote_err = NULL;
92 if (master.nSlaves == 0)
93 error("Master_setup", "No CPUs to start slaves with!");
95 master.outfile = NULL;
97 master.slave = (Slave*)(calloc(master.nSlaves, sizeof(Slave)));
99 if (master.slave == NULL)
100 error("Master_setup", "Allocating memory for %d slaves", master.nSlaves);
103 for (int i = 0; i < master.nSlaves; ++i)
108 // Possibly the best function ever
109 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
111 error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno));
// The master keeps sv[1] and uses it for both directions (socketpair ends are
// bidirectional); sv[0] is later dup2'd onto the child shell's stdin/stdout.
// NOTE(review): because in == out, Master_cleanup's close(in); close(out)
// closes the same descriptor twice.
113 master.slave[i].in = sv[1];
114 master.slave[i].out = sv[1];
115 master.slave[i].socket_child_end = sv[0];
// NOTE(review): these calloc results are not checked; the unbounded sprintf
// calls are only safe because "local:%d" is far shorter than BUFSIZ.
121 master.slave[i].name = (char*)calloc(BUFSIZ, sizeof(char));
122 master.slave[i].addr = (char*)calloc(BUFSIZ, sizeof(char));
123 sprintf(master.slave[i].name, "local:%d", i);
124 sprintf(master.slave[i].addr, "local:%d", i);
// Queue a "name=<slave name>" line on the socket, presumably so the shell
// defines a `name` variable once it starts (confirm). The FILE is unbuffered
// via setbuf(f, NULL); no fclose is visible here.
// NOTE(review): fdopen can return NULL, which setbuf would then dereference.
125 FILE * f = fdopen(master.slave[i].in, "w"); setbuf(f, NULL);
126 fprintf(f, "name=%s\n", master.slave[i].name);
// Start the local shell for slave i. The forked child dup2s its end of the
// socketpair onto stdin and stdout and execs master.o->shell; slave i is then
// marked as running.
// NOTE(review): no handling of fork() returning -1 or of execlp() failing is
// visible here. If execlp returns, the child would carry on executing master
// code. Only stdin/stdout are redirected in the visible lines, so the shell's
// stderr is inherited from the master.
130 void Master_shell(int i)
132 master.slave[i].pid = fork();
133 if (master.slave[i].pid == 0)
135 dup2(master.slave[i].socket_child_end,fileno(stdin));
136 dup2(master.slave[i].socket_child_end,fileno(stdout));
137 execlp(master.o->shell, master.o->shell, NULL);
140 master.slave[i].running = true;
// Feed one character of master input into master.buffer. Characters
// accumulate until '\n', EOF or '\0' ends the line, which is then handled as:
//   <command>                      general task, appended to master.task_pool
//   #DIRECTIVE args                ABSORB, OUTPUT, EXIT or BARRIER
//   #<regex> [&] [reps]#<command>  task for each slave whose name matches
//                                  <regex>; '&' toggles first_only, which
//                                  limits it to a single matching slave
// The condition separating directives from regex tasks is elided; presumably
// it is whether a second '#' was found (start != NULL).
143 void Master_input(char c)
145 if (master.buffer == NULL)
147 if (master.bufsiz < BUFSIZ)
148 master.bufsiz = BUFSIZ;
149 master.buffer = (char*)(calloc(master.bufsiz, sizeof(char)));
// NOTE(review): c is a plain char, so `c == EOF` is unreliable: it never
// matches when char is unsigned, and it matches byte 0xFF when char is signed.
153 if (c == '\n' || c == EOF || c == '\0')
155 if (master.buflen == 0)
158 bool first_only = false;
// NOTE(review): the growth check only runs when appending an ordinary
// character, so if buflen == bufsiz here this terminator is written one past
// the allocation; confirm against the elided resize code.
160 master.buffer[master.buflen++] = '\0'; // end the string
// Split on '#'. For "#regex ...#cmd" lines `start` ends up at cmd; with no
// second '#' it is NULL. strtok overwrites only the terminating delimiter,
// so a leading '#' is still in master.buffer[0] for the test below.
164 char * start = strtok(master.buffer, "#");
165 start = strtok(NULL, "#");
// Plain line: a general task for any slave. Ownership of the buffer passes to
// the task, so a fresh buffer is allocated on the next call.
// NOTE(review): the length passed here includes the '\0' added above, while
// regex tasks below pass strlen(message); confirm which Task_Append expects.
168 if ( master.buffer[0] != '#')
170 log_print(3, "Master_input", "Created general task \"%s\"", master.buffer);
171 message = master.buffer;
172 master.buffer = NULL;
173 Task * t = Task_Append(master.task_pool, message, master.buflen, repetitions, master.outfile);
175 master.task_pool = t;
176 master.last_number = t->number;
// Directive: the first space-separated word after '#' selects the action.
// NOTE(review): for a bare "#" line strtok returns NULL and strcmp(NULL, ...)
// is undefined behaviour.
180 char * cmd = strtok(master.buffer+1, " ");
// #ABSORB [UNSECURE] host [np]: absorb slaves from a remote host, encrypted
// unless UNSECURE is given; np is 0 when omitted.
// NOTE(review): strcmp(cmd, "UNSECURE") runs before any visible NULL check, so
// "#ABSORB" with no host dereferences NULL. atoi(np) cannot report bad input.
181 if (strcmp(cmd, "ABSORB") == 0)
183 master.o->encrypt = true;
184 cmd = strtok(NULL, " ");
185 if (strcmp(cmd, "UNSECURE") == 0)
187 master.o->encrypt = false;
188 log_print(1, "Master_absorb", "Using unencrypted connections");
189 cmd = strtok(NULL, " ");
192 log_print(0, "Master_input", "No host specified for ABSORB directive");
193 char * np = strtok(NULL, " ");
195 Master_absorb(cmd, atoi(np));
197 Master_absorb(cmd, 0);
// #OUTPUT [path]: with no path, detach output. Otherwise store the path in
// master.outfile. A path without "%d" is appended to by Master_output, so any
// existing file is removed first.
// NOTE(review): sprintf into the BUFSIZ buffer is unbounded; a path of BUFSIZ
// characters or more overflows it.
199 else if (strcmp(cmd, "OUTPUT") == 0)
201 cmd = strtok(NULL, " ");
204 log_print(3, "Master_input", "Detach output");
205 if (master.outfile != NULL)
206 master.outfile[0] = '\0';
210 log_print(3, "Master_input", "Output to %s",cmd);
211 if (master.outfile == NULL)
212 master.outfile = (char*)(calloc(BUFSIZ, sizeof(char)));
213 sprintf(master.outfile, "%s", cmd);
214 if (strstr(master.outfile, "%d") == NULL)
216 if (access(master.outfile, F_OK) == 0)
218 if (unlink(master.outfile) != 0)
219 error("Master_input", "Removing %s for output : %s", master.outfile, strerror(errno));
// #EXIT: stop reading input. stdin is replaced with /dev/null, and daemon
// mode is switched off so the master loop does not reopen the daemon FIFO.
224 else if (strcmp(cmd, "EXIT") == 0)
226 log_print(2, "Master_input", "Received EXIT directive; quitting");
227 freopen("/dev/null", "r", stdin);
228 master.o->daemon = false;
// #BARRIER [BLOCK]: tasks numbered after the current last_number are held
// back (see Master_tasker) until every active command has finished. With
// BLOCK in daemon mode, Master_output also opens DAEMON_BARRIER_FIFO when the
// barrier clears.
231 else if (strcmp(cmd, "BARRIER") == 0)
234 log_print(3, "Master_input", "Received BARRIER directive; %d commands running", master.commands_active);
236 // check there actually are tasks
238 for (int i = 0; i < master.nSlaves && !tasks; ++i)
240 tasks = (master.slave[i].task != NULL || master.slave[i].task_pool != NULL);
245 master.barrier_number = master.last_number;
246 master.barrier_block = false;
248 cmd = strtok(NULL, " ");
249 if (cmd != NULL && strcmp(cmd, "BLOCK") == 0)
251 if (master.o->daemon == false)
252 log_print(1, "Master_input", "Not a daemon; BARRIER BLOCK functions as BARRIER");
253 master.barrier_block = true;
257 log_print(3, "Master_input", "No tasks; BARRIER has no effect");
262 log_print(1, "Master_input", "Unrecognised directive #%s#", cmd);
// Regex task: skip leading whitespace in the command part.
// NOTE(review): isspace() on a plain char is undefined for negative values;
// cast to unsigned char.
270 while (isspace(*start)) start++;
// Parse the optional '&' and repetition count that follow the regex.
273 char * rep = strtok(master.buffer, " ");
274 rep = strtok(NULL, " ");
275 if (rep != NULL && *rep == '&')
277 first_only = !first_only;
278 rep = strtok(NULL, " ");
// NOTE(review): atoi cannot tell "0" from garbage, hence the strcmp below.
// The "full directive" message prints master.buffer, which strtok has already
// cut at the first space.
284 repetitions = atoi(rep);
285 if (repetitions == 0)
287 if (strcmp(rep, "0") == 0)
288 log_print(0,"Master_input", "Can't assign task with %s repetitions", rep);
290 log_print(0,"Master_input", "Require an integer (not \"%s\") number of repetitions (full directive is %s)", rep, master.buffer);
// Compile the regex (the text after the leading '#').
// NOTE(review): no regfree(&r) is visible in this function.
297 int err = regcomp(&r, master.buffer+1, REG_EXTENDED);
300 regerror(err, &r, master.buffer, master.bufsiz);
301 error("Master_input", "Error compiling regexec : %s", master.buffer);
// Match the regex against every slave name and queue a copy of the command
// for the matching slaves. With first_only, only the first match is recorded
// and the task is queued after the loop.
304 int first_match = -1;
305 for (int i = 0; i < master.nSlaves; ++i)
307 if (first_only && first_match >= 0 && master.slave[i].task != NULL)
310 int err = regexec(&r, master.slave[i].name, 0, NULL, 0);
311 if (err == REG_NOMATCH)
// NOTE(review): the regcomp failure path above logs master.buffer, but this
// one logs master.buffer+1 and so drops the first character of the
// regerror message.
318 regerror(err, &r, master.buffer, master.bufsiz);
319 log_print(0, "Master_input", "Error in regexec : %s", master.buffer+1);
323 if (first_match < 0) first_match = i;
326 if (master.slave[i].task == NULL)
334 message = strdup(start);
335 Task * t = Task_Append(master.slave[i].task_pool, message, strlen(message), repetitions, master.outfile);
337 master.slave[i].task_pool = t;
338 master.last_number = t->number;
340 log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[i].name);
344 if (first_only && first_match >= 0)
346 message = strdup(start);
347 Task * t = Task_Append(master.slave[first_match].task_pool, message, strlen(message), repetitions, master.outfile);
349 master.slave[first_match].task_pool = t;
350 master.last_number = t->number;
351 log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[first_match].name);
// NOTE(review): the format contains %s but no argument is passed, which is
// undefined behaviour; the regex text (master.buffer+1) was presumably meant.
355 log_print(1, "Master_input", "No shells with names matching regex %s");
359 log_print(3, "Master_input", "Processed task %s", master.buffer);
// Ordinary character: grow the buffer when full (the bufsiz update is elided),
// then append.
// NOTE(review): the realloc result is assigned directly. On failure the old
// buffer leaks and the next line writes through NULL.
363 if (master.buflen >= master.bufsiz)
366 master.buffer = (char*)(realloc(master.buffer, master.bufsiz*sizeof(char)));
368 master.buffer[master.buflen++] = c;
// Handle one character read from slave i and append it to that slave's
// current task output. When the output is complete, the elided condition
// decides; under SHELL_OUTPUT_FINISHED it includes the output ending with the
// sentinel string. The result is then printed to stdout, optionally written
// to t->outfile, and the active-command and barrier bookkeeping is updated.
// Output ending in '\f' is treated as the slave requesting its name: a
// "name=<name>" task is queued for it.
372 void Master_output(int i, char c)
374 log_print(10, "Master_output", "input %c from slave %d", c, i);
// With a sender thread, wait (elided condition) on sender_done_cv before
// reading the slave's current task.
375 #ifdef THREAD_SENDING
376 pthread_mutex_lock(&sender_lock);
377 Task * t = master.slave[i].task;
380 pthread_cond_wait(&sender_done_cv, &sender_lock);
382 pthread_mutex_unlock(&sender_lock);
384 Task * t = master.slave[i].task;
385 #endif //THREAD_SENDING
// Apparently the no-task path (see the commented-out warning): the character
// is echoed back to the slave.
// NOTE(review): the write() result is ignored.
388 log_print(3, "Master_output", "Echo %c back to slave %d", c, i);
389 write(master.slave[i].in, &c, sizeof(char));
390 //log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name);
391 //error(NULL, "Please refrain from echoing three bell characters in a row.");
// Append c. When the buffer is full it is grown (the outsiz update is elided)
// and the new tail is zeroed, so t->output stays NUL-terminated for the
// strcmp below.
// NOTE(review): the realloc result is not checked.
396 t->output[t->outlen++] = c;
397 if (t->outlen >= t->outsiz)
400 t->output = (char*)(realloc(t->output, t->outsiz*sizeof(char)));
401 memset(t->output+(t->outlen), 0, sizeof(char) * t->outsiz - t->outlen);
// Completion test, extended with: the output ends with SHELL_OUTPUT_FINISHED
// (echoed after every command by the task sender).
405 #ifdef SHELL_OUTPUT_FINISHED
406 || (t->outlen >= SHELL_OUTPUT_FINISHED_LENGTH
407 && strcmp((t->output)+(t->outlen)-(SHELL_OUTPUT_FINISHED_LENGTH), SHELL_OUTPUT_FINISHED) == 0))
409 ) // this is totally readable
410 #endif //SHELL_OUTPUT_FINISHED
// Strip the sentinel from the stored output and detach the task from the slave.
415 t->output[t->outlen - SHELL_OUTPUT_FINISHED_LENGTH] = '\0';
416 t->outlen -= SHELL_OUTPUT_FINISHED_LENGTH;
419 master.slave[i].task = NULL;
422 fprintf(stdout, "%d:\n", t->number);
// '\f' from the slave is a request for its name.
425 else if (t->output[t->outlen-1] == '\f')
428 log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name);
// bufsiz is static, so it keeps the largest size reached across calls.
// NOTE(review): sprintf does not bound its write, so the realloc-and-retry
// below cannot prevent an overflow of `buffer` for a long name.
// snprintf(buffer, bufsiz, ...) would make the retry meaningful.
429 static int bufsiz = BUFSIZ;
430 char * buffer = (char*)(calloc(bufsiz, sizeof(char)));
432 error("Master_output", "Creating name request buffer of size %d : %s", bufsiz, strerror(errno));
436 len = sprintf(buffer, "name=%s", master.slave[i].name);
440 buffer = (char*)(realloc(buffer, bufsiz * sizeof(char)));
442 error("Master_output", "Resizing name request buffer to size %d : %s", bufsiz, strerror(errno));
446 Task * t2 = Task_Append(master.slave[i].task_pool, buffer,len, 1, NULL);
447 if (t2->prev == NULL)
448 master.slave[i].task_pool = t2;
449 master.last_number = t2->number;
// Report the finished output, and optionally write it to t->outfile.
454 fprintf(stdout, "%d: %s", t->number, t->output);
455 if (t->outfile != NULL && t->outfile[0] != '\0')
457 log_print(3, "Master_output", "Writing result of task %d to file \"%s\"", t->number, t->outfile);
// NOTE(review): when outfile contains "%d" it is used directly as a printf
// format. Any other conversion in the user-supplied #OUTPUT path (e.g. %s) is
// undefined behaviour, and sprintf into the static BUFSIZ buffer is unbounded.
458 static char buf[BUFSIZ];
461 if (strstr(t->outfile, "%d") != NULL)
464 sprintf(buf, t->outfile, t->number);
467 error("Master_output", "Couldn't open file \"%s\" : %s", buf, strerror(errno));
471 f = fopen(t->outfile, "a");
473 error("Master_output", "Couldn't open file \"%s\" : %s", t->outfile, strerror(errno));
478 fprintf(f, "%s", t->output);
483 log_print(3, "Master_output", "Task %d finished; %d tasks active", t->number, master.commands_active-1);
486 if (t->repetitions == 0)
// When the last active command finishes, clear any pending barrier. For
// BARRIER BLOCK in daemon mode, DAEMON_BARRIER_FIFO is also opened for
// writing, presumably to wake whoever is blocked on it (confirm).
495 if (--master.commands_active == 0 && master.barrier_number >= 0)
497 master.barrier_number = -1;
498 if (master.barrier_block && master.o->daemon)
500 FILE * f = fopen(DAEMON_BARRIER_FIFO, "w");
// Choose the next task for slave i: take it from the slave's own pool if that
// is non-empty, otherwise from the shared master.task_pool. A task numbered
// after a pending barrier is not started. The chosen task is recorded in
// master.slave[i].task and handed to the sender, either the sender thread or
// an inline send. The master loop treats a NULL return as "no task
// available"; the return statements themselves are elided.
508 Task * Master_tasker(int i)
510 Task ** tp = (master.slave[i].task_pool != NULL) ? &(master.slave[i].task_pool) : &(master.task_pool);
515 if (master.barrier_number >= 0 && t->number > master.barrier_number)
519 if (t->repetitions == 0)
524 master.slave[i].task = t;
// Thread mode: wait until the sender thread has finished the previous send,
// then publish the new one and wake the sender.
530 #ifdef THREAD_SENDING
531 pthread_mutex_lock(&sender_lock);
532 while (send_task.task != NULL) // while the sender is still sending shit
534 pthread_cond_wait(&sender_done_cv, &sender_lock);
537 send_task.slave_fd = master.slave[i].out;
538 pthread_cond_signal(&sender_cv);
539 pthread_mutex_unlock(&sender_lock);
543 send_task.slave_fd = master.slave[i].out;
545 #endif //THREAD_SENDING
// Body of the master event loop. The function header is not visible in this
// excerpt; its log messages are tagged "Master_loop". It uses select() to
// multiplex master input on stdin, output from running slaves, and stderr
// streams from remote swarms.
// A non-zero sigsetjmp return means control came back here via siglongjmp
// from code outside this excerpt.
554 if (sigsetjmp(env,true) != 0) // completely necessary evil
556 //log_print(2, "Master_loop", "Restored from longjmp");
// Recompute the highest descriptor for select().
// NOTE(review): only stdin and slave descriptors are considered here. Master_absorb
// raises fd_max for remote_err descriptors, but this recomputation drops
// them, so select() could ignore a remote stream whose fd is the highest.
560 master.fd_max = fileno(stdin);
561 for (int i = 0; i < master.nSlaves; ++i)
563 if (master.slave[i].out > master.fd_max)
564 master.fd_max = master.slave[i].out;
// Input exhausted in daemon mode: reopen DAEMON_FIFO (non-blocking) and dup2
// it onto stdin so new commands can arrive.
576 if (!input && master.o->daemon)
579 int fd = open(DAEMON_FIFO, O_RDONLY | O_NONBLOCK);
583 error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
585 log_print(LOGWARN, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
591 dup2(fd, fileno(stdin));
597 if (input) FD_SET(fileno(stdin), &readSet);
598 for (int i = 0; i < master.nSlaves; ++i)
600 if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet);
603 for (int i = 0; i < master.nRemote; ++i)
605 FD_SET(master.remote_err[i], &readSet);
// NOTE(review): select()'s return value is ignored. On failure (e.g. EINTR
// when SIGCHLD arrives) POSIX leaves the fd sets unmodified, so every
// descriptor added above would test as ready and the read() calls below
// could block.
608 select(master.fd_max+1, &readSet, NULL, NULL, NULL);
// One byte of master input. read() returning 0 (EOF) ends input.
// NOTE(review): -1 (error) is treated as "input still available".
611 if (input && FD_ISSET(fileno(stdin), &readSet))
614 //log_print(10, "Master_loop", "Read from stdin");
615 input = (read(fileno(stdin), &c, sizeof(char)) != 0);
// One byte from each readable running slave (same -1 caveat as above). An
// idle slave is offered its next task. If there is none and no more input can
// arrive (not a daemon), the slave is retired, and the loop quits once no
// slaves remain running.
625 for (int i = 0; i < master.nSlaves; ++i)
627 if (!master.slave[i].running) continue;
629 if (FD_ISSET(master.slave[i].out, &readSet))
631 //log_print(10, "Master_loop", "Read from slave %d", i);
633 if (read(master.slave[i].out, &c, sizeof(char)) != 0)
639 if (master.slave[i].task == NULL)
641 if (Master_tasker(i) == NULL && !input && !master.o->daemon) // no more input; no tasks
643 quit = (--master.running == 0);
644 master.slave[i].running = false;
// Copy anything arriving on remote swarm streams to the master's stderr.
// NOTE(review): read() does not NUL-terminate. Unless elided lines set
// buffer[len] = '\0', the %s below may read past the received data.
650 for (int i = 0; i < master.nRemote; ++i)
652 if (FD_ISSET(master.remote_err[i], &readSet))
654 int len = read(master.remote_err[i], buffer, sizeof(buffer));
656 fprintf(stderr, "%s", buffer);
// Write the task in send_task to its slave socket as one shell command line:
//   [prepend] message [append] [;echo -en "<SHELL_OUTPUT_FINISHED>"] '\n'
// The function header is not visible in this excerpt; its log message is
// tagged "Master_sender". The `len` variables are declared in elided lines,
// presumably as lazily computed (-1 = not yet computed) cached lengths.
// NOTE(review): none of the write() results are checked, and partial writes
// are not retried.
// NOTE(review): prepend and append are written with len-1 bytes, dropping
// their last character (presumably a trailing newline; confirm). For an
// empty string, len-1 is -1, which converts to a huge size_t.
667 if (master.o->prepend != NULL)
670 if (len == -1) len = strlen(master.o->prepend);
671 write(send_task.slave_fd, master.o->prepend, (len-1) * sizeof(char));
672 //log_print(0, "Sent prepend %s\n",master.o->prepend);
// NOTE(review): assumes task->bufsiz is the length given to Task_Append. For
// general tasks Master_input passes a length that includes the '\0', so a NUL
// byte may be sent to the shell; confirm.
675 write(send_task.slave_fd, send_task.task->message, (send_task.task->bufsiz) * sizeof(char));
676 //log_print(0, "Sent message %s\n",send_task.task->message);
677 if (master.o->append != NULL)
680 if (len == -1) len = strlen(master.o->append);
681 write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char));
682 //log_print(0, "Sent append %s\n",master.o->append);
// Make the shell echo the sentinel after the command. Master_output watches
// for it to detect that the command has finished.
684 #ifdef SHELL_OUTPUT_FINISHED
686 static char * echo = ";echo -en \"";
688 if (len == -1) len = strlen(echo);
689 write(send_task.slave_fd, echo, len*sizeof(char));
690 write(send_task.slave_fd, SHELL_OUTPUT_FINISHED, SHELL_OUTPUT_FINISHED_LENGTH * sizeof(char));
691 write(send_task.slave_fd, "\"", sizeof(char));
692 //log_print(0, "Sent end\n");
694 #endif //SHELL_OUTPUT_FINISHED
// Terminate the command line. commands_active counts commands sent but not yet
// finished; Master_output decrements it.
695 write(send_task.slave_fd, "\n", sizeof(char));
696 master.commands_active++;
697 log_print(3, "Master_sender", "Sent task %d \"%s\" on socket %d - %d tasks active", send_task.task->number, send_task.task->message, send_task.slave_fd, master.commands_active);
// Sender thread body (THREAD_SENDING only). It waits under sender_lock until
// Master_tasker places a task in send_task, then sends it in elided lines
// (the commented-out log below suggests this). Afterwards it broadcasts
// sender_done_cv and clears send_task so waiting threads can proceed.
// NOTE(review): as visible, the wait loop exits only when send_task.task is
// non-NULL, so `quit = (send_task.task == NULL)` can never be true. The
// shutdown broadcast in Master_cleanup (which sets send_task.task to NULL)
// would just send the thread back to waiting, hanging pthread_join. Confirm
// whether elided lines break out of the loop on shutdown.
700 #ifdef THREAD_SENDING
701 void * Master_sender(void * args)
706 pthread_mutex_lock(&sender_lock);
707 while (send_task.task == NULL)
709 pthread_cond_wait(&sender_cv, &sender_lock);
711 quit = (send_task.task == NULL);
718 //log_print(0, "Master_sender sent message\n");
721 pthread_cond_broadcast(&sender_done_cv);
723 send_task.task = NULL;
724 pthread_mutex_unlock(&sender_lock);
729 #endif //THREAD_SENDING
// Exit handler, registered with atexit() in Master_main. It:
//   - stops the sender thread (if any);
//   - frees task pools;
//   - sends SHELL_EXIT_COMMAND to remote swarms and local shells;
//   - signals the local shell processes;
//   - releases per-slave descriptors and strings.
// NOTE(review): master.slave and master.remote_err are not freed in the
// visible lines.
734 void Master_cleanup()
737 //log_print(2, "Master_cleanup", "Preparing to exit...");
// Wake the sender thread with an empty send_task and wait for it to finish.
738 #ifdef THREAD_SENDING
739 pthread_mutex_lock(&sender_lock);
740 send_task.task = NULL;
741 pthread_cond_broadcast(&sender_cv);
742 pthread_mutex_unlock(&sender_lock);
743 pthread_join(sender_thread, NULL);
744 #endif //THREAD_SENDING
746 if (master.task_pool != NULL) Task_Destroy(master.task_pool);
749 signal(SIGCHLD, SIG_IGN); // ignore child exits now
751 // tell all remote nodes to exit
// NOTE(review): fdopen can return NULL, which setbuf would dereference. The
// FILE is never fclosed in the visible lines. SHELL_EXIT_COMMAND is used as a
// format string, which is only safe while it contains no '%'.
752 for (int i = 0; i < master.nRemote; ++i)
754 FILE * f = fdopen(master.remote_err[i], "r+"); setbuf(f, NULL);
756 fprintf(f, SHELL_EXIT_COMMAND);
// Ask every slave shell to exit (the exit command length is computed once).
761 for (int i = 0; i < master.nSlaves; ++i)
764 static int exitlen = -1;
765 if (exitlen == -1) exitlen = strlen(SHELL_EXIT_COMMAND);
766 write(master.slave[i].in, SHELL_EXIT_COMMAND, exitlen *sizeof(char));
767 //usleep(0.5); //shouldn't matter too much
// Release each slave. Remote slaves have pid -1 (set in Master_absorb) and
// are closed via Network_close; local shells are signalled and their sockets
// closed.
770 for (int i = 0; i < master.nSlaves; ++i)
772 if (master.slave[i].task_pool != NULL) Task_Destroy(master.slave[i].task_pool);
773 if (master.slave[i].pid <= 0)
775 Network_close(master.slave[i].in);
// NOTE(review): the kill(pid, 0) probe follows SIGTERM with no wait, so
// SIGKILL is sent whenever the shell has not already exited; the "be nice"
// step gets essentially no time.
779 kill(master.slave[i].pid, 15); // be nice
780 if (kill(master.slave[i].pid, 0) == 0)
781 kill(master.slave[i].pid, 9); // shoot it down
// NOTE(review): for local slaves, in and out are the same descriptor (both
// sv[1] in Master_setup), so the second close() fails with EBADF.
// socket_child_end is not closed in the visible lines.
782 close(master.slave[i].in);
783 close(master.slave[i].out);
785 free(master.slave[i].name);
786 free(master.slave[i].addr);
790 if (master.outfile != NULL)
791 free(master.outfile);
// Connect to a remote host over ssh, start a swarm there, and append its
// slaves to master.slave.
// addr has the form [user@]host[:name]. np is passed through to
// ssh_exec_swarm (presumably the number of remote slaves; 0 when the #ABSORB
// directive gives none).
// With master.o->encrypt, traffic is tunnelled through the ssh session
// (socketpairs plus ssh port forwarding). Otherwise the remote swarm connects
// back to a port bound here with Network_server_bind.
800 void Master_absorb(char * addr, int np)
804 char * user = strstr(addr, "@");
// NOTE(review): getpwuid can return NULL, which this dereferences.
814 user = getpwuid(geteuid())->pw_name;
816 log_print(3, "Master_absorb", "User %s at address %s", user, addr);
818 // ssh to the host on port 22
819 ssh * s = ssh_new(user, addr, 22);
822 log_print(0, "Master_absorb", "Couldn't ssh to %s@%s", user, addr);
// strtok cuts addr at the first ':', so from here on addr holds only the part
// before it.
827 // work out the name to give to the shells
828 char * name = strtok(addr, ":");
829 name = strtok(NULL, ":");
831 name = addr; // default is host:X
833 *(name-1) = '\0'; // otherwise use name:X
// The remote_err array grows by doubling whenever nRemote reaches the reserved
// size.
// NOTE(review): neither the calloc nor the realloc result is checked.
835 // setup array of remote stderr file descriptors
836 if (master.nRemote++ == 0)
838 master.remote_err = (int*)(calloc(master.nRemote, sizeof(int)));
839 master.remote_reserved = master.nRemote;
841 else if (master.nRemote >= master.remote_reserved)
843 // resize dynamically
844 master.remote_reserved *= 2;
845 master.remote_err = (int*)(realloc(master.remote_err,master.remote_reserved * sizeof(int)));
// NOTE(review): this error message omits strerror(errno).
851 if (master.o->encrypt)
854 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
855 error("Master_absorb", "Couldn't create socket for remote swarm");
858 ssh_exec_swarm(s, NULL, sv+1, np); // start swarm remotely forward it to the socket
859 ssh_thread_add(s); // add the ssh to the thread
863 sfd = Network_server_bind(0, &port); // dynamically bind to a port
864 ssh_exec_swarm(s, &port, NULL, np); // start swarm remotely, have it connect on the port
865 ssh_destroy(s); // don't need the ssh anymore
866 sfd = Network_server_listen(sfd, NULL); // accept connections and pray
// sfd is the control/stderr stream for this remote swarm; the master loop
// copies whatever arrives on it to stderr.
868 master.remote_err[master.nRemote-1] = sfd;
869 if (sfd > master.fd_max)
// Send the shell base name, then read back a newline-terminated slave count.
877 int len = sprintf(buffer, "%s\n", name);
880 w += write(sfd, buffer+w, len-w);
// NOTE(review): this reads up to sizeof(buffer) bytes starting at buffer+len,
// which can overflow the buffer, and it overwrites len rather than adding to
// it. If read() returns 0 or -1, buffer[len-1] below indexes out of bounds.
886 len = read(sfd, buffer+len, sizeof(buffer));
889 while (buffer[len-1] != '\n');
890 buffer[len-1] = '\0';
892 while (newSlaves == 0 && strcmp(buffer, "0") != 0)
894 newSlaves = atoi(buffer);
901 error("Master_absorb", "No slaves to absorb from %s", addr);
// NOTE(review): on failure the original array is lost (error presumably
// exits). realloc does not zero the new entries, so fields not assigned in
// the loop below (e.g. socket_child_end) are indeterminate.
904 master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave)));
905 if (master.slave == NULL)
907 error("Master_absorb", "Resizing slave array from %d to %d slaves : %s",
908 master.nSlaves, master.nSlaves + newSlaves, strerror(errno));
// Set up one connection per new remote slave. ii is its index in master.slave.
912 for (int i = 0; i < newSlaves; ++i)
914 int ii = master.nSlaves + i;
// Encrypted: forward a random port in [20000, 50000) over ssh to a local
// socketpair. The master keeps sv[0] and the tunnel uses sv[1].
916 if (master.o->encrypt)
919 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
920 error("Master_absorb", "Couldn't create socket for remote swarm");
923 LIBSSH2_LISTENER * listener = NULL;
924 // libssh2 can't finalise the connection when the port is dynamic (ie: port = 0)
925 while (listener == NULL)
927 port = 20000 + rand() % 30000; // so pick ports at random until binding succeeds
928 listener = ssh_get_listener(s, &port); // port forward to the socket
931 log_print(4,"Master_absorb", "Chose port %d", port);
932 int len = sprintf(buffer, "%d\n", port);
937 w += write(sfd, buffer+w, len-w);
939 usleep(200000); // give ssh_thread a chance to actually send the data
941 log_print(4, "Master_absorb", "Creating tunnel...");
942 ssh_add_tunnel(s, listener, sv[1]);
943 master.slave[ii].in = sv[0];
944 master.slave[ii].out = sv[0];
946 log_print(4, "Master_absorb", "Tunnel for slave %d using socket %d<->%d setup", ii, sv[0], sv[1]);
// Unencrypted: bind a port and accept the remote slave's connection directly.
950 int tmp = Network_server_bind(0, &port); // bind to a port
952 master.slave[ii].in = Network_server_listen(tmp, NULL); // listen for connection
953 master.slave[ii].out = master.slave[ii].in;
960 if (master.slave[ii].out > master.fd_max)
961 master.fd_max = master.slave[ii].out;
// Slaves are named "<name>:<i>" with i counted per host, not globally.
// pid -1 marks a remote slave (Master_cleanup does not kill it).
// NOTE(review): sprintf into buffer is unbounded.
964 sprintf(buffer, "%s:%d", name, i);
965 master.slave[ii].name = strdup(buffer);
966 master.slave[ii].addr = strdup(addr);
967 master.slave[ii].running = true;
968 master.slave[ii].pid = -1;
969 master.slave[ii].task = NULL;
970 master.slave[ii].task_pool = NULL;
974 master.nSlaves = master.nSlaves + newSlaves;
976 master.running += newSlaves;
978 log_print(2, "Master_absorb", "Successfully absorbed %d slaves from %s", newSlaves, addr);
// SIGCHLD handler. It reaps one child with waitpid, finds the slave whose pid
// matches, and passes the wait status to sigchld_respond (defined elsewhere).
// Under SHELL_OUTPUT_FINISHED it then writes
// `name=<name>;echo -en "<sentinel>"` to that slave, presumably to re-prime a
// restarted shell so the pending task completes (confirm).
// NOTE(review): error()/log_print and sprintf are not on the POSIX
// async-signal-safe list, and errno is not saved and restored.
// NOTE(review): standard signals are not queued, so one blocking waitpid per
// signal can leave zombies when several children exit together. A
// waitpid(-1, &s, WNOHANG) loop is the usual pattern.
// NOTE(review): the parameter name shadows the signal() function.
982 void sigchld_handler(int signal)
985 if (signal != SIGCHLD)
986 error("sigchld_handler", "Got signal (%d) which isn't SIGCHLD (%d)", signal, SIGCHLD);
989 int p = waitpid(-1, &s, 0);
991 error("sigchld_handler", "waitpid : %s", strerror(errno));
// Find the slave owning the reaped pid; i == nSlaves means no match.
995 for (i = 0; i < master.nSlaves; ++i)
997 if (master.slave[i].pid == p)
1000 if (i >= master.nSlaves)
1006 sigchld_respond(s, "local", i);
1010 #ifdef SHELL_OUTPUT_FINISHED
1012 //log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i);
1013 char buffer[BUFSIZ];
1014 int len = sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, SHELL_OUTPUT_FINISHED);
1015 if (write(master.slave[i].in, buffer, len) <= 0)
1016 error("sigchld_handler", "Couldn't restart slave %d; it is unresponsive", i);
1019 #endif //SHELL_OUTPUT_FINISHED