Parallel Programming - Commit before I break everything
[matches/honours.git] / course / semester2 / pprog / assignment1 / mthread / nbody.c
1 /**
2  * @file nbody.c
3  * @purpose Implementation of multi-threaded N-Body simulator using pthreads
4  * @author Sam Moore (20503628) - 2012
5  */
6
7 #include "nbody.h" // Declarations
8 #include "../single-thread/nbody.c" // Include all functions from the single threaded version
9
10 #include "graphics.h" // For declaration of Graphics_Run only
11
12 // --- Variable declarations --- //
13
14 pthread_t compute_thread; // The thread responsible for computations; it spawns worker threads
15         
16 pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
17 System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
18 System * nested_sub_system = NULL; //Array of Systems for division of "universe" for the nested worker threads
19
20 pthread_mutex_t mutex_runstate; // Mutex around the runstate
21
22
23
24 pthread_attr_t attr; //thread attribute for the workers. 
25
26 Barrier force_barrier; // I laughed at this variable name. A bit sad really.
27 Barrier position_barrier;
28
29
30 Barrier graphics_barrier;
31
32
33 /**
34  * @function Compute_Thread
35  * @purpose Thread - Continuously computes steps for a system of bodies. Seperate from graphics functions.
36  *      Spawns worker threads to divide up computation.
37  * @param arg - Can be cast to the System for which computations are performed (ie: &universe)
38  */
39 void * Compute_Thread(void * arg)
40 {
41
42         System * s = (System*)(arg); //cast argument to a System*
43
44
45         // If no number of threads provided, use the default value, unless someone changed that to a stupid value
46         if (options.num_threads <= 0)
47                 options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
48
49         if (options.nested_threads <= 0)
50                 options.nested_threads = 1;
51
52         // Do a sanity check; there is no point spawning more threads than bodies.
53         if (options.num_threads > s->N)
54         {
55                 fprintf(stderr, 
56                         "Warning: Using %u threads instead of %u specified, because there are only %u bodies to simulate!\n",
57                         s->N, options.num_threads, s->N);       
58                 options.num_threads = s->N;
59         }
60         
61         pthread_attr_init(&attr);
62         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
63         
64         if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
65         {
66                 worker_thread = Allocate_Threads(options.num_threads);
67                 sub_system = Split_System(&universe, options.num_threads);
68
69                 if (options.nested_threads > 1)
70                         nested_sub_system = Split_System(&universe, options.nested_threads);
71
72                 #ifdef PERSISTENT_THREADS
73                 for (unsigned i = 0; i < options.num_threads; ++i)
74                 {
75                         if (pthread_create(worker_thread+i, & attr, Worker_Thread, (void*)(sub_system+i)) != 0)
76                         {
77                                 perror("In compute thread, couldn't create worker thread");
78                                 QuitProgram(true);
79                                 pthread_exit(NULL);
80                         }
81                 }
82                 #endif //PERSISTENT_THREADS
83
84                 
85         }
86         #ifdef PERSISTENT_THREADS
87         else
88         {
89                 while (!ExitCondition())
90                 {
91                         if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
92                                 DisplayStatistics();
93
94                         // Just do everything in this thread 
95                         System_Forces(s, s);
96                         System_Positions(s);
97                 }
98                 QuitProgram(false);
99                 pthread_exit(NULL);
100         }
101
102         #else
103         // The main computation loop
104         while (!ExitCondition())
105         {
106                 if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
107                         DisplayStatistics();
108
109                 if (options.num_threads <= 1)
110                 {
111                         // Just do everything in this thread 
112                         System_Forces(s, s);
113                         System_Positions(s);
114                         continue;
115                 }
116
117                 
118
119                 //Compute forces
120                 for (unsigned i = 0; i < options.num_threads; ++i)
121                 {
122                         if (pthread_create(worker_thread+i, &attr, Force_Thread, (void*)(sub_system+i)) != 0)
123                         {
124                                 perror("In compute thread, couldn't create worker thread (force)");
125                                 QuitProgram(true);
126                                 pthread_exit(NULL);
127                         }       
128                         
129                 }
130
131
132                 Barrier_Wait(&force_barrier);
133
134                 //All the forces are now computed
135
136                 if (options.draw_graphics && options.pedantic_graphics)
137                 {
138                         Barrier_Wait(&graphics_barrier);
139                 }
140
141                 //Compute positions
142                 for (unsigned i = 0; i < options.num_threads; ++i)
143                 {
144                         if (pthread_create(worker_thread+i, &attr, Position_Thread, (void*)(sub_system+i)) != 0)
145                         {
146                                 perror("In compute thread, couldn't create worker thread (position)");
147                                 QuitProgram(true);
148                                 pthread_exit(NULL);
149                         }       
150                 }
151
152                 //Wait for positions to be computed
153                 Barrier_Wait(&position_barrier);
154
155
156                 //Update number of steps computed
157                 universe.steps += 1;
158
159                 
160
161                 
162
163
164         }
165         QuitProgram(false);
166         #endif //PERSISTENT_THREADS
167         return NULL;
168 }
169
170 /**
171  * @function BeforeDraw
172  * @purpose Called in graphics thread before the draw loop
173  *      When --pedantic-graphics enabled, will wait for position computations to finish before drawing
174  *      Otherwise does nothing
175  */
176 void BeforeDraw()
177 {
178         if (options.verbosity != 0 && universe.steps % options.verbosity == 0)
179                 DisplayStatistics();
180         //printf("BEFORE DRAW\n");
181         if (!options.pedantic_graphics)
182                 return;
183         
184         Barrier_Wait(&position_barrier);
185
186
187         Barrier_Enter(&graphics_barrier);       
188 }
189
190 /**
191  * @function AfterDraw
192  * @purpose Called in graphics thread after the draw loop
193  *      When --pedantic-graphics is supplied, will signal computation thread that drawing is finished
194  *              So that positions can be safely altered
195  *      Otherwise does nothing
196  */
197 void AfterDraw()
198 {
199         universe.steps += 1;
200         if (!options.pedantic_graphics)
201                 return;
202         Barrier_Leave(&graphics_barrier);
203         
204 }
205
206 #ifdef PERSISTENT_THREADS
207
208 /**
209  * @function Worker_Thread
210  * @purpose Thread - Calculate stuff
211  */
212 void * Worker_Thread(void * arg)
213 {
214         System * s = (System*)(arg);
215
216
217         pthread_t * nested_workers = NULL;
218         System_ForcePair * system_pairs = NULL;
219         System * nested_position = NULL;
220         
221         Barrier nested_barrier; 
222         Barrier_Init(&nested_barrier);
223
224         printf("options.nested_threads == %d\n", (int)(options.nested_threads));
225
226         if (options.nested_threads != 1)        
227         {
228
229                 system_pairs = (System_ForcePair*)(calloc(options.nested_threads, sizeof(System_ForcePair)));
230                 if (system_pairs == NULL) // Handle tedious error cases
231                 {
232                         perror("Couldn't allocate array of system pairs");
233                         QuitProgram(true);
234                         pthread_exit(NULL);
235                 }
236                 nested_workers = Allocate_Threads(options.nested_threads);
237                 nested_position = 
238
239                 for (unsigned i = 0; i < options.nested_threads; ++i)
240                 {
241                         system_pairs[i].A = s;
242                         system_pairs[i].B = nested_sub_system+i;
243                 }
244         }
245
246         while (!ExitCondition())
247         {
248                 
249                 if (options.nested_threads == 1)
250                 {
251                         Barrier_Enter(&force_barrier);
252                         System_Forces(s, &universe);
253                         Barrier_Leave(&force_barrier);
254                 }
255                 else
256                 {
257                         for (unsigned i = 0; i < options.nested_threads; ++i)
258                         {
259                                 if (pthread_create(nested_workers+i, &attr, Force_Thread, (void*)(system_pairs+i)) != 0)
260                                 {
261                                         perror("In worker thread, couldn't create nested worker thread (force)");
262                                         QuitProgram(true);
263                                         free(nested_workers);
264                                         pthread_exit(NULL);                                     
265                                 }       
266                         }
267                 }
268                 //printf("Computed forces for %p\n", arg);
269                 Barrier_Wait(&force_barrier);
270                 //printf("Computed ALL forces\n");
271
272                 if (options.draw_graphics && options.pedantic_graphics)
273                         Barrier_Wait(&graphics_barrier);
274
275                 Barrier_Enter(&position_barrier);
276                 
277                 System_Positions(s);
278
279                 Barrier_Leave(&position_barrier);
280                 //printf("Computed positions for %p\n", arg);
281                 Barrier_Wait(&position_barrier);
282                 //printf("Computed ALL positions\n");
283         }
284         printf("Worker thread exits\n");
285         QuitProgram(false);
286         pthread_exit(NULL);
287 }
288
289 #endif //PERSISTENT_THREADS
290
291 /**
292  * @function Force_Thread
293  * @purpose Thread - Calculates the forces on objects in a System
294  * @param s - Can be cast to the System*
295  */
296 void * Force_Thread(void * s)
297 {
298         System_ForcePair * pair = (System_ForcePair*)s;
299         Barrier_Enter(&force_barrier);
300
301         System_Forces(pair->A, pair->B); //Simple wrapper
302
303         Barrier_Leave(&force_barrier);
304
305         return NULL;
306 }
307
308
309 /**
310  * @function Position_Thread
311  * @purpose Thread - Calculates the positions of objects in a System 
312  * @param s - Can be cast to the System*
313  */
314 void * Position_Thread(void * s)
315 {
316         Barrier_Enter(&position_barrier);
317         System_Positions((System*)s); // Simple wrapper
318         Barrier_Leave(&position_barrier);
319         return NULL;
320 }       
321
322 /**
323  * @function QuitProgram
324  * @purpose This function can either be called by the main thread in order to signal other threads
325  *              that it wants to exit. The main thread then calls pthread_join before exiting.
326  *      It can also be called by a child thread to request the main thread to exit.
327  *      It is only used this way if there is an unrecovarable error (ie: Can't allocate memory in a child thread)
328  */
329 void QuitProgram(bool error)
330 {
331         if (runstate == QUIT || runstate == QUIT_ERROR)
332                 return; //Don't do anything if already quitting
333         pthread_mutex_lock(&mutex_runstate); // aquire mutex
334         if (error) // set the runstate
335                 runstate = QUIT_ERROR;
336         else
337                 runstate = QUIT;
338         pthread_mutex_unlock(&mutex_runstate); //release mutex
339 }
340
341 /**
342  * @function Thread_Cleanup
343  * @purpose Will be called in the main thread when exit() is called
344  *      Automatically tells all other threads to quit (if they haven't already been told) 
345  *      Then waits for them to finish.
346  *      Also frees memory associated with the worker threads.   
347  */
348 void Thread_Cleanup(void)
349 {
350         if (runstate == RUN) // If this is true, as far as child threads are concerned, the simulation is still running
351                 QuitProgram(false); // So call QuitProgram which will set runstate, and cause child threads to exit
352         pthread_join(compute_thread, NULL);
353         free(worker_thread);
354         free(sub_system);
355 }
356
357
358 /**
359  * @function Simulation_Run
360  * @purpose Initialise and start the simulation. Will be called in the main thread.
361  *      Replaces the single-threaded macro that does nothing, and sets up the compute thread
362  * @param argc - Number of arguments - Passed to Graphics_Run if needed
363  * @param argv - Argument strings - Passed to Graphics_Run if needed
364  */
365 void Simulation_Run(int argc, char ** argv)
366 {
367         atexit(Thread_Cleanup);
368
369         Barrier_Init(&force_barrier);
370         Barrier_Init(&position_barrier);
371         Barrier_Init(&graphics_barrier);
372
373         
374         if (options.draw_graphics)
375         {
376                 // The graphics are enabled, so create a thread to do computations
377                 // Graphics are done in the main loop
378                 //printf("Graphics are enabled\n");
379                 #ifdef PERSISTENT_THREADS
380                 Compute_Thread((void*)(&universe));
381                 #else
382                 if (pthread_create(&compute_thread, NULL, Compute_Thread, (void*)&universe) != 0)
383                 {
384                         perror("Error creating compute thread");
385                         exit(EXIT_FAILURE);
386                 }
387                 #endif //PERSISTENT_THREADS
388                 //printf("Run compute thread\n");
389                 Graphics_Run(argc, argv);
390         }
391         else
392         
393                 Compute_Thread((void*)(&universe)); // Graphics are disabled, so do computations in the main thread
394 }
395
396
397
398 void Barrier_Init(Barrier * b)
399 {
400         b->threads_busy = 0;
401 }       
402
403 void Barrier_Enter(Barrier * b)
404 {
405         pthread_mutex_lock(&(b->mutex));
406         b->threads_busy += 1;
407         pthread_mutex_unlock(&(b->mutex));
408 }
409
410 void Barrier_Leave(Barrier * b)
411 {
412         pthread_mutex_lock(&(b->mutex));
413         b->threads_busy -= 1;
414         if (b->threads_busy == 0)
415         {
416                 pthread_cond_signal(&(b->threads_done_cv));
417         }
418         pthread_mutex_unlock(&(b->mutex));
419 }
420
421 void Barrier_Wait(Barrier * b)
422 {
423         pthread_mutex_lock(&(b->mutex));
424         while (b->threads_busy > 0)
425                 pthread_cond_wait(&(b->threads_done_cv), &(b->mutex));
426         pthread_mutex_unlock(&(b->mutex));
427 }
428
429 /**
430  * @function Split_System
431  * @purpose Helper to divide one system into an array of systems
432  *      Each sub system will have N = (s->N / n) bodies in it
433  * @param s - The original system (typically &universe)
434  * @param n - The number of sub systems in the array
435  *
436  * WARNING: It is the caller's responsibility to free() the returned array
437  */
438 System * Split_System(System * s, unsigned n)
439 {
440         System * result = (System*)(calloc(n, sizeof(System)));
441         if (result == NULL)
442         {
443                 perror("Couldn't create array of sub systems");
444                 QuitProgram(true);
445                 pthread_exit(NULL);
446         }
447
448         unsigned n_per_system = (s->N) / n;
449         unsigned remainder = (s->N) % n;
450
451         for (unsigned i = 0; i < n; ++i)        
452         {
453                 result[i].N = n_per_system;
454                 if (i == n-1)
455                         result[i].N += remainder;
456                 result[i].body = (s->body) + (n_per_system * i);
457                 result[i].steps = 0;
458         }
459         return result;
460 }
461
/**
 * @function Allocate_Threads
 * @purpose Helper function to allocate a zeroed array of pthread_t objects
 *	If the allocation fails, an error is printed, QuitProgram(true) is called
 *	and the calling thread exits.
 * @param n - Number of threads in the array
 * @returns The newly allocated array
 *
 * WARNING: Remember to free() the array!!!
 */
pthread_t * Allocate_Threads(unsigned n)
{
	pthread_t * threads = calloc(n, sizeof(*threads));

	if (!threads)
	{
		perror("Unable to allocate memory for threads");
		QuitProgram(true);
		pthread_exit(NULL);
	}

	return threads;
}

UCC git Repository :: git.ucc.asn.au