Kernel/IPCPipe - Add queue length limit, watch for _ERROR in select
[tpg/acess2.git] / KernelLand / Kernel / drv / dgram_pipe.c
1 /*
2  * Acess2 Kernel
3  * - By John Hodge (thePowersGang);
4  * 
5  * drv/dgram_pipe.c
6  * - Connection+Datagram based local IPC
7  */
8 #define DEBUG   0
9 #include <vfs.h>
10 #include <fs_devfs.h>
11 #include <modules.h>
12 #include <rwlock.h>
13 #include <semaphore.h>
14
15 #define DEF_MAX_BLOCK_SIZE      0x1000
16 #define DEF_MAX_BYTE_LIMIT      0x8000
17
18 // === TYPES ===
19 typedef struct sIPCPipe_Packet  tIPCPipe_Packet;
20 typedef struct sIPCPipe_Endpoint        tIPCPipe_Endpoint;
21 typedef struct sIPCPipe_Channel tIPCPipe_Channel;
22 typedef struct sIPCPipe_Server  tIPCPipe_Server;
23
24 // === STRUCTURES ===
25 struct sIPCPipe_Packet
26 {
27         tIPCPipe_Packet *Next;
28         size_t  Length;
29         size_t  Offset; //!< Offset to first unread byte (for streams)
30         void    *Data;
31 };
32 struct sIPCPipe_Endpoint
33 {
34         tMutex  lList;
35         tIPCPipe_Packet *OutHead;
36         tIPCPipe_Packet *OutTail;
37         size_t  ByteCount;
38         tVFS_Node       Node;
39 };
40 struct sIPCPipe_Channel
41 {
42         tIPCPipe_Channel        *Next;
43         tIPCPipe_Channel        *Prev;
44         tIPCPipe_Server *Server;
45         tIPCPipe_Endpoint       ServerEP;
46         tIPCPipe_Endpoint       ClientEP;
47 };
48 struct sIPCPipe_Server
49 {
50         tIPCPipe_Server *Next;
51         tIPCPipe_Server *Prev;
52         char    *Name;
53         size_t  MaxBlockSize;   // Max size of a 'packet'
54         // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
55         size_t  QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
56         tVFS_Node       ServerNode;
57         tRWLock lChannelList;
58         tIPCPipe_Channel        *FirstClient;
59         tIPCPipe_Channel        *LastClient;
60         tIPCPipe_Channel        *FirstUnseenClient;     // !< First client server hasn't opened
61 };
62
63 // === PROTOTYPES ===
64  int    IPCPipe_Install(char **Arguments);
65 // - Root
66 tVFS_Node       *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
67  int    IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
68 tVFS_Node       *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
69 // - Server
70  int    IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
71 tVFS_Node       *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
72 void    IPCPipe_Server_Close(tVFS_Node *Node);
73 // - Socket
74 tIPCPipe_Channel        *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
75 size_t  IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
76 size_t  IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
77 void    IPCPipe_Client_Close(tVFS_Node *Node);
78
79 // === GLOBALS ===
80 // - Server node: Directory
81 tVFS_NodeType   gIPCPipe_ServerNodeType = {
82         .TypeName = "IPC Pipe - Server",
83         .ReadDir = IPCPipe_Server_ReadDir,
84         .FindDir = IPCPipe_Server_FindDir,
85         .Close = IPCPipe_Server_Close
86 };
87 tVFS_NodeType   gIPCPipe_ChannelNodeType = {
88         .TypeName = "IPC Pipe - Channel",
89         .Read = IPCPipe_Client_Read,
90         .Write = IPCPipe_Client_Write,
91         .Close = IPCPipe_Client_Close
92 };
93 tVFS_NodeType   gIPCPipe_RootNodeType = {
94         .TypeName = "IPC Pipe - Root",
95         .MkNod = IPCPipe_Root_MkNod,
96         .ReadDir = IPCPipe_Root_ReadDir,
97         .FindDir = IPCPipe_Root_FindDir
98 };
99 tDevFS_Driver   gIPCPipe_DevFSInfo = {
100         .Name = "ipcpipe",
101         .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
102 };
103 // - Global list of servers
104 tRWLock glIPCPipe_ServerList;
105 tIPCPipe_Server *gpIPCPipe_FirstServer;
106 tIPCPipe_Server *gpIPCPipe_LastServer;
107 // - Module definition
108 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
109
110 // === CODE ===
111 int IPCPipe_Install(char **Arguments)
112 {
113         DevFS_AddDevice(&gIPCPipe_DevFSInfo);
114         return MODULE_ERR_OK;
115 }
116
117 // - Root -
118 /*
119  * \brief Create a new named pipe
120  * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
121  */
122 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
123 {
124         tIPCPipe_Server *srv;
125         
126         ENTER("pNode sName xFlags", Node, Name, Flags);
127         
128         // Write Lock
129         RWLock_AcquireWrite(&glIPCPipe_ServerList);
130         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
131         {
132                 if( strcmp(srv->Name, Name) == 0 )
133                         break;
134         }
135         if( srv )
136         {
137                 RWLock_Release(&glIPCPipe_ServerList);
138                 LEAVE('n');
139                 return NULL;
140         }
141         
142
143         srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
144         srv->Name = (void*)(srv + 1);
145         strcpy(srv->Name, Name);
146         srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
147         srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
148         srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
149         srv->ServerNode.ImplPtr = srv;
150         srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
151
152         // Lock already held
153         if( gpIPCPipe_LastServer )
154                 gpIPCPipe_LastServer->Next = srv;
155         else
156                 gpIPCPipe_FirstServer = srv;
157         srv->Prev = gpIPCPipe_LastServer;
158         gpIPCPipe_LastServer = srv;
159         RWLock_Release(&glIPCPipe_ServerList);
160
161         LEAVE('p', &srv->ServerNode);
162         return &srv->ServerNode;
163 }
164
165 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
166 {
167         tIPCPipe_Server *srv;
168         RWLock_AcquireRead(&glIPCPipe_ServerList);
169         for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
170                 ;
171         RWLock_Release(&glIPCPipe_ServerList);
172         
173         if( srv ) {
174                 strncpy(Name, srv->Name, sizeof(Name));
175                 return 0;
176         }
177         
178         return -1;
179 }
180 /**
181  * \return New client pointer
182  */
183 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
184 {
185         tIPCPipe_Server *srv;
186         ENTER("pNode sName", Node, Name);
187         
188         // Find server
189         RWLock_AcquireRead(&glIPCPipe_ServerList);
190         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
191         {
192                 LOG("== %s", srv->Name);
193                 if( strcmp(srv->Name, Name) == 0 )
194                         break;
195         }
196         RWLock_Release(&glIPCPipe_ServerList);
197         if( !srv ) {
198                 LEAVE('n');
199                 return NULL;
200         }
201
202         if( Flags & VFS_FDIRFLAG_STAT ) {
203                 // LEAVE('p', srv->TplClientNode);
204                 // return &srv->TplClientNode;
205         }
206
207         // Create new client
208         tIPCPipe_Channel *new_client;   
209
210         new_client = calloc(1, sizeof(tIPCPipe_Channel));
211         new_client->Server = srv;
212         new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
213         new_client->ClientEP.Node.ImplPtr = new_client;
214         new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
215         new_client->ServerEP.Node.ImplPtr = new_client;
216
217         // Append to server list
218         RWLock_AcquireWrite(&srv->lChannelList);
219         if(srv->LastClient)
220                 srv->LastClient->Next = new_client;
221         else
222                 srv->FirstClient = new_client;
223         srv->LastClient = new_client;
224         if(!srv->FirstUnseenClient)
225                 srv->FirstUnseenClient = new_client;
226         VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
227         RWLock_Release(&srv->lChannelList);
228         
229         LEAVE('p', &new_client->ClientEP.Node);
230         return &new_client->ClientEP.Node;
231 }
232
233 // --- Server ---
234 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
235 {
236         // 'next' is a valid entry, but readdir should never be called on this node
237         return -1;
238 }
239 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
240 {
241         tIPCPipe_Server *srv = Node->ImplPtr;
242
243         ENTER("pNode sName", Node, Name);       
244
245         if( strcmp(Name, "newclient") != 0 ) {
246                 LEAVE('n');
247                 return NULL;
248         }
249         
250         // TODO: Need VFS_FDIRFLAG_NOBLOCK?
251         VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
252
253         tIPCPipe_Channel *conn;
254         RWLock_AcquireRead(&srv->lChannelList);
255         conn = srv->FirstUnseenClient;
256         if( conn )
257         {
258                 srv->FirstUnseenClient = conn->Next;
259         }
260         VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
261         RWLock_Release(&srv->lChannelList);
262
263         if( !conn ) {
264                 LEAVE('n');
265                 return NULL;
266         }
267         
268         // Success
269         LEAVE('p', &conn->ServerEP.Node);
270         return &conn->ServerEP.Node;
271 }
272
273 void IPCPipe_Server_Close(tVFS_Node *Node)
274 {
275         tIPCPipe_Server *srv = Node->ImplPtr;
276
277         // Flag server as being destroyed
278
279         // Force-close all children
280         RWLock_AcquireWrite(&srv->lChannelList);
281         for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
282         {
283                 client->Server = NULL;
284         }
285         RWLock_Release(&srv->lChannelList);
286
287         // Remove from global list
288         RWLock_AcquireWrite(&glIPCPipe_ServerList);
289         // - Forward link
290         if(srv->Prev)
291                 srv->Prev->Next = srv->Next;
292         else
293                 gpIPCPipe_FirstServer = srv->Next;
294         // - Reverse link
295         if(srv->Next)
296                 srv->Next->Prev = srv->Prev;
297         else
298                 gpIPCPipe_LastServer = srv->Prev;
299         RWLock_Release(&glIPCPipe_ServerList);
300 }
301
302 // --- Channel ---
303 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
304 {
305         tIPCPipe_Channel        *ch = Node->ImplPtr;
306         if( ch )
307         {
308                 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
309                 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
310         }
311         return ch;
312 }
313 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
314 {
315         tIPCPipe_Endpoint       *lep, *rep;
316         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
317
318         ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);       
319
320         // Closed endpoint / channel
321         if( !channel || channel->Server == NULL ) {
322                 LEAVE('i', -1);
323                 return -1;
324         }
325         
326         // Wait for a packet to be ready
327         tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
328         int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
329         if( !rv ) {
330                 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
331                 LEAVE('i', -1);
332                 return -1;
333         }
334         if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
335                 //errno = EIO;
336                 LEAVE('i', -1);
337                 return -1;
338         }
339
340         // Pop a packet from the list
341         Mutex_Acquire(&rep->lList);
342         if( !rep->Node.ImplPtr )
343         {
344                 Mutex_Release(&rep->lList);
345                 //errno = EIO;
346                 LEAVE('i', -1);
347                 return -1;
348         }
349         tIPCPipe_Packet *pkt = rep->OutHead;
350         if(pkt)
351                 rep->OutHead = pkt->Next;
352         if(!rep->OutHead)
353                 rep->OutTail = NULL;
354         VFS_MarkAvaliable(Node, !!rep->OutHead);
355         VFS_MarkFull(&rep->Node, 0);    //      Just read a packet, remote shouldn't be full
356         Mutex_Release(&rep->lList);
357
358         // Return
359         size_t ret = 0;
360         if(pkt)
361         {
362                 ret = MIN(pkt->Length, Length);
363                 memcpy(Dest, pkt->Data, ret);
364                 free(pkt);
365         }
366         else
367         {
368                 Log_Warning("IPCPipe", "No packet ready but select returned");
369         }
370         
371         LEAVE('i', ret);
372         return ret;
373 }
374
375 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
376 {
377         tIPCPipe_Endpoint       *lep, *rep;
378         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
379
380         ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src); 
381         
382         // Ensure the server hasn't been closed
383         if( !channel || channel->Server == NULL ) {
384                 LEAVE('i', -1);
385                 return -1;
386         }
387
388         if( Length > channel->Server->MaxBlockSize ) {
389                 LEAVE('i', 0);
390                 return 0;
391         }
392         
393         // Wait for a packet to be ready
394         tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
395         int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
396         ASSERTC(rv, >=, 0);
397         if( !rv ) {
398                 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
399                 LEAVE('i', -1);
400                 return -1;
401         }
402         if( (rv & VFS_SELECT_ERROR) ||  channel->Server == NULL ) {
403                 //errno = EIO;
404                 LEAVE('i', -1);
405                 return -1;
406         }
407
408         // Create packet structure
409         tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
410         pkt->Next = NULL;
411         pkt->Offset = 0;
412         pkt->Length = Length;
413         pkt->Data = pkt + 1;
414         memcpy(pkt->Data, Src, Length);
415
416         // Append to list
417         Mutex_Acquire(&lep->lList);
418         if( !Node->ImplPtr )
419         {
420                 // Client was closed by another thread
421                 free(pkt);
422                 Mutex_Release(&lep->lList);
423                 LEAVE('i', -1);
424                 return -1;
425         }
426         if(lep->OutTail)
427                 lep->OutTail->Next = pkt;
428         else
429                 lep->OutHead = pkt;
430         lep->OutTail = pkt;
431         
432         lep->ByteCount += Length;
433         if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
434                 VFS_MarkFull(Node, 1);
435         }
436         
437         Mutex_Release(&lep->lList);
438
439         // Signal other end
440         VFS_MarkAvaliable(&rep->Node, 1);
441
442         LEAVE('i', Length);
443         return Length;
444 }
445
446 void IPCPipe_Client_Close(tVFS_Node *Node)
447 {
448         tIPCPipe_Endpoint       *lep, *rep;
449         tIPCPipe_Channel        *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
450
451         ENTER("pNode", Node);
452
453         if( !ch ) {
454                 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
455                 return ;
456         }
457         
458         // Mark client as closed
459         Node->ImplPtr = NULL;
460         // Clear packets
461         Mutex_Acquire(&lep->lList);
462         while( lep->OutHead ) {
463                 tIPCPipe_Packet *next = lep->OutHead->Next;
464                 free(lep->OutHead);
465                 lep->OutHead = next;
466         }
467         Mutex_Release(&lep->lList);
468         LOG("Packets cleared");
469         
470         // Tell remote that local has closed
471         VFS_MarkError(&rep->Node, 1);
472         // TODO: Deliver SIGPIPE or similar to all owners of remote
473         // Clean up if both sides are closed
474         if( rep->Node.ImplPtr == NULL )
475         {
476                 LOG("Remote closed, cleaning up");
477                 tIPCPipe_Server *srv = ch->Server;
478                 if( srv )
479                 {
480                         RWLock_AcquireWrite(&srv->lChannelList);
481                         if(ch->Prev)
482                                 ch->Prev->Next = ch->Next;
483                         else
484                                 srv->FirstClient = ch->Next;
485                         if(ch->Next)
486                                 ch->Next->Prev = ch->Prev;
487                         else
488                                 srv->LastClient = ch->Prev;
489                         if(srv->FirstUnseenClient == ch)
490                                 srv->FirstUnseenClient = ch->Next;
491                         RWLock_Release(&srv->lChannelList);
492                 }
493                 free(ch);
494         }
495         LEAVE('-');
496 }

UCC git Repository :: git.ucc.asn.au