Merge branch 'master' of https://github.com/szmoore/MCTX3420.git
[matches/MCTX3420.git] / server / sensor.c
1 /**
2  * @file sensor.c
3  * @brief Implementation of sensor thread
4  * TODO: Finalise implementation
5  */
6
7 #include "common.h"
8 #include "sensor.h"
9 #include "options.h"
10 #include <math.h>
11
12 /** Array of sensors, initialised by Sensor_Init **/
13 static Sensor g_sensors[NUMSENSORS]; //global to this file
14 static const char * g_sensor_names[] = {"analog_test0", "analog_test1", "digital_test0", "digital_test1"};
15 /**
16  * Read a data value from a sensor; block until value is read
17  * @param sensor_id - The ID of the sensor
18  * @param d - DataPoint to set
19  * @returns NULL for digital sensors when data is unchanged, otherwise d
20  */
21 DataPoint * GetData(int sensor_id, DataPoint * d)
22 {
23         // switch based on the sensor_id at the moment for testing;
24         // might be able to just directly access ADC from sensor_id?
25         //TODO: Implement for real sensors
26
27         
28         //TODO: We should ensure the time is *never* allowed to change on the server if we use gettimeofday
29         //              Another way people might think of getting the time is to count CPU cycles with clock()
30         //              But this will not work because a) CPU clock speed may change on some devices (RPi?) and b) It counts cycles used by all threads
31         
32         struct timeval t;
33         gettimeofday(&t, NULL);
34         d->time_stamp = (t.tv_sec - g_options.start_time.tv_sec) + 1e-6*(t.tv_usec - g_options.start_time.tv_usec);
35
36         // Make time relative
37         //d->time_stamp.tv_sec -= g_options.start_time.tv_sec;
38         //d->time_stamp.tv_usec -= g_options.start_time.tv_usec;
39         
40         switch (sensor_id)
41         {
42                 case ANALOG_TEST0:
43                 {
44                         static int count = 0;
45                         d->value = count++;
46                         break;
47                 }
48                 case ANALOG_TEST1:
49                         d->value = (double)(rand() % 100) / 100;
50                         break;
51         
52                 //TODO: For digital sensors, consider only updating when sensor is actually changed
53                 case DIGITAL_TEST0:
54                         d->value = t.tv_sec % 2;
55                         break;
56                 case DIGITAL_TEST1:
57                         d->value = (t.tv_sec+1)%2;
58                         break;
59                 default:
60                         Fatal("Unknown sensor id: %d", sensor_id);
61                         break;
62         }       
63         usleep(100000); // simulate delay in sensor polling
64
65         return d;
66 }
67
68
69 /**
70  * Destroy a sensor
71  * @param s - Sensor to destroy
72  */
73 void Destroy(Sensor * s)
74 {
75         // Maybe move the binary file into long term file storage?
76         fclose(s->file);
77 }
78
79
80
81 /**
82  * Initialise a sensor
83  * @param s - Sensor to initialise
84  */
85 void Init(Sensor * s, int id)
86 {
87         s->write_index = 0;
88         s->id = id;
89
90         #define FILENAMESIZE 3
91         char filename[FILENAMESIZE];
92         if (s->id >= pow(10, FILENAMESIZE))
93         {
94                 Fatal("Too many sensors! FILENAMESIZE is %d; increase it and recompile.", FILENAMESIZE);
95         }
96
97         pthread_mutex_init(&(s->mutex), NULL);
98                 
99         sprintf(filename, "%d", s->id);
100         unlink(filename); //TODO: Move old files somewhere
101
102         s->file = fopen(filename, "a+b"); // open binary file
103         Log(LOGDEBUG, "Initialised sensor %d; binary file is \"%s\"", id, filename);
104 }
105
106
107
108
109
110 /**
111  * Run the main sensor polling loop
112  * @param arg - Cast to Sensor* - Sensor that the thread will handle
113  * @returns NULL (void* required to use the function with pthreads)
114  */
115 void * Sensor_Main(void * arg)
116 {
117         Sensor * s = (Sensor*)(arg);
118
119         while (Thread_Runstate() == RUNNING) //TODO: Exit condition
120         {
121                 // The sensor will write data to a buffer until it is full
122                 // Then it will open a file and dump the buffer to the end of it.
123                 // Rinse and repeat
124
125                 // The reason I've added the buffer is because locks are expensive
126                 // But maybe it's better to just write data straight to the file
127                 // I'd like to do some tests by changing SENSOR_DATABUFSIZ
128
129                 while (s->write_index < SENSOR_DATABUFSIZ)
130                 {
131                         DataPoint * d = &(s->buffer[s->write_index]);
132                         if (GetData(s->id, d) == NULL)
133                         {
134                                 Fatal("Error collecting data");
135                         }
136                         s->write_index += 1;
137                 }
138
139                 //Log(LOGDEBUG, "Filled buffer");
140
141                 // CRITICAL SECTION (no threads should be able to read/write the file at the same time)
142                 pthread_mutex_lock(&(s->mutex));
143                         //TODO: Valgrind complains about this fseek: "Syscall param write(buf) points to uninitialised byte(s)"
144                         //              Not sure why, but we should find out and fix it.
145                         fseek(s->file, 0, SEEK_END);
146                         int amount_written = fwrite(s->buffer, sizeof(DataPoint), SENSOR_DATABUFSIZ, s->file);
147                         if (amount_written != SENSOR_DATABUFSIZ)
148                         {
149                                 Fatal("Wrote %d data points and expected to write %d to \"%s\" - %s", amount_written, SENSOR_DATABUFSIZ, strerror(errno));
150                         }
151                         //Log(LOGDEBUG, "Wrote %d data points for sensor %d", amount_written, s->id);
152                 pthread_mutex_unlock(&(s->mutex));
153                 // End of critical section
154
155                 s->write_index = 0; // reset position in buffer
156                 
157         }
158         Log(LOGDEBUG, "Thread for sensor %d exits", s->id);
159         return NULL; 
160 }
161
162 /**
163  * Fill buffer with most recent sensor data
164  * @param s - Sensor to use
165  * @param buffer - Buffer to fill
166  * @param bufsiz - Size of buffer to fill
167  * @returns The number of DataPoints actually read
168  */
169 int Sensor_Query(Sensor * s, DataPoint * buffer, int bufsiz)
170 {
171         int amount_read = 0;
172         //CRITICAL SECTION (Don't access file while sensor thread is writing to it!)
173         pthread_mutex_lock(&(s->mutex));
174                 
175                 fseek(s->file, -bufsiz*sizeof(DataPoint), SEEK_END);
176                 amount_read = fread(buffer, sizeof(DataPoint), bufsiz, s->file);
177                 //Log(LOGDEBUG, "Read %d data points", amount_read);            
178         pthread_mutex_unlock(&(s->mutex));
179         return amount_read;
180 }
181
182 /**
183  * Get a Sensor given an ID string
184  * @param id_str ID string
185  * @returns Sensor* identified by the string; NULL on error
186  */
187 Sensor * Sensor_Identify(const char * id_str)
188 {
189         char * end;
190         // Parse string as integer
191         int id = strtol(id_str, &end, 10);
192         if (*end != '\0')
193         {
194                 return NULL;
195         }
196         // Bounds check
197         if (id < 0 || id >= NUMSENSORS)
198                 return NULL;
199
200
201         Log(LOGDEBUG, "Sensor \"%s\" identified", g_sensor_names[id]);
202         return g_sensors+id;
203 }
204
205 /**
206  * Handle a request to the sensor module
207  * @param context - The context to work in
208  * @param params - Parameters passed
209  */
210 void Sensor_Handler(FCGIContext *context, char * params)
211 {
212         DataPoint buffer[SENSOR_QUERYBUFSIZ];
213         StatusCodes status = STATUS_OK;
214
215         enum {DEFAULT, DUMP} operation = DEFAULT;
216
217         const char * key; const char * value;
218
219         Sensor * sensor = NULL;
220
221         while ((params = FCGI_KeyPair(params, &key, &value)) != NULL)
222         {
223                 Log(LOGDEBUG, "Got key=%s and value=%s", key, value);
224                 if (strcmp(key, "id") == 0)
225                 {
226                         if (sensor != NULL)
227                         {
228                                 Log(LOGERR, "Only one sensor id should be specified");
229                                 status = STATUS_ERROR;
230                                 break;
231                         }
232                         if (*value == '\0')
233                         {
234                                 Log(LOGERR, "No id specified.");
235                                 status = STATUS_ERROR;
236                                 break;
237                         }
238
239                         sensor = Sensor_Identify(value);
240                         if (sensor == NULL)
241                         {
242                                 Log(LOGERR, "Invalid sensor id: %s", value);
243                                 status = STATUS_ERROR;
244                                 break;
245                         }
246                 }
247                 else if (strcmp(key, "dump") == 0)
248                 {
249                         if (operation != DEFAULT)
250                         {
251                                 Log(LOGERR, "Operation already specified!");
252                                 status = STATUS_ERROR;
253                                 break;
254                         }
255                         operation = DUMP;
256                 }
257                 else
258                 {
259                         Log(LOGERR, "Unknown key \"%s\" (value = %s)", key, value);
260                         status = STATUS_ERROR;
261                         break;
262                 }               
263         }
264
265         if (status != STATUS_ERROR && sensor == NULL)
266         {
267                 Log(LOGERR, "No valid sensor id given");
268                 status = STATUS_ERROR;
269         }
270
271         if (status == STATUS_ERROR)
272         {
273                 FCGI_RejectJSON(context);
274                 return;
275         }
276         
277         switch (operation)
278         {
279                 case DUMP:
280                 {
281                         FCGI_PrintRaw("Content-type: text/plain\r\n\r\n");
282                         //CRITICAL SECTION
283                         pthread_mutex_lock(&(sensor->mutex));
284                                 fseek(sensor->file, 0, SEEK_SET);
285                                 int amount_read = 0;
286                                 do
287                                 {
288                                         amount_read = fread(buffer, sizeof(DataPoint), SENSOR_QUERYBUFSIZ, sensor->file);
289                                         for (int i = 0; i < amount_read; ++i)
290                                         {
291                                                 FCGI_PrintRaw("%f\t%f\n", buffer[i].time_stamp, buffer[i].value);
292                                         }
293         
294                                 }
295                                 while (amount_read == SENSOR_QUERYBUFSIZ);
296                         pthread_mutex_unlock(&(sensor->mutex));
297                         // end critical section
298                         break;
299                 }
300                 default:
301                 {
302                         FCGI_BeginJSON(context, status);        
303                         FCGI_JSONPair(key, value); // should spit back sensor ID
304                         //Log(LOGDEBUG, "Call Sensor_Query...");
305                         int amount_read = Sensor_Query(sensor, buffer, SENSOR_QUERYBUFSIZ);
306                         //Log(LOGDEBUG, "Read %d DataPoints", amount_read);
307                         //Log(LOGDEBUG, "Produce JSON response");
308                         FCGI_JSONKey("data");
309                         FCGI_JSONValue("[");
310                         for (int i = 0; i < amount_read; ++i)
311                         {
312                                 //TODO: Consider; is it better to give both tv_sec and tv_usec to the client seperately, instead of combining here?
313                                 
314                                 FCGI_JSONValue("[%f, %f]", buffer[i].time_stamp, buffer[i].value);
315                                 if (i+1 < amount_read)
316                                         FCGI_JSONValue(",");
317                         }
318                         FCGI_JSONValue("]");
319                         //Log(LOGDEBUG, "Done producing JSON response");
320                         FCGI_EndJSON(); 
321                         break;
322                 }
323         }
324 }
325
326 /**
327  * Setup Sensors, start Sensor polling thread(s)
328  */
329 void Sensor_Spawn()
330 {
331         // start sensor threads
332         for (int i = 0; i < NUMSENSORS; ++i)
333         {
334                 Init(g_sensors+i, i);
335                 pthread_create(&(g_sensors[i].thread), NULL, Sensor_Main, (void*)(g_sensors+i));
336         }
337 }
338
339 /**
340  * Quit Sensor loops
341  */
342 void Sensor_Join()
343 {
344         if (!Thread_Runstate())
345         {
346                 Fatal("This function should not be called before Thread_QuitProgram");
347         }
348         for (int i = 0; i < NUMSENSORS; ++i)
349         {
350                 pthread_join(g_sensors[i].thread, NULL);
351                 Destroy(g_sensors+i);
352         }
353 }

UCC git Repository :: git.ucc.asn.au