From: John Hodge Date: Sun, 13 Feb 2011 07:10:02 +0000 (+0800) Subject: Reworked semaphores to be symetric X-Git-Tag: rel0.07~2 X-Git-Url: https://git.ucc.asn.au/?a=commitdiff_plain;h=635bc78017d8a4a16314a973e39c849b2afac795;p=tpg%2Facess2.git Reworked semaphores to be symetric - Can block on Signal now - Removing busy waits --- diff --git a/Kernel/arch/x86/proc.c b/Kernel/arch/x86/proc.c index 7366520f..bdb5c5d3 100644 --- a/Kernel/arch/x86/proc.c +++ b/Kernel/arch/x86/proc.c @@ -47,7 +47,6 @@ extern tShortSpinlock glThreadListLock; extern int giNumCPUs; extern int giNextTID; extern tThread gThreadZero; -extern tThread *Threads_CloneTCB(Uint *Err, Uint Flags); extern void Isr8(void); // Double Fault extern void Proc_ReturnToUser(tVAddr Handler, Uint Argument, tVAddr KernelStack); @@ -649,14 +648,11 @@ int Proc_SpawnWorker(void) cur = Proc_GetCurThread(); // Create new thread - new = malloc( sizeof(tThread) ); + new = Threads_CloneThreadZero(); if(!new) { Warning("Proc_SpawnWorker - Out of heap space!\n"); return -1; } - memcpy(new, &gThreadZero, sizeof(tThread)); - // Set Thread ID - new->TID = giNextTID++; // Create a new worker stack (in PID0's address space) // - The stack is relocated by this function new->KernelStack = MM_NewWorkerStack(); diff --git a/Kernel/drv/fifo.c b/Kernel/drv/fifo.c index cbbd1074..00570531 100644 --- a/Kernel/drv/fifo.c +++ b/Kernel/drv/fifo.c @@ -4,6 +4,7 @@ #include #include #include +#include // === CONSTANTS === #define DEFAULT_RING_SIZE 2048 @@ -19,6 +20,7 @@ typedef struct sPipe { int WritePos; int BufSize; char *Buffer; + tSemaphore Semaphore; } tPipe; // === PROTOTYPES === @@ -219,20 +221,18 @@ Uint64 FIFO_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) { // Wait for buffer to fill if(pipe->Flags & PF_BLOCKING) { - while(pipe->ReadPos == pipe->WritePos) { - Threads_Yield(); - //MAGIC_BREAK(); - } + len = Semaphore_Wait( &pipe->Semaphore, remaining ); } else + { if(pipe->ReadPos == pipe->WritePos) return 0; - - // Read buffer - if(pipe->WritePos - pipe->ReadPos < remaining) - len = pipe->WritePos - pipe->ReadPos; - else - len = remaining; + // Read buffer + if(pipe->WritePos - pipe->ReadPos < remaining) + len = pipe->WritePos - pipe->ReadPos; + else + len = remaining; + } // Check if read overflows buffer if(len > pipe->BufSize - pipe->ReadPos) @@ -275,18 +275,19 @@ Uint64 FIFO_Write(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) while(remaining) { // Wait for buffer to empty - if(pipe->Flags & PF_BLOCKING) - while(pipe->ReadPos == (pipe->WritePos+1)%pipe->BufSize) - Threads_Yield(); + if(pipe->Flags & PF_BLOCKING) { + len = Semaphore_Signal( &pipe->Semaphore, remaining ); + } else + { if(pipe->ReadPos == (pipe->WritePos+1)%pipe->BufSize) return 0; - - // Write buffer - if(pipe->ReadPos - pipe->WritePos < remaining) - len = pipe->ReadPos - pipe->WritePos; - else - len = remaining; + // Write buffer + if(pipe->ReadPos - pipe->WritePos < remaining) + len = pipe->ReadPos - pipe->WritePos; + else + len = remaining; + } // Check if write overflows buffer if(len > pipe->BufSize - pipe->WritePos) @@ -321,24 +322,25 @@ Uint64 FIFO_Write(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) tPipe *FIFO_Int_NewPipe(int Size, char *Name) { tPipe *ret; - int allocsize = sizeof(tPipe) + sizeof(tVFS_ACL) + Size; + int namelen = strlen(Name) + 1; + int allocsize = sizeof(tPipe) + sizeof(tVFS_ACL) + Size + namelen; ret = malloc(allocsize); if(!ret) return NULL; // Clear Return memset(ret, 0, allocsize); - - ret->Name = Name; ret->Flags = PF_BLOCKING; // Allocate Buffer ret->BufSize = Size; ret->Buffer = (void*)( (Uint)ret + sizeof(tPipe) + sizeof(tVFS_ACL) ); - if(!ret->Buffer) { - free(ret); - return NULL; - } + + // Set name (and FIFO name) + ret->Name = ret->Buffer + Size; + strcpy(ret->Name, Name); + // - Start empty, max of `Size` + Semaphore_Init( &ret->Semaphore, 0, Size, "FIFO", ret->Name ); // Set Node ret->Node.Size = 0; diff --git a/Kernel/include/acess.h b/Kernel/include/acess.h index 95acb313..d2e1e802 100644 --- a/Kernel/include/acess.h +++ b/Kernel/include/acess.h @@ -22,7 +22,6 @@ typedef Uint tGID; typedef Sint64 tTimestamp; typedef struct sShortSpinlock tShortSpinlock; typedef struct sMutex tMutex; -typedef struct sSemaphore tSemaphore; struct sMutex { tShortSpinlock Protector; //!< Protector for the lock strucure @@ -32,14 +31,6 @@ struct sMutex { struct sThread *LastWaiting; //!< Waiting threads }; -struct sSemaphore { - tShortSpinlock Protector; //!< Protector for the lock strucure - const char *Name; //!< Human-readable name - volatile int Value; //!< Current mutex value - struct sThread *Waiting; //!< Waiting threads - struct sThread *LastWaiting; //!< Waiting threads -}; - // --- Helper Macros --- /** * \name Helper Macros @@ -404,6 +395,9 @@ extern int Time_CreateTimer(int Delta, tTimerCallback *Callback, void *Argument) * \brief Removed an active timer */ extern void Time_RemoveTimer(int ID); +/** + * \brief Wait for a period of milliseconds + */ extern void Time_Delay(int Delay); /** * \} @@ -427,7 +421,7 @@ extern tGID Threads_GetGID(void); extern int SpawnTask(tThreadFunction Function, void *Arg); extern Uint *Threads_GetCfgPtr(int Id); extern int Threads_SetName(const char *NewName); -extern void Mutex_Acquire(tMutex *Mutex); +extern int Mutex_Acquire(tMutex *Mutex); extern void Mutex_Release(tMutex *Mutex); extern int Mutex_IsLocked(tMutex *Mutex); /** diff --git a/Kernel/include/semaphore.h b/Kernel/include/semaphore.h new file mode 100644 index 00000000..0cf4b211 --- /dev/null +++ b/Kernel/include/semaphore.h @@ -0,0 +1,69 @@ +/* + * Acess2 Kernel + * semaphore.h + * - Semaphore syncronisation primitive + */ +#ifndef _SEMAPHORE_H +#define _SEMAPHORE_H + +#include + +/** + * \brief Semaphore typedef + */ +typedef struct sSemaphore tSemaphore; + +/** + * \brief Semaphore structure + */ +struct sSemaphore { + tShortSpinlock Protector; //!< Protector for the lock strucure + const char *ModName; //!< Human-readable module name + const char *Name; //!< Human-readable name + volatile int Value; //!< Current value + volatile int MaxValue; //!< Maximum value (signal will wait if it will go over this) + + struct sThread *Waiting; //!< Waiting threads + struct sThread *LastWaiting; //!< Waiting threads + struct sThread *Signaling; //!< Waiting threads (from Semaphore_Signal) + struct sThread *LastSignaling; //!< Last waiting thread (from Semaphore_Signal) +}; + +/** + * \brief Initialise the semaphore + * \param Sem Semaphore structure to initialsie + * \param Value Initial value of the semaphore + * \param Module Module name + * \param Name Symbolic name + * \note Not always needed, as initialising to 0 is valid, but it is preferred + * if all semaphores have \a Name set + * + * \note \a Module and \a Name must be avaliable for as long as the semaphore is used + */ +extern void Semaphore_Init(tSemaphore *Sem, int InitValue, int MaxValue, const char *Module, const char *Name); +/** + * \brief Acquire items from the semaphore + * \param Semaphore Semaphore structure to use + * \param MaxToTake Maximum number of items to take off the list (if zero, as much as possible is taken) + * \return Number of items fetched + * \retval 0 Semaphore interrupted + * \retval -1 Unspecified error + */ +extern int Semaphore_Wait(tSemaphore *Sem, int MaxToTake); +/** + * \brief Add an "item" to the semaphore + * \param Sem Semaphore to use + * \param AmmountToAdd Number of items to add + * \return Actual number of items added + * \retval 0 Semaphore interrupted + * \retval -1 Unspecified error + */ +extern int Semaphore_Signal(tSemaphore *Sem, int AmmountToAdd); +/** + * \brief Get the number of items waiting in a semaphore + * \param Sem Semaphore to use + * \return Number of waiting items + */ +extern int Semaphore_GetValue(tSemaphore *Sem); + +#endif diff --git a/Kernel/include/threads.h b/Kernel/include/threads.h index fc2303c9..0f164f8c 100644 --- a/Kernel/include/threads.h +++ b/Kernel/include/threads.h @@ -86,6 +86,7 @@ static const char * const casTHREAD_STAT[] = { "THREAD_STAT_ACTIVE", "THREAD_STAT_SLEEPING", "THREAD_STAT_MUTEXSLEEP", + "THREAD_STAT_SEMAPHORESLEEP", "THREAD_STAT_WAITING", "THREAD_STAT_PREINIT", "THREAD_STAT_ZOMBIE", @@ -117,4 +118,7 @@ extern void Threads_Kill(tThread *Thread, int Status); extern void Threads_AddActive(tThread *Thread); extern tThread *Threads_GetNextToRun(int CPU, tThread *Last); +extern tThread *Threads_CloneTCB(Uint *Err, Uint Flags); +extern tThread *Threads_CloneThreadZero(void); + #endif diff --git a/Kernel/threads.c b/Kernel/threads.c index 13914a85..2043bc42 100644 --- a/Kernel/threads.c +++ b/Kernel/threads.c @@ -6,6 +6,7 @@ #include #include #include +#include // Configuration #define DEBUG_TRACE_TICKETS 0 // Trace ticket counts @@ -63,7 +64,7 @@ tGID Threads_GetGID(void); void Threads_Dump(void); void Threads_DumpActive(void); -void Mutex_Acquire(tMutex *Mutex); + int Mutex_Acquire(tMutex *Mutex); void Mutex_Release(tMutex *Mutex); int Mutex_IsLocked(tMutex *Mutex); @@ -286,6 +287,75 @@ tThread *Threads_CloneTCB(Uint *Err, Uint Flags) return new; } +/** + * \fn tThread *Threads_CloneTCB(Uint *Err, Uint Flags) + * \brief Clone the TCB of the current thread + */ +tThread *Threads_CloneThreadZero(void) +{ + tThread *cur, *new; + int i; + cur = Proc_GetCurThread(); + + // Allocate and duplicate + new = malloc(sizeof(tThread)); + if(new == NULL) { + return NULL; + } + memcpy(new, &gThreadZero, sizeof(tThread)); + + new->CurCPU = -1; + new->Next = NULL; + memset( &new->IsLocked, 0, sizeof(new->IsLocked)); + new->Status = THREAD_STAT_PREINIT; + new->RetStatus = 0; + + // Get Thread ID + new->TID = giNextTID++; + new->Parent = 0; + + // Clone Name + new->ThreadName = NULL; + + // Messages are not inherited + new->Messages = NULL; + new->LastMessage = NULL; + + // Set State + new->Remaining = new->Quantum = cur->Quantum; + new->Priority = cur->Priority; + + // Set Signal Handlers + new->CurFaultNum = 0; + new->FaultHandler = cur->FaultHandler; + + for( i = 0; i < NUM_CFG_ENTRIES; i ++ ) + { + switch(cCONFIG_TYPES[i]) + { + default: + new->Config[i] = cur->Config[i]; + break; + case CFGT_HEAPSTR: + if(cur->Config[i]) + new->Config[i] = (Uint) strdup( (void*)cur->Config[i] ); + else + new->Config[i] = 0; + break; + } + } + + // Maintain a global list of threads + SHORTLOCK( &glThreadListLock ); + new->GlobalPrev = NULL; // Protect against bugs + new->GlobalNext = gAllThreads; + gAllThreads->GlobalPrev = new; + gAllThreads = new; + SHORTREL( &glThreadListLock ); + + return new; +} + /** * \brief Get a configuration pointer from the Per-Thread data area * \param ID Config slot ID @@ -914,6 +984,13 @@ void Threads_Dump(void) case THREAD_STAT_MUTEXSLEEP: Log(" Mutex Pointer: %p", thread->WaitPointer); break; + case THREAD_STAT_SEMAPHORESLEEP: + Log(" Semaphore Pointer: %p", thread->WaitPointer); + Log(" Semaphore Name: %s:%s", + ((tSemaphore*)thread->WaitPointer)->ModName, + ((tSemaphore*)thread->WaitPointer)->Name + ); + break; case THREAD_STAT_ZOMBIE: Log(" Return Status: %i", thread->RetStatus); break; @@ -947,6 +1024,7 @@ tThread *Threads_GetNextToRun(int CPU, tThread *Last) // Clear Delete Queue // - I should probably put this in a worker thread to avoid calling free() in the scheduler + // DEFINITELY - free() can deadlock in this case while(gDeleteThreads) { thread = gDeleteThreads->Next; @@ -1163,7 +1241,7 @@ void Threads_SegFault(tVAddr Addr) * the oldest thread (top thread) on the queue is given the lock and * restarted. */ -void Mutex_Acquire(tMutex *Mutex) +int Mutex_Acquire(tMutex *Mutex) { tThread *us = Proc_GetCurThread(); @@ -1218,6 +1296,8 @@ void Mutex_Acquire(tMutex *Mutex) if( Mutex != &glPhysAlloc ) LogF("Mutex %p taken by %i %p\n", Mutex, us->TID, __builtin_return_address(0)); #endif + + return 0; } /** @@ -1265,84 +1345,204 @@ int Mutex_IsLocked(tMutex *Mutex) return Mutex->Owner != NULL; } -/** - * \brief Initialise the semaphore - * \param Value Initial value of the semaphore - * \param Label Symbolic name - */ -void Semaphore_Init(tSemaphore *Sem, int Value, const char *Label) +// +// Initialise a semaphore +// +void Semaphore_Init(tSemaphore *Sem, int Value, int MaxValue, const char *Module, const char *Name) { Sem->Value = Value; - Sem->Name = Label; + Sem->ModName = Module; + Sem->Name = Name; } - -/** - * \brief Acquire a "item" from the semaphore - */ -void Semaphore_Wait(tSemaphore *Sem) +// +// Wait for items to be avaliable +// +int Semaphore_Wait(tSemaphore *Sem, int MaxToTake) { tThread *us; + int taken; + if( MaxToTake < 0 ) { + Log_Warning("Threads", "Semaphore_Wait: User bug - MaxToTake(%i) < 0, Sem=%p(%s)", + MaxToTake, Sem, Sem->Name); + } SHORTLOCK( &Sem->Protector ); + + // Check if there's already items avaliable if( Sem->Value > 0 ) { - Sem->Value --; + // Take what we need + if( MaxToTake && Sem->Value > MaxToTake ) + taken = MaxToTake; + else + taken = Sem->Value; + Sem->Value -= taken; SHORTREL( &Sem->Protector ); - return ; - } - - SHORTLOCK( &glThreadListLock ); - - // - Remove from active list - us = Threads_RemActive(); - us->Next = NULL; - // - Mark as sleeping - us->Status = THREAD_STAT_SEMAPHORESLEEP; - us->WaitPointer = Sem; - - // - Add to waiting - if(Sem->LastWaiting) { - Sem->LastWaiting->Next = us; - Sem->LastWaiting = us; } - else { - Sem->Waiting = us; - Sem->LastWaiting = us; + else + { + SHORTLOCK( &glThreadListLock ); + + // - Remove from active list + us = Threads_RemActive(); + us->Next = NULL; + // - Mark as sleeping + us->Status = THREAD_STAT_SEMAPHORESLEEP; + us->WaitPointer = Sem; + us->RetStatus = MaxToTake; // Use RetStatus as a temp variable + + // - Add to waiting + if(Sem->LastWaiting) { + Sem->LastWaiting->Next = us; + Sem->LastWaiting = us; + } + else { + Sem->Waiting = us; + Sem->LastWaiting = us; + } + + SHORTREL( &glThreadListLock ); + SHORTREL( &Sem->Protector ); + while(us->Status == THREAD_STAT_SEMAPHORESLEEP) Threads_Yield(); + // We're only woken when there's something avaliable + us->WaitPointer = NULL; + + taken = us->RetStatus; + + // Get the lock again + SHORTLOCK( &Sem->Protector ); } - SHORTREL( &glThreadListLock ); + // While there is space, and there are thread waiting + // wake the first thread and give it what it wants (or what's left) + while( (Sem->MaxValue == 0 || Sem->Value < Sem->MaxValue) && Sem->Signaling ) + { + int given; + tThread *toWake = Sem->Signaling; + + Sem->Signaling = Sem->Signaling->Next; + // Reset ->LastWaiting to NULL if we have just removed the last waiting thread + if( Sem->Signaling == NULL ) + Sem->LastSignaling = NULL; + + // Figure out how much to give + if( toWake->RetStatus && Sem->Value + toWake->RetStatus < Sem->MaxValue ) + given = toWake->RetStatus; + else + given = Sem->MaxValue - Sem->Value; + Sem->Value -= given; + + // Save the number we gave to the thread's status + toWake->RetStatus = given; + + // Wake the sleeper + SHORTLOCK( &glThreadListLock ); + if( toWake->Status != THREAD_STAT_ACTIVE ) + Threads_AddActive(toWake); + SHORTREL( &glThreadListLock ); + } SHORTREL( &Sem->Protector ); - while(us->Status == THREAD_STAT_MUTEXSLEEP) Threads_Yield(); - // We're only woken when there's something avaliable - us->WaitPointer = NULL; + + return taken; } -/** - * \brief Add an "item" to the semaphore - */ -void Semaphore_Signal(tSemaphore *Sem) +// +// Add items to a semaphore +// +int Semaphore_Signal(tSemaphore *Sem, int AmmountToAdd) { + int given; + int added; + + if( AmmountToAdd < 0 ) { + Log_Warning("Threads", "Semaphore_Signal: User bug - AmmountToAdd(%i) < 0, Sem=%p(%s)", + AmmountToAdd, Sem, Sem->Name); + } SHORTLOCK( &Sem->Protector ); - Sem->Value ++; - if( Sem->Waiting ) + // Check if we have to block + if( Sem->MaxValue && Sem->Value == Sem->MaxValue ) + { + tThread *us; + SHORTLOCK( &glThreadListLock ); + + // - Remove from active list + us = Threads_RemActive(); + us->Next = NULL; + // - Mark as sleeping + us->Status = THREAD_STAT_SEMAPHORESLEEP; + us->WaitPointer = Sem; + us->RetStatus = AmmountToAdd; // Use RetStatus as a temp variable + + // - Add to waiting + if(Sem->LastSignaling) { + Sem->LastSignaling->Next = us; + Sem->LastSignaling = us; + } + else { + Sem->Signaling = us; + Sem->LastSignaling = us; + } + + SHORTREL( &glThreadListLock ); + SHORTREL( &Sem->Protector ); + while(us->Status == THREAD_STAT_SEMAPHORESLEEP) Threads_Yield(); + // We're only woken when there's something avaliable + us->WaitPointer = NULL; + + added = us->RetStatus; + + // Get the lock again + SHORTLOCK( &Sem->Protector ); + } + // Non blocking + else + { + // Figure out how much we need to take off + if( Sem->MaxValue && Sem->Value + AmmountToAdd > Sem->MaxValue) + added = Sem->MaxValue - Sem->Value; + else + added = AmmountToAdd; + Sem->Value += added; + } + + // While there are items avaliable, and there are thread waiting + // wake the first thread and give it what it wants (or what's left) + while( Sem->Value && Sem->Waiting ) { tThread *toWake = Sem->Waiting; - Sem->Waiting = Sem->Waiting->Next; // Next! + Sem->Waiting = Sem->Waiting->Next; // Reset ->LastWaiting to NULL if we have just removed the last waiting thread if( Sem->Waiting == NULL ) Sem->LastWaiting = NULL; - // Wake new owner + // Figure out how much to give + if( toWake->RetStatus && Sem->Value > toWake->RetStatus ) + given = toWake->RetStatus; + else + given = Sem->Value; + Sem->Value -= given; + + // Save the number we gave to the thread's status + toWake->RetStatus = given; + + // Wake the sleeper SHORTLOCK( &glThreadListLock ); if( toWake->Status != THREAD_STAT_ACTIVE ) Threads_AddActive(toWake); SHORTREL( &glThreadListLock ); - - // Decrement (the value is now "owned" by `toWake`) - Sem->Value --; } SHORTREL( &Sem->Protector ); + + return added; +} + +// +// Get the current value of a semaphore +// +int Semaphore_GetValue(tSemaphore *Sem) +{ + return Sem->Value; } // === EXPORTS === @@ -1351,3 +1551,6 @@ EXPORT(Threads_GetGID); EXPORT(Mutex_Acquire); EXPORT(Mutex_Release); EXPORT(Mutex_IsLocked); +EXPORT(Semaphore_Init); +EXPORT(Semaphore_Wait); +EXPORT(Semaphore_Signal); diff --git a/Modules/Network/NE2000/ne2000.c b/Modules/Network/NE2000/ne2000.c index d7525802..cf3edee5 100644 --- a/Modules/Network/NE2000/ne2000.c +++ b/Modules/Network/NE2000/ne2000.c @@ -10,6 +10,7 @@ #include #include #include +#include // === CONSTANTS === #define MEM_START 0x40 @@ -70,7 +71,8 @@ typedef struct sNe2k_Card { Uint16 IOBase; //!< IO Port Address from PCI Uint8 IRQ; //!< IRQ Assigned from PCI - int NumWaitingPackets; + tSemaphore Semaphore; +// int NumWaitingPackets; int NextRXPage; int NextMemPage; //!< Next Card Memory page to use @@ -215,6 +217,10 @@ int Ne2k_Install(char **Options) gpNe2k_Cards[ k ].Node.Write = Ne2k_Write; gpNe2k_Cards[ k ].Node.Read = Ne2k_Read; gpNe2k_Cards[ k ].Node.IOCtl = Ne2k_IOCtl; + + // Initialise packet semaphore + // - Start at zero, no max + Semaphore_Init( &gpNe2k_Cards[k].Semaphore, 0, 0, "NE2000", gpNe2k_Cards[ k ].Name ); } } @@ -365,8 +371,8 @@ Uint64 Ne2k_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) ENTER("pNode XOffset XLength pBuffer", Node, Offset, Length, Buffer); - // TODO: Use MutexP/MutexV instead - while(Card->NumWaitingPackets == 0) Threads_Yield(); + // Wait for packets + Semaphore_Wait( &Card->Semaphore, 1 ); // Make sure that the card is in page 0 outb(Card->IOBase + CMD, 0|0x22); // Page 0, Start, NoDMA @@ -442,7 +448,6 @@ Uint64 Ne2k_Read(tVFS_Node *Node, Uint64 Offset, Uint64 Length, void *Buffer) outb( Card->IOBase + BNRY, page-1 ); // Set next RX Page and decrement the waiting list Card->NextRXPage = page; - Card->NumWaitingPackets --; LEAVE('i', Length); return Length; @@ -479,9 +484,9 @@ void Ne2k_IRQHandler(int IntNum) // 0: Packet recieved (no error) if( byte & 1 ) { - gpNe2k_Cards[i].NumWaitingPackets ++; - if( gpNe2k_Cards[i].NumWaitingPackets > MAX_PACKET_QUEUE ) - gpNe2k_Cards[i].NumWaitingPackets = MAX_PACKET_QUEUE; + //if( gpNe2k_Cards[i].NumWaitingPackets > MAX_PACKET_QUEUE ) + // gpNe2k_Cards[i].NumWaitingPackets = MAX_PACKET_QUEUE; + Semaphore_Signal( &gpNe2k_Cards[i].Semaphore, 1 ); } // 1: Packet sent (no error) // 2: Recieved with error