X-Git-Url: https://git.ucc.asn.au/?a=blobdiff_plain;f=KernelLand%2FKernel%2Fdrv%2Fdgram_pipe.c;h=667190eb0ad8b3cdc3f88e7f18866d46883f2f10;hb=845b6f9d90bb87b5e760e4d49aa93b0e003ab750;hp=c13ca2a71cb41cf1a472cdfcdd0bbeb7bf4ac0c8;hpb=015f48988e0ff398409d71dfc692005ab439490a;p=tpg%2Facess2.git diff --git a/KernelLand/Kernel/drv/dgram_pipe.c b/KernelLand/Kernel/drv/dgram_pipe.c index c13ca2a7..667190eb 100644 --- a/KernelLand/Kernel/drv/dgram_pipe.c +++ b/KernelLand/Kernel/drv/dgram_pipe.c @@ -34,6 +34,7 @@ struct sIPCPipe_Endpoint tMutex lList; tIPCPipe_Packet *OutHead; tIPCPipe_Packet *OutTail; + size_t ByteCount; tVFS_Node Node; }; struct sIPCPipe_Channel @@ -50,8 +51,8 @@ struct sIPCPipe_Server tIPCPipe_Server *Prev; char *Name; size_t MaxBlockSize; // Max size of a 'packet' - size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server - size_t CurrentByteCount; + // NOTE: Not strictly enforced, can go MaxBlockSize-1 over + size_t QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint tVFS_Node ServerNode; tRWLock lChannelList; tIPCPipe_Channel *FirstClient; @@ -85,6 +86,7 @@ tVFS_NodeType gIPCPipe_ServerNodeType = { }; tVFS_NodeType gIPCPipe_ChannelNodeType = { .TypeName = "IPC Pipe - Channel", + .Flags = VFS_NODETYPEFLAG_STREAM, .Read = IPCPipe_Client_Read, .Write = IPCPipe_Client_Write, .Close = IPCPipe_Client_Close @@ -210,8 +212,10 @@ tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags) new_client->Server = srv; new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType; new_client->ClientEP.Node.ImplPtr = new_client; + new_client->ClientEP.Node.Size = -1; new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType; new_client->ServerEP.Node.ImplPtr = new_client; + new_client->ServerEP.Node.Size = -1; // Append to server list RWLock_AcquireWrite(&srv->lChannelList); @@ -324,13 +328,13 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D // Wait for a packet to be ready tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL); - int rv = VFS_SelectNode(Node, VFS_SELECT_READ, timeout, "IPCPipe Endpoint"); + int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint"); if( !rv ) { errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR; LEAVE('i', -1); return -1; } - if( channel->Server == NULL ) { + if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) { //errno = EIO; LEAVE('i', -1); return -1; @@ -351,6 +355,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D if(!rep->OutHead) rep->OutTail = NULL; VFS_MarkAvaliable(Node, !!rep->OutHead); + VFS_MarkFull(&rep->Node, 0); // Just read a packet, remote shouldn't be full Mutex_Release(&rep->lList); // Return @@ -363,7 +368,7 @@ size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *D } else { - Log_Warning("IPCPipe", "No packet ready but semaphore returned"); + Log_Warning("IPCPipe", "No packet ready but select returned"); } LEAVE('i', ret); @@ -387,8 +392,21 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const LEAVE('i', 0); return 0; } - - // TODO: Ensure that no more than DEF_MAX_BYTE_LIMIT bytes are in flight at one time + + // Wait for a packet to be ready + tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL); + int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint"); + ASSERTC(rv, >=, 0); + if( !rv ) { + errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR; + LEAVE('i', -1); + return -1; + } + if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) { + //errno = EIO; + LEAVE('i', -1); + return -1; + } // Create packet structure tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length); @@ -413,6 +431,12 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const else lep->OutHead = pkt; lep->OutTail = pkt; + + lep->ByteCount += Length; + if( lep->ByteCount >= channel->Server->QueueByteLimit ) { + VFS_MarkFull(Node, 1); + } + Mutex_Release(&lep->lList); // Signal other end