Parallel Programming - Start OpenMP Version
[matches/honours.git] / course / semester2 / pprog / assignment1 / mthread / nbody.c
index cd0656a..42b97cf 100644 (file)
 /**
  * @file nbody.c
- * @author Sam Moore (20503628) 2012
- * @purpose N-Body simulator - Definition of simulation functions; single threaded version
+ * @purpose Implentation of multi-threaded N-Body simulator using pthreads
+ * @author Sam Moore (20503628) - 2012
  */
 
-#include <stdlib.h>
-#include <stdio.h>
-#include <math.h>
-#include <time.h>
-#include <string.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-
-#include "nbody.h"
-
-//Stuff to do with worker threads
-// The worker threads (if used) are spawned by the computation thread, not the main thread
-// The main thread (which handles initialisation, graphics, user input, cleanup and exit) does not need to know about these.
-#if NUM_THREADS > 1
-       
-       pthread_t worker_thread[NUM_THREADS]; //Worker threads responsible for Force and Position updates
-       System sub_system[NUM_THREADS]; //Array of System's used to divide up the main "universe" System for worker threads
-       pthread_mutex_t mutex_workers;
-       pthread_cond_t workers_done_cv;
-       unsigned worker_count;
-#endif 
+#include "nbody.h" // Declarations
+#include "../single-thread/nbody.c" // Include all functions from the single threaded version
 
-/**
- * Prints the body on screen
- */
-void Body_Print(Body * a)
-{
-       printf("Body %p M=%f X=%f Y=%f Z=%f Fx=%f Fy=%f Fz=%f Vx=%f Vy=%f Vz=%f\n", 
-           (void*)a, a->mass, a->x[0], a->x[1], a->x[2], a->F[0], a->F[1], a->F[2], a->v[0], a->v[1], a->v[2]);
-}
+#include "graphics.h" // For declaration of Graphics_Run only
 
-/**
- * Computing forces
- */
-void Body_Force(Body * a, System * s) 
-{
-       double distance;
-       double con;
-       double gd;
+// --- Variable declarations --- //
 
-       for (unsigned i = 0; i < DIMENSIONS; ++i)
-               a->F[i] = 0;
+pthread_t compute_thread; // The thread responsible for computations; it spawns worker threads
+       
+pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
+System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
+pthread_mutex_t mutex_workers; // Mutex used for the barrier between Force and Position updates
+pthread_cond_t workers_done_cv; // Conditional used for the barrier between Force and Position updates
+unsigned workers_busy; // Number of workers currently doing something
 
-       for (unsigned index = 0; index < s->N; ++index)
-       {
-               Body * b = s->body+index;
-               if (b == a)
-                       continue;
-               
-               distance = 0.0;
-               for (unsigned i = 0; i < DIMENSIONS; ++i)
-                       distance += square(b->x[i] - a->x[i]);
-               distance = sqrt(distance);
-               con = G * a->mass * b->mass / square(distance);
-               gd = con / distance;    
-               for (unsigned i = 0; i < DIMENSIONS; ++i)
-                       a->F[i] += gd * (b->x[i] - a->x[i]);
-       }
-}
+pthread_mutex_t mutex_runstate; // Mutex around the runstate
 
-/**
- * Compute velocities
- */
-void Body_Velocity(Body * a) 
-{
-       for (unsigned i = 0; i < DIMENSIONS; ++i)
-               a->v[i] += a->F[i] / a->mass * DELTA_T;
-}
 
-/**
- * Compute positions
- */
-void Body_Position(Body * a) 
-{
-       for (unsigned i = 0; i < DIMENSIONS; ++i)
-               a->x[i] += a->v[i] * DELTA_T;
-}
 
 /**
- * Compute forces on each object in System s1 due to all bodies in System s2
+ * @function Compute_Thread
+ * @purpose Thread - Continuously computes steps for a system of bodies. Seperate from graphics functions.
+ *     Spawns worker threads to divide up computation.
+ * @param arg - Can be cast to the System for which computations are performed (ie: &universe)
  */
