From e27bcae7779827a3858cf82d18ede9c66ee6d950 Mon Sep 17 00:00:00 2001 From: Sam Moore Date: Mon, 9 Sep 2013 11:22:55 +0800 Subject: [PATCH] Improve Server API for transferring sensor data - Specify format as JSON, TSV or CSV: http://localhost/api/sensors?id=0&format=tsv - Default is JSON - Currently lower case; make case insensitive? - Specify number of points (most recent) to transfer: http://localhost/api/sensors?id=0&points=all - Default is SENSOR_QUERYBUFSIZ (10) - "all" - Will transfer *all* points - Currently incompatable with the time seeking options (TODO: Make compatable?) - Specify time range to return data points: http://localhost/api/sensors?id=0&start_time=5&end_time=6 - Default behaviour is to just return most recent X points; see above - Currently no options for relative (eg: last 5s) time arguments (TODO: Include relative times?)\ Consider: - Remove critical sections by using one FILE* for writing and one for reading - A critical section is needed around every call to fseek, because the writing thread may write suddenly to the wrong position - Will having 2 FILE* mean file can be written and read independently? - Need to be sure that this is actually thread safe, otherwise it is more work for no reward - Need to simplify Sensor_Handler - Added some helper functions, but probably need more - Need to do some testing of this API in a test GUI --- server/sensor.c | 300 ++++++++++++++++++++++++++++++++++++++++-------- server/sensor.h | 9 ++ 2 files changed, 258 insertions(+), 51 deletions(-) diff --git a/server/sensor.c b/server/sensor.c index 1e27b4f..bcf1fcf 100644 --- a/server/sensor.c +++ b/server/sensor.c @@ -119,6 +119,8 @@ void Init(Sensor * s, int id) { s->write_index = 0; s->id = id; + s->points_written = 0; + s->points_read = 0; #define FILENAMESIZE 3 char filename[FILENAMESIZE]; @@ -178,6 +180,7 @@ void * Sensor_Main(void * arg) { Fatal("Wrote %d data points and expected to write %d to \"%s\" - %s", amount_written, SENSOR_DATABUFSIZ, strerror(errno)); } + s->points_written += amount_written; //Log(LOGDEBUG, "Wrote %d data points for sensor %d", amount_written, s->id); pthread_mutex_unlock(&(s->mutex)); // End of critical section @@ -189,8 +192,135 @@ void * Sensor_Main(void * arg) return NULL; } +/** + * Get position in a binary sensor file with a timestamp using a binary search + * @param s - Sensor to use + * @param time_stamp - Timestamp + * @param count - If not NULL, used to provide number of searches required + * @param found - If not NULL, set to the closest DataPoint + * @returns Integer giving the *closest* index in the file + * TODO: Refactor or replace? + */ +int FindTime(Sensor * s, double time_stamp, int * count, DataPoint * found) +{ + DataPoint d; + + int lower = 0; + int upper = s->points_written - 1; + int index = 0; + if (count != NULL) + *count = 0; + + while (upper - lower > 1) + { + index = lower + ((upper - lower)/2); + + // Basically anything with fseek is critical; if we don't make it critical the sensor thread may alter data at a random point in the file! + // CRITICAL SECTION (May need to rethink how this is done, but I can't see how to do it without fseek :S) + // Regarding the suggestion that we have 2 file pointers; one for reading and one for writing: + // That seems like it will work... but we will have to be very careful and test it first + pthread_mutex_lock(&s->mutex); + fseek(s->file, index*sizeof(DataPoint), SEEK_SET); + int amount_read = fread(&d, sizeof(DataPoint), 1, s->file); + pthread_mutex_unlock(&s->mutex); + + if (amount_read != 1) + { + Fatal("Couldn't read single data point from sensor %d", s->id); + } + + if (d.time_stamp > time_stamp) + { + upper = index; + } + else if (d.time_stamp < time_stamp) + { + lower = index; + } + if (count != NULL) + *count += 1; + } + + if (found != NULL) + *found = d; + + return index; + +} + +/** + * Print sensor data between two indexes in the file, using a given format + * @param s - Sensor to use + * @param start - Start index + * @param end - End index + * @param output_type - JSON, CSV or TSV output format + */ +void PrintData(Sensor * s, int start, int end, OutputType output_type) +{ + DataPoint buffer[SENSOR_QUERYBUFSIZ]; + int index = start; + + if (output_type == JSON) + { + FCGI_JSONValue("["); + } + + + while (index < end) + { + int to_read = end - index; + if (to_read > SENSOR_QUERYBUFSIZ) + { + to_read = SENSOR_QUERYBUFSIZ; + } + + int amount_read = 0; + // CRITICAL SECTION + pthread_mutex_lock(&(s->mutex)); + + fseek(s->file, index*sizeof(DataPoint), SEEK_SET); + amount_read = fread(buffer, sizeof(DataPoint), to_read, s->file); + + pthread_mutex_unlock(&(s->mutex)); + // End critical section + + if (amount_read != to_read) + { + Fatal("Failed to read %d DataPoints from sensor %d; read %d instead", to_read, s->id, amount_read); + } + + // Print the data + for (int i = 0; i < amount_read; ++i) + { + //TODO: Reformat? + switch (output_type) + { + case JSON: + FCGI_JSONValue("[%f, %f]", buffer[i].time_stamp, buffer[i].value); + if (i+1 < amount_read) + FCGI_JSONValue(","); + break; + case CSV: + FCGI_PrintRaw("%f,%f\n", buffer[i].time_stamp, buffer[i].value); + break; + case TSV: + default: + FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value); + break; + } + } + index += amount_read; + } + + if (output_type == JSON) + { + FCGI_JSONValue("]"); + } +} + /** * Fill buffer with most recent sensor data + * TODO: This may be obselete; remove? * @param s - Sensor to use * @param buffer - Buffer to fill * @param bufsiz - Size of buffer to fill @@ -236,18 +366,31 @@ Sensor * Sensor_Identify(const char * id_str) * Handle a request to the sensor module * @param context - The context to work in * @param params - Parameters passed + * TODO: Seriously need to write more helper functions and decrease the size of this function! */ void Sensor_Handler(FCGIContext *context, char * params) { - DataPoint buffer[SENSOR_QUERYBUFSIZ]; StatusCodes status = STATUS_OK; - enum {DEFAULT, DUMP} operation = DEFAULT; + OutputType output_type = JSON; + + const char * key; const char * value; Sensor * sensor = NULL; + struct timeval now; + gettimeofday(&now, NULL); + + double start_time = -1; + double end_time = -1; + bool seek_time = false; + int query_size = SENSOR_QUERYBUFSIZ; + int start_index = -1; + int end_index = -1; + + while ((params = FCGI_KeyPair(params, &key, &value)) != NULL) { Log(LOGDEBUG, "Got key=%s and value=%s", key, value); @@ -274,15 +417,63 @@ void Sensor_Handler(FCGIContext *context, char * params) break; } } - else if (strcmp(key, "dump") == 0) + else if (strcmp(key, "format") == 0) + { + if (strcmp(value, "json") == 0) + output_type = JSON; + else if (strcmp(value, "csv") == 0) + output_type = CSV; + else if (strcmp(value, "tsv") == 0) + output_type = TSV; + } + else if (strcmp(key, "points") == 0) { - if (operation != DEFAULT) + if (strcmp(value, "all") == 0) { - Log(LOGERR, "Operation already specified!"); + query_size = sensor->points_written; + } + else + { + char * end; + query_size = strtol(value, &end, 10); + if (*end != '\0') + { + Log(LOGERR, "Require \"all\" or an integer value: %s = %s", key, value); + status = STATUS_ERROR; + break; + } + } + + } + else if (strcmp(key, "start_time") == 0) + { + seek_time = true; + char * end; + start_time = strtod(value, &end); + if (*end != '\0') + { + Log(LOGERR, "Require a double: %s = %s", key, value); status = STATUS_ERROR; break; - } - operation = DUMP; + } + } + else if (strcmp(key, "end_time") == 0) + { + seek_time = true; + char * end; + end_time = strtod(value, &end); + if (*end != '\0') + { + Log(LOGERR, "Require a double: %s = %s", key, value); + status = STATUS_ERROR; + break; + } + } + // For backward compatability: + else if (strcmp(key, "dump") == 0) + { + output_type = TSV; + query_size = sensor->points_written+1; } else { @@ -303,57 +494,64 @@ void Sensor_Handler(FCGIContext *context, char * params) FCGI_RejectJSON(context, "Invalid input parameters"); return; } - - switch (operation) + + + if (seek_time) { - case DUMP: + if (end_time < 0) + end_index = sensor->points_written; + else { - //Force download with content-disposition - FCGI_PrintRaw("Content-type: text/plain\r\n" - "Content-disposition: attachment;filename=%d.csv\r\n\r\n", - sensor->id); - //CRITICAL SECTION - pthread_mutex_lock(&(sensor->mutex)); - fseek(sensor->file, 0, SEEK_SET); - int amount_read = 0; - do - { - amount_read = fread(buffer, sizeof(DataPoint), SENSOR_QUERYBUFSIZ, sensor->file); - for (int i = 0; i < amount_read; ++i) - { - FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value); - } - - } - while (amount_read == SENSOR_QUERYBUFSIZ); - pthread_mutex_unlock(&(sensor->mutex)); - // end critical section - break; + int count = 0; DataPoint d; + end_index = FindTime(sensor, end_time, &count, &d); + //Log(LOGDEBUG, "FindTime - Looked for %f; found [%f,%f] after %d iterations; sensor %d, position %d", end_time, d.time_stamp, d.value, count, sensor->id, end_index); } - default: + if (start_time < 0) + start_time = 0; + else { - FCGI_BeginJSON(context, status); - FCGI_JSONPair(key, value); // should spit back sensor ID - //Log(LOGDEBUG, "Call Sensor_Query..."); - int amount_read = Sensor_Query(sensor, buffer, SENSOR_QUERYBUFSIZ); - //Log(LOGDEBUG, "Read %d DataPoints", amount_read); - //Log(LOGDEBUG, "Produce JSON response"); + int count = 0; DataPoint d; + start_index = FindTime(sensor, start_time, &count, &d); + //Log(LOGDEBUG, "FindTime - Looked for %f; found [%f,%f] after %d iterations; sensor %d, position %d", start_time, d.time_stamp, d.value, count, sensor->id, start_index); + } + } + else + { + start_index = sensor->points_written - query_size; + + end_index = sensor->points_written; + } + + if (start_index < 0) + { + Log(LOGNOTE, "start_index = %d => Clamped to 0", start_index); + start_index = 0; + } + if (end_index > sensor->points_written) + { + Log(LOGNOTE, "end_index = %d => Clamped to %d", end_index, sensor->points_written); + end_index = sensor->points_written; + } + + switch (output_type) + { + case JSON: + FCGI_BeginJSON(context, status); + FCGI_JSONLong("id", sensor->id); FCGI_JSONKey("data"); - FCGI_JSONValue("["); - for (int i = 0; i < amount_read; ++i) - { - //TODO: Consider; is it better to give both tv_sec and tv_usec to the client seperately, instead of combining here? - - FCGI_JSONValue("[%f, %f]", buffer[i].time_stamp, buffer[i].value); - if (i+1 < amount_read) - FCGI_JSONValue(","); - } - FCGI_JSONValue("]"); - //Log(LOGDEBUG, "Done producing JSON response"); - FCGI_EndJSON(); + PrintData(sensor, start_index, end_index, output_type); + FCGI_EndJSON(); + break; + default: + FCGI_PrintRaw("Content-type: text/plain\r\n\r\n"); + PrintData(sensor, start_index, end_index, output_type); + //Force download with content-disposition + // Sam: This is cool, but I don't think we should do it + // - letting the user view it in the browser and then save with their own filename is more flexible + //"Content-disposition: attachment;filename=%d.csv\r\n\r\n", sensor->id); break; - } } + } /** diff --git a/server/sensor.h b/server/sensor.h index 4c9de1b..36fdb27 100644 --- a/server/sensor.h +++ b/server/sensor.h @@ -28,6 +28,13 @@ typedef enum SensorId { DIGITAL_TEST1 } SensorId; +typedef enum +{ + JSON, // JSON data + CSV, // Comma seperated vector + TSV // Tab seperated vector +} OutputType; + /** Human readable names for the sensors **/ extern const char * g_sensor_names[NUMSENSORS]; @@ -51,6 +58,8 @@ typedef struct int write_index; /** Number of points read **/ long points_read; + /** Number of points written to file **/ + long points_written; /** Binary file to write data into when buffer is full **/ FILE * file; /** Thread running the sensor **/ -- 2.20.1