3 * - By John Hodge (thePowersGang);
6 * - Connection+Datagram based local IPC
13 #include <semaphore.h>
15 #define DEF_MAX_BLOCK_SIZE 0x1000
16 #define DEF_MAX_BYTE_LIMIT 0x8000
19 typedef struct sIPCPipe_Packet tIPCPipe_Packet;
20 typedef struct sIPCPipe_Endpoint tIPCPipe_Endpoint;
21 typedef struct sIPCPipe_Channel tIPCPipe_Channel;
22 typedef struct sIPCPipe_Server tIPCPipe_Server;
25 struct sIPCPipe_Packet
27 tIPCPipe_Packet *Next;
29 size_t Offset; //!< Offset to first unread byte (for streams)
32 struct sIPCPipe_Endpoint
35 tIPCPipe_Packet *OutHead;
36 tIPCPipe_Packet *OutTail;
39 struct sIPCPipe_Channel
41 tIPCPipe_Channel *Next;
42 tIPCPipe_Channel *Prev;
43 tIPCPipe_Server *Server;
44 tIPCPipe_Endpoint ServerEP;
45 tIPCPipe_Endpoint ClientEP;
47 struct sIPCPipe_Server
49 tIPCPipe_Server *Next;
50 tIPCPipe_Server *Prev;
52 size_t MaxBlockSize; // Max size of a 'packet'
53 size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server
54 size_t CurrentByteCount;
57 tIPCPipe_Channel *FirstClient;
58 tIPCPipe_Channel *LastClient;
59 tIPCPipe_Channel *FirstUnseenClient; // !< First client server hasn't opened
63 int IPCPipe_Install(char **Arguments);
65 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
66 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
67 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
69 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
70 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
71 void IPCPipe_Server_Close(tVFS_Node *Node);
73 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
74 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
75 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
76 void IPCPipe_Client_Close(tVFS_Node *Node);
79 // - Server node: Directory
80 tVFS_NodeType gIPCPipe_ServerNodeType = {
81 .TypeName = "IPC Pipe - Server",
82 .ReadDir = IPCPipe_Server_ReadDir,
83 .FindDir = IPCPipe_Server_FindDir,
84 .Close = IPCPipe_Server_Close
86 tVFS_NodeType gIPCPipe_ChannelNodeType = {
87 .TypeName = "IPC Pipe - Channel",
88 .Read = IPCPipe_Client_Read,
89 .Write = IPCPipe_Client_Write,
90 .Close = IPCPipe_Client_Close
92 tVFS_NodeType gIPCPipe_RootNodeType = {
93 .TypeName = "IPC Pipe - Root",
94 .MkNod = IPCPipe_Root_MkNod,
95 .ReadDir = IPCPipe_Root_ReadDir,
96 .FindDir = IPCPipe_Root_FindDir
98 tDevFS_Driver gIPCPipe_DevFSInfo = {
100 .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
102 // - Global list of servers
103 tRWLock glIPCPipe_ServerList;
104 tIPCPipe_Server *gpIPCPipe_FirstServer;
105 tIPCPipe_Server *gpIPCPipe_LastServer;
106 // - Module definition
107 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
110 int IPCPipe_Install(char **Arguments)
112 DevFS_AddDevice(&gIPCPipe_DevFSInfo);
113 return MODULE_ERR_OK;
118 * \brief Create a new named pipe
119 * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
121 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
123 tIPCPipe_Server *srv;
125 ENTER("pNode sName xFlags", Node, Name, Flags);
128 RWLock_AcquireWrite(&glIPCPipe_ServerList);
129 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
131 if( strcmp(srv->Name, Name) == 0 )
136 RWLock_Release(&glIPCPipe_ServerList);
142 srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
143 srv->Name = (void*)(srv + 1);
144 strcpy(srv->Name, Name);
145 srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
146 srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
147 srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
148 srv->ServerNode.ImplPtr = srv;
149 srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
152 if( gpIPCPipe_LastServer )
153 gpIPCPipe_LastServer->Next = srv;
155 gpIPCPipe_FirstServer = srv;
156 srv->Prev = gpIPCPipe_LastServer;
157 gpIPCPipe_LastServer = srv;
158 RWLock_Release(&glIPCPipe_ServerList);
160 LEAVE('p', &srv->ServerNode);
161 return &srv->ServerNode;
164 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
166 tIPCPipe_Server *srv;
167 RWLock_AcquireRead(&glIPCPipe_ServerList);
168 for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
170 RWLock_Release(&glIPCPipe_ServerList);
173 strncpy(Name, srv->Name, sizeof(Name));
180 * \return New client pointer
182 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
184 tIPCPipe_Server *srv;
185 ENTER("pNode sName", Node, Name);
188 RWLock_AcquireRead(&glIPCPipe_ServerList);
189 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
191 LOG("== %s", srv->Name);
192 if( strcmp(srv->Name, Name) == 0 )
195 RWLock_Release(&glIPCPipe_ServerList);
202 tIPCPipe_Channel *new_client;
204 new_client = calloc(1, sizeof(tIPCPipe_Channel));
205 new_client->Server = srv;
206 new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
207 new_client->ClientEP.Node.ImplPtr = new_client;
208 new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
209 new_client->ServerEP.Node.ImplPtr = new_client;
211 // Append to server list
212 RWLock_AcquireWrite(&srv->lChannelList);
214 srv->LastClient->Next = new_client;
216 srv->FirstClient = new_client;
217 srv->LastClient = new_client;
218 if(!srv->FirstUnseenClient)
219 srv->FirstUnseenClient = new_client;
220 VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
221 RWLock_Release(&srv->lChannelList);
223 LEAVE('p', &new_client->ClientEP.Node);
224 return &new_client->ClientEP.Node;
228 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
230 // 'next' is a valid entry, but readdir should never be called on this node
233 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
235 tIPCPipe_Server *srv = Node->ImplPtr;
237 ENTER("pNode sName", Node, Name);
239 if( strcmp(Name, "newclient") != 0 ) {
244 VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
246 tIPCPipe_Channel *conn;
247 RWLock_AcquireRead(&srv->lChannelList);
248 conn = srv->FirstUnseenClient;
251 srv->FirstUnseenClient = conn->Next;
253 VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
254 RWLock_Release(&srv->lChannelList);
262 LEAVE('p', &conn->ServerEP.Node);
263 return &conn->ServerEP.Node;
266 void IPCPipe_Server_Close(tVFS_Node *Node)
268 tIPCPipe_Server *srv = Node->ImplPtr;
270 // Flag server as being destroyed
272 // Force-close all children
273 RWLock_AcquireWrite(&srv->lChannelList);
274 for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
276 client->Server = NULL;
278 RWLock_Release(&srv->lChannelList);
280 // Remove from global list
281 RWLock_AcquireWrite(&glIPCPipe_ServerList);
284 srv->Prev->Next = srv->Next;
286 gpIPCPipe_FirstServer = srv->Next;
289 srv->Next->Prev = srv->Prev;
291 gpIPCPipe_LastServer = srv->Prev;
292 RWLock_Release(&glIPCPipe_ServerList);
296 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
298 tIPCPipe_Channel *ch = Node->ImplPtr;
301 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
302 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
306 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
308 tIPCPipe_Endpoint *lep, *rep;
309 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
311 ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);
313 // Closed endpoint / channel
314 if( !channel || channel->Server == NULL ) {
319 // Wait for a packet to be ready
320 VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Endpoint");
321 if( channel->Server == NULL ) {
326 // Pop a packet from the list
327 Mutex_Acquire(&rep->lList);
328 if( !rep->Node.ImplPtr )
330 Mutex_Release(&rep->lList);
334 tIPCPipe_Packet *pkt = rep->OutHead;
336 rep->OutHead = pkt->Next;
339 VFS_MarkAvaliable(Node, !!rep->OutHead);
340 Mutex_Release(&rep->lList);
346 ret = MIN(pkt->Length, Length);
347 memcpy(Dest, pkt->Data, ret);
352 Log_Warning("IPCPipe", "No packet ready but semaphore returned");
359 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
361 tIPCPipe_Endpoint *lep, *rep;
362 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
364 ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src);
366 // Ensure the server hasn't been closed
367 if( !channel || channel->Server == NULL ) {
372 if( Length > channel->Server->MaxBlockSize ) {
377 // Create packet structure
378 tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
381 pkt->Length = Length;
383 memcpy(pkt->Data, Src, Length);
386 Mutex_Acquire(&lep->lList);
389 // Client was closed by another thread
391 Mutex_Release(&lep->lList);
396 lep->OutTail->Next = pkt;
400 Mutex_Release(&lep->lList);
403 VFS_MarkAvaliable(&rep->Node, 1);
409 void IPCPipe_Client_Close(tVFS_Node *Node)
411 tIPCPipe_Endpoint *lep, *rep;
412 tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
414 ENTER("pNode", Node);
417 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
421 // Mark client as closed
422 Node->ImplPtr = NULL;
424 Mutex_Acquire(&lep->lList);
425 while( lep->OutHead ) {
426 tIPCPipe_Packet *next = lep->OutHead->Next;
430 Mutex_Release(&lep->lList);
431 LOG("Packets cleared");
433 // Tell remote that local has closed
434 VFS_MarkError(&rep->Node, 1);
435 // TODO: Deliver SIGPIPE or similar to all owners of remote
436 // Clean up if both sides are closed
437 if( rep->Node.ImplPtr == NULL )
439 LOG("Remote closed, cleaning up");
440 tIPCPipe_Server *srv = ch->Server;
443 RWLock_AcquireWrite(&srv->lChannelList);
445 ch->Prev->Next = ch->Next;
447 srv->FirstClient = ch->Next;
449 ch->Next->Prev = ch->Prev;
451 srv->LastClient = ch->Prev;
452 if(srv->FirstUnseenClient == ch)
453 srv->FirstUnseenClient = ch->Next;
454 RWLock_Release(&srv->lChannelList);