3 * - By John Hodge (thePowersGang);
6 * - Connection+Datagram based local IPC
13 #include <semaphore.h>
// Default cap on the size of one packet (0x1000 = 4096 bytes); IPCPipe_Root_MkNod copies it into a new server's MaxBlockSize.
15 #define DEF_MAX_BLOCK_SIZE 0x1000
// Default cap on bytes queued per endpoint (0x8000 = 32768 bytes); IPCPipe_Root_MkNod copies it into a new server's QueueByteLimit.
16 #define DEF_MAX_BYTE_LIMIT 0x8000
// Forward typedefs: the structures below point at one another (channel -> server, server -> channel), so the names are introduced first.
19 typedef struct sIPCPipe_Packet tIPCPipe_Packet;
20 typedef struct sIPCPipe_Endpoint tIPCPipe_Endpoint;
21 typedef struct sIPCPipe_Channel tIPCPipe_Channel;
22 typedef struct sIPCPipe_Server tIPCPipe_Server;
// One queued message, held in an endpoint's outgoing list.
// NOTE(review): the read/write paths also use pkt->Length and pkt->Data; those member declarations are not visible in this excerpt.
25 struct sIPCPipe_Packet
// Link to the following packet in the queue (the read path advances OutHead through this field).
27 tIPCPipe_Packet *Next;
29 size_t Offset; //!< Offset to first unread byte (for streams)
// One side of a channel together with the queue of packets that side has written.
// IPCPipe_Client_Write appends to the local endpoint's queue; IPCPipe_Client_Read pops from the remote endpoint's queue.
// NOTE(review): lList, ByteCount and Node are referenced elsewhere in the file but their declarations are not visible in this excerpt.
32 struct sIPCPipe_Endpoint
// Oldest queued packet (next one to be read by the peer).
35 tIPCPipe_Packet *OutHead;
// Newest queued packet (append point for writes).
36 tIPCPipe_Packet *OutTail;
// A single client<->server connection: one endpoint per side plus links into the owning server's client list.
40 struct sIPCPipe_Channel
// Next channel in the server's client list.
42 tIPCPipe_Channel *Next;
// Previous channel in the server's client list (used when unlinking in IPCPipe_Client_Close).
43 tIPCPipe_Channel *Prev;
// Owning server; IPCPipe_Server_Close sets this to NULL, and the read/write paths treat NULL as "closed".
44 tIPCPipe_Server *Server;
// Endpoint whose node is handed to the server by IPCPipe_Server_FindDir.
45 tIPCPipe_Endpoint ServerEP;
// Endpoint whose node is handed to the connecting client by IPCPipe_Root_FindDir.
46 tIPCPipe_Endpoint ClientEP;
// A named pipe server: an entry in the global server list that owns a list of client channels.
// NOTE(review): Name, ServerNode and lChannelList are used elsewhere in the file but their declarations are not visible in this excerpt.
48 struct sIPCPipe_Server
// Links in the global server list (gpIPCPipe_FirstServer .. gpIPCPipe_LastServer).
50 tIPCPipe_Server *Next;
51 tIPCPipe_Server *Prev;
53 size_t MaxBlockSize; // Max size of a 'packet'
54 // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
55 size_t QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
// Head and tail of this server's channel list.
58 tIPCPipe_Channel *FirstClient;
59 tIPCPipe_Channel *LastClient;
60 tIPCPipe_Channel *FirstUnseenClient; // !< First client server hasn't opened
// Module entry point (named in MODULE_DEFINE).
64 int IPCPipe_Install(char **Arguments);
// Callbacks for the driver's root directory node (wired up in gIPCPipe_RootNodeType).
66 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
67 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
68 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
// Callbacks for a per-server directory node (wired up in gIPCPipe_ServerNodeType).
70 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
71 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
72 void IPCPipe_Server_Close(tVFS_Node *Node);
// Internal endpoint lookup, then the callbacks shared by both endpoint nodes of a channel (wired up in gIPCPipe_ChannelNodeType).
74 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
75 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
76 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
77 void IPCPipe_Client_Close(tVFS_Node *Node);
80 // - Server node: Directory
// Callback table that IPCPipe_Root_MkNod installs on each new server's ServerNode.
81 tVFS_NodeType gIPCPipe_ServerNodeType = {
82 .TypeName = "IPC Pipe - Server",
83 .ReadDir = IPCPipe_Server_ReadDir,
84 .FindDir = IPCPipe_Server_FindDir,
85 .Close = IPCPipe_Server_Close
// Callback table shared by both endpoint nodes of a channel; IPCPipe_Root_FindDir assigns it to ClientEP.Node and ServerEP.Node alike.
87 tVFS_NodeType gIPCPipe_ChannelNodeType = {
88 .TypeName = "IPC Pipe - Channel",
89 .Read = IPCPipe_Client_Read,
90 .Write = IPCPipe_Client_Write,
91 .Close = IPCPipe_Client_Close
// Callback table for the driver's root directory node (referenced from gIPCPipe_DevFSInfo.RootNode).
93 tVFS_NodeType gIPCPipe_RootNodeType = {
94 .TypeName = "IPC Pipe - Root",
95 .MkNod = IPCPipe_Root_MkNod,
96 .ReadDir = IPCPipe_Root_ReadDir,
97 .FindDir = IPCPipe_Root_FindDir
// DevFS registration record for this driver; IPCPipe_Install passes it to DevFS_AddDevice.
99 tDevFS_Driver gIPCPipe_DevFSInfo = {
// Root node: flagged as a directory, dispatched through gIPCPipe_RootNodeType, with Size set to -1.
101 .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
103 // - Global list of servers
// Reader/writer lock taken around traversals and updates of the server list below.
104 tRWLock glIPCPipe_ServerList;
// Head and tail of the doubly linked list of servers.
105 tIPCPipe_Server *gpIPCPipe_FirstServer;
106 tIPCPipe_Server *gpIPCPipe_LastServer;
107 // - Module definition
// Declares this module under the name IPCPipe and names IPCPipe_Install; the meaning of the other arguments is set by the MODULE_DEFINE macro, which is not visible in this file excerpt.
108 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
// Module install hook: registers the driver's DevFS record and reports success.
// Arguments is not used by any visible line.
111 int IPCPipe_Install(char **Arguments)
113 DevFS_AddDevice(&gIPCPipe_DevFSInfo);
114 return MODULE_ERR_OK;
119 * \brief Create a new named pipe
120 * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
// Creates a server named Name, appends it to the global server list and returns its directory node.
// Node and Flags are only used for tracing in the visible lines.
122 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
124 tIPCPipe_Server *srv;
126 ENTER("pNode sName xFlags", Node, Name, Flags);
// The write lock is taken before the name scan and is still held when the new server is linked in below.
129 RWLock_AcquireWrite(&glIPCPipe_ServerList);
130 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
132 if( strcmp(srv->Name, Name) == 0 )
// presumably the release on the "name already exists" early-exit path; the surrounding condition and return are not visible here.
137 RWLock_Release(&glIPCPipe_ServerList);
// One zeroed allocation holds the server structure followed by a copy of the name.
// NOTE(review): no visible line checks the calloc result before it is dereferenced - confirm.
143 srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
// Name points at the bytes immediately after the structure, inside the same allocation.
144 srv->Name = (void*)(srv + 1);
145 strcpy(srv->Name, Name);
146 srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
147 srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
148 srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
// ImplPtr lets the server callbacks recover srv from the node.
149 srv->ServerNode.ImplPtr = srv;
150 srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
// Append to the tail of the global list; the first-server assignment is presumably the else branch for an empty list (else line not visible).
153 if( gpIPCPipe_LastServer )
154 gpIPCPipe_LastServer->Next = srv;
156 gpIPCPipe_FirstServer = srv;
157 srv->Prev = gpIPCPipe_LastServer;
158 gpIPCPipe_LastServer = srv;
159 RWLock_Release(&glIPCPipe_ServerList);
161 LEAVE('p', &srv->ServerNode);
162 return &srv->ServerNode;
// Copies the name of the ID-th server in the global list into Name.
// The return statements are not visible in this excerpt.
165 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
167 tIPCPipe_Server *srv;
168 RWLock_AcquireRead(&glIPCPipe_ServerList);
// Walk ID entries forward; srv ends up NULL if the list is shorter than that.
169 for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
171 RWLock_Release(&glIPCPipe_ServerList);
// NOTE(review): Name is an array parameter, which C adjusts to char*, so sizeof(Name) is the pointer size rather than FILENAME_MAX.
//   Only that many bytes are copied, and strncpy does not NUL-terminate when the source is at least that long.
// NOTE(review): srv->Name is read after the list lock is released - confirm the server cannot be removed concurrently.
174 strncpy(Name, srv->Name, sizeof(Name));
181 * \return New client pointer
// Opens a connection to the server named Name: allocates a channel, appends it to the server's client list,
// flags the server node as having a pending client, and returns the client-side endpoint node.
183 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
185 tIPCPipe_Server *srv;
186 ENTER("pNode sName", Node, Name);
// Find the server by name under the read lock.
189 RWLock_AcquireRead(&glIPCPipe_ServerList);
190 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
192 LOG("== %s", srv->Name);
193 if( strcmp(srv->Name, Name) == 0 )
// NOTE(review): srv is still used below after this release - confirm the server cannot be closed in between.
196 RWLock_Release(&glIPCPipe_ServerList);
// Stat-only lookups take a separate path; its visible contents are commented out.
202 if( Flags & VFS_FDIRFLAG_STAT ) {
203 // LEAVE('p', srv->TplClientNode);
204 // return &srv->TplClientNode;
208 tIPCPipe_Channel *new_client;
// NOTE(review): no visible line checks the calloc result before it is dereferenced - confirm.
210 new_client = calloc(1, sizeof(tIPCPipe_Channel));
211 new_client->Server = srv;
// Both endpoint nodes share one node type and both point back at the channel through ImplPtr.
212 new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
213 new_client->ClientEP.Node.ImplPtr = new_client;
214 new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
215 new_client->ServerEP.Node.ImplPtr = new_client;
217 // Append to server list
218 RWLock_AcquireWrite(&srv->lChannelList);
// NOTE(review): no visible line sets new_client->Prev (calloc leaves it NULL), yet IPCPipe_Client_Close unlinks via ch->Prev - confirm.
220 srv->LastClient->Next = new_client;
222 srv->FirstClient = new_client;
223 srv->LastClient = new_client;
224 if(!srv->FirstUnseenClient)
225 srv->FirstUnseenClient = new_client;
// FirstUnseenClient is non-NULL at this point, so this marks the server node as available.
226 VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
227 RWLock_Release(&srv->lChannelList);
229 LEAVE('p', &new_client->ClientEP.Node);
230 return &new_client->ClientEP.Node;
// Directory listing callback for a server node; the function body's return is not visible in this excerpt.
234 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
236 // 'newclient' is a valid entry (the only name IPCPipe_Server_FindDir accepts), but readdir should never be called on this node
// Server-side accept: looking up "newclient" waits for a pending connection and returns its server-side endpoint node.
239 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
241 tIPCPipe_Server *srv = Node->ImplPtr;
243 ENTER("pNode sName", Node, Name);
// Any name other than "newclient" takes this branch (its body is not visible here).
245 if( strcmp(Name, "newclient") != 0 ) {
250 // TODO: Need VFS_FDIRFLAG_NOBLOCK?
// Waits for the node to become readable; the timeout argument is 0, the same null value the read/write paths pass when VFS_IOFLAG_NOBLOCK is not set.
251 VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
253 tIPCPipe_Channel *conn;
// NOTE(review): only a read lock is taken, yet FirstUnseenClient is modified below - confirm concurrent accepts cannot race; a write lock looks more appropriate.
254 RWLock_AcquireRead(&srv->lChannelList);
255 conn = srv->FirstUnseenClient;
// Hand out the oldest unseen channel and advance to the next one.
258 srv->FirstUnseenClient = conn->Next;
// Stay readable only while more unseen clients remain.
260 VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
261 RWLock_Release(&srv->lChannelList);
269 LEAVE('p', &conn->ServerEP.Node);
270 return &conn->ServerEP.Node;
// Closes a server: detaches every channel from it, then unlinks it from the global server list.
// NOTE(review): no visible line frees srv or marks the channels' nodes as errored - confirm against the elided lines.
273 void IPCPipe_Server_Close(tVFS_Node *Node)
275 tIPCPipe_Server *srv = Node->ImplPtr;
277 // Flag server as being destroyed
279 // Force-close all children
280 RWLock_AcquireWrite(&srv->lChannelList);
281 for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
// A NULL Server is what the endpoint read/write paths test to detect a closed server.
// NOTE(review): IPCPipe_Client_Close later takes ch->Server and locks srv->lChannelList - confirm it guards against this NULL.
283 client->Server = NULL;
285 RWLock_Release(&srv->lChannelList);
287 // Remove from global list
288 RWLock_AcquireWrite(&glIPCPipe_ServerList);
// Doubly-linked-list unlink; the if/else headers choosing between neighbour and head/tail updates are not visible here.
291 srv->Prev->Next = srv->Next;
293 gpIPCPipe_FirstServer = srv->Next;
296 srv->Next->Prev = srv->Prev;
298 gpIPCPipe_LastServer = srv->Prev;
299 RWLock_Release(&glIPCPipe_ServerList);
// Resolves an endpoint node to its channel and reports which endpoint is local (owns Node) and which is remote.
// Callers use the return value as the channel pointer; the return statement and any NULL guard on ch are not visible in this excerpt.
303 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
// ImplPtr holds the channel for both endpoint nodes; IPCPipe_Client_Close sets it to NULL when that endpoint closes.
305 tIPCPipe_Channel *ch = Node->ImplPtr;
// Node is the server-side endpoint if it is ServerEP's node, otherwise it is treated as the client side.
308 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
309 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
// Reads one packet from the remote endpoint's outgoing queue into Dest, copying at most Length bytes.
// Offset is only used for tracing in the visible lines. Return statements are not visible in this excerpt.
313 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
315 tIPCPipe_Endpoint *lep, *rep;
316 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
318 ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);
320 // Closed endpoint / channel
321 if( !channel || channel->Server == NULL ) {
326 // Wait for a packet to be ready
// With VFS_IOFLAG_NOBLOCK a pointer to a zero timeout is passed; otherwise the timeout pointer is NULL.
327 tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
328 int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
// presumably the "select reported nothing" path (the guarding condition is not visible): would-block for non-blocking reads, interrupted otherwise.
330 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
// Re-check after waking: the peer may have flagged an error or the server may have closed while waiting.
334 if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
340 // Pop a packet from the list
// The queue being read is the remote endpoint's, so its mutex is the one taken.
341 Mutex_Acquire(&rep->lList);
// A NULL ImplPtr on the remote node means that side has been closed (see IPCPipe_Client_Close).
342 if( !rep->Node.ImplPtr )
344 Mutex_Release(&rep->lList);
349 tIPCPipe_Packet *pkt = rep->OutHead;
351 rep->OutHead = pkt->Next;
// This node stays readable only while packets remain queued.
354 VFS_MarkAvaliable(Node, !!rep->OutHead);
// NOTE(review): the write path increments ByteCount, but no visible line here decrements rep->ByteCount - confirm the byte limit is not reached permanently.
355 VFS_MarkFull(&rep->Node, 0); // Just read a packet, remote shouldn't be full
356 Mutex_Release(&rep->lList);
// Copy no more than the caller asked for and no more than the packet holds.
362 ret = MIN(pkt->Length, Length);
363 memcpy(Dest, pkt->Data, ret);
368 Log_Warning("IPCPipe", "No packet ready but select returned");
// Queues Length bytes from Src as one packet on the local endpoint's outgoing list and wakes the remote side.
// Offset is only used for tracing in the visible lines. Return statements are not visible in this excerpt.
375 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
377 tIPCPipe_Endpoint *lep, *rep;
378 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
380 ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src);
382 // Ensure the server hasn't been closed
383 if( !channel || channel->Server == NULL ) {
// Writes larger than the server's per-packet limit take this branch (its body is not visible here).
388 if( Length > channel->Server->MaxBlockSize ) {
393 // Wait until this endpoint is writable (or reports an error)
// With VFS_IOFLAG_NOBLOCK a pointer to a zero timeout is passed; otherwise the timeout pointer is NULL.
394 tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
395 int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
// presumably the "select reported nothing" path (the guarding condition is not visible): would-block for non-blocking writes, interrupted otherwise.
398 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
// Re-check after waking: the peer may have flagged an error or the server may have closed while waiting.
402 if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
408 // Create packet structure
// Header and payload share one allocation.
// NOTE(review): no visible line checks the malloc result before pkt is dereferenced - confirm.
409 tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
412 pkt->Length = Length;
414 memcpy(pkt->Data, Src, Length);
// The local endpoint's queue is the one being appended to, so its mutex is taken.
417 Mutex_Acquire(&lep->lList);
420 // Client was closed by another thread
422 Mutex_Release(&lep->lList);
427 lep->OutTail->Next = pkt;
// Track queued bytes; at or above the server's limit this node is marked full.
432 lep->ByteCount += Length;
433 if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
434 VFS_MarkFull(Node, 1);
437 Mutex_Release(&lep->lList);
// Wake any reader waiting on the remote endpoint's node.
440 VFS_MarkAvaliable(&rep->Node, 1);
// Closes one endpoint of a channel: marks it closed, walks its queue of unsent packets, signals an error to the peer,
// and unlinks the channel from its server's list once both endpoints are closed.
// NOTE(review): no free() of packets or of the channel is visible in this excerpt - confirm against the elided lines.
446 void IPCPipe_Client_Close(tVFS_Node *Node)
448 tIPCPipe_Endpoint *lep, *rep;
449 tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
451 ENTER("pNode", Node);
// presumably reached when ch is NULL, i.e. ImplPtr was already cleared by an earlier close (the condition is not visible).
454 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
458 // Mark client as closed
// The read path and the both-closed test below check this ImplPtr for NULL.
459 Node->ImplPtr = NULL;
// Walk the packets this side queued, under the queue mutex.
461 Mutex_Acquire(&lep->lList);
462 while( lep->OutHead ) {
463 tIPCPipe_Packet *next = lep->OutHead->Next;
467 Mutex_Release(&lep->lList);
468 LOG("Packets cleared");
470 // Tell remote that local has closed
471 VFS_MarkError(&rep->Node, 1);
472 // TODO: Deliver SIGPIPE or similar to all owners of remote
473 // Clean up if both sides are closed
474 if( rep->Node.ImplPtr == NULL )
476 LOG("Remote closed, cleaning up");
// NOTE(review): IPCPipe_Server_Close sets ch->Server to NULL; if that has happened, srv is NULL when it is locked below - confirm a guard exists in the elided lines.
477 tIPCPipe_Server *srv = ch->Server;
480 RWLock_AcquireWrite(&srv->lChannelList);
// Doubly-linked-list unlink; the if/else headers choosing between neighbour and head/tail updates are not visible here.
482 ch->Prev->Next = ch->Next;
484 srv->FirstClient = ch->Next;
486 ch->Next->Prev = ch->Prev;
488 srv->LastClient = ch->Prev;
// Keep the pending-accept pointer valid if it was this channel.
489 if(srv->FirstUnseenClient == ch)
490 srv->FirstUnseenClient = ch->Next;
491 RWLock_Release(&srv->lChannelList);