* drv/dgram_pipe.c
* - Connection+Datagram based local IPC
*/
+#define DEBUG 0
#include <vfs.h>
#include <fs_devfs.h>
#include <modules.h>
tMutex lList;
tIPCPipe_Packet *OutHead;
tIPCPipe_Packet *OutTail;
- tSemaphore InCount;
+ size_t ByteCount;
tVFS_Node Node;
};
struct sIPCPipe_Channel
tIPCPipe_Server *Prev;
char *Name;
size_t MaxBlockSize; // Max size of a 'packet'
- size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server
- size_t CurrentByteCount;
+ // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
+ size_t QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
tVFS_Node ServerNode;
- tSemaphore ConnectionSemaphore;
tRWLock lChannelList;
tIPCPipe_Channel *FirstClient;
tIPCPipe_Channel *LastClient;
// - Root
tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
// - Server
int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
void IPCPipe_Server_Close(tVFS_Node *Node);
// - Socket
tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
void IPCPipe_Client_Close(tVFS_Node *Node);
// === GLOBALS ===
};
tVFS_NodeType gIPCPipe_ChannelNodeType = {
.TypeName = "IPC Pipe - Channel",
+ .Flags = VFS_NODETYPEFLAG_STREAM,
.Read = IPCPipe_Client_Read,
.Write = IPCPipe_Client_Write,
.Close = IPCPipe_Client_Close
};
tDevFS_Driver gIPCPipe_DevFSInfo = {
.Name = "ipcpipe",
- .RootNode = {.Type=&gIPCPipe_RootNodeType}
+ .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
};
// - Global list of servers
tRWLock glIPCPipe_ServerList;
{
tIPCPipe_Server *srv;
+ ENTER("pNode sName xFlags", Node, Name, Flags);
+
// Write Lock
RWLock_AcquireWrite(&glIPCPipe_ServerList);
for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
if( srv )
{
RWLock_Release(&glIPCPipe_ServerList);
+ LEAVE('n');
return NULL;
}
srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
srv->ServerNode.ImplPtr = srv;
+ srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
// Lock already held
if( gpIPCPipe_LastServer )
gpIPCPipe_LastServer = srv;
RWLock_Release(&glIPCPipe_ServerList);
+ LEAVE('p', &srv->ServerNode);
return &srv->ServerNode;
}
/**
* \return New client pointer
*/
-tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
{
tIPCPipe_Server *srv;
+ ENTER("pNode sName", Node, Name);
// Find server
RWLock_AcquireRead(&glIPCPipe_ServerList);
for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
{
+ LOG("== %s", srv->Name);
if( strcmp(srv->Name, Name) == 0 )
break;
}
RWLock_Release(&glIPCPipe_ServerList);
- if( !srv )
+ if( !srv ) {
+ LEAVE('n');
return NULL;
+ }
+
+ if( Flags & VFS_FDIRFLAG_STAT ) {
+ // LEAVE('p', srv->TplClientNode);
+ // return &srv->TplClientNode;
+ }
// Create new client
- tIPCPipe_Channel *new_client;
-
+ tIPCPipe_Channel *new_client;
+
new_client = calloc(1, sizeof(tIPCPipe_Channel));
new_client->Server = srv;
new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
new_client->ClientEP.Node.ImplPtr = new_client;
+ new_client->ClientEP.Node.Size = -1;
new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
new_client->ServerEP.Node.ImplPtr = new_client;
+ new_client->ServerEP.Node.Size = -1;
// Append to server list
RWLock_AcquireWrite(&srv->lChannelList);
srv->LastClient = new_client;
if(!srv->FirstUnseenClient)
srv->FirstUnseenClient = new_client;
+ VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
RWLock_Release(&srv->lChannelList);
+ LEAVE('p', &new_client->ClientEP.Node);
return &new_client->ClientEP.Node;
}
// 'next' is a valid entry, but readdir should never be called on this node
return -1;
}
-tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
{
tIPCPipe_Server *srv = Node->ImplPtr;
-
- if( strcmp(Name, "next") != 0 )
- return NULL;
- if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 )
+
+ ENTER("pNode sName", Node, Name);
+
+ if( strcmp(Name, "newclient") != 0 ) {
+ LEAVE('n');
return NULL;
+ }
+
+ // TODO: Need VFS_FDIRFLAG_NOBLOCK?
+ VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
tIPCPipe_Channel *conn;
RWLock_AcquireRead(&srv->lChannelList);
{
srv->FirstUnseenClient = conn->Next;
}
+ VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
RWLock_Release(&srv->lChannelList);
- if( !conn )
+ if( !conn ) {
+ LEAVE('n');
return NULL;
+ }
// Success
+ LEAVE('p', &conn->ServerEP.Node);
return &conn->ServerEP.Node;
}
void IPCPipe_Server_Close(tVFS_Node *Node)
{
tIPCPipe_Server *srv = Node->ImplPtr;
-// Semaphore_Cancel(&srv->ConnectionSemaphore);
// Flag server as being destroyed
tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
{
tIPCPipe_Channel *ch = Node->ImplPtr;
- if( !ch ) return NULL;
- *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
- *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
+ if( ch )
+ {
+ *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
+ *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
+ }
return ch;
}
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
-
- if( channel->Server == NULL )
+
+ ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);
+
+ // Closed endpoint / channel
+ if( !channel || channel->Server == NULL ) {
+ LEAVE('i', -1);
return -1;
+ }
- if( Semaphore_Wait(&lep->InCount, 1) != 1 )
- {
- if( channel->Server == NULL )
- return -1;
- return 0;
+ // Wait for a packet to be ready
+ tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+ int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+ if( !rv ) {
+ errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+ LEAVE('i', -1);
+ return -1;
+ }
+ if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
+ //errno = EIO;
+ LEAVE('i', -1);
+ return -1;
}
+ // Pop a packet from the list
Mutex_Acquire(&rep->lList);
+ if( !rep->Node.ImplPtr )
+ {
+ Mutex_Release(&rep->lList);
+ //errno = EIO;
+ LEAVE('i', -1);
+ return -1;
+ }
tIPCPipe_Packet *pkt = rep->OutHead;
if(pkt)
rep->OutHead = pkt->Next;
if(!rep->OutHead)
rep->OutTail = NULL;
+ VFS_MarkAvaliable(Node, !!rep->OutHead);
+ VFS_MarkFull(&rep->Node, 0); // Just read a packet, remote shouldn't be full
Mutex_Release(&rep->lList);
- if(!pkt)
- return 0;
-
- size_t ret = MIN(pkt->Length, Length);
- memcpy(Dest, pkt->Data, ret);
- free(pkt);
+ // Return
+ size_t ret = 0;
+ if(pkt)
+ {
+ ret = MIN(pkt->Length, Length);
+ memcpy(Dest, pkt->Data, ret);
+ free(pkt);
+ }
+ else
+ {
+ Log_Warning("IPCPipe", "No packet ready but select returned");
+ }
+ LEAVE('i', ret);
return ret;
}
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
+
+ ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src);
// Ensure the server hasn't been closed
- if( !channel || channel->Server == NULL )
+ if( !channel || channel->Server == NULL ) {
+ LEAVE('i', -1);
return -1;
+ }
- if( Length > channel->Server->MaxBlockSize )
+ if( Length > channel->Server->MaxBlockSize ) {
+ LEAVE('i', 0);
return 0;
+ }
+
+ // Wait for a packet to be ready
+ tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+ int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+ ASSERTC(rv, >=, 0);
+ if( !rv ) {
+ errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+ LEAVE('i', -1);
+ return -1;
+ }
+ if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
+ //errno = EIO;
+ LEAVE('i', -1);
+ return -1;
+ }
- // Create packet structure
+ // Create packet structure
tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
pkt->Next = NULL;
pkt->Offset = 0;
// Client was closed by another thread
free(pkt);
Mutex_Release(&lep->lList);
+ LEAVE('i', -1);
return -1;
}
if(lep->OutTail)
else
lep->OutHead = pkt;
lep->OutTail = pkt;
+
+ lep->ByteCount += Length;
+ if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
+ VFS_MarkFull(Node, 1);
+ }
+
Mutex_Release(&lep->lList);
// Signal other end
- Semaphore_Signal(&rep->InCount, 1);
+ VFS_MarkAvaliable(&rep->Node, 1);
+ LEAVE('i', Length);
return Length;
}
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
+
+ ENTER("pNode", Node);
+
+ if( !ch ) {
+ Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
+ return ;
+ }
// Mark client as closed
Node->ImplPtr = NULL;
+ // Clear packets
Mutex_Acquire(&lep->lList);
while( lep->OutHead ) {
tIPCPipe_Packet *next = lep->OutHead->Next;
lep->OutHead = next;
}
Mutex_Release(&lep->lList);
+ LOG("Packets cleared");
+ // Tell remote that local has closed
+ VFS_MarkError(&rep->Node, 1);
+ // TODO: Deliver SIGPIPE or similar to all owners of remote
// Clean up if both sides are closed
if( rep->Node.ImplPtr == NULL )
{
+ LOG("Remote closed, cleaning up");
tIPCPipe_Server *srv = ch->Server;
if( srv )
{
}
free(ch);
}
+ LEAVE('-');
}