-void System_Forces(System * s1, System * s2) 
-{
-       //printf("Compute forces for %p - %d bodies...\n", (void*)s1, s1->N);
-       for (unsigned i = 0; i < s1->N; ++i)
-       {
-               Body_Force(s1->body+i, s2);
-               Body_Velocity(s1->body+i);
-       }
-       //printf("Compute positions for %p - Done!\n", (void*)s1);
-}
-
-/**
- * Compute positions of all objects in System
- */
-void System_Positions(System * s)
+void * Compute_Thread(void * arg)
 {
-       //printf("Compute positions for %p - %d bodies...\n", (void*)s, s->N);
-       for (unsigned i = 0; i < s->N; ++i)
-               Body_Position(s->body+i);
-       //printf("Compute positions for %p - Done!\n", (void*)s);
-}
-
-
 
+       System * s = (System*)(arg); //cast argument to a System*
 
-/*
- * This function reads an input file. You can change it if you choose a 
- * different file format
- */
-#define LINE_SIZE BUFSIZ
-void System_Init(System * s, char *fileName) 
-{
-       char line[LINE_SIZE];
-       char * token;
-       FILE * file;
-
-       file = fopen(fileName, "rt");
-       s->N = atoi(fgets(line, LINE_SIZE, file));
-       s->body = (Body*) calloc((size_t)s->N, sizeof(Body));
 
-       //printf("----------------------Initial field-------------------------------\n");
+       // If no number of threads provided, use the default value, unless someone changed that to a stupid value
+       if (options.num_threads <= 0)
+               options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
 
-       for (unsigned i = 0; i < s->N; ++i)
+       // Do a sanity check; there is no point spawning more threads than bodies.
+       if (options.num_threads > s->N)
        {
-               if (fgets(line, LINE_SIZE, file) != NULL)
-               {
-                       Body * a = s->body+i;
-                       token = strtok(line, ",; ");
-                       a->mass = atof(token);
-                       
-                       for (unsigned j = 0; j < DIMENSIONS; ++j)
-                       {
-                               token = strtok(NULL, ",; ");
-                               a->x[j] = atof(token);
-                       }
-                       for (unsigned j = 0; j < DIMENSIONS; ++j)
-                       {
-                               token = strtok(NULL, ",; ");
-                               a->v[j] = atof(token);
-                       }
-                       //Body_Print(a);
-               }
+               fprintf(stderr, 
+                       "Warning: Using %u threads instead of %u specified, because there are only %u bodies to simulate!\n",
+                       s->N, options.num_threads, s->N);       
+               options.num_threads = s->N;
        }
-
-       //printf("--------------------------------------------------------------\n");
-       
-       fclose(file);
-}
-
-/**
- * Cleans up the universe by freeing all memory
- */
-void Universe_Cleanup()
-{
-       free(universe.body);
        
-}
-
-/**
- * Thread - Continuously computes steps for a system of bodies
- */
-void * Compute_Thread(void * arg)
-{
+       if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
+       {
+               worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t)));
+               if (worker_thread == NULL)
+               {
+                       perror("Couldn't allocate array of worker threads");
+                       QuitProgram(true);
+                       pthread_exit(NULL);
+               }
+               sub_system = (System*)(calloc(options.num_threads, sizeof(System)));
+               if (sub_system == NULL)
+               {
+                       perror("Couldn't allocate array of systems for worker threads to use");
+                       QuitProgram(true);
+                       pthread_exit(NULL);
+               }
 
