Initial commit (sort of)
[matches/swarm.git] / src / master.c
1 #define _BSD_SOURCE
2 #include "master.h"
3 #include "log.h"
4
5 #include <sys/wait.h>
6 #include "daemon.h"
7 #include <stdlib.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <strings.h>
11 #include <unistd.h>
12 #include <assert.h>
13 #include <ctype.h>
14 #include "slave.h"
15 #include <setjmp.h>
16
17 #include <unistd.h>
18 #include <regex.h>
19 #include <pthread.h>
20 #include "network.h"
21 #include <signal.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/tcp.h>
25
26 //#define THREAD_SENDING // You decided to solve a problem with threads; now you have two problems
27
28 // probably not that great to use threads anyway, since it eats one of your cores
29 // It probably spends 90% of its time sleeping, and 9.9% unlocking mutexes
30
#ifdef THREAD_SENDING
// State shared between the main loop and the optional sender thread.
pthread_t sender_thread;
// Guards send_task and the two condition variables below.
pthread_mutex_t sender_lock = PTHREAD_MUTEX_INITIALIZER;
// Signalled by Master_tasker when a task has been placed in send_task.
// A condition variable must be initialised before it is waited on.
pthread_cond_t sender_cv = PTHREAD_COND_INITIALIZER;
// Broadcast by Master_sender once the task in send_task has been written out.
pthread_cond_t sender_done_cv = PTHREAD_COND_INITIALIZER;
#endif //THREAD_SENDING
37
38
39
40 struct
41 {
42         int slave_fd;
43         Task * task;
44 } send_task;
45
46 static Master master;
47
48 static sigjmp_buf env;
49
50 void sigchld_handler(int signal);
51
52 void Master_main(Options * o)
53 {
54         setbuf(stdin, NULL);
55         setbuf(stdout, NULL);
56         setbuf(stderr, NULL);
57
58         atexit(Master_cleanup);
59         Master_setup(o);
60
61         #ifdef THREAD_SENDING
62         if (pthread_create(&sender_thread, NULL, Master_sender, NULL) != 0)
63         {
64                 error("Master_main", "Creating sender thread : %s", strerror(errno));
65         }
66         #endif //THREAD_SENDING
67         Master_loop();
68 }
69
70 void Master_setup(Options * o)
71 {
72         signal(SIGCHLD, sigchld_handler);
73         master.o = o;
74         master.barrier_number = -1;
75         master.last_number = -1;
76         master.nSlaves = o->nCPU;
77         master.running = master.nSlaves;
78         if (master.nSlaves == 0)
79                 error("Master_setup", "No CPUs to start slaves with!");
80
81         master.outfile = NULL;
82
83         master.slave = (Slave*)(calloc(master.nSlaves, sizeof(Slave)));
84
85         if (master.slave == NULL)
86                 error("Master_setup", "Allocating memory for %d slaves", master.nSlaves);
87
88         // One slave per CPU
89         for (int i = 0; i < master.nSlaves; ++i)
90         {
91                 Make_slave(i);
92
93
94                 
95                 master.slave[i].name = (char*)calloc(BUFSIZ, sizeof(char));
96                 master.slave[i].addr = (char*)calloc(BUFSIZ, sizeof(char));
97                 sprintf(master.slave[i].name, "local:%d", i);
98                 sprintf(master.slave[i].addr, "local:%d", i);
99                 FILE * f = fdopen(master.slave[i].in, "w"); setbuf(f, NULL);
100                 fprintf(f, "name=%s\n", master.slave[i].name);
101         }
102 }
103
104 void Make_slave(int i)
105 {
106         int sv[2];
107         if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
108         {
109                 error("Make_slave", "Setting up socketpair for slave %d : %s", i, strerror(errno));
110         }
111         
112         master.slave[i].pid = fork();
113         if (master.slave[i].pid == 0)
114         {
115                 dup2(sv[0],fileno(stdin));
116                 dup2(sv[0],fileno(stdout));
117                 execlp(master.o->shell, master.o->shell, NULL);
118         }
119         master.slave[i].in = sv[1];
120         master.slave[i].out = sv[1];
121         master.slave[i].running = true;
122         master.slave[i].ssh_pid = 0;
123 }
124
125 void Master_input(char c)
126 {
127         if (master.buffer == NULL)
128         {
129                 if (master.bufsiz < BUFSIZ)
130                         master.bufsiz = BUFSIZ;
131                 master.buffer = (char*)(calloc(master.bufsiz, sizeof(char)));
132                 master.buflen = 0;
133         }
134         
135         if (c == '\n' || c == EOF || c == '\0')
136         {
137                 if (master.buflen == 0)
138                         return;
139                 int repetitions = 1;
140                 bool first_only = false;
141
142                 master.buffer[master.buflen++] = '\0'; // end the string
143                 
144                 char * message;
145
146                 char * start = strtok(master.buffer, "#");
147                 start = strtok(NULL, "#");
148                 if (start == NULL) 
149                 {
150                         if ( master.buffer[0] != '#')
151                         {
152                                 log_print(3, "Master_input", "Created general task \"%s\"", master.buffer);
153                                 message = master.buffer;
154                                 master.buffer = NULL;
155                                 Task * t = Task_Append(master.task_pool, message, master.buflen, repetitions, master.outfile);
156                                 if (t->prev == NULL)
157                                         master.task_pool = t;
158                                 master.last_number = t->number;
159                         }
160                         else
161                         {
162                                 char * cmd = strtok(master.buffer+1, " ");
163                                 if (strcmp(cmd, "ABSORB") == 0)
164                                 {
165                                         master.o->encrypt = true;
166                                         cmd = strtok(NULL, " ");
167                                         if (strcmp(cmd, "UNSECURE") == 0)
168                                         {
169                                                 master.o->encrypt = false;
170                                                 log_print(1, "Master_absorb", "Using unencrypted connections");
171                                                 cmd = strtok(NULL, " ");
172                                         }
173                                         if (cmd == NULL)
174                                                 log_print(0, "Master_input", "No host specified for ABSORB directive");
175                                         char * np = strtok(NULL, " ");
176                                         if (np != NULL)
177                                                 Master_absorb(cmd, options.port, atoi(np));
178                                         else
179                                                 Master_absorb(cmd, options.port, 0);
180                                 }
181                                 else if (strcmp(cmd, "OUTPUT") == 0)
182                                 {
183                                         cmd = strtok(NULL, " ");
184                                         if (cmd == NULL)
185                                         {
186                                                 log_print(3, "Master_input", "Detach output");
187                                                 if (master.outfile != NULL)
188                                                         master.outfile[0] = '\0';
189                                         }
190                                         else
191                                         {
192                                                 log_print(3, "Master_input", "Output to %s",cmd);
193                                                 if (master.outfile == NULL)
194                                                         master.outfile = (char*)(calloc(BUFSIZ, sizeof(char)));
195                                                 sprintf(master.outfile, "%s", cmd);
196                                                 if (strstr(master.outfile, "%d") == NULL)
197                                                 {
198                                                         if (access(master.outfile, F_OK) == 0)
199                                                         {
200                                                                 if (unlink(master.outfile) != 0)
201                                                                         error("Master_input", "Removing %s for output : %s", master.outfile, strerror(errno));
202                                                         }
203                                                 }
204                                         }
205                                 }
206                                 else if (strcmp(cmd, "EXIT") == 0)
207                                 {
208                                         log_print(2, "Master_input", "Received EXIT directive; quitting");
209                                         freopen("/dev/null", "r", stdin);
210                                         master.o->daemon = false;
211                                         return;
212                                 }
213                                 else if (strcmp(cmd, "BARRIER") == 0)
214                                 {
215
216                                         log_print(3, "Master_input", "Received BARRIER directive; %d commands running", master.commands_active);
217
218                                         // check there actually are tasks
219                                         bool tasks = false;
220                                         for (int i = 0; i < master.nSlaves && !tasks; ++i)
221                                         {
222                                                 tasks = (master.slave[i].task != NULL || master.slave[i].task_pool != NULL);
223                                         }
224                                         if (tasks)
225                                         {
226
227                                                 master.barrier_number = master.last_number;
228                                                 master.barrier_block = false;
229         
230                                                 cmd = strtok(NULL, " ");
231                                                 if (cmd != NULL && strcmp(cmd, "BLOCK") == 0)
232                                                 {
233                                                         if (master.o->daemon == false)
234                                                                 log_print(1, "Master_input", "Not a daemon; BARRIER BLOCK functions as BARRIER");
235                                                         master.barrier_block = true;
236                                                 }
237                                         }
238                                         else
239                                                 log_print(3, "Master_input", "No tasks; BARRIER has no effect");
240
241                                 }
242                                 else
243                                 {
244                                         log_print(1, "Master_input", "Unrecognised directive #%s#", cmd);
245                                 }
246                         }
247                         master.buflen = 0;
248                         return;
249                 }
250
251                 *(start-1) = '\0';
252                 while (isspace(*start)) start++;
253
254                 
255                 char * rep = strtok(master.buffer, " ");
256                 rep = strtok(NULL, " ");
257                 if (rep != NULL && *rep == '&')
258                 {
259                         first_only = !first_only;
260                         rep = strtok(NULL, " ");
261                 }
262                 
263                 if (rep != NULL)
264                 {
265                         *(rep-1) = '\0';
266                         repetitions = atoi(rep);
267                         if (repetitions == 0)
268                         {
269                                 if (strcmp(rep, "0") == 0)
270                                         log_print(0,"Master_input", "Can't assign task with %s repetitions", rep);
271                                 else
272                                         log_print(0,"Master_input", "Require an integer (not \"%s\") number of repetitions (full directive is %s)", rep, master.buffer);
273                         }
274                 }
275                 
276
277                 regex_t r;
278                 
279                 int err = regcomp(&r, master.buffer+1, REG_EXTENDED);
280                 if (err != 0)
281                 {
282                         regerror(err, &r, master.buffer, master.bufsiz);
283                         error("Master_input", "Error compiling regexec : %s", master.buffer);
284                 }
285
286                 int first_match = -1;
287                 for (int i = 0; i < master.nSlaves; ++i)
288                 {
289                         if (first_only && first_match >= 0 && master.slave[i].task != NULL)
290                                 continue;
291
292                         int err = regexec(&r, master.slave[i].name, 0, NULL, 0);
293                         if (err == REG_NOMATCH)
294                         {
295                                 
296                                 continue;
297                         }
298                         else if (err != 0)
299                         {
300                                 regerror(err, &r, master.buffer, master.bufsiz);
301                                 log_print(0, "Master_input", "Error in regexec : %s", master.buffer+1);
302                                 continue;
303                         }
304                         
305                         if (first_match < 0) first_match = i;
306                         if (first_only)
307                         {
308                                 if (master.slave[i].task == NULL) 
309                                 {
310                                         first_match = i;
311                                         break;
312                                 }
313                                 continue;
314                         }
315
316                         message = strdup(start);
317                         Task * t = Task_Append(master.slave[i].task_pool, message, strlen(message), repetitions, master.outfile); 
318                         if (t->prev == NULL)
319                                 master.slave[i].task_pool = t;
320                         master.last_number = t->number;
321                         
322                         log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[i].name);
323                         
324                 }
325
326                 if (first_only && first_match >= 0)
327                 {
328                         message = strdup(start);
329                         Task * t = Task_Append(master.slave[first_match].task_pool, message, strlen(message), repetitions, master.outfile);
330                         if (t->prev == NULL)
331                                 master.slave[first_match].task_pool = t;        
332                         master.last_number = t->number;
333                         log_print(3, "Master_input", "Shell %s matches regex for task", master.slave[first_match].name);                
334                 }
335
336                 if (first_match < 0)
337                         log_print(1, "Master_input", "No shells with names matching regex %s");
338
339                 regfree(&r);
340                 master.buflen = 0;
341                 log_print(3, "Master_input", "Processed task %s", master.buffer);
342         }
343         else
344         {
345                 if (master.buflen >= master.bufsiz)
346                 {
347                         master.bufsiz *= 2;
348                         master.buffer = (char*)(realloc(master.buffer, master.bufsiz*sizeof(char)));
349                 }
350                 master.buffer[master.buflen++] = c;
351         }
352 }
353
354 void Master_output(int i, char c)
355 {
356         log_print(10, "Master_output", "input %c from slave %d", c, i);
357         #ifdef THREAD_SENDING
358         pthread_mutex_lock(&sender_lock);
359         Task * t = master.slave[i].task;
360         while (t == NULL)
361         {
362                 pthread_cond_wait(&sender_done_cv, &sender_lock);
363         }
364         pthread_mutex_unlock(&sender_lock);
365         #else
366         Task * t = master.slave[i].task;
367         #endif //THREAD_SENDING
368         if (t == NULL)
369         {
370                 log_print(0, "Master_output", "Read input from %s, but no task assigned!",master.slave[i].name);
371                 error(NULL, "Please refrain from echoing three bell characters in a row.");
372         }
373
374                         
375         t->output[t->outlen++] = c;
376         if (t->outlen >= t->outsiz)
377         {
378                 t->outsiz *= 2;
379                 t->output = (char*)(realloc(t->output, t->outsiz*sizeof(char)));
380                 memset(t->output+(t->outlen), 0, sizeof(char) * t->outsiz - t->outlen);
381                 
382         }
383         if (c == EOF || (master.o->endlen > 0 && t->outlen >= master.o->endlen
384                 && strcmp((t->output)+(t->outlen)-(master.o->endlen), master.o->end) == 0))
385         {
386                 
387                 if (c != EOF)
388                 {
389                         t->output[t->outlen - master.o->endlen] = '\0';
390                         t->outlen -= master.o->endlen;
391                 }
392
393                 master.slave[i].task = NULL;
394                 if (t->outlen <= 0)
395                 {
396                         fprintf(stdout, "%d:\n", t->number);
397                 }
398                 else if (t->output[t->outlen-1] == '\f')
399                 {
400                         log_print(2, "Master_output", "Slave %d requests name (%s)", i, master.slave[i].name);
401                         static int bufsiz = BUFSIZ;
402                         char * buffer = (char*)(calloc(bufsiz, sizeof(char)));
403                         if (buffer == NULL)
404                                 error("Master_output", "Creating name request buffer of size %d : %s", bufsiz, strerror(errno));
405                         int len = 0; 
406                         while (true)
407                         {
408                                 len = sprintf(buffer, "name=%s", master.slave[i].name);
409                                 if (len < bufsiz)
410                                         break;
411                                 bufsiz *= 2;
412                                 buffer = (char*)(realloc(buffer, bufsiz * sizeof(char)));
413                                 if (buffer == NULL)
414                                         error("Master_output", "Resizing name request buffer to size %d : %s", bufsiz, strerror(errno));
415                         }
416
417                                 
418                         Task * t2 = Task_Append(master.slave[i].task_pool, buffer,len, 1, NULL); 
419                         if (t2->prev == NULL)
420                                 master.slave[i].task_pool = t2;
421                         master.last_number = t2->number;
422                 }
423                 else
424                 {
425                         fprintf(stdout, "%d: %s", t->number, t->output); 
426                         if (t->outfile != NULL && t->outfile[0] != '\0')
427                         {
428                                 log_print(3, "Master_output", "Writing result of task %d to file \"%s\"", t->number, t->outfile);
429                                 static char buf[BUFSIZ];
430                                 
431                                 FILE * f = NULL;
432                                 if (strstr(t->outfile, "%d") != NULL)
433                                 {
434                                         
435                                         sprintf(buf, t->outfile, t->number);
436                                         f = fopen(buf, "w");
437                                         if (f == NULL)
438                                                 error("Master_output", "Couldn't open file \"%s\" : %s", buf, strerror(errno));
439                                 }
440                                 else
441                                 {
442                                         f = fopen(t->outfile, "a");
443                                         if (f == NULL)
444                                                 error("Master_output", "Couldn't open file \"%s\" : %s", t->outfile, strerror(errno));
445
446                                 }
447                                 
448                                                         
449                                 fprintf(f, "%s", t->output);
450                                 fclose(f);
451                         }
452                 }
453                 
454                 log_print(3, "Master_output", "Task %d finished; %d tasks active", t->number, master.commands_active-1);
455
456
457                 if (t->repetitions == 0)
458                 {
459                         free(t->message);
460                         free(t->output);
461                         free(t->outfile);
462                         free(t);
463                 }
464
465
466                 if (--master.commands_active == 0 && master.barrier_number >= 0)
467                 {
468                         master.barrier_number = -1;
469                         if (master.barrier_block && master.o->daemon)
470                         {
471                                 FILE * f = fopen(DAEMON_BARRIER_FIFO, "w");
472                                 fprintf(f, "\a");
473                                 fclose(f);
474                         }
475                 }
476         }
477 }
478
479 Task * Master_tasker(int i)
480 {
481         Task ** tp = (master.slave[i].task_pool != NULL) ? &(master.slave[i].task_pool) : &(master.task_pool);
482
483         if (*tp != NULL)
484         {
485                 Task * t = *tp;
486                 if (master.barrier_number >= 0 && t->number > master.barrier_number)
487                         return NULL;
488
489                 t->repetitions--;
490                 if (t->repetitions == 0)
491                 {
492                         (*tp) = t->next; 
493                         Task_Extract(t);
494                 }
495                 master.slave[i].task = t;
496                 
497                 
498                 
499                 
500
501                 #ifdef THREAD_SENDING
502                 pthread_mutex_lock(&sender_lock);
503                 while (send_task.task != NULL) // while the sender is still sending shit
504                 {
505                         pthread_cond_wait(&sender_done_cv, &sender_lock);
506                 }
507                 send_task.task = t;
508                 send_task.slave_fd = master.slave[i].out;
509                 pthread_cond_signal(&sender_cv);
510                 pthread_mutex_unlock(&sender_lock);
511
512                 #else
513                 send_task.task = t;
514                 send_task.slave_fd = master.slave[i].out;
515                 Master_send();
516                 #endif //THREAD_SENDING
517                 
518         }
519         return *tp;
520 }
521
522 void Master_loop()
523 {
524         
525         if (sigsetjmp(env,true) != 0)
526         {
527                 log_print(2, "Master_loop", "Restored from longjmp");
528         }
529         fd_set readSet;
530         //fd_set writeSet;
531         master.fd_max = fileno(stdin);
532         for (int i = 0; i < master.nSlaves; ++i)
533         {
534                 if (master.slave[i].out > master.fd_max)
535                         master.fd_max = master.slave[i].out;
536         }
537
538
539
540         bool quit = false;
541         bool input = true;
542         
543         while (!quit)
544         {
545                 
546                 if (!input && master.o->daemon)
547                 {
548                                         
549                         int fd = open(DAEMON_FIFO, O_RDONLY | O_NONBLOCK);
550                         if (fd == -1)
551                         {
552                                 if (errno != ENXIO)
553                                         error("Master_loop", "Daemon trying to reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
554                                 else
555                                         log_print(2, "Master_loop", "Daemon couldn't reopen fifo %s : %s", DAEMON_FIFO, strerror(errno));
556                         }
557                         else
558                         {
559                                 
560                                 input = true;
561                                 dup2(fd, fileno(stdin));
562                         }
563                         
564                 }
565
566                 FD_ZERO(&readSet);
567                 if (input) FD_SET(fileno(stdin), &readSet);     
568                 for (int i = 0; i < master.nSlaves; ++i)
569                 {
570                         if (master.slave[i].running) FD_SET(master.slave[i].out, &readSet);
571                 }
572                 
573                 select(master.fd_max+1, &readSet, NULL, NULL, NULL);
574                 
575
576                 if (input && FD_ISSET(fileno(stdin), &readSet))
577                 {                       
578                         char c;
579                         //log_print(10, "Master_loop", "Read from stdin");
580                         input = (read(fileno(stdin), &c, sizeof(char)) != 0);
581                         if (!input)
582                         {
583                                 c = '\n';
584                         }
585                         Master_input(c);
586                 }
587
588                 
589                 
590                 for (int i = 0; i < master.nSlaves; ++i)
591                 {
592                         if (!master.slave[i].running) continue;
593
594                         if (FD_ISSET(master.slave[i].out, &readSet))
595                         {
596                                 //log_print(10, "Master_loop", "Read from slave %d", i);
597                                 char c;                 
598                                 if (read(master.slave[i].out, &c, sizeof(char)) != 0)
599                                 {
600                                         Master_output(i, c);
601                                 }
602                         }                               
603
604                         if (master.slave[i].task == NULL)
605                         {
606                                 if (Master_tasker(i) == NULL && !input && !master.o->daemon) // no more input; no tasks
607                                 {
608                                         quit = (--master.running == 0);
609                                         master.slave[i].running = false;
610                                 }
611                                 
612                         }
613                 }
614                 
615         }
616
617 }
618
619 void Master_send()
620 {
621
622         if (master.o->prepend != NULL)
623         {
624                 static int len = -1;
625                 if (len == -1) len = strlen(master.o->prepend);
626                 write(send_task.slave_fd, master.o->prepend, (len-1) * sizeof(char));
627                 //log_print(0, "Sent prepend %s\n",master.o->prepend); 
628         }
629
630         write(send_task.slave_fd, send_task.task->message, (send_task.task->bufsiz) * sizeof(char)); 
631         //log_print(0, "Sent message %s\n",send_task.task->message); 
632         if (master.o->append != NULL)
633         {
634                 static int len = -1;
635                 if (len == -1) len = strlen(master.o->append);
636                 write(send_task.slave_fd, master.o->append, (len-1) * sizeof(char));
637                 //log_print(0, "Sent append %s\n",master.o->append); 
638         }
639         if (master.o->end != NULL)
640         {
641                 static char * echo = ";echo -en \"";
642                 static int len = -1;
643                 if (len == -1) len = strlen(echo);
644                 write(send_task.slave_fd, echo, len*sizeof(char));
645                 write(send_task.slave_fd, master.o->end, (master.o->endlen) * sizeof(char));
646                 write(send_task.slave_fd, "\"", 1*sizeof(char));
647                 //log_print(0, "Sent end\n"); 
648         }
649         write(send_task.slave_fd, "\n", 1*sizeof(char));
650         master.commands_active++;
651         log_print(3, "Master_sender", "Sent task %d \"%s\" - %d tasks active", send_task.task->number, send_task.task->message, master.commands_active);
652 }
653
654 #ifdef THREAD_SENDING
655 void * Master_sender(void * args)
656 {
657         bool quit = false;
658         while (!quit)
659         {
660                 pthread_mutex_lock(&sender_lock);
661                 while (send_task.task == NULL)
662                 {
663                         pthread_cond_wait(&sender_cv, &sender_lock);
664
665                         quit = (send_task.task == NULL);
666                         if (quit)
667                                 break;
668
669
670                         Master_send();
671
672                         //log_print(0, "Master_sender sent message\n");
673
674
675                         pthread_cond_broadcast(&sender_done_cv);
676                 }
677                 send_task.task = NULL;
678                 pthread_mutex_unlock(&sender_lock);     
679         }
680
681         return NULL;
682 }
683 #endif //THREAD_SENDING
684
685
686
687
688 void Master_cleanup()
689 {
690
691         //log_print(2, "Master_cleanup", "Preparing to exit...");
692         #ifdef THREAD_SENDING
693         pthread_mutex_lock(&sender_lock);
694         send_task.task = NULL;
695         pthread_cond_broadcast(&sender_cv);
696         pthread_mutex_unlock(&sender_lock);
697         pthread_join(sender_thread, NULL);
698         #endif //THREAD_SENDING
699
700         if (master.task_pool != NULL) Task_Destroy(master.task_pool);
701
702
703         signal(SIGCHLD, SIG_IGN); // ignore child exits now
704
705         for (int i = 0; i < master.nSlaves; ++i)
706         {
707                 
708                 static int exitlen = -1;
709                 if (exitlen == -1) exitlen = strlen(SHELL_EXIT_MESSAGE);
710                 write(master.slave[i].in, SHELL_EXIT_MESSAGE, exitlen *sizeof(char));
711                 //usleep(0.5); //shouldn't matter too much
712         }
713
714         for (int i = 0; i < master.nSlaves; ++i)
715         {
716                 if (master.slave[i].task_pool != NULL) Task_Destroy(master.slave[i].task_pool);
717                 if (master.slave[i].pid <= 0) 
718                 {
719                         Network_close(master.slave[i].in);
720                         if (master.slave[i].ssh_pid > 0)
721                         {
722                                 log_print(2, "Master_cleanup", "Killing ssh instance %d", master.slave[i].ssh_pid);
723                                 kill(master.slave[i].ssh_pid, 15);
724                                 if (kill(master.slave[i].ssh_pid, 0) == 0)
725                                         kill(master.slave[i].ssh_pid, 9);
726                         }
727                 }
728                 else
729                 {
730                         kill(master.slave[i].pid, 15); // be nice
731                         if (kill(master.slave[i].pid, 0) == 0)
732                                 kill(master.slave[i].pid, 9); // shoot it down
733                         close(master.slave[i].in);
734                         close(master.slave[i].out);
735                 }
736                 free(master.slave[i].name);
737                 free(master.slave[i].addr);
738         }
739         free(master.slave);
740         free(master.buffer);
741         if (master.outfile != NULL)
742                 free(master.outfile);
743 }
744
/**
 * start_server - pthread-style wrapper around Network_server().
 *
 * @param args  points to an int. On entry the int is passed to
 *              Network_server(); on exit it holds Network_server()'s result.
 * @return      always NULL.
 */
void * start_server(void * args)
{
	int * slot = (int*)args;
	int result = Network_server(*slot);

	*slot = result;
	log_print(2, "start_server", "started network server");
	return NULL;
}
752
753 int Secure_connection(char * addr, int port);
754
755 void Master_absorb(char * addr, int port, int np)
756 {
757         char * name = strtok(addr, ":");
758         name = strtok(NULL, ":");
759         if (name == NULL)
760         {
761                 name = addr;
762         }
763         else
764         {
765                 *(name-1) = '\0';
766         }
767
768
769         //log_print(0, "name is %s\n", name);
770         
771         int first_ssh = 0;
772         if (master.o->encrypt)
773                 first_ssh = Secure_connection(addr, port);
774
775         //pthread_t ss;
776         //int net_fd = port;
777         //pthread_create(&ss, NULL, start_server, (void*)(&net_fd));
778         
779         char buffer[BUFSIZ];
780         if (fork() == 0)
781         {
782                 // The alternative to this kind of terrible hack is OpenMPI's "opal"
783                 // This involves >1000 lines of operating system independent ways to get an IP address from a local interface 
784                 // Which will then be completely useless if there is any sort of NAT involved
785                 
786                 //freopen("/dev/null", "r", stdin);
787                 //freopen("/dev/null", "w", stdout);
788                 //freopen("/dev/null", "w", stderr);
789
790                 char * cmd = buffer+sprintf(buffer, "swarm -p ");
791                 if (master.o->encrypt)
792                         cmd = cmd+sprintf(cmd, "%d -e", port+1000);
793                 else
794                         cmd = cmd+sprintf(cmd, "%d -u", port);
795         
796                 if (np > 0)
797                         cmd = cmd+sprintf(cmd, " -n %d", np);
798                 sprintf(cmd, " -m $(echo $SSH_CONNECTION | awk \'{print $1}\')");
799                 log_print(3, "Master_absorb", "Execing %s", buffer);
800                 execlp("ssh", "ssh", "-f", addr, buffer, NULL);
801         }
802         log_print(3, "Master_absorb", "Listening on port %d", port);
803         int net_fd = Network_server(port);
804         log_print(3, "Master_absorb", "Created network server on port %d", port);
805
806         
807         char * s = buffer;
808         while (read(net_fd, s, sizeof(char)) != 0)
809         {
810                 if (*s == '\n')
811                 {
812                         *s = '\0';
813                         break;
814                 }
815                 ++s;
816         }
817
818         int newSlaves = atoi(buffer);
819         log_print(3, "Master_absorb", "Absorbing %d slaves from %s\n", newSlaves, addr);
820         if (newSlaves == 0)
821         {
822                 error("Master_absorb", "No slaves to absorb from %s", addr);
823         }
824
825         master.slave = (Slave*)(realloc(master.slave, (master.nSlaves + newSlaves) * sizeof(Slave)));
826         if (master.slave == NULL)
827         {
828                 error("Master_absorb", "Resizing slave array from %d to %d slaves : %s", master.nSlaves, master.nSlaves + newSlaves, strerror(errno));
829         }
830
831
832         if (master.o->encrypt)
833         {
834                 for (int i = 0; i < newSlaves-1; ++i)
835                         master.slave[master.nSlaves+i].ssh_pid = Secure_connection(addr, port+i+1);
836
837         }       
838         master.slave[master.nSlaves+newSlaves-1].ssh_pid = first_ssh;
839         
840
841
842
843         
844
845         char c = '\a';
846         log_print(3, "Master_absorb", "Writing bell to slave");
847         write(net_fd, &c, sizeof(char));
848         
849
850
851         for (int i = 0; i < newSlaves; ++i)
852         {
853                 log_print(3, NULL, "Absorbing slave %d...", i);
854                 int ii = master.nSlaves + i;
855                 if (i == newSlaves-1)
856                 {
857                         master.slave[ii].out = net_fd;
858                 }
859                 else
860                 {
861                         log_print(3, "Master_absorb", "Creating server %d at time %d", i, time(NULL));
862                         write(net_fd, &c, sizeof(char));
863                         master.slave[ii].out = Network_server(port + i + 1);
864                 }
865                 master.slave[ii].in = master.slave[ii].out;
866                 
867
868                 if (master.slave[ii].out > master.fd_max)
869                         master.fd_max = master.slave[ii].out;
870
871                 sprintf(buffer, "%s:%d", name, i);
872                 master.slave[ii].name = strdup(buffer);
873                 master.slave[ii].addr = strdup(addr);
874                 master.slave[ii].running = true;
875                 master.slave[ii].pid = -1;
876                 master.slave[ii].task = NULL;
877                 master.slave[ii].task_pool = NULL;
878
879                 FILE * f = fdopen(master.slave[ii].in, "w"); setbuf(f, NULL);
880                 fprintf(f, "name=%s\n", master.slave[ii].name);
881
882                 log_print(3, NULL, "Done absorbing slave %d...", i);
883         
884         }       
885
886
887         master.nSlaves = master.nSlaves + newSlaves;
888
889         master.running += newSlaves;
890
891         log_print(2, "Master_absorb", "Successfully absorbed %d slaves from %s", newSlaves, addr);
892
893 }
894
/**
 * Secure_connection - open a reverse ssh tunnel to addr in a child process.
 *
 * The child runs `/usr/bin/ssh -N -R <port+1000>:localhost:<port> addr` with
 * stdin, stdout and stderr redirected to /dev/null.
 *
 * @param addr  remote host handed to ssh.
 * @param port  local port the tunnel leads to; the remote end is port+1000.
 * @return      the child's pid, or -1 if fork() failed (value of fork()).
 */
int Secure_connection(char * addr, int port)
{
	int result = fork();
	if (result == 0)
	{
		char buffer[BUFSIZ];
		snprintf(buffer, sizeof(buffer), "%d:localhost:%d", port+1000, port);
		freopen("/dev/null", "r", stdin);
		freopen("/dev/null", "w", stdout);
		freopen("/dev/null", "w", stderr);
		execl("/usr/bin/ssh", "/usr/bin/ssh", "-N", "-R", buffer, addr, (char*)NULL);

		// Only reached when exec failed. Without this the child would return
		// to the caller and keep running as a copy of the master; _exit also
		// skips the master's atexit handlers.
		_exit(127);
	}
	if (result == -1)
	{
		log_print(1, "Secure_connection", "Couldn't fork ssh tunnel to %s : %s", addr, strerror(errno));
	}
	return result;
}
909
910 void sigchld_handler(int signal)
911 {
912
913         if (signal != SIGCHLD)
914                 error("sigchld_handler", "Got signal (%d) which isn't SIGCHLD (%d)", signal, SIGCHLD);
915
916         int s = 0;
917         int p = waitpid(-1, &s, 0);
918         if (p == -1)
919                 error("sigchld_handler", "waitpid : %s", strerror(errno));
920         
921         if (WIFSIGNALED(s))
922         {
923                 int sig = WTERMSIG(s);
924                 log_print(2, "sigchld_handler", "A child [%d] was terminated with signal %d; terminating self with same signal", p, sig);
925                 kill(getpid(), sig);
926                 return;
927         }
928
929         int i = 0;
930         for (i = 0; i < master.nSlaves; ++i)
931         {
932                 if (master.slave[i].pid == p)
933                         break;
934         }
935         if (i >= master.nSlaves)
936         {
937                 return;
938         }
939
940         log_print(1, "sigchld_handler", "Slave %d [%d] exited with code %d; restarting it",i, p, s);
941
942         Make_slave(i);
943
944         if (master.o->end != NULL)
945         {
946                 log_print(1, "sigchld_handler", "Trying to convince slave %d to be nice", i);
947                 char buffer[BUFSIZ];
948                 sprintf(buffer, "name=%s;echo -en \"%s\"\n", master.slave[i].name, master.o->end);
949                 if (write(master.slave[i].in, buffer, strlen(buffer)) <= 0)
950                         error("sigchld_handler", "Couldn't restart slave %d; it is unresponsive", i);
951
952         }
953
954         siglongjmp(env,1);
955
956         
957 }
958

UCC git Repository :: git.ucc.asn.au