Parallel Programming - Finished OpenMP
[matches/honours.git] / course / semester2 / pprog / assignment1 / mthread / nbody.c
index 42b97cf..a551d7d 100644 (file)
@@ -6,38 +6,59 @@
 
 #include "nbody.h" // Declarations
 #include "../single-thread/nbody.c" // Include all functions from the single threaded version
-
+#include <assert.h>
 #include "graphics.h" // For declaration of Graphics_Run only
+#include "barrier.c"
 
 // --- Variable declarations --- //
 
-pthread_t compute_thread; // The thread responsible for computations; it spawns worker threads
+pthread_t compute_thread; // The thread responsible for computations; it spawns worker threads (* terms and conditions apply)
        
 pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
 System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
-pthread_mutex_t mutex_workers; // Mutex used for the barrier between Force and Position updates
-pthread_cond_t workers_done_cv; // Conditional used for the barrier between Force and Position updates
-unsigned workers_busy; // Number of workers currently doing something
+System * nested_sub_system = NULL; //Array of Systems for division of "universe" for the nested worker threads
 
 pthread_mutex_t mutex_runstate; // Mutex around the runstate
 
 
 
+
+Barrier force_barrier; // I laughed at this variable name. A bit sad really.
+Barrier position_barrier;
+
+
+Barrier graphics_barrier;
+
+pthread_mutex_t mutex_threads_running;
+int threads_running = 0;
+
 /**
  * @function Compute_Thread
  * @purpose Thread - Continuously computes steps for a system of bodies. Seperate from graphics functions.
  *     Spawns worker threads to divide up computation.
- * @param arg - Can be cast to the System for which computations are performed (ie: &universe)
+ * @param arg - Can be cast to the System for which computations are performed 
+ *
+ *     NOTE:
+ *     This will always be (void*)(&universe) where universe is the global System variable that has every Body in it.
+ *     But I don't like global variables. And since the argument had to be passed, I thought I might as well use it.
+ *     That way, when I change "universe" to "solar_system", I only have to change the argument where this is called, not all through it.
+ *     Find and replace? Who uses that!?
  */
+
+
 void * Compute_Thread(void * arg)
 {
 
        System * s = (System*)(arg); //cast argument to a System*
 
 
-       // If no number of threads provided, use the default value, unless someone changed that to a stupid value
+
+       // If no number of threads provided, use the default value, unless someone (me) changed that to a stupid value
        if (options.num_threads <= 0)
-               options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
+               options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1; // Fear the ternary operator!
+
+       if (options.nested_threads <= 0)
+               options.nested_threads = 1;
 
        // Do a sanity check; there is no point spawning more threads than bodies.
        if (options.num_threads > s->N)
@@ -47,105 +68,119 @@ void * Compute_Thread(void * arg)
                        s->N, options.num_threads, s->N);       
                options.num_threads = s->N;
        }
+
+       // Initialise the barriers ("shields up!")
+       Barrier_Init(&force_barrier, options.num_threads);
+       Barrier_Init(&position_barrier, options.num_threads);
+       Barrier_Init(&graphics_barrier, 1);
+
        
-       if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
+       
+       if (options.num_threads > 1) // If we require additional worker threads...
        {
-               worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t)));
-               if (worker_thread == NULL)
-               {
-                       perror("Couldn't allocate array of worker threads");
-                       QuitProgram(true);
-                       pthread_exit(NULL);
-               }
-               sub_system = (System*)(calloc(options.num_threads, sizeof(System)));
-               if (sub_system == NULL)
+               // Allocate worker threads and sub systems
+               
+               sub_system = Split_System(&universe, options.num_threads);
+
+               if (options.nested_threads > 1)
+                       nested_sub_system = Split_System(&universe, options.nested_threads);
+
+               #ifdef PERSISTENT_THREADS // Code for the smart way of doing it (spawn threads once, keep them running)
+               worker_thread = Allocate_Threads(options.num_threads-1);
+               // Spawn a bunch of worker threads, and let them all do their thing.
+               // Note the "-1". Because this thread can do work too!
+               for (unsigned i = 0; i < options.num_threads-1; ++i)
                {
-                       perror("Couldn't allocate array of systems for worker threads to use");
-                       QuitProgram(true);
-                       pthread_exit(NULL);
+                       if (pthread_create(worker_thread+i, NULL, Worker_Thread, (void*)(sub_system+i)) != 0)
+                       {
+                               perror("In compute thread, couldn't create worker thread");
+                               QuitProgram(true);
+                               pthread_exit(NULL);
+                       }
                }
