AcessNative - Better error reporting in NativeFS
[tpg/acess2.git] / KernelLand / Kernel / drv / dgram_pipe.c
1 /*
2  * Acess2 Kernel
3  * - By John Hodge (thePowersGang);
4  * 
5  * drv/dgram_pipe.c
6  * - Connection+Datagram based local IPC
7  */
8 #define DEBUG   1
9 #include <vfs.h>
10 #include <fs_devfs.h>
11 #include <modules.h>
12 #include <rwlock.h>
13 #include <semaphore.h>
14
15 #define DEF_MAX_BLOCK_SIZE      0x1000
16 #define DEF_MAX_BYTE_LIMIT      0x8000
17
18 // === TYPES ===
// Forward declarations of the short type aliases, so the structures below
// can refer to one another before their bodies are defined.
typedef struct sIPCPipe_Packet  tIPCPipe_Packet;
typedef struct sIPCPipe_Endpoint        tIPCPipe_Endpoint;
typedef struct sIPCPipe_Channel tIPCPipe_Channel;
typedef struct sIPCPipe_Server  tIPCPipe_Server;
23
24 // === STRUCTURES ===
/**
 * \brief One queued datagram
 *
 * IPCPipe_Client_Write allocates the header and the payload in a single
 * block, so freeing the packet also frees the data.
 */
struct sIPCPipe_Packet
{
        tIPCPipe_Packet *Next;  //!< Next packet in the endpoint's queue (NULL at the tail)
        size_t  Length; //!< Number of payload bytes
        size_t  Offset; //!< Offset to first unread byte (for streams); only ever set to 0 in this file
        void    *Data;  //!< Payload; points immediately after this header
};
/**
 * \brief One end of a channel
 *
 * The Out* queue holds packets written through this endpoint; they are
 * consumed when the opposite endpoint is read.
 */
struct sIPCPipe_Endpoint
{
        tMutex  lList;  //!< Held while OutHead/OutTail are inspected or changed
        tIPCPipe_Packet *OutHead;       //!< Oldest queued outgoing packet
        tIPCPipe_Packet *OutTail;       //!< Newest queued outgoing packet
        tVFS_Node       Node;   //!< VFS node for this end; ImplPtr is the owning channel, NULL once closed
};
/**
 * \brief A single client<->server connection
 */
struct sIPCPipe_Channel
{
        tIPCPipe_Channel        *Next;  //!< Next channel in the server's client list
        tIPCPipe_Channel        *Prev;  //!< Previous channel in the server's client list
        tIPCPipe_Server *Server;        //!< Owning server; set to NULL by IPCPipe_Server_Close
        tIPCPipe_Endpoint       ServerEP;       //!< End handed out by IPCPipe_Server_FindDir
        tIPCPipe_Endpoint       ClientEP;       //!< End handed out by IPCPipe_Root_FindDir
};
/**
 * \brief A named pipe server, created by IPCPipe_Root_MkNod
 */
struct sIPCPipe_Server
{
        tIPCPipe_Server *Next;  //!< Next server in the global list
        tIPCPipe_Server *Prev;  //!< Previous server in the global list
        char    *Name;  //!< Server name; stored in the same allocation, just after this structure
        size_t  MaxBlockSize;   // Max size of a 'packet' (larger writes are rejected)
        size_t  QueueByteLimit; // Maximum number of bytes held in kernel for this server
                                // NOTE(review): initialised but not enforced by the code in this file
        size_t  CurrentByteCount;       // NOTE(review): never updated by the code in this file
        tVFS_Node       ServerNode;     //!< Directory node returned to the server's creator
        tRWLock lChannelList;   //!< Protects the client list pointers below
        tIPCPipe_Channel        *FirstClient;   //!< Head of the client list
        tIPCPipe_Channel        *LastClient;    //!< Tail of the client list
        tIPCPipe_Channel        *FirstUnseenClient;     // !< First client server hasn't opened
};
61
// === PROTOTYPES ===
// - Module entry point
 int    IPCPipe_Install(char **Arguments);
// - Root (directory of servers: create a server, list servers, connect to a server)
tVFS_Node       *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
 int    IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
tVFS_Node       *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
// - Server (directory node held by the server's creator)
 int    IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
