3 * - By John Hodge (thePowersGang);
6 * - Connection+Datagram based local IPC
13 #include <semaphore.h>
// Defaults copied into every new server by IPCPipe_Root_MkNod:
// the largest single write accepted, and the per-endpoint queued-byte threshold.
15 #define DEF_MAX_BLOCK_SIZE 0x1000
16 #define DEF_MAX_BYTE_LIMIT 0x8000
// Forward typedefs so the structures below can refer to one another.
19 typedef struct sIPCPipe_Packet tIPCPipe_Packet;
20 typedef struct sIPCPipe_Endpoint tIPCPipe_Endpoint;
21 typedef struct sIPCPipe_Channel tIPCPipe_Channel;
22 typedef struct sIPCPipe_Server tIPCPipe_Server;
// One queued message. Packets are chained through Next on an endpoint's
// outgoing list (OutHead/OutTail).
25 struct sIPCPipe_Packet
27 tIPCPipe_Packet *Next;
29 size_t Offset; //!< Offset to first unread byte (for streams)
// One side of a channel. OutHead/OutTail delimit the packets this side has
// written: IPCPipe_Client_Write appends at OutTail and the peer's
// IPCPipe_Client_Read pops from OutHead.
32 struct sIPCPipe_Endpoint
35 tIPCPipe_Packet *OutHead;
36 tIPCPipe_Packet *OutTail;
// A single client<->server connection.
// Next/Prev link the channel into its server's client list.
// Server is set to NULL by IPCPipe_Server_Close; the read/write handlers
// treat a NULL Server as a closed channel.
// ServerEP's node is handed out by IPCPipe_Server_FindDir, ClientEP's node by
// IPCPipe_Root_FindDir.
40 struct sIPCPipe_Channel
42 tIPCPipe_Channel *Next;
43 tIPCPipe_Channel *Prev;
44 tIPCPipe_Server *Server;
45 tIPCPipe_Endpoint ServerEP;
46 tIPCPipe_Endpoint ClientEP;
// A named pipe server. Next/Prev link it into the global server list
// (gpIPCPipe_FirstServer / gpIPCPipe_LastServer).
48 struct sIPCPipe_Server
50 tIPCPipe_Server *Next;
51 tIPCPipe_Server *Prev;
53 size_t MaxBlockSize; // Max size of a 'packet'
54 // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
55 size_t QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
// Channels opened against this server, in the order they were created.
58 tIPCPipe_Channel *FirstClient;
59 tIPCPipe_Channel *LastClient;
60 tIPCPipe_Channel *FirstUnseenClient; // !< First client server hasn't opened
// Forward declarations: module entry point, then the root-directory,
// server-node and channel-endpoint handlers referenced by the node-type
// tables that follow.
64 int IPCPipe_Install(char **Arguments);
66 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
67 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
68 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
70 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
71 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
72 void IPCPipe_Server_Close(tVFS_Node *Node);
74 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
75 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
76 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
77 void IPCPipe_Client_Close(tVFS_Node *Node);
80 // - Server node: Directory
// Handler table for the node returned by IPCPipe_Root_MkNod (the server's
// view of a named pipe).
81 tVFS_NodeType gIPCPipe_ServerNodeType = {
82 .TypeName = "IPC Pipe - Server",
83 .ReadDir = IPCPipe_Server_ReadDir,
84 .FindDir = IPCPipe_Server_FindDir,
85 .Close = IPCPipe_Server_Close
// Handler table shared by both endpoint nodes of a channel (ClientEP and
// ServerEP are both given this type in IPCPipe_Root_FindDir).
87 tVFS_NodeType gIPCPipe_ChannelNodeType = {
88 .TypeName = "IPC Pipe - Channel",
89 .Flags = VFS_NODETYPEFLAG_STREAM,
90 .Read = IPCPipe_Client_Read,
91 .Write = IPCPipe_Client_Write,
92 .Close = IPCPipe_Client_Close
// Handler table for the driver's root directory node (used by
// gIPCPipe_DevFSInfo.RootNode).
94 tVFS_NodeType gIPCPipe_RootNodeType = {
95 .TypeName = "IPC Pipe - Root",
96 .MkNod = IPCPipe_Root_MkNod,
97 .ReadDir = IPCPipe_Root_ReadDir,
98 .FindDir = IPCPipe_Root_FindDir
// DevFS registration record passed to DevFS_AddDevice by IPCPipe_Install.
// The root node is a directory served by gIPCPipe_RootNodeType.
100 tDevFS_Driver gIPCPipe_DevFSInfo = {
102 .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
104 // - Global list of servers
// glIPCPipe_ServerList is taken (read or write) around every walk or update
// of the First/Last server pointers in this file.
105 tRWLock glIPCPipe_ServerList;
106 tIPCPipe_Server *gpIPCPipe_FirstServer;
107 tIPCPipe_Server *gpIPCPipe_LastServer;
108 // - Module definition
109 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
// Module install hook: registers gIPCPipe_DevFSInfo with DevFS and reports
// success.
// NOTE(review): the result of DevFS_AddDevice is not inspected here —
// confirm whether a registration failure should be propagated.
112 int IPCPipe_Install(char **Arguments)
114 DevFS_AddDevice(&gIPCPipe_DevFSInfo);
115 return MODULE_ERR_OK;
120 * \brief Create a new named pipe
121 * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
// Allocates a server called Name, fills in its limits and directory node, and
// appends it to the global server list. Returns the new server's node.
// The structure and its name share a single zeroed allocation; the name is
// stored immediately after the structure.
123 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
125 tIPCPipe_Server *srv;
127 ENTER("pNode sName xFlags", Node, Name, Flags);
// The write lock is taken before scanning for an existing server with the
// same name, so the check and the append below happen under one lock hold.
130 RWLock_AcquireWrite(&glIPCPipe_ServerList);
131 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
133 if( strcmp(srv->Name, Name) == 0 )
// Duplicate-name path drops the list lock.
138 RWLock_Release(&glIPCPipe_ServerList);
// NOTE(review): the calloc result is dereferenced below with no visible NULL
// check — confirm allocation failure is handled.
144 srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
145 srv->Name = (void*)(srv + 1);
146 strcpy(srv->Name, Name);
147 srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
148 srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
149 srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
150 srv->ServerNode.ImplPtr = srv;
151 srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
// Append at the tail of the doubly linked global list.
154 if( gpIPCPipe_LastServer )
155 gpIPCPipe_LastServer->Next = srv;
157 gpIPCPipe_FirstServer = srv;
158 srv->Prev = gpIPCPipe_LastServer;
159 gpIPCPipe_LastServer = srv;
160 RWLock_Release(&glIPCPipe_ServerList);
162 LEAVE('p', &srv->ServerNode);
163 return &srv->ServerNode;
// ReadDir handler for the root directory: walks to the ID'th server on the
// global list under the read lock and copies that server's name into Name.
166 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
168 tIPCPipe_Server *srv;
169 RWLock_AcquireRead(&glIPCPipe_ServerList);
// Step ID entries along the list; srv ends up NULL if the list is shorter.
170 for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
172 RWLock_Release(&glIPCPipe_ServerList);
// Name is an array *parameter*, so it is really a pointer and sizeof(Name)
// would be the pointer size (truncating names to 4/8 bytes). Bound the copy
// by the declared buffer size instead.
175 strncpy(Name, srv->Name, FILENAME_MAX);
182 * \return New client pointer
// FindDir handler for the root directory: looks up the server called Name
// and, for a real open, creates a new channel to it. Both endpoint nodes are
// initialised, the channel is appended to the server's client list, and the
// client endpoint's node is returned.
184 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
186 tIPCPipe_Server *srv;
187 ENTER("pNode sName", Node, Name);
// Locate the server by name under the read lock.
190 RWLock_AcquireRead(&glIPCPipe_ServerList);
191 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
193 LOG("== %s", srv->Name);
194 if( strcmp(srv->Name, Name) == 0 )
197 RWLock_Release(&glIPCPipe_ServerList);
// Stat-only lookups take a separate path (template-node return is disabled).
203 if( Flags & VFS_FDIRFLAG_STAT ) {
204 // LEAVE('p', srv->TplClientNode);
205 // return &srv->TplClientNode;
209 tIPCPipe_Channel *new_client;
// NOTE(review): the calloc result is used with no visible NULL check — confirm.
211 new_client = calloc(1, sizeof(tIPCPipe_Channel));
212 new_client->Server = srv;
// Both endpoint nodes point back at the channel; IPCPipe_int_GetEPs tells
// them apart by node address.
213 new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
214 new_client->ClientEP.Node.ImplPtr = new_client;
215 new_client->ClientEP.Node.Size = -1;
216 new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
217 new_client->ServerEP.Node.ImplPtr = new_client;
218 new_client->ServerEP.Node.Size = -1;
220 // Append to server list
221 RWLock_AcquireWrite(&srv->lChannelList);
223 srv->LastClient->Next = new_client;
225 srv->FirstClient = new_client;
// Record the back-link to the previous tail: IPCPipe_Client_Close unlinks via
// ch->Prev, so leaving it NULL would detach every earlier channel on close.
new_client->Prev = srv->LastClient;
226 srv->LastClient = new_client;
// Wake a server waiting in IPCPipe_Server_FindDir for an unopened client.
227 if(!srv->FirstUnseenClient)
228 srv->FirstUnseenClient = new_client;
229 VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
230 RWLock_Release(&srv->lChannelList);
232 LEAVE('p', &new_client->ClientEP.Node);
233 return &new_client->ClientEP.Node;
// ReadDir handler installed in gIPCPipe_ServerNodeType.
237 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
239 // 'newclient' is a valid entry (see IPCPipe_Server_FindDir), but readdir should never be called on this node
// FindDir handler for a server node. Only the name "newclient" is accepted:
// it waits until the server node is readable, pops the first client the
// server has not yet opened, and returns that channel's server-side
// endpoint node.
242 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
244 tIPCPipe_Server *srv = Node->ImplPtr;
246 ENTER("pNode sName", Node, Name);
248 if( strcmp(Name, "newclient") != 0 ) {
253 // TODO: Need VFS_FDIRFLAG_NOBLOCK?
// No timeout is supplied to this wait for a pending client.
254 VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
256 tIPCPipe_Channel *conn;
// FirstUnseenClient is modified below, so take the write lock; a read lock
// would let two callers pop the same channel.
257 RWLock_AcquireWrite(&srv->lChannelList);
258 conn = srv->FirstUnseenClient;
261 srv->FirstUnseenClient = conn->Next;
// Keep the node's readable state in step with whether clients remain.
263 VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
264 RWLock_Release(&srv->lChannelList);
272 LEAVE('p', &conn->ServerEP.Node);
273 return &conn->ServerEP.Node;
// Close handler for a server node: detaches every channel from the server,
// then unlinks the server from the global list.
276 void IPCPipe_Server_Close(tVFS_Node *Node)
278 tIPCPipe_Server *srv = Node->ImplPtr;
280 // Flag server as being destroyed
282 // Force-close all children
283 RWLock_AcquireWrite(&srv->lChannelList);
284 for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
// A NULL Server is what the read/write handlers test to detect a closed channel.
286 client->Server = NULL;
288 RWLock_Release(&srv->lChannelList);
290 // Remove from global list
291 RWLock_AcquireWrite(&glIPCPipe_ServerList);
// Standard doubly linked unlink, updating the list head/tail when srv is at an end.
294 srv->Prev->Next = srv->Next;
296 gpIPCPipe_FirstServer = srv->Next;
299 srv->Next->Prev = srv->Prev;
301 gpIPCPipe_LastServer = srv->Prev;
302 RWLock_Release(&glIPCPipe_ServerList);
// Resolves an endpoint node to its channel (Node->ImplPtr) and reports which
// endpoint is local and which is remote.
// *lep receives the endpoint whose node is Node; *rep receives the other one.
// Callers check the returned channel for NULL before using it.
306 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
308 tIPCPipe_Channel *ch = Node->ImplPtr;
// The node address decides the side: ServerEP's node means the caller is the server.
311 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
312 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
// Read handler for a channel endpoint: waits for the node to become readable,
// pops one packet from the remote endpoint's outgoing list and copies up to
// Length bytes of it into Dest.
316 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
318 tIPCPipe_Endpoint *lep, *rep;
319 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
321 ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);
323 // Closed endpoint / channel
324 if( !channel || channel->Server == NULL ) {
329 // Wait for a packet to be ready
// With VFS_IOFLAG_NOBLOCK a zero timeout is passed; otherwise no timeout is given.
330 tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
331 int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
// errno reported when the wait does not succeed depends on the blocking mode.
333 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
// Re-check the server: it may have been closed while this thread was waiting.
337 if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
343 // Pop a packet from the list
// The data to read lives on the *remote* endpoint's outgoing queue.
344 Mutex_Acquire(&rep->lList);
// A NULL ImplPtr marks an endpoint that has been closed (see IPCPipe_Client_Close).
345 if( !rep->Node.ImplPtr )
347 Mutex_Release(&rep->lList);
352 tIPCPipe_Packet *pkt = rep->OutHead;
354 rep->OutHead = pkt->Next;
// This node stays readable only while more packets are queued.
357 VFS_MarkAvaliable(Node, !!rep->OutHead);
358 VFS_MarkFull(&rep->Node, 0); // Just read a packet, remote shouldn't be full
359 Mutex_Release(&rep->lList);
// Only the first Length bytes of the packet are copied here.
365 ret = MIN(pkt->Length, Length);
366 memcpy(Dest, pkt->Data, ret);
371 Log_Warning("IPCPipe", "No packet ready but select returned");
// Write handler for a channel endpoint: waits until the node is writable,
// copies Src into a newly allocated packet, appends it to the local
// endpoint's outgoing list and marks the remote node as readable.
378 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
380 tIPCPipe_Endpoint *lep, *rep;
381 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
383 ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src);
385 // Ensure the server hasn't been closed
386 if( !channel || channel->Server == NULL ) {
// Writes larger than the server's per-packet limit take a separate path.
391 if( Length > channel->Server->MaxBlockSize ) {
396 // Wait until this endpoint can accept a write
// With VFS_IOFLAG_NOBLOCK a zero timeout is passed; otherwise no timeout is given.
397 tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
398 int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
401 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
// Re-check the server: it may have been closed while this thread was waiting.
405 if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
411 // Create packet structure
// Header and payload share one allocation; the payload follows the header.
// NOTE(review): no NULL check on the malloc result is visible here — confirm.
412 tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
415 pkt->Length = Length;
417 memcpy(pkt->Data, Src, Length);
// Outgoing packets are queued on the *local* endpoint; the peer reads them from there.
420 Mutex_Acquire(&lep->lList);
423 // Client was closed by another thread
425 Mutex_Release(&lep->lList);
430 lep->OutTail->Next = pkt;
// The limit is checked after queuing, so the queue can exceed QueueByteLimit
// by up to one packet before the node is marked full.
435 lep->ByteCount += Length;
436 if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
437 VFS_MarkFull(Node, 1);
440 Mutex_Release(&lep->lList);
// Signal the remote side that a packet is ready to read.
443 VFS_MarkAvaliable(&rep->Node, 1);
449 void IPCPipe_Client_Close(tVFS_Node *Node)
451 tIPCPipe_Endpoint *lep, *rep;
452 tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
454 ENTER("pNode", Node);
457 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
461 // Mark client as closed
462 Node->ImplPtr = NULL;
464 Mutex_Acquire(&lep->lList);
465 while( lep->OutHead ) {
466 tIPCPipe_Packet *next = lep->OutHead->Next;
470 Mutex_Release(&lep->lList);
471 LOG("Packets cleared");
473 // Tell remote that local has closed
474 VFS_MarkError(&rep->Node, 1);
475 // TODO: Deliver SIGPIPE or similar to all owners of remote
476 // Clean up if both sides are closed
477 if( rep->Node.ImplPtr == NULL )
479 LOG("Remote closed, cleaning up");
480 tIPCPipe_Server *srv = ch->Server;
483 RWLock_AcquireWrite(&srv->lChannelList);
485 ch->Prev->Next = ch->Next;
487 srv->FirstClient = ch->Next;
489 ch->Next->Prev = ch->Prev;
491 srv->LastClient = ch->Prev;
492 if(srv->FirstUnseenClient == ch)
493 srv->FirstUnseenClient = ch->Next;
494 RWLock_Release(&srv->lChannelList);