/*
 * Retrieved from gitweb: tpg/acess2.git / KernelLand/Kernel/drv/dgram_pipe.c
 * Object id: f47450de1f591b8b0eda173c2728dc90cdadfe69
 */
/*
 * Acess2 Kernel
 * - By John Hodge (thePowersGang);
 *
 * drv/dgram_pipe.c
 * - Connection+Datagram based local IPC
 */
8 #include <vfs.h>
9 #include <fs_devfs.h>
10 #include <modules.h>
11 #include <rwlock.h>
12 #include <semaphore.h>
13
14 #define DEF_MAX_BLOCK_SIZE      0x1000
15 #define DEF_MAX_BYTE_LIMIT      0x8000
16
// === TYPES ===
// Forward typedefs so the structures below can refer to each other
typedef struct sIPCPipe_Packet	tIPCPipe_Packet;
typedef struct sIPCPipe_Endpoint	tIPCPipe_Endpoint;
typedef struct sIPCPipe_Channel	tIPCPipe_Channel;
typedef struct sIPCPipe_Server	tIPCPipe_Server;

23 // === STRUCTURES ===
24 struct sIPCPipe_Packet
25 {
26         tIPCPipe_Packet *Next;
27         size_t  Length;
28         size_t  Offset; //!< Offset to first unread byte (for streams)
29         void    *Data;
30 };
31 struct sIPCPipe_Endpoint
32 {
33         tMutex  lList;
34         tIPCPipe_Packet *OutHead;
35         tIPCPipe_Packet *OutTail;
36         tSemaphore      InCount;
37         tVFS_Node       Node;
38 };
39 struct sIPCPipe_Channel
40 {
41         tIPCPipe_Channel        *Next;
42         tIPCPipe_Channel        *Prev;
43         tIPCPipe_Server *Server;
44         tIPCPipe_Endpoint       ServerEP;
45         tIPCPipe_Endpoint       ClientEP;
46 };
47 struct sIPCPipe_Server
48 {
49         tIPCPipe_Server *Next;
50         tIPCPipe_Server *Prev;
51         char    *Name;
52         size_t  MaxBlockSize;   // Max size of a 'packet'
53         size_t  QueueByteLimit; // Maximum number of bytes held in kernel for this server
54         size_t  CurrentByteCount;
55         tVFS_Node       ServerNode;
56         tSemaphore      ConnectionSemaphore;
57         tRWLock lChannelList;
58         tIPCPipe_Channel        *FirstClient;
59         tIPCPipe_Channel        *LastClient;
60         tIPCPipe_Channel        *FirstUnseenClient;     // !< First client server hasn't opened
61 };
62
63 // === PROTOTYPES ===
64  int    IPCPipe_Install(char **Arguments);
65 // - Root
66 tVFS_Node       *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
67  int    IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
68 tVFS_Node       *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
69 // - Server
70  int    IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
71 tVFS_Node       *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
72 void    IPCPipe_Server_Close(tVFS_Node *Node);
73 // - Socket
74 tIPCPipe_Channel        *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
75 size_t  IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
76 size_t  IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
77 void    IPCPipe_Client_Close(tVFS_Node *Node);
78
79 // === GLOBALS ===
80 // - Server node: Directory
81 tVFS_NodeType   gIPCPipe_ServerNodeType = {
82         .TypeName = "IPC Pipe - Server",
83         .ReadDir = IPCPipe_Server_ReadDir,
84         .FindDir = IPCPipe_Server_FindDir,
85         .Close = IPCPipe_Server_Close
86 };
87 tVFS_NodeType   gIPCPipe_ChannelNodeType = {
88         .TypeName = "IPC Pipe - Channel",
89         .Read = IPCPipe_Client_Read,
90         .Write = IPCPipe_Client_Write,
91         .Close = IPCPipe_Client_Close
92 };
93 tVFS_NodeType   gIPCPipe_RootNodeType = {
94         .TypeName = "IPC Pipe - Root",
95         .MkNod = IPCPipe_Root_MkNod,
96         .ReadDir = IPCPipe_Root_ReadDir,
97         .FindDir = IPCPipe_Root_FindDir
98 };
99 tDevFS_Driver   gIPCPipe_DevFSInfo = {
100         .Name = "ipcpipe",
101         .RootNode = {.Type=&gIPCPipe_RootNodeType}
102 };
103 // - Global list of servers
104 tRWLock glIPCPipe_ServerList;
105 tIPCPipe_Server *gpIPCPipe_FirstServer;
106 tIPCPipe_Server *gpIPCPipe_LastServer;
107 // - Module definition
108 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
109
110 // === CODE ===
111 int IPCPipe_Install(char **Arguments)
112 {
113         DevFS_AddDevice(&gIPCPipe_DevFSInfo);
114         return MODULE_ERR_OK;
115 }
116
117 // - Root -
118 /*
119  * \brief Create a new named pipe
120  * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
121  */
122 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
123 {
124         tIPCPipe_Server *srv;
125         
126         // Write Lock
127         RWLock_AcquireWrite(&glIPCPipe_ServerList);
128         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
129         {
130                 if( strcmp(srv->Name, Name) == 0 )
131                         break;
132         }
133         if( srv )
134         {
135                 RWLock_Release(&glIPCPipe_ServerList);
136                 return NULL;
137         }
138         
139
140         srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
141         srv->Name = (void*)(srv + 1);
142         strcpy(srv->Name, Name);
143         srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
144         srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
145         srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
146         srv->ServerNode.ImplPtr = srv;
147
148         // Lock already held
149         if( gpIPCPipe_LastServer )
150                 gpIPCPipe_LastServer->Next = srv;
151         else
152                 gpIPCPipe_FirstServer = srv;
153         srv->Prev = gpIPCPipe_LastServer;
154         gpIPCPipe_LastServer = srv;
155         RWLock_Release(&glIPCPipe_ServerList);
156
157         return &srv->ServerNode;
158 }
159
160 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
161 {
162         tIPCPipe_Server *srv;
163         RWLock_AcquireRead(&glIPCPipe_ServerList);
164         for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
165                 ;
166         RWLock_Release(&glIPCPipe_ServerList);
167         
168         if( srv ) {
169                 strncpy(Name, srv->Name, sizeof(Name));
170                 return 0;
171         }
172         
173         return -1;
174 }
175 /**
176  * \return New client pointer
177  */
178 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
179 {
180         tIPCPipe_Server *srv;
181         
182         // Find server
183         RWLock_AcquireRead(&glIPCPipe_ServerList);
184         for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
185         {
186                 if( strcmp(srv->Name, Name) == 0 )
187                         break;
188         }
189         RWLock_Release(&glIPCPipe_ServerList);
190         if( !srv )
191                 return NULL;
192
193         // Create new client
194         tIPCPipe_Channel *new_client;
195         
196         new_client = calloc(1, sizeof(tIPCPipe_Channel));
197         new_client->Server = srv;
198         new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
199         new_client->ClientEP.Node.ImplPtr = new_client;
200         new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
201         new_client->ServerEP.Node.ImplPtr = new_client;
202
203         // Append to server list
204         RWLock_AcquireWrite(&srv->lChannelList);
205         if(srv->LastClient)
206                 srv->LastClient->Next = new_client;
207         else
208                 srv->FirstClient = new_client;
209         srv->LastClient = new_client;
210         if(!srv->FirstUnseenClient)
211                 srv->FirstUnseenClient = new_client;
212         RWLock_Release(&srv->lChannelList);
213         
214         return &new_client->ClientEP.Node;
215 }
216
217 // --- Server ---
218 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
219 {
220         // 'next' is a valid entry, but readdir should never be called on this node
221         return -1;
222 }
223 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
224 {
225         tIPCPipe_Server *srv = Node->ImplPtr;
226         
227         if( strcmp(Name, "next") != 0 )
228                 return NULL;
229         if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 )
230                 return NULL;
231
232         tIPCPipe_Channel *conn;
233         RWLock_AcquireRead(&srv->lChannelList);
234         conn = srv->FirstUnseenClient;
235         if( conn )
236         {
237                 srv->FirstUnseenClient = conn->Next;
238         }
239         RWLock_Release(&srv->lChannelList);
240
241         if( !conn )
242                 return NULL;
243         
244         // Success
245         return &conn->ServerEP.Node;
246 }
247
248 void IPCPipe_Server_Close(tVFS_Node *Node)
249 {
250         tIPCPipe_Server *srv = Node->ImplPtr;
251 //      Semaphore_Cancel(&srv->ConnectionSemaphore);
252
253         // Flag server as being destroyed
254
255         // Force-close all children
256         RWLock_AcquireWrite(&srv->lChannelList);
257         for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
258         {
259                 client->Server = NULL;
260         }
261         RWLock_Release(&srv->lChannelList);
262
263         // Remove from global list
264         RWLock_AcquireWrite(&glIPCPipe_ServerList);
265         // - Forward link
266         if(srv->Prev)
267                 srv->Prev->Next = srv->Next;
268         else
269                 gpIPCPipe_FirstServer = srv->Next;
270         // - Reverse link
271         if(srv->Next)
272                 srv->Next->Prev = srv->Prev;
273         else
274                 gpIPCPipe_LastServer = srv->Prev;
275         RWLock_Release(&glIPCPipe_ServerList);
276 }
277
278 // --- Channel ---
279 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
280 {
281         tIPCPipe_Channel        *ch = Node->ImplPtr;
282         if( !ch )       return NULL;
283         *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
284         *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
285         return ch;
286 }
287 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
288 {
289         tIPCPipe_Endpoint       *lep, *rep;
290         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
291         
292         if( channel->Server == NULL )
293                 return -1;
294         
295         if( Semaphore_Wait(&lep->InCount, 1) != 1 )
296         {
297                 if( channel->Server == NULL )
298                         return -1;
299                 return 0;
300         }
301
302         Mutex_Acquire(&rep->lList);
303         tIPCPipe_Packet *pkt = rep->OutHead;
304         if(pkt)
305                 rep->OutHead = pkt->Next;
306         if(!rep->OutHead)
307                 rep->OutTail = NULL;
308         Mutex_Release(&rep->lList);
309
310         if(!pkt)
311                 return 0;
312
313         size_t ret = MIN(pkt->Length, Length);
314         memcpy(Dest, pkt->Data, ret);
315         free(pkt);
316         
317         return ret;
318 }
319
320 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
321 {
322         tIPCPipe_Endpoint       *lep, *rep;
323         tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
324         
325         // Ensure the server hasn't been closed
326         if( !channel || channel->Server == NULL )
327                 return -1;
328
329         if( Length > channel->Server->MaxBlockSize )
330                 return 0;
331
332         // Create packet structure      
333         tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
334         pkt->Next = NULL;
335         pkt->Offset = 0;
336         pkt->Length = Length;
337         pkt->Data = pkt + 1;
338         memcpy(pkt->Data, Src, Length);
339
340         // Append to list
341         Mutex_Acquire(&lep->lList);
342         if( !Node->ImplPtr )
343         {
344                 // Client was closed by another thread
345                 free(pkt);
346                 Mutex_Release(&lep->lList);
347                 return -1;
348         }
349         if(lep->OutTail)
350                 lep->OutTail->Next = pkt;
351         else
352                 lep->OutHead = pkt;
353         lep->OutTail = pkt;
354         Mutex_Release(&lep->lList);
355
356         // Signal other end
357         Semaphore_Signal(&rep->InCount, 1);
358
359         return Length;
360 }
361
362 void IPCPipe_Client_Close(tVFS_Node *Node)
363 {
364         tIPCPipe_Endpoint       *lep, *rep;
365         tIPCPipe_Channel        *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
366         
367         // Mark client as closed
368         Node->ImplPtr = NULL;
369         Mutex_Acquire(&lep->lList);
370         while( lep->OutHead ) {
371                 tIPCPipe_Packet *next = lep->OutHead->Next;
372                 free(lep->OutHead);
373                 lep->OutHead = next;
374         }
375         Mutex_Release(&lep->lList);
376         
377         // Clean up if both sides are closed
378         if( rep->Node.ImplPtr == NULL )
379         {
380                 tIPCPipe_Server *srv = ch->Server;
381                 if( srv )
382                 {
383                         RWLock_AcquireWrite(&srv->lChannelList);
384                         if(ch->Prev)
385                                 ch->Prev->Next = ch->Next;
386                         else
387                                 srv->FirstClient = ch->Next;
388                         if(ch->Next)
389                                 ch->Next->Prev = ch->Prev;
390                         else
391                                 srv->LastClient = ch->Prev;
392                         if(srv->FirstUnseenClient == ch)
393                                 srv->FirstUnseenClient = ch->Next;
394                         RWLock_Release(&srv->lChannelList);
395                 }
396                 free(ch);
397         }
398 }

/* Retrieved from the UCC git repository (git.ucc.asn.au) */