tVFS_Node       *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
void    IPCPipe_Server_Close(tVFS_Node *Node);
// - Socket (channel endpoints; the same handlers serve the client and server ends)
tIPCPipe_Channel        *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
size_t  IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
size_t  IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
void    IPCPipe_Client_Close(tVFS_Node *Node);
77
78 // === GLOBALS ===
// - Server node: Directory
//   Type of the node IPCPipe_Root_MkNod returns. Looking up "newclient" in
//   it yields the server end of the next pending connection.
tVFS_NodeType   gIPCPipe_ServerNodeType = {
        .TypeName = "IPC Pipe - Server",
        .ReadDir = IPCPipe_Server_ReadDir,
        .FindDir = IPCPipe_Server_FindDir,
        .Close = IPCPipe_Server_Close
};
// - Channel node: Datagram endpoint
//   Used for both the client and the server end of every channel.
tVFS_NodeType   gIPCPipe_ChannelNodeType = {
        .TypeName = "IPC Pipe - Channel",
        .Read = IPCPipe_Client_Read,
        .Write = IPCPipe_Client_Write,
        .Close = IPCPipe_Client_Close
};
// - Root node: Directory of servers
//   MkNod creates a server, FindDir opens a new client connection to one.
tVFS_NodeType   gIPCPipe_RootNodeType = {
        .TypeName = "IPC Pipe - Root",
        .MkNod = IPCPipe_Root_MkNod,
        .ReadDir = IPCPipe_Root_ReadDir,
        .FindDir = IPCPipe_Root_FindDir
};
// - DevFS registration record, passed to DevFS_AddDevice by IPCPipe_Install.
//   The root is a directory using the root node type above.
tDevFS_Driver   gIPCPipe_DevFSInfo = {
        .Name = "ipcpipe",
        .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
};
// - Global list of servers
//   Doubly linked through tIPCPipe_Server.Next/Prev; the head and tail
//   pointers are only read or changed with glIPCPipe_ServerList held.
tRWLock glIPCPipe_ServerList;
tIPCPipe_Server *gpIPCPipe_FirstServer;
tIPCPipe_Server *gpIPCPipe_LastServer;
// - Module definition
//   Names IPCPipe_Install as the module's install routine.
//   NOTE(review): the meaning of the remaining arguments comes from
//   <modules.h> and is not visible in this file.
MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
108
109 // === CODE ===
/**
 * \brief Module install routine - registers the "ipcpipe" driver with DevFS
 * \param Arguments     Module arguments (not used)
 * \return Always MODULE_ERR_OK
 */
int IPCPipe_Install(char **Arguments)
{
        // NOTE(review): the result of DevFS_AddDevice is discarded - confirm
        // whether registration can fail and should be reported.
        DevFS_AddDevice(&gIPCPipe_DevFSInfo);
        return MODULE_ERR_OK;
}
115
116 // - Root -
117 /*
118  * \brief Create a new named pipe
119  * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
120  */
121 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
122 {
123         tIPCPipe_Server *srv;
124         
125         ENTER("pNode sName xFlags", Node, Name, Flags);
126         
127         // Write Lock
128         RWLock_AcquireWrite(&glIPCPipe_ServerList);
129         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
130         {
131                 if( strcmp(srv->Name, Name) == 0 )
132                         break;
133         }
134         if( srv )
135         {
136                 RWLock_Release(&glIPCPipe_ServerList);
137                 LEAVE('n');
138                 return NULL;
139         }
140         
141
142         srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
143         srv->Name = (void*)(srv + 1);
144         strcpy(srv->Name, Name);
145         srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
146         srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
147         srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
148         srv->ServerNode.ImplPtr = srv;
149         srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
150
151         // Lock already held
152         if( gpIPCPipe_LastServer )
153                 gpIPCPipe_LastServer->Next = srv;
154         else
155                 gpIPCPipe_FirstServer = srv;
156         srv->Prev = gpIPCPipe_LastServer;
157         gpIPCPipe_LastServer = srv;
158         RWLock_Release(&glIPCPipe_ServerList);
159
160         LEAVE('p', &srv->ServerNode);
161         return &srv->ServerNode;
162 }
163
164 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
165 {
166         tIPCPipe_Server *srv;
167         RWLock_AcquireRead(&glIPCPipe_ServerList);
168         for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
169                 ;
170         RWLock_Release(&glIPCPipe_ServerList);
171         
172         if( srv ) {
173                 strncpy(Name, srv->Name, sizeof(Name));
174                 return 0;
175         }
176         
177         return -1;
178 }
179 /**
180  * \return New client pointer
181  */
182 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
183 {
184         tIPCPipe_Server *srv;
185         ENTER("pNode sName", Node, Name);
186         
187         // Find server
188         RWLock_AcquireRead(&glIPCPipe_ServerList);
189         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
190         {
191                 LOG("== %s", srv->Name);
192                 if( strcmp(srv->Name, Name) == 0 )
193                         break;
194         }
195         RWLock_Release(&glIPCPipe_ServerList);
196         if( !srv ) {
197                 LEAVE('n');
198                 return NULL;
199         }
200
201         // Create new client
202         tIPCPipe_Channel *new_client;
203         
204         new_client = calloc(1, sizeof(tIPCPipe_Channel));
205         new_client->Server = srv;
206         new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
207         new_client->ClientEP.Node.ImplPtr = new_client;
208         new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
209         new_client->ServerEP.Node.ImplPtr = new_client;
210
211         // Append to server list
212         RWLock_AcquireWrite(&srv->lChannelList);
213         if(srv->LastClient)
214                 srv->LastClient->Next = new_client;
215         else
216                 srv->FirstClient = new_client;
217         srv->LastClient = new_client;
218         if(!srv->FirstUnseenClient)
219                 srv->FirstUnseenClient = new_client;
220         VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
221         RWLock_Release(&srv->lChannelList);
222         
223         LEAVE('p', &new_client->ClientEP.Node);
224         return &new_client->ClientEP.Node;
225 }
226
227 // --- Server ---
/**
 * \brief Directory listing for a server node - always reports no entries
 * \return Always -1
 */
