Add experimental support for querying a sensor for an arbitrary range of points
[matches/MCTX3420.git] / server / sensor.c
1 /**
2  * @file sensor.c
3  * @brief Implementation of sensor thread
4  * TODO: Finalise implementation
5  */
6
7 #include "common.h"
8 #include "sensor.h"
9 #include "options.h"
10 #include <math.h>
11
12 /** Array of sensors, initialised by Sensor_Init **/
13 static Sensor g_sensors[NUMSENSORS]; //global to this file
14 const char * g_sensor_names[NUMSENSORS] = {     
15         "analog_test0", "analog_test1", 
16         "digital_test0", "digital_test1"
17 };
18
19 /**
20  * Read a data value from a sensor; block until value is read
21  * @param sensor_id - The ID of the sensor
22  * @param d - DataPoint to set
23  * @returns NULL for digital sensors when data is unchanged, otherwise d
24  */
25 DataPoint * GetData(SensorId sensor_id, DataPoint * d)
26 {
27         // switch based on the sensor_id at the moment for testing;
28         // might be able to just directly access ADC from sensor_id?
29         //TODO: Implement for real sensors
30
31         
32         //TODO: We should ensure the time is *never* allowed to change on the server if we use gettimeofday
33         //              Another way people might think of getting the time is to count CPU cycles with clock()
34         //              But this will not work because a) CPU clock speed may change on some devices (RPi?) and b) It counts cycles used by all threads
35         
36         struct timeval t;
37         gettimeofday(&t, NULL);
38         d->time_stamp = (t.tv_sec - g_options.start_time.tv_sec) + 1e-6*(t.tv_usec - g_options.start_time.tv_usec);
39
40         // Make time relative
41         //d->time_stamp.tv_sec -= g_options.start_time.tv_sec;
42         //d->time_stamp.tv_usec -= g_options.start_time.tv_usec;
43         
44         switch (sensor_id)
45         {
46                 case ANALOG_TEST0:
47                 {
48                         static int count = 0;
49                         d->value = count++;
50                         break;
51                 }
52                 case ANALOG_TEST1:
53                         d->value = (double)(rand() % 100) / 100;
54                         break;
55         
56                 //TODO: For digital sensors, consider only updating when sensor is actually changed
57                 case DIGITAL_TEST0:
58                         d->value = t.tv_sec % 2;
59                         break;
60                 case DIGITAL_TEST1:
61                         d->value = (t.tv_sec+1)%2;
62                         break;
63                 default:
64                         Fatal("Unknown sensor id: %d", sensor_id);
65                         break;
66         }       
67         usleep(100000); // simulate delay in sensor polling
68
69         return d;
70 }
71
72
73 /**
74  * Destroy a sensor
75  * @param s - Sensor to destroy
76  */
77 void Destroy(Sensor * s)
78 {
79         // Maybe move the binary file into long term file storage?
80         fclose(s->file);
81 }
82
83
84
85 /**
86  * Initialise a sensor
87  * @param s - Sensor to initialise
88  */
89 void Init(Sensor * s, int id)
90 {
91         s->write_index = 0;
92         s->id = id;
93
94         #define FILENAMESIZE 3
95         char filename[FILENAMESIZE];
96         if (s->id >= pow(10, FILENAMESIZE))
97         {
98                 Fatal("Too many sensors! FILENAMESIZE is %d; increase it and recompile.", FILENAMESIZE);
99         }
100
101         pthread_mutex_init(&(s->mutex), NULL);
102                 
103         sprintf(filename, "%d", s->id);
104         unlink(filename); //TODO: Move old files somewhere
105
106         s->file = fopen(filename, "a+b"); // open binary file
107         Log(LOGDEBUG, "Initialised sensor %d; binary file is \"%s\"", id, filename);
108 }
109
110
111 /**
112  * Run the main sensor polling loop
113  * @param arg - Cast to Sensor* - Sensor that the thread will handle
114  * @returns NULL (void* required to use the function with pthreads)
115  */
116 void * Sensor_Main(void * arg)
117 {
118         Sensor * s = (Sensor*)(arg);
119
120         while (Thread_Runstate() == RUNNING) //TODO: Exit condition
121         {
122                 // The sensor will write data to a buffer until it is full
123                 // Then it will open a file and dump the buffer to the end of it.
124                 // Rinse and repeat
125
126                 // The reason I've added the buffer is because locks are expensive
127                 // But maybe it's better to just write data straight to the file
128                 // I'd like to do some tests by changing SENSOR_DATABUFSIZ
129
130                 while (s->write_index < SENSOR_DATABUFSIZ)
131                 {
132                         DataPoint * d = &(s->buffer[s->write_index]);
133                         if (GetData(s->id, d) == NULL)
134                         {
135                                 Fatal("Error collecting data");
136                         }
137                         s->write_index += 1;
138                         //TODO: s->points_read not used?
139                 }
140
141                 //Log(LOGDEBUG, "Filled buffer");
142
143                 // CRITICAL SECTION (no threads should be able to read/write the file at the same time)
144                 pthread_mutex_lock(&(s->mutex));
145                         //TODO: Valgrind complains about this fseek: "Syscall param write(buf) points to uninitialised byte(s)"
146                         //              Not sure why, but we should find out and fix it.
147                         fseek(s->file, 0, SEEK_END);
148                         int amount_written = fwrite(s->buffer, sizeof(DataPoint), SENSOR_DATABUFSIZ, s->file);
149                         if (amount_written != SENSOR_DATABUFSIZ)
150                         {
151                                 Fatal("Wrote %d data points and expected to write %d to \"%s\" - %s", amount_written, SENSOR_DATABUFSIZ, strerror(errno));
152                         }
153                         s->points_stored += amount_written;
154                         //Log(LOGDEBUG, "Wrote %d data points for sensor %d", amount_written, s->id);
155                 pthread_mutex_unlock(&(s->mutex));
156                 // End of critical section
157
158                 s->write_index = 0; // reset position in buffer
159                 
160         }
161         Log(LOGDEBUG, "Thread for sensor %d exits", s->id);
162         return NULL; 
163 }
164
165 /**
166  * Fill buffer with most recent sensor data
167  * @param s - Sensor to use
168  * @param buffer - Buffer to fill
169  * @param bufsiz - Size of buffer to fill
170  * @returns The number of DataPoints actually read
171  */
172 int Sensor_Query(Sensor * s, DataPoint * buffer, int bufsiz)
173 {
174         int amount_read = 0;
175         //CRITICAL SECTION (Don't access file while sensor thread is writing to it!)
176         pthread_mutex_lock(&(s->mutex));
177                 
178                 fseek(s->file, -bufsiz*sizeof(DataPoint), SEEK_END);
179                 amount_read = fread(buffer, sizeof(DataPoint), bufsiz, s->file);
180                 //Log(LOGDEBUG, "Read %d data points", amount_read);            
181         pthread_mutex_unlock(&(s->mutex));
182         return amount_read;
183 }
184
185 /**
186  * Get a Sensor given an ID string
187  * @param id_str ID string
188  * @returns Sensor* identified by the string; NULL on error
189  */
190 Sensor * Sensor_Identify(const char * id_str)
191 {
192         char * end;
193         // Parse string as integer
194         int id = strtol(id_str, &end, 10);
195         if (*end != '\0')
196         {
197                 return NULL;
198         }
199         // Bounds check
200         if (id < 0 || id >= NUMSENSORS)
201                 return NULL;
202
203
204         Log(LOGDEBUG, "Sensor \"%s\" identified", g_sensor_names[id]);
205         return g_sensors+id;
206 }
207
208 /*
209  * Behaviour: 
210  * Dump true:
211  * - from < 0: From beginning of file, else from that point onwards (0-indexed)
212  * - count < 0: All points available, else *at most* that many points
213  * Dump false:
214  * - from < 0: From the end of file (last available points), else from that point onwards (0-indexed)
215  * - count < 0: Default buffer size (SENSOR_QUERYBUFSIZ), else *at most* that many points
216  */
217 void Sensor_Handler2(FCGIContext *context, char *params)
218 {
219         const char *key, *value;
220         int id = -1, from = -1, count = -1;
221         bool dump = false;
222
223         //Lazy checking
224         while ((params = FCGI_KeyPair(params, &key, &value))) {
225                 if (!strcmp(key, "id") && *value) {
226                         char *end;
227                         id = strtol(value, &end, 10);
228                         if (*end != '\0')
229                                 id = -1;
230                 } else if (!strcmp(key, "dump")) {
231                         dump = !dump;
232                 } else if (!strcmp(key, "from") && *value) {
233                         from = strtol(value, NULL, 10);
234                 } else if (!strcmp(key, "count") && *value) {
235                         count = strtol(value, NULL, 10);
236                 }
237         }
238
239         if (id < 0 || id >= NUMSENSORS) {
240                 FCGI_RejectJSON(context, "Invalid sensor id specified.");
241                 return;
242         }
243         
244         Sensor *sensor = &g_sensors[id];
245         DataPoint buffer[SENSOR_QUERYBUFSIZ];
246         int amount_read = 0, total = 0;
247         
248         //Critical section
249         pthread_mutex_lock(&(sensor->mutex));
250         if (from >= sensor->points_stored) {
251                 FCGI_RejectJSONEx(context, STATUS_OUTOFRANGE, "Invalid range specified.");
252         } else if (dump) {
253                 from =  (from < 0) ? 0 : from;
254                 count = (count < 0) ? sensor->points_stored : count;
255
256                 FCGI_PrintRaw("Content-type: text/plain\r\n"
257                         "Content-disposition: attachment;filename=%d.csv\r\n\r\n", id);
258
259                 fseek(sensor->file, sizeof(DataPoint) * from, SEEK_SET);
260                 //Force download with content-disposition
261                 do {
262                         amount_read = fread(buffer, sizeof(DataPoint), SENSOR_QUERYBUFSIZ, sensor->file);
263                         for (int i = 0; i < amount_read && total < count; i++, total++) {
264                                 FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value);
265                         }
266                 } while (amount_read > 0 && total < count);
267         } else {
268                 count = (count < 0) ? SENSOR_QUERYBUFSIZ : count;
269                 if (from < 0) {
270                         from = sensor->points_stored - count;
271                         if (from < 0)
272                                 from = 0;
273                 }
274                 fseek(sensor->file, sizeof(DataPoint) * from, SEEK_SET);
275
276                 FCGI_BeginJSON(context, STATUS_OK);     
277                 FCGI_JSONLong("id", id); 
278                 FCGI_JSONKey("data");
279                 FCGI_JSONValue("[");
280                 while (total < count) {
281                         amount_read = fread(buffer, sizeof(DataPoint), SENSOR_QUERYBUFSIZ, sensor->file);
282                         if (amount_read > 0) {
283                                 FCGI_JSONValue("[%f, %f]", buffer[0].time_stamp, buffer[0].value);
284                                 total++;
285                                 for (int i = 1; i < amount_read && total < count; i++, total++)
286                                         FCGI_JSONValue(", [%f, %f]", buffer[i].time_stamp, buffer[i].value);
287                         } else {
288                                 break;
289                         }
290                 }
291                 FCGI_JSONValue("]");
292
293                 FCGI_JSONLong("total_points", sensor->points_stored);
294                 FCGI_JSONLong("next_point", from + total);
295                 FCGI_EndJSON(); 
296         }
297         pthread_mutex_unlock(&(sensor->mutex));
298         //End critical section
299 }
300
301 /**
302  * Handle a request to the sensor module
303  * @param context - The context to work in
304  * @param params - Parameters passed
305  */
306 void Sensor_Handler(FCGIContext *context, char * params)
307 {
308         DataPoint buffer[SENSOR_QUERYBUFSIZ];
309         StatusCodes status = STATUS_OK;
310
311         enum {DEFAULT, DUMP} operation = DEFAULT;
312
313         const char * key; const char * value;
314
315         Sensor * sensor = NULL;
316
317         while ((params = FCGI_KeyPair(params, &key, &value)) != NULL)
318         {
319                 Log(LOGDEBUG, "Got key=%s and value=%s", key, value);
320                 if (strcmp(key, "id") == 0)
321                 {
322                         if (sensor != NULL)
323                         {
324                                 Log(LOGERR, "Only one sensor id should be specified");
325                                 status = STATUS_ERROR;
326                                 break;
327                         }
328                         if (*value == '\0')
329                         {
330                                 Log(LOGERR, "No id specified.");
331                                 status = STATUS_ERROR;
332                                 break;
333                         }
334
335                         sensor = Sensor_Identify(value);
336                         if (sensor == NULL)
337                         {
338                                 Log(LOGERR, "Invalid sensor id: %s", value);
339                                 status = STATUS_ERROR;
340                                 break;
341                         }
342                 }
343                 else if (strcmp(key, "dump") == 0)
344                 {
345                         if (operation != DEFAULT)
346                         {
347                                 Log(LOGERR, "Operation already specified!");
348                                 status = STATUS_ERROR;
349                                 break;
350                         }
351                         operation = DUMP;
352                 }
353                 else
354                 {
355                         Log(LOGERR, "Unknown key \"%s\" (value = %s)", key, value);
356                         status = STATUS_ERROR;
357                         break;
358                 }               
359         }
360
361         if (status != STATUS_ERROR && sensor == NULL)
362         {
363                 Log(LOGERR, "No valid sensor id given");
364                 status = STATUS_ERROR;
365         }
366
367         if (status == STATUS_ERROR)
368         {
369                 FCGI_RejectJSON(context, "Invalid input parameters");
370                 return;
371         }
372         
373         switch (operation)
374         {
375                 case DUMP:
376                 {
377                         //Force download with content-disposition
378                         FCGI_PrintRaw("Content-type: text/plain\r\n"
379                                 "Content-disposition: attachment;filename=%d.csv\r\n\r\n",
380                                 sensor->id);
381                         //CRITICAL SECTION
382                         pthread_mutex_lock(&(sensor->mutex));
383                                 fseek(sensor->file, 0, SEEK_SET);
384                                 int amount_read = 0;
385                                 do
386                                 {
387                                         amount_read = fread(buffer, sizeof(DataPoint), SENSOR_QUERYBUFSIZ, sensor->file);
388                                         for (int i = 0; i < amount_read; ++i)
389                                         {
390                                                 FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value);
391                                         }
392
393                                 }
394                                 while (amount_read == SENSOR_QUERYBUFSIZ);
395                         pthread_mutex_unlock(&(sensor->mutex));
396                         // end critical section
397                         break;
398                 }
399                 default:
400                 {
401                         FCGI_BeginJSON(context, status);        
402                         FCGI_JSONPair(key, value); // should spit back sensor ID
403                         //Log(LOGDEBUG, "Call Sensor_Query...");
404                         int amount_read = Sensor_Query(sensor, buffer, SENSOR_QUERYBUFSIZ);
405                         //Log(LOGDEBUG, "Read %d DataPoints", amount_read);
406                         //Log(LOGDEBUG, "Produce JSON response");
407                         FCGI_JSONKey("data");
408                         FCGI_JSONValue("[");
409                         for (int i = 0; i < amount_read; ++i)
410                         {
411                                 //TODO: Consider; is it better to give both tv_sec and tv_usec to the client seperately, instead of combining here?
412                                 
413                                 FCGI_JSONValue("[%f, %f]", buffer[i].time_stamp, buffer[i].value);
414                                 if (i+1 < amount_read)
415                                         FCGI_JSONValue(",");
416                         }
417                         FCGI_JSONValue("]");
418                         //Log(LOGDEBUG, "Done producing JSON response");
419                         FCGI_EndJSON(); 
420                         break;
421                 }
422         }
423 }
424
425 /**
426  * Setup Sensors, start Sensor polling thread(s)
427  */
428 void Sensor_Spawn()
429 {
430         // start sensor threads
431         for (int i = 0; i < NUMSENSORS; ++i)
432         {
433                 Init(g_sensors+i, i);
434                 pthread_create(&(g_sensors[i].thread), NULL, Sensor_Main, (void*)(g_sensors+i));
435         }
436 }
437
438 /**
439  * Quit Sensor loops
440  */
441 void Sensor_Join()
442 {
443         if (!Thread_Runstate())
444         {
445                 Fatal("This function should not be called before Thread_QuitProgram");
446         }
447         for (int i = 0; i < NUMSENSORS; ++i)
448         {
449                 pthread_join(g_sensors[i].thread, NULL);
450                 Destroy(g_sensors+i);
451         }
452 }

UCC git Repository :: git.ucc.asn.au