From: Sam Moore Date: Mon, 26 Aug 2013 05:43:21 +0000 (+0800) Subject: Get threads to deal with exit conditions, create timestamps X-Git-Url: https://git.ucc.asn.au/?a=commitdiff_plain;h=2e3ab88d1282fc893c9bd615911aaedc9e552178;p=matches%2FMCTX3420.git Get threads to deal with exit conditions, create timestamps --- diff --git a/server/Makefile b/server/Makefile index f1084af..e85a72e 100644 --- a/server/Makefile +++ b/server/Makefile @@ -2,11 +2,12 @@ CXX = gcc FLAGS = -std=c99 -Wall -Werror -pedantic -g LIB = -lpthread -lfcgi -lssl -OBJ = log.o sensor.o fastcgi.o main.o +OBJ = log.o sensor.o fastcgi.o thread.o main.o RM = rm -f BIN = server + $(BIN) : $(OBJ) $(CXX) $(FLAGS) -o $(BIN) $(OBJ) $(LIB) diff --git a/server/common.h b/server/common.h index 5f71f9f..9b73de9 100644 --- a/server/common.h +++ b/server/common.h @@ -17,8 +17,10 @@ #include #include #include +#include #include "log.h" #include "fastcgi.h" +#include "thread.h" #endif //_COMMON_H diff --git a/server/fastcgi.c b/server/fastcgi.c index 4fc3742..00d8264 100644 --- a/server/fastcgi.c +++ b/server/fastcgi.c @@ -11,9 +11,9 @@ #include #include "common.h" -#include "fastcgi.h" #include "sensor.h" #include "log.h" +#include "options.h" #define LOGIN_TIMEOUT 180 @@ -170,6 +170,16 @@ void FCGI_BeginJSON(FCGIContext *context, StatusCodes status_code) printf("{\r\n"); printf("\t\"module\" : \"%s\"", context->current_module); FCGI_JSONLong("status", status_code); + + // Jeremy: Should we include a timestamp in the JSON; something like this? + double start_time = g_options.start_time.tv_sec + 1e-6*(g_options.start_time.tv_usec); + struct timeval now; + gettimeofday(&now, NULL); + double current_time = now.tv_sec + 1e-6*(now.tv_usec); + FCGI_JSONDouble("start_time", start_time); + FCGI_JSONDouble("current_time", current_time); + FCGI_JSONDouble("running_time", current_time - start_time); + } /** @@ -258,12 +268,28 @@ void FCGI_RejectJSON(FCGIContext *context) /** * Main FCGI request loop that receives/responds to client requests. * @param data Reserved. + * @returns NULL (void* required for consistency with pthreads, although at the moment this runs in the main thread anyway) + * TODO: Get this to exit with the rest of the program! */ -void FCGI_RequestLoop (void *data) +void * FCGI_RequestLoop (void *data) { FCGIContext context = {0}; + Log(LOGDEBUG, "First request..."); + //TODO: The FCGI_Accept here is blocking. + // That means that if another thread terminates the program, this thread + // will not terminate until the next request is made. while (FCGI_Accept() >= 0) { + + if (Thread_Runstate() != RUNNING) + { + //TODO: Yeah... deal with this better :P + Log(LOGERR, "FIXME; FCGI gets request after other threads have finished."); + printf("Content-type: text/plain\r\n\r\n+++OUT OF CHEESE ERROR+++\n"); + break; + } + + Log(LOGDEBUG, "Got request #%d", context.response_number); ModuleHandler module_handler = NULL; char module[BUFSIZ], params[BUFSIZ]; @@ -292,7 +318,15 @@ void FCGI_RequestLoop (void *data) strncat(module, " [unknown]", BUFSIZ); FCGI_RejectJSON(&context); } - context.response_number++; + + Log(LOGDEBUG, "Waiting for request #%d", context.response_number); } + + Log(LOGDEBUG, "Thread exiting."); + Thread_QuitProgram(false); + // NOTE: Don't call pthread_exit, because this runs in the main thread. Just return. + return NULL; + + } diff --git a/server/fastcgi.h b/server/fastcgi.h index e9db8ba..3340280 100644 --- a/server/fastcgi.h +++ b/server/fastcgi.h @@ -26,7 +26,7 @@ extern void FCGI_JSONKey(const char *key); extern void FCGI_JSONValue(const char *format, ...); extern void FCGI_EndJSON(); extern void FCGI_RejectJSON(FCGIContext *context); -extern void FCGI_RequestLoop (void *data); +extern void * FCGI_RequestLoop (void *data); #endif diff --git a/server/log.c b/server/log.c index f7e39bc..7b67043 100644 --- a/server/log.c +++ b/server/log.c @@ -17,6 +17,9 @@ static const char * unspecified_funct = "???"; // --- Function implementations --- // +//TODO: Migrate to syslog (shouldn't be too hard; these functions basically do what syslog does) +// Note that we will want to have a seperate log as well as syslog; give the user the option to view the log using the GUI + /** * Print a message to stderr * @param level - Specify how severe the message is. diff --git a/server/main.c b/server/main.c index bfd5300..ec21a25 100644 --- a/server/main.c +++ b/server/main.c @@ -13,7 +13,6 @@ // --- Variable definitions --- // Options g_options; // options passed to program through command line arguments -Sensor g_sensors[NUMSENSORS]; // sensors array // --- Function definitions --- // @@ -26,6 +25,7 @@ void ParseArguments(int argc, char ** argv) { g_options.program = argv[0]; // program name g_options.verbosity = LOGDEBUG; // default log level + gettimeofday(&(g_options.start_time), NULL); // Start time Log(LOGDEBUG, "Called as %s with %d arguments.", g_options.program, argc); } @@ -35,12 +35,14 @@ void ParseArguments(int argc, char ** argv) */ //TODO: Something that gets massively annoying with threads is that you can't predict which one gets the signal // There are ways to deal with this, but I can't remember them +// Probably sufficient to just call Thread_QuitProgram here void SignalHandler(int signal) { // At the moment just always exit. // Call `exit` so that Cleanup will be called to... clean up. Log(LOGWARN, "Got signal %d (%s). Exiting.", signal, strsignal(signal)); - exit(signal); + Thread_QuitProgram(false); + //exit(signal); } /** @@ -58,20 +60,31 @@ void Cleanup() * @param argc - Num args * @param argv - Args * @returns 0 on success, error code on failure + * NOTE: NEVER USE exit(3)! Instead call Thread_QuitProgram */ int main(int argc, char ** argv) { ParseArguments(argc, argv); - // start sensor threads - for (int i = 0; i < NUMSENSORS; ++i) + // signal handler + //TODO: Make this work + /* + int signals[] = {SIGINT, SIGSEGV, SIGTERM}; + for (int i = 0; i < sizeof(signals)/sizeof(int); ++i) { - Sensor_Init(g_sensors+i, i); - pthread_create(&(g_sensors[i].thread), NULL, Sensor_Main, (void*)(g_sensors+i)); + signal(signals[i], SignalHandler); } + */ + Sensor_Spawn(); // run request thread in the main thread FCGI_RequestLoop(NULL); + + // Join the dark side, Luke + // *cough* + // Join the sensor threads + Sensor_Join(); + Cleanup(); return 0; } diff --git a/server/options.h b/server/options.h index 297acf4..a15e14a 100644 --- a/server/options.h +++ b/server/options.h @@ -14,6 +14,10 @@ typedef struct const char * program; /** Determines at what level log messages are shown **/ int verbosity; + /** Time at which program begins to run **/ + struct timeval start_time; + /** Time at which program exits **/ + struct timeval end_time; } Options; diff --git a/server/run.sh b/server/run.sh index 756f9cf..11f2e34 100755 --- a/server/run.sh +++ b/server/run.sh @@ -1,3 +1,5 @@ #!/bin/bash # Use this to quickly test run the server in valgrind -spawn-fcgi -p9005 -n /usr/bin/valgrind ./server +#spawn-fcgi -p9005 -n ./valgrind.sh +# Use this to run the server normally +spawn-fcgi -p9005 -n ./server diff --git a/server/sensor.c b/server/sensor.c index 6222510..34c9310 100644 --- a/server/sensor.c +++ b/server/sensor.c @@ -9,34 +9,44 @@ #include "sensor.h" #include +/** Array of sensors, initialised by Sensor_Init **/ +static Sensor g_sensors[NUMSENSORS]; //global to this file + /** * Read a data value from a sensor; block until value is read * @param sensor_id - The ID of the sensor - * @returns The current value of the sensor + * @param d - DataPoint to set + * @returns NULL on error, otherwise d */ -DataPoint GetData(int sensor_id) +DataPoint * GetData(int sensor_id, DataPoint * d) { // switch based on the sensor_id at the moment for testing; // might be able to just directly access ADC from sensor_id? //TODO: Implement for real sensors - DataPoint d; - //TODO: Deal with time stamps properly - static int count = 0; - d.time = count++; + + //TODO: We should ensure the time is *never* allowed to change on the server if we use gettimeofday + // Another way people might think of getting the time is to count CPU cycles with clock() + // But this will not work because a) CPU clock speed may change on some devices (RPi?) and b) It counts cycles used by all threads + gettimeofday(&(d->time_stamp), NULL); + switch (sensor_id) { case SENSOR_TEST0: - d.value = count; + { + static int count = 0; + d->value = count++; break; + } case SENSOR_TEST1: - d.value = (float)(rand() % 100) / 100; + d->value = (float)(rand() % 100) / 100; break; default: Fatal("Unknown sensor id: %d", sensor_id); break; } usleep(100000); // simulate delay in sensor polling + return d; } @@ -57,16 +67,15 @@ void Destroy(Sensor * s) * Initialise a sensor * @param s - Sensor to initialise */ -void Sensor_Init(Sensor * s, int id) +void Init(Sensor * s, int id) { s->write_index = 0; s->read_offset = 0; s->id = id; - #define FILENAMESIZE BUFSIZ + #define FILENAMESIZE 3 char filename[FILENAMESIZE]; - //if (s->id >= pow(10, FILENAMESIZE)) - if (false) + if (s->id >= pow(10, FILENAMESIZE)) { Fatal("Too many sensors! FILENAMESIZE is %d; increase it and recompile.", FILENAMESIZE); } @@ -81,6 +90,9 @@ void Sensor_Init(Sensor * s, int id) } + + + /** * Run the main sensor polling loop * @param arg - Cast to Sensor* - Sensor that the thread will handle @@ -90,7 +102,7 @@ void * Sensor_Main(void * arg) { Sensor * s = (Sensor*)(arg); - while (true) //TODO: Exit condition + while (Thread_Runstate() == RUNNING) //TODO: Exit condition { // The sensor will write data to a buffer until it is full // Then it will open a file and dump the buffer to the end of it. @@ -102,7 +114,11 @@ void * Sensor_Main(void * arg) while (s->write_index < SENSOR_DATABUFSIZ) { - s->buffer[s->write_index] = GetData(s->id); + DataPoint * d = &(s->buffer[s->write_index]); + if (GetData(s->id, d) == NULL) + { + Fatal("Error collecting data"); + } s->write_index += 1; } @@ -110,6 +126,8 @@ void * Sensor_Main(void * arg) // CRITICAL SECTION (no threads should be able to read/write the file at the same time) pthread_mutex_lock(&(s->mutex)); + //TODO: Valgrind complains about this fseek: "Syscall param write(buf) points to uninitialised byte(s)" + // Not sure why, but we should find out and fix it. fseek(s->file, 0, SEEK_END); int amount_written = fwrite(s->buffer, sizeof(DataPoint), SENSOR_DATABUFSIZ, s->file); if (amount_written != SENSOR_DATABUFSIZ) @@ -123,7 +141,8 @@ void * Sensor_Main(void * arg) s->write_index = 0; // reset position in buffer } - return NULL; + Log(LOGDEBUG, "Thread for sensor %d exits", s->id); + return NULL; } /** @@ -146,6 +165,27 @@ int Sensor_Query(Sensor * s, DataPoint * buffer, int bufsiz) return amount_read; } +/** + * Get a Sensor given an ID string + * @param id_str ID string + * @returns Sensor* identified by the string; NULL on error + */ +Sensor * Sensor_Identify(const char * id_str) +{ + char * end; + // Parse string as integer + int id = strtol(id_str, &end, 10); + if (*end != '\0') + { + return NULL; + } + // Bounds check + if (id < 0 || id > NUMSENSORS) + return NULL; + + return g_sensors+id; +} + /** * Handle a request to the sensor module * @param context - The context to work in @@ -157,15 +197,14 @@ void Sensor_Handler(FCGIContext *context, char * params) StatusCodes status = STATUS_OK; const char * key; const char * value; - int sensor_id = SENSOR_NONE; + Sensor * sensor = NULL; while ((params = FCGI_KeyPair(params, &key, &value)) != NULL) { Log(LOGDEBUG, "Got key=%s and value=%s", key, value); if (strcmp(key, "id") == 0) { - char *end; - if (sensor_id != SENSOR_NONE) + if (sensor != NULL) { Log(LOGERR, "Only one sensor id should be specified"); status = STATUS_ERROR; @@ -177,11 +216,11 @@ void Sensor_Handler(FCGIContext *context, char * params) status = STATUS_ERROR; break; } - //TODO: Use human readable sensor identifier string for API? - sensor_id = strtol(value, &end, 10); - if (*end != '\0') + + sensor = Sensor_Identify(value); + if (sensor == NULL) { - Log(LOGERR, "Sensor id not an integer; %s", value); + Log(LOGERR, "Invalid sensor id: %s", value); status = STATUS_ERROR; break; } @@ -194,14 +233,9 @@ void Sensor_Handler(FCGIContext *context, char * params) } } - if (sensor_id == SENSOR_NONE) - { - Log(LOGERR, "No sensor id specified"); - status = STATUS_ERROR; - } - else if (sensor_id >= NUMSENSORS || sensor_id < 0) + if (status != STATUS_ERROR && sensor == NULL) { - Log(LOGERR, "Invalid sensor id %d", sensor_id); + Log(LOGERR, "No valid sensor id given"); status = STATUS_ERROR; } @@ -211,17 +245,21 @@ void Sensor_Handler(FCGIContext *context, char * params) } else { + FCGI_BeginJSON(context, status); FCGI_JSONPair(key, value); // should spit back sensor ID //Log(LOGDEBUG, "Call Sensor_Query..."); - int amount_read = Sensor_Query(&(g_sensors[sensor_id]), buffer, SENSOR_QUERYBUFSIZ); + int amount_read = Sensor_Query(sensor, buffer, SENSOR_QUERYBUFSIZ); //Log(LOGDEBUG, "Read %d DataPoints", amount_read); //Log(LOGDEBUG, "Produce JSON response"); FCGI_JSONKey("data"); FCGI_JSONValue("["); for (int i = 0; i < amount_read; ++i) { - FCGI_JSONValue("[%f, %f]", buffer[i].time, buffer[i].value); + //TODO: Consider; is it better to give both tv_sec and tv_usec to the client seperately, instead of combining here? + //NOTE: Must always use doubles; floats get rounded! + double time = buffer[i].time_stamp.tv_sec + 1e-6*(buffer[i].time_stamp.tv_usec); + FCGI_JSONValue("[%f, %f]", time, buffer[i].value); if (i+1 < amount_read) FCGI_JSONValue(","); } @@ -230,3 +268,32 @@ void Sensor_Handler(FCGIContext *context, char * params) FCGI_EndJSON(); } } + +/** + * Setup Sensors, start Sensor polling thread(s) + */ +void Sensor_Spawn() +{ + // start sensor threads + for (int i = 0; i < NUMSENSORS; ++i) + { + Init(g_sensors+i, i); + pthread_create(&(g_sensors[i].thread), NULL, Sensor_Main, (void*)(g_sensors+i)); + } +} + +/** + * Quit Sensor loops + */ +void Sensor_Join() +{ + if (!Thread_Runstate()) + { + Fatal("This function should not be called before Thread_QuitProgram"); + } + for (int i = 0; i < NUMSENSORS; ++i) + { + pthread_join(g_sensors[i].thread, NULL); + Destroy(g_sensors+i); + } +} diff --git a/server/sensor.h b/server/sensor.h index 4009e5a..a259036 100644 --- a/server/sensor.h +++ b/server/sensor.h @@ -20,7 +20,7 @@ typedef struct { /** Time at which data was taken **/ - float time; + struct timeval time_stamp; //TODO: Consider; use float instead? /** Value of data **/ float value; } DataPoint; @@ -29,7 +29,7 @@ typedef struct typedef struct { /** ID number of the sensor **/ - enum {SENSOR_TEST0=0, SENSOR_TEST1=1, SENSOR_NONE} id; + enum {SENSOR_TEST0=0, SENSOR_TEST1=1} id; /** Buffer to store data from the sensor **/ DataPoint buffer[SENSOR_DATABUFSIZ]; /** Index of last point written in the data buffer **/ @@ -46,10 +46,10 @@ typedef struct } Sensor; -/** Array of Sensors **/ -extern Sensor g_sensors[]; -extern void Sensor_Init(Sensor * s, int id); // Initialise sensor + +extern void Sensor_Spawn(); // Initialise sensor +extern void Sensor_Join(); //Join sensor threads extern void * Sensor_Main(void * args); // main loop for sensor thread; pass a Sensor* cast to void* extern int Sensor_Query(Sensor * s, DataPoint * buffer, int bufsiz); // fill buffer with sensor data diff --git a/server/thread.c b/server/thread.c new file mode 100644 index 0000000..57473ea --- /dev/null +++ b/server/thread.c @@ -0,0 +1,62 @@ +/** + * @file thread.c + * @purpose Implementation of thread control + */ + +#include "thread.h" +#include "options.h" + +pthread_mutex_t mutex_runstate = PTHREAD_MUTEX_INITIALIZER; +Runstate runstate = RUNNING; + +/** + * Set the runstate, causing all threads to exit when they next check Thread_Runstate + * Repeated calls to this function have no effect on the runstate. + * @param error - Set to true to indicate an error occured + */ +void Thread_QuitProgram(bool error) +{ + if (runstate != RUNNING) + { + Log(LOGNOTE, "Called when program is not running; runstate = %d", runstate); + return; + } + + + Log(LOGNOTE, "Program will quit; error = %d", (int)error); + + //CRITICAL SECTION - We do NOT want multiple threads editing the runstate at the same time! + pthread_mutex_lock(&mutex_runstate); + if (error) + runstate = QUIT_ERROR; + else + runstate = QUIT; + + gettimeofday(&g_options.end_time, NULL); + pthread_mutex_unlock(&mutex_runstate); + // End critical section +} + +/** + * Check the runstate; to be called periodically by all threads. + * This function will call Thread_QuitProgram and change the Runstate there is an exit condition detected. + */ +Runstate Thread_Runstate() +{ + //TODO: Add real exit conditions; for testing purposes, set a timeout + /* + struct timeval time; + gettimeofday(&time, NULL); + Log(LOGDEBUG, "It has been %d seconds since program started.", time.tv_sec - g_options.start_time.tv_sec); + if (time.tv_sec - g_options.start_time.tv_sec > 3) + { + Thread_QuitProgram(false); + } + */ + + // Just reading the runstate doesn't really require a mutex + // The worst case: Another thread alters the runstate before this thread gets the result; this thread thinks the program is still running + // In that case, the thread will run an extra cycle of its loop and *then* exit. Since the runstate can only be changed once. + // We could put a mutex here anyway, but it will have an impact on how fast the loops can run. + return runstate; +} diff --git a/server/thread.h b/server/thread.h new file mode 100644 index 0000000..14ae785 --- /dev/null +++ b/server/thread.h @@ -0,0 +1,21 @@ +/** + * @file thread.h + * @purpose Declarations for thread control related functions and variables + */ + +#ifndef _THREAD_H +#define _THREAD_H + +#include "common.h" +#include + +typedef enum {QUIT, QUIT_ERROR, RUNNING} Runstate; + +/** Determine if the thread should exit; to be called periodically **/ +extern Runstate Thread_Runstate(); +/** Tell all other threads (when they call Thread_ExitCondition) to exit. Repeated calls have no effect. **/ +extern void Thread_QuitProgram(bool error); + +#endif //_THREAD_H + +//EOF diff --git a/server/valgrind.sh b/server/valgrind.sh new file mode 100755 index 0000000..2a50865 --- /dev/null +++ b/server/valgrind.sh @@ -0,0 +1,2 @@ +#!/bin/bash +valgrind --leak-check=full ./server