123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464 |
|
/**
* The semaphore module provides a general use semaphore for synchronization.
*
* Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
* License: BSD style: $(LICENSE)
* Author: Sean Kelly
*/
module tango.core.sync.Semaphore;
public import tango.core.Exception : SyncException;
import Time = tango.core.Time;
version( Win32 )
{
private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
private import tango.core.sync.Config;
private import tango.stdc.errno;
private import tango.stdc.posix.pthread;
private import tango.stdc.posix.semaphore;
}
////////////////////////////////////////////////////////////////////////////////
// Semaphore
//
// void wait();
// void notify();
// bool tryWait();
////////////////////////////////////////////////////////////////////////////////
/**
* This class represents a general counting semaphore as concieved by Edsger
* Dijkstra. As per Mesa type monitors however, "signal" has been replaced
* with "notify" to indicate that control is not transferred to the waiter when
* a notification is sent.
*/
class Semaphore
{
////////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////////
/**
* Initializes a semaphore object with the specified initial count.
*
* Params:
* count = The initial count for the semaphore.
*
* Throws:
* SyncException on error.
*/
this( uint count = 0 )
{
version( Win32 )
{
m_hndl = CreateSemaphoreA( null, count, int.max, null );
if( m_hndl == m_hndl.init )
throw new SyncException( "Unable to create semaphore" );
}
else version( darwin ){
creatingTask=mach_task_self();
auto rc=semaphore_create(creatingTask,
&m_hndl,
MACH_SYNC_POLICY.SYNC_POLICY_FIFO,
count);
if( rc )
throw new SyncException( "Unable to create semaphore" );
}
else version( Posix )
{
int rc = sem_init( &m_hndl, 0, count );
if( rc )
throw new SyncException( "Unable to create semaphore" );
}
}
~this()
{
version( Win32 )
{
BOOL rc = CloseHandle( m_hndl );
assert( rc, "Unable to destroy semaphore" );
}
else version(darwin)
{
auto rc=semaphore_destroy(creatingTask, m_hndl);
assert( !rc, "Unable to destroy semaphore" );
}
else version( Posix )
{
int rc = sem_destroy( &m_hndl );
assert( !rc, "Unable to destroy semaphore" );
}
}
////////////////////////////////////////////////////////////////////////////
// General Actions
////////////////////////////////////////////////////////////////////////////
/**
* Wait until the current count is above zero, then atomically decrement
* the count by one and return.
*
* Throws:
* SyncException on error.
*/
void wait()
{
version( Win32 )
{
DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
if( rc != WAIT_OBJECT_0 )
throw new SyncException( "Unable to wait for semaphore" );
}
else version(darwin){
while (true){
auto rc=semaphore_wait(m_hndl);
if (rc==KERN_RETURN.ABORTED){
if( errno != EINTR )
throw new SyncException( "Unable to wait for semaphore (abort)" );
// wait again
} else if (rc!=0) {
throw new SyncException( "Unable to wait for semaphore" );
} else {
break;
}
}
}
else version( Posix )
{
while( true )
{
if( !sem_wait( &m_hndl ) )
return;
if( errno != EINTR )
throw new SyncException( "Unable to wait for semaphore" );
}
}
}
/**
* Suspends the calling thread until the current count moves above zero or
* until the supplied time period has elapsed. If the count moves above
* zero in this interval, then atomically decrement the count by one and
* return true. Otherwise, return false. The supplied period may be up to
* a maximum of (uint.max - 1) milliseconds.
*
* Params:
* period = The number of seconds to wait.
*
* In:
* period must be less than (uint.max - 1) milliseconds.
*
* Returns:
* true if notified before the timeout and false if not.
*
* Throws:
* SyncException on error.
*/
bool wait( double period )
in
{
assert( period * 1000 + 0.1 < uint.max - 1);
}
body
{
version( Win32 )
{
DWORD t = cast(DWORD)(period * 1000 + 0.1);
switch( WaitForSingleObject( m_hndl, t ) )
{
case WAIT_OBJECT_0:
return true;
case WAIT_TIMEOUT:
return false;
default:
throw new SyncException( "Unable to wait for semaphore" );
}
}
else version(darwin){
timespec t;
adjTimespec( t, period );
auto rc=semaphore_timedwait(m_hndl,t);
if (rc==0){
return true;
} else if (rc==KERN_RETURN.OPERATION_TIMED_OUT){
return false;
} else if (rc==KERN_RETURN.ABORTED) {
if( errno != EINTR )
throw new SyncException( "Unable to wait for semaphore (abort)" );
return false; // wait can be too short, wait is not resumed
} else {
throw new SyncException( "Unable to wait for semaphore" );
}
}
else version( Posix )
{
timespec t;
getTimespec( t );
adjTimespec( t, period );
while( true )
{
if( !sem_timedwait( &m_hndl, &t ) )
return true;
if( errno == ETIMEDOUT )
return false;
if( errno != EINTR )
throw new SyncException( "Unable to wait for semaphore" );
}
}
// -w trip
//return false;
}
/**
* Atomically increment the current count by one. This will notify one
* waiter, if there are any in the queue.
*
* Throws:
* SyncException on error.
*/
void notify()
{
version( Win32 )
{
if( !ReleaseSemaphore( m_hndl, 1, null ) )
throw new SyncException( "Unable to notify semaphore" );
}
else version(darwin){
semaphore_signal(m_hndl);
}
else version( Posix )
{
int rc = sem_post( &m_hndl );
if( rc )
throw new SyncException( "Unable to notify semaphore" );
}
}
/**
* If the current count is equal to zero, return. Otherwise, atomically
* decrement the count by one and return true.
*
* Returns:
* true if the count was above zero and false if not.
*
* Throws:
* SyncException on error.
*/
bool tryWait()
{
version( Win32 )
{
switch( WaitForSingleObject( m_hndl, 0 ) )
{
case WAIT_OBJECT_0:
return true;
case WAIT_TIMEOUT:
return false;
default:
throw new SyncException( "Unable to wait for semaphore" );
}
}
else version(darwin){
return wait(0.0);
}
else version( Posix )
{
while( true )
{
if( !sem_trywait( &m_hndl ) )
return true;
if( errno == EAGAIN )
return false;
if( errno != EINTR )
throw new SyncException( "Unable to wait for semaphore" );
}
}
// -w trip
//return false;
}
private:
version( Win32 )
{
HANDLE m_hndl;
}
else version(darwin){
task_t creatingTask;
semaphore_t m_hndl;
}
else version( Posix )
{
sem_t m_hndl;
}
}
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
debug( UnitTest )
{
private import tango.core.Thread;
void testWait()
{
auto semaphore = new Semaphore;
int numToProduce = 10;
bool allProduced = false;
auto synProduced = new Object;
int numConsumed = 0;
auto synConsumed = new Object;
int numConsumers = 10;
int numComplete = 0;
auto synComplete = new Object;
void consumer()
{
while( true )
{
semaphore.wait();
synchronized( synProduced )
{
if( allProduced )
break;
}
synchronized( synConsumed )
{
++numConsumed;
}
}
synchronized( synComplete )
{
++numComplete;
}
}
void producer()
{
assert( !semaphore.tryWait() );
for( int i = 0; i < numToProduce; ++i )
{
semaphore.notify();
Thread.yield();
}
Thread.sleep(Time.seconds(1));
synchronized( synProduced )
{
allProduced = true;
}
for( int i = 0; i < numConsumers; ++i )
{
semaphore.notify();
Thread.yield();
}
for( int i = numConsumers * 10000; i > 0; --i )
{
synchronized( synComplete )
{
if( numComplete == numConsumers )
break;
}
Thread.yield();
}
synchronized( synComplete )
{
assert( numComplete == numConsumers );
}
synchronized( synConsumed )
{
assert( numConsumed == numToProduce );
}
assert( !semaphore.tryWait() );
semaphore.notify();
assert( semaphore.tryWait() );
assert( !semaphore.tryWait() );
}
auto group = new ThreadGroup;
for( int i = 0; i < numConsumers; ++i )
group.create( &consumer );
group.create( &producer );
group.joinAll();
}
void testWaitTimeout()
{
auto synReady = new Object;
auto semReady = new Semaphore;
bool waiting = false;
bool alertedOne = true;
bool alertedTwo = true;
void waiter()
{
synchronized( synReady )
{
waiting = true;
}
alertedOne = semReady.wait( 0.1 );
alertedTwo = semReady.wait( 0.1 );
}
auto thread = new Thread( &waiter );
thread.start();
while( true )
{
synchronized( synReady )
{
if( waiting )
{
semReady.notify();
break;
}
}
Thread.yield();
}
thread.join();
assert( waiting && alertedOne && !alertedTwo );
}
unittest
{
testWait();
testWaitTimeout();
}
}
|