-
-               // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread
-               unsigned bodies_per_system = (s->N) / options.num_threads;
-               unsigned remainder = (s->N) % options.num_threads;
-               for (unsigned i = 0; i < options.num_threads; ++i)
+               Worker_Thread((void*)(sub_system+options.num_threads-1)); // This thread becomes a worker thread
+               #else
+               worker_thread = Allocate_Threads(options.num_threads);
+               #endif //PERSISTENT_THREADS
+               
+       }
+       #ifdef PERSISTENT_THREADS
+       else // We only require one worker thread...
+       {
+               // So just do all computations in this thread
+               while (!ExitCondition())
                {
-                       sub_system[i].body = (s->body)+(i*bodies_per_system);
-                       sub_system[i].N = bodies_per_system;
-                       sub_system[i].steps = 0;
-                       if (i == options.num_threads - 1)
-                               sub_system[i].N += remainder; // The last thread gets the remainder
+                       
+                       System_Forces(s, s);
+                       // If required, wait for graphics to finish drawing
+                       if (options.draw_graphics && options.pedantic_graphics)
+                               Barrier_Wait(&graphics_barrier);
+                       System_Positions(s);
+                       StepFunction(s);
 
+                       if (options.draw_graphics && options.pedantic_graphics)
+                               Barrier_ForceExit(&position_barrier); //Make the graphics continue
                }
+               QuitProgram(false);
+               pthread_exit(NULL);
        }
 
-
-       pthread_attr_t attr; //thread attribute for the workers. 
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
-
-       // The main computation loop
-       while (true)
-       {
-               
-               if (runstate != RUN) pthread_exit(NULL); //Check whether the thread needs to exit
+       #else // Code for the stupid way of doing it (respawn threads each step)
+               // (ie: The way I immediately implemented and didn't realise was stupid until someone told me)
 
 
        
-               //Check whether the program should quit due to steps being computed, or a timeout
-               if (ExitCondition())
-               {
-                       QuitProgram(false);
-                       continue; // The check at the start of the next loop will stop the thread
-               }
-
-               if (options.draw_graphics == false && options.verbosity != 0 
-                       && universe.steps % options.verbosity == 1)
-               {
-                       DisplayStatistics();
-               }
 
+       // Run until we can't run anymore
+       while (!ExitCondition())
+       {
+               
 
-               if (options.num_threads <= 1)
+               if (options.num_threads <= 1) // If there is only 1 worker thread...
                {
                        // Just do everything in this thread 
                        System_Forces(s, s);
+                       // If required, wait for graphics to finish drawing
+                       if (options.draw_graphics && options.pedantic_graphics)
+                               Barrier_Join(&graphics_barrier);
+
                        System_Positions(s);
+                       StepFunction(s);
+
+                       if (options.draw_graphics && options.pedantic_graphics)
+                               Barrier_Join(&position_barrier); //Make the graphics continue
                        continue;
                }
 
                
 
-               
-               workers_busy = options.num_threads; //All threads working
-
-               //Compute forces
+               //Compute forces by spawning threads, each thread gets a sub system
                for (unsigned i = 0; i < options.num_threads; ++i)
                {
-                       if (pthread_create(worker_thread+i, &attr, Force_Thread, (void*)(sub_system+i)) != 0)
+                       if (pthread_create(worker_thread+i, NULL, Force_Thread, (void*)(sub_system+i)) != 0)
                        {
                                perror("In compute thread, couldn't create worker thread (force)");
                                QuitProgram(true);
                                pthread_exit(NULL);
                        }       
+                               
                }
 
+               for (unsigned i = 0; i < options.num_threads; ++i)
+                       pthread_join(worker_thread[i], NULL);
 
+               
+               // If required, wait for graphics to finish drawing
+               if (options.draw_graphics && options.pedantic_graphics)
+                       Barrier_Wait(&graphics_barrier);
 
-               //Barrier - Wait for forces to be computed
-               pthread_mutex_lock(&mutex_workers);
-               while (workers_busy > 0)
-                       pthread_cond_wait(&workers_done_cv, &mutex_workers);
-               pthread_mutex_unlock(&mutex_workers);
 
-               //All the forces are now computed
-               
-               workers_busy = options.num_threads; //All threads working
+               Barrier_Enter(&position_barrier);
 
-               //Compute positions
+               //Compute positions by spawning a bunch of threads to do it
                for (unsigned i = 0; i < options.num_threads; ++i)
                {
-                       if (pthread_create(worker_thread+i, &attr, Position_Thread, (void*)(sub_system+i)) != 0)
+                       if (pthread_create(worker_thread+i, NULL, Position_Thread, (void*)(sub_system+i)) != 0)
                        {
                                perror("In compute thread, couldn't create worker thread (position)");
                                QuitProgram(true);
@@ -153,20 +188,130 @@ void * Compute_Thread(void * arg)
                        }       
                }
 
-               //Wait for positions to be computed
-               pthread_mutex_lock(&mutex_workers);
-               while (workers_busy > 0)
-                       pthread_cond_wait(&workers_done_cv, &mutex_workers);
-               pthread_mutex_unlock(&mutex_workers);
+               
+               for (unsigned i = 0; i < options.num_threads; ++i)
+                       pthread_join(worker_thread[i], NULL);
+
+
+               StepFunction(s); // Execute single threaded stuff
+               
+
+       }
+       QuitProgram(false);
+       #endif //PERSISTENT_THREADS
+       return NULL;
+}
+
+/**
+ * @function BeforeDraw
+ * @purpose Called in graphics thread before the draw loop
+ *     When --pedantic-graphics enabled, will wait for position computations to finish before drawing
+ *     Otherwise does nothing
+ *
+ *     This originally seemed like a good place to put the code now in StepFunction(), since only one thread runs this
+ *     But then I realised that the graphics might be disabled, 
+ *             and there was no point having a thread that only existed to call that code.
+ *
+ *     So I changed it to the horrible solution that I currently have.
+ */
+void BeforeDraw()
+{
+       
+       //printf("BEFORE DRAW\n");
+       if (!options.pedantic_graphics)
+               return;
+       
+       //printf("Graphics thread waits on position barrier\n");
+       Barrier_Wait(&position_barrier);        
+       //printf("\tGraphics thread wakes up\n");
+       Barrier_Enter(&graphics_barrier);
+}
 
