Kernel/VTerm - Minor cleanup to VT input (remove logging)
[tpg/acess2.git] / KernelLand / Kernel / drv / dgram_pipe.c
1 /*
2  * Acess2 Kernel
3  * - By John Hodge (thePowersGang);
4  * 
5  * drv/dgram_pipe.c
6  * - Connection+Datagram based local IPC
7  */
8 #define DEBUG   0
9 #include <vfs.h>
10 #include <fs_devfs.h>
11 #include <modules.h>
12 #include <rwlock.h>
13 #include <semaphore.h>
14
// Defaults applied to every new server in IPCPipe_Root_MkNod
#define DEF_MAX_BLOCK_SIZE	0x1000	// Initial tIPCPipe_Server.MaxBlockSize
#define DEF_MAX_BYTE_LIMIT	0x8000	// Initial tIPCPipe_Server.QueueByteLimit
17
18 // === TYPES ===
// Forward declarations so the structures below can reference each other
typedef struct sIPCPipe_Packet	tIPCPipe_Packet;
typedef struct sIPCPipe_Endpoint	tIPCPipe_Endpoint;
typedef struct sIPCPipe_Channel	tIPCPipe_Channel;
typedef struct sIPCPipe_Server	tIPCPipe_Server;
23
24 // === STRUCTURES ===
25 struct sIPCPipe_Packet
26 {
27         tIPCPipe_Packet *Next;
28         size_t  Length;
29         size_t  Offset; //!< Offset to first unread byte (for streams)
30         void    *Data;
31 };
32 struct sIPCPipe_Endpoint
33 {
34         tMutex  lList;
35         tIPCPipe_Packet *OutHead;
36         tIPCPipe_Packet *OutTail;
37         size_t  ByteCount;
38         tVFS_Node       Node;
39 };
40 struct sIPCPipe_Channel
41 {
42         tIPCPipe_Channel        *Next;
43         tIPCPipe_Channel        *Prev;
44         tIPCPipe_Server *Server;
45         tIPCPipe_Endpoint       ServerEP;
46         tIPCPipe_Endpoint       ClientEP;
47 };
48 struct sIPCPipe_Server
49 {
50         tIPCPipe_Server *Next;
51         tIPCPipe_Server *Prev;
52         char    *Name;
53         size_t  MaxBlockSize;   // Max size of a 'packet'
54         // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
55         size_t  QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
56         tVFS_Node       ServerNode;
57         tRWLock lChannelList;
58         tIPCPipe_Channel        *FirstClient;
59         tIPCPipe_Channel        *LastClient;
60         tIPCPipe_Channel        *FirstUnseenClient;     // !< First client server hasn't opened
61 };
62
63 // === PROTOTYPES ===
64  int    IPCPipe_Install(char **Arguments);
65 // - Root
66 tVFS_Node       *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
67  int    IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
68 tVFS_Node       *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
69 // - Server
70  int    IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
71 tVFS_Node       *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
72 void    IPCPipe_Server_Close(tVFS_Node *Node);
73 // - Socket
74 tIPCPipe_Channel        *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
75 size_t  IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
76 size_t  IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
77 void    IPCPipe_Client_Close(tVFS_Node *Node);
78
79 // === GLOBALS ===
80 // - Server node: Directory
81 tVFS_NodeType   gIPCPipe_ServerNodeType = {
82         .TypeName = "IPC Pipe - Server",
83         .ReadDir = IPCPipe_Server_ReadDir,
84         .FindDir = IPCPipe_Server_FindDir,
85         .Close = IPCPipe_Server_Close
86 };
87 tVFS_NodeType   gIPCPipe_ChannelNodeType = {
88         .TypeName = "IPC Pipe - Channel",
89         .Flags = VFS_NODETYPEFLAG_STREAM,
90         .Read = IPCPipe_Client_Read,
91         .Write = IPCPipe_Client_Write,
92         .Close = IPCPipe_Client_Close
93 };
94 tVFS_NodeType   gIPCPipe_RootNodeType = {
95         .TypeName = "IPC Pipe - Root",
96         .MkNod = IPCPipe_Root_MkNod,
97         .ReadDir = IPCPipe_Root_ReadDir,
98         .FindDir = IPCPipe_Root_FindDir
99 };
100 tDevFS_Driver   gIPCPipe_DevFSInfo = {
101         .Name = "ipcpipe",
102         .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
103 };
104 // - Global list of servers
105 tRWLock glIPCPipe_ServerList;
106 tIPCPipe_Server *gpIPCPipe_FirstServer;
107 tIPCPipe_Server *gpIPCPipe_LastServer;
108 // - Module definition
109 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
110
111 // === CODE ===
112 int IPCPipe_Install(char **Arguments)
113 {
114         DevFS_AddDevice(&gIPCPipe_DevFSInfo);
115         return MODULE_ERR_OK;
116 }
117
118 // - Root -
119 /*
120  * \brief Create a new named pipe
121  * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
122  */
123 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
124 {
125         tIPCPipe_Server *srv;
126         
127         ENTER("pNode sName xFlags", Node, Name, Flags);
128         
129         // Write Lock
130         RWLock_AcquireWrite(&glIPCPipe_ServerList);
131         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
132         {
133                 if( strcmp(srv->Name, Name) == 0 )
134                         break;
135         }
136         if( srv )
137         {
138                 RWLock_Release(&glIPCPipe_ServerList);
139                 LEAVE('n');
140                 return NULL;
141         }
142         
143
144         srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
145         srv->Name = (void*)(srv + 1);
146         strcpy(srv->Name, Name);
147         srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
148         srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
149         srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
150         srv->ServerNode.ImplPtr = srv;
151         srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
152
153         // Lock already held
154         if( gpIPCPipe_LastServer )
155                 gpIPCPipe_LastServer->Next = srv;
156         else
157                 gpIPCPipe_FirstServer = srv;
158         srv->Prev = gpIPCPipe_LastServer;
159         gpIPCPipe_LastServer = srv;
160         RWLock_Release(&glIPCPipe_ServerList);
161
162         LEAVE('p', &srv->ServerNode);
163         return &srv->ServerNode;
164 }
165
166 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
167 {
168         tIPCPipe_Server *srv;
169         RWLock_AcquireRead(&glIPCPipe_ServerList);
170         for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
171                 ;
172         RWLock_Release(&glIPCPipe_ServerList);
173         
174         if( srv ) {
175                 strncpy(Name, srv->Name, sizeof(Name));
176                 return 0;
177         }
178         
179         return -1;
180 }
181 /**
182  * \return New client pointer
183  */
184 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
185 {
186         tIPCPipe_Server *srv;
187         ENTER("pNode sName", Node, Name);
188         
189         // Find server
190         RWLock_AcquireRead(&glIPCPipe_ServerList);
191         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
192         {
193                 LOG("== %s", srv->Name);
194                 if( strcmp(srv->Name, Name) == 0 )
195                         break;
196         }
197         RWLock_Release(&glIPCPipe_ServerList);
198         if( !srv ) {
199                 LEAVE('n');
200                 return NULL;
201         }
202
203         if( Flags & VFS_FDIRFLAG_STAT ) {
204                 // LEAVE('p', srv->TplClientNode);
205                 // return &srv->TplClientNode;
206         }
207
208         // Create new client
209         tIPCPipe_Channel *new_client;   
210
211         new_client = calloc(1, sizeof(tIPCPipe_Channel));
212         new_client->Server = srv;
213         new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
214         new_client->ClientEP.Node.ImplPtr = new_client;
215         new_client->ClientEP.Node.Size = -1;
216         new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
217         new_client->ServerEP.Node.ImplPtr = new_client;
218         new_client->ServerEP.Node.Size = -1;
219
220         // Append to server list
221         RWLock_AcquireWrite(&srv->lChannelList);
222         if(srv->LastClient)
223                 srv->LastClient->Next = new_client;
224         else
225                 srv->FirstClient = new_client;
226         srv->LastClient = new_client;
227         if(!srv->FirstUnseenClient)
228                 srv->FirstUnseenClient = new_client;
229         VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
230         RWLock_Release(&srv->lChannelList);
231         
232         LEAVE('p', &new_client->ClientEP.Node);
233         return &new_client->ClientEP.Node;
234 }
235
236 // --- Server ---
237 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
238 {
239         // 'next' is a valid entry, but readdir should never be called on this node
240         return -1;
241 }
242 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
243 {
244         tIPCPipe_Server *srv = Node->ImplPtr;
245
246         ENTER("pNode sName", Node, Name);       
247
248         if( strcmp(Name, "newclient") != 0 ) {
249                 LEAVE('n');
250                 return NULL;
251         }
252         
253         // TODO: Need VFS_FDIRFLAG_NOBLOCK?
254         VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
255
256         tIPCPipe_Channel *conn;
257         RWLock_AcquireRead(&srv->lChannelList);
258         conn = srv->FirstUnseenClient;
259         if( conn )
260         {
261                 srv->FirstUnseenClient = conn->Next;
262         }
263         VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
264         RWLock_Release(&srv->lChannelList);
265
266         if( !conn ) {
267                 LEAVE('n');
268                 return NULL;
269         }
270         
271         // Success
272         LEAVE('p', &conn->ServerEP.Node);
273         return &conn->ServerEP.Node;
274 }
275
276 void IPCPipe_Server_Close(tVFS_Node *Node)
277 {
278         tIPCPipe_Server *srv = Node->ImplPtr;
279
280         // Flag server as being destroyed
281
282         // Force-close all children
283         RWLock_AcquireWrite(&srv->lChannelList);
284         for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
285         {
286                 client->Server = NULL;
287         }
288         RWLock_Release(&srv->lChannelList);
289
290         // Remove from global list
291         RWLock_AcquireWrite(&glIPCPipe_ServerList);
292         // - Forward link
293         if(srv->Prev)
294                 srv->Prev->Next = srv->Next;
295         else
296                 gpIPCPipe_FirstServer = srv->Next;
297         // - Reverse link
298         if(srv->Next)
299                 srv->Next->Prev = srv->Prev;
300         else
301                 gpIPCPipe_LastServer = srv->Prev;
302         RWLock_Release(&glIPCPipe_ServerList);
303 }
304
305 // --- Channel ---
306 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
307 {
308         tIPCPipe_Channel        *ch = Node->ImplPtr;
309         if( ch )
310         {
311                 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
312                 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
313         }
314         return ch;
315 }
316 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
317 {
318         tIPCPipe_Endpoint       *lep, *rep;
319         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
320
321         ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);       
322
323         // Closed endpoint / channel
324         if( !channel || channel->Server == NULL ) {
325                 LEAVE('i', -1);
326                 return -1;
327         }
328         
329         // Wait for a packet to be ready
330         tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
331         int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
332         if( !rv ) {
333                 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
334                 LEAVE('i', -1);
335                 return -1;
336         }
337         if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
338                 //errno = EIO;
339                 LEAVE('i', -1);
340                 return -1;
341         }
342
343         // Pop a packet from the list
344         Mutex_Acquire(&rep->lList);
345         if( !rep->Node.ImplPtr )
346         {
347                 Mutex_Release(&rep->lList);
348                 //errno = EIO;
349                 LEAVE('i', -1);
350                 return -1;
351         }
352         tIPCPipe_Packet *pkt = rep->OutHead;
353         if(pkt)
354                 rep->OutHead = pkt->Next;
355         if(!rep->OutHead)
356                 rep->OutTail = NULL;
357         VFS_MarkAvaliable(Node, !!rep->OutHead);
358         VFS_MarkFull(&rep->Node, 0);    //      Just read a packet, remote shouldn't be full
359         Mutex_Release(&rep->lList);
360
361         // Return
362         size_t ret = 0;
363         if(pkt)
364         {
365                 ret = MIN(pkt->Length, Length);
366                 memcpy(Dest, pkt->Data, ret);
367                 free(pkt);
368         }
369         else
370         {
371                 Log_Warning("IPCPipe", "No packet ready but select returned");
372         }
373         
374         LEAVE('i', ret);
375         return ret;
376 }
377
378 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
379 {
380         tIPCPipe_Endpoint       *lep, *rep;
381         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
382
383         ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src); 
384         
385         // Ensure the server hasn't been closed
386         if( !channel || channel->Server == NULL ) {
387                 LEAVE('i', -1);
388                 return -1;
389         }
390
391         if( Length > channel->Server->MaxBlockSize ) {
392                 LEAVE('i', 0);
393                 return 0;
394         }
395         
396         // Wait for a packet to be ready
397         tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
398         int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
399         ASSERTC(rv, >=, 0);
400         if( !rv ) {
401                 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
402                 LEAVE('i', -1);
403                 return -1;
404         }
405         if( (rv & VFS_SELECT_ERROR) ||  channel->Server == NULL ) {
406                 //errno = EIO;
407                 LEAVE('i', -1);
408                 return -1;
409         }
410
411         // Create packet structure
412         tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
413         pkt->Next = NULL;
414         pkt->Offset = 0;
415         pkt->Length = Length;
416         pkt->Data = pkt + 1;
417         memcpy(pkt->Data, Src, Length);
418
419         // Append to list
420         Mutex_Acquire(&lep->lList);
421         if( !Node->ImplPtr )
422         {
423                 // Client was closed by another thread
424                 free(pkt);
425                 Mutex_Release(&lep->lList);
426                 LEAVE('i', -1);
427                 return -1;
428         }
429         if(lep->OutTail)
430                 lep->OutTail->Next = pkt;
431         else
432                 lep->OutHead = pkt;
433         lep->OutTail = pkt;
434         
435         lep->ByteCount += Length;
436         if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
437                 VFS_MarkFull(Node, 1);
438         }
439         
440         Mutex_Release(&lep->lList);
441
442         // Signal other end
443         VFS_MarkAvaliable(&rep->Node, 1);
444
445         LEAVE('i', Length);
446         return Length;
447 }
448
449 void IPCPipe_Client_Close(tVFS_Node *Node)
450 {
451         tIPCPipe_Endpoint       *lep, *rep;
452         tIPCPipe_Channel        *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
453
454         ENTER("pNode", Node);
455
456         if( !ch ) {
457                 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
458                 return ;
459         }
460         
461         // Mark client as closed
462         Node->ImplPtr = NULL;
463         // Clear packets
464         Mutex_Acquire(&lep->lList);
465         while( lep->OutHead ) {
466                 tIPCPipe_Packet *next = lep->OutHead->Next;
467                 free(lep->OutHead);
468                 lep->OutHead = next;
469         }
470         Mutex_Release(&lep->lList);
471         LOG("Packets cleared");
472         
473         // Tell remote that local has closed
474         VFS_MarkError(&rep->Node, 1);
475         // TODO: Deliver SIGPIPE or similar to all owners of remote
476         // Clean up if both sides are closed
477         if( rep->Node.ImplPtr == NULL )
478         {
479                 LOG("Remote closed, cleaning up");
480                 tIPCPipe_Server *srv = ch->Server;
481                 if( srv )
482                 {
483                         RWLock_AcquireWrite(&srv->lChannelList);
484                         if(ch->Prev)
485                                 ch->Prev->Next = ch->Next;
486                         else
487                                 srv->FirstClient = ch->Next;
488                         if(ch->Next)
489                                 ch->Next->Prev = ch->Prev;
490                         else
491                                 srv->LastClient = ch->Prev;
492                         if(srv->FirstUnseenClient == ch)
493                                 srv->FirstUnseenClient = ch->Next;
494                         RWLock_Release(&srv->lChannelList);
495                 }
496                 free(ch);
497         }
498         LEAVE('-');
499 }

UCC git Repository :: git.ucc.asn.au