2734d0772d8d6391d7583114b1b2c36963f5b079
[matches/swarm.git] / src / master.c
#define _BSD_SOURCE
#include "master.h"
#include "log.h"

#include <sys/wait.h>
#include "daemon.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include "slave.h"
#include <string.h>
#include <setjmp.h>
#include <sys/types.h>
#include <pwd.h>

#include <unistd.h>
#include <regex.h>
#include <pthread.h>
#include "network.h"
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include "ssh.h"
29
30 //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems
31
32 // probably not that great to use threads anyway, since it eats one of your cores
33 // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes
34
35 // the signal handler now breaks threads... don't use them
36
#ifdef THREAD_SENDING
pthread_t sender_thread;
pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER;
// Condition variables need a valid initial state just like the mutex above;
// zero-filled static storage is not a portable substitute for the initializer.
pthread_cond_t sender_cv = PTHREAD_COND_INITIALIZER;      // signalled when send_task holds work (or on shutdown)
pthread_cond_t sender_done_cv = PTHREAD_COND_INITIALIZER; // broadcast by the sender thread after a send
#endif //THREAD_SENDING
43
44
45
46 struct
47 {
48         int slave_fd;
49         Task * task;
50 } send_task;
51
52 static Master master;
53
54 static sigjmp_buf env;
55
56 void sigchld_handler(int signal);
57
58 void Master_main(Options * o)
59 {
60         setbuf(stdin, NULL);
61         setbuf(stdout, NULL);
62         setbuf(stderr, NULL);
63
64         atexit(Master_cleanup);
65         Master_setup(o);
66
67         #ifdef THREAD_SENDING
68         if (pthread_create(&sender_thread, NULL, Master_sender, NULL) != 0)
69         {
70                 error("Master_main", "Creating sender thread : %s", strerror(errno));
71         }
72         #endif //THREAD_SENDING
73         Master_loop();
74 }
75
76 void Master_setup(Options * o)
77 {
78         int err = libssh2_init(0);
79         if (err != 0)
80         {       
81                 error("Master_setup", "Initialising libssh2 - error code %d", err);
82         }
83         
84         signal(SIGCHLD, sigchld_handler);
85         master.o = o;
86         master.barrier_number = -1;
87         master.last_number = -1;
88         master.nSlaves = o->nCPU;
89         master.running = master.nSlaves;
90         master.nRemote = 0; master.remote_err = NULL;
91         if (master.nSlaves == 0)
92                 error("Master_setup", "No CPUs to start slaves with!");
93
94         master.outfile = NULL;
95
96         master.slave = (Slave*)(calloc(master.nSlaves, sizeof(Slave)));
97
98         if (master.slave == NULL)
99                 error("Master_setup", "Allocating memory for %d slaves", master.nSlaves);
100
101         // One slave per CPU
102         for (int i = 0; i < master.nSlaves; ++i)
103         {
104                 Make_slave(i);
105
106
107                 
108                 master.slave[i].name = (char*)calloc(BUFSIZ, sizeof(char));
109                 master.slave[i].addr = (char*)calloc(BUFSIZ, sizeof(char));
110                 sprintf(master.slave[i].name, "local:%d", i);
111                 sprintf(master.slave[i].addr, "local:%d", i);
112                 FILE * f = fdopen(master.slave[i].in, "w"); setbuf(f, NULL);
113                 fprintf(f, "name=%s\n", master.slave[i].name);
114         }
115 }
116
117 void Make_slave(int i)
118 {
119         int sv[2];
120         if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
121         {
122                 error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno));
123         }
124         
125         master.slave[i].pid = fork();
126         if (master.slave[i].pid == 0)
127         {
128                 dup2(sv[0],fileno(stdin));
129                 dup2(sv[0],fileno(stdout));
130                 execlp(master.o->shell, master.o->shell, NULL);
131         }
132         master.slave[i].in = sv[1];
133         master.slave[i].out = sv[1];
134         master.slave[i].running = true;
135 }
136
137 void Master_input(char c)
138 {
139         if (master.buffer == NULL)
140         {
141                 if (master.bufsiz < BUFSIZ)
142                         master.bufsiz = BUFSIZ;
143                 master.buffer = (char*)(calloc(master.bufsiz, sizeof(char)));
144                 master.buflen = 0;
145         }
146         
147         if (c == '\n' || c == EOF || c == '\0')
148         {
149                 if (master.buflen == 0)
150                         return;
151                 int repetitions = 1;
152                 bool first_only = false;
153
154                 master.buffer[master.buflen++] = '\0'; // end the string
155                 
156                 char * message;
157
158                 char * start = strtok(master.buffer, "#");
159                 start = strtok(NULL, "#");
160                 if (start == NULL) 
161                 {
162                         if ( master.buffer[0] != '#')
163                         {
164                                 log_print(3, "Master_input", "Created general task \"%s\"", master.buffer);
165                                 message = master.buffer;
166                                 master.buffer = NULL;
167                                 Task * t = Task_Append(master.task_pool, message, master.buflen, repetitions, master.outfile);
168                                 if (t->prev == NULL)
169                                         master.task_pool = t;
170                                 master.last_number = t->number;
171                         }
172                         else
173                         {
174                                 char * cmd = strtok(master.buffer+1, " ");
175                                 if (strcmp(cmd, "ABSORB") == 0)
176                                 {
177                                         master.o->encrypt = true;
178                                         cmd = strtok(NULL, " ");
179                                         if (strcmp(cmd, "UNSECURE") == 0)
180                                         {
181                                                 master.o->encrypt = false;
182                                                 log_print(1, "Master_absorb", "Using unencrypted connections");
183                                                 cmd = strtok(NULL, " ");
184                                         }
185                                         if (cmd == NULL)
186                                                 log_print(0, "Master_input", "No host specified for ABSORB directive");
187                                         char * np = strtok(NULL, " ");
188                                         if (np != NULL)
189                                                 Master_absorb(cmd, atoi(np));
190                                         else
191                                                 Master_absorb(cmd, 0);
192                                 }
193                                 else if (strcmp(cmd, "OUTPUT") == 0)
194                                 {
195                                         cmd = strtok(NULL, " ");
196                                         if (cmd == NULL)
197                                         {
198                                                 log_print(3, "Master_input", "Detach output");
199                                                 if (master.outfile != NULL)
200                                                         master.outfile[0] = '\0';
201                                         }
202                                         else
203                                         {
204                                                 log_print(3, "Master_input", "Output to %s",cmd);
205                                                 if (master.outfile == NULL)
206                                                         master.outfile = (char*)(calloc(BUFSIZ, sizeof(char)));
207                                                 sprintf(master.outfile, "%s", cmd);
208                                                 if (strstr(master.outfile, "%d") == NULL)
209                                                 {
210                                                         if (access(master.outfile, F_OK) == 0)
211                                                         {
212                                                                 if (unlink(master.outfile) != 0)
213                                                                         error("Master_input", "Removing %s for output : %s", master.outfile, strerror(errno));
214                                                         }
215                                                 }
216                                         }
217                                 }
218                                 else if (strcmp(cmd, "EXIT") == 0)
219                                 {
220                                         log_print(2, "Master_input", "Received EXIT directive; quitting");
221                                         freopen("/dev/null", "r", stdin);
222                                         master.o->daemon = false;
223                                         return;
224                                 }
225                                 else if (strcmp(cmd, "BARRIER") == 0)
226                                 {
227
228                                         log_print(3, "Master_input", "Received BARRIER directive; %d commands running", master.commands_active);
229
230                                         // check there actually are tasks
231                                         bool tasks = false;
232                                         for (int i = 0; i < master.nSlaves && !tasks; ++i)
233                                         {
234                                                 tasks = (master.slave[i].task != NULL || master.slave[i].task_pool != NULL);
235                                         }
236                                         if (tasks)
237                                         {
238
239                                                 master.barrier_number = master.last_number;
240                                                 master.barrier_block = false;
241         
242                                                 cmd = strtok(NULL, " ");
243                                                 if (cmd != NULL && strcmp(cmd, "BLOCK") == 0)
244                                                 {
245                                                         if (master.o->daemon == false)
246                                                                 log_print(1, "Master_input", "Not a daemon; BARRIER BLOCK functions as BARRIER");
247                                                         master.barrier_block = true;
248                                                 }
249                                         }
250                                         else
251                                                 log_print(3, "Master_input", "No tasks; BARRIER has no effect");
252
253                                 }
254                                 else
255                                 {
256                                         log_print(1, "Master_input", "Unrecognised directive #%s#", cmd);
257                                 }
258                         }
259                         master.buflen = 0;
260                         return;
261                 }
262
263                 *(start-1) = '\0';
264                 while (isspace(*start)) start++;
265
266                 
267                 char * rep = strtok(master.buffer, " ");
268                 rep = strtok(NULL, " ");
269                 if (rep != NULL && *rep == '&')
270                 {
271                         first_only = !first_only;
272                         rep = strtok(NULL, " ");
273                 }
274                 
275                 if (rep != NULL)
276                 {
277                         *(rep-1) = '\0';
278                         repetitions = atoi(rep);
279                         if (repetitions == 0)
280                         {
281                                 if (strcmp(rep, "0") == 0)
282                                         log_print(0,"Master_input", "Can't assign task with %s repetitions", rep);
283                                 else
284                                         log_print(0,"Master_input", "Require an integer (not \"%s\") number of repetitions (full directive is %s)", rep, master.buffer);
285                         }
286                 }
287                 
288
289                 regex_t r;
290                 
291                 int err = regcomp(&r, master.buffer+1, REG_EXTENDED);
292                 if (err != 0)
293                 {
294                         regerror(err, &r, master.buffer, master.bufsiz);
295                         error("Master_input", "Error compiling regexec : %s", master.buffer);
296                 }
297
298                 int first_match = -1;
299                 for (int i = 0; i < master.nSlaves; ++i)
300                 {
301                         if (first_only && first_match >= 0 && master.slave[i].task != NULL)
302                                 continue;
303
304                         int err = regexec(&r, master.slave[i].name, 0, NULL, 0);
305                         if (err == REG_NOMATCH)
306                         {
307                                 
308                                 continue;
309                         }
310                         else if (err != 0)
311                         {
312                                 regerror(err, &r, master.buffer, master.bufsiz);
313                                 log_print(0, "Master_input", "Error in regexec : %s", master.buffer+1);
314                                 continue;
315                         }
316                         
317                         if (first_match < 0) first_match = i;
318                         if (first_only)
319                         {
320                                 if (master.slave[i].task == NULL) 
321                                 {
322                                         first_match = i;
323                                         break;
324                                 }
325                                 continue;
326                         }
327
328                         message = strdup(start);
329                         Task * t = Task_Append(master.slave[i].task_pool, message, strlen(message), repetitions, master.outfile); 
330                         if (t->prev == NULL)
331                                 master.slave[i].task_pool = t;
332                         master.last_number = t->number;
333                         
334                         log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[i].name);
335                         
336                 }
337
338                 if (first_only && first_match >= 0)
339                 {
340                         message = strdup(start);
341                         Task * t = Task_Append(master.slave[first_match].task_pool, message, strlen(message), repetitions, master.outfile);
342                         if (t->prev == NULL)
343                                 master.slave[first_match].task_pool = t;        
344                         master.last_number = t->number;
345                         log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[first_match].name);                
346                 }
347
348                 if (first_match < 0)
349                         log_print(1, "Master_input", "No shells with names matching regex %s");
350
351                 regfree(&r);
352                 master.buflen = 0;
353                 log_print(3, "Master_input", "Processed task %s", master.buffer);
354         }
355         else
356         {
357                 if (master.buflen >= master.bufsiz)
358                 {
359                         master.bufsiz *= 2;
360                         master.buffer = (char*)(realloc(master.buffer, master.bufsiz*sizeof(char)));
361                 }
362                 master.buffer[master.buflen++] = c;
363         }
364 }
365
366 void Master_output(int i, char c)
367 {
368         log_print(10, "Master_output", "input %c from slave %d", c, i);
369         #ifdef THREAD_SENDING
370         pthread_mutex_lock(&sender_lock);
371         Task * t = master.slave[i].task;
372         while (t == NULL)
373         {
374                 pthread_cond_wait(&sender_done_cv, &sender_lock);
375         }
376         pthread_mutex_unlock(&sender_lock);
377         #else
378         Task * t = master.slave[i].task;
379         #endif //THREAD_SENDING
380         if (t == NULL)
381         {
382                 log_print(3, "Master_output", "Echo %c back to slave %d", c, i);
383                 write(master.slave[i].in, &c, sizeof(char));
384                 //log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name);
385                 //error(NULL, "Please refrain from echoing three bell characters in a row.");
386                 return;
387         }
388
389                         
390         t->output[t->outlen++] = c;
391         if (t->outlen >= t->outsiz)
392         {
393                 t->outsiz *= 2;
394                 t->output = (char*)(realloc(t->output, t->outsiz*sizeof(char)));
395                 memset(t->output+(t->outlen), 0, sizeof(char) * t->outsiz - t->outlen);
396                 
397         }
398         if (c == EOF || (master.o->endlen > 0 && t->outlen >= master.o->endlen
399                 && strcmp((t->output)+(t->outlen)-(master.o->endlen), master.o->end) == 0))
400         {
401                 
402                 if (c != EOF)
403                 {
404                         t->output[t->outlen - master.o->endlen] = '\0';
405                         t->outlen -= master.o->endlen;
406                 }
407
408                 master.slave[i].task = NULL;
409                 if (t->outlen <= 0)
410                 {
411                         fprintf(stdout, "%d:\n", t->number);
412                 }
413                 /*
414                 else if (t->output[t->outlen-1] == '\f')
415                 {
416                         
417                         log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name);
418                         static int bufsiz = BUFSIZ;
419                         char * buffer = (char*)(calloc(bufsiz, sizeof(char)));
420                         if (buffer == NULL)
421                                 error("Master_output", "Creating name request buffer of size %d : %s", bufsiz, strerror(errno));
422                         int len = 0; 
423                         while (true)
424                         {
425                                 len = sprintf(buffer, "name=%s", master.slave[i].name);
426                                 if (len < bufsiz)
427                                         break;
428                                 bufsiz *= 2;
429                                 buffer = (char*)(realloc(buffer, bufsiz * sizeof(char)));
430                                 if (buffer == NULL)
431                                         error("Master_output", "Resizing name request buffer to size %d : %s", bufsiz, strerror(errno));
432                         }
433
434                                 
435                         Task * t2 = Task_Append(master.slave[i].task_pool, buffer,len, 1, NULL); 
436                         if (t2->prev == NULL)
437                                 master.slave[i].task_pool = t2;
438                         master.last_number = t2->number;
439                 }
440                 */
441                 else
442                 {
443                         fprintf(stdout, "%d: %s", t->number, t->output); 
444                         if (t->outfile != NULL && t->outfile[0] != '\0')
445                         {
446                                 log_print(3, "Master_output", "Writing result of task %d to file \"%s\"", t->number, t->outfile);
447                                 static char buf[BUFSIZ];
448                                 
449                                 FILE * f = NULL;
450                                 if (strstr(t->outfile, "%d") != NULL)
451                                 {
452                                         
453                                         sprintf(buf, t->outfile, t->number);
454                                         f = fopen(buf, "w");
455                                         if (f == NULL)
456                                                 error("Master_output", "Couldn't open file \"%s\" : %s", buf, strerror(errno));
457                                 }
458                                 else
459                                 {
460                                         f = fopen(t->outfile, "a");
461                                         if (f == NULL)
462                                                 error("Master_output", "Couldn't open file \"%s\" : %s", t->outfile, strerror(errno));
463
464                                 }
465                                 
466                                                         
467                                 fprintf(f, "%s", t->output);
468                                 fclose(f);
469                         }
470                 }
471                 
472                 log_print(3, "Master_output", "Task %d finished; %d tasks active", t->number, master.commands_active-1);
473
474
475                 if (t->repetitions == 0)
476                 {
477                         free(t->message);
478                         free(t->output);
479                         free(t->outfile);
480                         free(t);
481                 }
482
483
484                 if (--master.commands_active == 0 && master.barrier_number >= 0)
485                 {
486                         master.barrier_number = -1;
487                         if (master.barrier_block && master.o->daemon)
488                         {
489                                 FILE * f = fopen(DAEMON_BARRIER_FIFO, "w");
490                                 fprintf(f, "\a");
491                                 fclose(f);
492                         }
493                 }
494         }
495 }
496
497 Task * Master_tasker(int i)
498 {
499         Task ** tp = (master.slave[i].task_pool != NULL) ? &(master.slave[i].task_pool) : &(master.task_pool);
500
501         if (*tp != NULL)
502         {
503                 Task * t = *tp;
504                 if (master.barrier_number >= 0 && t->number > master.barrier_number)
505                         return NULL;
506
507                 t->repetitions--;
508                 if (t->repetitions == 0)
509                 {
510                         (*tp) = t->next; 
511                         Task_Extract(t);
512                 }
513                 master.slave[i].task = t;
514                 
515                 
516                 
517                 
518
519                 #ifdef THREAD_SENDING
520                 pthread_mutex_lock(&sender_lock);
521                 while (send_task.task != NULL) // while the sender is still sending shit
522                 {
523                         pthread_cond_wait(&sender_done_cv, &sender_lock);
524                 }
525                 send_task.task = t;
526                 send_task.slave_fd = master.slave[i].out;
527                 pthread_cond_signal(&sender_cv);
528                 pthread_mutex_unlock(&sender_lock);
529
530                 #else
531                 send_task.task = t;
532                 send_task.slave_fd = master.slave[i].out;
533                 Master_send();
534                 #endif //THREAD_SENDING
535                 
536         }
537         return *tp;
538 }
539
540 void Master_loop()
541 {
542         
543         if (sigsetjmp(env,true) != 0) // completely necessary evil
544         {
545                 //log_print(2, "Master_loop", "Restored from longjmp");
546         }
547         fd_set readSet;
548         //fd_set writeSet;
549         master.fd_max = fileno(stdin);
550         for (int i = 0; i < master.nSlaves; ++i)
551         {
552                 if (master.slave[i].out > master.fd_max)
553                         master.fd_max = master.slave[i].out;
554         }
555
556
557
558         bool quit = false;
559         bool input = true;
560         char buffer[BUFSIZ];
561         
562         while (!quit)
563         {
564                 
565                 if (!input && master.o->daemon)
566                 {
567                                         
568                         int fd = open(DAEMON_FIFO, O_RDONLY | O_NONBLOCK);
569                         if (fd == -1)
570                         {
571                                 if (errno != ENXIO)
572                                         error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
573                                 else
574                                         log_print(2, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
575                         }
576                         else
577                         {
578                                 
579                                 input = true;
580                                 dup2(fd, fileno(stdin));
581                         }
582                         
583                 }
584
585                 FD_ZERO(&readSet);
586                 if (input) FD_SET(fileno(stdin), &readSet);     
587                 for (int i = 0; i < master.nSlaves; ++i)
588                 {
589                         if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet);
590                 }
591
592                 for (int i = 0; i < master.nRemote; ++i)
593                 {
594                         FD_SET(master.remote_err[i], &readSet);
595                 }
596                 
597                 select(master.fd_max+1, &readSet, NULL, NULL, NULL);
598                 
599
600                 if (input && FD_ISSET(fileno(stdin), &readSet))
601                 {                       
602                         char c;
603                         //log_print(10, "Master_loop", "Read from stdin");
604                         input = (read(fileno(stdin), &c, sizeof(char)) != 0);
605                         if (!input)
606                         {
607                                 c = '\n';
608                         }
609                         Master_input(c);
610                 }
611
612                 
613                 
614                 for (int i = 0; i < master.nSlaves; ++i)
615                 {
616                         if (!master.slave[i].running) continue;
617
618                         if (FD_ISSET(master.slave[i].out, &readSet))
619                         {
620                                 //log_print(10, "Master_loop", "Read from slave %d", i);
621                                 char c;                 
622                                 if (read(master.slave[i].out, &c, sizeof(char)) != 0)
623                                 {
624                                         Master_output(i, c);
625                                 }
626                         }                               
627
628                         if (master.slave[i].task == NULL)
629                         {
630                                 if (Master_tasker(i) == NULL && !input && !master.o->daemon) // no more input; no tasks
631                                 {
632                                         quit = (--master.running == 0);
633                                         master.slave[i].running = false;
634                                 }
635                                 
636                         }
637                 }
638
639                 for (int i = 0; i < master.nRemote; ++i)
640                 {
641                         if (FD_ISSET(master.remote_err[i], &readSet))
642                         {
643                                 int len = read(master.remote_err[i], buffer, sizeof(buffer));
644                                 buffer[len] = '\0';
645                                 fprintf(stderr, "%s", buffer);
646                         }
647                 }
648                 
649         }
650
651 }
652
653 void Master_send()
654 {
655
656         if (master.o->prepend != NULL)
657         {
658                 static int len = -1;
659                 if (len == -1) len = strlen(master.o->prepend);
660                 write(send_task.slave_fd, master.o->prepend, (len-1) * sizeof(char));
661                 //log_print(0, "Sent prepend %s\n",master.o->prepend); 
662         }
663
664         write(send_task.slave_fd, send_task.task->message, (send_task.task->bufsiz) * sizeof(char)); 
665         //log_print(0, "Sent message %s\n",send_task.task->message); 
666         if (master.o->append != NULL)
667         {
668                 static int len = -1;
669                 if (len == -1) len = strlen(master.o->append);
670                 write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char));
671                 //log_print(0, "Sent append %s\n",master.o->append); 
672         }
673         if (master.o->end != NULL)
674         {
675                 static char * echo = ";echo -en \"";
676                 static int len = -1;
677                 if (len == -1) len = strlen(echo);
678                 write(send_task.slave_fd, echo, len*sizeof(char));
679                 write(send_task.slave_fd, master.o->end, (master.o->endlen) * sizeof(char));
680                 write(send_task.slave_fd, "\"", 1*sizeof(char));
681                 //log_print(0, "Sent end\n"); 
682         }
683         write(send_task.slave_fd, "\n", 1*sizeof(char));
684         master.commands_active++;
685         log_print(3, "Master_sender", "Sent task %d \"%s\" on socket %d - %d tasks active", send_task.task->number, send_task.task->message, send_task.slave_fd, master.commands_active);
686 }
687
#ifdef THREAD_SENDING
/*
 * Sender thread (THREAD_SENDING builds only). It waits on sender_cv until
 * Master_tasker() fills send_task, then:
 *   1. sends the task with Master_send();
 *   2. clears the slot;
 *   3. broadcasts sender_done_cv.
 * Finding the slot empty after a wakeup is taken as the shutdown request that
 * Master_cleanup() issues by broadcasting sender_cv with send_task.task == NULL.
 */
