From: John Hodge Date: Sun, 8 Jun 2014 05:54:47 +0000 (+0800) Subject: Kernel/IPCPipe - Add queue length limit, watch for _ERROR in select X-Git-Url: https://git.ucc.asn.au/?a=commitdiff_plain;h=0c2915f7f306013a29fc79ed69039eae3a26f337;p=tpg%2Facess2.git Kernel/IPCPipe - Add queue length limit, watch for _ERROR in select --- diff --git a/KernelLand/Kernel/drv/dgram_pipe.c b/KernelLand/Kernel/drv/dgram_pipe.c index c13ca2a7..9e0c52d8 100644 --- a/KernelLand/Kernel/drv/dgram_pipe.c +++ b/KernelLand/Kernel/drv/dgram_pipe.c @@ -34,6 +34,7 @@ struct sIPCPipe_Endpoint tMutex lList; tIPCPipe_Packet *OutHead; tIPCPipe_Packet *OutTail; + size_t ByteCount; tVFS_Node Node; }; struct sIPCPipe_Channel @@ -50,8 +51,8 @@ struct sIPCPipe_Server tIPCPipe_Server *Prev; char *Name; size_t MaxBlockSize; // Max size of a 'packet' - size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server - size_t CurrentByteCount; + // NOTE: Not strictly enforced, can go MaxBlockSize-1 over + size_t QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint tVFS_Node ServerNode; tRWLock lChannelList; tIPCPipe_Channel *FirstClient; @@ -324,13 +325,13 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D // Wait for a packet to be ready tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL); - int rv = VFS_SelectNode(Node, VFS_SELECT_READ, timeout, "IPCPipe Endpoint"); + int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint"); if( !rv ) { errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR; LEAVE('i', -1); return -1; } - if( channel->Server == NULL ) { + if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) { //errno = EIO; LEAVE('i', -1); return -1; @@ -351,6 +352,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D if(!rep->OutHead) rep->OutTail = NULL; VFS_MarkAvaliable(Node, !!rep->OutHead); + VFS_MarkFull(&rep->Node, 0); // Just read a packet, remote shouldn't be full Mutex_Release(&rep->lList); // Return @@ -363,7 +365,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D } else { - Log_Warning("IPCPipe", "No packet ready but semaphore returned"); + Log_Warning("IPCPipe", "No packet ready but select returned"); } LEAVE('i', ret); @@ -387,8 +389,21 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const LEAVE('i', 0); return 0; } - - // TODO: Ensure that no more than DEF_MAX_BYTE_LIMIT bytes are in flight at one time + + // Wait for a packet to be ready + tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL); + int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint"); + ASSERTC(rv, >=, 0); + if( !rv ) { + errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR; + LEAVE('i', -1); + return -1; + } + if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) { + //errno = EIO; + LEAVE('i', -1); + return -1; + } // Create packet structure tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length); @@ -413,6 +428,12 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const else lep->OutHead = pkt; lep->OutTail = pkt; + + lep->ByteCount += Length; + if( lep->ByteCount >= channel->Server->QueueByteLimit ) { + VFS_MarkFull(Node, 1); + } + Mutex_Release(&lep->lList); // Signal other end