Parallel Programming - Start OpenMP Version
[matches/honours.git] / course / semester2 / pprog / assignment1 / mthread / nbody.c
/**
 * @file nbody.c
 * @purpose Implementation of multi-threaded N-Body simulator using pthreads
 * @author Sam Moore (20503628) - 2012
 */
6
#include "nbody.h" // Declarations

#include <errno.h> // For errno; pthread error numbers are stored in it before calling perror

#include "../single-thread/nbody.c" // Include all functions from the single threaded version

#include "graphics.h" // For declaration of Graphics_Run only
11
12 // --- Variable declarations --- //
13
14 pthread_t compute_thread; // The thread responsible for computations; it spawns worker threads
15         
16 pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
17 System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
18 pthread_mutex_t mutex_workers; // Mutex used for the barrier between Force and Position updates
19 pthread_cond_t workers_done_cv; // Conditional used for the barrier between Force and Position updates
20 unsigned workers_busy; // Number of workers currently doing something
21
22 pthread_mutex_t mutex_runstate; // Mutex around the runstate
23
24
25
26 /**
27  * @function Compute_Thread
28  * @purpose Thread - Continuously computes steps for a system of bodies. Seperate from graphics functions.
29  *      Spawns worker threads to divide up computation.
30  * @param arg - Can be cast to the System for which computations are performed (ie: &universe)
31  */
32 void * Compute_Thread(void * arg)
33 {
34
35         System * s = (System*)(arg); //cast argument to a System*
36
37
38         // If no number of threads provided, use the default value, unless someone changed that to a stupid value
39         if (options.num_threads <= 0)
40                 options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
41
42         // Do a sanity check; there is no point spawning more threads than bodies.
43         if (options.num_threads > s->N)
44         {
45                 fprintf(stderr, 
46                         "Warning: Using %u threads instead of %u specified, because there are only %u bodies to simulate!\n",
47                         s->N, options.num_threads, s->N);       
48                 options.num_threads = s->N;
49         }
50         
51         if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
52         {
53                 worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t)));
54                 if (worker_thread == NULL)
55                 {
56                         perror("Couldn't allocate array of worker threads");
57                         QuitProgram(true);
58                         pthread_exit(NULL);
59                 }
60                 sub_system = (System*)(calloc(options.num_threads, sizeof(System)));
61                 if (sub_system == NULL)
62                 {
63                         perror("Couldn't allocate array of systems for worker threads to use");
64                         QuitProgram(true);
65                         pthread_exit(NULL);
66                 }
67
68                 // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread
69                 unsigned bodies_per_system = (s->N) / options.num_threads;
70                 unsigned remainder = (s->N) % options.num_threads;
71                 for (unsigned i = 0; i < options.num_threads; ++i)
72                 {
73                         sub_system[i].body = (s->body)+(i*bodies_per_system);
74                         sub_system[i].N = bodies_per_system;
75                         sub_system[i].steps = 0;
76                         if (i == options.num_threads - 1)
77                                 sub_system[i].N += remainder; // The last thread gets the remainder
78
79                 }
80         }
81
82
83         pthread_attr_t attr; //thread attribute for the workers. 
84         pthread_attr_init(&attr);
85         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
86
87         // The main computation loop
88         while (true)
89         {
90                 
91                 if (runstate != RUN) pthread_exit(NULL); //Check whether the thread needs to exit
92
93
94         
95                 //Check whether the program should quit due to steps being computed, or a timeout
96                 if (ExitCondition())
97                 {
98                         QuitProgram(false);
99                         continue; // The check at the start of the next loop will stop the thread
100                 }
101
102                 if (options.draw_graphics == false && options.verbosity != 0 
103                         && universe.steps % options.verbosity == 1)
104                 {
105                         DisplayStatistics();
106                 }
107
108
109                 if (options.num_threads <= 1)
110                 {
111                         // Just do everything in this thread 
112                         System_Forces(s, s);
113                         System_Positions(s);
114                         continue;
115                 }
116
117                 
118
119                 
120                 workers_busy = options.num_threads; //All threads working
121
122                 //Compute forces
123                 for (unsigned i = 0; i < options.num_threads; ++i)
124                 {
125                         if (pthread_create(worker_thread+i, &attr, Force_Thread, (void*)(sub_system+i)) != 0)
126                         {
127                                 perror("In compute thread, couldn't create worker thread (force)");
128                                 QuitProgram(true);
129                                 pthread_exit(NULL);
130                         }       
131                 }
132
133
134
135                 //Barrier - Wait for forces to be computed
136                 pthread_mutex_lock(&mutex_workers);
137                 while (workers_busy > 0)
138                         pthread_cond_wait(&workers_done_cv, &mutex_workers);
139                 pthread_mutex_unlock(&mutex_workers);
140
141                 //All the forces are now computed
142                 
143                 workers_busy = options.num_threads; //All threads working
144
145                 //Compute positions
146                 for (unsigned i = 0; i < options.num_threads; ++i)
147                 {
148                         if (pthread_create(worker_thread+i, &attr, Position_Thread, (void*)(sub_system+i)) != 0)
149                         {
150                                 perror("In compute thread, couldn't create worker thread (position)");
151                                 QuitProgram(true);
152                                 pthread_exit(NULL);
153                         }       
154                 }
155
156                 //Wait for positions to be computed
157                 pthread_mutex_lock(&mutex_workers);
158                 while (workers_busy > 0)
159                         pthread_cond_wait(&workers_done_cv, &mutex_workers);
160                 pthread_mutex_unlock(&mutex_workers);
161
162                 //Update number of steps computed
163                 universe.steps += 1;
164
165
166
167         }
168 }
169
170 /**
171  * @function Force_Thread
172  * @purpose Thread - Calculates the forces on objects in a System
173  * @param s - Can be cast to the System*
174  */
175 void * Force_Thread(void * s)
176 {
177         
178         System_Forces((System*)s, &universe); //Simple wrapper
179
180         pthread_mutex_lock(&mutex_workers);
181         workers_busy -= 1;      // Count this thread as done
182         if (workers_busy == 0)
183         {
184                 pthread_cond_signal(&workers_done_cv); // All threads done; wake up the compute_thread
185         }
186         pthread_mutex_unlock(&mutex_workers);
187         return NULL;
188 }
189
190 /**
191  * @function Position_Thread
192  * @purpose Thread - Calculates the positions of objects in a System 
193  * @param s - Can be cast to the System*
194  */
195 void * Position_Thread(void * s)
196 {
197         
198         System_Positions((System*)s); // Simple wrapper
199
200         pthread_mutex_lock(&mutex_workers);
201         workers_busy -= 1; // Count this thread as done
202         if (workers_busy == 0)
203         {
204                 pthread_cond_signal(&workers_done_cv); //All threads done; wake up the compute_thread
205         }
206         pthread_mutex_unlock(&mutex_workers);
207         return NULL;
208 }       
209
210 /**
211  * @function QuitProgram
212  * @purpose This function can either be called by the main thread in order to signal other threads
213  *              that it wants to exit. The main thread then calls pthread_join before exiting.
214  *      It can also be called by a child thread to request the main thread to exit.
215  *      It is only used this way if there is an unrecovarable error (ie: Can't allocate memory in a child thread)
216  */
217 void QuitProgram(bool error)
218 {
219         if (runstate == QUIT || runstate == QUIT_ERROR)
220                 return; //Don't do anything if already quitting
221         pthread_mutex_lock(&mutex_runstate); // aquire mutex
222         if (error) // set the runstate
223                 runstate = QUIT_ERROR;
224         else
225                 runstate = QUIT;
226         pthread_mutex_unlock(&mutex_runstate); //release mutex
227 }
228
229 /**
230  * @function Thread_Cleanup
231  * @purpose Will be called in the main thread when exit() is called
232  *      Automatically tells all other threads to quit (if they haven't already been told) 
233  *      Then waits for them to finish.
234  *      Also frees memory associated with the worker threads.   
235  */
236 void Thread_Cleanup(void)
237 {
238         if (runstate == RUN) // If this is true, as far as child threads are concerned, the simulation is still running
239                 QuitProgram(false); // So call QuitProgram which will set runstate, and cause child threads to exit
240         pthread_join(compute_thread, NULL);
241         free(worker_thread);
242         free(sub_system);
243 }
244
245
246 /**
247  * @function Simulation_Run
248  * @purpose Initialise and start the simulation. Will be called in the main thread.
249  *      Replaces the single-threaded macro that does nothing, and sets up the compute thread
250  * @param argc - Number of arguments - Passed to Graphics_Run if needed
251  * @param argv - Argument strings - Passed to Graphics_Run if needed
252  */
253 void Simulation_Run(int argc, char ** argv)
254 {
255         atexit(Thread_Cleanup);
256
257         if (options.draw_graphics)
258         {
259                 // The graphics are enabled, so create a thread to do computations
260                 // Graphics are done in the main loop
261                 if (pthread_create(&compute_thread, NULL, Compute_Thread, (void*)&universe) != 0)
262                 {
263                         perror("Error creating compute thread");
264                         exit(EXIT_FAILURE);
265                 }
266                 Graphics_Run(argc, argv);
267         }
268         else
269                 Compute_Thread((void*)(&universe)); // Graphics are disabled, so do computations in the main thread
270 }

UCC git Repository :: git.ucc.asn.au