Parallel Programming - Commit before I break everything
authorSam Moore <sam@daedalus.(none)>
Sun, 16 Sep 2012 14:31:27 +0000 (22:31 +0800)
committerSam Moore <sam@daedalus.(none)>
Sun, 16 Sep 2012 14:31:27 +0000 (22:31 +0800)
About to radically alter structure of pthread program to enable nested threads
Well actually I already started. But oh well.

Santa Claus
 (Just kidding, this is Sam again)

course/semester2/pprog/assignment1/mthread/nbody.c
course/semester2/pprog/assignment1/mthread/nbody.h
course/semester2/pprog/assignment1/single-thread/graphics.c
course/semester2/pprog/assignment1/single-thread/nbody.c

index 301cd8b..de9b8aa 100644 (file)
@@ -15,15 +15,17 @@ pthread_t compute_thread; // The thread responsible for computations; it spawns
        
 pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
 System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
+System * nested_sub_system = NULL; //Array of Systems for division of "universe" for the nested worker threads
 
 pthread_mutex_t mutex_runstate; // Mutex around the runstate
 
-#ifdef PERSISTENT_THREADS
+
+
+pthread_attr_t attr; //thread attribute for the workers. 
+
 Barrier force_barrier; // I laughed at this variable name. A bit sad really.
 Barrier position_barrier;
-#else
-Barrier worker_barrier;
-#endif //PERSISTENT_THREADS
+
 
 Barrier graphics_barrier;
 
@@ -44,6 +46,9 @@ void * Compute_Thread(void * arg)
        if (options.num_threads <= 0)
                options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
 
+       if (options.nested_threads <= 0)
+               options.nested_threads = 1;
+
        // Do a sanity check; there is no point spawning more threads than bodies.
        if (options.num_threads > s->N)
        {
@@ -53,48 +58,28 @@ void * Compute_Thread(void * arg)
                options.num_threads = s->N;
        }
        
-       pthread_attr_t attr; //thread attribute for the workers. 
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
        
        if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
        {
-               worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t)));
-               if (worker_thread == NULL)
-               {
-                       perror("Couldn't allocate array of worker threads");
-                       QuitProgram(true);
-                       pthread_exit(NULL);
-               }
-               sub_system = (System*)(calloc(options.num_threads, sizeof(System)));
-               if (sub_system == NULL)
-               {
-                       perror("Couldn't allocate array of systems for worker threads to use");
-                       QuitProgram(true);
-                       pthread_exit(NULL);
-               }
+               worker_thread = Allocate_Threads(options.num_threads);
+               sub_system = Split_System(&universe, options.num_threads);
 
-               // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread
-               unsigned bodies_per_system = (s->N) / options.num_threads;
-               unsigned remainder = (s->N) % options.num_threads;
+               if (options.nested_threads > 1)
+                       nested_sub_system = Split_System(&universe, options.nested_threads);
+
+               #ifdef PERSISTENT_THREADS
                for (unsigned i = 0; i < options.num_threads; ++i)
                {
-                       sub_system[i].body = (s->body)+(i*bodies_per_system);
-                       sub_system[i].N = bodies_per_system;
-                       sub_system[i].steps = 0;
-                       if (i == options.num_threads - 1)
-                               sub_system[i].N += remainder; // The last thread gets the remainder
-
-                       #ifdef PERSISTENT_THREADS
                        if (pthread_create(worker_thread+i, & attr, Worker_Thread, (void*)(sub_system+i)) != 0)
                        {
                                perror("In compute thread, couldn't create worker thread");
                                QuitProgram(true);
                                pthread_exit(NULL);
                        }
-                       #endif //PERSISTENT_THREADS
-       
                }
+               #endif //PERSISTENT_THREADS
 
                
        }
@@ -104,14 +89,11 @@ void * Compute_Thread(void * arg)
                while (!ExitCondition())
                {
                        if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
-                       {
                                DisplayStatistics();
-                       }
 
                        // Just do everything in this thread 
                        System_Forces(s, s);
                        System_Positions(s);
-                       continue;
                }
                QuitProgram(false);
                pthread_exit(NULL);
