3 * - By John Hodge (thePowersGang);
6 * - Connection+Datagram based local IPC
13 #include <semaphore.h>
// Default limits copied into every new server by IPCPipe_Root_MkNod.
// Default for tIPCPipe_Server.MaxBlockSize (largest single packet accepted by a write).
15 #define DEF_MAX_BLOCK_SIZE 0x1000
// Default for tIPCPipe_Server.QueueByteLimit (bytes that may be queued for one server).
16 #define DEF_MAX_BYTE_LIMIT 0x8000
// Forward typedefs, so the structures below can refer to each other by pointer.
19 typedef struct sIPCPipe_Packet tIPCPipe_Packet;
20 typedef struct sIPCPipe_Endpoint tIPCPipe_Endpoint;
21 typedef struct sIPCPipe_Channel tIPCPipe_Channel;
22 typedef struct sIPCPipe_Server tIPCPipe_Server;
// One queued message, held in a singly linked list owned by the sending endpoint.
// NOTE(review): some members are missing from this listing; the read and write
// handlers also use pkt->Length and pkt->Data.
25 struct sIPCPipe_Packet
// Next packet in the same outbound queue (NULL at the tail, presumably).
27 tIPCPipe_Packet *Next;
29 size_t Offset; //!< Offset to first unread byte (for streams)
// One side of a channel. Packets written through this endpoint are appended to its
// outbound queue and popped by a read on the opposite endpoint.
// NOTE(review): some members are missing from this listing; other code uses
// ep->lList (mutex guarding the queue) and ep->Node (the VFS node for this side).
32 struct sIPCPipe_Endpoint
// Oldest queued outbound packet; the reader pops from here.
35 tIPCPipe_Packet *OutHead;
// Newest queued outbound packet; the writer appends after this.
36 tIPCPipe_Packet *OutTail;
// A single client/server connection: two endpoints plus links into the per-server
// list of channels.
39 struct sIPCPipe_Channel
// Links in the owning server's FirstClient..LastClient list.
41 tIPCPipe_Channel *Next;
42 tIPCPipe_Channel *Prev;
// Owning server; IPCPipe_Server_Close sets this to NULL, and read/write treat a
// NULL Server as a closed channel.
43 tIPCPipe_Server *Server;
// Endpoint handed to the server by IPCPipe_Server_FindDir.
44 tIPCPipe_Endpoint ServerEP;
// Endpoint handed to the connecting client by IPCPipe_Root_FindDir.
45 tIPCPipe_Endpoint ClientEP;
// A named pipe server, created by IPCPipe_Root_MkNod and linked into the global
// server list.
// NOTE(review): some members are missing from this listing; other code uses
// srv->Name, srv->ServerNode and srv->lChannelList.
47 struct sIPCPipe_Server
// Links in the global gpIPCPipe_FirstServer..gpIPCPipe_LastServer list.
49 tIPCPipe_Server *Next;
50 tIPCPipe_Server *Prev;
52 size_t MaxBlockSize; // Max size of a 'packet'
53 size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server
// NOTE(review): no visible code updates this counter; the write handler still
// carries a TODO for enforcing the byte limit.
54 size_t CurrentByteCount;
// Every channel connected to this server, in connection order.
57 tIPCPipe_Channel *FirstClient;
58 tIPCPipe_Channel *LastClient;
59 tIPCPipe_Channel *FirstUnseenClient; // !< First client server hasn't opened
// Module initialisation.
63 int IPCPipe_Install(char **Arguments);
// Handlers for the driver root directory (one entry per named server).
65 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
66 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
67 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
// Handlers for a server node (a directory from which pending clients are accepted).
69 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
70 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
71 void IPCPipe_Server_Close(tVFS_Node *Node);
// Handlers shared by both endpoints of a channel, plus the helper that resolves
// which endpoint a node belongs to.
73 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
74 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
75 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
76 void IPCPipe_Client_Close(tVFS_Node *Node);
79 // - Server node: Directory
// Operations table for a server node; assigned to srv->ServerNode.Type in
// IPCPipe_Root_MkNod. Only directory operations and Close are provided.
80 tVFS_NodeType gIPCPipe_ServerNodeType = {
81 .TypeName = "IPC Pipe - Server",
82 .ReadDir = IPCPipe_Server_ReadDir,
83 .FindDir = IPCPipe_Server_FindDir,
84 .Close = IPCPipe_Server_Close
// Operations table shared by both endpoint nodes of a channel (ClientEP.Node and
// ServerEP.Node are both given this type in IPCPipe_Root_FindDir).
86 tVFS_NodeType gIPCPipe_ChannelNodeType = {
87 .TypeName = "IPC Pipe - Channel",
88 .Read = IPCPipe_Client_Read,
89 .Write = IPCPipe_Client_Write,
90 .Close = IPCPipe_Client_Close
// Operations table for the driver root directory: MkNod creates a server, ReadDir
// lists servers, FindDir connects a client to a server by name.
92 tVFS_NodeType gIPCPipe_RootNodeType = {
93 .TypeName = "IPC Pipe - Root",
94 .MkNod = IPCPipe_Root_MkNod,
95 .ReadDir = IPCPipe_Root_ReadDir,
96 .FindDir = IPCPipe_Root_FindDir
// DevFS registration record passed to DevFS_AddDevice by IPCPipe_Install.
// Its root node is a directory driven by gIPCPipe_RootNodeType.
// NOTE(review): other initialisers (such as the device name) are missing from this listing.
98 tDevFS_Driver gIPCPipe_DevFSInfo = {
100 .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
102 // - Global list of servers
// Reader/writer lock guarding the two list pointers below and the Next/Prev links
// of every server on the list.
103 tRWLock glIPCPipe_ServerList;
// Head and tail of the doubly linked list of all servers (NULL when empty).
104 tIPCPipe_Server *gpIPCPipe_FirstServer;
105 tIPCPipe_Server *gpIPCPipe_LastServer;
106 // - Module definition
// Declares the IPCPipe module with IPCPipe_Install as its install routine.
// NOTE(review): the remaining arguments (0, 0x0100, NULL, NULL) are presumably
// flags, version, cleanup routine and dependency list; confirm against the
// MODULE_DEFINE macro.
107 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
// Module install routine: registers the driver with DevFS and reports success.
// Arguments is not used by the visible code. The result of DevFS_AddDevice is not
// inspected.
110 int IPCPipe_Install(char **Arguments)
112 DevFS_AddDevice(&gIPCPipe_DevFSInfo);
113 return MODULE_ERR_OK;
118 * \brief Create a new named pipe
119 * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
// Creates a new named server under the driver root and returns its directory node.
//   Node  - root directory node (not used by the visible code)
//   Name  - name of the server to create
//   Flags - creation flags (only logged by the visible code)
// Returns a pointer to the ServerNode embedded in the new server.
// NOTE(review): several short lines of this function are missing from the listing.
121 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
123 tIPCPipe_Server *srv;
125 ENTER("pNode sName xFlags", Node, Name, Flags);
// The write lock is taken before the duplicate scan so that the check and the
// later insertion happen under one lock hold.
128 RWLock_AcquireWrite(&glIPCPipe_ServerList);
129 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
131 if( strcmp(srv->Name, Name) == 0 )
// NOTE(review): this release is presumably on the name-clash exit path, because
// the list is still modified below and the lock is released again afterwards.
136 RWLock_Release(&glIPCPipe_ServerList);
// One zeroed allocation holds the server structure followed by a copy of Name.
// NOTE(review): the result is used on the next line with no NULL check.
142 srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
// The name is stored directly after the structure.
143 srv->Name = (void*)(srv + 1);
144 strcpy(srv->Name, Name);
145 srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
146 srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
147 srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
// ImplPtr lets the server handlers recover the server from the node.
148 srv->ServerNode.ImplPtr = srv;
149 srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
// Append to the tail of the global list (the else keyword is missing from the listing).
152 if( gpIPCPipe_LastServer )
153 gpIPCPipe_LastServer->Next = srv;
155 gpIPCPipe_FirstServer = srv;
156 srv->Prev = gpIPCPipe_LastServer;
157 gpIPCPipe_LastServer = srv;
158 RWLock_Release(&glIPCPipe_ServerList);
160 LEAVE('p', &srv->ServerNode);
161 return &srv->ServerNode;
// Directory listing for the driver root: copies the name of the ID-th server into Name.
//   Node - root directory node (not used by the visible code)
//   ID   - zero-based index into the server list
//   Name - destination buffer, declared as FILENAME_MAX bytes
// NOTE(review): the return statements are missing from this listing.
164 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
166 tIPCPipe_Server *srv;
167 RWLock_AcquireRead(&glIPCPipe_ServerList);
// Skip ID entries; srv ends up NULL when ID is past the end of the list.
168 for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
170 RWLock_Release(&glIPCPipe_ServerList);
// NOTE(review): Name is an array parameter, which C adjusts to a pointer, so
// sizeof(Name) is the size of a pointer and not FILENAME_MAX. Longer names are cut
// short, and strncpy does not add a terminator when it truncates. The intended
// bound is probably FILENAME_MAX.
// NOTE(review): srv->Name is read after the list lock has been released.
173 strncpy(Name, srv->Name, sizeof(Name));
180 * \return New client pointer
// Opens a client connection to the server called Name.
// A new channel is allocated, appended to the server's client list and queued as
// unseen, so the server can accept it through IPCPipe_Server_FindDir.
//   Node  - root directory node (only logged by the visible code)
//   Name  - name of the server to connect to
//   Flags - lookup flags; VFS_FDIRFLAG_STAT selects a separate path whose body is
//           missing from this listing
// Returns the node of the client endpoint of the new channel.
182 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
184 tIPCPipe_Server *srv;
185 ENTER("pNode sName", Node, Name);
// Look the server up by name under the read lock.
188 RWLock_AcquireRead(&glIPCPipe_ServerList);
189 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
191 LOG("== %s", srv->Name);
192 if( strcmp(srv->Name, Name) == 0 )
// NOTE(review): srv is still used below after the list lock is released; confirm
// that the server cannot be torn down in between.
195 RWLock_Release(&glIPCPipe_ServerList);
201 if( Flags & VFS_FDIRFLAG_STAT ) {
202 // LEAVE('p', srv->TplClientNode);
203 // return &srv->TplClientNode;
207 tIPCPipe_Channel *new_client;
// NOTE(review): the allocation result is used on the next line with no NULL check.
209 new_client = calloc(1, sizeof(tIPCPipe_Channel));
210 new_client->Server = srv;
// Both endpoint nodes share one node type and point back at the channel;
// IPCPipe_int_GetEPs tells them apart by node address.
211 new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
212 new_client->ClientEP.Node.ImplPtr = new_client;
213 new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
214 new_client->ServerEP.Node.ImplPtr = new_client;
216 // Append to server list
217 RWLock_AcquireWrite(&srv->lChannelList);
// NOTE(review): no visible line sets new_client->Prev, so calloc leaves it NULL.
// IPCPipe_Client_Close uses ch->Prev to unlink a channel, and
// IPCPipe_Root_MkNod does set Prev for servers. This looks like a missing
// assignment of the old LastClient.
219 srv->LastClient->Next = new_client;
221 srv->FirstClient = new_client;
222 srv->LastClient = new_client;
// Start the unseen queue at this channel if the server has accepted everything so far.
223 if(!srv->FirstUnseenClient)
224 srv->FirstUnseenClient = new_client;
// Flag the server node as available so a select on it in IPCPipe_Server_FindDir can return.
225 VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
226 RWLock_Release(&srv->lChannelList);
228 LEAVE('p', &new_client->ClientEP.Node);
229 return &new_client->ClientEP.Node;
// ReadDir handler for a server node.
// NOTE(review): the body is missing from this listing; only the remark below survives.
233 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
235 // 'next' is a valid entry, but readdir should never be called on this node
// Accepts the next pending client on a server node.
// The entry name is compared against "newclient". The call then selects on the
// server node for readability and hands out the server endpoint of the oldest
// channel the server has not yet seen.
//   Node  - server directory node (ImplPtr is the tIPCPipe_Server)
//   Name  - entry name being looked up
//   Flags - lookup flags (not used by the visible code)
// Returns the node of the server endpoint of the accepted channel.
// NOTE(review): the handling of other names and of an empty queue is missing from
// this listing.
238 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
240 tIPCPipe_Server *srv = Node->ImplPtr;
242 ENTER("pNode sName", Node, Name);
244 if( strcmp(Name, "newclient") != 0 ) {
249 // TODO: Need VFS_FDIRFLAG_NOBLOCK?
// A timeout argument of 0 is a null pointer here; IPCPipe_Client_Read passes NULL
// for blocking reads, so this presumably waits without a time limit.
250 VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
252 tIPCPipe_Channel *conn;
// NOTE(review): only the read lock is taken, yet FirstUnseenClient is modified
// below. Two concurrent accepts could receive the same channel; a write lock looks
// like the intent.
253 RWLock_AcquireRead(&srv->lChannelList);
254 conn = srv->FirstUnseenClient;
// Advance the unseen queue past the channel being handed out.
257 srv->FirstUnseenClient = conn->Next;
// The node stays available only while further unseen clients remain.
259 VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
260 RWLock_Release(&srv->lChannelList);
268 LEAVE('p', &conn->ServerEP.Node);
269 return &conn->ServerEP.Node;
// Close handler for a server node: detaches every channel from the server and
// unlinks the server from the global list.
//   Node - server directory node (ImplPtr is the tIPCPipe_Server)
// NOTE(review): several short lines are missing from this listing, including the
// guards around the unlink steps and whatever releases the server memory.
272 void IPCPipe_Server_Close(tVFS_Node *Node)
274 tIPCPipe_Server *srv = Node->ImplPtr;
276 // Flag server as being destroyed
278 // Force-close all children
279 RWLock_AcquireWrite(&srv->lChannelList);
280 for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
// A NULL Server is what the read and write handlers test to detect a closed channel.
282 client->Server = NULL;
284 RWLock_Release(&srv->lChannelList);
286 // Remove from global list
287 RWLock_AcquireWrite(&glIPCPipe_ServerList);
// Doubly linked list unlink: fix the neighbour links, or the list head and tail
// when the server is first or last.
290 srv->Prev->Next = srv->Next;
292 gpIPCPipe_FirstServer = srv->Next;
295 srv->Next->Prev = srv->Prev;
297 gpIPCPipe_LastServer = srv->Prev;
298 RWLock_Release(&glIPCPipe_ServerList);
// Resolves an endpoint node to its channel and to the local and remote endpoints.
//   Node - endpoint node (ImplPtr is the channel, or NULL once the endpoint is closed)
//   lep  - receives the endpoint that owns Node
//   rep  - receives the opposite endpoint
// Callers treat a NULL result as a closed endpoint.
// NOTE(review): the NULL guard and the return statement are missing from this
// listing; presumably lep and rep are left unset when ImplPtr is NULL.
302 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
304 tIPCPipe_Channel *ch = Node->ImplPtr;
// Both endpoint nodes live inside the channel, so comparing addresses identifies the side.
307 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
308 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
// Read handler for a channel endpoint: waits for a packet queued by the remote
// endpoint, pops it and copies its payload out.
//   Node   - endpoint node being read
//   Offset - only logged by the visible code
//   Length - capacity of Dest in bytes
//   Dest   - destination buffer
//   Flags  - VFS_IOFLAG_NOBLOCK selects a zero timeout instead of blocking
// At most Length bytes of the packet are copied.
// NOTE(review): the return statements and the handling of the popped packet after
// the copy are missing from this listing.
312 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
314 tIPCPipe_Endpoint *lep, *rep;
315 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
317 ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);
319 // Closed endpoint / channel
320 if( !channel || channel->Server == NULL ) {
325 // Wait for a packet to be ready
// NOBLOCK passes a pointer to a zero timeout; otherwise NULL is passed.
326 tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
327 int rv = VFS_SelectNode(Node, VFS_SELECT_READ, timeout, "IPCPipe Endpoint");
// Error code for a failed wait: EWOULDBLOCK for NOBLOCK, EINTR otherwise.
329 errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
// Check again after the wait, since the server may have closed in the meantime.
333 if( channel->Server == NULL ) {
339 // Pop a packet from the list
// Incoming data sits on the outbound queue of the remote endpoint, so that is the
// queue locked here.
340 Mutex_Acquire(&rep->lList);
// A NULL ImplPtr on the remote node means the remote side has been closed.
341 if( !rep->Node.ImplPtr )
343 Mutex_Release(&rep->lList);
348 tIPCPipe_Packet *pkt = rep->OutHead;
350 rep->OutHead = pkt->Next;
// This node stays readable only while more packets are queued.
353 VFS_MarkAvaliable(Node, !!rep->OutHead);
354 Mutex_Release(&rep->lList);
// Copy no more than the caller has room for.
360 ret = MIN(pkt->Length, Length);
361 memcpy(Dest, pkt->Data, ret);
// Reached when the wait succeeded but the queue turned out to be empty.
366 Log_Warning("IPCPipe", "No packet ready but semaphore returned");
// Write handler for a channel endpoint: wraps Src in a packet, appends it to the
// outbound queue of this endpoint and marks the remote node as readable.
//   Node   - endpoint node being written
//   Offset - only logged by the visible code
//   Length - payload size in bytes; compared against the server MaxBlockSize
//   Src    - payload to copy
//   Flags  - not used by the visible code
// NOTE(review): the return statements, the rest of the packet setup and the
// empty-queue case of the append are missing from this listing.
373 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
375 tIPCPipe_Endpoint *lep, *rep;
376 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
378 ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src);
380 // Ensure the server hasn't been closed
381 if( !channel || channel->Server == NULL ) {
// Oversized writes take a separate path (body missing from the listing).
386 if( Length > channel->Server->MaxBlockSize ) {
391 // TODO: Ensure that no more than DEF_MAX_BYTE_LIMIT bytes are in flight at one time
393 // Create packet structure
// Header and payload share one allocation.
// NOTE(review): no NULL check is visible before pkt is used; confirm one exists in
// the missing lines.
394 tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
397 pkt->Length = Length;
399 memcpy(pkt->Data, Src, Length);
// The outbound queue of the local endpoint is guarded by its own mutex.
402 Mutex_Acquire(&lep->lList);
405 // Client was closed by another thread
407 Mutex_Release(&lep->lList);
// Append after the current tail.
412 lep->OutTail->Next = pkt;
416 Mutex_Release(&lep->lList);
// Mark the remote node as readable.
419 VFS_MarkAvaliable(&rep->Node, 1);
// Close handler for a channel endpoint: marks this side closed, walks its outbound
// queue, signals the remote side and, once both sides are closed, unlinks the
// channel from the server.
//   Node - endpoint node being closed
// NOTE(review): several short lines are missing from this listing, including the
// double-close guard, the loop body that releases packets, the guards around the
// unlink steps and the final release of the channel.
425 void IPCPipe_Client_Close(tVFS_Node *Node)
427 tIPCPipe_Endpoint *lep, *rep;
428 tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
430 ENTER("pNode", Node);
433 Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
437 // Mark client as closed
// A NULL ImplPtr is what the other handlers test to detect a closed endpoint.
438 Node->ImplPtr = NULL;
// Walk the outbound queue of this endpoint under its mutex.
440 Mutex_Acquire(&lep->lList);
441 while( lep->OutHead ) {
442 tIPCPipe_Packet *next = lep->OutHead->Next;
446 Mutex_Release(&lep->lList);
447 LOG("Packets cleared");
449 // Tell remote that local has closed
450 VFS_MarkError(&rep->Node, 1);
451 // TODO: Deliver SIGPIPE or similar to all owners of remote
452 // Clean up if both sides are closed
453 if( rep->Node.ImplPtr == NULL )
455 LOG("Remote closed, cleaning up");
// NOTE(review): IPCPipe_Server_Close sets ch->Server to NULL, so srv may be NULL
// here and is dereferenced below. Confirm that the missing lines guard against it.
456 tIPCPipe_Server *srv = ch->Server;
459 RWLock_AcquireWrite(&srv->lChannelList);
// Doubly linked list unlink.
// NOTE(review): this relies on ch->Prev, which no visible line in
// IPCPipe_Root_FindDir sets.
461 ch->Prev->Next = ch->Next;
463 srv->FirstClient = ch->Next;
465 ch->Next->Prev = ch->Prev;
467 srv->LastClient = ch->Prev;
// Keep the unseen queue valid if this channel was never accepted by the server.
468 if(srv->FirstUnseenClient == ch)
469 srv->FirstUnseenClient = ch->Next;
470 RWLock_Release(&srv->lChannelList);