Did some stuff
[matches/swarm.git] / src / master.c
1 #define _BSD_SOURCE
2 #include "master.h"
3 #include "log.h"
4
5 #include <string.h>
6 #include <sys/wait.h>
7 #include "daemon.h"
8 #include <stdlib.h>
9 #include <stdio.h>
10 #include <string.h>
11 #include <strings.h>
12 #include <unistd.h>
13 #include <assert.h>
14 #include <ctype.h>
15 #include "slave.h"
16 #include <string.h>
17 #include <setjmp.h>
18 #include <sys/types.h>
19 #include <pwd.h>
20
21 #include <unistd.h>
22 #include <regex.h>
23 #include <pthread.h>
24 #include "network.h"
25 #include <signal.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/tcp.h>
29 #include "ssh.h"
30
31 //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems
32
33 // probably not that great to use threads anyway, since it eats one of your cores
34 // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes
35
36 // the signal handler now breaks threads... don't use them
37
38 #ifdef THREAD_SENDING
39 pthread_t sender_thread;
40 pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER;
41 pthread_cond_t sender_cv;
42 pthread_cond_t sender_done_cv;
43 #endif //THREAD_SENDING
44
45
46
47 struct
48 {
49         int slave_fd;
50         Task * task;
51 } send_task;
52
53 static Master master;
54
55 static sigjmp_buf env;
56
57 void sigchld_handler(int signal);
58
59 void Master_main(Options * o)
60 {
61         setbuf(stdin, NULL);
62         setbuf(stdout, NULL);
63         setbuf(stderr, NULL);
64
65         atexit(Master_cleanup);
66         Master_setup(o);
67
68         #ifdef THREAD_SENDING
69         if (pthread_create(&sender_thread, NULL, Master_sender, NULL) != 0)
70         {
71                 error("Master_main", "Creating sender thread : %s", strerror(errno));
72         }
73         #endif //THREAD_SENDING
74         Master_loop();
75 }
76
77 void Master_setup(Options * o)
78 {
79         int err = libssh2_init(0);
80         if (err != 0)
81         {       
82                 error("Master_setup", "Initialising libssh2 - error code %d", err);
83         }
84         
85         signal(SIGCHLD, sigchld_handler);
86         master.o = o;
87         master.barrier_number = -1;
88         master.last_number = -1;
89         master.nSlaves = o->nCPU;
90         master.running = master.nSlaves;
91         master.nRemote = 0; master.remote_err = NULL;
92         if (master.nSlaves == 0)
93                 error("Master_setup", "No CPUs to start slaves with!");
94
95         master.outfile = NULL;
96
97         master.slave = (Slave*)(calloc(master.nSlaves, sizeof(Slave)));
98
99         if (master.slave == NULL)
100                 error("Master_setup", "Allocating memory for %d slaves", master.nSlaves);
101
102         // One slave per CPU
103         for (int i = 0; i < master.nSlaves; ++i)
104         {
105
106                 
107                 int sv[2];
108                 // Possibly the best function ever
109                 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
110                 {
111                         error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno));
112                 }
113                 master.slave[i].in = sv[1];
114                 master.slave[i].out = sv[1];
115                 master.slave[i].socket_child_end = sv[0];
116
117                 Master_shell(i);
118
119
120                 
121                 master.slave[i].name = (char*)calloc(BUFSIZ, sizeof(char));
122                 master.slave[i].addr = (char*)calloc(BUFSIZ, sizeof(char));
123                 sprintf(master.slave[i].name, "local:%d", i);
124                 sprintf(master.slave[i].addr, "local:%d", i);
125                 FILE * f = fdopen(master.slave[i].in, "w"); setbuf(f, NULL);
126                 fprintf(f, "name=%s\n", master.slave[i].name);
127         }
128 }
129
130 void Master_shell(int i)
131 {       
132         master.slave[i].pid = fork();
133         if (master.slave[i].pid == 0)
134         {
135                 dup2(master.slave[i].socket_child_end,fileno(stdin));
136                 dup2(master.slave[i].socket_child_end,fileno(stdout));
137                 execlp(master.o->shell, master.o->shell, NULL);
138         }
139
140         master.slave[i].running = true;
141 }
142
143 void Master_input(char c)
144 {
145         if (master.buffer == NULL)
146         {
147                 if (master.bufsiz < BUFSIZ)
148                         master.bufsiz = BUFSIZ;
149                 master.buffer = (char*)(calloc(master.bufsiz, sizeof(char)));
150                 master.buflen = 0;
151         }
152         
153         if (c == '\n' || c == EOF || c == '\0')
154         {
155                 if (master.buflen == 0)
156                         return;
157                 int repetitions = 1;
158                 bool first_only = false;
159
160                 master.buffer[master.buflen++] = '\0'; // end the string
161                 
162                 char * message;
163
164                 char * start = strtok(master.buffer, "#");
165                 start = strtok(NULL, "#");
166                 if (start == NULL) 
167                 {
168                         if ( master.buffer[0] != '#')
169                         {
170                                 log_print(3, "Master_input", "Created general task \"%s\"", master.buffer);
171                                 message = master.buffer;
172                                 master.buffer = NULL;
173                                 Task * t = Task_Append(master.task_pool, message, master.buflen, repetitions, master.outfile);
174                                 if (t->prev == NULL)
175                                         master.task_pool = t;
176                                 master.last_number = t->number;
177                         }
178                         else
179                         {
180                                 char * cmd = strtok(master.buffer+1, " ");
181                                 if (strcmp(cmd, "ABSORB") == 0)
182                                 {
183                                         master.o->encrypt = true;
184                                         cmd = strtok(NULL, " ");
185                                         if (strcmp(cmd, "UNSECURE") == 0)
186                                         {
187                                                 master.o->encrypt = false;
188                                                 log_print(1, "Master_absorb", "Using unencrypted connections");
189                                                 cmd = strtok(NULL, " ");
190                                         }
191                                         if (cmd == NULL)
192                                                 log_print(0, "Master_input", "No host specified for ABSORB directive");
193                                         char * np = strtok(NULL, " ");
194                                         if (np != NULL)
195                                                 Master_absorb(cmd, atoi(np));
196                                         else
197                                                 Master_absorb(cmd, 0);
198                                 }
199                                 else if (strcmp(cmd, "OUTPUT") == 0)
200                                 {
201                                         cmd = strtok(NULL, " ");
202                                         if (cmd == NULL)
203                                         {
204                                                 log_print(3, "Master_input", "Detach output");
205                                                 if (master.outfile != NULL)
206                                                         master.outfile[0] = '\0';
207                                         }
208                                         else
209                                         {
210                                                 log_print(3, "Master_input", "Output to %s",cmd);
211                                                 if (master.outfile == NULL)
212                                                         master.outfile = (char*)(calloc(BUFSIZ, sizeof(char)));
213                                                 sprintf(master.outfile, "%s", cmd);
214                                                 if (strstr(master.outfile, "%d") == NULL)
215                                                 {
216                                                         if (access(master.outfile, F_OK) == 0)
217                                                         {
218                                                                 if (unlink(master.outfile) != 0)
219                                                                         error("Master_input", "Removing %s for output : %s", master.outfile, strerror(errno));
220                                                         }
221                                                 }
222                                         }
223                                 }
224                                 else if (strcmp(cmd, "EXIT") == 0)
225                                 {
226                                         log_print(2, "Master_input", "Received EXIT directive; quitting");
227                                         freopen("/dev/null", "r", stdin);
228                                         master.o->daemon = false;
229                                         return;
230                                 }
231                                 else if (strcmp(cmd, "BARRIER") == 0)
232                                 {
233
234                                         log_print(3, "Master_input", "Received BARRIER directive; %d commands running", master.commands_active);
235
236                                         // check there actually are tasks
237                                         bool tasks = false;
238                                         for (int i = 0; i < master.nSlaves && !tasks; ++i)
239                                         {
240                                                 tasks = (master.slave[i].task != NULL || master.slave[i].task_pool != NULL);
241                                         }
242                                         if (tasks)
243                                         {
244
245                                                 master.barrier_number = master.last_number;
246                                                 master.barrier_block = false;
247         
248                                                 cmd = strtok(NULL, " ");
249                                                 if (cmd != NULL && strcmp(cmd, "BLOCK") == 0)
250                                                 {
251                                                         if (master.o->daemon == false)
252                                                                 log_print(1, "Master_input", "Not a daemon; BARRIER BLOCK functions as BARRIER");
253                                                         master.barrier_block = true;
254                                                 }
255                                         }
256                                         else
257                                                 log_print(3, "Master_input", "No tasks; BARRIER has no effect");
258
259                                 }
260                                 else
261                                 {
262                                         log_print(1, "Master_input", "Unrecognised directive #%s#", cmd);
263                                 }
264                         }
265                         master.buflen = 0;
266                         return;
267                 }
268
269                 *(start-1) = '\0';
270                 while (isspace(*start)) start++;
271
272                 
273                 char * rep = strtok(master.buffer, " ");
274                 rep = strtok(NULL, " ");
275                 if (rep != NULL && *rep == '&')
276                 {
277                         first_only = !first_only;
278                         rep = strtok(NULL, " ");
279                 }
280                 
281                 if (rep != NULL)
282                 {
283                         *(rep-1) = '\0';
284                         repetitions = atoi(rep);
285                         if (repetitions == 0)
286                         {
287                                 if (strcmp(rep, "0") == 0)
288                                         log_print(0,"Master_input", "Can't assign task with %s repetitions", rep);
289                                 else
290                                         log_print(0,"Master_input", "Require an integer (not \"%s\") number of repetitions (full directive is %s)", rep, master.buffer);
291                         }
292                 }
293                 
294
295                 regex_t r;
296                 
297                 int err = regcomp(&r, master.buffer+1, REG_EXTENDED);
298                 if (err != 0)
299                 {
300                         regerror(err, &r, master.buffer, master.bufsiz);
301                         error("Master_input", "Error compiling regexec : %s", master.buffer);
302                 }
303
304                 int first_match = -1;
305                 for (int i = 0; i < master.nSlaves; ++i)
306                 {
307                         if (first_only && first_match >= 0 && master.slave[i].task != NULL)
308                                 continue;
309
310                         int err = regexec(&r, master.slave[i].name, 0, NULL, 0);
311                         if (err == REG_NOMATCH)
312                         {
313                                 
314                                 continue;
315                         }
316                         else if (err != 0)
317                         {
318                                 regerror(err, &r, master.buffer, master.bufsiz);
319                                 log_print(0, "Master_input", "Error in regexec : %s", master.buffer+1);
320                                 continue;
321                         }
322                         
323                         if (first_match < 0) first_match = i;
324                         if (first_only)
325                         {
326                                 if (master.slave[i].task == NULL) 
327                                 {
328                                         first_match = i;
329                                         break;
330                                 }
331                                 continue;
332                         }
333
334                         message = strdup(start);
335                         Task * t = Task_Append(master.slave[i].task_pool, message, strlen(message), repetitions, master.outfile); 
336                         if (t->prev == NULL)
337                                 master.slave[i].task_pool = t;
338                         master.last_number = t->number;
339                         
340                         log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[i].name);
341                         
342                 }
343
344                 if (first_only && first_match >= 0)
345                 {
346                         message = strdup(start);
347                         Task * t = Task_Append(master.slave[first_match].task_pool, message, strlen(message), repetitions, master.outfile);
348                         if (t->prev == NULL)
349                                 master.slave[first_match].task_pool = t;        
350                         master.last_number = t->number;
351                         log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[first_match].name);                
352                 }
353
354                 if (first_match < 0)
355                         log_print(1, "Master_input", "No shells with names matching regex %s");
356
357                 regfree(&r);
358                 master.buflen = 0;
359                 log_print(3, "Master_input", "Processed task %s", master.buffer);
360         }
361         else
362         {
363                 if (master.buflen >= master.bufsiz)
364                 {
365                         master.bufsiz *= 2;
366                         master.buffer = (char*)(realloc(master.buffer, master.bufsiz*sizeof(char)));
367                 }
368                 master.buffer[master.buflen++] = c;
369         }
370 }
371
372 void Master_output(int i, char c)
373 {
374         log_print(10, "Master_output", "input %c from slave %d", c, i);
375         #ifdef THREAD_SENDING
376         pthread_mutex_lock(&sender_lock);
377         Task * t = master.slave[i].task;
378         while (t == NULL)
379         {
380                 pthread_cond_wait(&sender_done_cv, &sender_lock);
381         }
382         pthread_mutex_unlock(&sender_lock);
383         #else
384         Task * t = master.slave[i].task;
385         #endif //THREAD_SENDING
386         if (t == NULL)
387         {
388                 log_print(3, "Master_output", "Echo %c back to slave %d", c, i);
389                 write(master.slave[i].in, &c, sizeof(char));
390                 //log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name);
391                 //error(NULL, "Please refrain from echoing three bell characters in a row.");
392                 return;
393         }
394
395                         
396         t->output[t->outlen++] = c;
397         if (t->outlen >= t->outsiz)
398         {
399                 t->outsiz *= 2;
400                 t->output = (char*)(realloc(t->output, t->outsiz*sizeof(char)));
401                 memset(t->output+(t->outlen), 0, sizeof(char) * t->outsiz - t->outlen);
402                 
403         }
404         if (c == EOF 
405         #ifdef SHELL_OUTPUT_FINISHED
406                 || (t->outlen >= SHELL_OUTPUT_FINISHED_LENGTH
407                 && strcmp((t->output)+(t->outlen)-(SHELL_OUTPUT_FINISHED_LENGTH), SHELL_OUTPUT_FINISHED) == 0))
408         #else
409         ) // this is totally readable
410         #endif //SHELL_OUTPUT_FINISHED
411         {
412                 
413                 if (c != EOF)
414                 {
415                         t->output[t->outlen - SHELL_OUTPUT_FINISHED_LENGTH] = '\0';
416                         t->outlen -= SHELL_OUTPUT_FINISHED_LENGTH;
417                 }
418
419                 master.slave[i].task = NULL;
420                 if (t->outlen <= 0)
421                 {
422                         fprintf(stdout, "%d:\n", t->number);
423                 }
424                 /*
425                 else if (t->output[t->outlen-1] == '\f')
426                 {
427                         
428                         log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name);
429                         static int bufsiz = BUFSIZ;
430                         char * buffer = (char*)(calloc(bufsiz, sizeof(char)));
431                         if (buffer == NULL)
432                                 error("Master_output", "Creating name request buffer of size %d : %s", bufsiz, strerror(errno));
433                         int len = 0; 
434                         while (true)
435                         {
436                                 len = sprintf(buffer, "name=%s", master.slave[i].name);
437                                 if (len < bufsiz)
438                                         break;
439                                 bufsiz *= 2;
440                                 buffer = (char*)(realloc(buffer, bufsiz * sizeof(char)));
441                                 if (buffer == NULL)
442                                         error("Master_output", "Resizing name request buffer to size %d : %s", bufsiz, strerror(errno));
443                         }
444
445                                 
446                         Task * t2 = Task_Append(master.slave[i].task_pool, buffer,len, 1, NULL); 
447                         if (t2->prev == NULL)
448                                 master.slave[i].task_pool = t2;
449                         master.last_number = t2->number;
450                 }
451                 */
452                 else
453                 {
454                         fprintf(stdout, "%d: %s", t->number, t->output); 
455                         if (t->outfile != NULL && t->outfile[0] != '\0')
456                         {
457                                 log_print(3, "Master_output", "Writing result of task %d to file \"%s\"", t->number, t->outfile);
458                                 static char buf[BUFSIZ];
459                                 
460                                 FILE * f = NULL;
461                                 if (strstr(t->outfile, "%d") != NULL)
462                                 {
463                                         
464                                         sprintf(buf, t->outfile, t->number);
465                                         f = fopen(buf, "w");
466                                         if (f == NULL)
467                                                 error("Master_output", "Couldn't open file \"%s\" : %s", buf, strerror(errno));
468                                 }
469                                 else
470                                 {
471                                         f = fopen(t->outfile, "a");
472                                         if (f == NULL)
473                                                 error("Master_output", "Couldn't open file \"%s\" : %s", t->outfile, strerror(errno));
474
475                                 }
476                                 
477                                                         
478                                 fprintf(f, "%s", t->output);
479                                 fclose(f);
480                         }
481                 }
482                 
483                 log_print(3, "Master_output", "Task %d finished; %d tasks active", t->number, master.commands_active-1);
484
485
486                 if (t->repetitions == 0)
487                 {
488                         free(t->message);
489                         free(t->output);
490                         free(t->outfile);
491                         free(t);
492                 }
493
494
495                 if (--master.commands_active == 0 && master.barrier_number >= 0)
496                 {
497                         master.barrier_number = -1;
498                         if (master.barrier_block && master.o->daemon)
499                         {
500                                 FILE * f = fopen(DAEMON_BARRIER_FIFO, "w");
501                                 fprintf(f, "\a");
502                                 fclose(f);
503                         }
504                 }
505         }
506 }
507
508 Task * Master_tasker(int i)
509 {
510         Task ** tp = (master.slave[i].task_pool != NULL) ? &(master.slave[i].task_pool) : &(master.task_pool);
511
512         if (*tp != NULL)
513         {
514                 Task * t = *tp;
515                 if (master.barrier_number >= 0 && t->number > master.barrier_number)
516                         return NULL;
517
518                 t->repetitions--;
519                 if (t->repetitions == 0)
520                 {
521                         (*tp) = t->next; 
522                         Task_Extract(t);
523                 }
524                 master.slave[i].task = t;
525                 
526                 
527                 
528                 
529
530                 #ifdef THREAD_SENDING
531                 pthread_mutex_lock(&sender_lock);
532                 while (send_task.task != NULL) // while the sender is still sending shit
533                 {
534                         pthread_cond_wait(&sender_done_cv, &sender_lock);
535                 }
536                 send_task.task = t;
537                 send_task.slave_fd = master.slave[i].out;
538                 pthread_cond_signal(&sender_cv);
539                 pthread_mutex_unlock(&sender_lock);
540
541                 #else
542                 send_task.task = t;
543                 send_task.slave_fd = master.slave[i].out;
544                 Master_send();
545                 #endif //THREAD_SENDING
546                 
547         }
548         return *tp;
549 }
550
551 void Master_loop()
552 {
553         
554         if (sigsetjmp(env,true) != 0) // completely necessary evil
555         {
556                 //log_print(2, "Master_loop", "Restored from longjmp");
557         }
558         fd_set readSet;
559         //fd_set writeSet;
560         master.fd_max = fileno(stdin);
561         for (int i = 0; i < master.nSlaves; ++i)
562         {
563                 if (master.slave[i].out > master.fd_max)
564                         master.fd_max = master.slave[i].out;
565         }
566
567
568
569         bool quit = false;
570         bool input = true;
571         char buffer[BUFSIZ];
572         
573         while (!quit)
574         {
575                 
576                 if (!input && master.o->daemon)
577                 {
578                                         
579                         int fd = open(DAEMON_FIFO, O_RDONLY | O_NONBLOCK);
580                         if (fd == -1)
581                         {
582                                 if (errno != ENXIO)
583                                         error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
584                                 else
585                                         log_print(LOGWARN, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
586                         }
587                         else
588                         {
589                                 
590                                 input = true;
591                                 dup2(fd, fileno(stdin));
592                         }
593                         
594                 }
595
596                 FD_ZERO(&readSet);
597                 if (input) FD_SET(fileno(stdin), &readSet);     
598                 for (int i = 0; i < master.nSlaves; ++i)
599                 {
600                         if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet);
601                 }
602
603                 for (int i = 0; i < master.nRemote; ++i)
604                 {
605                         FD_SET(master.remote_err[i], &readSet);
606                 }
607                 
608                 select(master.fd_max+1, &readSet, NULL, NULL, NULL);
609                 
610
611                 if (input && FD_ISSET(fileno(stdin), &readSet))
612                 {                       
613                         char c;
614                         //log_print(10, "Master_loop", "Read from stdin");
615                         input = (read(fileno(stdin), &c, sizeof(char)) != 0);
616                         if (!input)
617                         {
618                                 c = '\n';
619                         }
620                         Master_input(c);
621                 }
622
623                 
624                 
625                 for (int i = 0; i < master.nSlaves; ++i)
626                 {
627                         if (!master.slave[i].running) continue;
628
629                         if (FD_ISSET(master.slave[i].out, &readSet))
630                         {
631                                 //log_print(10, "Master_loop", "Read from slave %d", i);
632                                 char c;                 
633                                 if (read(master.slave[i].out, &c, sizeof(char)) != 0)
634                                 {
635                                         Master_output(i, c);
636                                 }
637                         }                               
638
639                         if (master.slave[i].task == NULL)
640                         {
641                                 if (Master_tasker(i) == NULL && !input && !master.o->daemon) // no more input; no tasks
642                                 {
643                                         quit = (--master.running == 0);
644                                         master.slave[i].running = false;
645                                 }
646                                 
647                         }
648                 }
649
650                 for (int i = 0; i < master.nRemote; ++i)
651                 {
652                         if (FD_ISSET(master.remote_err[i], &readSet))
653                         {
654                                 int len = read(master.remote_err[i], buffer, sizeof(buffer));
655                                 buffer[len] = '\0';
656                                 fprintf(stderr, "%s", buffer);
657                         }
658                 }
659                 
660         }
661
662 }
663
664 void Master_send()
665 {
666
667         if (master.o->prepend != NULL)
668         {
669                 static int len = -1;
670                 if (len == -1) len = strlen(master.o->prepend);
671                 write(send_task.slave_fd, master.o->prepend, (len-1) * sizeof(char));
672                 //log_print(0, "Sent prepend %s\n",master.o->prepend); 
673         }
674
675         write(send_task.slave_fd, send_task.task->message, (send_task.task->bufsiz) * sizeof(char)); 
676         //log_print(0, "Sent message %s\n",send_task.task->message); 
677         if (master.o->append != NULL)
678         {
679                 static int len = -1;
680                 if (len == -1) len = strlen(master.o->append);
681                 write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char));
682                 //log_print(0, "Sent append %s\n",master.o->append); 
683         }
684         #ifdef SHELL_OUTPUT_FINISHED
685         {
686                 static char * echo = ";echo -en \"";
687                 static int len = -1;
688                 if (len == -1) len = strlen(echo);
689                 write(send_task.slave_fd, echo, len*sizeof(char));
690                 write(send_task.slave_fd, SHELL_OUTPUT_FINISHED, SHELL_OUTPUT_FINISHED_LENGTH * sizeof(char));
691                 write(send_task.slave_fd, "\"", sizeof(char));
692                 //log_print(0, "Sent end\n"); 
693         }
694         #endif //SHELL_OUTPUT_FINISHED
695         write(send_task.slave_fd, "\n", sizeof(char));
696         master.commands_active++;
697         log_print(3, "Master_sender", "Sent task %d \"%s\" on socket %d - %d tasks active", send_task.task->number, send_task.task->message, send_task.slave_fd, master.commands_active);
698 }
699
#ifdef THREAD_SENDING
/**
 * Body of the sender thread (only built with THREAD_SENDING).
 *
 * Waits on sender_cv for a task to appear in send_task, sends it with
 * Master_send, clears the slot and broadcasts sender_done_cv. A task that was
 * queued before the thread got to wait is sent straight away instead of being
 * discarded. Waking up with no task queued is the shutdown request.
 *
 * NOTE(review): because "woken with an empty slot" means quit, a spurious
 * wake-up also ends the thread; a dedicated quit flag would be needed to tell
 * the two apart.
 *
 * @param args  unused
 * @return      always NULL
 */
