From: John Hodge Date: Mon, 28 Feb 2011 06:30:17 +0000 (+0800) Subject: Kernel - Implemented select(), partially debugged X-Git-Tag: rel0.10~185 X-Git-Url: https://git.ucc.asn.au/?a=commitdiff_plain;h=e42035c38b65d428672b128f9ae253f81b2ced96;p=tpg%2Facess2.git Kernel - Implemented select(), partially debugged - There is quite likely a couple of bugs in there --- diff --git a/Kernel/drv/fifo.c b/Kernel/drv/fifo.c index 53e6bab3..2af45103 100644 --- a/Kernel/drv/fifo.c +++ b/Kernel/drv/fifo.c @@ -20,7 +20,6 @@ typedef struct sPipe { int WritePos; int BufSize; char *Buffer; - tSemaphore Semaphore; } tPipe; // === PROTOTYPES === @@ -222,7 +221,17 @@ Uint64 FIFO_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) // Wait for buffer to fill if(pipe->Flags & PF_BLOCKING) { + #if 0 len = Semaphore_Wait( &pipe->Semaphore, remaining ); + #else + VFS_SelectNode(Node, VFS_SELECT_READ, NULL); + // Read buffer + // TODO: Rethink this, it might not work on buffer overflow + if(pipe->WritePos - pipe->ReadPos < remaining) + len = pipe->WritePos - pipe->ReadPos; + else + len = remaining; + #endif } else { @@ -251,6 +260,12 @@ Uint64 FIFO_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) pipe->ReadPos += len; pipe->ReadPos %= pipe->BufSize; + // Mark some flags + if( pipe->ReadPos == pipe->WritePos ) { + VFS_MarkAvaliable(Node, 0); + } + VFS_MarkFull(Node, 0); // Buffer can't still be full + // Decrement Remaining Bytes remaining -= len; // Increment Buffer address @@ -277,7 +292,15 @@ Uint64 FIFO_Write(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) { // Wait for buffer to empty if(pipe->Flags & PF_BLOCKING) { + #if 0 len = Semaphore_Signal( &pipe->Semaphore, remaining ); + #else + VFS_SelectNode(Node, VFS_SELECT_WRITE, NULL); + if(pipe->ReadPos - pipe->WritePos < remaining) + len = pipe->ReadPos - pipe->WritePos; + else + len = remaining; + #endif } else { @@ -306,6 +329,12 @@ Uint64 FIFO_Write(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) pipe->WritePos += len; pipe->WritePos %= pipe->BufSize; + // Mark some flags + if( pipe->ReadPos == pipe->WritePos ) { + VFS_MarkFull(Node, 1); // Buffer full + } + VFS_MarkAvaliable(Node, 1); + // Decrement Remaining Bytes remaining -= len; // Increment Buffer address @@ -341,7 +370,7 @@ tPipe *FIFO_Int_NewPipe(int Size, const char *Name) ret->Name = ret->Buffer + Size; strcpy(ret->Name, Name); // - Start empty, max of `Size` - Semaphore_Init( &ret->Semaphore, 0, Size, "FIFO", ret->Name ); + //Semaphore_Init( &ret->Semaphore, 0, Size, "FIFO", ret->Name ); // Set Node ret->Node.Size = 0; diff --git a/Kernel/drv/vterm.c b/Kernel/drv/vterm.c index 5facebca..b2611308 100644 --- a/Kernel/drv/vterm.c +++ b/Kernel/drv/vterm.c @@ -431,10 +431,13 @@ Uint64 VT_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) while(pos < Length) { int avail_bytes; - avail_bytes = Semaphore_Wait( &term->InputSemaphore, Length ); - if( avail_bytes == -1 ) { - break ; - } + VFS_SelectNode(Node, VFS_SELECT_READ, NULL); + avail_bytes = term->InputRead - term->InputWrite; + + if(avail_bytes < 0) + avail_bytes += MAX_INPUT_CHARS8; + if(avail_bytes > Length - pos) + avail_bytes = Length - pos; while( avail_bytes -- ) { @@ -451,26 +454,34 @@ Uint64 VT_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) default: while(pos < Length) { - int avail; - avail = Semaphore_Wait( &term->InputSemaphore, Length/4 * 4 ); - if( avail == -1 ) { - break ; - } + int avail; + VFS_SelectNode(Node, VFS_SELECT_READ, NULL); + + avail = term->InputRead - term->InputWrite; + if(avail < 0) + avail += MAX_INPUT_CHARS32; + if(avail > Length - pos) + avail = Length/4 - pos; + - while( avail > 3 ) + while( avail -- ) { ((Uint32*)Buffer)[pos] = ((Uint32*)term->InputBuffer)[term->InputRead]; pos ++; term->InputRead ++; term->InputRead %= MAX_INPUT_CHARS32; - avail -= 4; } } pos *= 4; break; } + // Mark none avaliable if buffer empty + if( term->InputRead == term->InputWrite ) + VFS_MarkAvaliable(&term->Node, 0); + term->ReadingThread = -1; + Mutex_Release( &term->ReadingLock ); return pos; @@ -848,8 +859,6 @@ void VT_KBCallBack(Uint32 Codepoint) term->InputRead = term->InputWrite + 1; term->InputRead %= MAX_INPUT_CHARS8; } - - Semaphore_Signal(&term->InputSemaphore, 1); } else { @@ -861,9 +870,10 @@ void VT_KBCallBack(Uint32 Codepoint) term->InputRead ++; term->InputRead %= MAX_INPUT_CHARS32; } - Semaphore_Signal(&term->InputSemaphore, 4); } + VFS_MarkAvaliable(&term->Node, 1); + // Wake up the thread waiting on us //if( term->ReadingThread >= 0 ) { // Threads_WakeTID(term->ReadingThread); diff --git a/Kernel/threads.c b/Kernel/threads.c index cafb8068..b5ce1917 100644 --- a/Kernel/threads.c +++ b/Kernel/threads.c @@ -1360,9 +1360,11 @@ int Mutex_IsLocked(tMutex *Mutex) // void Semaphore_Init(tSemaphore *Sem, int Value, int MaxValue, const char *Module, const char *Name) { + memset(Sem, 0, sizeof(tSemaphore)); Sem->Value = Value; Sem->ModName = Module; Sem->Name = Name; + Sem->MaxValue = MaxValue; } // // Wait for items to be avaliable diff --git a/Kernel/vfs/select.c b/Kernel/vfs/select.c index 3f8193b8..c695bb6c 100644 --- a/Kernel/vfs/select.c +++ b/Kernel/vfs/select.c @@ -5,7 +5,7 @@ * select.c * - Implements the select() system call (and supporting code) */ -#define DEBUG 1 +#define DEBUG 0 #include #include "vfs.h" #include "vfs_int.h" @@ -56,34 +56,49 @@ void VFS_int_Select_SignalAll(tVFS_SelectList *List); // === FUNCTIONS === int VFS_SelectNode(tVFS_Node *Node, enum eVFS_SelectTypes Type, tTime *Timeout) { - tVFS_SelectThread thread_info; + tVFS_SelectThread *thread_info; tVFS_SelectList **list; int *flag, wanted, maxAllowed; ENTER("pNode iType pTimeout", Node, Type, Timeout); - Semaphore_Init(&thread_info.SleepHandle, 0, 0, "VFS_SelectNode()", ""); - if( VFS_int_Select_GetType(Type, Node, &list, &flag, &wanted, &maxAllowed) ) { LEAVE('i', -1); return -1; } - VFS_int_Select_AddThread(*list, &thread_info, maxAllowed); + thread_info = malloc(sizeof(tVFS_SelectThread)); + if(!thread_info) return -1; + + Semaphore_Init(&thread_info->SleepHandle, 0, 0, "VFS_SelectNode()", ""); + + LOG("list=%p, flag=%p, wanted=%i, maxAllowed=%i", list, flag, wanted, maxAllowed); + + // Alloc if needed + if( !*list ) { + *list = calloc(1, sizeof(tVFS_SelectList)); + } + + VFS_int_Select_AddThread(*list, thread_info, maxAllowed); if( *flag == wanted ) { - VFS_int_Select_RemThread(*list, &thread_info); + VFS_int_Select_RemThread(*list, thread_info); + free(thread_info); LEAVE('i', 1); return 1; } if( !Timeout || *Timeout > 0 ) { + LOG("Semaphore_Wait()"); // TODO: Actual timeout - Semaphore_Wait(&thread_info.SleepHandle, 0); + Semaphore_Wait(&thread_info->SleepHandle, 1); } - VFS_int_Select_RemThread(*list, &thread_info); + LOG("VFS_int_Select_RemThread()"); + VFS_int_Select_RemThread(*list, thread_info); + + free(thread_info); LEAVE('i', *flag == wanted); return *flag == wanted; @@ -91,10 +106,13 @@ int VFS_SelectNode(tVFS_Node *Node, enum eVFS_SelectTypes Type, tTime *Timeout) int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set *ErrHandles, tTime *Timeout, BOOL IsKernel) { - tVFS_SelectThread thread_info; + tVFS_SelectThread *thread_info; int ret; - Semaphore_Init(&thread_info.SleepHandle, 0, 0, "VFS_Select()", ""); + thread_info = malloc(sizeof(tVFS_SelectThread)); + if(!thread_info) return -1; + + Semaphore_Init(&thread_info->SleepHandle, 0, -1, "VFS_Select()", ""); // Notes: The idea is to make sure we onlt enter wait (on the semaphore) // if we are going to be woken up (either by an event at a later time, @@ -104,16 +122,17 @@ int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set // or the semaphore is incremeneted (or both, but never none) // Register with nodes - ret = VFS_int_Select_Register(&thread_info, MaxHandle, ReadHandles, 0, IsKernel); - ret += VFS_int_Select_Register(&thread_info, MaxHandle, WriteHandles, 1, IsKernel); - ret += VFS_int_Select_Register(&thread_info, MaxHandle, ErrHandles, 2, IsKernel); + ret = VFS_int_Select_Register(thread_info, MaxHandle, ReadHandles, 0, IsKernel); + ret += VFS_int_Select_Register(thread_info, MaxHandle, WriteHandles, 1, IsKernel); + ret += VFS_int_Select_Register(thread_info, MaxHandle, ErrHandles, 2, IsKernel); // If there were events waiting, de-register and return if( ret ) { - ret = VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 0, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, WriteHandles, 1, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ErrHandles, 2, IsKernel); + ret = VFS_int_Select_Deregister(thread_info, MaxHandle, ReadHandles, 0, IsKernel); + ret += VFS_int_Select_Deregister(thread_info, MaxHandle, WriteHandles, 1, IsKernel); + ret += VFS_int_Select_Deregister(thread_info, MaxHandle, ErrHandles, 2, IsKernel); + free(thread_info); return ret; } @@ -122,41 +141,48 @@ int VFS_Select(int MaxHandle, fd_set *ReadHandles, fd_set *WriteHandles, fd_set // Wait (only if there is no timeout, or it is greater than zero if( !Timeout || *Timeout > 0 ) { - ret = Semaphore_Wait(&thread_info.SleepHandle, 0); + ret = Semaphore_Wait(&thread_info->SleepHandle, 1); } // Fill output (modify *Handles) // - Also, de-register - ret = VFS_int_Select_Deregister(&thread_info, MaxHandle, ReadHandles, 0, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, WriteHandles, 1, IsKernel); - ret += VFS_int_Select_Deregister(&thread_info, MaxHandle, ErrHandles, 2, IsKernel); + ret = VFS_int_Select_Deregister(thread_info, MaxHandle, ReadHandles, 0, IsKernel); + ret += VFS_int_Select_Deregister(thread_info, MaxHandle, WriteHandles, 1, IsKernel); + ret += VFS_int_Select_Deregister(thread_info, MaxHandle, ErrHandles, 2, IsKernel); + free(thread_info); return ret; } // Mark a node as having data ready for reading int VFS_MarkAvaliable(tVFS_Node *Node, BOOL IsDataAvaliable) { + ENTER("pNode bIsDataAvaliable", Node, IsDataAvaliable); Node->DataAvaliable = !!IsDataAvaliable; if( Node->DataAvaliable ) VFS_int_Select_SignalAll(Node->ReadThreads); + LEAVE('i', 0); return 0; } // Mark a node as having a full buffer int VFS_MarkFull(tVFS_Node *Node, BOOL IsBufferFull) { + ENTER("pNode bIsDataAvaliable", Node, IsBufferFull); Node->BufferFull = !!IsBufferFull; if( !Node->BufferFull ) VFS_int_Select_SignalAll(Node->WriteThreads); + LEAVE('i', 0); return 0; } // Mark a node as errored int VFS_MarkError(tVFS_Node *Node, BOOL IsErrorState) { + ENTER("pNode bIsDataAvaliable", Node, IsErrorState); Node->ErrorOccurred = !!IsErrorState; if( Node->ErrorOccurred ) VFS_int_Select_SignalAll(Node->ErrorThreads); + LEAVE('i', 0); return 0; } @@ -203,6 +229,8 @@ int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Ha if( !Handles ) return 0; + ENTER("pThread iMaxHandle pHandles iType BIsKernel", Thread, MaxHandle, Handles, Type, IsKernel); + for( i = 0; i < MaxHandle; i ++ ) { tVFS_Handle *handle; @@ -224,8 +252,10 @@ int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Ha } // Get the type of the listen - if( VFS_int_Select_GetType(Type, handle->Node, &list, &flag, &wantedFlagValue, &maxAllowed) ) + if( VFS_int_Select_GetType(Type, handle->Node, &list, &flag, &wantedFlagValue, &maxAllowed) ) { + LEAVE('i', 0); return 0; + } // Alloc if needed if( !*list ) { @@ -244,6 +274,8 @@ int VFS_int_Select_Register(tVFS_SelectThread *Thread, int MaxHandle, fd_set *Ha numFlagged ++; } + LEAVE('i', numFlagged); + return numFlagged; } /** @@ -257,6 +289,8 @@ int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set * if( !Handles ) return 0; + ENTER("pThread iMaxHandle pHandles iType BIsKernel", Thread, MaxHandle, Handles, Type, IsKernel); + for( i = 0; i < MaxHandle; i ++ ) { tVFS_Handle *handle; @@ -280,8 +314,10 @@ int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set * // Get the type of the listen // Get the type of the listen - if( VFS_int_Select_GetType(Type, handle->Node, &list, &flag, &wantedFlagValue, NULL) ) + if( VFS_int_Select_GetType(Type, handle->Node, &list, &flag, &wantedFlagValue, NULL) ) { + LEAVE('i', 0); return 0; + } // Remove VFS_int_Select_RemThread(*list, Thread ); @@ -295,6 +331,8 @@ int VFS_int_Select_Deregister(tVFS_SelectThread *Thread, int MaxHandle, fd_set * } } + LEAVE('i', numFlagged); + return numFlagged; } @@ -306,6 +344,8 @@ int VFS_int_Select_AddThread(tVFS_SelectList *List, tVFS_SelectThread *Thread, i int i, count = 0; tVFS_SelectListEnt *block, *prev; + ENTER("pList pThread iMaxAllowed", List, Thread, MaxAllowed); + // Lock to avoid concurrency issues Mutex_Acquire(&List->Lock); @@ -320,10 +360,12 @@ int VFS_int_Select_AddThread(tVFS_SelectList *List, tVFS_SelectThread *Thread, i { block->Threads[i] = Thread; Mutex_Release(&List->Lock); + LEAVE('i', 0); return 0; } count ++; if( MaxAllowed && count >= MaxAllowed ) { + LEAVE('i', 1); return 1; } } @@ -332,6 +374,8 @@ int VFS_int_Select_AddThread(tVFS_SelectList *List, tVFS_SelectThread *Thread, i block = block->Next; } while(block); + LOG("New block"); + // Create new block block = malloc( sizeof(tVFS_SelectListEnt) ); if( !block ) { @@ -352,6 +396,7 @@ int VFS_int_Select_AddThread(tVFS_SelectList *List, tVFS_SelectThread *Thread, i // Release Mutex_Release(&List->Lock); + LEAVE('i', 0); return 0; } @@ -360,6 +405,8 @@ void VFS_int_Select_RemThread(tVFS_SelectList *List, tVFS_SelectThread *Thread) int i; tVFS_SelectListEnt *block, *prev; + ENTER("pList pThread", List, Thread); + // Lock to avoid concurrency issues Mutex_Acquire(&List->Lock); @@ -382,6 +429,7 @@ void VFS_int_Select_RemThread(tVFS_SelectList *List, tVFS_SelectThread *Thread) break; // If empty, free it if( i == NUM_THREADS_PER_ALLOC ) { + LOG("Deleting block"); prev->Next = block->Next; free(block); } @@ -389,6 +437,7 @@ void VFS_int_Select_RemThread(tVFS_SelectList *List, tVFS_SelectThread *Thread) } Mutex_Release(&List->Lock); + LEAVE('-'); return ; } } @@ -400,6 +449,9 @@ void VFS_int_Select_RemThread(tVFS_SelectList *List, tVFS_SelectThread *Thread) // Not on list, is this an error? Mutex_Release(&List->Lock); + + LOG("Not on list"); + LEAVE('-'); } /** @@ -408,7 +460,9 @@ void VFS_int_Select_RemThread(tVFS_SelectList *List, tVFS_SelectThread *Thread) void VFS_int_Select_SignalAll(tVFS_SelectList *List) { int i; - tVFS_SelectListEnt *block, *prev; + tVFS_SelectListEnt *block; + + ENTER("pList", List); // Lock to avoid concurrency issues Mutex_Acquire(&List->Lock); @@ -420,15 +474,17 @@ void VFS_int_Select_SignalAll(tVFS_SelectList *List) { for( i = 0; i < NUM_THREADS_PER_ALLOC; i ++ ) { + LOG("block->Threads[i] = %p", block->Threads[i]); if( block->Threads[i] ) { Semaphore_Signal( &block->Threads[i]->SleepHandle, 1 ); } } - prev = block; block = block->Next; } while(block); Mutex_Release(&List->Lock); + + LEAVE('-'); } diff --git a/Modules/IPStack/udp.c b/Modules/IPStack/udp.c index a721a7bb..44a68514 100644 --- a/Modules/IPStack/udp.c +++ b/Modules/IPStack/udp.c @@ -400,6 +400,7 @@ Uint64 UDP_Channel_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buf for(;;) { + VFS_SelectNode(Node, VFS_SELECT_READ, NULL); SHORTLOCK(&chan->lQueue); if(chan->Queue == NULL) { SHORTREL(&chan->lQueue); @@ -407,7 +408,10 @@ Uint64 UDP_Channel_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buf } pack = chan->Queue; chan->Queue = pack->Next; - if(!chan->Queue) chan->QueueEnd = NULL; + if(!chan->Queue) { + chan->QueueEnd = NULL; + VFS_MarkAvaliable(Node, 0); // Nothing left + } SHORTREL(&chan->lQueue); break; }