int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
{
        // 'newclient' is a valid entry for IPCPipe_Server_FindDir, but it is
        // never listed; readdir should never be called on this node
        return -1;
}
233 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
234 {
235         tIPCPipe_Server *srv = Node->ImplPtr;
236
237         ENTER("pNode sName", Node, Name);       
238
239         if( strcmp(Name, "newclient") != 0 ) {
240                 LEAVE('n');
241                 return NULL;
242         }
243         
244         VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
245
246         tIPCPipe_Channel *conn;
247         RWLock_AcquireRead(&srv->lChannelList);
248         conn = srv->FirstUnseenClient;
249         if( conn )
250         {
251                 srv->FirstUnseenClient = conn->Next;
252         }
253         VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
254         RWLock_Release(&srv->lChannelList);
255
256         if( !conn ) {
257                 LEAVE('n');
258                 return NULL;
259         }
260         
261         // Success
262         LEAVE('p', &conn->ServerEP.Node);
263         return &conn->ServerEP.Node;
264 }
265
266 void IPCPipe_Server_Close(tVFS_Node *Node)
267 {
268         tIPCPipe_Server *srv = Node->ImplPtr;
269
270         // Flag server as being destroyed
271
272         // Force-close all children
273         RWLock_AcquireWrite(&srv->lChannelList);
274         for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
275         {
276                 client->Server = NULL;
277         }
278         RWLock_Release(&srv->lChannelList);
279
280         // Remove from global list
281         RWLock_AcquireWrite(&glIPCPipe_ServerList);
282         // - Forward link
283         if(srv->Prev)
284                 srv->Prev->Next = srv->Next;
285         else
286                 gpIPCPipe_FirstServer = srv->Next;
287         // - Reverse link
288         if(srv->Next)
289                 srv->Next->Prev = srv->Prev;
290         else
291                 gpIPCPipe_LastServer = srv->Prev;
292         RWLock_Release(&glIPCPipe_ServerList);
293 }
294
295 // --- Channel ---
296 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
297 {
298         tIPCPipe_Channel        *ch = Node->ImplPtr;
299         if( ch )
300         {
301                 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
302                 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
303         }
304         return ch;
305 }
306 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
307 {
308         tIPCPipe_Endpoint       *lep, *rep;
309         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
310
311         ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);       
312
313         // Closed endpoint / channel
314         if( !channel || channel->Server == NULL ) {
315                 LEAVE('i', -1);
316                 return -1;
317         }
318         
319         // Wait for a packet to be ready
320         VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Endpoint");
321         if( channel->Server == NULL ) {
322                 LEAVE('i', -1);
323                 return -1;
324         }
325
326         // Pop a packet from the list
327         Mutex_Acquire(&rep->lList);
328         if( !rep->Node.ImplPtr )
329         {
330                 Mutex_Release(&rep->lList);
331                 LEAVE('i', -1);
332                 return -1;
333         }
334         tIPCPipe_Packet *pkt = rep->OutHead;
335         if(pkt)
336                 rep->OutHead = pkt->Next;
337         if(!rep->OutHead)
338                 rep->OutTail = NULL;
339         VFS_MarkAvaliable(Node, !!rep->OutHead);
340         Mutex_Release(&rep->lList);
341
342         // Return
343         size_t ret = 0;
344         if(pkt)
345         {
346                 ret = MIN(pkt->Length, Length);
347                 memcpy(Dest, pkt->Data, ret);
348                 free(pkt);
349         }
350         else
351         {
352                 Log_Warning("IPCPipe", "No packet ready but semaphore returned");
353         }
354         
355         LEAVE('i', ret);
356         return ret;
357 }
358
359 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
360 {
361         tIPCPipe_Endpoint       *lep, *rep;
362         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
363
364         ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src); 
365         
366         // Ensure the server hasn't been closed
367         if( !channel || channel->Server == NULL ) {
368                 LEAVE('i', -1);
369                 return -1;
370         }
371
372         if( Length > channel->Server->MaxBlockSize ) {
373                 LEAVE('i', 0);
374                 return 0;
375         }
376
377         // Create packet structure      
378         tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
379         pkt->Next = NULL;
380         pkt->Offset = 0;
381         pkt->Length = Length;
382         pkt->Data = pkt + 1;
383         memcpy(pkt->Data, Src, Length);
384
385         // Append to list
386         Mutex_Acquire(&lep->lList);
387         if( !Node->ImplPtr )
388         {
389                 // Client was closed by another thread
390                 free(pkt);
391                 Mutex_Release(&lep->lList);
392                 LEAVE('i', -1);
393                 return -1;
394         }
395         if(lep->OutTail)
396                 lep->OutTail->Next = pkt;
397         else
398                 lep->OutHead = pkt;
399         lep->OutTail = pkt;
400         Mutex_Release(&lep->lList);
401
402         // Signal other end
403         VFS_MarkAvaliable(&rep->Node, 1);
404
405         LEAVE('i', Length);
406         return Length;
407 }
408
409 void IPCPipe_Client_Close(tVFS_Node *Node)
410 {
411         tIPCPipe_Endpoint       *lep, *rep;
412         tIPCPipe_Channel        *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
413
414         ENTER("pNode", Node);
415
416         if( !ch ) {
417                 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
418                 return ;
419         }
420         
421         // Mark client as closed
422         Node->ImplPtr = NULL;
423         // Clear packets
424         Mutex_Acquire(&lep->lList);
425         while( lep->OutHead ) {
426                 tIPCPipe_Packet *next = lep->OutHead->Next;
427                 free(lep->OutHead);
428                 lep->OutHead = next;
429         }
430         Mutex_Release(&lep->lList);
431         LOG("Packets cleared");
432         
433         // Tell remote that local has closed
434         VFS_MarkError(&rep->Node, 1);
435         // TODO: Deliver SIGPIPE or similar to all owners of remote
436         // Clean up if both sides are closed
437         if( rep->Node.ImplPtr == NULL )
438         {
439                 LOG("Remote closed, cleaning up");
440                 tIPCPipe_Server *srv = ch->Server;
441                 if( srv )
442                 {
443                         RWLock_AcquireWrite(&srv->lChannelList);
444                         if(ch->Prev)
445                                 ch->Prev->Next = ch->Next;
446                         else
447                                 srv->FirstClient = ch->Next;
448                         if(ch->Next)
449                                 ch->Next->Prev = ch->Prev;
450                         else
451                                 srv->LastClient = ch->Prev;
452                         if(srv->FirstUnseenClient == ch)
453                                 srv->FirstUnseenClient = ch->Next;
454                         RWLock_Release(&srv->lChannelList);
455                 }
456                 free(ch);
457         }
458         LEAVE('-');
459 }

UCC git Repository :: git.ucc.asn.au