{
s->write_index = 0;
s->id = id;
+ s->points_written = 0;
+ s->points_read = 0;
#define FILENAMESIZE 3
char filename[FILENAMESIZE];
{
Fatal("Wrote %d data points and expected to write %d to \"%s\" - %s", amount_written, SENSOR_DATABUFSIZ, strerror(errno));
}
+ s->points_written += amount_written;
//Log(LOGDEBUG, "Wrote %d data points for sensor %d", amount_written, s->id);
pthread_mutex_unlock(&(s->mutex));
// End of critical section
return NULL;
}
+/**
+ * Get position in a binary sensor file with a timestamp using a binary search
+ * @param s - Sensor to use
+ * @param time_stamp - Timestamp
+ * @param count - If not NULL, used to provide number of searches required
+ * @param found - If not NULL, set to the closest DataPoint
+ * @returns Integer giving the *closest* index in the file
+ * TODO: Refactor or replace?
+ */
+int FindTime(Sensor * s, double time_stamp, int * count, DataPoint * found)
+{
+ DataPoint d;
+
+ int lower = 0;
+ int upper = s->points_written - 1;
+ int index = 0;
+ if (count != NULL)
+ *count = 0;
+
+ while (upper - lower > 1)
+ {
+ index = lower + ((upper - lower)/2);
+
+ // Basically anything with fseek is critical; if we don't make it critical the sensor thread may alter data at a random point in the file!
+ // CRITICAL SECTION (May need to rethink how this is done, but I can't see how to do it without fseek :S)
+ // Regarding the suggestion that we have 2 file pointers; one for reading and one for writing:
+ // That seems like it will work... but we will have to be very careful and test it first
+ pthread_mutex_lock(&s->mutex);
+ fseek(s->file, index*sizeof(DataPoint), SEEK_SET);
+ int amount_read = fread(&d, sizeof(DataPoint), 1, s->file);
+ pthread_mutex_unlock(&s->mutex);
+
+ if (amount_read != 1)
+ {
+ Fatal("Couldn't read single data point from sensor %d", s->id);
+ }
+
+ if (d.time_stamp > time_stamp)
+ {
+ upper = index;
+ }
+ else if (d.time_stamp < time_stamp)
+ {
+ lower = index;
+ }
+ if (count != NULL)
+ *count += 1;
+ }
+
+ if (found != NULL)
+ *found = d;
+
+ return index;
+
+}
+
+/**
+ * Print sensor data between two indexes in the file, using a given format
+ * @param s - Sensor to use
+ * @param start - Start index
+ * @param end - End index
+ * @param output_type - JSON, CSV or TSV output format
+ */
+void PrintData(Sensor * s, int start, int end, OutputType output_type)
+{
+ DataPoint buffer[SENSOR_QUERYBUFSIZ];
+ int index = start;
+
+ if (output_type == JSON)
+ {
+ FCGI_JSONValue("[");
+ }
+
+
+ while (index < end)
+ {
+ int to_read = end - index;
+ if (to_read > SENSOR_QUERYBUFSIZ)
+ {
+ to_read = SENSOR_QUERYBUFSIZ;
+ }
+
+ int amount_read = 0;
+ // CRITICAL SECTION
+ pthread_mutex_lock(&(s->mutex));
+
+ fseek(s->file, index*sizeof(DataPoint), SEEK_SET);
+ amount_read = fread(buffer, sizeof(DataPoint), to_read, s->file);
+
+ pthread_mutex_unlock(&(s->mutex));
+ // End critical section
+
+ if (amount_read != to_read)
+ {
+ Fatal("Failed to read %d DataPoints from sensor %d; read %d instead", to_read, s->id, amount_read);
+ }
+
+ // Print the data
+ for (int i = 0; i < amount_read; ++i)
+ {
+ //TODO: Reformat?
+ switch (output_type)
+ {
+ case JSON:
+ FCGI_JSONValue("[%f, %f]", buffer[i].time_stamp, buffer[i].value);
+ if (i+1 < amount_read)
+ FCGI_JSONValue(",");
+ break;
+ case CSV:
+ FCGI_PrintRaw("%f,%f\n", buffer[i].time_stamp, buffer[i].value);
+ break;
+ case TSV:
+ default:
+ FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value);
+ break;
+ }
+ }
+ index += amount_read;
+ }
+
+ if (output_type == JSON)
+ {
+ FCGI_JSONValue("]");
+ }
+}
+
/**
* Fill buffer with most recent sensor data
+ * TODO: This may be obselete; remove?
* @param s - Sensor to use
* @param buffer - Buffer to fill
* @param bufsiz - Size of buffer to fill
* Handle a request to the sensor module
* @param context - The context to work in
* @param params - Parameters passed
+ * TODO: Seriously need to write more helper functions and decrease the size of this function!
*/
void Sensor_Handler(FCGIContext *context, char * params)
{
- DataPoint buffer[SENSOR_QUERYBUFSIZ];
StatusCodes status = STATUS_OK;
- enum {DEFAULT, DUMP} operation = DEFAULT;
+ OutputType output_type = JSON;
+
+
const char * key; const char * value;
Sensor * sensor = NULL;
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ double start_time = -1;
+ double end_time = -1;
+ bool seek_time = false;
+ int query_size = SENSOR_QUERYBUFSIZ;
+ int start_index = -1;
+ int end_index = -1;
+
+
while ((params = FCGI_KeyPair(params, &key, &value)) != NULL)
{
Log(LOGDEBUG, "Got key=%s and value=%s", key, value);
break;
}
}
- else if (strcmp(key, "dump") == 0)
+ else if (strcmp(key, "format") == 0)
+ {
+ if (strcmp(value, "json") == 0)
+ output_type = JSON;
+ else if (strcmp(value, "csv") == 0)
+ output_type = CSV;
+ else if (strcmp(value, "tsv") == 0)
+ output_type = TSV;
+ }
+ else if (strcmp(key, "points") == 0)
{
- if (operation != DEFAULT)
+ if (strcmp(value, "all") == 0)
{
- Log(LOGERR, "Operation already specified!");
+ query_size = sensor->points_written;
+ }
+ else
+ {
+ char * end;
+ query_size = strtol(value, &end, 10);
+ if (*end != '\0')
+ {
+ Log(LOGERR, "Require \"all\" or an integer value: %s = %s", key, value);
+ status = STATUS_ERROR;
+ break;
+ }
+ }
+
+ }
+ else if (strcmp(key, "start_time") == 0)
+ {
+ seek_time = true;
+ char * end;
+ start_time = strtod(value, &end);
+ if (*end != '\0')
+ {
+ Log(LOGERR, "Require a double: %s = %s", key, value);
status = STATUS_ERROR;
break;
- }
- operation = DUMP;
+ }
+ }
+ else if (strcmp(key, "end_time") == 0)
+ {
+ seek_time = true;
+ char * end;
+ end_time = strtod(value, &end);
+ if (*end != '\0')
+ {
+ Log(LOGERR, "Require a double: %s = %s", key, value);
+ status = STATUS_ERROR;
+ break;
+ }
+ }
+ // For backward compatability:
+ else if (strcmp(key, "dump") == 0)
+ {
+ output_type = TSV;
+ query_size = sensor->points_written+1;
}
else
{
FCGI_RejectJSON(context, "Invalid input parameters");
return;
}
-
- switch (operation)
+
+
+ if (seek_time)
{
- case DUMP:
+ if (end_time < 0)
+ end_index = sensor->points_written;
+ else
{
- //Force download with content-disposition
- FCGI_PrintRaw("Content-type: text/plain\r\n"
- "Content-disposition: attachment;filename=%d.csv\r\n\r\n",
- sensor->id);
- //CRITICAL SECTION
- pthread_mutex_lock(&(sensor->mutex));
- fseek(sensor->file, 0, SEEK_SET);
- int amount_read = 0;
- do
- {
- amount_read = fread(buffer, sizeof(DataPoint), SENSOR_QUERYBUFSIZ, sensor->file);
- for (int i = 0; i < amount_read; ++i)
- {
- FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value);
- }
-
- }
- while (amount_read == SENSOR_QUERYBUFSIZ);
- pthread_mutex_unlock(&(sensor->mutex));
- // end critical section
- break;
+ int count = 0; DataPoint d;
+ end_index = FindTime(sensor, end_time, &count, &d);
+ //Log(LOGDEBUG, "FindTime - Looked for %f; found [%f,%f] after %d iterations; sensor %d, position %d", end_time, d.time_stamp, d.value, count, sensor->id, end_index);
}
- default:
+ if (start_time < 0)
+ start_time = 0;
+ else
{
- FCGI_BeginJSON(context, status);
- FCGI_JSONPair(key, value); // should spit back sensor ID
- //Log(LOGDEBUG, "Call Sensor_Query...");
- int amount_read = Sensor_Query(sensor, buffer, SENSOR_QUERYBUFSIZ);
- //Log(LOGDEBUG, "Read %d DataPoints", amount_read);
- //Log(LOGDEBUG, "Produce JSON response");
+ int count = 0; DataPoint d;
+ start_index = FindTime(sensor, start_time, &count, &d);
+ //Log(LOGDEBUG, "FindTime - Looked for %f; found [%f,%f] after %d iterations; sensor %d, position %d", start_time, d.time_stamp, d.value, count, sensor->id, start_index);
+ }
+ }
+ else
+ {
+ start_index = sensor->points_written - query_size;
+
+ end_index = sensor->points_written;
+ }
+
+ if (start_index < 0)
+ {
+ Log(LOGNOTE, "start_index = %d => Clamped to 0", start_index);
+ start_index = 0;
+ }
+ if (end_index > sensor->points_written)
+ {
+ Log(LOGNOTE, "end_index = %d => Clamped to %d", end_index, sensor->points_written);
+ end_index = sensor->points_written;
+ }
+
+ switch (output_type)
+ {
+ case JSON:
+ FCGI_BeginJSON(context, status);
+ FCGI_JSONLong("id", sensor->id);
FCGI_JSONKey("data");
- FCGI_JSONValue("[");
- for (int i = 0; i < amount_read; ++i)
- {
- //TODO: Consider; is it better to give both tv_sec and tv_usec to the client seperately, instead of combining here?
-
- FCGI_JSONValue("[%f, %f]", buffer[i].time_stamp, buffer[i].value);
- if (i+1 < amount_read)
- FCGI_JSONValue(",");
- }
- FCGI_JSONValue("]");
- //Log(LOGDEBUG, "Done producing JSON response");
- FCGI_EndJSON();
+ PrintData(sensor, start_index, end_index, output_type);
+ FCGI_EndJSON();
+ break;
+ default:
+ FCGI_PrintRaw("Content-type: text/plain\r\n\r\n");
+ PrintData(sensor, start_index, end_index, output_type);
+ //Force download with content-disposition
+ // Sam: This is cool, but I don't think we should do it
+ // - letting the user view it in the browser and then save with their own filename is more flexible
+ //"Content-disposition: attachment;filename=%d.csv\r\n\r\n", sensor->id);
break;
- }
}
+
}
/**