123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561 |
|
/**
* The condition module provides a primitive for synchronized condition
* checking.
*
* Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
* License: BSD style: $(LICENSE)
* Author: Sean Kelly
*/
module tango.core.sync.Condition;
public import tango.core.Exception : SyncException;
public import tango.core.sync.Mutex;
version( Win32 )
{
private import tango.core.sync.Semaphore;
private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
private import tango.core.sync.Config;
private import tango.stdc.errno;
private import tango.stdc.posix.pthread;
private import tango.stdc.posix.time;
}
////////////////////////////////////////////////////////////////////////////////
// Condition
//
// void wait();
// void notify();
// void notifyAll();
////////////////////////////////////////////////////////////////////////////////
/**
* This class represents a condition variable as concieved by C.A.R. Hoare. As
* per Mesa type monitors however, "signal" has been replaced with "notify" to
* indicate that control is not transferred to the waiter when a notification
* is sent.
*/
class Condition
{
////////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////////
/**
* Initializes a condition object which is associated with the supplied
* mutex object.
*
* Params:
* m = The mutex with which this condition will be associated.
*
* Throws:
* SyncException on error.
*/
this( Mutex m )
{
version( Win32 )
{
m_blockLock = CreateSemaphoreA( null, 1, 1, null );
if( m_blockLock == m_blockLock.init )
throw new SyncException( "Unable to initialize condition" );
scope(failure) CloseHandle( m_blockLock );
m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
if( m_blockQueue == m_blockQueue.init )
throw new SyncException( "Unable to initialize condition" );
scope(failure) CloseHandle( m_blockQueue );
InitializeCriticalSection( &m_unblockLock );
m_assocMutex = m;
}
else version( Posix )
{
m_mutexAddr = m.handleAddr();
int rc = pthread_cond_init( &m_hndl, null );
if( rc )
throw new SyncException( "Unable to initialize condition" );
}
}
~this()
{
version( Win32 )
{
BOOL rc = CloseHandle( m_blockLock );
assert( rc, "Unable to destroy condition" );
rc = CloseHandle( m_blockQueue );
assert( rc, "Unable to destroy condition" );
DeleteCriticalSection( &m_unblockLock );
}
else version( Posix )
{
int rc = pthread_cond_destroy( &m_hndl );
assert( !rc, "Unable to destroy condition" );
}
}
////////////////////////////////////////////////////////////////////////////
// General Actions
////////////////////////////////////////////////////////////////////////////
/**
* Wait until notified.
*
* Throws:
* SyncException on error.
*/
void wait()
{
version( Win32 )
{
timedWait( INFINITE );
}
else version( Posix )
{
int rc = pthread_cond_wait( &m_hndl, m_mutexAddr );
if( rc )
throw new SyncException( "Unable to wait for condition" );
}
}
/**
* Suspends the calling thread until a notification occurs or until the
* supplied time period has elapsed. The supplied period may be up to a
* maximum of (uint.max - 1) milliseconds.
*
* Params:
* period = The time to wait, in seconds (fractional values are accepted).
*
* In:
* period must be less than (uint.max - 1) milliseconds.
*
* Returns:
* true if notified before the timeout and false if not.
*
* Throws:
* SyncException on error.
*/
bool wait( double period )
in
{
// NOTE: The fractional value added to period is to correct fp error.
assert( period * 1000 + 0.1 < uint.max - 1 );
}
body
{
version( Win32 )
{
return timedWait( cast(uint)(period * 1000 + 0.1) );
}
else version( Posix )
{
timespec t;
getTimespec( t );
adjTimespec( t, period );
int rc = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &t );
if( !rc )
return true;
if( rc == ETIMEDOUT )
return false;
throw new SyncException( "Unable to wait for condition" );
}
}
/**
* Notifies one waiter.
*
* Throws:
* SyncException on error.
*/
void notify()
{
version( Win32 )
{
notify( false );
}
else version( Posix )
{
int rc = pthread_cond_signal( &m_hndl );
if( rc )
throw new SyncException( "Unable to notify condition" );
}
}
/**
* Notifies all waiters.
*
* Throws:
* SyncException on error.
*/
void notifyAll()
{
version( Win32 )
{
notify( true );
}
else version( Posix )
{
int rc = pthread_cond_broadcast( &m_hndl );
if( rc )
throw new SyncException( "Unable to notify condition" );
}
}
private:
version( Win32 )
{
bool timedWait( DWORD timeout )
{
int numSignalsLeft;
int numWaitersGone;
DWORD rc;
rc = WaitForSingleObject( m_blockLock, INFINITE );
assert( rc == WAIT_OBJECT_0 );
m_numWaitersBlocked++;
rc = ReleaseSemaphore( m_blockLock, 1, null );
assert( rc );
m_assocMutex.unlock();
scope(failure) m_assocMutex.lock();
rc = WaitForSingleObject( m_blockQueue, timeout );
assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
bool timedOut = (rc == WAIT_TIMEOUT);
EnterCriticalSection( &m_unblockLock );
scope(failure) LeaveCriticalSection( &m_unblockLock );
if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
{
if ( timedOut )
{
// timeout (or canceled)
if( m_numWaitersBlocked != 0 )
{
m_numWaitersBlocked--;
// do not unblock next waiter below (already unblocked)
numSignalsLeft = 0;
}
else
{
// spurious wakeup pending!!
m_numWaitersGone = 1;
}
}
if( --m_numWaitersToUnblock == 0 )
{
if( m_numWaitersBlocked != 0 )
{
// open the gate
rc = ReleaseSemaphore( m_blockLock, 1, null );
assert( rc );
// do not open the gate below again
numSignalsLeft = 0;
}
else if( (numWaitersGone = m_numWaitersGone) != 0 )
{
m_numWaitersGone = 0;
}
}
}
else if( ++m_numWaitersGone == int.max / 2 )
{
// timeout/canceled or spurious event :-)
rc = WaitForSingleObject( m_blockLock, INFINITE );
assert( rc == WAIT_OBJECT_0 );
// something is going on here - test of timeouts?
m_numWaitersBlocked -= m_numWaitersGone;
rc = ReleaseSemaphore( m_blockLock, 1, null );
assert( rc == WAIT_OBJECT_0 );
m_numWaitersGone = 0;
}
LeaveCriticalSection( &m_unblockLock );
if( numSignalsLeft == 1 )
{
// better now than spurious later (same as ResetEvent)
for( ; numWaitersGone > 0; --numWaitersGone )
{
rc = WaitForSingleObject( m_blockQueue, INFINITE );
assert( rc == WAIT_OBJECT_0 );
}
// open the gate
rc = ReleaseSemaphore( m_blockLock, 1, null );
assert( rc );
}
else if( numSignalsLeft != 0 )
{
// unblock next waiter
rc = ReleaseSemaphore( m_blockQueue, 1, null );
assert( rc );
}
m_assocMutex.lock();
return !timedOut;
}
void notify( bool all )
{
DWORD rc;
EnterCriticalSection( &m_unblockLock );
scope(failure) LeaveCriticalSection( &m_unblockLock );
if( m_numWaitersToUnblock != 0 )
{
if( m_numWaitersBlocked == 0 )
{
LeaveCriticalSection( &m_unblockLock );
return;
}
if( all )
{
m_numWaitersToUnblock += m_numWaitersBlocked;
m_numWaitersBlocked = 0;
}
else
{
m_numWaitersToUnblock++;
m_numWaitersBlocked--;
}
LeaveCriticalSection( &m_unblockLock );
}
else if( m_numWaitersBlocked > m_numWaitersGone )
{
rc = WaitForSingleObject( m_blockLock, INFINITE );
assert( rc == WAIT_OBJECT_0 );
if( 0 != m_numWaitersGone )
{
m_numWaitersBlocked -= m_numWaitersGone;
m_numWaitersGone = 0;
}
if( all )
{
m_numWaitersToUnblock = m_numWaitersBlocked;
m_numWaitersBlocked = 0;
}
else
{
m_numWaitersToUnblock = 1;
m_numWaitersBlocked--;
}
LeaveCriticalSection( &m_unblockLock );
rc = ReleaseSemaphore( m_blockQueue, 1, null );
assert( rc );
}
else
{
LeaveCriticalSection( &m_unblockLock );
}
}
// NOTE: This implementation uses Algorithm 8c as described here:
// http://groups.google.com/group/comp.programming.threads/
// browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
HANDLE m_blockLock; // auto-reset event (now semaphore)
HANDLE m_blockQueue; // auto-reset event (now semaphore)
Mutex m_assocMutex; // external mutex/CS
CRITICAL_SECTION m_unblockLock; // internal mutex/CS
int m_numWaitersGone = 0;
int m_numWaitersBlocked = 0;
int m_numWaitersToUnblock = 0;
}
else version( Posix )
{
pthread_cond_t m_hndl;
pthread_mutex_t* m_mutexAddr;
}
}
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
debug( UnitTest )
{
private import tango.core.Thread;
private import tango.core.sync.Mutex;
private import tango.core.sync.Semaphore;
void testNotify()
{
auto mutex = new Mutex;
auto condReady = new Condition( mutex );
auto semDone = new Semaphore;
auto synLoop = new Object;
int numWaiters = 10;
int numTries = 10;
int numReady = 0;
int numTotal = 0;
int numDone = 0;
int numPost = 0;
void waiter()
{
for( int i = 0; i < numTries; ++i )
{
synchronized( mutex )
{
while( numReady < 1 )
{
condReady.wait();
}
--numReady;
++numTotal;
}
synchronized( synLoop )
{
++numDone;
}
semDone.wait();
}
}
auto group = new ThreadGroup;
for( int i = 0; i < numWaiters; ++i )
group.create( &waiter );
for( int i = 0; i < numTries; ++i )
{
for( int j = 0; j < numWaiters; ++j )
{
synchronized( mutex )
{
++numReady;
condReady.notify();
}
}
while( true )
{
synchronized( synLoop )
{
if( numDone >= numWaiters )
break;
}
Thread.yield();
}
for( int j = 0; j < numWaiters; ++j )
{
semDone.notify();
}
}
group.joinAll();
assert( numTotal == numWaiters * numTries );
}
void testNotifyAll()
{
auto mutex = new Mutex;
auto condReady = new Condition( mutex );
int numWaiters = 10;
int numReady = 0;
int numDone = 0;
bool alert = false;
void waiter()
{
synchronized( mutex )
{
++numReady;
while( !alert )
condReady.wait();
++numDone;
}
}
auto group = new ThreadGroup;
for( int i = 0; i < numWaiters; ++i )
group.create( &waiter );
while( true )
{
synchronized( mutex )
{
if( numReady >= numWaiters )
{
alert = true;
condReady.notifyAll();
break;
}
}
Thread.yield();
}
group.joinAll();
assert( numReady == numWaiters && numDone == numWaiters );
}
void testWaitTimeout()
{
auto mutex = new Mutex;
auto condReady = new Condition( mutex );
bool waiting = false;
bool alertedOne = true;
bool alertedTwo = true;
void waiter()
{
synchronized( mutex )
{
waiting = true;
alertedOne = condReady.wait( 1 );
alertedTwo = condReady.wait( 1 );
}
}
auto thread = new Thread( &waiter );
thread.start();
while( true )
{
synchronized( mutex )
{
if( waiting )
{
condReady.notify();
break;
}
}
Thread.yield();
}
thread.join();
assert( waiting && alertedOne && !alertedTwo );
}
unittest
{
testNotify();
testNotifyAll();
testWaitTimeout();
}
}
|