Kernel - Add untested Datagram IPC Pipe
authorJohn Hodge <[email protected]>
Mon, 25 Feb 2013 03:42:18 +0000 (11:42 +0800)
committerJohn Hodge <[email protected]>
Mon, 25 Feb 2013 03:42:18 +0000 (11:42 +0800)
KernelLand/Kernel/drv/dgram_pipe.c [new file with mode: 0644]

diff --git a/KernelLand/Kernel/drv/dgram_pipe.c b/KernelLand/Kernel/drv/dgram_pipe.c
new file mode 100644 (file)
index 0000000..f47450d
--- /dev/null
@@ -0,0 +1,398 @@
+/*
+ * Acess2 Kernel
+ * - By John Hodge (thePowersGang);
+ * 
+ * drv/dgram_pipe.c
+ * - Connection+Datagram based local IPC
+ */
+#include <vfs.h>
+#include <fs_devfs.h>
+#include <modules.h>
+#include <rwlock.h>
+#include <semaphore.h>
+
+#define DEF_MAX_BLOCK_SIZE     0x1000
+#define DEF_MAX_BYTE_LIMIT     0x8000
+
+// === TYPES ===
+typedef struct sIPCPipe_Packet tIPCPipe_Packet;
+typedef struct sIPCPipe_Endpoint       tIPCPipe_Endpoint;
+typedef struct sIPCPipe_Channel        tIPCPipe_Channel;
+typedef struct sIPCPipe_Server tIPCPipe_Server;
+
+// === STRUCTURES ===
+struct sIPCPipe_Packet
+{
+       tIPCPipe_Packet *Next;
+       size_t  Length;
+       size_t  Offset; //!< Offset to first unread byte (for streams)
+       void    *Data;
+};
+struct sIPCPipe_Endpoint
+{
+       tMutex  lList;
+       tIPCPipe_Packet *OutHead;
+       tIPCPipe_Packet *OutTail;
+       tSemaphore      InCount;
+       tVFS_Node       Node;
+};
+struct sIPCPipe_Channel
+{
+       tIPCPipe_Channel        *Next;
+       tIPCPipe_Channel        *Prev;
+       tIPCPipe_Server *Server;
+       tIPCPipe_Endpoint       ServerEP;
+       tIPCPipe_Endpoint       ClientEP;
+};
+struct sIPCPipe_Server
+{
+       tIPCPipe_Server *Next;
+       tIPCPipe_Server *Prev;
+       char    *Name;
+       size_t  MaxBlockSize;   // Max size of a 'packet'
+       size_t  QueueByteLimit; // Maximum number of bytes held in kernel for this server
+       size_t  CurrentByteCount;
+       tVFS_Node       ServerNode;
+       tSemaphore      ConnectionSemaphore;
+       tRWLock lChannelList;
+       tIPCPipe_Channel        *FirstClient;
+       tIPCPipe_Channel        *LastClient;
+       tIPCPipe_Channel        *FirstUnseenClient;     // !< First client server hasn't opened
+};
+
+// === PROTOTYPES ===
+ int   IPCPipe_Install(char **Arguments);
+// - Root
+tVFS_Node      *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
+ int   IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
+tVFS_Node      *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
+// - Server
+ int   IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
+tVFS_Node      *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
+void   IPCPipe_Server_Close(tVFS_Node *Node);
+// - Socket
+tIPCPipe_Channel       *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
+void   IPCPipe_Client_Close(tVFS_Node *Node);
+
+// === GLOBALS ===
+// - Server node: Directory
+tVFS_NodeType  gIPCPipe_ServerNodeType = {
+       .TypeName = "IPC Pipe - Server",
+       .ReadDir = IPCPipe_Server_ReadDir,
+       .FindDir = IPCPipe_Server_FindDir,
+       .Close = IPCPipe_Server_Close
+};
+tVFS_NodeType  gIPCPipe_ChannelNodeType = {
+       .TypeName = "IPC Pipe - Channel",
+       .Read = IPCPipe_Client_Read,
+       .Write = IPCPipe_Client_Write,
+       .Close = IPCPipe_Client_Close
+};
+tVFS_NodeType  gIPCPipe_RootNodeType = {
+       .TypeName = "IPC Pipe - Root",
+       .MkNod = IPCPipe_Root_MkNod,
+       .ReadDir = IPCPipe_Root_ReadDir,
+       .FindDir = IPCPipe_Root_FindDir
+};
+tDevFS_Driver  gIPCPipe_DevFSInfo = {
+       .Name = "ipcpipe",
+       .RootNode = {.Type=&gIPCPipe_RootNodeType}
+};
+// - Global list of servers
+tRWLock        glIPCPipe_ServerList;
+tIPCPipe_Server        *gpIPCPipe_FirstServer;
+tIPCPipe_Server        *gpIPCPipe_LastServer;
+// - Module definition
+MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL);
+
+// === CODE ===
+int IPCPipe_Install(char **Arguments)
+{
+       DevFS_AddDevice(&gIPCPipe_DevFSInfo);
+       return MODULE_ERR_OK;
+}
+
+// - Root -
+/*
+ * \brief Create a new named pipe
+ * \note Expected to be called from VFS_Open with OPENFLAG_CREATE
+ */
+tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
+{
+       tIPCPipe_Server *srv;
+       
+       // Write Lock
+       RWLock_AcquireWrite(&glIPCPipe_ServerList);
+       for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
+       {
+               if( strcmp(srv->Name, Name) == 0 )
+                       break;
+       }
+       if( srv )
+       {
+               RWLock_Release(&glIPCPipe_ServerList);
+               return NULL;
+       }
+       
+
+       srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 );
+       srv->Name = (void*)(srv + 1);
+       strcpy(srv->Name, Name);
+       srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE;
+       srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
+       srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
+       srv->ServerNode.ImplPtr = srv;
+
+       // Lock already held
+       if( gpIPCPipe_LastServer )
+               gpIPCPipe_LastServer->Next = srv;
+       else
+               gpIPCPipe_FirstServer = srv;
+       srv->Prev = gpIPCPipe_LastServer;
+       gpIPCPipe_LastServer = srv;
+       RWLock_Release(&glIPCPipe_ServerList);
+
+       return &srv->ServerNode;
+}
+
+int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
+{
+       tIPCPipe_Server *srv;
+       RWLock_AcquireRead(&glIPCPipe_ServerList);
+       for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next )
+               ;
+       RWLock_Release(&glIPCPipe_ServerList);
+       
+       if( srv ) {
+               strncpy(Name, srv->Name, sizeof(Name));
+               return 0;
+       }
+       
+       return -1;
+}
+/**
+ * \return New client pointer
+ */
+tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
+{
+       tIPCPipe_Server *srv;
+       
+       // Find server
+       RWLock_AcquireRead(&glIPCPipe_ServerList);
+       for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
+       {
+               if( strcmp(srv->Name, Name) == 0 )
+                       break;
+       }
+       RWLock_Release(&glIPCPipe_ServerList);
+       if( !srv )
+               return NULL;
+
+       // Create new client
+       tIPCPipe_Channel *new_client;
+       
+       new_client = calloc(1, sizeof(tIPCPipe_Channel));
+       new_client->Server = srv;
+       new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
+       new_client->ClientEP.Node.ImplPtr = new_client;
+       new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
+       new_client->ServerEP.Node.ImplPtr = new_client;
+
+       // Append to server list
+       RWLock_AcquireWrite(&srv->lChannelList);
+       if(srv->LastClient)
+               srv->LastClient->Next = new_client;
+       else
+               srv->FirstClient = new_client;
+       srv->LastClient = new_client;
+       if(!srv->FirstUnseenClient)
+               srv->FirstUnseenClient = new_client;
+       RWLock_Release(&srv->lChannelList);
+       
+       return &new_client->ClientEP.Node;
+}
+
+// --- Server ---
+int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
+{
+       // 'next' is a valid entry, but readdir should never be called on this node
+       return -1;
+}
+tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
+{
+       tIPCPipe_Server *srv = Node->ImplPtr;
+       
+       if( strcmp(Name, "next") != 0 )
+               return NULL;
+       if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 )
+               return NULL;
+
+       tIPCPipe_Channel *conn;
+       RWLock_AcquireRead(&srv->lChannelList);
+       conn = srv->FirstUnseenClient;
+       if( conn )
+       {
+               srv->FirstUnseenClient = conn->Next;
+       }
+       RWLock_Release(&srv->lChannelList);
+
+       if( !conn )
+               return NULL;
+       
+       // Success
+       return &conn->ServerEP.Node;
+}
+
+void IPCPipe_Server_Close(tVFS_Node *Node)
+{
+       tIPCPipe_Server *srv = Node->ImplPtr;
+//     Semaphore_Cancel(&srv->ConnectionSemaphore);
+
+       // Flag server as being destroyed
+
+       // Force-close all children
+       RWLock_AcquireWrite(&srv->lChannelList);
+       for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next)
+       {
+               client->Server = NULL;
+       }
+       RWLock_Release(&srv->lChannelList);
+
+       // Remove from global list
+       RWLock_AcquireWrite(&glIPCPipe_ServerList);
+       // - Forward link
+       if(srv->Prev)
+               srv->Prev->Next = srv->Next;
+       else
+               gpIPCPipe_FirstServer = srv->Next;
+       // - Reverse link
+       if(srv->Next)
+               srv->Next->Prev = srv->Prev;
+       else
+               gpIPCPipe_LastServer = srv->Prev;
+       RWLock_Release(&glIPCPipe_ServerList);
+}
+
+// --- Channel ---
+tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
+{
+       tIPCPipe_Channel        *ch = Node->ImplPtr;
+       if( !ch )       return NULL;
+       *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
+       *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
+       return ch;
+}
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
+{
+       tIPCPipe_Endpoint       *lep, *rep;
+       tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
+       
+       if( channel->Server == NULL )
+               return -1;
+       
+       if( Semaphore_Wait(&lep->InCount, 1) != 1 )
+       {
+               if( channel->Server == NULL )
+                       return -1;
+               return 0;
+       }
+
+       Mutex_Acquire(&rep->lList);
+       tIPCPipe_Packet *pkt = rep->OutHead;
+       if(pkt)
+               rep->OutHead = pkt->Next;
+       if(!rep->OutHead)
+               rep->OutTail = NULL;
+       Mutex_Release(&rep->lList);
+
+       if(!pkt)
+               return 0;
+
+       size_t ret = MIN(pkt->Length, Length);
+       memcpy(Dest, pkt->Data, ret);
+       free(pkt);
+       
+       return ret;
+}
+
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
+{
+       tIPCPipe_Endpoint       *lep, *rep;
+       tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
+       
+       // Ensure the server hasn't been closed
+       if( !channel || channel->Server == NULL )
+               return -1;
+
+       if( Length > channel->Server->MaxBlockSize )
+               return 0;
+
+       // Create packet structure      
+       tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
+       pkt->Next = NULL;
+       pkt->Offset = 0;
+       pkt->Length = Length;
+       pkt->Data = pkt + 1;
+       memcpy(pkt->Data, Src, Length);
+
+       // Append to list
+       Mutex_Acquire(&lep->lList);
+       if( !Node->ImplPtr )
+       {
+               // Client was closed by another thread
+               free(pkt);
+               Mutex_Release(&lep->lList);
+               return -1;
+       }
+       if(lep->OutTail)
+               lep->OutTail->Next = pkt;
+       else
+               lep->OutHead = pkt;
+       lep->OutTail = pkt;
+       Mutex_Release(&lep->lList);
+
+       // Signal other end
+       Semaphore_Signal(&rep->InCount, 1);
+
+       return Length;
+}
+
+void IPCPipe_Client_Close(tVFS_Node *Node)
+{
+       tIPCPipe_Endpoint       *lep, *rep;
+       tIPCPipe_Channel        *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
+       
+       // Mark client as closed
+       Node->ImplPtr = NULL;
+       Mutex_Acquire(&lep->lList);
+       while( lep->OutHead ) {
+               tIPCPipe_Packet *next = lep->OutHead->Next;
+               free(lep->OutHead);
+               lep->OutHead = next;
+       }
+       Mutex_Release(&lep->lList);
+       
+       // Clean up if both sides are closed
+       if( rep->Node.ImplPtr == NULL )
+       {
+               tIPCPipe_Server *srv = ch->Server;
+               if( srv )
+               {
+                       RWLock_AcquireWrite(&srv->lChannelList);
+                       if(ch->Prev)
+                               ch->Prev->Next = ch->Next;
+                       else
+                               srv->FirstClient = ch->Next;
+                       if(ch->Next)
+                               ch->Next->Prev = ch->Prev;
+                       else
+                               srv->LastClient = ch->Prev;
+                       if(srv->FirstUnseenClient == ch)
+                               srv->FirstUnseenClient = ch->Next;
+                       RWLock_Release(&srv->lChannelList);
+               }
+               free(ch);
+       }
+}

UCC git Repository :: git.ucc.asn.au