Kernel - Start on SHM 'driver' (and common in-memory finddir/readdir)
[tpg/acess2.git] / KernelLand / Kernel / drv / dgram_pipe.c
index 0acae3a..9e0c52d 100644 (file)
@@ -34,6 +34,7 @@ struct sIPCPipe_Endpoint
        tMutex  lList;
        tIPCPipe_Packet *OutHead;
        tIPCPipe_Packet *OutTail;
+       size_t  ByteCount;
        tVFS_Node       Node;
 };
 struct sIPCPipe_Channel
@@ -50,8 +51,8 @@ struct sIPCPipe_Server
        tIPCPipe_Server *Prev;
        char    *Name;
        size_t  MaxBlockSize;   // Max size of a 'packet'
-       size_t  QueueByteLimit; // Maximum number of bytes held in kernel for this server
-       size_t  CurrentByteCount;
+       // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
+       size_t  QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
        tVFS_Node       ServerNode;
        tRWLock lChannelList;
        tIPCPipe_Channel        *FirstClient;
@@ -64,15 +65,15 @@ struct sIPCPipe_Server
 // - Root
 tVFS_Node      *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
  int   IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node      *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node      *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
 // - Server
  int   IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node      *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node      *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
 void   IPCPipe_Server_Close(tVFS_Node *Node);
 // - Socket
 tIPCPipe_Channel       *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
 void   IPCPipe_Client_Close(tVFS_Node *Node);
 
 // === GLOBALS ===
@@ -179,7 +180,7 @@ int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
 /**
  * \return New client pointer
  */
-tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
 {
        tIPCPipe_Server *srv;
        ENTER("pNode sName", Node, Name);
@@ -198,9 +199,14 @@ tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
                return NULL;
        }
 
+       if( Flags & VFS_FDIRFLAG_STAT ) {
+               // LEAVE('p', srv->TplClientNode);
+               // return &srv->TplClientNode;
+       }
+
        // Create new client
-       tIPCPipe_Channel *new_client;
-       
+       tIPCPipe_Channel *new_client;   
+
        new_client = calloc(1, sizeof(tIPCPipe_Channel));
        new_client->Server = srv;
        new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
@@ -230,7 +236,7 @@ int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
        // 'next' is a valid entry, but readdir should never be called on this node
        return -1;
 }
-tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
 {
        tIPCPipe_Server *srv = Node->ImplPtr;
 
@@ -241,6 +247,7 @@ tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
                return NULL;
        }
        
+       // TODO: Need VFS_FDIRFLAG_NOBLOCK?
        VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
 
        tIPCPipe_Channel *conn;
@@ -303,7 +310,7 @@ tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, t
        }
        return ch;
 }
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
 {
        tIPCPipe_Endpoint       *lep, *rep;
        tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
@@ -317,8 +324,15 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D
        }
        
        // Wait for a packet to be ready
-       VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Endpoint");
-       if( channel->Server == NULL ) {
+       tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+       int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+       if( !rv ) {
+               errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+               LEAVE('i', -1);
+               return -1;
+       }
+       if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
+               //errno = EIO;
                LEAVE('i', -1);
                return -1;
        }
@@ -328,6 +342,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D
        if( !rep->Node.ImplPtr )
        {
                Mutex_Release(&rep->lList);
+               //errno = EIO;
                LEAVE('i', -1);
                return -1;
        }
@@ -337,6 +352,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D
        if(!rep->OutHead)
                rep->OutTail = NULL;
        VFS_MarkAvaliable(Node, !!rep->OutHead);
+       VFS_MarkFull(&rep->Node, 0);    //      Just read a packet, remote shouldn't be full
        Mutex_Release(&rep->lList);
 
        // Return
@@ -349,14 +365,14 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D
        }
        else
        {
-               Log_Warning("IPCPipe", "No packet ready but semaphore returned");
+               Log_Warning("IPCPipe", "No packet ready but select returned");
        }
        
        LEAVE('i', ret);
        return ret;
 }
 
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
 {
        tIPCPipe_Endpoint       *lep, *rep;
        tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
@@ -373,8 +389,23 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const
                LEAVE('i', 0);
                return 0;
        }
+       
+       // Wait for a packet to be ready
+       tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+       int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+       ASSERTC(rv, >=, 0);
+       if( !rv ) {
+               errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+               LEAVE('i', -1);
+               return -1;
+       }
+       if( (rv & VFS_SELECT_ERROR) ||  channel->Server == NULL ) {
+               //errno = EIO;
+               LEAVE('i', -1);
+               return -1;
+       }
 
-       // Create packet structure      
+       // Create packet structure
        tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
        pkt->Next = NULL;
        pkt->Offset = 0;
@@ -397,6 +428,12 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const
        else
                lep->OutHead = pkt;
        lep->OutTail = pkt;
+       
+       lep->ByteCount += Length;
+       if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
+               VFS_MarkFull(Node, 1);
+       }
+       
        Mutex_Release(&lep->lList);
 
        // Signal other end

UCC git Repository :: git.ucc.asn.au