Parallel Programming - Trivial
[matches/honours.git] / course / semester2 / pprog / assignment1 / mthread / nbody.c
1 /**
2  * @file nbody.c
3  * @purpose Implentation of multi-threaded N-Body simulator using pthreads
4  * @author Sam Moore (20503628) - 2012
5  */
6
7 #include "nbody.h" // Declarations
8 #include "../single-thread/nbody.c" // Include all functions from the single threaded version
9
10 #include "graphics.h" // For declaration of Graphics_Run only
11
12 // --- Variable declarations --- //
13
14 pthread_t compute_thread; // The thread responsible for computations; it spawns worker threads
15         
16 pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
17 System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
18 pthread_mutex_t mutex_workers; // Mutex used for the barrier between Force and Position updates
19 pthread_cond_t workers_done_cv; // Conditional used for the barrier between Force and Position updates
20 unsigned workers_busy; // Number of workers currently doing something
21
22 pthread_mutex_t mutex_runstate; // Mutex around the runstate
23
24
25
26 /**
27  * @function Compute_Thread
28  * @purpose Thread - Continuously computes steps for a system of bodies. Seperate from graphics functions.
29  *      Spawns worker threads to divide up computation.
30  * @param arg - Can be cast to the System for which computations are performed (ie: &universe)
31  */
32 void * Compute_Thread(void * arg)
33 {
34
35         System * s = (System*)(arg); //cast argument to a System*
36
37
38         // If no number of threads provided, use the default value, unless someone changed that to a stupid value
39         if (options.num_threads <= 0)
40                 options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
41
42         // Do a sanity check; there is no point spawning more threads than bodies.
43         if (options.num_threads > s->N)
44         {
45                 fprintf(stderr, 
46                         "Warning: Using %u threads instead of %u specified, because there are only %u bodies to simulate!\n",
47                         s->N, options.num_threads, s->N);       
48                 options.num_threads = s->N;
49         }
50         
51         if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
52         {
53                 worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t)));
54                 if (worker_thread == NULL)
55                 {
56                         perror("Couldn't allocate array of worker threads");
57                         QuitProgram(true);
58                         pthread_exit(NULL);
59                 }
60                 sub_system = (System*)(calloc(options.num_threads, sizeof(System)));
61                 if (sub_system == NULL)
62                 {
63                         perror("Couldn't allocate array of systems for worker threads to use");
64                         QuitProgram(true);
65                         pthread_exit(NULL);
66                 }
67
68                 // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread
69                 unsigned bodies_per_system = (s->N) / options.num_threads;
70                 unsigned remainder = (s->N) % options.num_threads;
71                 for (unsigned i = 0; i < options.num_threads; ++i)
72                 {
73                         sub_system[i].body = (s->body)+(i*bodies_per_system);
74                         sub_system[i].N = bodies_per_system;
75                         sub_system[i].steps = 0;
76                         if (i == options.num_threads - 1)
77                                 sub_system[i].N += remainder; // The last thread gets the remainder
78
79                 }
80         }
81
82
83         pthread_attr_t attr; //thread attribute for the workers. 
84         pthread_attr_init(&attr);
85         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
86
87         // The main computation loop
88         while (true)
89         {
90                 //Check whether the program should quit due to steps being computed, or a timeout
91                 if (ExitCondition())
92                 {
93                         QuitProgram(false);
94                         pthread_exit(NULL);
95                 }
96
97                 if (options.draw_graphics == false && options.verbosity != 0 
98                         && universe.steps % options.verbosity == 1)
99                 {
100                         DisplayStatistics();
101                 }
102
103
104                 if (options.num_threads <= 1)
105                 {
106                         // Just do everything in this thread 
107                         System_Forces(s, s);
108                         System_Positions(s);
109                         continue;
110                 }
111
112                 
113
114                 
115                 workers_busy = options.num_threads; //All threads working
116
117                 //Compute forces
118                 for (unsigned i = 0; i < options.num_threads; ++i)
119                 {
120                         if (pthread_create(worker_thread+i, &attr, Force_Thread, (void*)(sub_system+i)) != 0)
121                         {
122                                 perror("In compute thread, couldn't create worker thread (force)");
123                                 QuitProgram(true);
124                                 pthread_exit(NULL);
125                         }       
126                 }
127
128
129
130                 //Barrier - Wait for forces to be computed
131                 pthread_mutex_lock(&mutex_workers);
132                 while (workers_busy > 0)
133                         pthread_cond_wait(&workers_done_cv, &mutex_workers);
134                 pthread_mutex_unlock(&mutex_workers);
135
136                 //All the forces are now computed
137                 
138                 workers_busy = options.num_threads; //All threads working
139
140                 //Compute positions
141                 for (unsigned i = 0; i < options.num_threads; ++i)
142                 {
143                         if (pthread_create(worker_thread+i, &attr, Position_Thread, (void*)(sub_system+i)) != 0)
144                         {
145                                 perror("In compute thread, couldn't create worker thread (position)");
146                                 QuitProgram(true);
147                                 pthread_exit(NULL);
148                         }       
149                 }
150
151                 //Wait for positions to be computed
152                 pthread_mutex_lock(&mutex_workers);
153                 while (workers_busy > 0)
154                         pthread_cond_wait(&workers_done_cv, &mutex_workers);
155                 pthread_mutex_unlock(&mutex_workers);
156
157                 //Update number of steps computed
158                 universe.steps += 1;
159
160
161
162         }
163 }
164
165 /**
166  * @function Force_Thread
167  * @purpose Thread - Calculates the forces on objects in a System
168  * @param s - Can be cast to the System*
169  */
170 void * Force_Thread(void * s)
171 {
172         
173         System_Forces((System*)s, &universe); //Simple wrapper
174
175         pthread_mutex_lock(&mutex_workers);
176         workers_busy -= 1;      // Count this thread as done
177         if (workers_busy == 0)
178         {
179                 pthread_cond_signal(&workers_done_cv); // All threads done; wake up the compute_thread
180         }
181         pthread_mutex_unlock(&mutex_workers);
182         return NULL;
183 }
184
185 /**
186  * @function Position_Thread
187  * @purpose Thread - Calculates the positions of objects in a System 
188  * @param s - Can be cast to the System*
189  */
190 void * Position_Thread(void * s)
191 {
192         
193         System_Positions((System*)s); // Simple wrapper
194
195         pthread_mutex_lock(&mutex_workers);
196         workers_busy -= 1; // Count this thread as done
197         if (workers_busy == 0)
198         {
199                 pthread_cond_signal(&workers_done_cv); //All threads done; wake up the compute_thread
200         }
201         pthread_mutex_unlock(&mutex_workers);
202         return NULL;
203 }       
204
205 /**
206  * @function QuitProgram
207  * @purpose This function can either be called by the main thread in order to signal other threads
208  *              that it wants to exit. The main thread then calls pthread_join before exiting.
209  *      It can also be called by a child thread to request the main thread to exit.
210  *      It is only used this way if there is an unrecovarable error (ie: Can't allocate memory in a child thread)
211  */
212 void QuitProgram(bool error)
213 {
214         if (runstate == QUIT || runstate == QUIT_ERROR)
215                 return; //Don't do anything if already quitting
216         pthread_mutex_lock(&mutex_runstate); // aquire mutex
217         if (error) // set the runstate
218                 runstate = QUIT_ERROR;
219         else
220                 runstate = QUIT;
221         pthread_mutex_unlock(&mutex_runstate); //release mutex
222 }
223
224 /**
225  * @function Thread_Cleanup
226  * @purpose Will be called in the main thread when exit() is called
227  *      Automatically tells all other threads to quit (if they haven't already been told) 
228  *      Then waits for them to finish.
229  *      Also frees memory associated with the worker threads.   
230  */
231 void Thread_Cleanup(void)
232 {
233         if (runstate == RUN) // If this is true, as far as child threads are concerned, the simulation is still running
234                 QuitProgram(false); // So call QuitProgram which will set runstate, and cause child threads to exit
235         pthread_join(compute_thread, NULL);
236         free(worker_thread);
237         free(sub_system);
238 }
239
240
241 /**
242  * @function Simulation_Run
243  * @purpose Initialise and start the simulation. Will be called in the main thread.
244  *      Replaces the single-threaded macro that does nothing, and sets up the compute thread
245  * @param argc - Number of arguments - Passed to Graphics_Run if needed
246  * @param argv - Argument strings - Passed to Graphics_Run if needed
247  */
248 void Simulation_Run(int argc, char ** argv)
249 {
250         atexit(Thread_Cleanup);
251
252         if (options.draw_graphics)
253         {
254                 // The graphics are enabled, so create a thread to do computations
255                 // Graphics are done in the main loop
256                 if (pthread_create(&compute_thread, NULL, Compute_Thread, (void*)&universe) != 0)
257                 {
258                         perror("Error creating compute thread");
259                         exit(EXIT_FAILURE);
260                 }
261                 Graphics_Run(argc, argv);
262         }
263         else
264                 Compute_Thread((void*)(&universe)); // Graphics are disabled, so do computations in the main thread
265 }

UCC git Repository :: git.ucc.asn.au