// blob: 56ac7dc3663580e6c5df930c8fc5edf7ed08741e [file] [log] [blame]
/**
* The semaphore module provides a general use semaphore for synchronization.
*
* Copyright: Copyright Sean Kelly 2005 - 2009.
* License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
* Authors: Sean Kelly
* Source: $(DRUNTIMESRC core/sync/_semaphore.d)
*/
/* Copyright Sean Kelly 2005 - 2009.
* Distributed under the Boost Software License, Version 1.0.
* (See accompanying file LICENSE or copy at
* http://www.boost.org/LICENSE_1_0.txt)
*/
module core.sync.semaphore;
public import core.sync.exception;
public import core.time;
// Fold every Apple platform identifier into a single `Darwin` version so the
// rest of this module only has to test one name.
version (OSX)
{
    version = Darwin;
}
else version (iOS)
{
    version = Darwin;
}
else version (TVOS)
{
    version = Darwin;
}
else version (WatchOS)
{
    version = Darwin;
}
version (Windows)
{
private import core.sys.windows.basetsd /+: HANDLE+/;
private import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE,
ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
private import core.sys.windows.windef /+: BOOL, DWORD+/;
private import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
}
else version (Darwin)
{
private import core.sync.config;
private import core.stdc.errno;
private import core.sys.posix.time;
private import core.sys.darwin.mach.semaphore;
}
else version (Posix)
{
private import core.sync.config;
private import core.stdc.errno;
private import core.sys.posix.pthread;
private import core.sys.posix.semaphore;
}
else
{
static assert(false, "Platform not supported");
}
////////////////////////////////////////////////////////////////////////////////
// Semaphore
//
// void wait();
// void notify();
// bool tryWait();
////////////////////////////////////////////////////////////////////////////////
/**
* This class represents a general counting semaphore as conceived by Edsger
* Dijkstra. As per Mesa type monitors however, "signal" has been replaced
* with "notify" to indicate that control is not transferred to the waiter when
* a notification is sent.
*/
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncError on error.
     */
    this( uint count = 0 )
    {
        version (Windows)
        {
            // Unnamed semaphore whose maximum count is int.max.
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if ( m_hndl == m_hndl.init )
                throw new SyncError( "Unable to create semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count );
            if ( rc )
                throw new SyncError( "Unable to create semaphore" );
        }
        else version (Posix)
        {
            // Second argument 0: the semaphore is not shared between processes.
            int rc = sem_init( &m_hndl, 0, count );
            if ( rc )
                throw new SyncError( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the underlying operating-system semaphore.
     *
     * A failure to release it is reported only through an assert, so it is
     * detected only in builds where asserts are enabled; nothing is thrown.
     */
    ~this()
    {
        version (Windows)
        {
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_destroy( mach_task_self(), m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
        else version (Posix)
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        version (Windows)
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if ( rc != WAIT_OBJECT_0 )
                throw new SyncError( "Unable to wait for semaphore" );
        }
        else version (Darwin)
        {
            while ( true )
            {
                auto rc = semaphore_wait( m_hndl );
                if ( !rc )
                    return;
                // An aborted wait that was caused by a signal is simply retried.
                if ( rc == KERN_ABORTED && errno == EINTR )
                    continue;
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Posix)
        {
            while ( true )
            {
                if ( !sem_wait( &m_hndl ) )
                    return;
                // EINTR means a signal interrupted the wait: retry.  Any other
                // errno value is a genuine failure.
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.
     *
     * Params:
     *  period = The time to wait.
     *
     * In:
     *  period must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration period )
    in
    {
        assert( !period.isNegative );
    }
    do
    {
        version (Windows)
        {
            // The timeout argument is a 32-bit millisecond count, so longer
            // periods are waited out in chunks.  uint.max itself is avoided,
            // presumably because it is the value of INFINITE.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while ( period > maxWaitMillis )
            {
                auto rc = WaitForSingleObject( m_hndl, cast(uint)
                                                       maxWaitMillis.total!"msecs" );
                switch ( rc )
                {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    // This chunk elapsed; keep waiting for the remainder.
                    period -= maxWaitMillis;
                    continue;
                default:
                    throw new SyncError( "Unable to wait for semaphore" );
                }
            }
            // The remaining period is truncated to whole milliseconds.
            switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Darwin)
        {
            mach_timespec_t t = void;
            (cast(byte*) &t)[0 .. t.sizeof] = 0;

            // Clamp the seconds field when the period does not fit into it.
            if ( period.total!"seconds" > t.tv_sec.max )
            {
                t.tv_sec  = t.tv_sec.max;
                t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
            }
            else
                period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec);
            // NOTE(review): `t` is not adjusted between iterations, so a wait
            // that is interrupted and retried starts over with the full period.
            while ( true )
            {
                auto rc = semaphore_timedwait( m_hndl, t );
                if ( !rc )
                    return true;
                if ( rc == KERN_OPERATION_TIMED_OUT )
                    return false;
                if ( rc != KERN_ABORTED || errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Posix)
        {
            timespec t = void;
            // mktspec (core.sync.config) fills `t` from `period`; presumably
            // it produces the absolute deadline sem_timedwait expects.
            mktspec( t, period );

            while ( true )
            {
                if ( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if ( errno == ETIMEDOUT )
                    return false;
                // Retry only when a signal interrupted the wait.
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        version (Windows)
        {
            if ( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncError( "Unable to notify semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_signal( m_hndl );
            if ( rc )
                throw new SyncError( "Unable to notify semaphore" );
        }
        else version (Posix)
        {
            int rc = sem_post( &m_hndl );
            if ( rc )
                throw new SyncError( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.  This call
     * never blocks waiting for a notification.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     */
    bool tryWait()
    {
        version (Windows)
        {
            // A zero timeout turns the wait into a poll.
            switch ( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Darwin)
        {
            // Implemented as a timed wait with a zero-length period.
            return wait( dur!"hnsecs"(0) );
        }
        else version (Posix)
        {
            while ( true )
            {
                if ( !sem_trywait( &m_hndl ) )
                    return true;
                // EAGAIN: the count was zero, so nothing was acquired.
                if ( errno == EAGAIN )
                    return false;
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
    }


protected:

    /// Aliases the operating-system-specific semaphore type.
    version (Windows)        alias Handle = HANDLE;
    /// ditto
    else version (Darwin)    alias Handle = semaphore_t;
    /// ditto
    else version (Posix)     alias Handle = sem_t;

    /// Handle to the system-specific semaphore.
    Handle m_hndl;
}
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
version (unittest)
{
import core.thread, core.atomic;
/// Stress test for wait/notify/tryWait: one producer thread posts work to a
/// pool of consumer threads that block on the same semaphore, then shuts
/// the consumers down and checks that the count ends at zero.
void testWait()
{
    immutable numToProduce = 20;
    immutable numConsumers = 10;

    auto sem = new Semaphore;
    shared bool shuttingDown;       // set once every item has been consumed
    shared size_t itemsTaken;       // notifications consumed as work items
    shared size_t consumersDone;    // consumers that have left their loop

    // Takes one item per successful wait until shutdown is signalled.
    void consume()
    {
        sem.wait();
        while (!atomicLoad(shuttingDown))
        {
            atomicOp!"+="(itemsTaken, 1);
            sem.wait();
        }
        atomicOp!"+="(consumersDone, 1);
    }

    // Posts the work items, then wakes every consumer so it can exit.
    void produce()
    {
        // Nothing has been posted yet, so a poll must fail.
        assert(!sem.tryWait());

        for (int posted = 0; posted < numToProduce; ++posted)
            sem.notify();

        // Spin until the consumers have drained every item.
        while (atomicLoad(itemsTaken) != numToProduce)
            Thread.yield();

        // Flag shutdown first, then post one wake-up per consumer.
        atomicStore(shuttingDown, true);
        for (int woken = 0; woken < numConsumers; ++woken)
            sem.notify();

        // Spin until every consumer has reported completion.
        while (atomicLoad(consumersDone) != numConsumers)
            Thread.yield();

        // The count must be back at zero; a single notify allows exactly
        // one further successful poll.
        assert(!sem.tryWait());
        sem.notify();
        assert(sem.tryWait());
        assert(!sem.tryWait());
    }

    auto threads = new ThreadGroup;
    foreach (_; 0 .. numConsumers)
        threads.create(&consume);
    threads.create(&produce);
    threads.joinAll();
}
/// Checks the timed wait: with exactly one notification pending, the first
/// timed wait must succeed and the second must time out.
void testWaitTimeout()
{
    auto sem = new Semaphore;
    shared bool notified;           // set after the single notify was posted
    bool firstResult, secondResult;

    void timedWaiter()
    {
        // Hold off until the main thread has posted its notification.
        for (;;)
        {
            if (atomicLoad(notified))
                break;
            Thread.yield();
        }

        immutable timeout = dur!"msecs"(1);
        firstResult = sem.wait(timeout);
        secondResult = sem.wait(timeout);
        assert(firstResult && !secondResult);
    }

    auto waiterThread = new Thread(&timedWaiter);
    waiterThread.start();

    // Post before raising the flag so the waiter's first wait cannot time out.
    sem.notify();
    atomicStore(notified, true);

    waiterThread.join();
    assert(firstResult && !secondResult);
}
// Module unit test: runs the producer/consumer scenario followed by the
// timed-wait scenario.
unittest
{
testWait();
testWaitTimeout();
}
}