22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/tcp.h>
// Compile-time switch: uncommenting the define below enables the THREAD_SENDING sections,
// which hand task transmission to a dedicated sender thread (Master_sender) instead of
// writing from the main loop.
26 //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems
28 // probably not that great to use threads anyway, since it eats one of your cores
29 // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes
// State shared between the main loop and the sender thread (THREAD_SENDING builds only):
//   sender_lock    - guards send_task;
//   sender_cv      - signalled by Master_tasker when a task has been placed in send_task;
//   sender_done_cv - broadcast by Master_sender once it has finished with a task.
// NOTE(review): the two condition variables have no PTHREAD_COND_INITIALIZER here; confirm
// they are set up with pthread_cond_init before first use.
32 pthread_t sender_thread;
33 pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER;
34 pthread_cond_t sender_cv;
35 pthread_cond_t sender_done_cv;
36 #endif //THREAD_SENDING
// Jump buffer for the sigsetjmp at the top of the master loop; presumably the target of a
// siglongjmp from a signal handler - TODO confirm.
48 static sigjmp_buf env;
50 void sigchld_handler(int signal);
// Master_main: start-up for the master process.
// Registers Master_cleanup with atexit (it tells slaves to exit and kills them) and, when
// compiled with THREAD_SENDING, starts the Master_sender thread.
52 void Master_main(Options * o)
58 atexit(Master_cleanup);
// pthread_create reports failure through its return value and leaves errno untouched,
// so store the result in errno for the strerror() call in the error message.
62 if ((errno = pthread_create(&sender_thread, NULL, Master_sender, NULL)) != 0)
64 error("Master_main", "Creating sender thread : %s", strerror(errno));
66 #endif //THREAD_SENDING
70 void Master_setup(Options * o)
72 signal(SIGCHLD, sigchld_handler);
74 master.barrier_number = -1;
75 master.last_number = -1;
76 master.nSlaves = o->nCPU;
77 master.running = master.nSlaves;
78 if (master.nSlaves == 0)
79 error("Master_setup", "No CPUs to start slaves with!");
81 master.outfile = NULL;
83 master.slave = (Slave*)(calloc(master.nSlaves, sizeof(Slave)));
85 if (master.slave == NULL)
86 error("Master_setup", "Allocating memory for %d slaves", master.nSlaves);
89 for (int i = 0; i < master.nSlaves; ++i)
95 master.slave[i].name = (char*)calloc(BUFSIZ, sizeof(char));
96 master.slave[i].addr = (char*)calloc(BUFSIZ, sizeof(char));
97 sprintf(master.slave[i].name, "local:%d", i);
98 sprintf(master.slave[i].addr, "local:%d", i);
99 FILE * f = fdopen(master.slave[i].in, "w"); setbuf(f, NULL);
100 fprintf(f, "name=%s\n", master.slave[i].name);
104 void Make_slave(int i)
107 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
109 error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno));
112 master.slave[i].pid = fork();
113 if (master.slave[i].pid == 0)
115 dup2(sv[0],fileno(stdin));
116 dup2(sv[0],fileno(stdout));
117 execlp(master.o->shell, master.o->shell, NULL);
119 master.slave[i].in = sv[1];
120 master.slave[i].out = sv[1];
121 master.slave[i].running = true;
122 master.slave[i].ssh_pid = 0;
125 void Master_input(char c)
127 if (master.buffer == NULL)
129 if (master.bufsiz < BUFSIZ)
130 master.bufsiz = BUFSIZ;
131 master.buffer = (char*)(calloc(master.bufsiz, sizeof(char)));
135 if (c == '\n' || c == EOF || c == '\0')
137 if (master.buflen == 0)
140 bool first_only = false;
142 master.buffer[master.buflen++] = '\0'; // end the string
146 char * start = strtok(master.buffer, "#");
147 start = strtok(NULL, "#");
150 if ( master.buffer[0] != '#')
152 log_print(3, "Master_input", "Created general task \"%s\"", master.buffer);
153 message = master.buffer;
154 master.buffer = NULL;
155 Task * t = Task_Append(master.task_pool, message, master.buflen, repetitions, master.outfile);
157 master.task_pool = t;
158 master.last_number = t->number;
162 char * cmd = strtok(master.buffer+1, " ");
163 if (strcmp(cmd, "ABSORB") == 0)
165 master.o->encrypt = true;
166 cmd = strtok(NULL, " ");
167 if (strcmp(cmd, "UNSECURE") == 0)
169 master.o->encrypt = false;
170 log_print(1, "Master_absorb", "Using unencrypted connections");
171 cmd = strtok(NULL, " ");
174 log_print(0, "Master_input", "No host specified for ABSORB directive");
175 char * np = strtok(NULL, " ");
177 Master_absorb(cmd, options.port, atoi(np));
179 Master_absorb(cmd, options.port, 0);
181 else if (strcmp(cmd, "OUTPUT") == 0)
183 cmd = strtok(NULL, " ");
186 log_print(3, "Master_input", "Detach output");
187 if (master.outfile != NULL)
188 master.outfile[0] = '\0';
192 log_print(3, "Master_input", "Output to %s",cmd);
193 if (master.outfile == NULL)
194 master.outfile = (char*)(calloc(BUFSIZ, sizeof(char)));
195 sprintf(master.outfile, "%s", cmd);
196 if (strstr(master.outfile, "%d") == NULL)
198 if (access(master.outfile, F_OK) == 0)
200 if (unlink(master.outfile) != 0)
201 error("Master_input", "Removing %s for output : %s", master.outfile, strerror(errno));
206 else if (strcmp(cmd, "EXIT") == 0)
208 log_print(2, "Master_input", "Received EXIT directive; quitting");
209 freopen("/dev/null", "r", stdin);
210 master.o->daemon = false;
213 else if (strcmp(cmd, "BARRIER") == 0)
216 log_print(3, "Master_input", "Received BARRIER directive; %d commands running", master.commands_active);
218 // check there actually are tasks
220 for (int i = 0; i < master.nSlaves && !tasks; ++i)
222 tasks = (master.slave[i].task != NULL || master.slave[i].task_pool != NULL);
227 master.barrier_number = master.last_number;
228 master.barrier_block = false;
230 cmd = strtok(NULL, " ");
231 if (cmd != NULL && strcmp(cmd, "BLOCK") == 0)
233 if (master.o->daemon == false)
234 log_print(1, "Master_input", "Not a daemon; BARRIER BLOCK functions as BARRIER");
235 master.barrier_block = true;
239 log_print(3, "Master_input", "No tasks; BARRIER has no effect");
244 log_print(1, "Master_input", "Unrecognised directive #%s#", cmd);
252 while (isspace(*start)) start++;
255 char * rep = strtok(master.buffer, " ");
256 rep = strtok(NULL, " ");
257 if (rep != NULL && *rep == '&')
259 first_only = !first_only;
260 rep = strtok(NULL, " ");
266 repetitions = atoi(rep);
267 if (repetitions == 0)
269 if (strcmp(rep, "0") == 0)
270 log_print(0,"Master_input", "Can't assign task with %s repetitions", rep);
272 log_print(0,"Master_input", "Require an integer (not \"%s\") number of repetitions (full directive is %s)", rep, master.buffer);
279 int err = regcomp(&r, master.buffer+1, REG_EXTENDED);
282 regerror(err, &r, master.buffer, master.bufsiz);
283 error("Master_input", "Error compiling regexec : %s", master.buffer);
286 int first_match = -1;
287 for (int i = 0; i < master.nSlaves; ++i)
289 if (first_only && first_match >= 0 && master.slave[i].task != NULL)
292 int err = regexec(&r, master.slave[i].name, 0, NULL, 0);
293 if (err == REG_NOMATCH)
300 regerror(err, &r, master.buffer, master.bufsiz);
301 log_print(0, "Master_input", "Error in regexec : %s", master.buffer+1);
305 if (first_match < 0) first_match = i;
308 if (master.slave[i].task == NULL)
316 message = strdup(start);
317 Task * t = Task_Append(master.slave[i].task_pool, message, strlen(message), repetitions, master.outfile);
319 master.slave[i].task_pool = t;
320 master.last_number = t->number;
322 log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[i].name);
326 if (first_only && first_match >= 0)
328 message = strdup(start);
329 Task * t = Task_Append(master.slave[first_match].task_pool, message, strlen(message), repetitions, master.outfile);
331 master.slave[first_match].task_pool = t;
332 master.last_number = t->number;
333 log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[first_match].name);
337 log_print(1, "Master_input", "No shells with names matching regex %s");
341 log_print(3, "Master_input", "Processed task %s", master.buffer);
345 if (master.buflen >= master.bufsiz)
348 master.buffer = (char*)(realloc(master.buffer, master.bufsiz*sizeof(char)));
350 master.buffer[master.buflen++] = c;
// Master_output: handle one character of output read from slave i.
// The character is appended to the slave's current task output; the buffer is grown and
// zero-filled as needed. A task is complete when c == EOF or when the output ends with the
// end marker master.o->end. The marker is then stripped, the slave's task is cleared, and
// the result is printed to stdout as "<number>: <output>" and, when the task has an output
// file, written there as well. Then commands_active is decremented and an active barrier
// is released once nothing is running.
// Output ending in '\f' is a slave asking for its name: a "name=<name>" task is queued for it.
354 void Master_output(int i, char c)
356 log_print(10, "Master_output", "input %c from slave %d", c, i);
357 #ifdef THREAD_SENDING
358 pthread_mutex_lock(&sender_lock);
359 Task * t = master.slave[i].task;
362 pthread_cond_wait(&sender_done_cv, &sender_lock);
364 pthread_mutex_unlock(&sender_lock);
366 Task * t = master.slave[i].task;
367 #endif //THREAD_SENDING
370 log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name);
371 error(NULL, "Please refrain from echoing three bell characters in a row.");
375 t->output[t->outlen++] = c;
376 if (t->outlen >= t->outsiz)
379 t->output = (char*)(realloc(t->output, t->outsiz*sizeof(char)));
380 memset(t->output+(t->outlen), 0, sizeof(char) * t->outsiz - t->outlen);
// NOTE(review): c == EOF can only match if plain char is signed on this platform.
383 if (c == EOF || (master.o->endlen > 0 && t->outlen >= master.o->endlen
384 && strcmp((t->output)+(t->outlen)-(master.o->endlen), master.o->end) == 0))
389 t->output[t->outlen - master.o->endlen] = '\0';
390 t->outlen -= master.o->endlen;
393 master.slave[i].task = NULL;
396 fprintf(stdout, "%d:\n", t->number);
398 else if (t->output[t->outlen-1] == '\f')
400 log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name);
401 static int bufsiz = BUFSIZ;
402 char * buffer = (char*)(calloc(bufsiz, sizeof(char)));
404 error("Master_output", "Creating name request buffer of size %d : %s", bufsiz, strerror(errno));
// NOTE(review): sprintf writes before any size check that is visible here; snprintf(buffer,
// bufsiz, ...) would be safer if the surrounding loop retries after growing the buffer.
408 len = sprintf(buffer, "name=%s", master.slave[i].name);
412 buffer = (char*)(realloc(buffer, bufsiz * sizeof(char)));
414 error("Master_output", "Resizing name request buffer to size %d : %s", bufsiz, strerror(errno));
418 Task * t2 = Task_Append(master.slave[i].task_pool, buffer,len, 1, NULL);
419 if (t2->prev == NULL)
420 master.slave[i].task_pool = t2;
421 master.last_number = t2->number;
425 fprintf(stdout, "%d: %s", t->number, t->output);
426 if (t->outfile != NULL && t->outfile[0] != '\0')
428 log_print(3, "Master_output", "Writing result of task %d to file \"%s\"", t->number, t->outfile);
429 static char buf[BUFSIZ];
432 if (strstr(t->outfile, "%d") != NULL)
// outfile comes from an OUTPUT directive, so it must not be used as a format string (any
// other '%' conversion would read missing arguments) and must not overflow buf. Splice the
// task number in place of the first "%d" and copy the rest of the name verbatim.
435 snprintf(buf, sizeof(buf), "%.*s%d%s", (int)(strstr(t->outfile, "%d") - t->outfile), t->outfile, t->number, strstr(t->outfile, "%d") + 2);
438 error("Master_output", "Couldn't open file \"%s\" : %s", buf, strerror(errno));
442 f = fopen(t->outfile, "a");
444 error("Master_output", "Couldn't open file \"%s\" : %s", t->outfile, strerror(errno));
449 fprintf(f, "%s", t->output);
454 log_print(3, "Master_output", "Task %d finished; %d tasks active", t->number, master.commands_active-1);
457 if (t->repetitions == 0)
// When the last running command finishes, clear the barrier and, for BARRIER BLOCK in
// daemon mode, open DAEMON_BARRIER_FIFO for writing.
466 if (--master.commands_active == 0 && master.barrier_number >= 0)
468 master.barrier_number = -1;
469 if (master.barrier_block && master.o->daemon)
471 FILE * f = fopen(DAEMON_BARRIER_FIFO, "w");
// Master_tasker: choose the next task for slave i and hand it to the sender.
// Tasks are taken from the slave's own task_pool when it is non-empty, otherwise from the
// shared master.task_pool. While a barrier is active (barrier_number >= 0), a task numbered
// after the barrier is held back (the body of that branch is not visible here). The chosen
// task is stored in master.slave[i].task and send_task is pointed at the slave's output
// descriptor.
// Return value: Master_loop treats NULL as "nothing to run"; the return statements themselves
// are not part of this listing.
479 Task * Master_tasker(int i)
481 Task ** tp = (master.slave[i].task_pool != NULL) ? &(master.slave[i].task_pool) : &(master.task_pool);
486 if (master.barrier_number >= 0 && t->number > master.barrier_number)
// NOTE(review): handling of tasks with no repetitions left is only partly visible here.
490 if (t->repetitions == 0)
495 master.slave[i].task = t;
// Threaded build: wait until the sender thread has finished the previous task, publish the
// new one in send_task, then wake the sender.
501 #ifdef THREAD_SENDING
502 pthread_mutex_lock(&sender_lock);
503 while (send_task.task != NULL) // wait until the sender thread has finished the previous task
505 pthread_cond_wait(&sender_done_cv, &sender_lock);
508 send_task.slave_fd = master.slave[i].out;
509 pthread_cond_signal(&sender_cv);
510 pthread_mutex_unlock(&sender_lock);
// Non-threaded build: send_task is filled in here and presumably written out directly on
// this thread (the call is not visible in this listing) - TODO confirm.
514 send_task.slave_fd = master.slave[i].out;
516 #endif //THREAD_SENDING
525 if (sigsetjmp(env,true) != 0)
527 log_print(2, "Master_loop", "Restored from longjmp");
531 master.fd_max = fileno(stdin);
532 for (int i = 0; i < master.nSlaves; ++i)
534 if (master.slave[i].out > master.fd_max)
535 master.fd_max = master.slave[i].out;
546 if (!input && master.o->daemon)
549 int fd = open(DAEMON_FIFO, O_RDONLY | O_NONBLOCK);
553 error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
555 log_print(2, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
561 dup2(fd, fileno(stdin));
567 if (input) FD_SET(fileno(stdin), &readSet);
568 for (int i = 0; i < master.nSlaves; ++i)
570 if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet);
573 select(master.fd_max+1, &readSet, NULL, NULL, NULL);
576 if (input && FD_ISSET(fileno(stdin), &readSet))
579 //log_print(10, "Master_loop", "Read from stdin");
580 input = (read(fileno(stdin), &c, sizeof(char)) != 0);
590 for (int i = 0; i < master.nSlaves; ++i)
592 if (!master.slave[i].running) continue;
594 if (FD_ISSET(master.slave[i].out, &readSet))
596 //log_print(10, "Master_loop", "Read from slave %d", i);
598 if (read(master.slave[i].out, &c, sizeof(char)) != 0)
604 if (master.slave[i].task == NULL)
606 if (Master_tasker(i) == NULL && !input && !master.o->daemon) // no more input; no tasks
608 quit = (--master.running == 0);
609 master.slave[i].running = false;
// Body of the routine that transmits send_task (its header is not part of this listing).
// Writes one shell command line to send_task.slave_fd:
//   [prepend] message [append] [;echo -en "<end marker>"] '\n'
// then counts it in master.commands_active. The trailing echo makes the slave print
// master.o->end after the command's output, which Master_output uses to detect completion.
// NOTE(review): `len` is declared on lines not shown; the `len == -1` tests suggest a cached
// length - confirm. None of the write() results are checked.
622 if (master.o->prepend != NULL)
625 if (len == -1) len = strlen(master.o->prepend);
// NOTE(review): writes all but the last character of prepend (presumably a trailing newline -
// confirm). An empty prepend would make len-1 == -1, i.e. a huge size_t.
626 write(send_task.slave_fd, master.o->prepend, (len-1) * sizeof(char));
627 //log_print(0, "Sent prepend %s\n",master.o->prepend);
// NOTE(review): writes task->bufsiz bytes. For general tasks the length passed to Task_Append
// counted the '\0' terminator, so a NUL byte may reach the shell - confirm how Task_Append
// sets bufsiz.
630 write(send_task.slave_fd, send_task.task->message, (send_task.task->bufsiz) * sizeof(char));
631 //log_print(0, "Sent message %s\n",send_task.task->message);
632 if (master.o->append != NULL)
635 if (len == -1) len = strlen(master.o->append);
// NOTE(review): same "all but the last character" behaviour as for prepend above.
636 write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char));
637 //log_print(0, "Sent append %s\n",master.o->append);
639 if (master.o->end != NULL)
641 static char * echo = ";echo -en \"";
643 if (len == -1) len = strlen(echo);
644 write(send_task.slave_fd, echo, len*sizeof(char));
645 write(send_task.slave_fd, master.o->end, (master.o->endlen) * sizeof(char));
646 write(send_task.slave_fd, "\"", 1*sizeof(char));
647 //log_print(0, "Sent end\n");
649 write(send_task.slave_fd, "\n", 1*sizeof(char));
// NOTE(review): under THREAD_SENDING this increment may run on the sender thread while
// Master_output decrements the counter on the main thread without sender_lock - confirm.
650 master.commands_active++;
651 log_print(3, "Master_sender", "Sent task %d \"%s\" - %d tasks active", send_task.task->number, send_task.task->message, master.commands_active);
// Master_sender: body of the optional sender thread (THREAD_SENDING builds only).
// Sleeps on sender_cv until Master_tasker publishes a task in send_task. It then presumably
// transmits it while holding sender_lock (the send call is on lines not shown). Finally it
// broadcasts sender_done_cv and clears send_task.task so the next task can be queued.
// Master_cleanup wakes it with send_task.task == NULL to request shutdown.
// NOTE(review): with the wait loop as shown (`while (send_task.task == NULL)`), a shutdown
// wake-up would simply wait again. Confirm that the quit path really leaves the loop,
// otherwise pthread_join in Master_cleanup blocks forever.
654 #ifdef THREAD_SENDING
655 void * Master_sender(void * args)
660 pthread_mutex_lock(&sender_lock);
661 while (send_task.task == NULL)
663 pthread_cond_wait(&sender_cv, &sender_lock);
665 quit = (send_task.task == NULL);
672 //log_print(0, "Master_sender sent message\n");
// Waiters on sender_done_cv re-check send_task.task only after re-acquiring the lock, by
// which time it has been cleared below.
675 pthread_cond_broadcast(&sender_done_cv);
677 send_task.task = NULL;
678 pthread_mutex_unlock(&sender_lock);
683 #endif //THREAD_SENDING
// Master_cleanup: atexit handler that shuts the master down.
// Stops the sender thread (THREAD_SENDING), destroys the task pools, ignores further
// SIGCHLDs, sends SHELL_EXIT_MESSAGE to every slave and then releases each slave:
//   - remote slaves (pid <= 0): Network_close the connection and terminate any ssh tunnel;
//   - local slaves: send SIGTERM, then SIGKILL if still present, and close the descriptors.
// Finally frees the per-slave name/addr strings and the output-file name.
688 void Master_cleanup()
691 //log_print(2, "Master_cleanup", "Preparing to exit...");
692 #ifdef THREAD_SENDING
693 pthread_mutex_lock(&sender_lock);
694 send_task.task = NULL;
695 pthread_cond_broadcast(&sender_cv);
696 pthread_mutex_unlock(&sender_lock);
697 pthread_join(sender_thread, NULL);
698 #endif //THREAD_SENDING
700 if (master.task_pool != NULL) Task_Destroy(master.task_pool);
703 signal(SIGCHLD, SIG_IGN); // ignore child exits now
705 for (int i = 0; i < master.nSlaves; ++i)
708 static int exitlen = -1;
709 if (exitlen == -1) exitlen = strlen(SHELL_EXIT_MESSAGE);
710 write(master.slave[i].in, SHELL_EXIT_MESSAGE, exitlen *sizeof(char));
711 //usleep(0.5); //shouldn't matter too much
714 for (int i = 0; i < master.nSlaves; ++i)
716 if (master.slave[i].task_pool != NULL) Task_Destroy(master.slave[i].task_pool);
717 if (master.slave[i].pid <= 0)
719 Network_close(master.slave[i].in);
720 if (master.slave[i].ssh_pid > 0)
722 log_print(2, "Master_cleanup", "Killing ssh instance %d", master.slave[i].ssh_pid);
723 kill(master.slave[i].ssh_pid, SIGTERM);
724 if (kill(master.slave[i].ssh_pid, 0) == 0)
725 kill(master.slave[i].ssh_pid, SIGKILL);
// NOTE(review): the existence check follows SIGTERM immediately, so SIGKILL is almost always
// sent as well; a short wait would give the process a chance to exit cleanly.
730 kill(master.slave[i].pid, SIGTERM); // be nice
731 if (kill(master.slave[i].pid, 0) == 0)
732 kill(master.slave[i].pid, SIGKILL); // shoot it down
733 close(master.slave[i].in);
// Make_slave gives local slaves a single descriptor for both in and out. Closing it twice
// fails with EBADF or, worse, closes an unrelated descriptor reused in between.
734 if (master.slave[i].out != master.slave[i].in)
close(master.slave[i].out);
736 free(master.slave[i].name);
737 free(master.slave[i].addr);
// free(NULL) is a no-op, so no guard is needed.
741 free(master.outfile);
// start_server: thread entry point (pthread start-routine signature) that starts a network
// server. args must point to an int holding the port. That int is overwritten with the value
// Network_server returns (presumably the listening/connected descriptor - TODO confirm).
// Its only visible use is a commented-out pthread_create in Master_absorb.
745 void * start_server(void * args)
748 *(int*)(args) = Network_server(*(int*)args);
749 log_print(2, "start_server", "started network server");
// Forward declaration: Master_absorb uses Secure_connection before its definition below.
753 int Secure_connection(char * addr, int port);
755 void Master_absorb(char * addr, int port, int np)
757 char * name = strtok(addr, ":");
758 name = strtok(NULL, ":");
769 //log_print(0, "name is %s\n", name);
772 if (master.o->encrypt)
773 first_ssh = Secure_connection(addr, port);
777 //pthread_create(&ss, NULL, start_server, (void*)(&net_fd));
782 // The alternative to this kind of terrible hack is OpenMPI's "opal"
783 // This involves >1000 lines of operating system independent ways to get an IP address from a local interface
784 // Which will then be completely useless if there is any sort of NAT involved
786 //freopen("/dev/null", "r", stdin);
787 //freopen("/dev/null", "w", stdout);
788 //freopen("/dev/null", "w", stderr);
790 char * cmd = buffer+sprintf(buffer, "swarm -p ");
791 if (master.o->encrypt)
792 cmd = cmd+sprintf(cmd, "%d -e", port+1000);
794 cmd = cmd+sprintf(cmd, "%d -u", port);
797 cmd = cmd+sprintf(cmd, " -n %d", np);
798 sprintf(cmd, " -m $(echo $SSH_CONNECTION | awk \'{print $1}\')");
799 log_print(3, "Master_absorb", "Execing %s", buffer);
800 execlp("ssh", "ssh", "-f", addr, buffer, NULL);
802 log_print(3, "Master_absorb", "Listening on port %d", port);
803 int net_fd = Network_server(port);
804 log_print(3, "Master_absorb", "Created network server on port %d", port);
808 while (read(net_fd, s, sizeof(char)) != 0)
818 int newSlaves = atoi(buffer);
819 log_print(3, "Master_absorb", "Absorbing %d slaves from %s\n", newSlaves, addr);
822 error("Master_absorb", "No slaves to absorb from %s", addr);
825 master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave)));
826 if (master.slave == NULL)
828 error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", master.nSlaves, master.nSlaves + newSlaves, strerror(errno));
832 if (master.o->encrypt)
834 for (int i = 0; i < newSlaves-1; ++i)
835 master.slave[master.nSlaves+i].ssh_pid = Secure_connection(addr, port+i+1);
838 master.slave[master.nSlaves+newSlaves-1].ssh_pid = first_ssh;
846 log_print(3, "Master_absorb", "Writing bell to slave");
847 write(net_fd, &c, sizeof(char));
851 for (int i = 0; i < newSlaves; ++i)
853 log_print(3, NULL, "Absorbing slave %d...", i);
854 int ii = master.nSlaves + i;
855 if (i == newSlaves-1)
857 master.slave[ii].out = net_fd;
861 log_print(3, "Master_absorb", "Creating server %d at time %d", i, time(NULL));
862 write(net_fd, &c, sizeof(char));
863 master.slave[ii].out = Network_server(port + i + 1);
865 master.slave[ii].in = master.slave[ii].out;
868 if (master.slave[ii].out > master.fd_max)
869 master.fd_max = master.slave[ii].out;
871 sprintf(buffer, "%s:%d", name, i);
872 master.slave[ii].name = strdup(buffer);
873 master.slave[ii].addr = strdup(addr);
874 master.slave[ii].running = true;
875 master.slave[ii].pid = -1;
876 master.slave[ii].task = NULL;
877 master.slave[ii].task_pool = NULL;
879 FILE * f = fdopen(master.slave[ii].in, "w"); setbuf(f, NULL);
880 fprintf(f, "name=%s\n", master.slave[ii].name);
882 log_print(3, NULL, "Done absorbing slave %d...", i);
887 master.nSlaves = master.nSlaves + newSlaves;
889 master.running += newSlaves;
891 log_print(2, "Master_absorb", "Successfully absorbed %d slaves from %s", newSlaves, addr);
895 int Secure_connection(char * addr, int port)
901 sprintf(buffer, "%d:localhost:%d", port+1000, port);
902 freopen("/dev/null", "r", stdin);
903 freopen("/dev/null", "w", stdout);
904 freopen("/dev/null", "w", stderr);
905 execl("/usr/bin/ssh", "/usr/bin/ssh", "-N", "-R", buffer, addr, NULL);
// sigchld_handler: SIGCHLD handler installed by Master_setup.
// Reaps one child with waitpid. A child killed by a signal leads to the master terminating
// itself with the same signal (per the log message; the terminating call is not visible).
// Otherwise the child's pid is looked up in master.slave. For a slave, a restart is logged
// and, when an end marker is configured, the slave is sent "name=<name>;echo -en \"<end>\"\n".
// Failing to write that is fatal.
// NOTE(review): log_print/error/sprintf are probably not async-signal-safe; review calling
// them from a signal handler. waitpid(-1, &s, 0) reaps a single child per signal, so
// coalesced SIGCHLDs can leave zombies. This definition continues past the end of this view.
910 void sigchld_handler(int signal)
913 if (signal != SIGCHLD)
914 error("sigchld_handler", "Got signal (%d) which isn't SIGCHLD (%d)", signal, SIGCHLD);
917 int p = waitpid(-1, &s, 0);
919 error("sigchld_handler", "waitpid : %s", strerror(errno));
923 int sig = WTERMSIG(s);
924 log_print(2, "sigchld_handler", "A child [%d] was terminated with signal %d; terminating self with same signal", p, sig);
930 for (i = 0; i < master.nSlaves; ++i)
932 if (master.slave[i].pid == p)
935 if (i >= master.nSlaves)
// NOTE(review): if s is still the raw wait status here, WEXITSTATUS(s) is the actual exit
// code - confirm.
940 log_print(1, "sigchld_handler", "Slave %d [%d] exited with code %d; restarting it",i, p, s);
944 if (master.o->end != NULL)
946 log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i);
948 sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, master.o->end);
949 if (write(master.slave[i].in, buffer, strlen(buffer)) <= 0)
950 error("sigchld_handler", "Couldn't restart slave %d; it is unresponsive", i);