-               //Update number of steps computed
-               universe.steps += 1;
+/**
+ * @function AfterDraw
+ * @purpose Called in graphics thread after the draw loop
+ *     When --pedantic-graphics is supplied, will signal computation threads that drawing is finished
+ *             So that positions can be safely altered
+ *     Otherwise does nothing
+ */
+void AfterDraw()
+{
+       //universe.steps += 1;
+       if (!options.pedantic_graphics)
+               return;
+       Barrier_Join(&graphics_barrier);
+       
+}
 
+#ifdef PERSISTENT_THREADS
 
+/**
+ * @function Worker_Thread
+ * @purpose Thread - A self contained worker thread to compute a particular sub system of bodies
+ *
+ * This is the "smart" way to do it, because threads are only created once, and compute both force and position.
+ * The greatest difficulty with pthreads is getting a *single* thread from the team to execute certain code
+ *     (ie: The stuff in StepFunction()).
+ * With the "continuously respawning threads of stupidity" approach, 
+ * because there is one "master" thread (not necessarily the main thread... don't get confused now)
+ *     to keep respawning the workers, the single threaded code can just be executed in the master thread.
+ *
+ * With this approach, I have created a hacky solution so that the *last* thread to reach the position barrier (the one that wakes up the others) gets to call StepFunction.
+ *     
+ */
+void * Worker_Thread(void * arg)
+{
+       System * s = (System*)(arg); // This is mainly to save typing the RHS a lot of times
 
+       // Each thread runs until the whole program is supposed to end
+       while (!ExitCondition()) 
+       {
+               
+       
+               System_Forces(s, &universe); // Each thread computes the forces for its share of bodies
+
+               // Do not confuse with "Barrier_Wait".
+               // Barrier_Wait does not affect the barrier; it just waits for it
+               // Barrier_Join actively updates the state of the barrier, and wakes up sleeping threads if required.
+
+               Barrier_Join(&force_barrier); // All threads must reach here before moving on.
+               if (ExitCondition()) return NULL;
+
+               
+               //fprintf(stderr,"Thread %p - force barrier finished\n", arg);
+               //printf("Computed ALL forces\n");
+
+       
+               // If required, wait for the graphics to finish drawing stuff
+               if (options.draw_graphics && options.pedantic_graphics)
+               {
+                       //printf("Worker %p waits on graphics barrier\n", arg);
+                       Barrier_Wait(&graphics_barrier);
+                       //printf("\tWorker %p wakes up after graphics barrier\n", arg);
+                       if (ExitCondition()) return NULL;
+               }
+               
+
+               
+               Barrier_Enter(&position_barrier);
+               System_Positions(s); // Each thread updates the positions for its share of bodies
+
+
+               // Barrier_JoinCall behaves in the same way as Barrier_Join, except the *last* thread 
+               //      (ie: the one that wakes up the others) also calls the function with arguments given.
+               Barrier_JoinCall(&position_barrier, StepFunction, (void*)(&universe));
+               if (ExitCondition()) return NULL;
+               //Barrier_Join(&position_barrier);
+
+               // All threads have computed positions, and *one* thread calls StepFunction()
+               
        }
+       QuitProgram(false); // Set the run state of the program
+       return NULL;
 }
 
