From: Sam Moore Date: Thu, 12 Sep 2013 15:39:23 +0000 (+0800) Subject: Refactor Sensor related code; introduce seperate functions for dealing with DataPoints X-Git-Url: https://git.ucc.asn.au/?a=commitdiff_plain;h=8f94dbb551783aad414b57ba6da4596f19dc80a6;p=matches%2FMCTX3420.git Refactor Sensor related code; introduce seperate functions for dealing with DataPoints DataFile acts as wrapper around whatever stores DataPoints. Currently wraps a binary file. Rewrote Sensor_Handler in anticipation of using new FCGI_ParseRequest function. Hopefully I can merge this easily :S --- diff --git a/server/Makefile b/server/Makefile index b674ab0..74ba473 100644 --- a/server/Makefile +++ b/server/Makefile @@ -2,7 +2,7 @@ CXX = gcc FLAGS = -std=c99 -Wall -Werror -pedantic -g LIB = -lfcgi -lssl -lcrypto -lpthread -lm -OBJ = log.o control.o sensor.o fastcgi.o thread.o main.o +OBJ = log.o control.o data.o fastcgi.o main.o sensor.o RM = rm -f BIN = server diff --git a/server/common.h b/server/common.h index 1275032..5b6be66 100644 --- a/server/common.h +++ b/server/common.h @@ -26,6 +26,10 @@ #include "log.h" #include "fastcgi.h" -#include "thread.h" + +/**Converts a timeval to a double**/ +#define TIMEVAL_TO_DOUBLE(tv) ((tv).tv_sec + 1e-6 * ((tv).tv_usec)) +/**Takes the tv1-tv2 between two timevals and returns the result as a double*/ +#define TIMEVAL_DIFF(tv1, tv2) ((tv1).tv_sec - (tv2).tv_sec + 1e-6 * ((tv1).tv_usec - (tv2).tv_usec)) #endif //_COMMON_H diff --git a/server/data.c b/server/data.c new file mode 100644 index 0000000..254b7bc --- /dev/null +++ b/server/data.c @@ -0,0 +1,297 @@ +/** + * @file data.c + * @purpose Implementation of data handling functions; saving, loading, displaying, selecting. + */ + +#include "data.h" +#include //TODO: Remove asserts + +/** + * One off initialisation of DataFile + * @param df - The DataFile + */ +void Data_Init(DataFile * df) +{ + // Everything is NULL + df->filename = NULL; + df->read_file = NULL; + df->write_file = NULL; +} + +/** + * Initialise a DataFile from a filename; opens read/write FILE* + * @param df - DataFile to initialise + * @param filename - Name of file; overwritten if it exists + */ +void Data_Open(DataFile * df, const char * filename) +{ + assert(filename != NULL); + assert(df != NULL); + + // Set the filename + df->filename = filename; + + // Set number of DataPoints + df->num_points = 0; + + // Set read FILE* + df->read_file = fopen(filename, "r"); + if (df->read_file == NULL) + { + Fatal("Error opening DataFile %s - %s", filename, strerror(errno)); + } + + // Set write FILE* + df->write_file = fopen(filename, "w"); + if (df->write_file == NULL) + { + Fatal("Error opening DataFile %s - %s", filename, strerror(errno)); + } +} + +/** + * Close a DataFile + * @param df - The DataFile to close + */ +void Data_Close(DataFile * df) +{ + assert(df != NULL); + + //TODO: Write data to TSV? + + // Clear the FILE*s + df->read_file = NULL; + df->write_file = NULL; + + // Clear the filename + df->filename = NULL; +} + +/** + * Save DataPoints to a DataFile + * @param df - The DataFile to save to + * @param buffer - Array of DataPoint(s) to save + * @param amount - Number of DataPoints in the buffer + */ +void Data_Save(DataFile * df, DataPoint * buffer, int amount) +{ + assert(df != NULL); + assert(buffer != NULL); + assert(amount >= 0); + + // Go to the end of the file + if (fseek(df->write_file, 0, SEEK_END) < 0) + { + Fatal("Error seeking to end of DataFile %s - %s", df->filename, strerror(errno)); + } + + // Attempt to write the DataPoints + int amount_written = fwrite(buffer, sizeof(DataPoint), amount, df->write_file); + + // Check if the correct number of points were written + if (amount_written != amount) + { + Fatal("Wrote %d points instead of %d to DataFile %s - %s", amount_written, amount, df->filename, strerror(errno)); + } + + // Update number of DataPoints + pthread_mutex_lock(&(df->mutex)); + df->num_points += amount_written; + pthread_mutex_unlock(&(df->mutex)); + +} + +/** + * Read DataPoints from a DataFile + * @param df - The DataFile to read from + * @param buffer - Array to fill with DataPoints + * @param index - Index to start reading at (inclusive) + * @param amount - Maximum number of DataPoints to read + * @returns - Actual number of points read (If smaller than amount, the end of the file was reached) + */ +int Data_Read(DataFile * df, DataPoint * buffer, int index, int amount) +{ + assert(df != NULL); + assert(buffer != NULL); + assert(index >= 0); + assert(amount > 0); + + // If we would read past the end of the file, reduce the amount of points to read + pthread_mutex_lock(&(df->mutex)); + if (index + amount >= df->num_points) + { + Log(LOGDEBUG, "Requested %d points but will only read %d to get to EOF", amount, df->num_points - index); + amount = df->num_points - index; + } + pthread_mutex_unlock(&(df->mutex)); + + // Go to position in file + if (fseek(df->read_file, index*sizeof(DataPoint), SEEK_SET)) + { + Fatal("Error seeking to position %d in DataFile %s - %s", index, df->filename, strerror(errno)); + } + + // Attempt to read the DataPoints + int amount_read = fread(buffer, sizeof(DataPoint), amount, df->read_file); + + // Check if correct number of points were read + if (amount_read != amount) + { + Fatal("Read %d points instead of %d from DataFile %s - %s", amount_read, amount, df->filename, strerror(errno)); + } + + return amount; +} + +/** + * Print data points between two indexes using a given format + * @param df - DataFile to print + * @param start_index - Index to start at (inclusive) + * @param end_index - Index to end at (inclusive) + * @param format - The format to use + */ +void Data_Print(DataFile * df, int start_index, int end_index, DataFormat format) +{ + assert(df != NULL); + assert(start_index >= 0); + assert(end_index <= df->num_points-1); + + const char * fmt_string; // Format for each data point + char seperator; // Character used to seperate successive data points + + // Determine what format string and seperator character to use + switch (format) + { + case JSON: + fmt_string = "[%f,%f]"; + seperator = ','; + // For JSON we need an opening bracket + FCGI_PrintRaw("["); + break; + case TSV: + fmt_string = "%f\t%f"; + seperator = '\n'; + break; + } + + DataPoint buffer[DATA_BUFSIZ]; // Buffer + + int index = start_index; + + // Repeat until all DataPoints are printed + while (index <= end_index) + { + // Fill the buffer from the DataFile + int amount_read = Data_Read(df, buffer, index, DATA_BUFSIZ); + + // Print all points in the buffer + for (int i = 0; i < amount_read; ++i) + { + // Print individual DataPoint + FCGI_PrintRaw(fmt_string, buffer[i].time_stamp, buffer[i].value); + + // Last seperator is not required + if (index+1 < end_index) + FCGI_PrintRaw("%c", seperator); + + // Advance the position in the DataFile + ++index; + } + } + + switch (format) + { + case JSON: + // For JSON we need a closing bracket + FCGI_PrintRaw("]"); + break; + default: + break; + } +} + +/** + * Print data points between two time stamps using a given format. + * Prints nothing if the time stamp + * @param df - DataFile to print + * @param start_time - Time to start from (inclusive) + * @param end_time - Time to end at (inclusive) + * @param format - The format to use + */ +void Data_PrintTimes(DataFile * df, double start_time, double end_time, DataFormat format) +{ + assert(df != NULL); + assert(start_time > 0); + assert(end_time > 0); + assert(end_time > start_time); + + DataPoint closest; + + // Get starting index + int start_index = Data_FindByTime(df, start_time, &closest); + + // Start time is greater than most recent time stamp + if (start_index >= df->num_points-1 && closest.time_stamp < start_time) + { + Data_Print(df, 0, 0, format); // Will print "empty" dataset + return; + } + + // Get finishing index + int end_index = Data_FindByTime(df, end_time, &closest); + + // Print data between the indexes + Data_Print(df, start_index, end_index, format); +} + +/** + * Get the index of the DataPoint closest to a given time stamp + * @param df - DataFile to search + * @param time_stamp - The time stamp to search for + * @param closest - If not NULL, will be filled with the DataPoint chosen + * @returns index of DataPoint with the *closest* time stamp to that given + */ +int Data_FindByTime(DataFile * df, double time_stamp, DataPoint * closest) +{ + assert(df != NULL); + assert(time_stamp >= 0); + assert(closest != NULL); + + DataPoint tmp; // Current DataPoint in binary search + + int lower = 0; // lower index in binary search + pthread_mutex_lock(&(df->mutex)); + int upper = df->num_points - 1; // upper index in binary search + pthread_mutex_unlock(&(df->mutex)); + int index = 0; // current index in binary search + + // Commence binary search: + while (upper - lower > 1) + { + // Pick midpoint + index = lower + ((upper - lower)/2); + + // Look at DataPoint + if (Data_Read(df, &tmp, index, 1) != 1) + { + Fatal("Couldn't read DataFile %s at index %d", df->filename, index); + } + + // Change search interval to either half appropriately + if (tmp.time_stamp > time_stamp) + { + upper = index; + } + else if (tmp.time_stamp < time_stamp) + { + lower = index; + } + } + + // Store closest DataPoint + if (closest != NULL) + *closest = tmp; + + return index; + +} diff --git a/server/data.h b/server/data.h new file mode 100644 index 0000000..726af5c --- /dev/null +++ b/server/data.h @@ -0,0 +1,54 @@ +/** + * @file datapoint.h + * @purpose Declaration of data handling functions; saving, loading, displaying, selecting. + */ + +#ifndef _DATAPOINT_H +#define _DATAPOINT_H + +#define DATA_BUFSIZ 10 // Size to use for DataPoint buffers (TODO: Optimise) + + +#include "common.h" + +/** Structure to represent a time, value DataPoint **/ +typedef struct +{ + /** Time at which data was taken **/ + double time_stamp; + /** Value of data **/ + double value; +} DataPoint; + +/** Enum of output format types for DataPoints **/ +typedef enum +{ + JSON, // JSON data + TSV // Tab seperated vector +} DataFormat; + +/** + * Structure to represent a collection of data. + * All operations involving this structure are thread safe. + * NOTE: It is essentially a wrapper around a binary file. + */ +typedef struct +{ + FILE * read_file; // used for reading + FILE * write_file; // used for writing + int num_points; // Number of DataPoints in the file + const char * filename; // Name of the file + pthread_mutex_t mutex; // Mutex around num_points +} DataFile; + + +extern void Data_Init(DataFile * df); // One off initialisation of DataFile +extern void Data_Open(DataFile * df, const char * filename); // Open data file +extern void Data_Close(DataFile * df); +extern void Data_Save(DataFile * df, DataPoint * buffer, int amount); // Save data to file +extern int Data_Read(DataFile * df, DataPoint * buffer, int index, int amount); // Retrieve data from file +extern void Data_PrintByIndexes(DataFile * df, int start_index, int end_index, DataFormat format); // Print data buffer +extern void Data_PrintByTimes(DataFile * df, double start_time, double end_time, DataFormat format); // Print data between time values +extern int Data_FindByTime(DataFile * df, double time_stamp, DataPoint * closest); // Find index of DataPoint with the closest timestamp to that given + +#endif //_DATAPOINT_H diff --git a/server/fastcgi.c b/server/fastcgi.c index 1bd520b..4f47584 100644 --- a/server/fastcgi.c +++ b/server/fastcgi.c @@ -320,18 +320,8 @@ void * FCGI_RequestLoop (void *data) FCGIContext context = {0}; Log(LOGDEBUG, "First request..."); - //TODO: The FCGI_Accept here is blocking. - // That means that if another thread terminates the program, this thread - // will not terminate until the next request is made. while (FCGI_Accept() >= 0) { - if (Thread_Runstate() != RUNNING) - { - //TODO: Yeah... deal with this better :P - Log(LOGERR, "FIXME; FCGI gets request after other threads have finished."); - printf("Content-type: text/plain\r\n\r\n+++OUT OF CHEESE ERROR+++\n"); - break; - } Log(LOGDEBUG, "Got request #%d", context.response_number); ModuleHandler module_handler = NULL; @@ -370,7 +360,6 @@ void * FCGI_RequestLoop (void *data) } Log(LOGDEBUG, "Thread exiting."); - Thread_QuitProgram(false); // NOTE: Don't call pthread_exit, because this runs in the main thread. Just return. return NULL; } diff --git a/server/main.c b/server/main.c index b202418..d1aaa4c 100644 --- a/server/main.c +++ b/server/main.c @@ -41,7 +41,7 @@ void SignalHandler(int signal) // At the moment just always exit. // Call `exit` so that Cleanup will be called to... clean up. Log(LOGWARN, "Got signal %d (%s). Exiting.", signal, strsignal(signal)); - Thread_QuitProgram(false); + //exit(signal); } @@ -75,15 +75,14 @@ int main(int argc, char ** argv) signal(signals[i], SignalHandler); } */ - Sensor_Spawn(); + Sensor_Init(); + Sensor_StartAll("test"); // run request thread in the main thread FCGI_RequestLoop(NULL); - // Join the dark side, Luke - // *cough* - // Join the sensor threads - Sensor_Join(); + Sensor_StopAll(); + Cleanup(); return 0; } diff --git a/server/sensor.c b/server/sensor.c index 9055409..2e08a30 100644 --- a/server/sensor.c +++ b/server/sensor.c @@ -11,51 +11,102 @@ /** Array of sensors, initialised by Sensor_Init **/ static Sensor g_sensors[NUMSENSORS]; //global to this file + +/** Human readable names for the sensors **/ const char * g_sensor_names[NUMSENSORS] = { "analog_test0", "analog_test1", "digital_test0", "digital_test1" }; /** - * Read a data value from a sensor; block until value is read - * @param sensor_id - The ID of the sensor - * @param d - DataPoint to set - * @returns NULL for digital sensors when data is unchanged, otherwise d + * One off initialisation of *all* sensors */ -DataPoint * GetData(SensorId sensor_id, DataPoint * d) +void Sensor_Init() { - // switch based on the sensor_id at the moment for testing; - // might be able to just directly access ADC from sensor_id? - //TODO: Implement for real sensors + for (int i = 0; i < NUMSENSORS; ++i) + { + g_sensors[i].id = i; + Data_Init(&(g_sensors[i].data_file)); + g_sensors[i].record_data = false; + } +} +/** + * Start a Sensor recording DataPoints + * @param s - The Sensor to start + * @param experiment_name - Prepended to DataFile filename + */ +void Sensor_Start(Sensor * s, const char * experiment_name) +{ + char filename[BUFSIZ]; + if (sprintf(filename, "%s_%d", experiment_name, s->id) >= BUFSIZ) + { + Fatal("Experiment name \"%s\" too long (>%d)", experiment_name, BUFSIZ); + } + Data_Open(&(s->data_file), filename); + + pthread_create(&(s->thread), NULL, Sensor_Loop, (void*)(s)); +} + +/** + * Stop a Sensor from recording DataPoints. Blocks until it has stopped. + * @param s - The Sensor to stop + */ +void Sensor_Stop(Sensor * s) +{ + if (s->record_data) + { + s->record_data = false; + pthread_join(s->thread, NULL); + Data_Close(&(s->data_file)); + } +} + +/** + * Stop all Sensors + */ +void Sensor_StopAll() +{ + for (int i = 0; i < NUMSENSORS; ++i) + Sensor_Stop(g_sensors+i); +} + +/** + * Start all Sensors + */ +void Sensor_StartAll(const char * experiment_name) +{ + for (int i = 0; i < NUMSENSORS; ++i) + Sensor_Start(g_sensors+i, experiment_name); +} + +/** + * Read a DataPoint from a Sensor; block until value is read + * @param id - The ID of the sensor + * @param d - DataPoint to set + * @returns True if the DataPoint was different from the most recently recorded. + */ +bool Sensor_Read(Sensor * s, DataPoint * d) +{ - //TODO: We should ensure the time is *never* allowed to change on the server if we use gettimeofday - // Another way people might think of getting the time is to count CPU cycles with clock() - // But this will not work because a) CPU clock speed may change on some devices (RPi?) and b) It counts cycles used by all threads - + // Set time stamp struct timeval t; gettimeofday(&t, NULL); - d->time_stamp = (t.tv_sec - g_options.start_time.tv_sec) + 1e-6*(t.tv_usec - g_options.start_time.tv_usec); + d->time_stamp = TIMEVAL_DIFF(t, g_options.start_time); - // Make time relative - //d->time_stamp.tv_sec -= g_options.start_time.tv_sec; - //d->time_stamp.tv_usec -= g_options.start_time.tv_usec; - - switch (sensor_id) + // Read value based on Sensor Id + switch (s->id) { case ANALOG_TEST0: + d->value = (double)(rand() % 100) / 100; + break; + + case ANALOG_TEST1: { - //CheckSensor( sensor_id, *sensor value*); - static int count = 0; d->value = count++; break; } - case ANALOG_TEST1: - d->value = (double)(rand() % 100) / 100; - break; - - //TODO: For digital sensors, consider only updating when sensor is actually changed case DIGITAL_TEST0: d->value = t.tv_sec % 2; break; @@ -63,21 +114,32 @@ DataPoint * GetData(SensorId sensor_id, DataPoint * d) d->value = (t.tv_sec+1)%2; break; default: - Fatal("Unknown sensor id: %d", sensor_id); + Fatal("Unknown sensor id: %d", s->id); break; } usleep(100000); // simulate delay in sensor polling - return d; + // Perform sanity check based on Sensor's ID and the DataPoint + Sensor_CheckData(s->id, d); + + // Update latest DataPoint if necessary + bool result = (d->value != s->newest_data.value); + if (result) + { + s->newest_data.time_stamp = d->time_stamp; + } + return result; } /** * Checks the sensor data for unsafe or unexpected results * @param sensor_id - The ID of the sensor - * -* -void CheckSensor( SensorId sensor_id) + * @param d - DataPoint to check + */ +void Sensor_CheckData(SensorId id, DataPoint * d) { + //TODO: Implement + /* switch (sensor_id) { case ANALOG_TEST0: @@ -94,249 +156,31 @@ void CheckSensor( SensorId sensor_id) } } } - - -*/ - - -/** - * Destroy a sensor - * @param s - Sensor to destroy - */ -void Destroy(Sensor * s) -{ - // Maybe move the binary file into long term file storage? - fclose(s->file); + */ } - - - -/** - * Initialise a sensor - * @param s - Sensor to initialise - */ -void Init(Sensor * s, int id) -{ - s->write_index = 0; - s->id = id; - s->points_written = 0; - s->points_read = 0; - - #define FILENAMESIZE 3 - char filename[FILENAMESIZE]; - if (s->id >= pow(10, FILENAMESIZE)) - { - Fatal("Too many sensors! FILENAMESIZE is %d; increase it and recompile.", FILENAMESIZE); - } - - pthread_mutex_init(&(s->mutex), NULL); - sprintf(filename, "%d", s->id); - unlink(filename); //TODO: Move old files somewhere - - s->file = fopen(filename, "a+b"); // open binary file - Log(LOGDEBUG, "Initialised sensor %d; binary file is \"%s\"", id, filename); -} - /** - * Run the main sensor polling loop + * Record data from a single Sensor; to be run in a seperate thread * @param arg - Cast to Sensor* - Sensor that the thread will handle * @returns NULL (void* required to use the function with pthreads) */ -void * Sensor_Main(void * arg) +void * Sensor_Loop(void * arg) { Sensor * s = (Sensor*)(arg); - while (Thread_Runstate() == RUNNING) //TODO: Exit condition - { - // The sensor will write data to a buffer until it is full - // Then it will open a file and dump the buffer to the end of it. - // Rinse and repeat - - // The reason I've added the buffer is because locks are expensive - // But maybe it's better to just write data straight to the file - // I'd like to do some tests by changing SENSOR_DATABUFSIZ - - while (s->write_index < SENSOR_DATABUFSIZ) - { - DataPoint * d = &(s->buffer[s->write_index]); - if (GetData(s->id, d) == NULL) - { - Fatal("Error collecting data"); - } - s->write_index += 1; - } - - //Log(LOGDEBUG, "Filled buffer"); - - // CRITICAL SECTION (no threads should be able to read/write the file at the same time) - pthread_mutex_lock(&(s->mutex)); - //TODO: Valgrind complains about this fseek: "Syscall param write(buf) points to uninitialised byte(s)" - // Not sure why, but we should find out and fix it. - fseek(s->file, 0, SEEK_END); - int amount_written = fwrite(s->buffer, sizeof(DataPoint), SENSOR_DATABUFSIZ, s->file); - if (amount_written != SENSOR_DATABUFSIZ) - { - Fatal("Wrote %d data points and expected to write %d to \"%s\" - %s", amount_written, SENSOR_DATABUFSIZ, strerror(errno)); - } - s->points_written += amount_written; - //Log(LOGDEBUG, "Wrote %d data points for sensor %d", amount_written, s->id); - pthread_mutex_unlock(&(s->mutex)); - // End of critical section - - s->write_index = 0; // reset position in buffer - - } - Log(LOGDEBUG, "Thread for sensor %d exits", s->id); - return NULL; -} - -/** - * Get position in a binary sensor file with a timestamp using a binary search - * @param s - Sensor to use - * @param time_stamp - Timestamp - * @param count - If not NULL, used to provide number of searches required - * @param found - If not NULL, set to the closest DataPoint - * @returns Integer giving the *closest* index in the file - * TODO: Refactor or replace? - */ -int FindTime(Sensor * s, double time_stamp, int * count, DataPoint * found) -{ - DataPoint d; - - int lower = 0; - int upper = s->points_written - 1; - int index = 0; - if (count != NULL) - *count = 0; - - while (upper - lower > 1) + // Until the sensor is stopped, record data points + while (s->record_data) { - index = lower + ((upper - lower)/2); - - // Basically anything with fseek is critical; if we don't make it critical the sensor thread may alter data at a random point in the file! - // CRITICAL SECTION (May need to rethink how this is done, but I can't see how to do it without fseek :S) - // Regarding the suggestion that we have 2 file pointers; one for reading and one for writing: - // That seems like it will work... but we will have to be very careful and test it first - pthread_mutex_lock(&s->mutex); - fseek(s->file, index*sizeof(DataPoint), SEEK_SET); - int amount_read = fread(&d, sizeof(DataPoint), 1, s->file); - pthread_mutex_unlock(&s->mutex); - - if (amount_read != 1) - { - Fatal("Couldn't read single data point from sensor %d", s->id); - } - - if (d.time_stamp > time_stamp) + DataPoint d; + if (Sensor_Read(s, &d)) // If new DataPoint is read: { - upper = index; + Data_Save(&(s->data_file), &d, 1); // Record it } - else if (d.time_stamp < time_stamp) - { - lower = index; - } - if (count != NULL) - *count += 1; } - - if (found != NULL) - *found = d; - - return index; -} - -/** - * Print sensor data between two indexes in the file, using a given format - * @param s - Sensor to use - * @param start - Start index - * @param end - End index - * @param output_type - JSON, CSV or TSV output format - */ -void PrintData(Sensor * s, int start, int end, OutputType output_type) -{ - DataPoint buffer[SENSOR_QUERYBUFSIZ]; - int index = start; - - if (output_type == JSON) - { - FCGI_JSONValue("["); - } - - - while (index < end) - { - int to_read = end - index; - if (to_read > SENSOR_QUERYBUFSIZ) - { - to_read = SENSOR_QUERYBUFSIZ; - } - - int amount_read = 0; - // CRITICAL SECTION - pthread_mutex_lock(&(s->mutex)); - - fseek(s->file, index*sizeof(DataPoint), SEEK_SET); - amount_read = fread(buffer, sizeof(DataPoint), to_read, s->file); - - pthread_mutex_unlock(&(s->mutex)); - // End critical section - - if (amount_read != to_read) - { - Fatal("Failed to read %d DataPoints from sensor %d; read %d instead", to_read, s->id, amount_read); - } - - // Print the data - for (int i = 0; i < amount_read; ++i) - { - //TODO: Reformat? - switch (output_type) - { - case JSON: - FCGI_JSONValue("[%f, %f]", buffer[i].time_stamp, buffer[i].value); - if (i+1 < amount_read) - FCGI_JSONValue(","); - break; - case CSV: - FCGI_PrintRaw("%f,%f\n", buffer[i].time_stamp, buffer[i].value); - break; - case TSV: - default: - FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value); - break; - } - } - index += amount_read; - } - - if (output_type == JSON) - { - FCGI_JSONValue("]"); - } -} - -/** - * Fill buffer with most recent sensor data - * TODO: This may be obselete; remove? - * @param s - Sensor to use - * @param buffer - Buffer to fill - * @param bufsiz - Size of buffer to fill - * @returns The number of DataPoints actually read - */ -int Sensor_Query(Sensor * s, DataPoint * buffer, int bufsiz) -{ - int amount_read = 0; - //CRITICAL SECTION (Don't access file while sensor thread is writing to it!) - pthread_mutex_lock(&(s->mutex)); - - fseek(s->file, -bufsiz*sizeof(DataPoint), SEEK_END); - amount_read = fread(buffer, sizeof(DataPoint), bufsiz, s->file); - //Log(LOGDEBUG, "Read %d data points", amount_read); - pthread_mutex_unlock(&(s->mutex)); - return amount_read; + // Needed to keep pthreads happy + return NULL; } /** @@ -362,243 +206,136 @@ Sensor * Sensor_Identify(const char * id_str) return g_sensors+id; } +/** + * Helper: Begin sensor response in a given format + * @param context - the FCGIContext + * @param format - Format + * @param id - ID of sensor + */ +void Sensor_BeginResponse(FCGIContext * context, SensorId id, DataFormat format) +{ + // Begin response + switch (format) + { + case JSON: + FCGI_BeginJSON(context, STATUS_OK); + FCGI_JSONLong("id", id); + FCGI_JSONKey("data"); + break; + default: + FCGI_PrintRaw("Content-type: text/plain\r\n\r\n"); + break; + } +} + +/** + * Helper: End sensor response in a given format + * @param context - the FCGIContext + * @param id - ID of the sensor + * @param format - Format + */ +void Sensor_EndResponse(FCGIContext * context, SensorId id, DataFormat format) +{ + // End response + switch (format) + { + case JSON: + FCGI_EndJSON(); + break; + default: + break; + } +} + /** * Handle a request to the sensor module * @param context - The context to work in * @param params - Parameters passed - * TODO: Seriously need to write more helper functions and decrease the size of this function! */ void Sensor_Handler(FCGIContext *context, char * params) { - StatusCodes status = STATUS_OK; - - OutputType output_type = JSON; - - - - const char * key; const char * value; - - Sensor * sensor = NULL; - struct timeval now; gettimeofday(&now, NULL); - - double start_time = -1; - double end_time = -1; - double current_time = (now.tv_sec - g_options.start_time.tv_sec) + 1e-6*(now.tv_usec - g_options.start_time.tv_usec); - bool seek_time = false; - bool points_specified = false; - int query_size = SENSOR_QUERYBUFSIZ; - int start_index = -1; - int end_index = -1; - - - while ((params = FCGI_KeyPair(params, &key, &value)) != NULL) + double current_time = TIMEVAL_DIFF(now, g_options.start_time); + + double start_time = 0; + double end_time = current_time; + char * fmt_str; + + // key/value pairs + FCGIValue values[] = { + {"id", &id, FCGI_REQUIRED(FCGI_INT_T)}, + {"format", &fmt_str, FCGI_STRING_T}, + {"start_time", &start_time, FCGI_DOUBLE_T}, + {"end_time", &end_time, FCGI_DOUBLE_T}, + }; + + // enum to avoid the use of magic numbers + typedef enum { + ID, + FORMAT, + START_TIME, + END_TIME, + } SensorParams; + + // Fill values appropriately + if (!FCGI_ParseRequest(context, params, values, sizeof(values))) { - Log(LOGDEBUG, "Got key=%s and value=%s", key, value); - if (strcmp(key, "id") == 0) - { - if (sensor != NULL) - { - Log(LOGERR, "Only one sensor id should be specified"); - status = STATUS_ERROR; - break; - } - if (*value == '\0') - { - Log(LOGERR, "No id specified."); - status = STATUS_ERROR; - break; - } - - sensor = Sensor_Identify(value); - if (sensor == NULL) - { - Log(LOGERR, "Invalid sensor id: %s", value); - status = STATUS_ERROR; - break; - } - } - else if (strcmp(key, "format") == 0) - { - if (strcmp(value, "json") == 0) - output_type = JSON; - else if (strcmp(value, "csv") == 0) - output_type = CSV; - else if (strcmp(value, "tsv") == 0) - output_type = TSV; - } - else if (strcmp(key, "points") == 0) - { - points_specified = true; - if (strcmp(value, "all") == 0) - { - query_size = sensor->points_written; - } - else - { - char * end; - query_size = strtol(value, &end, 10); - if (*end != '\0') - { - Log(LOGERR, "Require \"all\" or an integer value: %s = %s", key, value); - status = STATUS_ERROR; - break; - } - } - - } - else if (strcmp(key, "start_time") == 0) - { - seek_time = true; - char * end; - start_time = strtod(value, &end); - if (*end != '\0') - { - Log(LOGERR, "Require a double: %s = %s", key, value); - status = STATUS_ERROR; - break; - } - - // Treat negative values as being relative to the current time - if (start_time < 0) - { - start_time = current_time + start_time; - } - start_time = floor(start_time); - } - else if (strcmp(key, "end_time") == 0) - { - seek_time = true; - char * end; - end_time = strtod(value, &end); - if (*end != '\0') - { - Log(LOGERR, "Require a double: %s = %s", key, value); - status = STATUS_ERROR; - break; - } - - // Treat negative values as being relative to the current time - if (end_time < 0) - { - end_time = current_time + end_time; - } - end_time = ceil(end_time); - } - // For backward compatability: - else if (strcmp(key, "dump") == 0) - { - output_type = TSV; - query_size = sensor->points_written+1; - } - else - { - Log(LOGERR, "Unknown key \"%s\" (value = %s)", key, value); - status = STATUS_ERROR; - break; - } + // Error occured; FCGI_RejectJSON already called + return; } - if (status != STATUS_ERROR && sensor == NULL) + // Error checking on sensor id + if (id < 0 || id >= NUMSENSORS) { - Log(LOGERR, "No valid sensor id given"); - status = STATUS_ERROR; + Log(LOGERR, "Invalid id %d", id); + FCGI_RejectJSON(); // Whoops, I do still need this! } - if (status == STATUS_ERROR) + // Check if format type was specified + if (FCGI_RECEIVED(values[FORMAT].flags)) { - FCGI_RejectJSON(context, "Invalid input parameters"); - return; + if (strcmp(fmt_str, "json") == 0) + format = JSON; + else if (strcmp(fmt_str, "tsv") == 0) + format = TSV; + else + Log(LOGERR, "Unknown format type \"%s\"", fmt_str); } - - if (seek_time) + // Get Sensor + Sensor * s = g_sensors[id]; + + // Begin response + Sensor_BeginResponse(context, id, format); + + // If a time was specified + if (FCGI_RECEIVED(values[START_TIME].flags) || FCGI_RECEIVED(values[END_TIME].flags)) { - if (end_time < 0 && !points_specified) - end_index = sensor->points_written; - else - { - int count = 0; DataPoint d; - end_index = FindTime(sensor, end_time, &count, &d); - Log(LOGDEBUG, "FindTime - Looked for %f; found [%f,%f] after %d iterations; sensor %d, position %d", end_time, d.time_stamp, d.value, count, sensor->id, end_index); - } + // Wrap times relative to the current time if (start_time < 0) - start_time = 0; - else - { - int count = 0; DataPoint d; - start_index = FindTime(sensor, start_time, &count, &d); - Log(LOGDEBUG, "FindTime - Looked for %f; found [%f,%f] after %d iterations; sensor %d, position %d", start_time, d.time_stamp, d.value, count, sensor->id, start_index); - } + start_time += current_time; + if (end_time < 0) + end_time += current_time; + + // Print points by time range + Data_PrintByTimes(&(s->data_file), start_time, end_time, format); - if (points_specified) - end_index = start_index + query_size; - } - else - { - start_index = sensor->points_written - query_size; - - end_index = sensor->points_written; - } - - if (start_index < 0) - { - Log(LOGNOTE, "start_index = %d => Clamped to 0", start_index); - start_index = 0; } - if (end_index > sensor->points_written) + else // No time was specified; just return a recent set of points { - Log(LOGNOTE, "end_index = %d => Clamped to %d", end_index, sensor->points_written); - end_index = sensor->points_written; + pthread_mutex_lock(&(s->data_file.mutex)); + int start_index = s->data_file.num_points-DATA_BUFSIZ; + int end_index = s->data_file.num_points-1; + pthread_mutex_unlock(&(s->data_file.mutex)); + + // Print points by indexes + Data_PrintByIndexes(&(s->data_file), start_index, end_index, format); } - switch (output_type) - { - case JSON: - FCGI_BeginJSON(context, status); - FCGI_JSONLong("id", sensor->id); - FCGI_JSONKey("data"); - PrintData(sensor, start_index, end_index, output_type); - FCGI_EndJSON(); - break; - default: - FCGI_PrintRaw("Content-type: text/plain\r\n\r\n"); - PrintData(sensor, start_index, end_index, output_type); - //Force download with content-disposition - // Sam: This is cool, but I don't think we should do it - // - letting the user view it in the browser and then save with their own filename is more flexible - //"Content-disposition: attachment;filename=%d.csv\r\n\r\n", sensor->id); - break; - } + // Finish response + Sensor_EndResponse(context, id, format); } -/** - * Setup Sensors, start Sensor polling thread(s) - */ -void Sensor_Spawn() -{ - // start sensor threads - for (int i = 0; i < NUMSENSORS; ++i) - { - Init(g_sensors+i, i); - pthread_create(&(g_sensors[i].thread), NULL, Sensor_Main, (void*)(g_sensors+i)); - } -} -/** - * Quit Sensor loops - */ -void Sensor_Join() -{ - if (!Thread_Runstate()) - { - Fatal("This function should not be called before Thread_QuitProgram"); - } - for (int i = 0; i < NUMSENSORS; ++i) - { - pthread_join(g_sensors[i].thread, NULL); - Destroy(g_sensors+i); - } -} diff --git a/server/sensor.h b/server/sensor.h index 36fdb27..a499b77 100644 --- a/server/sensor.h +++ b/server/sensor.h @@ -6,80 +6,67 @@ #ifndef _SENSOR_H #define _SENSOR_H -/** Number of data points to keep in sensor buffers **/ -#define SENSOR_DATABUFSIZ 10 -/** Size of the query buffer. @see Sensor_Handler **/ -#define SENSOR_QUERYBUFSIZ 10 +#include "data.h" /** Number of sensors **/ #define NUMSENSORS 4 /** Safety Values for sensors **/ +//TODO: Probably better to use an array instead #define ANALOG_TEST0_SAFETY 1000 #define ANALOG_TEST1_SAFETY 1000 #define DIGITAL_TEST0_SAFETY 1 #define DIGITAL_TEST1_SAFETY 1 -typedef enum SensorId { +typedef enum SensorId +{ ANALOG_TEST0, ANALOG_TEST1, DIGITAL_TEST0, DIGITAL_TEST1 } SensorId; -typedef enum -{ - JSON, // JSON data - CSV, // Comma seperated vector - TSV // Tab seperated vector -} OutputType; + /** Human readable names for the sensors **/ extern const char * g_sensor_names[NUMSENSORS]; -/** Structure to represent data recorded by a sensor at an instant in time **/ -typedef struct -{ - /** Time at which data was taken **/ - double time_stamp; - /** Value of data **/ - double value; -} DataPoint; /** Structure to represent a sensor **/ typedef struct { /** ID number of the sensor **/ SensorId id; - /** Buffer to store data from the sensor **/ - DataPoint buffer[SENSOR_DATABUFSIZ]; - /** Index of last point written in the data buffer **/ - int write_index; - /** Number of points read **/ - long points_read; - /** Number of points written to file **/ - long points_written; - /** Binary file to write data into when buffer is full **/ - FILE * file; - /** Thread running the sensor **/ + /** DataFile to store sensor values in **/ + DataFile data_file; + /** Indicates whether the Sensor should record data **/ + bool record_data; + /** Thread the Sensor is running in **/ pthread_t thread; - /** Mutex to protect access to stuff **/ - pthread_mutex_t mutex; + /** Most recently recorded data **/ + DataPoint newest_data; } Sensor; +extern void Sensor_Init(); // One off initialisation of *all* sensors + +extern void Sensor_StartAll(const char * experiment_name); // Start all Sensors recording data +extern void Sensor_StopAll(); // Stop all Sensors recording data +extern void Sensor_Start(Sensor * s, const char * experiment_name); // Start a sensor recording datas +extern void Sensor_Stop(Sensor * s); // Stop a Sensor from recording data + +extern void * Sensor_Loop(void * args); // Main loop for a thread that handles a Sensor +extern bool Sensor_Read(Sensor * s, DataPoint * d); // Read a single DataPoint, indicating if it has changed since the last one +extern void Sensor_CheckData(SensorId id, DataPoint * d); // Check a DataPoint +extern Sensor * Sensor_Identify(const char * str); // Identify a Sensor from a string Id -extern void Sensor_Spawn(); // Initialise sensor -extern void Sensor_Join(); //Join sensor threads -extern void * Sensor_Main(void * args); // main loop for sensor thread; pass a Sensor* cast to void* +extern void Sensor_Handler(FCGIContext *context, char * params); // Handle a FCGI request for Sensor data -extern int Sensor_Query(Sensor * s, DataPoint * buffer, int bufsiz); // fill buffer with sensor data -extern void Sensor_Handler(FCGIContext *context, char * params); #endif //_SENSOR_H diff --git a/server/thread.c b/server/thread.c deleted file mode 100644 index f1d0bb3..0000000 --- a/server/thread.c +++ /dev/null @@ -1,62 +0,0 @@ -/** - * @file thread.c - * @brief Implementation of thread control - */ - -#include "thread.h" -#include "options.h" - -pthread_mutex_t mutex_runstate = PTHREAD_MUTEX_INITIALIZER; -Runstate runstate = RUNNING; - -/** - * Set the runstate, causing all threads to exit when they next check Thread_Runstate - * Repeated calls to this function have no effect on the runstate. - * @param error - Set to true to indicate an error occured - */ -void Thread_QuitProgram(bool error) -{ - if (runstate != RUNNING) - { - Log(LOGNOTE, "Called when program is not running; runstate = %d", runstate); - return; - } - - - Log(LOGNOTE, "Program will quit; error = %d", (int)error); - - //CRITICAL SECTION - We do NOT want multiple threads editing the runstate at the same time! - pthread_mutex_lock(&mutex_runstate); - if (error) - runstate = QUIT_ERROR; - else - runstate = QUIT; - - gettimeofday(&g_options.end_time, NULL); - pthread_mutex_unlock(&mutex_runstate); - // End critical section -} - -/** - * Check the runstate; to be called periodically by all threads. - * This function will call Thread_QuitProgram and change the Runstate there is an exit condition detected. - */ -Runstate Thread_Runstate() -{ - //TODO: Add real exit conditions; for testing purposes, set a timeout - /* - struct timeval time; - gettimeofday(&time, NULL); - Log(LOGDEBUG, "It has been %d seconds since program started.", time.tv_sec - g_options.start_time.tv_sec); - if (time.tv_sec - g_options.start_time.tv_sec > 3) - { - Thread_QuitProgram(false); - } - */ - - // Just reading the runstate doesn't really require a mutex - // The worst case: Another thread alters the runstate before this thread gets the result; this thread thinks the program is still running - // In that case, the thread will run an extra cycle of its loop and *then* exit. Since the runstate can only be changed once. - // We could put a mutex here anyway, but it will have an impact on how fast the loops can run. - return runstate; -} diff --git a/server/thread.h b/server/thread.h deleted file mode 100644 index 1c14c9a..0000000 --- a/server/thread.h +++ /dev/null @@ -1,21 +0,0 @@ -/** - * @file thread.h - * @brief Declarations for thread control related functions and variables - */ - -#ifndef _THREAD_H -#define _THREAD_H - -#include "common.h" -#include - -typedef enum {QUIT, QUIT_ERROR, RUNNING} Runstate; - -/** Determine if the thread should exit; to be called periodically **/ -extern Runstate Thread_Runstate(); -/** Tell all other threads (when they call Thread_ExitCondition) to exit. Repeated calls have no effect. **/ -extern void Thread_QuitProgram(bool error); - -#endif //_THREAD_H - -//EOF