3 * - By John Hodge (thePowersGang);
6 * - Connection+Datagram based local IPC
12 #include <semaphore.h>
// Default per-server limits; IPCPipe_Root_MkNod copies them into MaxBlockSize
// and QueueByteLimit of every server it creates.
14 #define DEF_MAX_BLOCK_SIZE 0x1000
15 #define DEF_MAX_BYTE_LIMIT 0x8000
// Forward typedefs so the structures below can refer to each other by pointer.
18 typedef struct sIPCPipe_Packet tIPCPipe_Packet;
19 typedef struct sIPCPipe_Endpoint tIPCPipe_Endpoint;
20 typedef struct sIPCPipe_Channel tIPCPipe_Channel;
21 typedef struct sIPCPipe_Server tIPCPipe_Server;
// One queued message. Only the link and read-offset members are visible in this
// excerpt; the Length and Data members that IPCPipe_Client_Read/Write access
// are declared on lines not shown here.
24 struct sIPCPipe_Packet
26 tIPCPipe_Packet *Next; //!< Next packet in the sending endpoint's outbound queue
28 size_t Offset; //!< Offset to first unread byte (for streams)
// One side of a channel. Packets written by this side are appended at OutTail
// and the peer's read removes them from OutHead. The lList mutex, InCount
// semaphore and Node members used elsewhere in this file are declared on lines
// not visible in this excerpt.
31 struct sIPCPipe_Endpoint
34 tIPCPipe_Packet *OutHead; //!< Oldest queued outbound packet (dequeued first)
35 tIPCPipe_Packet *OutTail; //!< Most recently queued outbound packet
// A single client<->server connection, linked into its server's channel list.
39 struct sIPCPipe_Channel
41 tIPCPipe_Channel *Next; //!< Next channel in the owning server's list
42 tIPCPipe_Channel *Prev; //!< Previous channel in the owning server's list
43 tIPCPipe_Server *Server; //!< Owning server; set to NULL by IPCPipe_Server_Close
44 tIPCPipe_Endpoint ServerEP; //!< Endpoint whose node IPCPipe_Server_FindDir returns
45 tIPCPipe_Endpoint ClientEP; //!< Endpoint whose node IPCPipe_Root_FindDir returns
// A named pipe server: an entry in the global server list plus its own list of
// client channels. The Name, ServerNode and lChannelList members used elsewhere
// in this file are declared on lines not visible in this excerpt.
47 struct sIPCPipe_Server
49 tIPCPipe_Server *Next; //!< Next server in the global list
50 tIPCPipe_Server *Prev; //!< Previous server in the global list
52 size_t MaxBlockSize; // Max size of a 'packet'
53 size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server
54 size_t CurrentByteCount; // NOTE(review): presumably bytes currently queued against QueueByteLimit; no visible line updates it - confirm
56 tSemaphore ConnectionSemaphore; // Waited on by IPCPipe_Server_FindDir before it hands out a connection
58 tIPCPipe_Channel *FirstClient; //!< Head of this server's channel list
59 tIPCPipe_Channel *LastClient; //!< Tail of this server's channel list
60 tIPCPipe_Channel *FirstUnseenClient; //!< First client the server hasn't opened yet
// Module install hook
64 int IPCPipe_Install(char **Arguments);
// Root directory node operations (see gIPCPipe_RootNodeType)
66 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
67 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
68 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
// Server node operations (see gIPCPipe_ServerNodeType)
70 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
71 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
72 void IPCPipe_Server_Close(tVFS_Node *Node);
// Channel endpoint operations; the same handlers serve both the client-side and
// the server-side endpoint nodes (see gIPCPipe_ChannelNodeType)
74 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
75 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
76 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
77 void IPCPipe_Client_Close(tVFS_Node *Node);
80 // - Server node: Directory
// Operation table installed on the node that IPCPipe_Root_MkNod returns.
81 tVFS_NodeType gIPCPipe_ServerNodeType = {
82 .TypeName = "IPC Pipe - Server",
83 .ReadDir = IPCPipe_Server_ReadDir,
84 .FindDir = IPCPipe_Server_FindDir,
85 .Close = IPCPipe_Server_Close
// Operation table for channel endpoint nodes. IPCPipe_Root_FindDir installs it
// on both the ClientEP and the ServerEP node of every new channel.
87 tVFS_NodeType gIPCPipe_ChannelNodeType = {
88 .TypeName = "IPC Pipe - Channel",
89 .Read = IPCPipe_Client_Read,
90 .Write = IPCPipe_Client_Write,
91 .Close = IPCPipe_Client_Close
// Operation table for the driver's root directory node (referenced from
// gIPCPipe_DevFSInfo): MkNod creates a server, FindDir opens a client
// connection to an existing one.
93 tVFS_NodeType gIPCPipe_RootNodeType = {
94 .TypeName = "IPC Pipe - Root",
95 .MkNod = IPCPipe_Root_MkNod,
96 .ReadDir = IPCPipe_Root_ReadDir,
97 .FindDir = IPCPipe_Root_FindDir
// DevFS driver record handed to DevFS_AddDevice by IPCPipe_Install; its root
// node uses the root operation table. Any other initialisers (e.g. the device
// name) are on lines not visible in this excerpt.
99 tDevFS_Driver gIPCPipe_DevFSInfo = {
101 .RootNode = {.Type=&gIPCPipe_RootNodeType}
103 // - Global list of servers
// glIPCPipe_ServerList guards the two list pointers below. It is taken for
// write by the functions that modify the list (IPCPipe_Root_MkNod,
// IPCPipe_Server_Close) and for read by those that only walk it
// (IPCPipe_Root_ReadDir, IPCPipe_Root_FindDir).
104 tRWLock glIPCPipe_ServerList;
105 tIPCPipe_Server *gpIPCPipe_FirstServer;
106 tIPCPipe_Server *gpIPCPipe_LastServer;
107 // - Module definition
// Names IPCPipe_Install as one of the module's callbacks; the meaning of each
// argument is defined by the MODULE_DEFINE macro, which is not visible here.
108 MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
// Module install hook: registers the driver's DevFS record and reports success.
// Arguments is not referenced on the visible lines.
111 int IPCPipe_Install(char **Arguments)
// NOTE(review): the result of DevFS_AddDevice is ignored; success is returned unconditionally
113 DevFS_AddDevice(&gIPCPipe_DevFSInfo);
114 return MODULE_ERR_OK;
119 * \brief Create a new named pipe
120 * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
// Creates a server called Name, appends it to the global server list and
// returns the new server's node. Node and Flags are not referenced on the
// visible lines.
122 tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
124 tIPCPipe_Server *srv;
// Scan the existing servers for the same name under the write lock
127 RWLock_AcquireWrite(&glIPCPipe_ServerList);
128 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
130 if( strcmp(srv->Name, Name) == 0 )
// NOTE(review): this release appears to belong to the name-already-exists path,
// whose remaining lines are not visible in this excerpt - confirm
135 RWLock_Release(&glIPCPipe_ServerList);
// The server structure and its name string share one zeroed allocation; Name
// points at the bytes immediately after the structure.
// NOTE(review): the calloc result is dereferenced on the next line without a NULL check
140 srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
141 srv->Name = (void*)(srv + 1);
142 strcpy(srv->Name, Name);
143 srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
144 srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
145 srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
// ImplPtr lets the server node handlers recover the server from the node
146 srv->ServerNode.ImplPtr = srv;
// Link the new server in as the tail of the global list
149 if( gpIPCPipe_LastServer )
150 gpIPCPipe_LastServer->Next = srv;
152 gpIPCPipe_FirstServer = srv;
153 srv->Prev = gpIPCPipe_LastServer;
154 gpIPCPipe_LastServer = srv;
155 RWLock_Release(&glIPCPipe_ServerList);
157 return &srv->ServerNode;
// Directory-listing handler for the root node: steps ID entries along the
// global server list and copies that server's name into Name. The return
// statements are on lines not visible in this excerpt. Node is not referenced
// on the visible lines.
160 int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
162 tIPCPipe_Server *srv;
163 RWLock_AcquireRead(&glIPCPipe_ServerList);
// Stops when the list runs out or when ID reaches zero
164 for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
166 RWLock_Release(&glIPCPipe_ServerList);
// NOTE(review): srv->Name is read after the list lock was released above, so a
// concurrent IPCPipe_Server_Close could race with this copy - confirm
// NOTE(review): Name is an array parameter and therefore a pointer, so
// sizeof(Name) is the size of a char pointer rather than FILENAME_MAX; strncpy
// also leaves the destination unterminated when the source is truncated
169 strncpy(Name, srv->Name, sizeof(Name));
176 * \return New client pointer
// Opens a client connection to the server called Name: allocates a channel,
// appends it to that server's channel list and returns the client-side
// endpoint node. Node is not referenced on the visible lines.
178 tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
180 tIPCPipe_Server *srv;
// Search the global list for a server with a matching name
183 RWLock_AcquireRead(&glIPCPipe_ServerList);
184 for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
186 if( strcmp(srv->Name, Name) == 0 )
189 RWLock_Release(&glIPCPipe_ServerList);
194 tIPCPipe_Channel *new_client;
// NOTE(review): the calloc result is dereferenced on the next line without a NULL check
196 new_client = calloc(1, sizeof(tIPCPipe_Channel));
197 new_client->Server = srv;
// Both endpoint nodes share the channel node type and point back at the
// channel; IPCPipe_int_GetEPs tells them apart by node address.
198 new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
199 new_client->ClientEP.Node.ImplPtr = new_client;
200 new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
201 new_client->ServerEP.Node.ImplPtr = new_client;
203 // Append to server list
// NOTE(review): no visible line assigns new_client->Prev (it stays NULL from
// calloc), yet IPCPipe_Client_Close reads ch->Prev when unlinking - confirm
204 RWLock_AcquireWrite(&srv->lChannelList);
206 srv->LastClient->Next = new_client;
208 srv->FirstClient = new_client;
209 srv->LastClient = new_client;
// If the server has no pending connection, this channel becomes the next one
// IPCPipe_Server_FindDir will hand out
210 if(!srv->FirstUnseenClient)
211 srv->FirstUnseenClient = new_client;
212 RWLock_Release(&srv->lChannelList);
214 return &new_client->ClientEP.Node;
// Directory-listing handler for a server node. Its statements are not visible
// in this excerpt; only the original remark below survives.
218 int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
220 // 'next' is a valid entry, but readdir should never be called on this node
// Lookup handler for a server node. For the name "next" it waits on the
// server's connection semaphore, takes the first channel the server has not
// yet seen and returns that channel's server-side endpoint node.
223 tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
225 tIPCPipe_Server *srv = Node->ImplPtr;
// Any name other than "next" takes this branch; its body is not visible here
227 if( strcmp(Name, "next") != 0 )
// Wait for one pending connection; the handling of a failed wait is on lines
// not visible in this excerpt
229 if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 )
232 tIPCPipe_Channel *conn;
// NOTE(review): FirstUnseenClient is written below while only the read side of
// lChannelList is held - confirm whether a write lock is intended
233 RWLock_AcquireRead(&srv->lChannelList);
234 conn = srv->FirstUnseenClient;
// Advance the unseen cursor to the channel after the one being handed out
237 srv->FirstUnseenClient = conn->Next;
239 RWLock_Release(&srv->lChannelList);
245 return &conn->ServerEP.Node;
// Close handler for a server node: detaches every channel from the server by
// clearing its Server pointer, then unlinks the server from the global list.
// Any freeing of the server is on lines not visible in this excerpt.
248 void IPCPipe_Server_Close(tVFS_Node *Node)
250 tIPCPipe_Server *srv = Node->ImplPtr;
251 // Semaphore_Cancel(&srv->ConnectionSemaphore);
253 // Flag server as being destroyed
255 // Force-close all children
256 RWLock_AcquireWrite(&srv->lChannelList);
257 for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
// IPCPipe_Client_Read/Write test Server == NULL to detect a closed server
259 client->Server = NULL;
261 RWLock_Release(&srv->lChannelList);
263 // Remove from global list
// The conditions selecting between each pair of assignments below are on lines
// not visible in this excerpt
264 RWLock_AcquireWrite(&glIPCPipe_ServerList);
267 srv->Prev->Next = srv->Next;
269 gpIPCPipe_FirstServer = srv->Next;
272 srv->Next->Prev = srv->Prev;
274 gpIPCPipe_LastServer = srv->Prev;
275 RWLock_Release(&glIPCPipe_ServerList);
// Resolves an endpoint node to its channel and reports, through lep and rep,
// which endpoint is local (the one that owns Node) and which is the remote
// peer. Returns NULL without writing *lep or *rep when Node->ImplPtr is NULL
// (IPCPipe_Client_Close clears it). The non-NULL return is on a line not
// visible in this excerpt.
279 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
281 tIPCPipe_Channel *ch = Node->ImplPtr;
282 if( !ch ) return NULL;
// A node that is not the server endpoint's node is treated as the client's
283 *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
284 *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
// Read handler for a channel endpoint: waits on the local endpoint's InCount
// semaphore, removes the packet at the head of the remote endpoint's outbound
// queue and copies at most Length bytes of it into Dest. Offset is not
// referenced on the visible lines; the return statements are not visible.
287 size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
289 tIPCPipe_Endpoint *lep, *rep;
290 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
// NOTE(review): unlike IPCPipe_Client_Write, no NULL check on channel is
// visible before it is dereferenced here - confirm
292 if( channel->Server == NULL )
// Block until the peer has signalled at least one packet
295 if( Semaphore_Wait(&lep->InCount, 1) != 1 )
// Re-test after the wait: the server may have been closed in the meantime
297 if( channel->Server == NULL )
// Packets destined for this side live on the remote endpoint's Out queue
302 Mutex_Acquire(&rep->lList);
303 tIPCPipe_Packet *pkt = rep->OutHead;
305 rep->OutHead = pkt->Next;
308 Mutex_Release(&rep->lList);
// Copy no more than the caller's buffer holds; what happens to any remaining
// bytes and to the packet itself is on lines not visible in this excerpt
313 size_t ret = MIN(pkt->Length, Length);
314 memcpy(Dest, pkt->Data, ret);
// Write handler for a channel endpoint: copies Length bytes from Src into a
// new packet, appends it to the local endpoint's outbound queue and signals
// the remote endpoint's InCount semaphore. Offset is not referenced on the
// visible lines; the return statements are not visible.
320 size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
322 tIPCPipe_Endpoint *lep, *rep;
323 tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
325 // Ensure the server hasn't been closed
326 if( !channel || channel->Server == NULL )
// Writes larger than the server's per-packet limit take this branch
329 if( Length > channel->Server->MaxBlockSize )
332 // Create packet structure
// Header and payload share one allocation
// NOTE(review): any check of the malloc result would be on lines not visible here - confirm
333 tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
336 pkt->Length = Length;
338 memcpy(pkt->Data, Src, Length);
341 Mutex_Acquire(&lep->lList);
344 // Client was closed by another thread
346 Mutex_Release(&lep->lList);
// Link the packet after the current tail; the empty-queue case and the tail
// update are on lines not visible in this excerpt
350 lep->OutTail->Next = pkt;
354 Mutex_Release(&lep->lList);
// One count per queued packet; IPCPipe_Client_Read on the peer waits on this
357 Semaphore_Signal(&rep->InCount, 1);
// Close handler for a channel endpoint: marks this side closed, walks this
// side's outbound queue and, once the peer side is closed too, unlinks the
// channel from its server's list. The function may continue beyond the last
// line of this excerpt.
362 void IPCPipe_Client_Close(tVFS_Node *Node)
364 tIPCPipe_Endpoint *lep, *rep;
// NOTE(review): IPCPipe_int_GetEPs returns NULL and leaves lep/rep unset when
// ImplPtr is already NULL; no check is visible before they are used below
365 tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
367 // Mark client as closed
368 Node->ImplPtr = NULL;
// Walk the packets this side queued that were never read; the loop's remaining
// statements are on lines not visible in this excerpt
369 Mutex_Acquire(&lep->lList);
370 while( lep->OutHead ) {
371 tIPCPipe_Packet *next = lep->OutHead->Next;
375 Mutex_Release(&lep->lList);
377 // Clean up if both sides are closed
378 if( rep->Node.ImplPtr == NULL )
// NOTE(review): ch->Server is NULL after IPCPipe_Server_Close; any guard before
// srv is dereferenced would be on lines not visible here - confirm
380 tIPCPipe_Server *srv = ch->Server;
// The conditions selecting between each pair of assignments below are on lines
// not visible in this excerpt
383 RWLock_AcquireWrite(&srv->lChannelList);
385 ch->Prev->Next = ch->Next;
387 srv->FirstClient = ch->Next;
389 ch->Next->Prev = ch->Prev;
391 srv->LastClient = ch->Prev;
// Keep the server's unseen cursor from pointing at the channel being removed
392 if(srv->FirstUnseenClient == ch)
393 srv->FirstUnseenClient = ch->Next;
394 RWLock_Release(&srv->lChannelList);