+#endif //PERSISTENT_THREADS
+
 /**
  * @function Force_Thread
  * @purpose Thread - Calculates the forces on objects in a System
@@ -174,19 +319,16 @@ void * Compute_Thread(void * arg)
  */
 void * Force_Thread(void * s)
 {
+       //System_ForcePair * pair = (System_ForcePair*)s;
+
        
-       System_Forces((System*)s, &universe); //Simple wrapper
+       System_Forces(s, &universe); //Simple wrapper
+       //printf("Force_Thread waits\n");
 
-       pthread_mutex_lock(&mutex_workers);
-       workers_busy -= 1;      // Count this thread as done
-       if (workers_busy == 0)
-       {
-               pthread_cond_signal(&workers_done_cv); // All threads done; wake up the compute_thread
-       }
-       pthread_mutex_unlock(&mutex_workers);
        return NULL;
 }
 
+
 /**
  * @function Position_Thread
  * @purpose Thread - Calculates the positions of objects in a System 
@@ -194,77 +336,170 @@ void * Force_Thread(void * s)
  */
 void * Position_Thread(void * s)
 {
-       
+
        System_Positions((System*)s); // Simple wrapper
+       Barrier_Join(&position_barrier); // This needed so that graphics will wait
 
-       pthread_mutex_lock(&mutex_workers);
-       workers_busy -= 1; // Count this thread as done
-       if (workers_busy == 0)
-       {
-               pthread_cond_signal(&workers_done_cv); //All threads done; wake up the compute_thread
-       }
-       pthread_mutex_unlock(&mutex_workers);
        return NULL;
 }      
 
 /**
  * @function QuitProgram
- * @purpose This function can either be called by the main thread in order to signal other threads
- *             that it wants to exit. The main thread then calls pthread_join before exiting.
- *     It can also be called by a child thread to request the main thread to exit.
- *     It is only used this way if there is an unrecovarable error (ie: Can't allocate memory in a child thread)
+ * @purpose This function can be called in any thread to signal all threads to exit
+ *     Repeated calls to this function have no effect
+ *
+ *     All threads periodically call ExitCondition(), which will return true if the program should exit.
+ *             One way (not the only way) for it to return true is if this function has been called.
+ *     Threads will call this function if they detect ExitCondition() is true. Only the first call has any effect.
  */