@@ -119,20 +101,10 @@ void * Compute_Thread(void * arg)
 
        #else
        // The main computation loop
-       while (true)
+       while (!ExitCondition())
        {
-               //Check whether the program should quit due to steps being computed, or a timeout
-               if (ExitCondition())
-               {
-                       QuitProgram(false);
-                       pthread_exit(NULL);
-               }
-
                if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
-               {
                        DisplayStatistics();
-               }
-
 
                if (options.num_threads <= 1)
                {
@@ -157,14 +129,13 @@ void * Compute_Thread(void * arg)
                }
 
 
-               Barrier_Wait(&worker_barrier);
+               Barrier_Wait(&force_barrier);
 
                //All the forces are now computed
 
                if (options.draw_graphics && options.pedantic_graphics)
                {
                        Barrier_Wait(&graphics_barrier);
-                       Barrier_Enter(&graphics_barrier);
                }
 
                //Compute positions
@@ -179,21 +150,20 @@ void * Compute_Thread(void * arg)
                }
 
                //Wait for positions to be computed
-               Barrier_Wait(&worker_barrier);
+               Barrier_Wait(&position_barrier);
 
 
                //Update number of steps computed
                universe.steps += 1;
 
-               if (options.draw_graphics && options.pedantic_graphics)
-                       Barrier_Leave(&graphics_barrier);
+               
 
                
 
 
        }
+       QuitProgram(false);
        #endif //PERSISTENT_THREADS
-       
        return NULL;
 }
 
@@ -210,12 +180,9 @@ void BeforeDraw()
        //printf("BEFORE DRAW\n");
        if (!options.pedantic_graphics)
                return;
-       #ifdef PERSISTENT_THREADS
-       Barrier_Wait(&position_barrier);
        
-       #else
-       Barrier_Wait(&graphics_barrier);
-       #endif //PERSISTENT_THREADS
+       Barrier_Wait(&position_barrier);
+
 
        Barrier_Enter(&graphics_barrier);       
 }
@@ -245,15 +212,59 @@ void AfterDraw()
 void * Worker_Thread(void * arg)
 {
        System * s = (System*)(arg);
-       while (!ExitCondition())
+
+
+       pthread_t * nested_workers = NULL;
+       System_ForcePair * system_pairs = NULL;
+       System * nested_position = NULL;
+       
+       Barrier nested_barrier; 
+       Barrier_Init(&nested_barrier);
+
+       printf("options.nested_threads == %d\n", (int)(options.nested_threads));
+
+       if (options.nested_threads != 1)        
        {
-               
-               Barrier_Enter(&force_barrier);
 
-               System_Forces(s, &universe);
-               
-               Barrier_Leave(&force_barrier);
+               system_pairs = (System_ForcePair*)(calloc(options.nested_threads, sizeof(System_ForcePair)));
+               if (system_pairs == NULL) // Handle tedious error cases
+               {
+                       perror("Couldn't allocate array of system pairs");
+                       QuitProgram(true);
+                       pthread_exit(NULL);
+               }
+               nested_workers = Allocate_Threads(options.nested_threads);
+               nested_position = 
+
+               for (unsigned i = 0; i < options.nested_threads; ++i)
+               {
+                       system_pairs[i].A = s;
+                       system_pairs[i].B = nested_sub_system+i;
+               }
+       }
 
+       while (!ExitCondition())
+       {
+               
+               if (options.nested_threads == 1)
+               {
+                       Barrier_Enter(&force_barrier);
+                       System_Forces(s, &universe);
+                       Barrier_Leave(&force_barrier);
+               }
+               else
+               {
+                       for (unsigned i = 0; i < options.nested_threads; ++i)
+                       {
+                               if (pthread_create(nested_workers+i, &attr, Force_Thread, (void*)(system_pairs+i)) != 0)
+                               {
+                                       perror("In worker thread, couldn't create nested worker thread (force)");
+                                       QuitProgram(true);
+                                       free(nested_workers);
+                                       pthread_exit(NULL);                                     
+                               }       
+                       }
+               }
                //printf("Computed forces for %p\n", arg);
                Barrier_Wait(&force_barrier);
                //printf("Computed ALL forces\n");
@@ -270,12 +281,12 @@ void * Worker_Thread(void * arg)
                Barrier_Wait(&position_barrier);
                //printf("Computed ALL positions\n");
        }
+       printf("Worker thread exits\n");
        QuitProgram(false);
        pthread_exit(NULL);
 }
 
