From 4638b23e49638c3435a576f332b9f0fd383676e0 Mon Sep 17 00:00:00 2001 From: John Hodge Date: Mon, 25 Feb 2013 11:42:18 +0800 Subject: [PATCH] Kernel - Add untested Datagram IPC Pipe --- KernelLand/Kernel/drv/dgram_pipe.c | 398 +++++++++++++++++++++++++++++ 1 file changed, 398 insertions(+) create mode 100644 KernelLand/Kernel/drv/dgram_pipe.c diff --git a/KernelLand/Kernel/drv/dgram_pipe.c b/KernelLand/Kernel/drv/dgram_pipe.c new file mode 100644 index 00000000..f47450de --- /dev/null +++ b/KernelLand/Kernel/drv/dgram_pipe.c @@ -0,0 +1,398 @@ +/* + * Acess2 Kernel + * - By John Hodge (thePowersGang); + * + * drv/dgram_pipe.c + * - Connection+Datagram based local IPC + */ +#include +#include +#include +#include +#include + +#define DEF_MAX_BLOCK_SIZE 0x1000 +#define DEF_MAX_BYTE_LIMIT 0x8000 + +// === TYPES === +typedef struct sIPCPipe_Packet tIPCPipe_Packet; +typedef struct sIPCPipe_Endpoint tIPCPipe_Endpoint; +typedef struct sIPCPipe_Channel tIPCPipe_Channel; +typedef struct sIPCPipe_Server tIPCPipe_Server; + +// === STRUCTURES === +struct sIPCPipe_Packet +{ + tIPCPipe_Packet *Next; + size_t Length; + size_t Offset; //!< Offset to first unread byte (for streams) + void *Data; +}; +struct sIPCPipe_Endpoint +{ + tMutex lList; + tIPCPipe_Packet *OutHead; + tIPCPipe_Packet *OutTail; + tSemaphore InCount; + tVFS_Node Node; +}; +struct sIPCPipe_Channel +{ + tIPCPipe_Channel *Next; + tIPCPipe_Channel *Prev; + tIPCPipe_Server *Server; + tIPCPipe_Endpoint ServerEP; + tIPCPipe_Endpoint ClientEP; +}; +struct sIPCPipe_Server +{ + tIPCPipe_Server *Next; + tIPCPipe_Server *Prev; + char *Name; + size_t MaxBlockSize; // Max size of a 'packet' + size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server + size_t CurrentByteCount; + tVFS_Node ServerNode; + tSemaphore ConnectionSemaphore; + tRWLock lChannelList; + tIPCPipe_Channel *FirstClient; + tIPCPipe_Channel *LastClient; + tIPCPipe_Channel *FirstUnseenClient; // !< First client server hasn't opened +}; + +// === PROTOTYPES === + int IPCPipe_Install(char **Arguments); +// - Root +tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags); + int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]); +tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name); +// - Server + int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]); +tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name); +void IPCPipe_Server_Close(tVFS_Node *Node); +// - Socket +tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep); +size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest); +size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src); +void IPCPipe_Client_Close(tVFS_Node *Node); + +// === GLOBALS === +// - Server node: Directory +tVFS_NodeType gIPCPipe_ServerNodeType = { + .TypeName = "IPC Pipe - Server", + .ReadDir = IPCPipe_Server_ReadDir, + .FindDir = IPCPipe_Server_FindDir, + .Close = IPCPipe_Server_Close +}; +tVFS_NodeType gIPCPipe_ChannelNodeType = { + .TypeName = "IPC Pipe - Channel", + .Read = IPCPipe_Client_Read, + .Write = IPCPipe_Client_Write, + .Close = IPCPipe_Client_Close +}; +tVFS_NodeType gIPCPipe_RootNodeType = { + .TypeName = "IPC Pipe - Root", + .MkNod = IPCPipe_Root_MkNod, + .ReadDir = IPCPipe_Root_ReadDir, + .FindDir = IPCPipe_Root_FindDir +}; +tDevFS_Driver gIPCPipe_DevFSInfo = { + .Name = "ipcpipe", + .RootNode = {.Type=&gIPCPipe_RootNodeType} +}; +// - Global list of servers +tRWLock glIPCPipe_ServerList; +tIPCPipe_Server *gpIPCPipe_FirstServer; +tIPCPipe_Server *gpIPCPipe_LastServer; +// - Module definition +MODULE_DEFINE(0, 0x0100, IPCPipe, IPCPipe_Install, NULL, NULL); + +// === CODE === +int IPCPipe_Install(char **Arguments) +{ + DevFS_AddDevice(&gIPCPipe_DevFSInfo); + return MODULE_ERR_OK; +} + +// - Root - +/* + * \brief Create a new named pipe + * \note Expected to be called from VFS_Open with OPENFLAG_CREATE + */ +tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags) +{ + tIPCPipe_Server *srv; + + // Write Lock + RWLock_AcquireWrite(&glIPCPipe_ServerList); + for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next ) + { + if( strcmp(srv->Name, Name) == 0 ) + break; + } + if( srv ) + { + RWLock_Release(&glIPCPipe_ServerList); + return NULL; + } + + + srv = calloc( 1, sizeof(tIPCPipe_Server) + strlen(Name) + 1 ); + srv->Name = (void*)(srv + 1); + strcpy(srv->Name, Name); + srv->MaxBlockSize = DEF_MAX_BLOCK_SIZE; + srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT; + srv->ServerNode.Type = &gIPCPipe_ServerNodeType; + srv->ServerNode.ImplPtr = srv; + + // Lock already held + if( gpIPCPipe_LastServer ) + gpIPCPipe_LastServer->Next = srv; + else + gpIPCPipe_FirstServer = srv; + srv->Prev = gpIPCPipe_LastServer; + gpIPCPipe_LastServer = srv; + RWLock_Release(&glIPCPipe_ServerList); + + return &srv->ServerNode; +} + +int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]) +{ + tIPCPipe_Server *srv; + RWLock_AcquireRead(&glIPCPipe_ServerList); + for( srv = gpIPCPipe_FirstServer; srv && ID--; srv = srv->Next ) + ; + RWLock_Release(&glIPCPipe_ServerList); + + if( srv ) { + strncpy(Name, srv->Name, sizeof(Name)); + return 0; + } + + return -1; +} +/** + * \return New client pointer + */ +tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name) +{ + tIPCPipe_Server *srv; + + // Find server + RWLock_AcquireRead(&glIPCPipe_ServerList); + for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next ) + { + if( strcmp(srv->Name, Name) == 0 ) + break; + } + RWLock_Release(&glIPCPipe_ServerList); + if( !srv ) + return NULL; + + // Create new client + tIPCPipe_Channel *new_client; + + new_client = calloc(1, sizeof(tIPCPipe_Channel)); + new_client->Server = srv; + new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType; + new_client->ClientEP.Node.ImplPtr = new_client; + new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType; + new_client->ServerEP.Node.ImplPtr = new_client; + + // Append to server list + RWLock_AcquireWrite(&srv->lChannelList); + if(srv->LastClient) + srv->LastClient->Next = new_client; + else + srv->FirstClient = new_client; + srv->LastClient = new_client; + if(!srv->FirstUnseenClient) + srv->FirstUnseenClient = new_client; + RWLock_Release(&srv->lChannelList); + + return &new_client->ClientEP.Node; +} + +// --- Server --- +int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]) +{ + // 'next' is a valid entry, but readdir should never be called on this node + return -1; +} +tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name) +{ + tIPCPipe_Server *srv = Node->ImplPtr; + + if( strcmp(Name, "next") != 0 ) + return NULL; + if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 ) + return NULL; + + tIPCPipe_Channel *conn; + RWLock_AcquireRead(&srv->lChannelList); + conn = srv->FirstUnseenClient; + if( conn ) + { + srv->FirstUnseenClient = conn->Next; + } + RWLock_Release(&srv->lChannelList); + + if( !conn ) + return NULL; + + // Success + return &conn->ServerEP.Node; +} + +void IPCPipe_Server_Close(tVFS_Node *Node) +{ + tIPCPipe_Server *srv = Node->ImplPtr; +// Semaphore_Cancel(&srv->ConnectionSemaphore); + + // Flag server as being destroyed + + // Force-close all children + RWLock_AcquireWrite(&srv->lChannelList); + for(tIPCPipe_Channel *client = srv->FirstClient; client; client = client->Next) + { + client->Server = NULL; + } + RWLock_Release(&srv->lChannelList); + + // Remove from global list + RWLock_AcquireWrite(&glIPCPipe_ServerList); + // - Forward link + if(srv->Prev) + srv->Prev->Next = srv->Next; + else + gpIPCPipe_FirstServer = srv->Next; + // - Reverse link + if(srv->Next) + srv->Next->Prev = srv->Prev; + else + gpIPCPipe_LastServer = srv->Prev; + RWLock_Release(&glIPCPipe_ServerList); +} + +// --- Channel --- +tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep) +{ + tIPCPipe_Channel *ch = Node->ImplPtr; + if( !ch ) return NULL; + *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP); + *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP); + return ch; +} +size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest) +{ + tIPCPipe_Endpoint *lep, *rep; + tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep); + + if( channel->Server == NULL ) + return -1; + + if( Semaphore_Wait(&lep->InCount, 1) != 1 ) + { + if( channel->Server == NULL ) + return -1; + return 0; + } + + Mutex_Acquire(&rep->lList); + tIPCPipe_Packet *pkt = rep->OutHead; + if(pkt) + rep->OutHead = pkt->Next; + if(!rep->OutHead) + rep->OutTail = NULL; + Mutex_Release(&rep->lList); + + if(!pkt) + return 0; + + size_t ret = MIN(pkt->Length, Length); + memcpy(Dest, pkt->Data, ret); + free(pkt); + + return ret; +} + +size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src) +{ + tIPCPipe_Endpoint *lep, *rep; + tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep); + + // Ensure the server hasn't been closed + if( !channel || channel->Server == NULL ) + return -1; + + if( Length > channel->Server->MaxBlockSize ) + return 0; + + // Create packet structure + tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length); + pkt->Next = NULL; + pkt->Offset = 0; + pkt->Length = Length; + pkt->Data = pkt + 1; + memcpy(pkt->Data, Src, Length); + + // Append to list + Mutex_Acquire(&lep->lList); + if( !Node->ImplPtr ) + { + // Client was closed by another thread + free(pkt); + Mutex_Release(&lep->lList); + return -1; + } + if(lep->OutTail) + lep->OutTail->Next = pkt; + else + lep->OutHead = pkt; + lep->OutTail = pkt; + Mutex_Release(&lep->lList); + + // Signal other end + Semaphore_Signal(&rep->InCount, 1); + + return Length; +} + +void IPCPipe_Client_Close(tVFS_Node *Node) +{ + tIPCPipe_Endpoint *lep, *rep; + tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep); + + // Mark client as closed + Node->ImplPtr = NULL; + Mutex_Acquire(&lep->lList); + while( lep->OutHead ) { + tIPCPipe_Packet *next = lep->OutHead->Next; + free(lep->OutHead); + lep->OutHead = next; + } + Mutex_Release(&lep->lList); + + // Clean up if both sides are closed + if( rep->Node.ImplPtr == NULL ) + { + tIPCPipe_Server *srv = ch->Server; + if( srv ) + { + RWLock_AcquireWrite(&srv->lChannelList); + if(ch->Prev) + ch->Prev->Next = ch->Next; + else + srv->FirstClient = ch->Next; + if(ch->Next) + ch->Next->Prev = ch->Prev; + else + srv->LastClient = ch->Prev; + if(srv->FirstUnseenClient == ch) + srv->FirstUnseenClient = ch->Next; + RWLock_Release(&srv->lChannelList); + } + free(ch); + } +} -- 2.20.1