Kernel/IPCPipe - Add queue length limit, watch for _ERROR in select
authorJohn Hodge <[email protected]>
Sun, 8 Jun 2014 05:54:47 +0000 (13:54 +0800)
committerJohn Hodge <[email protected]>
Sun, 8 Jun 2014 05:54:47 +0000 (13:54 +0800)
KernelLand/Kernel/drv/dgram_pipe.c

index c13ca2a..9e0c52d 100644 (file)
@@ -34,6 +34,7 @@ struct sIPCPipe_Endpoint
        tMutex  lList;
        tIPCPipe_Packet *OutHead;
        tIPCPipe_Packet *OutTail;
+       size_t  ByteCount;
        tVFS_Node       Node;
 };
 struct sIPCPipe_Channel
@@ -50,8 +51,8 @@ struct sIPCPipe_Server
        tIPCPipe_Server *Prev;
        char    *Name;
        size_t  MaxBlockSize;   // Max size of a 'packet'
-       size_t  QueueByteLimit; // Maximum number of bytes held in kernel for this server
-       size_t  CurrentByteCount;
+       // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
+       size_t  QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
        tVFS_Node       ServerNode;
        tRWLock lChannelList;
        tIPCPipe_Channel        *FirstClient;
@@ -324,13 +325,13 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D
        
        // Wait for a packet to be ready
        tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
-       int rv = VFS_SelectNode(Node, VFS_SELECT_READ, timeout, "IPCPipe Endpoint");
+       int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
        if( !rv ) {
                errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
                LEAVE('i', -1);
                return -1;
        }
-       if( channel->Server == NULL ) {
+       if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
                //errno = EIO;
                LEAVE('i', -1);
                return -1;
@@ -351,6 +352,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D
        if(!rep->OutHead)
                rep->OutTail = NULL;
        VFS_MarkAvaliable(Node, !!rep->OutHead);
+       VFS_MarkFull(&rep->Node, 0);    //      Just read a packet, remote shouldn't be full
        Mutex_Release(&rep->lList);
 
        // Return
@@ -363,7 +365,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D
        }
        else
        {
-               Log_Warning("IPCPipe", "No packet ready but semaphore returned");
+               Log_Warning("IPCPipe", "No packet ready but select returned");
        }
        
        LEAVE('i', ret);
@@ -387,8 +389,21 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const
                LEAVE('i', 0);
                return 0;
        }
-
-       // TODO: Ensure that no more than DEF_MAX_BYTE_LIMIT bytes are in flight at one time
+       
+       // Wait for a packet to be ready
+       tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+       int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+       ASSERTC(rv, >=, 0);
+       if( !rv ) {
+               errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+               LEAVE('i', -1);
+               return -1;
+       }
+       if( (rv & VFS_SELECT_ERROR) ||  channel->Server == NULL ) {
+               //errno = EIO;
+               LEAVE('i', -1);
+               return -1;
+       }
 
        // Create packet structure
        tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
@@ -413,6 +428,12 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const
        else
                lep->OutHead = pkt;
        lep->OutTail = pkt;
+       
+       lep->ByteCount += Length;
+       if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
+               VFS_MarkFull(Node, 1);
+       }
+       
        Mutex_Release(&lep->lList);
 
        // Signal other end

UCC git Repository :: git.ucc.asn.au