void * Master_sender(void * args)
{
	(void)args;
	pthread_mutex_lock(&sender_lock);
	while (true)
	{
		// A task posted before this thread started waiting is sent straight
		// away instead of being discarded.
		if (send_task.task == NULL)
			pthread_cond_wait(&sender_cv, &sender_lock);
		if (send_task.task == NULL)
			break; // shutdown requested

		Master_send();

		// Clear the slot *before* broadcasting. Master_tasker() waits until
		// send_task.task is NULL, so a broadcast issued before the reset could
		// leave it waiting with no further wakeup.
		send_task.task = NULL;
		pthread_cond_broadcast(&sender_done_cv);
	}
	pthread_mutex_unlock(&sender_lock);

	return NULL;
}
#endif //THREAD_SENDING
718
719
720
721
722 void Master_cleanup()
723 {
724
725         //log_print(2, "Master_cleanup", "Preparing to exit...");
726         #ifdef THREAD_SENDING
727         pthread_mutex_lock(&sender_lock);
728         send_task.task = NULL;
729         pthread_cond_broadcast(&sender_cv);
730         pthread_mutex_unlock(&sender_lock);
731         pthread_join(sender_thread, NULL);
732         #endif //THREAD_SENDING
733
734         if (master.task_pool != NULL) Task_Destroy(master.task_pool);
735
736
737         signal(SIGCHLD, SIG_IGN); // ignore child exits now
738
739         // tell all remote nodes to exit
740         for (int i = 0; i < master.nRemote; ++i)
741         {
742                 FILE * f = fdopen(master.remote_err[i], "r+"); setbuf(f, NULL);
743
744                 fprintf(f, "exit\n");
745         
746                 fclose(f);
747         }
748
749         for (int i = 0; i < master.nSlaves; ++i)
750         {
751                 
752                 static int exitlen = -1;
753                 if (exitlen == -1) exitlen = strlen(SHELL_EXIT_MESSAGE);
754                 write(master.slave[i].in, SHELL_EXIT_MESSAGE, exitlen *sizeof(char));
755                 //usleep(0.5); //shouldn't matter too much
756         }
757
758         for (int i = 0; i < master.nSlaves; ++i)
759         {
760                 if (master.slave[i].task_pool != NULL) Task_Destroy(master.slave[i].task_pool);
761                 if (master.slave[i].pid <= 0) 
762                 {
763                         Network_close(master.slave[i].in);
764                 }
765                 else
766                 {
767                         kill(master.slave[i].pid, 15); // be nice
768                         if (kill(master.slave[i].pid, 0) == 0)
769                                 kill(master.slave[i].pid, 9); // shoot it down
770                         close(master.slave[i].in);
771                         close(master.slave[i].out);
772                 }
773                 free(master.slave[i].name);
774                 free(master.slave[i].addr);
775         }
776         free(master.slave);
777         free(master.buffer);
778         if (master.outfile != NULL)
779                 free(master.outfile);
780
781         libssh2_exit();
782
783 }
784
785
786
787
788 void Master_absorb(char * addr, int np)
789 {
790         int port = 0;
791
792         char * user = strstr(addr, "@");
793         if (user != NULL)
794         {
795                 *(user-1) = '\0';
796                 char * t = user;
797                 user = addr;
798                 addr = t;
799         }
800         else
801         {
802                 user = getpwuid(geteuid())->pw_name;
803         }
804         log_print(3, "Master_absorb", "User %s at address %s", user, addr);
805
806         // ssh to the host on port 22
807         ssh * s = ssh_new(user, addr, 22);
808         if (s == NULL)
809         {
810                 log_print(0, "Master_absorb", "Couldn't ssh to %s@%s", user, addr);
811                 return;
812         }
813         
814
815         // work out the name to give to the shells
816         char * name = strtok(addr, ":");
817         name = strtok(NULL, ":");
818         if (name == NULL)
819                 name = addr; // default is host:X
820         else
821                 *(name-1) = '\0'; // otherwise use name:X
822
823         // setup array of remote stderr file descriptors
824         if (master.nRemote++ == 0)
825         {
826                 master.remote_err = (int*)(calloc(master.nRemote, sizeof(int)));
827                 master.remote_reserved = master.nRemote;
828         }
829         else if (master.nRemote >= master.remote_reserved)
830         {
831                 // resize dynamically
832                 master.remote_reserved *= 2;
833                 master.remote_err = (int*)(realloc(master.remote_err,master.remote_reserved * sizeof(int)));
834         }
835
836
837         
838         int sfd = -1;
839         if (master.o->encrypt)
840         {
841                 int sv[2];
842                 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
843                         error("Master_absorb", "Couldn't create socket for remote swarm");
844                 sfd = sv[0];
845
846                 ssh_exec_swarm(s, NULL, sv+1, np); // start swarm remotely forward it to the socket
847                 ssh_thread_add(s); // add the ssh to the thread
848         }
849         else
850         {
851                 sfd = Network_server_bind(0, &port); // dynamically bind to a port
852                 ssh_exec_swarm(s, &port, NULL, np); // start swarm remotely, have it connect on the port
853                 ssh_destroy(s); // don't need the ssh anymore
854                 sfd = Network_server_listen(sfd, NULL); // accept connections and pray
855         }
856         master.remote_err[master.nRemote-1] = sfd;
857         if (sfd > master.fd_max)
858                 master.fd_max = sfd;
859
860
861         char buffer[BUFSIZ];
862
863         int newSlaves = 0;
864         
865         int len = sprintf(buffer, "%s\n", name);
866         int w = 0;
867         while (w < len)
868                 w += write(sfd, buffer+w, len-w);
869         
870         
871         len = 0;
872         do
873         {
874                 len = read(sfd, buffer+len, sizeof(buffer));
875                 buffer[len] = '\0';
876         }
877         while (buffer[len-1] != '\n');
878         buffer[len-1] = '\0';
879         newSlaves = atoi(buffer);
880
881         
882
883         if (newSlaves == 0)
884         {
885                 error("Master_absorb", "No slaves to absorb from %s", addr);
886         }
887
888         master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave)));
889         if (master.slave == NULL)
890         {
891                 error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", 
892                         master.nSlaves, master.nSlaves + newSlaves, strerror(errno));
893         }
894
895         
896         for (int i = 0; i < newSlaves; ++i)
897         {
898                 int ii = master.nSlaves + i;
899
900                 if (master.o->encrypt)
901                 {
902                         int sv[2];
903                         if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
904                                 error("Master_absorb", "Couldn't create socket for remote swarm");
905
906                         
907                         LIBSSH2_LISTENER * listener = NULL;
908                         // libssh2 can't finalise the connection when the port is dynamic (ie: port = 0)
909                         while (listener == NULL)
910                         {
911                                 port = 20000 + rand() % 30000; // so pick ports at random until binding succeeds
912                                 listener = ssh_get_listener(s, &port); // port forward to the socket
913                         }
914
915                         log_print(4,"Master_absorb", "Chose port %d", port);
916                         int len = sprintf(buffer, "%d\n", port);
917                         
918                         int w = 0;
919                         while (w < len)
920                         {
921                                 w += write(sfd, buffer+w, len-w);
922                         }
923                         usleep(200000); // give ssh_thread a chance to actually send the data
924
925                         log_print(4, "Master_absorb", "Creating tunnel...");
926                         ssh_add_tunnel(s, listener, sv[1]);
927                         master.slave[ii].in = sv[0];
928                         master.slave[ii].out = sv[0];
929
930                         log_print(4, "Master_absorb", "Tunnel for slave %d using socket %d<->%d setup", ii, sv[0], sv[1]);
931                 }
932                 else
933                 {
934                         int tmp = Network_server_bind(0, &port); // bind to a port      
935                         
936                         master.slave[ii].in = Network_server_listen(tmp, NULL); // listen for connection
937                         master.slave[ii].out = master.slave[ii].in;
938                 }
939
940         
941                 
942                 
943
944                 if (master.slave[ii].out > master.fd_max)
945                         master.fd_max = master.slave[ii].out;
946
947                 char buffer[BUFSIZ];
948                 sprintf(buffer, "%s:%d", name, i);
949                 master.slave[ii].name = strdup(buffer);
950                 master.slave[ii].addr = strdup(addr);
951                 master.slave[ii].running = true;
952                 master.slave[ii].pid = -1;
953                 master.slave[ii].task = NULL;
954                 master.slave[ii].task_pool = NULL;      
955         }       
956
957
958         master.nSlaves = master.nSlaves + newSlaves;
959
960         master.running += newSlaves;
961
962         log_print(2, "Master_absorb", "Successfully absorbed %d slaves from %s", newSlaves, addr);
963
964 }
965
966 void sigchld_handler(int signal)
967 {
968
969         if (signal != SIGCHLD)
970                 error("sigchld_handler", "Got signal (%d) which isn't SIGCHLD (%d)", signal, SIGCHLD);
971
972         int s = 0;
973         int p = waitpid(-1, &s, 0);
974         if (p == -1)
975                 error("sigchld_handler", "waitpid : %s", strerror(errno));
976         
977
978         int i = 0;
979         for (i = 0; i < master.nSlaves; ++i)
980         {
981                 if (master.slave[i].pid == p)
982                         break;
983         }
984         if (i >= master.nSlaves)
985         {
986                 return;
987         }
988
989         fprintf(stderr, "Unexpected exit of slave %s", master.slave[i].name);
990         if (WIFSIGNALED(s))
991         {
992                 int sig = WTERMSIG(s);
993                 fprintf(stderr, " due to %s", strsignal(sig));
994                 if (sig == SIGKILL)
995                 {
996                         printf(" - committing suicide\n");
997                         kill(getpid(), sig);
998                 }
999         }
1000         else
1001         {               
1002                 fprintf(stderr, " return code %d.",s);
1003         }
1004         fprintf(stderr, " Starting replacement.\n");
1005
1006         Make_slave(i);
1007
1008         if (master.o->end != NULL)
1009         {
1010                 //log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i);
1011                 char buffer[BUFSIZ];
1012                 sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, master.o->end);
1013                 if (write(master.slave[i].in, buffer, strlen(buffer)) <= 0)
1014                         error("sigchld_handler", "Couldn't restart slave %d; it is unresponsive", i);
1015
1016         }
1017
1018         siglongjmp(env,1);
1019
1020         
1021 }
1022

UCC git Repository :: git.ucc.asn.au