X-Git-Url: https://git.ucc.asn.au/?p=tpg%2Facess2.git;a=blobdiff_plain;f=Usermode%2FApplications%2Faxwin4_src%2FServer%2FCIPCChannel_AcessIPCPipe.cpp;h=51d88b177198a5628481f18d3f1ed08b3ee31d12;hp=16863496d0cbc99ff93d570ccd2912189cdd19a4;hb=9a9053e91df6e08761e2ce72be350c5c2233153b;hpb=f0ca31bc3f9c66a7fced9afcab9a6cacc8d1d647 diff --git a/Usermode/Applications/axwin4_src/Server/CIPCChannel_AcessIPCPipe.cpp b/Usermode/Applications/axwin4_src/Server/CIPCChannel_AcessIPCPipe.cpp index 16863496..51d88b17 100644 --- a/Usermode/Applications/axwin4_src/Server/CIPCChannel_AcessIPCPipe.cpp +++ b/Usermode/Applications/axwin4_src/Server/CIPCChannel_AcessIPCPipe.cpp @@ -5,25 +5,115 @@ * CIPCChannel_AcessIPCPipe.cpp * - IPC Channel :: Acess' IPC Pipe /Devices/ipcpipe/ */ +#include #include +#include +#include +#include +#include namespace AxWin { CIPCChannel_AcessIPCPipe::CIPCChannel_AcessIPCPipe(const ::std::string& suffix) { - + ::std::string path = "/Devices/ipcpipe/" + suffix; + m_fd = _SysOpen(path.c_str(), OPENFLAG_CREATE); + if(m_fd == -1) { + _SysDebug("Failed to open %s: %s", path.c_str(), strerror(errno)); + throw ::std::system_error(errno, ::std::system_category()); + } } CIPCChannel_AcessIPCPipe::~CIPCChannel_AcessIPCPipe() { + _SysClose(m_fd); } int CIPCChannel_AcessIPCPipe::FillSelect(fd_set& rfds) { - return 0; + int maxfd = m_fd; + FD_SET(m_fd, &rfds); + + for( auto& clientref : m_clients ) + { + maxfd = ::std::max(maxfd, clientref.m_fd); + FD_SET(clientref.m_fd, &rfds); + } + + return maxfd+1; } void CIPCChannel_AcessIPCPipe::HandleSelect(const fd_set& rfds) { + if( FD_ISSET(m_fd, &rfds) ) + { + int newfd = _SysOpenChild(m_fd, "newclient", OPENFLAG_READ|OPENFLAG_WRITE); + if( newfd == -1 ) { + _SysDebug("ERROR - Failure to open new client on FD%i", m_fd); + } + else { + _SysDebug("CIPCChannel_AcessIPCPipe::HandleSelect - New client on FD %i with FD%i", + m_fd, newfd); + + // emplace creates a new object within the list + m_clients.emplace( m_clients.end(), *this, newfd ); + IPC::RegisterClient( m_clients.back() ); + } + } + + for( auto it = m_clients.begin(); it != m_clients.end(); ) + { + CClient_AcessIPCPipe& clientref = *it; + ++ it; + + if( FD_ISSET(clientref.m_fd, &rfds) ) + { + try { + clientref.HandleReceive(); + } + catch( const ::std::exception& e ) { + _SysDebug("ERROR - Exception processing IPCPipe FD%i: '%s', removing", + clientref.m_fd, e.what() + ); + it = m_clients.erase(--it); + } + } + } +} + + +CClient_AcessIPCPipe::CClient_AcessIPCPipe(::AxWin::IIPCChannel& channel, int fd): + CClient(channel), + m_fd(fd) +{ +} + +CClient_AcessIPCPipe::~CClient_AcessIPCPipe() +{ + _SysClose(m_fd); + _SysDebug("Closed client FD%i", m_fd); +} + +void CClient_AcessIPCPipe::SendMessage(CSerialiser& message) +{ + const ::std::vector& data = message.Compact(); + + _SysDebug("CClient_AcessIPCPipe::SendMessage - %i bytes to %i", data.size(), m_fd); + //_SysDebugHex("CClient_AcessIPCPipe::SendMessage", data.data(), data.size()); + _SysWrite(m_fd, data.data(), data.size()); +} + +void CClient_AcessIPCPipe::HandleReceive() +{ + ::std::vector rxbuf(0x1000); + size_t len = _SysRead(m_fd, rxbuf.data(), rxbuf.capacity()); + if( len == (size_t)-1 ) + throw ::std::system_error(errno, ::std::system_category()); + //_SysDebug("CClient_AcessIPCPipe::HandleReceive - Rx %i/%i bytes", len, rxbuf.capacity()); + _SysDebugHex("CClient_AcessIPCPipe::HandleReceive", rxbuf.data(), len); + rxbuf.resize(len); + + CDeserialiser msg( ::std::move(rxbuf) ); + CClient::HandleMessage( msg ); } };