void * Master_sender(void * args)
{
	(void)args;

	pthread_mutex_lock(&sender_lock);
	for (;;)
	{
		if (send_task.task == NULL)
		{
			pthread_cond_wait(&sender_cv, &sender_lock);
			if (send_task.task == NULL)
				break; // woken with nothing to send: asked to quit
		}

		Master_send();

		//log_print(0, "Master_sender sent message\n");

		// Free the slot before announcing completion, so a waiting producer
		// finds it empty.
		send_task.task = NULL;
		pthread_cond_broadcast(&sender_done_cv);
	}
	pthread_mutex_unlock(&sender_lock);

	return NULL;
}
#endif //THREAD_SENDING
730
731
732
733
734 void Master_cleanup()
735 {
736
737         //log_print(2, "Master_cleanup", "Preparing to exit...");
738         #ifdef THREAD_SENDING
739         pthread_mutex_lock(&sender_lock);
740         send_task.task = NULL;
741         pthread_cond_broadcast(&sender_cv);
742         pthread_mutex_unlock(&sender_lock);
743         pthread_join(sender_thread, NULL);
744         #endif //THREAD_SENDING
745
746         if (master.task_pool != NULL) Task_Destroy(master.task_pool);
747
748
749         signal(SIGCHLD, SIG_IGN); // ignore child exits now
750
751         // tell all remote nodes to exit
752         for (int i = 0; i < master.nRemote; ++i)
753         {
754                 FILE * f = fdopen(master.remote_err[i], "r+"); setbuf(f, NULL);
755
756                 fprintf(f, SHELL_EXIT_COMMAND);
757         
758                 fclose(f);
759         }
760
761         for (int i = 0; i < master.nSlaves; ++i)
762         {
763                 
764                 static int exitlen = -1;
765                 if (exitlen == -1) exitlen = strlen(SHELL_EXIT_COMMAND);
766                 write(master.slave[i].in, SHELL_EXIT_COMMAND, exitlen *sizeof(char));
767                 //usleep(0.5); //shouldn't matter too much
768         }
769
770         for (int i = 0; i < master.nSlaves; ++i)
771         {
772                 if (master.slave[i].task_pool != NULL) Task_Destroy(master.slave[i].task_pool);
773                 if (master.slave[i].pid <= 0) 
774                 {
775                         Network_close(master.slave[i].in);
776                 }
777                 else
778                 {
779                         kill(master.slave[i].pid, 15); // be nice
780                         if (kill(master.slave[i].pid, 0) == 0)
781                                 kill(master.slave[i].pid, 9); // shoot it down
782                         close(master.slave[i].in);
783                         close(master.slave[i].out);
784                 }
785                 free(master.slave[i].name);
786                 free(master.slave[i].addr);
787         }
788         free(master.slave);
789         free(master.buffer);
790         if (master.outfile != NULL)
791                 free(master.outfile);
792
793         libssh2_exit();
794
795 }
796
797
798
799
800 void Master_absorb(char * addr, int np)
801 {
802         int port = 0;
803
804         char * user = strstr(addr, "@");
805         if (user != NULL)
806         {
807                 *(user-1) = '\0';
808                 char * t = user;
809                 user = addr;
810                 addr = t;
811         }
812         else
813         {
814                 user = getpwuid(geteuid())->pw_name;
815         }
816         log_print(3, "Master_absorb", "User %s at address %s", user, addr);
817
818         // ssh to the host on port 22
819         ssh * s = ssh_new(user, addr, 22);
820         if (s == NULL)
821         {
822                 log_print(0, "Master_absorb", "Couldn't ssh to %s@%s", user, addr);
823                 return;
824         }
825         
826
827         // work out the name to give to the shells
828         char * name = strtok(addr, ":");
829         name = strtok(NULL, ":");
830         if (name == NULL)
831                 name = addr; // default is host:X
832         else
833                 *(name-1) = '\0'; // otherwise use name:X
834
835         // setup array of remote stderr file descriptors
836         if (master.nRemote++ == 0)
837         {
838                 master.remote_err = (int*)(calloc(master.nRemote, sizeof(int)));
839                 master.remote_reserved = master.nRemote;
840         }
841         else if (master.nRemote >= master.remote_reserved)
842         {
843                 // resize dynamically
844                 master.remote_reserved *= 2;
845                 master.remote_err = (int*)(realloc(master.remote_err,master.remote_reserved * sizeof(int)));
846         }
847
848
849         
850         int sfd = -1;
851         if (master.o->encrypt)
852         {
853                 int sv[2];
854                 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
855                         error("Master_absorb", "Couldn't create socket for remote swarm");
856                 sfd = sv[0];
857
858                 ssh_exec_swarm(s, NULL, sv+1, np); // start swarm remotely forward it to the socket
859                 ssh_thread_add(s); // add the ssh to the thread
860         }
861         else
862         {
863                 sfd = Network_server_bind(0, &port); // dynamically bind to a port
864                 ssh_exec_swarm(s, &port, NULL, np); // start swarm remotely, have it connect on the port
865                 ssh_destroy(s); // don't need the ssh anymore
866                 sfd = Network_server_listen(sfd, NULL); // accept connections and pray
867         }
868         master.remote_err[master.nRemote-1] = sfd;
869         if (sfd > master.fd_max)
870                 master.fd_max = sfd;
871
872
873         char buffer[BUFSIZ];
874
875         int newSlaves = 0;
876         
877         int len = sprintf(buffer, "%s\n", name);
878         int w = 0;
879         while (w < len)
880                 w += write(sfd, buffer+w, len-w);
881         
882         
883         len = 0;
884         do
885         {
886                 len = read(sfd, buffer+len, sizeof(buffer));
887                 buffer[len] = '\0';
888         }
889         while (buffer[len-1] != '\n');
890         buffer[len-1] = '\0';
891
892         while (newSlaves == 0 && strcmp(buffer, "0") != 0)
893         {
894                 newSlaves = atoi(buffer);
895         }
896
897         
898
899         if (newSlaves == 0)
900         {
901                 error("Master_absorb", "No slaves to absorb from %s", addr);
902         }
903
904         master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave)));
905         if (master.slave == NULL)
906         {
907                 error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", 
908                         master.nSlaves, master.nSlaves + newSlaves, strerror(errno));
909         }
910
911         
912         for (int i = 0; i < newSlaves; ++i)
913         {
914                 int ii = master.nSlaves + i;
915
916                 if (master.o->encrypt)
917                 {
918                         int sv[2];
919                         if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
920                                 error("Master_absorb", "Couldn't create socket for remote swarm");
921
922                         
923                         LIBSSH2_LISTENER * listener = NULL;
924                         // libssh2 can't finalise the connection when the port is dynamic (ie: port = 0)
925                         while (listener == NULL)
926                         {
927                                 port = 20000 + rand() % 30000; // so pick ports at random until binding succeeds
928                                 listener = ssh_get_listener(s, &port); // port forward to the socket
929                         }
930
931                         log_print(4,"Master_absorb", "Chose port %d", port);
932                         int len = sprintf(buffer, "%d\n", port);
933                         
934                         int w = 0;
935                         while (w < len)
936                         {
937                                 w += write(sfd, buffer+w, len-w);
938                         }
939                         usleep(200000); // give ssh_thread a chance to actually send the data
940
941                         log_print(4, "Master_absorb", "Creating tunnel...");
942                         ssh_add_tunnel(s, listener, sv[1]);
943                         master.slave[ii].in = sv[0];
944                         master.slave[ii].out = sv[0];
945
946                         log_print(4, "Master_absorb", "Tunnel for slave %d using socket %d<->%d setup", ii, sv[0], sv[1]);
947                 }
948                 else
949                 {
950                         int tmp = Network_server_bind(0, &port); // bind to a port      
951                         
952                         master.slave[ii].in = Network_server_listen(tmp, NULL); // listen for connection
953                         master.slave[ii].out = master.slave[ii].in;
954                 }
955
956         
957                 
958                 
959
960                 if (master.slave[ii].out > master.fd_max)
961                         master.fd_max = master.slave[ii].out;
962
963                 char buffer[BUFSIZ];
964                 sprintf(buffer, "%s:%d", name, i);
965                 master.slave[ii].name = strdup(buffer);
966                 master.slave[ii].addr = strdup(addr);
967                 master.slave[ii].running = true;
968                 master.slave[ii].pid = -1;
969                 master.slave[ii].task = NULL;
970                 master.slave[ii].task_pool = NULL;      
971         }       
972
973
974         master.nSlaves = master.nSlaves + newSlaves;
975
976         master.running += newSlaves;
977
978         log_print(2, "Master_absorb", "Successfully absorbed %d slaves from %s", newSlaves, addr);
979
980 }
981
982 void sigchld_handler(int signal)
983 {
984
985         if (signal != SIGCHLD)
986                 error("sigchld_handler", "Got signal (%d) which isn't SIGCHLD (%d)", signal, SIGCHLD);
987
988         int s = 0;
989         int p = waitpid(-1, &s, 0);
990         if (p == -1)
991                 error("sigchld_handler", "waitpid : %s", strerror(errno));
992         
993
994         int i = 0;
995         for (i = 0; i < master.nSlaves; ++i)
996         {
997                 if (master.slave[i].pid == p)
998                         break;
999         }
1000         if (i >= master.nSlaves)
1001         {
1002                 return;
1003         }
1004
1005
1006         sigchld_respond(s, "local", i);
1007
1008         Master_shell(i);
1009
1010         #ifdef SHELL_OUTPUT_FINISHED
1011         {
1012                 //log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i);
1013                 char buffer[BUFSIZ];
1014                 int len = sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, SHELL_OUTPUT_FINISHED);
1015                 if (write(master.slave[i].in, buffer, len) <= 0)
1016                         error("sigchld_handler", "Couldn't restart slave %d; it is unresponsive", i);
1017
1018         }
1019         #endif //SHELL_OUTPUT_FINISHED
1020
1021         siglongjmp(env,1);
1022
1023         
1024 }
1025

UCC git Repository :: git.ucc.asn.au