Parallel Programming - Stuff happened
[matches/honours.git] / course / semester2 / pprog / assignment1 / mthread / nbody.c
1 /**
2  * @file nbody.c
3  * @purpose Implentation of multi-threaded N-Body simulator using pthreads
4  * @author Sam Moore (20503628) - 2012
5  */
6
7 #include "nbody.h" // Declarations
8 #include "../single-thread/nbody.c" // Include all functions from the single threaded version
9
10 #include "graphics.h" // For declaration of Graphics_Run only
11
12 // --- Variable declarations --- //
13
14 pthread_t compute_thread; // The thread responsible for computations; it spawns worker threads
15         
16 pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
17 System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
18
19 pthread_mutex_t mutex_runstate; // Mutex around the runstate
20
21 #ifdef PERSISTENT_THREADS
22 Barrier force_barrier; // I laughed at this variable name. A bit sad really.
23 Barrier position_barrier;
24 #else
25 Barrier worker_barrier;
26 #endif //PERSISTENT_THREADS
27
28 Barrier graphics_barrier;
29
30
31 /**
32  * @function Compute_Thread
33  * @purpose Thread - Continuously computes steps for a system of bodies. Seperate from graphics functions.
34  *      Spawns worker threads to divide up computation.
35  * @param arg - Can be cast to the System for which computations are performed (ie: &universe)
36  */
37 void * Compute_Thread(void * arg)
38 {
39
40         System * s = (System*)(arg); //cast argument to a System*
41
42
43         // If no number of threads provided, use the default value, unless someone changed that to a stupid value
44         if (options.num_threads <= 0)
45                 options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
46
47         // Do a sanity check; there is no point spawning more threads than bodies.
48         if (options.num_threads > s->N)
49         {
50                 fprintf(stderr, 
51                         "Warning: Using %u threads instead of %u specified, because there are only %u bodies to simulate!\n",
52                         s->N, options.num_threads, s->N);       
53                 options.num_threads = s->N;
54         }
55         
56         pthread_attr_t attr; //thread attribute for the workers. 
57         pthread_attr_init(&attr);
58         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
59         
60         if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
61         {
62                 worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t)));
63                 if (worker_thread == NULL)
64                 {
65                         perror("Couldn't allocate array of worker threads");
66                         QuitProgram(true);
67                         pthread_exit(NULL);
68                 }
69                 sub_system = (System*)(calloc(options.num_threads, sizeof(System)));
70                 if (sub_system == NULL)
71                 {
72                         perror("Couldn't allocate array of systems for worker threads to use");
73                         QuitProgram(true);
74                         pthread_exit(NULL);
75                 }
76
77                 // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread
78                 unsigned bodies_per_system = (s->N) / options.num_threads;
79                 unsigned remainder = (s->N) % options.num_threads;
80                 for (unsigned i = 0; i < options.num_threads; ++i)
81                 {
82                         sub_system[i].body = (s->body)+(i*bodies_per_system);
83                         sub_system[i].N = bodies_per_system;
84                         sub_system[i].steps = 0;
85                         if (i == options.num_threads - 1)
86                                 sub_system[i].N += remainder; // The last thread gets the remainder
87
88                         #ifdef PERSISTENT_THREADS
89                         if (pthread_create(worker_thread+i, & attr, Worker_Thread, (void*)(sub_system+i)) != 0)
90                         {
91                                 perror("In compute thread, couldn't create worker thread");
92                                 QuitProgram(true);
93                                 pthread_exit(NULL);
94                         }
95                         #endif //PERSISTENT_THREADS
96         
97                 }
98
99                 
100         }
101         #ifdef PERSISTENT_THREADS
102         else
103         {
104                 while (!ExitCondition())
105                 {
106                         if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
107                         {
108                                 DisplayStatistics();
109                         }
110
111                         // Just do everything in this thread 
112                         System_Forces(s, s);
113                         System_Positions(s);
114                         continue;
115                 }
116                 QuitProgram(false);
117                 pthread_exit(NULL);
118         }
119
120         #else
121         // The main computation loop
122         while (true)
123         {
124                 //Check whether the program should quit due to steps being computed, or a timeout
125                 if (ExitCondition())
126                 {
127                         QuitProgram(false);
128                         pthread_exit(NULL);
129                 }
130
131                 if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
132                 {
133                         DisplayStatistics();
134                 }
135
136
137                 if (options.num_threads <= 1)
138                 {
139                         // Just do everything in this thread 
140                         System_Forces(s, s);
141                         System_Positions(s);
142                         continue;
143                 }
144
145                 
146
147                 //Compute forces
148                 for (unsigned i = 0; i < options.num_threads; ++i)
149                 {
150                         if (pthread_create(worker_thread+i, &attr, Force_Thread, (void*)(sub_system+i)) != 0)
151                         {
152                                 perror("In compute thread, couldn't create worker thread (force)");
153                                 QuitProgram(true);
154                                 pthread_exit(NULL);
155                         }       
156                         
157                 }
158
159
160                 Barrier_Wait(&worker_barrier);
161
162                 //All the forces are now computed
163
164                 if (options.draw_graphics && options.pedantic_graphics)
165                 {
166                         Barrier_Wait(&graphics_barrier);
167                         Barrier_Enter(&graphics_barrier);
168                 }
169
170                 //Compute positions
171                 for (unsigned i = 0; i < options.num_threads; ++i)
172                 {
173                         if (pthread_create(worker_thread+i, &attr, Position_Thread, (void*)(sub_system+i)) != 0)
174                         {
175                                 perror("In compute thread, couldn't create worker thread (position)");
176                                 QuitProgram(true);
177                                 pthread_exit(NULL);
178                         }       
179                 }
180
181                 //Wait for positions to be computed
182                 Barrier_Wait(&worker_barrier);
183
184
185                 //Update number of steps computed
186                 universe.steps += 1;
187
188                 if (options.draw_graphics && options.pedantic_graphics)
189                         Barrier_Leave(&graphics_barrier);
190
191                 
192
193
194         }
195         #endif //PERSISTENT_THREADS
196         
197         return NULL;
198 }
199
200 /**
201  * @function BeforeDraw
202  * @purpose Called in graphics thread before the draw loop
203  *      When --pedantic-graphics enabled, will wait for position computations to finish before drawing
204  *      Otherwise does nothing
205  */
206 void BeforeDraw()
207 {
208         if (options.verbosity != 0 && universe.steps % options.verbosity == 0)
209                 DisplayStatistics();
210         //printf("BEFORE DRAW\n");
211         if (!options.pedantic_graphics)
212                 return;
213         #ifdef PERSISTENT_THREADS
214         Barrier_Wait(&position_barrier);
215         
216         #else
217         Barrier_Wait(&graphics_barrier);
218         #endif //PERSISTENT_THREADS
219
220         Barrier_Enter(&graphics_barrier);       
221 }
222
223 /**
224  * @function AfterDraw
225  * @purpose Called in graphics thread after the draw loop
226  *      When --pedantic-graphics is supplied, will signal computation thread that drawing is finished
227  *              So that positions can be safely altered
228  *      Otherwise does nothing
229  */
230 void AfterDraw()
231 {
232         universe.steps += 1;
233         if (!options.pedantic_graphics)
234                 return;
235         Barrier_Leave(&graphics_barrier);
236         
237 }
238
239 #ifdef PERSISTENT_THREADS
240
241 /**
242  * @function Worker_Thread
243  * @purpose Thread - Calculate stuff
244  */
245 void * Worker_Thread(void * arg)
246 {
247         System * s = (System*)(arg);
248         while (!ExitCondition())
249         {
250                 
251                 Barrier_Enter(&force_barrier);
252
253                 System_Forces(s, &universe);
254                 
255                 Barrier_Leave(&force_barrier);
256
257                 //printf("Computed forces for %p\n", arg);
258                 Barrier_Wait(&force_barrier);
259                 //printf("Computed ALL forces\n");
260
261                 if (options.draw_graphics && options.pedantic_graphics)
262                         Barrier_Wait(&graphics_barrier);
263
264                 Barrier_Enter(&position_barrier);
265                 
266                 System_Positions(s);
267
268                 Barrier_Leave(&position_barrier);
269                 //printf("Computed positions for %p\n", arg);
270                 Barrier_Wait(&position_barrier);
271                 //printf("Computed ALL positions\n");
272         }
273         QuitProgram(false);
274         pthread_exit(NULL);
275 }
276
277
278 #else
279
280 /**
281  * @function Force_Thread
282  * @purpose Thread - Calculates the forces on objects in a System
283  * @param s - Can be cast to the System*
284  */
285 void * Force_Thread(void * s)
286 {
287         Barrier_Enter(&worker_barrier);
288
289         System_Forces((System*)s, &universe); //Simple wrapper
290
291         Barrier_Leave(&worker_barrier);
292
293         return NULL;
294 }
295
296 /**
297  * @function Position_Thread
298  * @purpose Thread - Calculates the positions of objects in a System 
299  * @param s - Can be cast to the System*
300  */
301 void * Position_Thread(void * s)
302 {
303         Barrier_Enter(&worker_barrier);
304         System_Positions((System*)s); // Simple wrapper
305         Barrier_Leave(&worker_barrier);
306         return NULL;
307 }       
308
309 #endif //PERSISTENT_THREADS
310
311 /**
312  * @function QuitProgram
313  * @purpose This function can either be called by the main thread in order to signal other threads
314  *              that it wants to exit. The main thread then calls pthread_join before exiting.
315  *      It can also be called by a child thread to request the main thread to exit.
316  *      It is only used this way if there is an unrecovarable error (ie: Can't allocate memory in a child thread)
317  */
318 void QuitProgram(bool error)
319 {
320         if (runstate == QUIT || runstate == QUIT_ERROR)
321                 return; //Don't do anything if already quitting
322         pthread_mutex_lock(&mutex_runstate); // aquire mutex
323         if (error) // set the runstate
324                 runstate = QUIT_ERROR;
325         else
326                 runstate = QUIT;
327         pthread_mutex_unlock(&mutex_runstate); //release mutex
328 }
329
330 /**
331  * @function Thread_Cleanup
332  * @purpose Will be called in the main thread when exit() is called
333  *      Automatically tells all other threads to quit (if they haven't already been told) 
334  *      Then waits for them to finish.
335  *      Also frees memory associated with the worker threads.   
336  */
337 void Thread_Cleanup(void)
338 {
339         if (runstate == RUN) // If this is true, as far as child threads are concerned, the simulation is still running
340                 QuitProgram(false); // So call QuitProgram which will set runstate, and cause child threads to exit
341         pthread_join(compute_thread, NULL);
342         free(worker_thread);
343         free(sub_system);
344 }
345
346
347 /**
348  * @function Simulation_Run
349  * @purpose Initialise and start the simulation. Will be called in the main thread.
350  *      Replaces the single-threaded macro that does nothing, and sets up the compute thread
351  * @param argc - Number of arguments - Passed to Graphics_Run if needed
352  * @param argv - Argument strings - Passed to Graphics_Run if needed
353  */
354 void Simulation_Run(int argc, char ** argv)
355 {
356         atexit(Thread_Cleanup);
357
358         #ifdef PERSISTENT_THREADS
359         Barrier_Init(&force_barrier);
360         Barrier_Init(&position_barrier);
361         #else
362         Barrier_Init(&worker_barrier);
363         #endif //PERSISTENT_THREADS
364         Barrier_Init(&graphics_barrier);
365
366         
367         if (options.draw_graphics)
368         {
369                 // The graphics are enabled, so create a thread to do computations
370                 // Graphics are done in the main loop
371                 //printf("Graphics are enabled\n");
372                 #ifdef PERSISTENT_THREADS
373                 Compute_Thread((void*)(&universe));
374                 #else
375                 if (pthread_create(&compute_thread, NULL, Compute_Thread, (void*)&universe) != 0)
376                 {
377                         perror("Error creating compute thread");
378                         exit(EXIT_FAILURE);
379                 }
380                 #endif //PERSISTENT_THREADS
381                 //printf("Run compute thread\n");
382                 Graphics_Run(argc, argv);
383         }
384         else
385         
386                 Compute_Thread((void*)(&universe)); // Graphics are disabled, so do computations in the main thread
387 }
388
389
390
391 void Barrier_Init(Barrier * b)
392 {
393         b->threads_busy = 0;
394 }       
395
396 void Barrier_Enter(Barrier * b)
397 {
398         pthread_mutex_lock(&(b->mutex));
399         b->threads_busy += 1;
400         pthread_mutex_unlock(&(b->mutex));
401 }
402
403 void Barrier_Leave(Barrier * b)
404 {
405         pthread_mutex_lock(&(b->mutex));
406         b->threads_busy -= 1;
407         if (b->threads_busy == 0)
408         {
409                 pthread_cond_signal(&(b->threads_done_cv));
410         }
411         pthread_mutex_unlock(&(b->mutex));
412 }
413
414 void Barrier_Wait(Barrier * b)
415 {
416         pthread_mutex_lock(&(b->mutex));
417         while (b->threads_busy > 0)
418                 pthread_cond_wait(&(b->threads_done_cv), &(b->mutex));
419         pthread_mutex_unlock(&(b->mutex));
420 }

UCC git Repository :: git.ucc.asn.au