* drv/dgram_pipe.c
* - Connection+Datagram based local IPC
*/
-#define DEBUG 1
+#define DEBUG 0
#include <vfs.h>
#include <fs_devfs.h>
#include <modules.h>
tMutex lList;
tIPCPipe_Packet *OutHead;
tIPCPipe_Packet *OutTail;
+ size_t ByteCount;
tVFS_Node Node;
};
struct sIPCPipe_Channel
tIPCPipe_Server *Prev;
char *Name;
size_t MaxBlockSize; // Max size of a 'packet'
- size_t QueueByteLimit; // Maximum number of bytes held in kernel for this server
- size_t CurrentByteCount;
+ // NOTE: Not strictly enforced, can go MaxBlockSize-1 over
+ size_t QueueByteLimit; // Maximum number of bytes held in kernel for each endpoint
tVFS_Node ServerNode;
tRWLock lChannelList;
tIPCPipe_Channel *FirstClient;
// - Root
tVFS_Node *IPCPipe_Root_MkNod(tVFS_Node *Node, const char *Name, Uint Flags);
int IPCPipe_Root_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
// - Server
int IPCPipe_Server_ReadDir(tVFS_Node *Node, int ID, char Name[FILENAME_MAX]);
-tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name);
+tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags);
void IPCPipe_Server_Close(tVFS_Node *Node);
// - Socket
tIPCPipe_Channel *IPCPipe_int_GetEPs(tVFS_Node *Node, tIPCPipe_Endpoint **lep, tIPCPipe_Endpoint **rep);
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest);
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src);
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags);
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags);
void IPCPipe_Client_Close(tVFS_Node *Node);
// === GLOBALS ===
};
tVFS_NodeType gIPCPipe_ChannelNodeType = {
.TypeName = "IPC Pipe - Channel",
+ .Flags = VFS_NODETYPEFLAG_STREAM,
.Read = IPCPipe_Client_Read,
.Write = IPCPipe_Client_Write,
.Close = IPCPipe_Client_Close
/**
* \return New client pointer
*/
-tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Root_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
{
tIPCPipe_Server *srv;
ENTER("pNode sName", Node, Name);
return NULL;
}
+ if( Flags & VFS_FDIRFLAG_STAT ) {
+ // LEAVE('p', srv->TplClientNode);
+ // return &srv->TplClientNode;
+ }
+
// Create new client
- tIPCPipe_Channel *new_client;
-
+ tIPCPipe_Channel *new_client;
+
new_client = calloc(1, sizeof(tIPCPipe_Channel));
new_client->Server = srv;
new_client->ClientEP.Node.Type = &gIPCPipe_ChannelNodeType;
new_client->ClientEP.Node.ImplPtr = new_client;
+ new_client->ClientEP.Node.Size = -1;
new_client->ServerEP.Node.Type = &gIPCPipe_ChannelNodeType;
new_client->ServerEP.Node.ImplPtr = new_client;
+ new_client->ServerEP.Node.Size = -1;
// Append to server list
RWLock_AcquireWrite(&srv->lChannelList);
// 'next' is a valid entry, but readdir should never be called on this node
return -1;
}
-tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name)
+tVFS_Node *IPCPipe_Server_FindDir(tVFS_Node *Node, const char *Name, Uint Flags)
{
tIPCPipe_Server *srv = Node->ImplPtr;
return NULL;
}
+ // TODO: Need VFS_FDIRFLAG_NOBLOCK?
VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Server");
tIPCPipe_Channel *conn;
}
return ch;
}
-size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest)
+size_t IPCPipe_Client_Read(tVFS_Node *Node, off_t Offset, size_t Length, void *Dest, Uint Flags)
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
}
// Wait for a packet to be ready
- VFS_SelectNode(Node, VFS_SELECT_READ, 0, "IPCPipe Endpoint");
- if( channel->Server == NULL ) {
+ tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+ int rv = VFS_SelectNode(Node, VFS_SELECT_READ|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+ if( !rv ) {
+ errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+ LEAVE('i', -1);
+ return -1;
+ }
+ if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
+ //errno = EIO;
LEAVE('i', -1);
return -1;
}
if( !rep->Node.ImplPtr )
{
Mutex_Release(&rep->lList);
+ //errno = EIO;
LEAVE('i', -1);
return -1;
}
if(!rep->OutHead)
rep->OutTail = NULL;
VFS_MarkAvaliable(Node, !!rep->OutHead);
+ VFS_MarkFull(&rep->Node, 0); // Just read a packet, remote shouldn't be full
Mutex_Release(&rep->lList);
// Return
}
else
{
- Log_Warning("IPCPipe", "No packet ready but semaphore returned");
+ Log_Warning("IPCPipe", "No packet ready but select returned");
}
LEAVE('i', ret);
return ret;
}
-size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src)
+size_t IPCPipe_Client_Write(tVFS_Node *Node, off_t Offset, size_t Length, const void *Src, Uint Flags)
{
tIPCPipe_Endpoint *lep, *rep;
tIPCPipe_Channel *channel = IPCPipe_int_GetEPs(Node, &lep, &rep);
LEAVE('i', 0);
return 0;
}
+
+ // Wait for a packet to be ready
+ tTime timeout_z = 0, *timeout = ((Flags & VFS_IOFLAG_NOBLOCK) ? &timeout_z : NULL);
+ int rv = VFS_SelectNode(Node, VFS_SELECT_WRITE|VFS_SELECT_ERROR, timeout, "IPCPipe Endpoint");
+ ASSERTC(rv, >=, 0);
+ if( !rv ) {
+ errno = (Flags & VFS_IOFLAG_NOBLOCK) ? EWOULDBLOCK : EINTR;
+ LEAVE('i', -1);
+ return -1;
+ }
+ if( (rv & VFS_SELECT_ERROR) || channel->Server == NULL ) {
+ //errno = EIO;
+ LEAVE('i', -1);
+ return -1;
+ }
- // Create packet structure
+ // Create packet structure
tIPCPipe_Packet *pkt = malloc(sizeof(tIPCPipe_Packet)+Length);
pkt->Next = NULL;
pkt->Offset = 0;
else
lep->OutHead = pkt;
lep->OutTail = pkt;
+
+ lep->ByteCount += Length;
+ if( lep->ByteCount >= channel->Server->QueueByteLimit ) {
+ VFS_MarkFull(Node, 1);
+ }
+
Mutex_Release(&lep->lList);
// Signal other end