From: John Hodge Date: Mon, 28 Feb 2011 03:17:35 +0000 (+0800) Subject: Implementing (and using) select() at kernel level X-Git-Tag: rel0.10~187 X-Git-Url: https://git.ucc.asn.au/?a=commitdiff_plain;h=11e9a26f9aa039eb2b4edcf55c1295e640b5999a;p=tpg%2Facess2.git Implementing (and using) select() at kernel level --- diff --git a/Kernel/include/vfs.h b/Kernel/include/vfs.h index e37ebfec..988a6112 100644 --- a/Kernel/include/vfs.h +++ b/Kernel/include/vfs.h @@ -341,6 +341,28 @@ extern tVFS_Driver *VFS_GetFSByName(const char *Name); */ extern tVFS_ACL *VFS_UnixToAcessACL(Uint Mode, Uint Owner, Uint Group); +/** + */ +enum eVFS_SelectTypes +{ + VFS_SELECT_READ, + VFS_SELECT_WRITE, + VFS_SELECT_ERROR +}; + +/** + * \brief Wait for an event on a node + * \param Node Node to wait on + * \param Type Type of wait + * \param Timeout Time to wait (NULL for infinite wait) + * \return Number of nodes that actioned (0 or 1) + */ +extern int VFS_SelectNode(tVFS_Node *Node, enum eVFS_SelectTypes Type, tTime *Timeout); + +extern int VFS_MarkFull(tVFS_Node *Node, BOOL IsBufferFull); +extern int VFS_MarkAvaliable(tVFS_Node *Node, BOOL IsDataAvaliable); +extern int VFS_MarkError(tVFS_Node *Node, BOOL IsErrorState); + // --- Node Cache -- /** * \name Node Cache diff --git a/Kernel/include/vfs_ext.h b/Kernel/include/vfs_ext.h index acee1645..5e6b77dd 100644 --- a/Kernel/include/vfs_ext.h +++ b/Kernel/include/vfs_ext.h @@ -292,4 +292,16 @@ extern int VFS_ReadDir(int FD, char *Dest); */ extern int VFS_OpenChild(Uint *Errno, int FD, const char *Name, Uint Mode); +/** + * \brief Wait for an aciton on a file descriptor + * \param MaxHandle Maximum set handle in \a *Handles + * \param ReadHandles Handles to wait for data on + * \param WriteHandles Handles to wait to write to + * \param ErrHandles Handle to wait for errors on + * \param Timeout Timeout for select() (if null, there is no timeout), if zero select() is non blocking + * \param IsKernel Use kernel handles as opposed to user handles + * \return Number of handles that actioned + */ +extern int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set *ErrHandles, tTime *Timeout, BOOL IsKernel); + #endif diff --git a/Kernel/vfs/select.c b/Kernel/vfs/select.c index 3cb0af2f..3f8193b8 100644 --- a/Kernel/vfs/select.c +++ b/Kernel/vfs/select.c @@ -5,7 +5,7 @@ * select.c * - Implements the select() system call (and supporting code) */ -#define DEBUG 0 +#define DEBUG 1 #include #include "vfs.h" #include "vfs_int.h" @@ -34,26 +34,62 @@ struct sVFS_SelectList struct sVFS_SelectThread { - //! \brief Marks the thread as actively using this select - int IsActive; //! \brief Semaphore to atomically put the listener to sleep tSemaphore SleepHandle; // TODO: Allow timeouts (by setting an alarm?) }; // === PROTOTYPES === - int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set *ErrHandles, tTime *Timeout, int IsKernel); - int VFS_MarkFull(tVFS_Node *Node, BOOL IsBufferFull); - int VFS_MarkAvaliable(tVFS_Node *Node, BOOL IsDataAvaliable); - int VFS_MarkError(tVFS_Node *Node, BOOL IsErrorState); - int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, int Type, BOOL IsKernel); - int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, int Type, BOOL IsKernel); +// int VFS_SelectNode(tVFS_Node *Node, enum eVFS_SelectTypes Type, tTime *Timeout); +// int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set *ErrHandles, tTime *Timeout, BOOL IsKernel); +// int VFS_MarkFull(tVFS_Node *Node, BOOL IsBufferFull); +// int VFS_MarkAvaliable(tVFS_Node *Node, BOOL IsDataAvaliable); +// int VFS_MarkError(tVFS_Node *Node, BOOL IsErrorState); + int VFS_int_Select_GetType(enum eVFS_SelectTypes Type, tVFS_Node *Node, tVFS_SelectList ***List, int **Flag, int *WantedFlag, int *MaxAllowed); + int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, enum eVFS_SelectTypes Type, BOOL IsKernel); + int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, enum eVFS_SelectTypes Type, BOOL IsKernel); int VFS_int_Select_AddThread(tVFS_SelectList *List, tVFS_SelectThread *Thread, int MaxAllowed); void VFS_int_Select_RemThread(tVFS_SelectList *List, tVFS_SelectThread *Thread); +void VFS_int_Select_SignalAll(tVFS_SelectList *List); // === GLOBALS === // === FUNCTIONS === -int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set *ErrHandles, tTime *Timeout, int IsKernel) +int VFS_SelectNode(tVFS_Node *Node, enum eVFS_SelectTypes Type, tTime *Timeout) +{ + tVFS_SelectThread thread_info; + tVFS_SelectList **list; + int *flag, wanted, maxAllowed; + + ENTER("pNode iType pTimeout", Node, Type, Timeout); + + Semaphore_Init(&thread_info.SleepHandle, 0, 0, "VFS_SelectNode()", ""); + + if( VFS_int_Select_GetType(Type, Node, &list, &flag, &wanted, &maxAllowed) ) { + LEAVE('i', -1); + return -1; + } + + VFS_int_Select_AddThread(*list, &thread_info, maxAllowed); + if( *flag == wanted ) + { + VFS_int_Select_RemThread(*list, &thread_info); + LEAVE('i', 1); + return 1; + } + + if( !Timeout || *Timeout > 0 ) + { + // TODO: Actual timeout + Semaphore_Wait(&thread_info.SleepHandle, 0); + } + + VFS_int_Select_RemThread(*list, &thread_info); + + LEAVE('i', *flag == wanted); + return *flag == wanted; +} + +int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set *ErrHandles, tTime *Timeout, BOOL IsKernel) { tVFS_SelectThread thread_info; int ret; @@ -75,9 +111,9 @@ int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set // If there were events waiting, de-register and return if( ret ) { - ret = VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 0, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 1, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 2, IsKernel); + ret = VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 0, IsKernel); + ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, WriteHandles, 1, IsKernel); + ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ErrHandles, 2, IsKernel); return ret; } @@ -91,17 +127,74 @@ int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set // Fill output (modify *Handles) // - Also, de-register - ret = VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 0, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 1, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 2, IsKernel); + ret = VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 0, IsKernel); + ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, WriteHandles, 1, IsKernel); + ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ErrHandles, 2, IsKernel); return ret; } +// Mark a node as having data ready for reading +int VFS_MarkAvaliable(tVFS_Node *Node, BOOL IsDataAvaliable) +{ + Node->DataAvaliable = !!IsDataAvaliable; + if( Node->DataAvaliable ) + VFS_int_Select_SignalAll(Node->ReadThreads); + return 0; +} + +// Mark a node as having a full buffer +int VFS_MarkFull(tVFS_Node *Node, BOOL IsBufferFull) +{ + Node->BufferFull = !!IsBufferFull; + if( !Node->BufferFull ) + VFS_int_Select_SignalAll(Node->WriteThreads); + return 0; +} + +// Mark a node as errored +int VFS_MarkError(tVFS_Node *Node, BOOL IsErrorState) +{ + Node->ErrorOccurred = !!IsErrorState; + if( Node->ErrorOccurred ) + VFS_int_Select_SignalAll(Node->ErrorThreads); + return 0; +} + // --- Internal --- +int VFS_int_Select_GetType(enum eVFS_SelectTypes Type, tVFS_Node *Node, tVFS_SelectList ***List, int **Flag, int *WantedFlag, int *MaxAllowed) +{ + // Get the type of the listen + switch(Type) + { + case 0: // Read + if(List) *List = &Node->ReadThreads; + if(Flag) *Flag = &Node->DataAvaliable; + if(WantedFlag) *WantedFlag = 1; + if(MaxAllowed) *MaxAllowed = 1; // Max of 1 for read + break; + case 1: // Write + if(List) *List = &Node->WriteThreads; + if(Flag) *Flag = &Node->BufferFull; + if(WantedFlag) *WantedFlag = 0; + if(MaxAllowed) *MaxAllowed = 1; // Max of 1 for write + break; + case 2: // Error + if(List) *List = &Node->ErrorThreads; + if(Flag) *Flag = &Node->ErrorOccurred; + if(WantedFlag) *WantedFlag = 1; + if(MaxAllowed) *MaxAllowed = -1; // No max for error listeners + break; + default: + Log_Error("VFS", "VFS_int_Select_GetType: BUG CHECK, Unknown Type %i", Type); + return 1; + } + return 0; +} + /** * \return Number of files with an action */ -int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, int Type, BOOL IsKernel) +int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, enum eVFS_SelectTypes Type, BOOL IsKernel) { int i, numFlagged = 0; tVFS_SelectList **list; @@ -131,30 +224,8 @@ int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Ha } // Get the type of the listen - switch(Type) - { - case 0: // Read - list = &handle->Node->ReadThreads; - flag = &handle->Node->DataAvaliable; - wantedFlagValue = 1; - maxAllowed = 1; // Max of 1 for read - break; - case 1: // Write - list = &handle->Node->WriteThreads; - flag = &handle->Node->BufferFull; - wantedFlagValue = 0; - maxAllowed = 1; // Max of 1 for write - break; - case 2: // Error - list = &handle->Node->ErrorThreads; - flag = &handle->Node->ErrorOccurred; - wantedFlagValue = 1; - maxAllowed = -1; // No max for error listeners - break; - default: - Log_Error("VFS", "VFS_int_Select_Deregister: BUG CHECK, Unknown Type %i", Type); + if( VFS_int_Select_GetType(Type, handle->Node, &list, &flag, &wantedFlagValue, &maxAllowed) ) return 0; - } // Alloc if needed if( !*list ) { @@ -178,7 +249,7 @@ int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Ha /** * \return Number of files with an action */ -int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, int Type, BOOL IsKernel) +int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Handles, enum eVFS_SelectTypes Type, BOOL IsKernel) { int i, numFlagged = 0; tVFS_SelectList **list; @@ -207,27 +278,10 @@ int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set * } // Get the type of the listen - switch(Type) - { - case 0: // Read - list = &handle->Node->ReadThreads; - flag = &handle->Node->DataAvaliable; - wantedFlagValue = 1; - break; - case 1: // Write - list = &handle->Node->WriteThreads; - flag = &handle->Node->BufferFull; - wantedFlagValue = 0; - break; - case 2: // Error - list = &handle->Node->ErrorThreads; - flag = &handle->Node->ErrorOccurred; - wantedFlagValue = 1; - break; - default: - Log_Error("VFS", "VFS_int_Select_Deregister: BUG CHECK, Unknown Type %i", Type); + + // Get the type of the listen + if( VFS_int_Select_GetType(Type, handle->Node, &list, &flag, &wantedFlagValue, NULL) ) return 0; - } // Remove VFS_int_Select_RemThread(*list, Thread ); @@ -344,5 +398,37 @@ void VFS_int_Select_RemThread(tVFS_SelectList *List, tVFS_SelectThread *Thread) } while(block); // Not on list, is this an error? + + Mutex_Release(&List->Lock); } +/** + * \brief Signal all threads on a list + */ +void VFS_int_Select_SignalAll(tVFS_SelectList *List) +{ + int i; + tVFS_SelectListEnt *block, *prev; + + // Lock to avoid concurrency issues + Mutex_Acquire(&List->Lock); + + block = &List->FirstEnt; + + // Look for the thread + do + { + for( i = 0; i < NUM_THREADS_PER_ALLOC; i ++ ) + { + if( block->Threads[i] ) + { + Semaphore_Signal( &block->Threads[i]->SleepHandle, 1 ); + } + } + + prev = block; + block = block->Next; + } while(block); + + Mutex_Release(&List->Lock); +} diff --git a/Modules/IPStack/ipv6.c b/Modules/IPStack/ipv6.c index 561297db..9368c3e2 100644 --- a/Modules/IPStack/ipv6.c +++ b/Modules/IPStack/ipv6.c @@ -45,9 +45,6 @@ void IPv6_int_GetPacket(tAdapter *Interface, tMacAddr From, int Length, void *Bu return; Log_Debug("IPv6", "hdr = {"); - //Log_Debug("IPv6", " .Version = %i", (hdr->Head >> (20+8)) & 0xF ); - //Log_Debug("IPv6", " .TrafficClass = %i", (hdr->Head >> (20)) & 0xFF ); - //Log_Debug("IPv6", " .FlowLabel = %i", hdr->Head & 0xFFFFF ); Log_Debug("IPv6", " .Version = %i", hdr->Version ); Log_Debug("IPv6", " .TrafficClass = %i", hdr->TrafficClass ); Log_Debug("IPv6", " .FlowLabel = %i", hdr->FlowLabel ); diff --git a/Modules/IPStack/tcp.c b/Modules/IPStack/tcp.c index f292ea86..23d028c9 100644 --- a/Modules/IPStack/tcp.c +++ b/Modules/IPStack/tcp.c @@ -426,6 +426,10 @@ void TCP_INT_AppendRecieved(tTCPConnection *Connection, tTCPStoredPacket *Pkt) } RingBuffer_Write( Connection->RecievedBuffer, Pkt->Data, Pkt->Length ); + + #if USE_SELECT + VFS_MarkAvaliable(&Connection->Node, 1); + #endif Mutex_Release( &Connection->lRecievedPackets ); } @@ -763,6 +767,12 @@ Uint64 TCP_Client_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buff // Poll packets for(;;) { + #if USE_SELECT + // Wait + VFS_SelectNode(Node, VFS_SELECT_READ, NULL); + // Lock list and read + Mutex_Acquire( &conn->lRecievedPackets ); + #else // Lock list and check if there is a packet Mutex_Acquire( &conn->lRecievedPackets ); if( conn->RecievedBuffer->Length == 0 ) { @@ -771,10 +781,16 @@ Uint64 TCP_Client_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buff Threads_Yield(); // TODO: Less expensive wait continue; } + #endif // Attempt to read all `Length` bytes len = RingBuffer_Read( destbuf, conn->RecievedBuffer, Length ); + #if USE_SELECT + if( conn->RecievedBuffer->Length == 0 ) + VFS_MarkAvaliable(Node, 0); + #endif + // Release the lock (we don't need it any more) Mutex_Release( &conn->lRecievedPackets );