Kernel - Start on SHM 'driver' (and common in-memory finddir/readdir)
[tpg/acess2.git] / KernelLand / Kernel / drv / dgram_pipe.c
index f47450d..9e0c52d 100644 (file)
@@ -5,6 +5,7 @@
  * drv/dgram_pipe.c
  * - Connection+Datagram based local IPC
  */
+#define DEBUG  0
 #include <vfs.h>
 #include <fs_devfs.h>
 #include <modules.h>
@@ -33,7 +34,7 @@ struct sIPCPipe_Endpoint
        tMutex  lList;
        tIPCPipe_Packet *OutHead;
        tIPCPipe_Packet *OutTail;
-       tSemaphore      InCount;
+       size_t  ByteCount;
        tVFS_Node       Node;
 };
 struct sIPCPipe_Channel
@@ -50,10 +51,9 @@ struct sIPCPipe_Server
        tIPCPipe_Server *Prev;
        char    *Name;
        size_t  MaxBlockSize;   // Max size of a 'packet'
-       size_t  QueueByteLimit; // Maximum number of bytes held in kernel for this server
-       size_t  CurrentByteCount;
+       // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
+       size_t  QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
        tVFS_Node       ServerNode;
-       tSemaphore      ConnectionSemaphore;
        tRWLock lChannelList;
        tIPCPipe_Channel        *FirstClient;
        tIPCPipe_Channel        *LastClient;
@@ -65,15 +65,15 @@ struct sIPCPipe_Server
 // - Root
 tVFS_Node      *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
  int   IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node      *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node      *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
 // - Server
  int   IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node      *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node      *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
 void   IPCPipe_Server_Close(tVFS_Node *Node);
 // - Socket
 tIPCPipe_Channel       *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
 void   IPCPipe_Client_Close(tVFS_Node *Node);
 
 // === GLOBALS ===
@@ -98,7 +98,7 @@ tVFS_NodeType gIPCPipe_RootNodeType = {
 };
 tDevFS_Driver  gIPCPipe_DevFSInfo = {
        .Name = "ipcpipe",
-       .RootNode = {.Type=&gIPCPipe_RootNodeType}
+       .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
 };
 // - Global list of servers
 tRWLock        glIPCPipe_ServerList;
@@ -123,6 +123,8 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
 {
        tIPCPipe_Server *srv;
        
+       ENTER("pNode sName xFlags", Node, Name, Flags);
+       
        // Write Lock
        RWLock_AcquireWrite(&glIPCPipe_ServerList);
        for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
@@ -133,6 +135,7 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
        if( srv )
        {
                RWLock_Release(&glIPCPipe_ServerList);
+               LEAVE('n');
                return NULL;
        }
        
@@ -144,6 +147,7 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
        srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
        srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
        srv->ServerNode.ImplPtr = srv;
+       srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
 
        // Lock already held
        if( gpIPCPipe_LastServer )
@@ -154,6 +158,7 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags)
        gpIPCPipe_LastServer = srv;
        RWLock_Release(&glIPCPipe_ServerList);
 
+       LEAVE('p', &srv->ServerNode);
        return &srv->ServerNode;
 }
 
@@ -175,24 +180,33 @@ int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
 /**
  * \return New client pointer
  */
-tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
 {
        tIPCPipe_Server *srv;
+       ENTER("pNode sName", Node, Name);
        
        // Find server
        RWLock_AcquireRead(&glIPCPipe_ServerList);
        for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
        {
+               LOG("== %s", srv->Name);
                if( strcmp(srv->Name, Name) == 0 )
                        break;
        }
        RWLock_Release(&glIPCPipe_ServerList);
-       if( !srv )
+       if( !srv ) {
+               LEAVE('n');
                return NULL;
+       }
+
+       if( Flags & VFS_FDIRFLAG_STAT ) {
+               // LEAVE('p', srv->TplClientNode);
+               // return &srv->TplClientNode;
+       }
 
        // Create new client
-       tIPCPipe_Channel *new_client;
-       
+       tIPCPipe_Channel *new_client;   
+
        new_client = calloc(1, sizeof(tIPCPipe_Channel));
        new_client->Server = srv;
        new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
@@ -209,8 +223,10 @@ tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
        srv->LastClient = new_client;
        if(!srv->FirstUnseenClient)
                srv->FirstUnseenClient = new_client;
+       VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
        RWLock_Release(&srv->lChannelList);
        
+       LEAVE('p', &new_client->ClientEP.Node);
        return &new_client->ClientEP.Node;
 }
 
@@ -220,14 +236,19 @@ int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX])
        // 'next' is a valid entry, but readdir should never be called on this node
        return -1;
 }
-tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
 {
        tIPCPipe_Server *srv = Node->ImplPtr;
-       
-       if( strcmp(Name, "next") != 0 )
-               return NULL;
-       if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 )
+
+       ENTER("pNode sName", Node, Name);       
+
+       if( strcmp(Name, "newclient") != 0 ) {
+               LEAVE('n');
                return NULL;
+       }
+       
+       // TODO: Need VFS_FDIRFLAG_NOBLOCK?
+       VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
 
        tIPCPipe_Channel *conn;
        RWLock_AcquireRead(&srv->lChannelList);
@@ -236,19 +257,22 @@ tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
        {
                srv->FirstUnseenClient = conn->Next;
        }
+       VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
        RWLock_Release(&srv->lChannelList);
 
-       if( !conn )
+       if( !conn ) {
+               LEAVE('n');
                return NULL;
+       }
        
        // Success