-
-#else
+#endif //PERSISTENT_THREADS
 
 /**
  * @function Force_Thread
@@ -284,15 +295,17 @@ void * Worker_Thread(void * arg)
  */
 void * Force_Thread(void * s)
 {
-       Barrier_Enter(&worker_barrier);
+       System_ForcePair * pair = (System_ForcePair*)s;
+       Barrier_Enter(&force_barrier);
 
-       System_Forces((System*)s, &universe); //Simple wrapper
+       System_Forces(pair->A, pair->B); //Simple wrapper
 
-       Barrier_Leave(&worker_barrier);
+       Barrier_Leave(&force_barrier);
 
        return NULL;
 }
 
+
 /**
  * @function Position_Thread
  * @purpose Thread - Calculates the positions of objects in a System 
@@ -300,14 +313,12 @@ void * Force_Thread(void * s)
  */
 void * Position_Thread(void * s)
 {
-       Barrier_Enter(&worker_barrier);
+       Barrier_Enter(&position_barrier);
        System_Positions((System*)s); // Simple wrapper
-       Barrier_Leave(&worker_barrier);
+       Barrier_Leave(&position_barrier);
        return NULL;
 }      
 
-#endif //PERSISTENT_THREADS
-
 /**
  * @function QuitProgram
  * @purpose This function can either be called by the main thread in order to signal other threads
@@ -355,12 +366,8 @@ void Simulation_Run(int argc, char ** argv)
 {
        atexit(Thread_Cleanup);
 
-       #ifdef PERSISTENT_THREADS
        Barrier_Init(&force_barrier);
        Barrier_Init(&position_barrier);
-       #else
-       Barrier_Init(&worker_barrier);
-       #endif //PERSISTENT_THREADS
        Barrier_Init(&graphics_barrier);
 
        
@@ -418,3 +425,56 @@ void Barrier_Wait(Barrier * b)
                pthread_cond_wait(&(b->threads_done_cv), &(b->mutex));
        pthread_mutex_unlock(&(b->mutex));
 }
+
+/**
+ * @function Split_System
+ * @purpose Helper to divide one system into an array of systems
+ *     Each sub system will have N = (s->N / n) bodies in it
+ * @param s - The original system (typically &universe)
+ * @param n - The number of sub systems in the array
+ *
+ * WARNING: It is the caller's responsibility to free() the returned array
+ */
+System * Split_System(System * s, unsigned n)
+{
+       System * result = (System*)(calloc(n, sizeof(System)));
+       if (result == NULL)
+       {
+               perror("Couldn't create array of sub systems");
+               QuitProgram(true);
+               pthread_exit(NULL);
+       }
+
+       unsigned n_per_system = (s->N) / n;
+       unsigned remainder = (s->N) % n;
+
+       for (unsigned i = 0; i < n; ++i)        
+       {
+               result[i].N = n_per_system;
+               if (i == n-1)
+                       result[i].N += remainder;
+               result[i].body = (s->body) + (n_per_system * i);
+               result[i].steps = 0;
+       }
+       return result;
+}
+
+/**
+ * @function Allocate_Threads
+ * @purpose Helper function to allocate an array of pthread_t objects
+ *     Handles all the pointless, er, "important" error checking that should be done
+ * @param n - Number of threads in the array
+ * 
+ * WARNING: Remember to free() the array!!!
+ */
+pthread_t * Allocate_Threads(unsigned n)
+{
+       pthread_t * result = (pthread_t*)(calloc(n, sizeof(pthread_t)));
+       if (result == NULL)
+       {
+               perror("Unable to allocate memory for threads");
+               QuitProgram(true);
+               pthread_exit(NULL);
+       }
+       return result;
+}
index 7abe525..f2de022 100644 (file)
@@ -10,7 +10,7 @@
 
 #define DEFAULT_WORKING_THREADS 2 
 
