Kernel - Added 'Flags' param to VFS Read/Write/FindDir
[tpg/acess2.git] / KernelLand / Kernel / drv / dgram_pipe.c
1 /*
2  * Acess2 Kernel
3  * - By John Hodge (thePowersGang);
4  * 
5  * drv/dgram_pipe.c
6  * - Connection+Datagram based local IPC
7  */
8 #define DEBUG   0
9 #include <vfs.h>
10 #include <fs_devfs.h>
11 #include <modules.h>
12 #include <rwlock.h>
13 #include <semaphore.h>
14
15 #define DEF_MAX_BLOCK_SIZE      0x1000
16 #define DEF_MAX_BYTE_LIMIT      0x8000
17
18 // === TYPES ===
19 typedef struct sIPCPipe_Packet  tIPCPipe_Packet;
20 typedef struct sIPCPipe_Endpoint        tIPCPipe_Endpoint;
21 typedef struct sIPCPipe_Channel tIPCPipe_Channel;
22 typedef struct sIPCPipe_Server  tIPCPipe_Server;
23
24 // === STRUCTURES ===
25 struct sIPCPipe_Packet
26 {
27         tIPCPipe_Packet *Next;
28         size_t  Length;
29         size_t  Offset; //!< Offset to first unread byte (for streams)
30         void    *Data;
31 };
32 struct sIPCPipe_Endpoint
33 {
34         tMutex  lList;
35         tIPCPipe_Packet *OutHead;
36         tIPCPipe_Packet *OutTail;
37         tVFS_Node       Node;
38 };
39 struct sIPCPipe_Channel
40 {
41         tIPCPipe_Channel        *Next;
42         tIPCPipe_Channel        *Prev;
43         tIPCPipe_Server *Server;
44         tIPCPipe_Endpoint       ServerEP;
45         tIPCPipe_Endpoint       ClientEP;
46 };
47 struct sIPCPipe_Server
48 {
49         tIPCPipe_Server *Next;
50         tIPCPipe_Server *Prev;
51         char    *Name;
52         size_t  MaxBlockSize;   // Max size of a 'packet'
53         size_t  QueueByteLimit; // Maximum number of bytes held in kernel for this server
54         size_t  CurrentByteCount;
55         tVFS_Node       ServerNode;
56         tRWLock lChannelList;
57         tIPCPipe_Channel        *FirstClient;
58         tIPCPipe_Channel        *LastClient;
59         tIPCPipe_Channel        *FirstUnseenClient;     // !< First client server hasn't opened
60 };
61
62 // === PROTOTYPES ===
63  int    IPCPipe_Install(char **Arguments);
64 // - Root
65 tVFS_Node       *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
66  int    IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
67 tVFS_Node       *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
68 // - Server
69  int    IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
70 tVFS_Node       *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
71 void    IPCPipe_Server_Close(tVFS_Node *Node);
72 // - Socket
73 tIPCPipe_Channel        *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
74 size_t  IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
75 size_t  IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
76 void    IPCPipe_Client_Close(tVFS_Node *Node);
77
78 // === GLOBALS ===
79 // - Server node: Directory
80 tVFS_NodeType   gIPCPipe_ServerNodeType = {
81         .TypeName = "IPC Pipe - Server",
82         .ReadDir = IPCPipe_Server_ReadDir,
83         .FindDir = IPCPipe_Server_FindDir,
84         .Close = IPCPipe_Server_Close
85 };
86 tVFS_NodeType   gIPCPipe_ChannelNodeType = {
87         .TypeName = "IPC Pipe - Channel",
88         .Read = IPCPipe_Client_Read,
89         .Write = IPCPipe_Client_Write,
90         .Close = IPCPipe_Client_Close
91 };
92 tVFS_NodeType   gIPCPipe_RootNodeType = {
93         .TypeName = "IPC Pipe - Root",
94         .MkNod = IPCPipe_Root_MkNod,
95         .ReadDir = IPCPipe_Root_ReadDir,
96         .FindDir = IPCPipe_Root_FindDir
97 };
98 tDevFS_Driver   gIPCPipe_DevFSInfo = {
99         .Name = "ipcpipe",
100         .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
101 };
102 // - Global list of servers
103 tRWLock glIPCPipe_ServerList;
104 tIPCPipe_Server *gpIPCPipe_FirstServer;
105 tIPCPipe_Server *gpIPCPipe_LastServer;
106 // - Module definition
107 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
108
109 // === CODE ===
110 int IPCPipe_Install(char **Arguments)
111 {
112         DevFS_AddDevice(&gIPCPipe_DevFSInfo);
113         return MODULE_ERR_OK;
114 }
115
116 // - Root -
117 /*
118  * \brief Create a new named pipe
119  * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
120  */
121 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
122 {
123         tIPCPipe_Server *srv;
124         
125         ENTER("pNode sName xFlags", Node, Name, Flags);
126         
127         // Write Lock
128         RWLock_AcquireWrite(&glIPCPipe_ServerList);
129         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
130         {
131                 if( strcmp(srv->Name, Name) == 0 )
132                         break;
133         }
134         if( srv )
135         {
136                 RWLock_Release(&glIPCPipe_ServerList);
137                 LEAVE('n');
138                 return NULL;
139         }
140         
141
142         srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
143         srv->Name = (void*)(srv + 1);
144         strcpy(srv->Name, Name);
145         srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
146         srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
147         srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
148         srv->ServerNode.ImplPtr = srv;
149         srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
150
151         // Lock already held
152         if( gpIPCPipe_LastServer )
153                 gpIPCPipe_LastServer->Next = srv;
154         else
155                 gpIPCPipe_FirstServer = srv;
156         srv->Prev = gpIPCPipe_LastServer;
157         gpIPCPipe_LastServer = srv;
158         RWLock_Release(&glIPCPipe_ServerList);
159
160         LEAVE('p', &srv->ServerNode);
161         return &srv->ServerNode;
162 }
163
164 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
165 {
166         tIPCPipe_Server *srv;
167         RWLock_AcquireRead(&glIPCPipe_ServerList);
168         for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
169                 ;
170         RWLock_Release(&glIPCPipe_ServerList);
171         
172         if( srv ) {
173                 strncpy(Name, srv->Name, sizeof(Name));
174                 return 0;
175         }
176         
177         return -1;
178 }
179 /**
180  * \return New client pointer
181  */
182 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
183 {
184         tIPCPipe_Server *srv;
185         ENTER("pNode sName", Node, Name);
186         
187         // Find server
188         RWLock_AcquireRead(&glIPCPipe_ServerList);
189         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
190         {
191                 LOG("== %s", srv->Name);
192                 if( strcmp(srv->Name, Name) == 0 )
193                         break;
194         }
195         RWLock_Release(&glIPCPipe_ServerList);
196         if( !srv ) {
197                 LEAVE('n');
198                 return NULL;
199         }
200
201         if( Flags & VFS_FDIRFLAG_STAT ) {
202                 // LEAVE('p', srv->TplClientNode);
203                 // return &srv->TplClientNode;
204         }
205
206         // Create new client
207         tIPCPipe_Channel *new_client;   
208
209         new_client = calloc(1, sizeof(tIPCPipe_Channel));
210         new_client->Server = srv;
211         new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
212         new_client->ClientEP.Node.ImplPtr = new_client;
213         new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
214         new_client->ServerEP.Node.ImplPtr = new_client;
215
216         // Append to server list
217         RWLock_AcquireWrite(&srv->lChannelList);
218         if(srv->LastClient)
219                 srv->LastClient->Next = new_client;
220         else
221                 srv->FirstClient = new_client;
222         srv->LastClient = new_client;
223         if(!srv->FirstUnseenClient)
224                 srv->FirstUnseenClient = new_client;
225         VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
226         RWLock_Release(&srv->lChannelList);
227         
228         LEAVE('p', &new_client->ClientEP.Node);
229         return &new_client->ClientEP.Node;
230 }
231
232 // --- Server ---
233 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
234 {
235         // 'next' is a valid entry, but readdir should never be called on this node
236         return -1;
237 }
238 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
239 {
240         tIPCPipe_Server *srv = Node->ImplPtr;
241
242         ENTER("pNode sName", Node, Name);       
243
244         if( strcmp(Name, "newclient") != 0 ) {
245                 LEAVE('n');
246                 return NULL;
247         }
248         
249         // TODO: Need VFS_FDIRFLAG_NOBLOCK?
250         VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
251
252         tIPCPipe_Channel *conn;
253         RWLock_AcquireRead(&srv->lChannelList);
254         conn = srv->FirstUnseenClient;
255         if( conn )
256         {
257                 srv->FirstUnseenClient = conn->Next;
258         }
259         VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
260         RWLock_Release(&srv->lChannelList);
261
262         if( !conn ) {
263                 LEAVE('n');
264                 return NULL;
265         }
266         
267         // Success
268         LEAVE('p', &conn->ServerEP.Node);
269         return &conn->ServerEP.Node;
270 }
271
272 void IPCPipe_Server_Close(tVFS_Node *Node)
273 {
274         tIPCPipe_Server *srv = Node->ImplPtr;
275
276         // Flag server as being destroyed
277
278         // Force-close all children
279         RWLock_AcquireWrite(&srv->lChannelList);
280         for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
281         {
282                 client->Server = NULL;
283         }
284         RWLock_Release(&srv->lChannelList);
285
286         // Remove from global list
287         RWLock_AcquireWrite(&glIPCPipe_ServerList);
288         // - Forward link
289         if(srv->Prev)
290                 srv->Prev->Next = srv->Next;
291         else
292                 gpIPCPipe_FirstServer = srv->Next;
293         // - Reverse link
294         if(srv->Next)
295                 srv->Next->Prev = srv->Prev;
296         else
297                 gpIPCPipe_LastServer = srv->Prev;
298         RWLock_Release(&glIPCPipe_ServerList);
299 }
300
301 // --- Channel ---
302 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
303 {
304         tIPCPipe_Channel        *ch = Node->ImplPtr;
305         if( ch )
306         {
307                 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
308                 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
309         }
310         return ch;
311 }
312 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
313 {
314         tIPCPipe_Endpoint       *lep, *rep;
315         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
316
317         ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);       
318
319         // Closed endpoint / channel
320         if( !channel || channel->Server == NULL ) {
321                 LEAVE('i', -1);
322                 return -1;
323         }
324         
325         // Wait for a packet to be ready
326         tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
327         int rv = VFS_SelectNode(Node, VFS_SELECT_READ, timeout, "IPCPipe Endpoint");
328         if( !rv ) {
329                 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
330                 LEAVE('i', -1);
331                 return -1;
332         }
333         if( channel->Server == NULL ) {
334                 //errno = EIO;
335                 LEAVE('i', -1);
336                 return -1;
337         }
338
339         // Pop a packet from the list
340         Mutex_Acquire(&rep->lList);
341         if( !rep->Node.ImplPtr )
342         {
343                 Mutex_Release(&rep->lList);
344                 //errno = EIO;
345                 LEAVE('i', -1);
346                 return -1;
347         }
348         tIPCPipe_Packet *pkt = rep->OutHead;
349         if(pkt)
350                 rep->OutHead = pkt->Next;
351         if(!rep->OutHead)
352                 rep->OutTail = NULL;
353         VFS_MarkAvaliable(Node, !!rep->OutHead);
354         Mutex_Release(&rep->lList);
355
356         // Return
357         size_t ret = 0;
358         if(pkt)
359         {
360                 ret = MIN(pkt->Length, Length);
361                 memcpy(Dest, pkt->Data, ret);
362                 free(pkt);
363         }
364         else
365         {
366                 Log_Warning("IPCPipe", "No packet ready but semaphore returned");
367         }
368         
369         LEAVE('i', ret);
370         return ret;
371 }
372
373 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
374 {
375         tIPCPipe_Endpoint       *lep, *rep;
376         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
377
378         ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src); 
379         
380         // Ensure the server hasn't been closed
381         if( !channel || channel->Server == NULL ) {
382                 LEAVE('i', -1);
383                 return -1;
384         }
385
386         if( Length > channel->Server->MaxBlockSize ) {
387                 LEAVE('i', 0);
388                 return 0;
389         }
390
391         // TODO: Ensure that no more than DEF_MAX_BYTE_LIMIT bytes are in flight at one time
392
393         // Create packet structure
394         tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
395         pkt->Next = NULL;
396         pkt->Offset = 0;
397         pkt->Length = Length;
398         pkt->Data = pkt + 1;
399         memcpy(pkt->Data, Src, Length);
400
401         // Append to list
402         Mutex_Acquire(&lep->lList);
403         if( !Node->ImplPtr )
404         {
405                 // Client was closed by another thread
406                 free(pkt);
407                 Mutex_Release(&lep->lList);
408                 LEAVE('i', -1);
409                 return -1;
410         }
411         if(lep->OutTail)
412                 lep->OutTail->Next = pkt;
413         else
414                 lep->OutHead = pkt;
415         lep->OutTail = pkt;
416         Mutex_Release(&lep->lList);
417
418         // Signal other end
419         VFS_MarkAvaliable(&rep->Node, 1);
420
421         LEAVE('i', Length);
422         return Length;
423 }
424
425 void IPCPipe_Client_Close(tVFS_Node *Node)
426 {
427         tIPCPipe_Endpoint       *lep, *rep;
428         tIPCPipe_Channel        *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
429
430         ENTER("pNode", Node);
431
432         if( !ch ) {
433                 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
434                 return ;
435         }
436         
437         // Mark client as closed
438         Node->ImplPtr = NULL;
439         // Clear packets
440         Mutex_Acquire(&lep->lList);
441         while( lep->OutHead ) {
442                 tIPCPipe_Packet *next = lep->OutHead->Next;
443                 free(lep->OutHead);
444                 lep->OutHead = next;
445         }
446         Mutex_Release(&lep->lList);
447         LOG("Packets cleared");
448         
449         // Tell remote that local has closed
450         VFS_MarkError(&rep->Node, 1);
451         // TODO: Deliver SIGPIPE or similar to all owners of remote
452         // Clean up if both sides are closed
453         if( rep->Node.ImplPtr == NULL )
454         {
455                 LOG("Remote closed, cleaning up");
456                 tIPCPipe_Server *srv = ch->Server;
457                 if( srv )
458                 {
459                         RWLock_AcquireWrite(&srv->lChannelList);
460                         if(ch->Prev)
461                                 ch->Prev->Next = ch->Next;
462                         else
463                                 srv->FirstClient = ch->Next;
464                         if(ch->Next)
465                                 ch->Next->Prev = ch->Prev;
466                         else
467                                 srv->LastClient = ch->Prev;
468                         if(srv->FirstUnseenClient == ch)
469                                 srv->FirstUnseenClient = ch->Next;
470                         RWLock_Release(&srv->lChannelList);
471                 }
472                 free(ch);
473         }
474         LEAVE('-');
475 }

UCC git Repository :: git.ucc.asn.au