-       System * s = (System*)(arg); //cast argument to a System*
-       // First set up the "sub_system" array
-       #if NUM_THREADS > 1
-               unsigned bodies_per_system = (s->N) / NUM_THREADS;
-               unsigned remainder = (s->N) % NUM_THREADS;
-               for (unsigned i = 0; i < NUM_THREADS; ++i)
+               // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread
+               unsigned bodies_per_system = (s->N) / options.num_threads;
+               unsigned remainder = (s->N) % options.num_threads;
+               for (unsigned i = 0; i < options.num_threads; ++i)
                {
                        sub_system[i].body = (s->body)+(i*bodies_per_system);
                        sub_system[i].N = bodies_per_system;
-                       if (i == NUM_THREADS - 1)
-                               sub_system[i].N += remainder;
+                       sub_system[i].steps = 0;
+                       if (i == options.num_threads - 1)
+                               sub_system[i].N += remainder; // The last thread gets the remainder
+
                }
+       }
+
 
-               pthread_attr_t attr; //thread attribute for the workers
-               pthread_attr_init(&attr);
-               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+       pthread_attr_t attr; //thread attribute for the workers. 
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
 
-       #endif
+       // The main computation loop
        while (true)
        {
                
-               if (runstate != RUN) pthread_exit(NULL); //Check whether the main thread wants to exit
-       
+               if (runstate != RUN) pthread_exit(NULL); //Check whether the thread needs to exit
+
 
        
-               #if NUM_THREADS <= 1
-                       //No worker threads; do everything in this thread
+               //Check whether the program should quit due to steps being computed, or a timeout
+               if (ExitCondition())
+               {
+                       QuitProgram(false);
+                       continue; // The check at the start of the next loop will stop the thread
+               }
+
+               if (options.draw_graphics == false && options.verbosity != 0 
+                       && universe.steps % options.verbosity == 1)
+               {
+                       DisplayStatistics();
+               }
+
+
+               if (options.num_threads <= 1)
+               {
+                       // Just do everything in this thread 
                        System_Forces(s, s);
                        System_Positions(s);
-               #else
+                       continue;
+               }
 
                
 
                
-               worker_count = NUM_THREADS; //All threads working
+               workers_busy = options.num_threads; //All threads working
 
                //Compute forces
-               for (unsigned i = 0; i < NUM_THREADS; ++i)
+               for (unsigned i = 0; i < options.num_threads; ++i)
                {
                        if (pthread_create(worker_thread+i, &attr, Force_Thread, (void*)(sub_system+i)) != 0)
                        {
@@ -219,16 +132,18 @@ void * Compute_Thread(void * arg)
 
 
 
-               //Wait for forces to be computed
+               //Barrier - Wait for forces to be computed
                pthread_mutex_lock(&mutex_workers);
-               while (worker_count > 0)
+               while (workers_busy > 0)
                        pthread_cond_wait(&workers_done_cv, &mutex_workers);
                pthread_mutex_unlock(&mutex_workers);
+
+               //All the forces are now computed
                
-               worker_count = NUM_THREADS; //All threads working
+               workers_busy = options.num_threads; //All threads working
 
                //Compute positions
-               for (unsigned i = 0; i < NUM_THREADS; ++i)
+               for (unsigned i = 0; i < options.num_threads; ++i)
                {
                        if (pthread_create(worker_thread+i, &attr, Position_Thread, (void*)(sub_system+i)) != 0)
                        {
@@ -240,58 +155,116 @@ void * Compute_Thread(void * arg)
 
                //Wait for positions to be computed
                pthread_mutex_lock(&mutex_workers);
-               while (worker_count > 0)
+               while (workers_busy > 0)
                        pthread_cond_wait(&workers_done_cv, &mutex_workers);
-               pthread_mutex_unlock(&mutex_workers);           
+               pthread_mutex_unlock(&mutex_workers);
+
+               //Update number of steps computed
+               universe.steps += 1;
+
 
-               
-               #endif
 
        }
 }
 
+/**
+ * @function Force_Thread
+ * @purpose Thread - Calculates the forces on objects in a System
+ * @param s - Can be cast to the System*
+ */
 void * Force_Thread(void * s)
 {
        
-       System_Forces((System*)s, &universe);
+       System_Forces((System*)s, &universe); //Simple wrapper
+
        pthread_mutex_lock(&mutex_workers);
-       worker_count -= 1;
-       if (worker_count == 0)
+       workers_busy -= 1;      // Count this thread as done
+       if (workers_busy == 0)
        {
-               pthread_cond_signal(&workers_done_cv);
+               pthread_cond_signal(&workers_done_cv); // All threads done; wake up the compute_thread
        }
        pthread_mutex_unlock(&mutex_workers);
        return NULL;
 }
 
+/**
+ * @function Position_Thread
+ * @purpose Thread - Calculates the positions of objects in a System 
+ * @param s - Can be cast to the System*
+ */
 void * Position_Thread(void * s)
 {
        
-       System_Positions((System*)s);
+       System_Positions((System*)s); // Simple wrapper
+
        pthread_mutex_lock(&mutex_workers);
-       worker_count -= 1;
-       if (worker_count == 0)
+       workers_busy -= 1; // Count this thread as done
+       if (workers_busy == 0)
        {
-               pthread_cond_signal(&workers_done_cv);
+               pthread_cond_signal(&workers_done_cv); //All threads done; wake up the compute_thread
        }
        pthread_mutex_unlock(&mutex_workers);
        return NULL;
 }      
 
 /**
- * Child threads can call this to signal the main thread to quit the program
- * The main thread also uses this to tell child threads that the program is quitting
- *     Note that the function doesn't call exit(), that is done by the main thread
+ * @function QuitProgram
+ * @purpose This function can either be called by the main thread in order to signal other threads
+ *             that it wants to exit. The main thread then calls pthread_join before exiting.
+ *     It can also be called by a child thread to request the main thread to exit.
+ *     It is only used this way if there is an unrecovarable error (ie: Can't allocate memory in a child thread)
  */
 void QuitProgram(bool error)
 {
-       
-       pthread_mutex_lock(&mutex_runstate);
-       if (error)
+       if (runstate == QUIT || runstate == QUIT_ERROR)
+               return; //Don't do anything if already quitting
+       pthread_mutex_lock(&mutex_runstate); // aquire mutex
+       if (error) // set the runstate
                runstate = QUIT_ERROR;
        else
                runstate = QUIT;
-       pthread_mutex_unlock(&mutex_runstate);
+       pthread_mutex_unlock(&mutex_runstate); //release mutex
 }
 
+/**
+ * @function Thread_Cleanup
+ * @purpose Will be called in the main thread when exit() is called
+ *     Automatically tells all other threads to quit (if they haven't already been told) 
+ *     Then waits for them to finish.
+ *     Also frees memory associated with the worker threads.   
+ */
+void Thread_Cleanup(void)
+{
+       if (runstate == RUN) // If this is true, as far as child threads are concerned, the simulation is still running
+               QuitProgram(false); // So call QuitProgram which will set runstate, and cause child threads to exit
+       pthread_join(compute_thread, NULL);
+       free(worker_thread);
+       free(sub_system);
+}
+
+
+/**
+ * @function Simulation_Run
+ * @purpose Initialise and start the simulation. Will be called in the main thread.
+ *     Replaces the single-threaded macro that does nothing, and sets up the compute thread
+ * @param argc - Number of arguments - Passed to Graphics_Run if needed
+ * @param argv - Argument strings - Passed to Graphics_Run if needed
+ */
+void Simulation_Run(int argc, char ** argv)
+{
+       atexit(Thread_Cleanup);
 
+       if (options.draw_graphics)
+       {
+               // The graphics are enabled, so create a thread to do computations
+               // Graphics are done in the main loop
+               if (pthread_create(&compute_thread, NULL, Compute_Thread, (void*)&universe) != 0)
+               {
+                       perror("Error creating compute thread");
+                       exit(EXIT_FAILURE);
+               }
+               Graphics_Run(argc, argv);
+       }
+       else
+               Compute_Thread((void*)(&universe)); // Graphics are disabled, so do computations in the main thread
+}

UCC git Repository :: git.ucc.asn.au