-//#define PERSISTENT_THREADS //If defined, threads will not be continually destroyed and then respawned
+#define PERSISTENT_THREADS //If defined, threads will not be continually destroyed and then respawned
 
 
 
@@ -28,15 +28,24 @@ void AfterDraw();
 
 void * Compute_Thread(void * system); //Thread - Continuously perform computations for a System of bodies. May spawn additional worker threads.
 
+
+System * Split_System(System * s, unsigned n); // Splits one system into a number of other systems, returns an array of size n
+pthread_t * Allocate_Threads(unsigned n); // Allocates space for threads - handles errors
+
 #ifdef PERSISTENT_THREADS
 void * Worker_Thread(void * arg);
-#else
+#endif //PERSISTENT_THREADS
 void * Force_Thread(void * system); //Thread - Compute forces for all objects in a system
 void * Position_Thread(void * system); //Thread - Compute positions for all objects in a system
-#endif //PERSISTENT_THREADS
 
 void Thread_Cleanup(void); //Called at program exit to safely join computation thread
 
+/**
+ * Structure to represent a barrier for multiple threads
+ * @param mutex - Mutex around the counter
+ * @param busy - Counter of threads within the barrier
+ * @param threads_done_cv - Condition to wake up threads waiting on barrier once all working threads have left it
+ */
 typedef struct
 {
        pthread_mutex_t mutex;
@@ -44,6 +53,17 @@ typedef struct
        pthread_cond_t threads_done_cv;
 } Barrier;
 
+/**
+ * Structure to represent a pair of Systems; passed to Force_Thread
+ * @param A - System to calculate forces for
+ * @param B - System causing forces on System A
+ */
+typedef struct
+{
+       System * A;
+       System * B;
+} System_ForcePair;
+
 void Barrier_Init(Barrier * b);
 void Barrier_Enter(Barrier * b);
 void Barrier_Leave(Barrier * b);
index 864ff91..6fdb581 100644 (file)
@@ -136,6 +136,7 @@ void Graphics_Display()
        {
                Body * b = universe.body+i;
                glColor3f(0.0f, b->mass/1e11*100, 0.0f);
+               //glColor3f(1.0f, 0.0f, 0.0f);
                glPushMatrix(); // to save the current matrix
                glTranslated(scale*b->x[0], scale*b->x[1], scale*b->x[2]);
                glutSolidSphere (BALL_SIZE, 10, 10);
index 67afd0f..e4b4bc7 100644 (file)
@@ -235,6 +235,8 @@ void DisplayStatistics()
  */
 bool ExitCondition(void)
 {
-       return (runstate != RUN || (options.timeout > 0.00 && ((unsigned)(time(NULL) - options.start_time.tv_sec) >= options.timeout))
+       bool result = (runstate != RUN || (options.timeout > 0.00 && ((unsigned)(time(NULL) - options.start_time.tv_sec) >= options.timeout))
                || (options.num_steps > 0 && universe.steps > options.num_steps));
+       //printf("runstate %d\n timeout %d\n steps %d\n", (int)(runstate != RUN), (int)(options.timeout > 0.00 && ((unsigned)(time(NULL) - options.start_time.tv_sec) >= options.timeout)), (int)(options.num_steps > 0 && universe.steps > options.num_steps));
+       return result;
 }

UCC git Repository :: git.ucc.asn.au