blob: 8afa8f7cc3805ff069d6f269f152397ba0d9a92c [file] [log] [blame]
/**
* The condition module provides a primitive for synchronized condition
* checking.
*
* Copyright: Copyright Sean Kelly 2005 - 2009.
* License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
* Authors: Sean Kelly
* Source: $(DRUNTIMESRC core/sync/_condition.d)
*/
/* Copyright Sean Kelly 2005 - 2009.
* Distributed under the Boost Software License, Version 1.0.
* (See accompanying file LICENSE or copy at
* http://www.boost.org/LICENSE_1_0.txt)
*/
module core.sync.condition;
public import core.sync.exception;
public import core.sync.mutex;
public import core.time;
version (Windows)
{
private import core.sync.semaphore;
private import core.sys.windows.basetsd /+: HANDLE+/;
private import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
private import core.sys.windows.windef /+: BOOL, DWORD+/;
private import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
}
else version (Posix)
{
private import core.sync.config;
private import core.stdc.errno;
private import core.sys.posix.pthread;
private import core.sys.posix.time;
}
else
{
static assert(false, "Platform not supported");
}
////////////////////////////////////////////////////////////////////////////////
// Condition
//
// void wait();
// void notify();
// void notifyAll();
////////////////////////////////////////////////////////////////////////////////
/**
* This class represents a condition variable as conceived by C.A.R. Hoare. As
* per Mesa type monitors however, "signal" has been replaced with "notify" to
* indicate that control is not transferred to the waiter when a notification
* is sent.
*/
class Condition
{
////////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////////
/**
* Initializes a condition object which is associated with the supplied
* mutex object.
*
* Params:
* m = The mutex with which this condition will be associated.
*
* Throws:
* SyncError on error.
*/
this( Mutex m ) nothrow @safe
{
    // Both platform branches remember the supplied mutex in m_assocMutex
    // and create the native primitives the condition is built on.
    version (Windows)
    {
        // Gate semaphore: created with an initial count of 1 and a maximum of 1.
        m_blockLock = CreateSemaphoreA( null, 1, 1, null );
        if ( m_blockLock == HANDLE.init )
            throw new SyncError( "Unable to initialize condition" );
        // Roll the gate back if anything below fails.
        scope(failure) CloseHandle( m_blockLock );

        // Queue semaphore: created empty, with int.max as its ceiling.
        m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
        if ( m_blockQueue == HANDLE.init )
            throw new SyncError( "Unable to initialize condition" );
        scope(failure) CloseHandle( m_blockQueue );

        InitializeCriticalSection( &m_unblockLock );
        m_assocMutex = m;
    }
    else version (Posix)
    {
        m_assocMutex = m;

        // A null attribute pointer requests the default condition attributes.
        if ( pthread_cond_init( &m_hndl, null ) != 0 )
            throw new SyncError( "Unable to initialize condition" );
    }
}
~this()
{
    // Releases the native primitives created by the constructor. Failures
    // are only reported through assert, so they are ignored when asserts
    // are compiled out.
    version (Windows)
    {
        immutable gateClosed = CloseHandle( m_blockLock );
        assert( gateClosed, "Unable to destroy condition" );

        immutable queueClosed = CloseHandle( m_blockQueue );
        assert( queueClosed, "Unable to destroy condition" );

        DeleteCriticalSection( &m_unblockLock );
    }
    else version (Posix)
    {
        immutable status = pthread_cond_destroy( &m_hndl );
        assert( status == 0, "Unable to destroy condition" );
    }
}
////////////////////////////////////////////////////////////////////////////
// General Properties
////////////////////////////////////////////////////////////////////////////
/**
* Gets the mutex associated with this condition.
*
* Returns:
* The mutex associated with this condition.
*/
@property Mutex mutex()
{
// Hands back the reference stored by the constructor; no locking is
// performed here.
return m_assocMutex;
}
// undocumented function for internal use
final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
{
// Returns the same field as mutex(), but this accessor is final and
// carries pure/nothrow/@safe/@nogc so attribute-restricted code can call it.
return m_assocMutex;
}
////////////////////////////////////////////////////////////////////////////
// General Actions
////////////////////////////////////////////////////////////////////////////
/**
* Wait until notified.
*
* Throws:
* SyncError on error.
*/
void wait()
{
    version (Windows)
    {
        // An unbounded wait is the timed wait with an INFINITE timeout;
        // the boolean result is not needed here.
        timedWait( INFINITE );
    }
    else version (Posix)
    {
        // The native call is handed the associated mutex's handle together
        // with the condition handle.
        immutable status = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() );
        if ( status != 0 )
            throw new SyncError( "Unable to wait for condition" );
    }
}
/**
* Suspends the calling thread until a notification occurs or until the
* supplied time period has elapsed.
*
* Params:
* val = The time to wait.
*
* In:
* val must be non-negative.
*
* Throws:
* SyncError on error.
*
* Returns:
* true if notified before the timeout and false if not.
*/
bool wait( Duration val )
in
{
    assert( !val.isNegative );
}
body
{
    version (Windows)
    {
        // The native wait takes a 32-bit millisecond count, so a long
        // duration is consumed in slices of at most uint.max - 1 ms
        // (presumably because uint.max itself is INFINITE -- TODO confirm).
        auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

        for ( ; val > maxWaitMillis; val -= maxWaitMillis )
        {
            // A notification during any slice ends the wait immediately.
            if ( timedWait( cast(uint) maxWaitMillis.total!"msecs" ) )
                return true;
        }
        return timedWait( cast(uint) val.total!"msecs" );
    }
    else version (Posix)
    {
        // mktspec (core.sync.config) fills in the timespec that is handed
        // to the native timed wait.
        timespec deadline = void;
        mktspec( deadline, val );

        switch ( pthread_cond_timedwait( &m_hndl,
                                         m_assocMutex.handleAddr(),
                                         &deadline ) )
        {
            case 0:
                return true;
            case ETIMEDOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for condition" );
        }
    }
}
/**
* Notifies one waiter.
*
* Throws:
* SyncError on error.
*/
void notify()
{
    version (Windows)
    {
        // Delegate to the private overload; false selects "one waiter".
        notify( false );
    }
    else version (Posix)
    {
        if ( pthread_cond_signal( &m_hndl ) != 0 )
            throw new SyncError( "Unable to notify condition" );
    }
}
/**
* Notifies all waiters.
*
* Throws:
* SyncError on error.
*/
void notifyAll()
{
    version (Windows)
    {
        // Delegate to the private overload; true selects "every waiter".
        notify( true );
    }
    else version (Posix)
    {
        if ( pthread_cond_broadcast( &m_hndl ) != 0 )
            throw new SyncError( "Unable to notify condition" );
    }
}
private:
version (Windows)
{
// Windows implementation of the wait operation.
//
// Registers the caller as a blocked waiter, releases the associated mutex,
// blocks on m_blockQueue for at most `timeout` milliseconds, performs the
// bookkeeping shared with notify(bool), and re-acquires the associated
// mutex before returning.
//
// Params:
//  timeout = Value passed straight to WaitForSingleObject (milliseconds,
//            or INFINITE).
//
// Returns:
//  true if the queue wait was satisfied, false if it timed out.
bool timedWait( DWORD timeout )
{
    int   numSignalsLeft;
    int   numWaitersGone;
    DWORD rc;

    // Pass through the gate to register as a blocked waiter.
    rc = WaitForSingleObject( m_blockLock, INFINITE );
    assert( rc == WAIT_OBJECT_0 );

    m_numWaitersBlocked++;

    rc = ReleaseSemaphore( m_blockLock, 1, null );
    assert( rc );

    // From here on the caller no longer holds the associated mutex; make
    // sure it is taken again if anything below fails.
    m_assocMutex.unlock();
    scope(failure) m_assocMutex.lock();

    rc = WaitForSingleObject( m_blockQueue, timeout );
    assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
    bool timedOut = (rc == WAIT_TIMEOUT);

    {
        // The critical section is bound to this nested scope so it is left
        // exactly once, on both the normal and the failure path. A
        // function-wide scope(failure) would still be armed after the lock
        // had been released and could leave it a second time.
        EnterCriticalSection( &m_unblockLock );
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
        {
            if ( timedOut )
            {
                // timeout (or canceled)
                if ( m_numWaitersBlocked != 0 )
                {
                    m_numWaitersBlocked--;
                    // do not unblock next waiter below (already unblocked)
                    numSignalsLeft = 0;
                }
                else
                {
                    // spurious wakeup pending!!
                    m_numWaitersGone = 1;
                }
            }
            if ( --m_numWaitersToUnblock == 0 )
            {
                if ( m_numWaitersBlocked != 0 )
                {
                    // open the gate
                    rc = ReleaseSemaphore( m_blockLock, 1, null );
                    assert( rc );
                    // do not open the gate below again
                    numSignalsLeft = 0;
                }
                else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                {
                    m_numWaitersGone = 0;
                }
            }
        }
        else if ( ++m_numWaitersGone == int.max / 2 )
        {
            // timeout/canceled or spurious event :-)
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            // something is going on here - test of timeouts?
            m_numWaitersBlocked -= m_numWaitersGone;
            rc = ReleaseSemaphore( m_blockLock, 1, null );
            // ReleaseSemaphore reports success with a non-zero BOOL, so it
            // must not be compared against WAIT_OBJECT_0 (which is 0).
            assert( rc );
            m_numWaitersGone = 0;
        }
    }

    if ( numSignalsLeft == 1 )
    {
        // better now than spurious later (same as ResetEvent)
        for ( ; numWaitersGone > 0; --numWaitersGone )
        {
            rc = WaitForSingleObject( m_blockQueue, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
        }
        // open the gate
        rc = ReleaseSemaphore( m_blockLock, 1, null );
        assert( rc );
    }
    else if ( numSignalsLeft != 0 )
    {
        // unblock next waiter
        rc = ReleaseSemaphore( m_blockQueue, 1, null );
        assert( rc );
    }

    // Re-acquire the associated mutex before handing control back.
    m_assocMutex.lock();
    return !timedOut;
}
// Windows implementation shared by notify() and notifyAll().
//
// Updates the waiter counters under m_unblockLock and, when the gate was
// not already closed by an earlier notification, closes it and releases
// one unit of m_blockQueue; woken waiters release further units themselves
// in timedWait.
//
// Params:
//  all = true to select every blocked waiter, false to select one.
void notify( bool all )
{
    DWORD rc;
    // Set when this call closed the gate and must wake the first waiter.
    bool  wakeFirst = false;

    {
        // The critical section is bound to this nested scope so it is left
        // exactly once on every path (early return, normal exit, failure).
        // A function-wide scope(failure) would still be armed after an
        // explicit leave and could release the lock a second time.
        EnterCriticalSection( &m_unblockLock );
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if ( m_numWaitersToUnblock != 0 )
        {
            // A previous notification is still being processed.
            if ( m_numWaitersBlocked == 0 )
                return; // nobody left to select

            if ( all )
            {
                m_numWaitersToUnblock += m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock++;
                m_numWaitersBlocked--;
            }
        }
        else if ( m_numWaitersBlocked > m_numWaitersGone )
        {
            // Take the gate so no new waiter can register while this
            // notification is being delivered.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            // Discount waiters that already left on their own.
            if ( 0 != m_numWaitersGone )
            {
                m_numWaitersBlocked -= m_numWaitersGone;
                m_numWaitersGone = 0;
            }
            if ( all )
            {
                m_numWaitersToUnblock = m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock = 1;
                m_numWaitersBlocked--;
            }
            wakeFirst = true;
        }
        // Otherwise there is nothing to do.
    }

    if ( wakeFirst )
    {
        // Released after leaving the critical section, as before.
        rc = ReleaseSemaphore( m_blockQueue, 1, null );
        assert( rc );
    }
}
// NOTE: This implementation uses Algorithm 8c as described here:
// http://groups.google.com/group/comp.programming.threads/
// browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
// Gate semaphore (created with count 1, max 1). A waiter passes through it
// to increment m_numWaitersBlocked; notify(bool) takes it while a
// notification is being delivered and a waiter releases it again.
HANDLE m_blockLock; // auto-reset event (now semaphore)
// Semaphore that waiters block on in timedWait; released one unit at a time.
HANDLE m_blockQueue; // auto-reset event (now semaphore)
// Mutex supplied to the constructor; unlocked and re-locked around the wait.
Mutex m_assocMutex; // external mutex/CS
// Guards the three counters below in timedWait and notify(bool).
CRITICAL_SECTION m_unblockLock; // internal mutex/CS
// Waiters that woke without a pending notification (timeout or spurious)
// and have not yet been subtracted from m_numWaitersBlocked.
int m_numWaitersGone = 0;
// Waiters registered in timedWait and not yet selected by a notification.
int m_numWaitersBlocked = 0;
// Waiters selected by notify(bool) that have not yet finished waking up.
int m_numWaitersToUnblock = 0;
}
else version (Posix)
{
// Mutex supplied to the constructor; its handle is passed to the pthread
// wait calls.
Mutex m_assocMutex;
// Native condition variable, initialised in the constructor and destroyed
// in the destructor.
pthread_cond_t m_hndl;
}
}
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
version (unittest)
{
private import core.thread;
private import core.sync.mutex;
private import core.sync.semaphore;
// Exercises Condition.notify(): numWaiters threads each consume numTries
// "ready" tokens, one token being published per notify() call, and the
// total number of consumed tokens is checked at the end.
void testNotify()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    auto semDone    = new Semaphore;
    auto synLoop    = new Object;
    int  numWaiters = 10;
    int  numTries   = 10;
    int  numReady   = 0;   // tokens published but not yet consumed (guarded by mutex)
    int  numTotal   = 0;   // tokens consumed overall (guarded by mutex)
    int  numDone    = 0;   // consumptions reported so far (guarded by synLoop)

    void waiter()
    {
        for ( int i = 0; i < numTries; ++i )
        {
            synchronized( mutex )
            {
                // Re-check the predicate after every wakeup.
                while ( numReady < 1 )
                {
                    condReady.wait();
                }
                --numReady;
                ++numTotal;
            }

            synchronized( synLoop )
            {
                ++numDone;
            }
            // Park until the main thread posts a permit.
            semDone.wait();
        }
    }

    auto group = new ThreadGroup;

    for ( int i = 0; i < numWaiters; ++i )
        group.create( &waiter );

    for ( int i = 0; i < numTries; ++i )
    {
        // Publish one token per waiter, notifying once for each.
        for ( int j = 0; j < numWaiters; ++j )
        {
            synchronized( mutex )
            {
                ++numReady;
                condReady.notify();
            }
        }

        // NOTE(review): numDone is cumulative and never reset, so this loop
        // only really blocks during the first round. That looks deliberate
        // or at least load-bearing: a waiter may take more than one semDone
        // permit in a row, so a strict per-round barrier could stall.
        while ( true )
        {
            synchronized( synLoop )
            {
                if ( numDone >= numWaiters )
                    break;
            }
            Thread.yield();
        }

        // Hand out one permit per waiter for the next iteration.
        for ( int j = 0; j < numWaiters; ++j )
        {
            semDone.notify();
        }
    }

    group.joinAll();
    assert( numTotal == numWaiters * numTries );
}
// Exercises Condition.notifyAll(): every waiter registers itself, blocks
// until the shared flag is raised, and must have finished by the time all
// threads are joined.
void testNotifyAll()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    int  numWaiters = 10;
    int  numReady   = 0;     // waiters that have entered the monitor
    int  numDone    = 0;     // waiters that have seen the flag
    bool alert      = false; // predicate the waiters block on

    void waiter()
    {
        synchronized( mutex )
        {
            ++numReady;
            while ( !alert )
                condReady.wait();
            ++numDone;
        }
    }

    auto group = new ThreadGroup;

    foreach ( _; 0 .. numWaiters )
        group.create( &waiter );

    // Poll until every waiter has registered, then raise the flag and wake
    // them all while still holding the mutex.
    for ( bool released = false; !released; )
    {
        synchronized( mutex )
        {
            if ( numReady >= numWaiters )
            {
                alert = true;
                condReady.notifyAll();
                released = true;
            }
        }
        if ( !released )
            Thread.yield();
    }

    group.joinAll();
    assert( numReady == numWaiters && numDone == numWaiters );
}
// Exercises the timed wait: the first wait is expected to be ended by a
// notification, the second one by its timeout.
void testWaitTimeout()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    bool waiting    = false;
    // Both results start out true; the test expects the second to flip.
    bool alertedOne = true;
    bool alertedTwo = true;

    void waiter()
    {
        synchronized( mutex )
        {
            waiting = true;
            // we never want to miss the notification (30s)
            alertedOne = condReady.wait( dur!"seconds"(30) );
            // but we don't want to wait long for the timeout (10ms)
            alertedTwo = condReady.wait( dur!"msecs"(10) );
        }
    }

    auto thread = new Thread( &waiter );
    thread.start();

    // Poll until the waiter has announced itself, then notify exactly once.
    for ( bool signalled = false; !signalled; )
    {
        synchronized( mutex )
        {
            if ( waiting )
            {
                condReady.notify();
                signalled = true;
            }
        }
        if ( !signalled )
            Thread.yield();
    }

    thread.join();
    assert( waiting );
    assert( alertedOne );
    assert( !alertedTwo );
}
unittest
{
// Runs the three scenarios in sequence: single notify, notifyAll, and the
// timed wait.
testNotify();
testNotifyAll();
testWaitTimeout();
}
}