* drv/dgram_pipe.c
* - Connection+Datagram based local IPC
*/
+#define DEBUG 1
#include <vfs.h>
#include <fs_devfs.h>
#include <modules.h>
tMutex lList;
tIPCPipe_Packet *OutHead;
tIPCPipe_Packet *OutTail;
- tSemaphore InCount;
tVFS_Node Node;
};
struct sIPCPipe_Channel
size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server
size_t CurrentByteCount;
tVFS_Node ServerNode;
- tSemaphore ConnectionSemaphore;
tRWLock lChannelList;
tIPCPipe_Channel *FirstClient;
tIPCPipe_Channel *LastClient;
};
tDevFS_Driver gIPCPipe_DevFSInfo = {
.Name = "ipcpipe",
- .RootNode = {.Type=&gIPCPipe_RootNodeType}
+ .RootNode = {.Type=&gIPCPipe_RootNodeType,.Flags=VFS_FFLAG_DIRECTORY,.Size=-1}
};
// - Global list of servers
tRWLock glIPCPipe_ServerList;
{
tIPCPipe_Server *srv;
+ ENTER("pNode sName xFlags", Node, Name, Flags);
+
// Write Lock
RWLock_AcquireWrite(&glIPCPipe_ServerList);
for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
if( srv )
{
RWLock_Release(&glIPCPipe_ServerList);
+ LEAVE('n');
return NULL;
}
srv->QueueByteLimit = DEF_MAX_BYTE_LIMIT;
srv->ServerNode.Type = &gIPCPipe_ServerNodeType;
srv->ServerNode.ImplPtr = srv;
+ srv->ServerNode.Flags = VFS_FFLAG_DIRECTORY;
// Lock already held
if( gpIPCPipe_LastServer )
gpIPCPipe_LastServer = srv;
RWLock_Release(&glIPCPipe_ServerList);
+ LEAVE('p', &srv->ServerNode);
return &srv->ServerNode;
}
tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
{
tIPCPipe_Server *srv;
+ ENTER("pNode sName", Node, Name);
// Find server
RWLock_AcquireRead(&glIPCPipe_ServerList);
for( srv = gpIPCPipe_FirstServer; srv; srv = srv->Next )
{
+ LOG("== %s", srv->Name);
if( strcmp(srv->Name, Name) == 0 )
break;
}
RWLock_Release(&glIPCPipe_ServerList);
- if( !srv )
+ if( !srv ) {
+ LEAVE('n');
return NULL;
+ }
// Create new client
tIPCPipe_Channel *new_client;
srv->LastClient = new_client;
if(!srv->FirstUnseenClient)
srv->FirstUnseenClient = new_client;
+ VFS_MarkAvaliable(&srv->ServerNode, !!srv->FirstUnseenClient);
RWLock_Release(&srv->lChannelList);
+ LEAVE('p', &new_client->ClientEP.Node);
return &new_client->ClientEP.Node;
}
tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
{
tIPCPipe_Server *srv = Node->ImplPtr;
-
- if( strcmp(Name, "next") != 0 )
- return NULL;
- if( Semaphore_Wait( &srv->ConnectionSemaphore, 1 ) != 1 )
+
+ ENTER("pNode sName", Node, Name);
+
+ if( strcmp(Name, "newclient") != 0 ) {
+ LEAVE('n');
return NULL;
+ }
+
+ VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
tIPCPipe_Channel *conn;
RWLock_AcquireRead(&srv->lChannelList);
{
srv->FirstUnseenClient = conn->Next;
}
+ VFS_MarkAvaliable(Node, !!srv->FirstUnseenClient);
RWLock_Release(&srv->lChannelList);
- if( !conn )
+ if( !conn ) {
+ LEAVE('n');
return NULL;
+ }
// Success
+ LEAVE('p', &conn->ServerEP.Node);
return &conn->ServerEP.Node;
}
void IPCPipe_Server_Close(tVFS_Node *Node)
{
tIPCPipe_Server *srv = Node->ImplPtr;
-// Semaphore_Cancel(&srv->ConnectionSemaphore);
// Flag server as being destroyed
tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep)
{
tIPCPipe_Channel *ch = Node->ImplPtr;
- if( !ch ) return NULL;
- *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
- *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
+ if( ch )
+ {
+ *lep = (Node == &ch->ServerEP.Node ? &ch->ServerEP : &ch->ClientEP);
+ *rep = (Node == &ch->ServerEP.Node ? &ch->ClientEP : &ch->ServerEP);
+ }
return ch;
}
size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
-
- if( channel->Server == NULL )
+
+ ENTER("pNode xOffset xLength pDest", Node, Offset, Length, Dest);
+
+ // Closed endpoint / channel
+ if( !channel || channel->Server == NULL ) {
+ LEAVE('i', -1);
return -1;
+ }
- if( Semaphore_Wait(&lep->InCount, 1) != 1 )
- {
- if( channel->Server == NULL )
- return -1;
- return 0;
+ // Wait for a packet to be ready
+ VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Endpoint");
+ if( channel->Server == NULL ) {
+ LEAVE('i', -1);
+ return -1;
}
+ // Pop a packet from the list
Mutex_Acquire(&rep->lList);
+ if( !rep->Node.ImplPtr )
+ {
+ Mutex_Release(&rep->lList);
+ LEAVE('i', -1);
+ return -1;
+ }
tIPCPipe_Packet *pkt = rep->OutHead;
if(pkt)
rep->OutHead = pkt->Next;
if(!rep->OutHead)
rep->OutTail = NULL;
+ VFS_MarkAvaliable(Node, !!rep->OutHead);
Mutex_Release(&rep->lList);
- if(!pkt)
- return 0;
-
- size_t ret = MIN(pkt->Length, Length);
- memcpy(Dest, pkt->Data, ret);
- free(pkt);
+ // Return
+ size_t ret = 0;
+ if(pkt)
+ {
+ ret = MIN(pkt->Length, Length);
+ memcpy(Dest, pkt->Data, ret);
+ free(pkt);
+ }
+ else
+ {
+ Log_Warning("IPCPipe", "No packet ready but semaphore returned");
+ }
+ LEAVE('i', ret);
return ret;
}
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
+
+ ENTER("pNode xOffset xLength pSrc", Node, Offset, Length, Src);
// Ensure the server hasn't been closed
- if( !channel || channel->Server == NULL )
+ if( !channel || channel->Server == NULL ) {
+ LEAVE('i', -1);
return -1;
+ }
- if( Length > channel->Server->MaxBlockSize )
+ if( Length > channel->Server->MaxBlockSize ) {
+ LEAVE('i', 0);
return 0;
+ }
// Create packet structure
tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
// Client was closed by another thread
free(pkt);
Mutex_Release(&lep->lList);
+ LEAVE('i', -1);
return -1;
}
if(lep->OutTail)
Mutex_Release(&lep->lList);
// Signal other end
- Semaphore_Signal(&rep->InCount, 1);
+ VFS_MarkAvaliable(&rep->Node, 1);
+ LEAVE('i', Length);
return Length;
}
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *ch = IPCPipe_int_GetEPs(Node, &lep, &rep);
+
+ ENTER("pNode", Node);
+
+ if( !ch ) {
+ Log_Warning("IPCPipe", "Endpoint %p double-closed", ch);
+ return ;
+ }
// Mark client as closed
Node->ImplPtr = NULL;
+ // Clear packets
Mutex_Acquire(&lep->lList);
while( lep->OutHead ) {
tIPCPipe_Packet *next = lep->OutHead->Next;
lep->OutHead = next;
}
Mutex_Release(&lep->lList);
+ LOG("Packets cleared");
+ // Tell remote that local has closed
+ VFS_MarkError(&rep->Node, 1);
+ // TODO: Deliver SIGPIPE or similar to all owners of remote
// Clean up if both sides are closed
if( rep->Node.ImplPtr == NULL )
{
+ LOG("Remote closed, cleaning up");
tIPCPipe_Server *srv = ch->Server;
if( srv )
{
}
free(ch);
}
+ LEAVE('-');
}