+       LEAVE('p', &conn->ServerEP.Node);
        return &conn->ServerEP.Node;
 }
 
 void IPCPipe_Server_Close(tVFS_Node *Node)
 {
        tIPCPipe_Server *srv = Node->ImplPtr;
-//     Semaphore_Cancel(&srv->ConnectionSemaphore);
 
        // Flag server as being destroyed
 
@@ -279,57 +303,109 @@ void IPCPipe_Server_Close(tVFS_Node *Node)
 tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
 {
        tIPCPipe_Channel        *ch = Node->ImplPtr;
-       if( !ch )       return NULL;
-       *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
-       *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
+       if( ch )
+       {
+               *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
+               *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
+       }
        return ch;
 }
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
 {
        tIPCPipe_Endpoint       *lep, *rep;
        tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
-       
-       if( channel->Server == NULL )
+
+       ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);       
+
+       // Closed endpoint / channel
+       if( !channel || channel->Server == NULL ) {
+               LEAVE('i', -1);
                return -1;
+       }
        
-       if( Semaphore_Wait(&lep->InCount, 1) != 1 )
-       {
-               if( channel->Server == NULL )
-                       return -1;
-               return 0;
+       // Wait for a packet to be ready
+       tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+       int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+       if( !rv ) {
+               errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+               LEAVE('i', -1);
+               return -1;
+       }
+       if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
+               //errno = EIO;
+               LEAVE('i', -1);
+               return -1;
        }
 
+       // Pop a packet from the list
        Mutex_Acquire(&rep->lList);
+       if( !rep->Node.ImplPtr )
+       {
+               Mutex_Release(&rep->lList);
+               //errno = EIO;
+               LEAVE('i', -1);
+               return -1;
+       }
        tIPCPipe_Packet *pkt = rep->OutHead;
        if(pkt)
                rep->OutHead = pkt->Next;
        if(!rep->OutHead)
                rep->OutTail = NULL;
+       VFS_MarkAvaliable(Node, !!rep->OutHead);
+       VFS_MarkFull(&rep->Node, 0);    //      Just read a packet, remote shouldn't be full
        Mutex_Release(&rep->lList);
 
-       if(!pkt)
-               return 0;
-
-       size_t ret = MIN(pkt->Length, Length);
-       memcpy(Dest, pkt->Data, ret);
-       free(pkt);
+       // Return
+       size_t ret = 0;
+       if(pkt)
+       {
+               ret = MIN(pkt->Length, Length);
+               memcpy(Dest, pkt->Data, ret);
+               free(pkt);
+       }
+       else
+       {
+               Log_Warning("IPCPipe", "No packet ready but select returned");
+       }
        
+       LEAVE('i', ret);
        return ret;
 }
 
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
 {
        tIPCPipe_Endpoint       *lep, *rep;
        tIPCPipe_Channel        *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
+
+       ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src); 
        
        // Ensure the server hasn't been closed
-       if( !channel || channel->Server == NULL )
+       if( !channel || channel->Server == NULL ) {
+               LEAVE('i', -1);
                return -1;
+       }
 
-       if( Length > channel->Server->MaxBlockSize )
+       if( Length > channel->Server->MaxBlockSize ) {
+               LEAVE('i', 0);
                return 0;
+       }
+       
+       // Wait for a packet to be ready
+       tTime   timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+       int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+       ASSERTC(rv, >=, 0);
+       if( !rv ) {
+               errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+               LEAVE('i', -1);
+               return -1;
+       }
+       if( (rv & VFS_SELECT_ERROR) ||  channel->Server == NULL ) {
+               //errno = EIO;
+               LEAVE('i', -1);
+               return -1;
+       }
 
-       // Create packet structure      
+       // Create packet structure
        tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
        pkt->Next = NULL;
        pkt->Offset = 0;
@@ -344,6 +420,7 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const
                // Client was closed by another thread
                free(pkt);
                Mutex_Release(&lep->lList);
+               LEAVE('i', -1);
                return -1;
        }
        if(lep->OutTail)
@@ -351,11 +428,18 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const
        else
                lep->OutHead = pkt;
        lep->OutTail = pkt;
+       
+       lep->ByteCount += Length;
+       if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
+               VFS_MarkFull(Node, 1);
+       }
+       
        Mutex_Release(&lep->lList);
 
        // Signal other end
-       Semaphore_Signal(&rep->InCount, 1);
+       VFS_MarkAvaliable(&rep->Node, 1);
 
+       LEAVE('i', Length);
        return Length;
 }
 
@@ -363,9 +447,17 @@ void IPCPipe_Client_Close(tVFS_Node *Node)
 {
        tIPCPipe_Endpoint       *lep, *rep;
        tIPCPipe_Channel        *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
+
+       ENTER("pNode", Node);
+
+       if( !ch ) {
+               Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
+               return ;
+       }
        
        // Mark client as closed
        Node->ImplPtr = NULL;
+       // Clear packets
        Mutex_Acquire(&lep->lList);
        while( lep->OutHead ) {
                tIPCPipe_Packet *next = lep->OutHead->Next;
@@ -373,10 +465,15 @@ void IPCPipe_Client_Close(tVFS_Node *Node)
                lep->OutHead = next;
        }
        Mutex_Release(&lep->lList);
+       LOG("Packets cleared");
        
+       // Tell remote that local has closed
+       VFS_MarkError(&rep->Node, 1);
+       // TODO: Deliver SIGPIPE or similar to all owners of remote
        // Clean up if both sides are closed
        if( rep->Node.ImplPtr == NULL )
        {
+               LOG("Remote closed, cleaning up");
                tIPCPipe_Server *srv = ch->Server;
                if( srv )
                {
@@ -395,4 +492,5 @@ void IPCPipe_Client_Close(tVFS_Node *Node)
                }
                free(ch);
        }
+       LEAVE('-');
 }

UCC git Repository :: git.ucc.asn.au