-void QuitProgram(bool error)
+inline void QuitProgram(bool error)
 {
-       if (runstate == QUIT || runstate == QUIT_ERROR)
-               return; //Don't do anything if already quitting
+       //If already quitting, don't do anything
+       if (runstate == QUIT || runstate == QUIT_ERROR) 
+               return; 
+
+       
+
+       // set the runstate (checked in ExitCondition())
+
        pthread_mutex_lock(&mutex_runstate); // aquire mutex
-       if (error) // set the runstate
-               runstate = QUIT_ERROR;
+       if (error) 
+               runstate = QUIT_ERROR; // Program is exiting due to an error
        else
-               runstate = QUIT;
+               runstate = QUIT; // Program is exiting naturally
        pthread_mutex_unlock(&mutex_runstate); //release mutex
+
+       
 }
 
 /**
  * @function Thread_Cleanup
- * @purpose Will be called in the main thread when exit() is called
- *     Automatically tells all other threads to quit (if they haven't already been told) 
+ * @purpose Will be called in the *main* thread when exit() is called
+ *     Ensures working threads will exit by forcing open any barriers they may be waiting in.
  *     Then waits for them to finish.
  *     Also frees memory associated with the worker threads.   
  */
 void Thread_Cleanup(void)
 {
-       if (runstate == RUN) // If this is true, as far as child threads are concerned, the simulation is still running
-               QuitProgram(false); // So call QuitProgram which will set runstate, and cause child threads to exit
-       pthread_join(compute_thread, NULL);
-       free(worker_thread);
-       free(sub_system);
+       
+
+       // Threads recheck the exit condition whenever they leave a barrier.
+       // These calls will stop any threads waiting forever in a barrier for threads that exited before getting to the barrier.
+       Barrier_ForceExit(&force_barrier); 
+       Barrier_ForceExit(&position_barrier);
+
+
+       if (options.draw_graphics) // If the graphics are enabled...
+       {
+               // Then there is a computation thread, since graphics are done in the main thread
+               pthread_join(compute_thread, NULL); 
+       }
+
+       #ifdef PERSISTENT_THREADS
+               for (unsigned i = 0; i < options.num_threads-1; ++i)
+               {
+                       pthread_join(worker_thread[i], NULL);
+               }
+       #else 
+               // All other worker threads (if they were spawned) are terminated in Compute_Thread
+       #endif //PERSISTENT_THREADS
+
+       // Scary memory management here.
+       if (worker_thread != NULL)
+               free(worker_thread);
+       if (sub_system != NULL)
+               free(sub_system);
+       worker_thread = NULL;
+       sub_system = NULL;
+
 }
 
 
 /**
  * @function Simulation_Run
  * @purpose Initialise and start the simulation. Will be called in the main thread.
- *     Replaces the single-threaded macro that does nothing, and sets up the compute thread
+ *     Replaces the single-threaded macro that does nothing, and sets up the graphics and computation threads
  * @param argc - Number of arguments - Passed to Graphics_Run if needed
  * @param argv - Argument strings - Passed to Graphics_Run if needed
  */
 void Simulation_Run(int argc, char ** argv)
 {
        atexit(Thread_Cleanup);
-
-       if (options.draw_graphics)
+       if (options.draw_graphics) // The graphics are enabled
        {
-               // The graphics are enabled, so create a thread to do computations
-               // Graphics are done in the main loop
+               // I have chosen to do graphics in the main thread in this case.
+               // A *single* separate thread is spawned here to do computations.
+               // This computation thread will spawn any additional worker threads required.
                if (pthread_create(&compute_thread, NULL, Compute_Thread, (void*)&universe) != 0)
                {
                        perror("Error creating compute thread");
                        exit(EXIT_FAILURE);
                }
+
+               // This is run in the main thread
+               // It is effectively the graphics initialisation, followed by the glut loop
                Graphics_Run(argc, argv);
+
+               // The main thread reaches here after leaving the glut loop when ExitCondition() returns true.
+
+               QuitProgram(false);
+
+               exit(EXIT_SUCCESS); // This is the main thread; use exit()
+               
+       }
+       else //The graphics are disabled
+       {
+               // If graphics are disabled, there is no point spawning an extra thread.
+               // In this case, the *main* thread starts computations.
+               // Note that it will probably spawn additional worker threads (unless options.num_threads <= 1)
+               Compute_Thread((void*)(&universe));
+               QuitProgram(false);
+               exit(EXIT_SUCCESS);
        }
-       else
-               Compute_Thread((void*)(&universe)); // Graphics are disabled, so do computations in the main thread
 }
+
+
+
+
+
+/**
+ * @function Allocate_Threads
+ * @purpose Helper function to allocate an array of pthread_t objects
+ *     Handles all the pointless, er, "important" error checking that should be done
+ * @param n - Number of threads in the array
+ * 
+ * WARNING: Remember to free() the array!!!
+ */
+pthread_t * Allocate_Threads(unsigned n)
+{
+       pthread_t * result = (pthread_t*)(calloc(n, sizeof(pthread_t)));
+       if (result == NULL)
+       {
+               perror("Unable to allocate memory for threads");
+               QuitProgram(true);
+               pthread_exit(NULL);
+       }
+       return result;
+}
+
+/**
+ * @function StepFunction
+ * @purpose Helper to perform stuff in a single thread every step, after position computations are done
+ *     The reason this has void* all over the place is so that I can pass the function pointer to Barrier_JoinCall (and on to horrible dragons and fiendish demons).
+ * @param arg - Can be cast to System* for which steps are to be updated
+ *     Will always be (void*)(&universe). But I have been brainwashed into the "global variables are baaaaad" philosophy.
+ * @returns arg
+ */
+void * StepFunction(void * arg)
+{
+       //fprintf(stderr, "StepFunction called\n");
+       System * s = (System*)(arg);
+       s->steps += 1; //Increment number of steps computed
+
+       if (options.verbosity != 0 && s->steps % options.verbosity == 0)
+               DisplayStatistics();
+
+
+       return arg;
+}
+

UCC git Repository :: git.ucc.asn.au