From df06a43b2727b939ee572a1ddaa39e707aff030a Mon Sep 17 00:00:00 2001 From: Sam Moore Date: Sun, 16 Sep 2012 22:31:27 +0800 Subject: [PATCH] Parallel Programming - Commit before I break everything About to radically alter structure of pthread program to enable nested threads Well actually I already started. But oh well. Santa Claus (Just kidding, this is Sam again) --- .../pprog/assignment1/mthread/nbody.c | 210 +++++++++++------- .../pprog/assignment1/mthread/nbody.h | 26 ++- .../assignment1/single-thread/graphics.c | 1 + .../pprog/assignment1/single-thread/nbody.c | 4 +- 4 files changed, 162 insertions(+), 79 deletions(-) diff --git a/course/semester2/pprog/assignment1/mthread/nbody.c b/course/semester2/pprog/assignment1/mthread/nbody.c index 301cd8b1..de9b8aaa 100644 --- a/course/semester2/pprog/assignment1/mthread/nbody.c +++ b/course/semester2/pprog/assignment1/mthread/nbody.c @@ -15,15 +15,17 @@ pthread_t compute_thread; // The thread responsible for computations; it spawns pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads +System * nested_sub_system = NULL; //Array of Systems for division of "universe" for the nested worker threads pthread_mutex_t mutex_runstate; // Mutex around the runstate -#ifdef PERSISTENT_THREADS + + +pthread_attr_t attr; //thread attribute for the workers. + Barrier force_barrier; // I laughed at this variable name. A bit sad really. Barrier position_barrier; -#else -Barrier worker_barrier; -#endif //PERSISTENT_THREADS + Barrier graphics_barrier; @@ -44,6 +46,9 @@ void * Compute_Thread(void * arg) if (options.num_threads <= 0) options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1; + if (options.nested_threads <= 0) + options.nested_threads = 1; + // Do a sanity check; there is no point spawning more threads than bodies. if (options.num_threads > s->N) { @@ -53,48 +58,28 @@ void * Compute_Thread(void * arg) options.num_threads = s->N; } - pthread_attr_t attr; //thread attribute for the workers. pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused. if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1 { - worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t))); - if (worker_thread == NULL) - { - perror("Couldn't allocate array of worker threads"); - QuitProgram(true); - pthread_exit(NULL); - } - sub_system = (System*)(calloc(options.num_threads, sizeof(System))); - if (sub_system == NULL) - { - perror("Couldn't allocate array of systems for worker threads to use"); - QuitProgram(true); - pthread_exit(NULL); - } + worker_thread = Allocate_Threads(options.num_threads); + sub_system = Split_System(&universe, options.num_threads); - // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread - unsigned bodies_per_system = (s->N) / options.num_threads; - unsigned remainder = (s->N) % options.num_threads; + if (options.nested_threads > 1) + nested_sub_system = Split_System(&universe, options.nested_threads); + + #ifdef PERSISTENT_THREADS for (unsigned i = 0; i < options.num_threads; ++i) { - sub_system[i].body = (s->body)+(i*bodies_per_system); - sub_system[i].N = bodies_per_system; - sub_system[i].steps = 0; - if (i == options.num_threads - 1) - sub_system[i].N += remainder; // The last thread gets the remainder - - #ifdef PERSISTENT_THREADS if (pthread_create(worker_thread+i, & attr, Worker_Thread, (void*)(sub_system+i)) != 0) { perror("In compute thread, couldn't create worker thread"); QuitProgram(true); pthread_exit(NULL); } - #endif //PERSISTENT_THREADS - } + #endif //PERSISTENT_THREADS } @@ -104,14 +89,11 @@ void * Compute_Thread(void * arg) while (!ExitCondition()) { if (options.verbosity != 0 && universe.steps % options.verbosity == 1) - { DisplayStatistics(); - } // Just do everything in this thread System_Forces(s, s); System_Positions(s); - continue; } QuitProgram(false); pthread_exit(NULL); @@ -119,20 +101,10 @@ void * Compute_Thread(void * arg) #else // The main computation loop - while (true) + while (!ExitCondition()) { - //Check whether the program should quit due to steps being computed, or a timeout - if (ExitCondition()) - { - QuitProgram(false); - pthread_exit(NULL); - } - if (options.verbosity != 0 && universe.steps % options.verbosity == 1) - { DisplayStatistics(); - } - if (options.num_threads <= 1) { @@ -157,14 +129,13 @@ void * Compute_Thread(void * arg) } - Barrier_Wait(&worker_barrier); + Barrier_Wait(&force_barrier); //All the forces are now computed if (options.draw_graphics && options.pedantic_graphics) { Barrier_Wait(&graphics_barrier); - Barrier_Enter(&graphics_barrier); } //Compute positions @@ -179,21 +150,20 @@ void * Compute_Thread(void * arg) } //Wait for positions to be computed - Barrier_Wait(&worker_barrier); + Barrier_Wait(&position_barrier); //Update number of steps computed universe.steps += 1; - if (options.draw_graphics && options.pedantic_graphics) - Barrier_Leave(&graphics_barrier); + } + QuitProgram(false); #endif //PERSISTENT_THREADS - return NULL; } @@ -210,12 +180,9 @@ void BeforeDraw() //printf("BEFORE DRAW\n"); if (!options.pedantic_graphics) return; - #ifdef PERSISTENT_THREADS - Barrier_Wait(&position_barrier); - #else - Barrier_Wait(&graphics_barrier); - #endif //PERSISTENT_THREADS + Barrier_Wait(&position_barrier); + Barrier_Enter(&graphics_barrier); } @@ -245,15 +212,59 @@ void AfterDraw() void * Worker_Thread(void * arg) { System * s = (System*)(arg); - while (!ExitCondition()) + + + pthread_t * nested_workers = NULL; + System_ForcePair * system_pairs = NULL; + System * nested_position = NULL; + + Barrier nested_barrier; + Barrier_Init(&nested_barrier); + + printf("options.nested_threads == %d\n", (int)(options.nested_threads)); + + if (options.nested_threads != 1) { - - Barrier_Enter(&force_barrier); - System_Forces(s, &universe); - - Barrier_Leave(&force_barrier); + system_pairs = (System_ForcePair*)(calloc(options.nested_threads, sizeof(System_ForcePair))); + if (system_pairs == NULL) // Handle tedious error cases + { + perror("Couldn't allocate array of system pairs"); + QuitProgram(true); + pthread_exit(NULL); + } + nested_workers = Allocate_Threads(options.nested_threads); + nested_position = + + for (unsigned i = 0; i < options.nested_threads; ++i) + { + system_pairs[i].A = s; + system_pairs[i].B = nested_sub_system+i; + } + } + while (!ExitCondition()) + { + + if (options.nested_threads == 1) + { + Barrier_Enter(&force_barrier); + System_Forces(s, &universe); + Barrier_Leave(&force_barrier); + } + else + { + for (unsigned i = 0; i < options.nested_threads; ++i) + { + if (pthread_create(nested_workers+i, &attr, Force_Thread, (void*)(system_pairs+i)) != 0) + { + perror("In worker thread, couldn't create nested worker thread (force)"); + QuitProgram(true); + free(nested_workers); + pthread_exit(NULL); + } + } + } //printf("Computed forces for %p\n", arg); Barrier_Wait(&force_barrier); //printf("Computed ALL forces\n"); @@ -270,12 +281,12 @@ void * Worker_Thread(void * arg) Barrier_Wait(&position_barrier); //printf("Computed ALL positions\n"); } + printf("Worker thread exits\n"); QuitProgram(false); pthread_exit(NULL); } - -#else +#endif //PERSISTENT_THREADS /** * @function Force_Thread @@ -284,15 +295,17 @@ void * Worker_Thread(void * arg) */ void * Force_Thread(void * s) { - Barrier_Enter(&worker_barrier); + System_ForcePair * pair = (System_ForcePair*)s; + Barrier_Enter(&force_barrier); - System_Forces((System*)s, &universe); //Simple wrapper + System_Forces(pair->A, pair->B); //Simple wrapper - Barrier_Leave(&worker_barrier); + Barrier_Leave(&force_barrier); return NULL; } + /** * @function Position_Thread * @purpose Thread - Calculates the positions of objects in a System @@ -300,14 +313,12 @@ void * Force_Thread(void * s) */ void * Position_Thread(void * s) { - Barrier_Enter(&worker_barrier); + Barrier_Enter(&position_barrier); System_Positions((System*)s); // Simple wrapper - Barrier_Leave(&worker_barrier); + Barrier_Leave(&position_barrier); return NULL; } -#endif //PERSISTENT_THREADS - /** * @function QuitProgram * @purpose This function can either be called by the main thread in order to signal other threads @@ -355,12 +366,8 @@ void Simulation_Run(int argc, char ** argv) { atexit(Thread_Cleanup); - #ifdef PERSISTENT_THREADS Barrier_Init(&force_barrier); Barrier_Init(&position_barrier); - #else - Barrier_Init(&worker_barrier); - #endif //PERSISTENT_THREADS Barrier_Init(&graphics_barrier); @@ -418,3 +425,56 @@ void Barrier_Wait(Barrier * b) pthread_cond_wait(&(b->threads_done_cv), &(b->mutex)); pthread_mutex_unlock(&(b->mutex)); } + +/** + * @function Split_System + * @purpose Helper to divide one system into an array of systems + * Each sub system will have N = (s->N / n) bodies in it + * @param s - The original system (typically &universe) + * @param n - The number of sub systems in the array + * + * WARNING: It is the caller's responsibility to free() the returned array + */ +System * Split_System(System * s, unsigned n) +{ + System * result = (System*)(calloc(n, sizeof(System))); + if (result == NULL) + { + perror("Couldn't create array of sub systems"); + QuitProgram(true); + pthread_exit(NULL); + } + + unsigned n_per_system = (s->N) / n; + unsigned remainder = (s->N) % n; + + for (unsigned i = 0; i < n; ++i) + { + result[i].N = n_per_system; + if (i == n-1) + result[i].N += remainder; + result[i].body = (s->body) + (n_per_system * i); + result[i].steps = 0; + } + return result; +} + +/** + * @function Allocate_Threads + * @purpose Helper function to allocate an array of pthread_t objects + * Handles all the pointless, er, "important" error checking that should be done + * @param n - Number of threads in the array + * + * WARNING: Remember to free() the array!!! + */ +pthread_t * Allocate_Threads(unsigned n) +{ + pthread_t * result = (pthread_t*)(calloc(n, sizeof(pthread_t))); + if (result == NULL) + { + perror("Unable to allocate memory for threads"); + QuitProgram(true); + pthread_exit(NULL); + } + return result; +} diff --git a/course/semester2/pprog/assignment1/mthread/nbody.h b/course/semester2/pprog/assignment1/mthread/nbody.h index 7abe5250..f2de0229 100644 --- a/course/semester2/pprog/assignment1/mthread/nbody.h +++ b/course/semester2/pprog/assignment1/mthread/nbody.h @@ -10,7 +10,7 @@ #define DEFAULT_WORKING_THREADS 2 -//#define PERSISTENT_THREADS //If defined, threads will not be continually destroyed and then respawned +#define PERSISTENT_THREADS //If defined, threads will not be continually destroyed and then respawned @@ -28,15 +28,24 @@ void AfterDraw(); void * Compute_Thread(void * system); //Thread - Continuously perform computations for a System of bodies. May spawn additional worker threads. + +System * Split_System(System * s, unsigned n); // Splits one system into a number of other systems, returns an array of size n +pthread_t * Allocate_Threads(unsigned n); // Allocates space for threads - handles errors + #ifdef PERSISTENT_THREADS void * Worker_Thread(void * arg); -#else +#endif //PERSISTENT_THREADS void * Force_Thread(void * system); //Thread - Compute forces for all objects in a system void * Position_Thread(void * system); //Thread - Compute positions for all objects in a system -#endif //PERSISTENT_THREADS void Thread_Cleanup(void); //Called at program exit to safely join computation thread +/** + * Structure to represent a barrier for multiple threads + * @param mutex - Mutex around the counter + * @param busy - Counter of threads within the barrier + * @param threads_done_cv - Condition to wake up threads waiting on barrier once all working threads have left it + */ typedef struct { pthread_mutex_t mutex; @@ -44,6 +53,17 @@ typedef struct pthread_cond_t threads_done_cv; } Barrier; +/** + * Structure to represent a pair of Systems; passed to Force_Thread + * @param A - System to calculate forces for + * @param B - System causing forces on System A + */ +typedef struct +{ + System * A; + System * B; +} System_ForcePair; + void Barrier_Init(Barrier * b); void Barrier_Enter(Barrier * b); void Barrier_Leave(Barrier * b); diff --git a/course/semester2/pprog/assignment1/single-thread/graphics.c b/course/semester2/pprog/assignment1/single-thread/graphics.c index 864ff915..6fdb5810 100644 --- a/course/semester2/pprog/assignment1/single-thread/graphics.c +++ b/course/semester2/pprog/assignment1/single-thread/graphics.c @@ -136,6 +136,7 @@ void Graphics_Display() { Body * b = universe.body+i; glColor3f(0.0f, b->mass/1e11*100, 0.0f); + //glColor3f(1.0f, 0.0f, 0.0f); glPushMatrix(); // to save the current matrix glTranslated(scale*b->x[0], scale*b->x[1], scale*b->x[2]); glutSolidSphere (BALL_SIZE, 10, 10); diff --git a/course/semester2/pprog/assignment1/single-thread/nbody.c b/course/semester2/pprog/assignment1/single-thread/nbody.c index 67afd0fd..e4b4bc70 100644 --- a/course/semester2/pprog/assignment1/single-thread/nbody.c +++ b/course/semester2/pprog/assignment1/single-thread/nbody.c @@ -235,6 +235,8 @@ void DisplayStatistics() */ bool ExitCondition(void) { - return (runstate != RUN || (options.timeout > 0.00 && ((unsigned)(time(NULL) - options.start_time.tv_sec) >= options.timeout)) + bool result = (runstate != RUN || (options.timeout > 0.00 && ((unsigned)(time(NULL) - options.start_time.tv_sec) >= options.timeout)) || (options.num_steps > 0 && universe.steps > options.num_steps)); + //printf("runstate %d\n timeout %d\n steps %d\n", (int)(runstate != RUN), (int)(options.timeout > 0.00 && ((unsigned)(time(NULL) - options.start_time.tv_sec) >= options.timeout)), (int)(options.num_steps > 0 && universe.steps > options.num_steps)); + return result; } -- 2.20.1