pthread_t * worker_thread = NULL; //Array of worker threads responsible for Force and Position updates
System * sub_system = NULL; //Array of Systems used to divide up the main "universe" System for worker threads
+System * nested_sub_system = NULL; //Array of Systems for division of "universe" for the nested worker threads
pthread_mutex_t mutex_runstate; // Mutex around the runstate
-#ifdef PERSISTENT_THREADS
+
+
+pthread_attr_t attr; //thread attribute for the workers.
+
Barrier force_barrier; // I laughed at this variable name. A bit sad really.
Barrier position_barrier;
-#else
-Barrier worker_barrier;
-#endif //PERSISTENT_THREADS
+
Barrier graphics_barrier;
if (options.num_threads <= 0)
options.num_threads = (DEFAULT_WORKING_THREADS > 1) ? DEFAULT_WORKING_THREADS : 1;
+ if (options.nested_threads <= 0)
+ options.nested_threads = 1;
+
// Do a sanity check; there is no point spawning more threads than bodies.
if (options.num_threads > s->N)
{
options.num_threads = s->N;
}
- pthread_attr_t attr; //thread attribute for the workers.
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //Needs to be detached, so that memory can be reused.
if (options.num_threads > 1) // Allocate worker threads and sub systems, as long as there would be more than 1
{
- worker_thread = (pthread_t*)(calloc(options.num_threads, sizeof(pthread_t)));
- if (worker_thread == NULL)
- {
- perror("Couldn't allocate array of worker threads");
- QuitProgram(true);
- pthread_exit(NULL);
- }
- sub_system = (System*)(calloc(options.num_threads, sizeof(System)));
- if (sub_system == NULL)
- {
- perror("Couldn't allocate array of systems for worker threads to use");
- QuitProgram(true);
- pthread_exit(NULL);
- }
+ worker_thread = Allocate_Threads(options.num_threads);
+ sub_system = Split_System(&universe, options.num_threads);
- // Divide up the Body array owned by s into options.num_threads arrays, one for each worker thread
- unsigned bodies_per_system = (s->N) / options.num_threads;
- unsigned remainder = (s->N) % options.num_threads;
+ if (options.nested_threads > 1)
+ nested_sub_system = Split_System(&universe, options.nested_threads);
+
+ #ifdef PERSISTENT_THREADS
for (unsigned i = 0; i < options.num_threads; ++i)
{
- sub_system[i].body = (s->body)+(i*bodies_per_system);
- sub_system[i].N = bodies_per_system;
- sub_system[i].steps = 0;
- if (i == options.num_threads - 1)
- sub_system[i].N += remainder; // The last thread gets the remainder
-
- #ifdef PERSISTENT_THREADS
if (pthread_create(worker_thread+i, & attr, Worker_Thread, (void*)(sub_system+i)) != 0)
{
perror("In compute thread, couldn't create worker thread");
QuitProgram(true);
pthread_exit(NULL);
}
- #endif //PERSISTENT_THREADS
-
}
+ #endif //PERSISTENT_THREADS
}
while (!ExitCondition())
{
if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
- {
DisplayStatistics();
- }
// Just do everything in this thread
System_Forces(s, s);
System_Positions(s);
- continue;
}
QuitProgram(false);
pthread_exit(NULL);
#else
// The main computation loop
- while (true)
+ while (!ExitCondition())
{
- //Check whether the program should quit due to steps being computed, or a timeout
- if (ExitCondition())
- {
- QuitProgram(false);
- pthread_exit(NULL);
- }
-
if (options.verbosity != 0 && universe.steps % options.verbosity == 1)
- {
DisplayStatistics();
- }
-
if (options.num_threads <= 1)
{
}
- Barrier_Wait(&worker_barrier);
+ Barrier_Wait(&force_barrier);
//All the forces are now computed
if (options.draw_graphics && options.pedantic_graphics)
{
Barrier_Wait(&graphics_barrier);
- Barrier_Enter(&graphics_barrier);
}
//Compute positions
}
//Wait for positions to be computed
- Barrier_Wait(&worker_barrier);
+ Barrier_Wait(&position_barrier);
//Update number of steps computed
universe.steps += 1;
- if (options.draw_graphics && options.pedantic_graphics)
- Barrier_Leave(&graphics_barrier);
+
}
+ QuitProgram(false);
#endif //PERSISTENT_THREADS
-
return NULL;
}
//printf("BEFORE DRAW\n");
if (!options.pedantic_graphics)
return;
- #ifdef PERSISTENT_THREADS
- Barrier_Wait(&position_barrier);
- #else
- Barrier_Wait(&graphics_barrier);
- #endif //PERSISTENT_THREADS
+ Barrier_Wait(&position_barrier);
+
Barrier_Enter(&graphics_barrier);
}
void * Worker_Thread(void * arg)
{
System * s = (System*)(arg);
- while (!ExitCondition())
+
+
+ pthread_t * nested_workers = NULL;
+ System_ForcePair * system_pairs = NULL;
+ System * nested_position = NULL;
+
+ Barrier nested_barrier;
+ Barrier_Init(&nested_barrier);
+
+ printf("options.nested_threads == %d\n", (int)(options.nested_threads));
+
+ if (options.nested_threads != 1)
{
-
- Barrier_Enter(&force_barrier);
- System_Forces(s, &universe);
-
- Barrier_Leave(&force_barrier);
+ system_pairs = (System_ForcePair*)(calloc(options.nested_threads, sizeof(System_ForcePair)));
+ if (system_pairs == NULL) // Handle tedious error cases
+ {
+ perror("Couldn't allocate array of system pairs");
+ QuitProgram(true);
+ pthread_exit(NULL);
+ }
+ nested_workers = Allocate_Threads(options.nested_threads);
+ nested_position =
+
+ for (unsigned i = 0; i < options.nested_threads; ++i)
+ {
+ system_pairs[i].A = s;
+ system_pairs[i].B = nested_sub_system+i;
+ }
+ }
+ while (!ExitCondition())
+ {
+
+ if (options.nested_threads == 1)
+ {
+ Barrier_Enter(&force_barrier);
+ System_Forces(s, &universe);
+ Barrier_Leave(&force_barrier);
+ }
+ else
+ {
+ for (unsigned i = 0; i < options.nested_threads; ++i)
+ {
+ if (pthread_create(nested_workers+i, &attr, Force_Thread, (void*)(system_pairs+i)) != 0)
+ {
+ perror("In worker thread, couldn't create nested worker thread (force)");
+ QuitProgram(true);
+ free(nested_workers);
+ pthread_exit(NULL);
+ }
+ }
+ }
//printf("Computed forces for %p\n", arg);
Barrier_Wait(&force_barrier);
//printf("Computed ALL forces\n");
Barrier_Wait(&position_barrier);
//printf("Computed ALL positions\n");
}
+ printf("Worker thread exits\n");
QuitProgram(false);
pthread_exit(NULL);
}
-
-#else
+#endif //PERSISTENT_THREADS
/**
* @function Force_Thread
*/
void * Force_Thread(void * s)
{
- Barrier_Enter(&worker_barrier);
+ System_ForcePair * pair = (System_ForcePair*)s;
+ Barrier_Enter(&force_barrier);
- System_Forces((System*)s, &universe); //Simple wrapper
+ System_Forces(pair->A, pair->B); //Simple wrapper
- Barrier_Leave(&worker_barrier);
+ Barrier_Leave(&force_barrier);
return NULL;
}
+
/**
* @function Position_Thread
* @purpose Thread - Calculates the positions of objects in a System
*/
void * Position_Thread(void * s)
{
- Barrier_Enter(&worker_barrier);
+ Barrier_Enter(&position_barrier);
System_Positions((System*)s); // Simple wrapper
- Barrier_Leave(&worker_barrier);
+ Barrier_Leave(&position_barrier);
return NULL;
}
-#endif //PERSISTENT_THREADS
-
/**
* @function QuitProgram
* @purpose This function can either be called by the main thread in order to signal other threads
{
atexit(Thread_Cleanup);
- #ifdef PERSISTENT_THREADS
Barrier_Init(&force_barrier);
Barrier_Init(&position_barrier);
- #else
- Barrier_Init(&worker_barrier);
- #endif //PERSISTENT_THREADS
Barrier_Init(&graphics_barrier);
pthread_cond_wait(&(b->threads_done_cv), &(b->mutex));
pthread_mutex_unlock(&(b->mutex));
}
+
+/**
+ * @function Split_System
+ * @purpose Helper to divide one system into an array of systems
+ * Each sub system will have N = (s->N / n) bodies in it
+ * @param s - The original system (typically &universe)
+ * @param n - The number of sub systems in the array
+ *
+ * WARNING: It is the caller's responsibility to free() the returned array
+ */
+System * Split_System(System * s, unsigned n)
+{
+ System * result = (System*)(calloc(n, sizeof(System)));
+ if (result == NULL)
+ {
+ perror("Couldn't create array of sub systems");
+ QuitProgram(true);
+ pthread_exit(NULL);
+ }
+
+ unsigned n_per_system = (s->N) / n;
+ unsigned remainder = (s->N) % n;
+
+ for (unsigned i = 0; i < n; ++i)
+ {
+ result[i].N = n_per_system;
+ if (i == n-1)
+ result[i].N += remainder;
+ result[i].body = (s->body) + (n_per_system * i);
+ result[i].steps = 0;
+ }
+ return result;
+}
+
+/**
+ * @function Allocate_Threads
+ * @purpose Helper function to allocate an array of pthread_t objects
+ * Handles all the pointless, er, "important" error checking that should be done
+ * @param n - Number of threads in the array
+ *
+ * WARNING: Remember to free() the array!!!
+ */
+pthread_t * Allocate_Threads(unsigned n)
+{
+ pthread_t * result = (pthread_t*)(calloc(n, sizeof(pthread_t)));
+ if (result == NULL)
+ {
+ perror("Unable to allocate memory for threads");
+ QuitProgram(true);
+ pthread_exit(NULL);
+ }
+ return result;
+}