AcessNative - Rework of thread handling
[tpg/acess2.git] / AcessNative / acesskernel_src / server.c
1 /*
2  * Acess2 Native Kernel
3  * - Acess kernel emulation on another OS using SDL and UDP
4  *
5  * Syscall Server
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <SDL/SDL.h>
11 #ifdef __WIN32__
12 # define _WIN32_WINNT 0x0501
13 # include <windows.h>
14 # include <winsock2.h>
15 # include <ws2tcpip.h>
16 # define close(fd)      closesocket(fd)
17 typedef int     socklen_t;
18 #else
19 # include <unistd.h>
20 # include <sys/socket.h>
21 # include <netinet/in.h>
22 # include <netdb.h>     // getaddrinfo
23 #endif
24 #define DONT_INCLUDE_SYSCALL_NAMES
25 #include "../syscalls.h"
26 #include <logdebug.h>   // acess but std
27 #include <errno.h>
28
29 #define USE_TCP 1
30 #define MAX_CLIENTS     16
31
32 // === TYPES ===
/**
 * Per-client state; one entry of gaServer_Clients.
 */
typedef struct {
        // Client/process ID. 0 marks the slot as free (see Server_GetClient);
        // Server_Shutdown sets -1 on slots that were in use.
         int    ClientID;
        // Worker servicing this slot; created by Server_GetClient the first
        // time the slot is used (NULL until then).
        SDL_Thread      *WorkerThread;
        // Request handed to the worker; the worker frees it and resets this
        // to NULL once the call has been processed.
        tRequestHeader  *CurrentRequest;
        // Signalled after CurrentRequest has been filled in
        SDL_cond        *WaitFlag;
        // Mutex handed to SDL_CondWait together with WaitFlag
        SDL_mutex       *Mutex;
        #if USE_TCP
        // Connected socket for this client; 0 is used as "no connection attached"
         int    Socket;
        #else
        // Address the client's datagrams come from (replies are sent here)
        struct sockaddr_in      ClientAddr;
        #endif
}       tClient;
45
46 // === IMPORTS ===
47 // TODO: Move these to headers
48 extern tRequestHeader *SyscallRecieve(tRequestHeader *Request, size_t *ReturnLength);
49 extern int      Threads_CreateRootProcess(void);
50 extern void     Threads_SetThread(int TID, void *ClientPtr);
51 extern void     *Threads_GetThread(int TID);
52 extern void     Threads_PostEvent(void *Thread, uint32_t Event);
53 extern void     Threads_int_Terminate(void *Thread);
54
55 // === PROTOTYPES ===
56 tClient *Server_GetClient(int ClientID);
57  int    Server_WorkerThread(void *ClientPtr);
58  int    SyscallServer(void);
59  int    Server_ListenThread(void *Unused);
60
61 // === GLOBALS ===
62 #ifdef __WIN32__
63 WSADATA gWinsock;
64 SOCKET  gSocket = INVALID_SOCKET;
65 #else
66 # define INVALID_SOCKET -1
67  int    gSocket = INVALID_SOCKET;
68 #endif
69 tClient gaServer_Clients[MAX_CLIENTS];
70 SDL_Thread      *gpServer_ListenThread;
71
72 // === CODE ===
73 int Server_GetClientID(void)
74 {
75         Uint32  thisId = SDL_ThreadID();
76         
77         for( int i = 0; i < MAX_CLIENTS; i ++ )
78         {
79                 if( SDL_GetThreadID(gaServer_Clients[i].WorkerThread) == thisId )
80                         return gaServer_Clients[i].ClientID;
81         }
82         
83         fprintf(stderr, "ERROR: Server_GetClientID - Thread is not allocated\n");
84         
85         return 0;
86 }
87
88 tClient *Server_GetClient(int ClientID)
89 {
90         tClient *ret = NULL;
91         
92         // Allocate an ID if needed
93         if(ClientID == 0)
94                 ClientID = Threads_CreateRootProcess();
95         
96         for( int i = 0; i < MAX_CLIENTS; i ++ )
97         {
98                 if( gaServer_Clients[i].ClientID == ClientID ) {
99                         return &gaServer_Clients[i];
100                 }
101                 if(!ret && gaServer_Clients[i].ClientID == 0)
102                         ret = &gaServer_Clients[i];
103         }
104         
105         // Uh oh, no free slots
106         // TODO: Dynamic allocation
107         if( !ret ) {
108                 Log_Error("Server", "Ran out of static client slots (%i)", MAX_CLIENTS);
109                 return NULL;
110         }
111         
112         // Allocate a thread for the process
113         ret->ClientID = ClientID;
114         ret->CurrentRequest = NULL;
115         #if USE_TCP
116         ret->Socket = 0;
117         #endif
118                 
119         if( !ret->WorkerThread ) {
120                 Log_Debug("Server", "Creating worker for %p", ret);
121                 ret->WaitFlag = SDL_CreateCond();
122                 ret->Mutex = SDL_CreateMutex();
123                 SDL_mutexP( ret->Mutex );
124                 ret->WorkerThread = SDL_CreateThread( Server_WorkerThread, ret );
125         }
126         
127         return ret;
128 }
129
130 int Server_WorkerThread(void *ClientPtr)
131 {
132         tClient *Client = ClientPtr;
133
134         Log_Debug("Server", "Worker %p active", ClientPtr);     
135
136         tRequestHeader  errorHeader;
137         size_t  retSize = 0;
138          int    cur_client_id = 0;
139         while( Client->ClientID != 0 )
140         {
141                 // Wait for something to do
142                 if( Client->CurrentRequest == NULL )
143                         SDL_CondWait(Client->WaitFlag, Client->Mutex);
144                 if( Client->CurrentRequest == NULL )
145                         continue ;
146                 
147                 if(Client->ClientID != cur_client_id) {
148                         Threads_SetThread( Client->ClientID, Client );
149                         cur_client_id = Client->ClientID;
150                 }
151                 
152                 // Get the response
153                 tRequestHeader  *retHeader = SyscallRecieve(Client->CurrentRequest, &retSize);
154
155                 if( !retHeader ) {
156                         // Return an error to the client
157                         printf("ERROR: SyscallRecieve failed\n");
158                         errorHeader.CallID = Client->CurrentRequest->CallID;
159                         errorHeader.NParams = 0;
160                         retHeader = &errorHeader;
161                         retSize = sizeof(errorHeader);
162                 }
163                 
164                 // Set ID
165                 retHeader->ClientID = Client->ClientID;
166                 
167                 // Mark the thread as ready for another job
168                 free(Client->CurrentRequest);
169                 Client->CurrentRequest = 0;
170
171                 // If the thread is being terminated, don't send reply
172                 if( Client->ClientID > 0 )
173                 {
174                         // Return the data
175                         #if USE_TCP
176                         size_t sentSize = send(Client->Socket, retHeader, retSize, 0); 
177                         #else
178                         size_t sentSize = sendto(gSocket, retHeader, retSize, 0,
179                                 (struct sockaddr*)&Client->ClientAddr, sizeof(Client->ClientAddr)
180                                 );
181                         #endif
182                         if( sentSize != retSize ) {
183                                 perror("Server_WorkerThread - send");
184                         }
185                 }
186                 
187                 // Free allocated header
188                 if( retHeader != &errorHeader )
189                         free( retHeader );
190         }
191         Log_Notice("Server", "Terminated Worker %p", ClientPtr);        
192         return 0;
193 }
194
195 int SyscallServer(void)
196 {
197         struct sockaddr_in      server;
198         
199         #ifdef __WIN32__
200         /* Open windows connection */
201         if (WSAStartup(0x0101, &gWinsock) != 0)
202         {
203                 fprintf(stderr, "Could not open Windows connection.\n");
204                 exit(0);
205         }
206         #endif
207         
208         #if USE_TCP
209         // Open TCP Connection
210         gSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
211         #else
212         // Open UDP Connection
213         gSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
214         #endif
215         if (gSocket == INVALID_SOCKET)
216         {
217                 fprintf(stderr, "Could not create socket.\n");
218                 #if __WIN32__
219                 WSACleanup();
220                 #endif
221                 exit(0);
222         }
223         
224         // Set server address
225         memset(&server, 0, sizeof(struct sockaddr_in));
226         server.sin_family = AF_INET;
227         server.sin_port = htons(SERVER_PORT);
228         server.sin_addr.s_addr = htonl(INADDR_ANY);
229         
230         // Bind
231         if( bind(gSocket, (struct sockaddr *)&server, sizeof(struct sockaddr_in)) == -1 )
232         {
233                 fprintf(stderr, "Cannot bind address to socket.\n");
234                 perror("SyscallServer - bind");
235                 #if __WIN32__
236                 closesocket(gSocket);
237                 WSACleanup();
238                 #else
239                 close(gSocket);
240                 #endif
241                 exit(0);
242         }
243         
244         #if USE_TCP
245         listen(gSocket, 5);
246         #endif
247         
248         Log_Notice("AcessSrv", "Listening on 0.0.0.0:%i", SERVER_PORT);
249         gpServer_ListenThread = SDL_CreateThread( Server_ListenThread, NULL );
250         return 0;
251 }
252
253 int Server_Shutdown(void)
254 {
255         close(gSocket);
256         for( int i = 0; i < MAX_CLIENTS; i ++ )
257         {
258                 if( gaServer_Clients[i].ClientID == 0 )
259                         continue ;
260                 Threads_PostEvent( Threads_GetThread(gaServer_Clients[i].ClientID), 0 );
261                 gaServer_Clients[i].ClientID = -1;
262                 #if USE_TCP
263                 close(gaServer_Clients[i].Socket);
264                 #else
265                 SDL_CondSignal(gaServer_Clients[i].WaitFlag);
266                 #endif
267         }
268         return 0;
269 }
270
271 #if USE_TCP
272 int Server_int_HandleRx(tClient *Client)
273 {
274         const int       ciMaxParamCount = 6;
275         char    lbuf[sizeof(tRequestHeader) + ciMaxParamCount*sizeof(tRequestValue)];
276         tRequestHeader  *hdr = (void*)lbuf;
277         size_t  len = recv(Client->Socket, (void*)hdr, sizeof(*hdr), 0);
278         if( len == 0 ) {
279                 Log_Notice("Server", "Zero RX on %i (worker %p)", Client->Socket, Client);
280                 return 1;
281         }
282         if( len == -1 ) {
283                 perror("recv header");
284                 return 2;
285         }
286         if( len != sizeof(*hdr) ) {
287                 // Oops?
288                 Log_Warning("Server", "FD%i bad sized (%i != exp %i)",
289                         Client->Socket, len, sizeof(*hdr));
290                 return 0;
291         }
292
293         if( hdr->NParams > ciMaxParamCount ) {
294                 // Oops.
295                 Log_Warning("Server", "FD%i too many params (%i > max %i)",
296                         Client->Socket, hdr->NParams, ciMaxParamCount);
297                 return 0;
298         }
299
300         if( hdr->NParams > 0 )
301         {
302                 len = recv(Client->Socket, (void*)hdr->Params, hdr->NParams*sizeof(tRequestValue), 0);
303                 if( len != hdr->NParams*sizeof(tRequestValue) ) {
304                         // Oops.
305                         perror("recv params");
306                         Log_Warning("Sever", "Recieving params failed");
307                         return 0;
308                 }
309         }
310         else
311         {
312                 //Log_Debug("Server", "No params?");
313         }
314
315         // Get buffer size
316         size_t  hdrsize = sizeof(tRequestHeader) + hdr->NParams*sizeof(tRequestValue);
317         size_t  bufsize = hdrsize;
318         for( int i = 0; i < hdr->NParams; i ++ )
319         {
320                 if( hdr->Params[i].Flags & ARG_FLAG_ZEROED )
321                         ;
322                 else {
323                         bufsize += hdr->Params[i].Length;
324                 }
325         }
326
327         // Allocate full buffer
328         hdr = malloc(bufsize);
329         memcpy(hdr, lbuf, hdrsize);
330         if( bufsize > hdrsize )
331         {
332                 size_t  rem = bufsize - hdrsize;
333                 char    *ptr = (void*)( hdr->Params + hdr->NParams );
334                 while( rem )
335                 {
336                         len = recv(Client->Socket, ptr, rem, 0);
337                         if( len == -1 ) {
338                                 // Oops?
339                                 perror("recv data");
340                                 Log_Warning("Sever", "Recieving data failed");
341                                 return 2;
342                         }
343                         rem -= len;
344                         ptr += len;
345                 }
346                 if( rem ) {
347                         // Extra data?
348                         return 0;
349                 }
350         }
351         else {
352                 //Log_Debug("Server", "no data");
353         }
354         
355         // Dispatch to worker
356         if( Client->CurrentRequest ) {
357                 printf("Worker thread for client ID %i is busy\n", Client->ClientID);
358                 return 1;
359         }
360
361         // Give to worker
362         Log_Debug("Server", "Message from Client %i (%p)", Client->ClientID, Client);
363         Client->CurrentRequest = hdr;
364         SDL_CondSignal(Client->WaitFlag);
365
366         return 0;
367 }
368
369 int Server_int_HandshakeClient(int Socket, struct sockaddr_in *addr, socklen_t addr_size)
370 {
371         ENTER("iSocket paddr iaddr_size",
372                 Socket, addr, addr_size);
373         unsigned short  port = ntohs(addr->sin_port);
374         char    addrstr[4*8+8+1];
375         getnameinfo((struct sockaddr*)addr, addr_size, addrstr, sizeof(addrstr), NULL, 0, NI_NUMERICHOST);
376         Log_Debug("Server", "Client connection %s:%i", addrstr, port);
377         
378         // Perform handshake
379         tRequestAuthHdr authhdr;
380         size_t  len  = recv(Socket, &authhdr, sizeof(authhdr), 0);
381         if( len != sizeof(authhdr) ) {
382                 // Some form of error?
383                 Log_Warning("Server", "Client auth block bad size (%i != exp %i)",
384                         len, sizeof(authhdr));
385                 LEAVE('i', 1);
386                 return 1;
387         }
388         
389         LOG("authhdr.pid = %i", authhdr.pid);
390         tClient *client = Server_GetClient(authhdr.pid);
391         if( authhdr.pid == 0 ) {
392                 // Allocate PID and client structure/thread
393                 client->Socket = Socket;
394                 authhdr.pid = client->ClientID;
395         }
396         else {
397                 Log_Debug("Server", "Client assumed PID %i", authhdr.pid);
398                 
399                 // Get client structure and make sure it's unused
400                 // - Auth token / verifcation?
401                 if( !client ) {
402                         Log_Warning("Server", "Can't allocate a client struct for %s:%i",
403                                 addrstr, port);
404                         LEAVE('i', 1);
405                         return 1;
406                 }
407                 if( client->Socket != 0 ) {
408                         Log_Warning("Server", "Client (%i)%p owned by FD%i but %s:%i tried to use it",
409                                 authhdr.pid, client, addrstr, port);
410                         LEAVE('i', 1);
411                         return 1;
412                 }
413                 
414                 client->Socket = Socket;
415         }
416
417         LOG("Sending auth reply");      
418         len = send(Socket, (void*)&authhdr, sizeof(authhdr), 0);
419         if( len != sizeof(authhdr) ) {
420                 // Ok, this is an error
421                 perror("Sending auth reply");
422                 LEAVE('i', 1);
423                 return 1;
424         }
425
426         // All done, client thread should be watching now               
427         
428         LEAVE('i', 0);
429         return 0;
430 }
431
432 void Server_int_RemoveClient(tClient *Client)
433 {
434         // Trigger the thread to kill itself
435         Threads_int_Terminate( Threads_GetThread(Client->ClientID) );
436         Client->ClientID = 0;
437         close(Client->Socket);
438 }
439
440 #endif
441
442 int Server_ListenThread(void *Unused)
443 {       
444         // Wait for something to do :)
445         for( ;; )
446         {
447                 #if USE_TCP
448                 fd_set  fds;
449                  int    maxfd = gSocket;
450                 FD_ZERO(&fds);
451                 FD_SET(gSocket, &fds);
452
453                 for( int i = 0; i < MAX_CLIENTS; i ++ ) {
454                         tClient *client = &gaServer_Clients[i];
455                         if( client->ClientID == 0 )
456                                 continue ;
457                         FD_SET(client->Socket, &fds);
458                         if(client->Socket > maxfd)
459                                 maxfd = client->Socket;
460                 }
461                 
462                 int rv = select(maxfd+1, &fds, NULL, NULL, NULL);
463                 Log_Debug("Server", "Select rv = %i", rv);
464                 if( rv <= 0 ) {
465                         perror("select");
466                         return 1;
467                 }
468                 
469                 // Incoming connection
470                 if( FD_ISSET(gSocket, &fds) )
471                 {
472                         struct sockaddr_in      clientaddr;
473                         socklen_t       clientSize = sizeof(clientaddr);
474                          int    clientSock = accept(gSocket, (struct sockaddr*)&clientaddr, &clientSize);
475                         if( clientSock < 0 ) {
476                                 perror("SyscallServer - accept");
477                                 break ;
478                         }
479                         if( Server_int_HandshakeClient(clientSock, &clientaddr, clientSize) ) {
480                                 Log_Warning("Server", "Client handshake failed :(");
481                                 close(clientSock);
482                         }
483                 }
484                 
485                 for( int i = 0; i < MAX_CLIENTS; i ++ )
486                 {
487                         tClient *client = &gaServer_Clients[i];
488                         if( client->ClientID == 0 )
489                                 continue ;
490                         //Debug("Server_ListenThread: Idx %i ID %i FD %i",
491                         //      i, client->ClientID, client->Socket);
492                         if( !FD_ISSET(client->Socket, &fds) )
493                                 continue ;
494                         
495                         if( Server_int_HandleRx( client ) )
496                         {
497                                 Log_Warning("Server", "Client %p dropped, TODO: clean up", client);
498                                 Server_int_RemoveClient(client);
499                         }
500                 }
501         
502                 #else
503                 char    data[BUFSIZ];
504                 tRequestHeader  *req = (void*)data;
505                 struct sockaddr_in      addr;
506                 uint    clientSize = sizeof(addr);
507                  int    length;
508                 
509                 length = recvfrom(gSocket, data, BUFSIZ, 0, (struct sockaddr*)&addr, &clientSize);
510                 
511                 if( length == -1 ) {
512                         perror("SyscallServer - recv");
513                         break;
514                 }
515                 
516                 // Recive data
517 //              Log_Debug("Server", "%i bytes from %x:%i", length,
518 //                      ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
519                 
520                 tClient *client = Server_GetClient(req->ClientID);
521                 // NOTE: I should really check if the sin_addr is zero, but meh
522                 // Shouldn't matter much
523                 if( req->ClientID == 0 || client->ClientAddr.sin_port == 0 )
524                 {
525                         memcpy(&client->ClientAddr, &addr, sizeof(addr));
526                 }
527                 else if( memcmp(&client->ClientAddr, &addr, sizeof(addr)) != 0 )
528                 {
529                         printf("ClientID %i used by %x:%i\n",
530                                 client->ClientID, ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
531                         printf(" actually owned by %x:%i\n",
532                                 ntohl(client->ClientAddr.sin_addr.s_addr), ntohs(client->ClientAddr.sin_port));
533                         continue;
534                 }
535                 
536 //              Log_Debug("AcessSrv", "Message from Client %i (%p)",
537 //                      client->ClientID, client);
538                 if( client->CurrentRequest ) {
539                         printf("Worker thread for %x:%i is busy\n",
540                                 ntohl(client->ClientAddr.sin_addr.s_addr), ntohs(client->ClientAddr.sin_port));
541                         continue;
542                 }
543                 
544                 // Duplicate the data currently on the stack, and dispatch to worker
545                 req = malloc(length);
546                 memcpy(req, data, length);
547                 client->CurrentRequest = req;
548                 SDL_CondSignal(client->WaitFlag);
549                 #endif
550         }
551         return -1;
552 }

UCC git Repository :: git.ucc.asn.au