X-Git-Url: https://git.ucc.asn.au/?a=blobdiff_plain;f=KernelLand%2FKernel%2Fdrv%2Fdgram_pipe.c;h=c13ca2a71cb41cf1a472cdfcdd0bbeb7bf4ac0c8;hb=7479bf405c5173ab0b403ee4e6dd859580ed27d2;hp=f47450de1f591b8b0eda173c2728dc90cdadfe69;hpb=7ce8187996ab2728433ce08140eb99daa5e15b16;p=tpg%2Facess2.git diff --git a/KernelLand/Kernel/drv/dgram_pipe.c b/KernelLand/Kernel/drv/dgram_pipe.c index f47450de..c13ca2a7 100644 --- a/KernelLand/Kernel/drv/dgram_pipe.c +++ b/KernelLand/Kernel/drv/dgram_pipe.c @@ -5,6 +5,7 @@ * drv/dgram_pipe.c * - Connection+Datagram based local IPC */ +#define DEBUG 0 #include #include #include @@ -33,7 +34,6 @@ struct sIPCPipe_Endpoint tMutex lList; tIPCPipe_Packet *OutHead; tIPCPipe_Packet *OutTail; - tSemaphore InCount; tVFS_Node Node; }; struct sIPCPipe_Channel @@ -53,7 +53,6 @@ struct sIPCPipe_Server size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server size_t CurrentByteCount; tVFS_Node ServerNode; - tSemaphore ConnectionSemaphore; tRWLock lChannelList; tIPCPipe_Channel *FirstClient; tIPCPipe_Channel *LastClient; @@ -65,15 +64,15 @@ struct sIPCPipe_Server // - Root tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags); int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]); -tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name); +tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags); // - Server int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]); -tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name); +tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags); void IPCPipe_Server_Close(tVFS_Node *Node); // - Socket tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep); -size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest); -size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src); +size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags); +size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags); void IPCPipe_Client_Close(tVFS_Node *Node); // === GLOBALS === @@ -98,7 +97,7 @@ tVFS_NodeType gIPCPipe_RootNodeType = { }; tDevFS_Driver gIPCPipe_DevFSInfo = { .Name = "ipcpipe", - .RootNode = {.Type=&gIPCPipe_RootNodeType} + .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1} }; // - Global list of servers tRWLock glIPCPipe_ServerList; @@ -123,6 +122,8 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags) { tIPCPipe_Server *srv; + ENTER("pNode sName xFlags", Node, Name, Flags); + // Write Lock RWLock_AcquireWrite(&glIPCPipe_ServerList); for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next ) @@ -133,6 +134,7 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags) if( srv ) { RWLock_Release(&glIPCPipe_ServerList); + LEAVE('n'); return NULL; } @@ -144,6 +146,7 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags) srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT; srv->ServerNode.Type = &gIPCPipe_ServerNodeType; srv->ServerNode.ImplPtr = srv; + srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY; // Lock already held if( gpIPCPipe_LastServer ) @@ -154,6 +157,7 @@ tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags) gpIPCPipe_LastServer = srv; RWLock_Release(&glIPCPipe_ServerList); + LEAVE('p', &srv->ServerNode); return &srv->ServerNode; } @@ -175,24 +179,33 @@ int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]) /** * \return New client pointer */ -tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name) +tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags) { tIPCPipe_Server *srv; + ENTER("pNode sName", Node, Name); // Find server RWLock_AcquireRead(&glIPCPipe_ServerList); for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next ) { + LOG("== %s", srv->Name); if( strcmp(srv->Name, Name) == 0 ) break; } RWLock_Release(&glIPCPipe_ServerList); - if( !srv ) + if( !srv ) { + LEAVE('n'); return NULL; + } + + if( Flags & VFS_FDIRFLAG_STAT ) { + // LEAVE('p', srv->TplClientNode); + // return &srv->TplClientNode; + } // Create new client - tIPCPipe_Channel *new_client; - + tIPCPipe_Channel *new_client; + new_client = calloc(1, sizeof(tIPCPipe_Channel)); new_client->Server = srv; new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType; @@ -209,8 +222,10 @@ tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name) srv->LastClient = new_client; if(!srv->FirstUnseenClient) srv->FirstUnseenClient = new_client; + VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient); RWLock_Release(&srv->lChannelList); + LEAVE('p', &new_client->ClientEP.Node); return &new_client->ClientEP.Node; } @@ -220,14 +235,19 @@ int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]) // 'next' is a valid entry, but readdir should never be called on this node return -1; } -tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name) +tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags) { tIPCPipe_Server *srv = Node->ImplPtr; - - if( strcmp(Name, "next") != 0 ) - return NULL; - if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 ) + + ENTER("pNode sName", Node, Name); + + if( strcmp(Name, "newclient") != 0 ) { + LEAVE('n'); return NULL; + } + + // TODO: Need VFS_FDIRFLAG_NOBLOCK? + VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server"); tIPCPipe_Channel *conn; RWLock_AcquireRead(&srv->lChannelList); @@ -236,19 +256,22 @@ tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name) { srv->FirstUnseenClient = conn->Next; } + VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient); RWLock_Release(&srv->lChannelList); - if( !conn ) + if( !conn ) { + LEAVE('n'); return NULL; + } // Success + LEAVE('p', &conn->ServerEP.Node); return &conn->ServerEP.Node; } void IPCPipe_Server_Close(tVFS_Node *Node) { tIPCPipe_Server *srv = Node->ImplPtr; -// Semaphore_Cancel(&srv->ConnectionSemaphore); // Flag server as being destroyed @@ -279,57 +302,95 @@ void IPCPipe_Server_Close(tVFS_Node *Node) tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep) { tIPCPipe_Channel *ch = Node->ImplPtr; - if( !ch ) return NULL; - *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP); - *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP); + if( ch ) + { + *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP); + *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP); + } return ch; } -size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest) +size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags) { tIPCPipe_Endpoint *lep, *rep; tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep); - - if( channel->Server == NULL ) + + ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest); + + // Closed endpoint / channel + if( !channel || channel->Server == NULL ) { + LEAVE('i', -1); return -1; + } - if( Semaphore_Wait(&lep->InCount, 1) != 1 ) - { - if( channel->Server == NULL ) - return -1; - return 0; + // Wait for a packet to be ready + tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL); + int rv = VFS_SelectNode(Node, VFS_SELECT_READ, timeout, "IPCPipe Endpoint"); + if( !rv ) { + errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR; + LEAVE('i', -1); + return -1; + } + if( channel->Server == NULL ) { + //errno = EIO; + LEAVE('i', -1); + return -1; } + // Pop a packet from the list Mutex_Acquire(&rep->lList); + if( !rep->Node.ImplPtr ) + { + Mutex_Release(&rep->lList); + //errno = EIO; + LEAVE('i', -1); + return -1; + } tIPCPipe_Packet *pkt = rep->OutHead; if(pkt) rep->OutHead = pkt->Next; if(!rep->OutHead) rep->OutTail = NULL; + VFS_MarkAvaliable(Node, !!rep->OutHead); Mutex_Release(&rep->lList); - if(!pkt) - return 0; - - size_t ret = MIN(pkt->Length, Length); - memcpy(Dest, pkt->Data, ret); - free(pkt); + // Return + size_t ret = 0; + if(pkt) + { + ret = MIN(pkt->Length, Length); + memcpy(Dest, pkt->Data, ret); + free(pkt); + } + else + { + Log_Warning("IPCPipe", "No packet ready but semaphore returned"); + } + LEAVE('i', ret); return ret; } -size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src) +size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags) { tIPCPipe_Endpoint *lep, *rep; tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep); + + ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src); // Ensure the server hasn't been closed - if( !channel || channel->Server == NULL ) + if( !channel || channel->Server == NULL ) { + LEAVE('i', -1); return -1; + } - if( Length > channel->Server->MaxBlockSize ) + if( Length > channel->Server->MaxBlockSize ) { + LEAVE('i', 0); return 0; + } + + // TODO: Ensure that no more than DEF_MAX_BYTE_LIMIT bytes are in flight at one time - // Create packet structure + // Create packet structure tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length); pkt->Next = NULL; pkt->Offset = 0; @@ -344,6 +405,7 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const // Client was closed by another thread free(pkt); Mutex_Release(&lep->lList); + LEAVE('i', -1); return -1; } if(lep->OutTail) @@ -354,8 +416,9 @@ size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const Mutex_Release(&lep->lList); // Signal other end - Semaphore_Signal(&rep->InCount, 1); + VFS_MarkAvaliable(&rep->Node, 1); + LEAVE('i', Length); return Length; } @@ -363,9 +426,17 @@ void IPCPipe_Client_Close(tVFS_Node *Node) { tIPCPipe_Endpoint *lep, *rep; tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep); + + ENTER("pNode", Node); + + if( !ch ) { + Log_Warning("IPCPipe", "Endpoint %p double-closed", ch); + return ; + } // Mark client as closed Node->ImplPtr = NULL; + // Clear packets Mutex_Acquire(&lep->lList); while( lep->OutHead ) { tIPCPipe_Packet *next = lep->OutHead->Next; @@ -373,10 +444,15 @@ void IPCPipe_Client_Close(tVFS_Node *Node) lep->OutHead = next; } Mutex_Release(&lep->lList); + LOG("Packets cleared"); + // Tell remote that local has closed + VFS_MarkError(&rep->Node, 1); + // TODO: Deliver SIGPIPE or similar to all owners of remote // Clean up if both sides are closed if( rep->Node.ImplPtr == NULL ) { + LOG("Remote closed, cleaning up"); tIPCPipe_Server *srv = ch->Server; if( srv ) { @@ -395,4 +471,5 @@ void IPCPipe_Client_Close(tVFS_Node *Node) } free(ch); } + LEAVE('-'); }