X-Git-Url: https://git.ucc.asn.au/?a=blobdiff_plain;f=KernelLand%2FKernel%2Fdrv%2Fdgram_pipe.c;h=667190eb0ad8b3cdc3f88e7f18866d46883f2f10;hb=9eadc33399e705035c33e8434a9644d91e44ed44;hp=2ae22974fb09c6589ea58d3d9c9dbd140eb24dec;hpb=eddfdcfdb2f702313954d28e98efcc27bdd145e8;p=tpg%2Facess2.git diff --git a/KernelLand/Kernel/drv/dgram_pipe.c b/KernelLand/Kernel/drv/dgram_pipe.c index 2ae22974..667190eb 100644 --- a/KernelLand/Kernel/drv/dgram_pipe.c +++ b/KernelLand/Kernel/drv/dgram_pipe.c @@ -5,7 +5,7 @@ * drv/dgram_pipe.c * - Connection+Datagram based local IPC */ -#define DEBUG 1 +#define DEBUG 0 #include #include #include @@ -34,6 +34,7 @@ struct sIPCPipe_Endpoint tMutex lList; tIPCPipe_Packet *OutHead; tIPCPipe_Packet *OutTail; + size_t ByteCount; tVFS_Node Node; }; struct sIPCPipe_Channel @@ -50,8 +51,8 @@ struct sIPCPipe_Server tIPCPipe_Server *Prev; char *Name; size_t MaxBlockSize; // Max size of a 'packet' - size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server - size_t CurrentByteCount; + // NOTE: Not strictly enforced, can go MaxBlockSize-1 over + size_t QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint tVFS_Node ServerNode; tRWLock lChannelList; tIPCPipe_Channel *FirstClient; @@ -64,15 +65,15 @@ struct sIPCPipe_Server // - Root tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags); int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]); -tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name); +tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags); // - Server int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]); -tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name); +tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags); void IPCPipe_Server_Close(tVFS_Node *Node); // - Socket tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep); -size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest); -size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src); +size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags); +size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags); void IPCPipe_Client_Close(tVFS_Node *Node); // === GLOBALS === @@ -85,6 +86,7 @@ tVFS_NodeType gIPCPipe_ServerNodeType = { }; tVFS_NodeType gIPCPipe_ChannelNodeType = { .TypeName = "IPC Pipe - Channel", + .Flags = VFS_NODETYPEFLAG_STREAM, .Read = IPCPipe_Client_Read, .Write = IPCPipe_Client_Write, .Close = IPCPipe_Client_Close @@ -179,7 +181,7 @@ int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]) /** * \return New client pointer */ -tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name) +tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags) { tIPCPipe_Server *srv; ENTER("pNode sName", Node, Name); @@ -198,15 +200,22 @@ tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name) return NULL; } + if( Flags & VFS_FDIRFLAG_STAT ) { + // LEAVE('p', srv->TplClientNode); + // return &srv->TplClientNode; + } + // Create new client - tIPCPipe_Channel *new_client; - + tIPCPipe_Channel *new_client; + new_client = calloc(1, sizeof(tIPCPipe_Channel)); new_client->Server = srv; new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType; new_client->ClientEP.Node.ImplPtr = new_client; + new_client->ClientEP.Node.Size = -1; new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType; new_client->ServerEP.Node.ImplPtr = new_client; + new_client->ServerEP.Node.Size = -1; // Append to server list RWLock_AcquireWrite(&srv->lChannelList); @@ -230,7 +239,7 @@ int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]) // 'next' is a valid entry, but readdir should never be called on this node return -1; } -tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name) +tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags) { tIPCPipe_Server *srv = Node->ImplPtr; @@ -241,6 +250,7 @@ tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name) return NULL; } + // TODO: Need VFS_FDIRFLAG_NOBLOCK? VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server"); tIPCPipe_Channel *conn; @@ -303,7 +313,7 @@ tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, t } return ch; } -size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest) +size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags) { tIPCPipe_Endpoint *lep, *rep; tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep); @@ -317,8 +327,15 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D } // Wait for a packet to be ready - VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Endpoint"); - if( channel->Server == NULL ) { + tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL); + int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint"); + if( !rv ) { + errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR; + LEAVE('i', -1); + return -1; + } + if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) { + //errno = EIO; LEAVE('i', -1); return -1; } @@ -328,6 +345,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D if( !rep->Node.ImplPtr ) { Mutex_Release(&rep->lList); + //errno = EIO; LEAVE('i', -1); return -1; } @@ -337,6 +355,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D if(!rep->OutHead) rep->OutTail = NULL; VFS_MarkAvaliable(Node, !!rep->OutHead); + VFS_MarkFull(&rep->Node, 0); // Just read a packet, remote shouldn't be full Mutex_Release(&rep->lList); // Return @@ -349,14 +368,14 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D } else { - Log_Warning("IPCPipe", "No packet ready but semaphore returned"); + Log_Warning("IPCPipe", "No packet ready but select returned"); } LEAVE('i', ret); return ret; } -size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src) +size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags) { tIPCPipe_Endpoint *lep, *rep; tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep); @@ -373,8 +392,23 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const LEAVE('i', 0); return 0; } + + // Wait for a packet to be ready + tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL); + int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint"); + ASSERTC(rv, >=, 0); + if( !rv ) { + errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR; + LEAVE('i', -1); + return -1; + } + if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) { + //errno = EIO; + LEAVE('i', -1); + return -1; + } - // Create packet structure + // Create packet structure tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length); pkt->Next = NULL; pkt->Offset = 0; @@ -397,6 +431,12 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const else lep->OutHead = pkt; lep->OutTail = pkt; + + lep->ByteCount += Length; + if( lep->ByteCount >= channel->Server->QueueByteLimit ) { + VFS_MarkFull(Node, 1); + } + Mutex_Release(&lep->lList); // Signal other end