123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508 |
|
/**
* The read/write mutex module provides a primitive for maintaining shared read
* access and mutually exclusive write access.
*
* Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
* License: BSD style: $(LICENSE)
* Author: Sean Kelly
*/
module tango.core.sync.ReadWriteMutex;
public import tango.core.Exception : SyncException;
private import tango.core.sync.Condition;
private import tango.core.sync.Mutex;
private import Time = tango.core.Time;
version( Win32 )
{
private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
private import tango.stdc.posix.pthread;
}
////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////
/**
* This class represents a mutex that allows any number of readers to enter,
* but when a writer enters, all other readers and writers are blocked.
*
* Please note that this mutex is not recursive and is intended to guard access
* to data only. Also, no deadlock checking is in place because doing so would
* require dynamic memory allocation, which would reduce performance by an
* unacceptable amount. As a result, any attempt to recursively acquire this
* mutex may well deadlock the caller, particularly if a write lock is acquired
* while holding a read lock, or vice-versa. In practice, this should not be
* an issue however, because it is uncommon to call deeply into unknown code
* while holding a lock that simply protects data.
*/
class ReadWriteMutex
{
/**
* Defines the policy used by this mutex. Currently, two policies are
* defined.
*
* The first will queue writers until no readers hold the mutex, then
* pass the writers through one at a time. If a reader acquires the mutex
* while there are still writers queued, the reader will take precedence.
*
* The second will queue readers if there are any writers queued. Writers
* are passed through one at a time, and once there are no writers present,
* all queued readers will be alerted.
*
* Future policies may offer a more even balance between reader and writer
* precedence.
*/
enum Policy
{
PREFER_READERS, /// Readers get preference. This may starve writers.
PREFER_WRITERS /// Writers get preference. This may starve readers.
}
////////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////////
/**
* Initializes a read/write mutex object with the supplied policy.
*
* Params:
* policy = The policy to use.
*
* Throws:
* SyncException on error.
*/
this( Policy policy = Policy.PREFER_WRITERS )
{
m_commonMutex = new Mutex;
if( !m_commonMutex )
throw new SyncException( "Unable to initialize mutex" );
scope(failure) delete m_commonMutex;
m_readerQueue = new Condition( m_commonMutex );
if( !m_readerQueue )
throw new SyncException( "Unable to initialize mutex" );
scope(failure) delete m_readerQueue;
m_writerQueue = new Condition( m_commonMutex );
if( !m_writerQueue )
throw new SyncException( "Unable to initialize mutex" );
scope(failure) delete m_writerQueue;
m_policy = policy;
m_reader = new Reader;
m_writer = new Writer;
}
////////////////////////////////////////////////////////////////////////////
// General Properties
////////////////////////////////////////////////////////////////////////////
/**
* Gets the policy for the associated mutex.
*
* Returns:
* The policy used by this mutex.
*/
Policy policy()
{
return m_policy;
}
////////////////////////////////////////////////////////////////////////////
// Reader/Writer Handles
////////////////////////////////////////////////////////////////////////////
/**
* Gets an object representing the reader lock for the associated mutex.
*
* Returns:
* A reader sub-mutex.
*/
@property Reader reader()
{
return m_reader;
}
/**
* Gets an object representing the writer lock for the associated mutex.
*
* Returns:
* A writer sub-mutex.
*/
@property Writer writer()
{
return m_writer;
}
////////////////////////////////////////////////////////////////////////////
// Reader
////////////////////////////////////////////////////////////////////////////
/**
* This class can be considered a mutex in its own right, and is used to
* negotiate a read lock for the enclosing mutex.
*/
class Reader :
Object.Monitor
{
/**
* Initializes a read/write mutex reader proxy object.
*/
this()
{
m_proxy.link = this;
(cast(void**) this)[1] = &m_proxy;
}
/**
* Acquires a read lock on the enclosing mutex.
*/
void lock()
{
synchronized( m_commonMutex )
{
++m_numQueuedReaders;
scope(exit) --m_numQueuedReaders;
while( shouldQueueReader() )
m_readerQueue.wait();
++m_numActiveReaders;
}
}
/**
* Releases a read lock on the enclosing mutex.
*/
void unlock()
{
synchronized( m_commonMutex )
{
if( --m_numActiveReaders < 1 )
{
if( m_numQueuedWriters > 0 )
m_writerQueue.notify();
}
}
}
/**
* Attempts to acquire a read lock on the enclosing mutex. If one can
* be obtained without blocking, the lock is acquired and true is
* returned. If not, the lock is not acquired and false is returned.
*
* Returns:
* true if the lock was acquired and false if not.
*/
bool tryLock()
{
synchronized( m_commonMutex )
{
if( shouldQueueReader() )
return false;
++m_numActiveReaders;
return true;
}
}
private:
bool shouldQueueReader()
{
if( m_numActiveWriters > 0 )
return true;
switch( m_policy )
{
case Policy.PREFER_WRITERS:
return m_numQueuedWriters > 0;
case Policy.PREFER_READERS:
default:
break;
}
return false;
}
struct MonitorProxy
{
Object.Monitor link;
}
MonitorProxy m_proxy;
}
////////////////////////////////////////////////////////////////////////////
// Writer
////////////////////////////////////////////////////////////////////////////
/**
* This class can be considered a mutex in its own right, and is used to
* negotiate a write lock for the enclosing mutex.
*/
class Writer :
Object.Monitor
{
/**
* Initializes a read/write mutex writer proxy object.
*/
this()
{
m_proxy.link = this;
(cast(void**) this)[1] = &m_proxy;
}
/**
* Acquires a write lock on the enclosing mutex.
*/
void lock()
{
synchronized( m_commonMutex )
{
++m_numQueuedWriters;
scope(exit) --m_numQueuedWriters;
while( shouldQueueWriter() )
m_writerQueue.wait();
++m_numActiveWriters;
}
}
/**
* Releases a write lock on the enclosing mutex.
*/
void unlock()
{
synchronized( m_commonMutex )
{
if( --m_numActiveWriters < 1 )
{
switch( m_policy )
{
default:
case Policy.PREFER_READERS:
if( m_numQueuedReaders > 0 )
m_readerQueue.notifyAll();
else if( m_numQueuedWriters > 0 )
m_writerQueue.notify();
break;
case Policy.PREFER_WRITERS:
if( m_numQueuedWriters > 0 )
m_writerQueue.notify();
else if( m_numQueuedReaders > 0 )
m_readerQueue.notifyAll();
}
}
}
}
/**
* Attempts to acquire a write lock on the enclosing mutex. If one can
* be obtained without blocking, the lock is acquired and true is
* returned. If not, the lock is not acquired and false is returned.
*
* Returns:
* true if the lock was acquired and false if not.
*/
bool tryLock()
{
synchronized( m_commonMutex )
{
if( shouldQueueWriter() )
return false;
++m_numActiveWriters;
return true;
}
}
private:
bool shouldQueueWriter()
{
if( m_numActiveWriters > 0 ||
m_numActiveReaders > 0 )
return true;
switch( m_policy )
{
case Policy.PREFER_READERS:
return m_numQueuedReaders > 0;
case Policy.PREFER_WRITERS:
default:
break;
}
return false;
}
struct MonitorProxy
{
Object.Monitor link;
}
MonitorProxy m_proxy;
}
private:
Policy m_policy;
Reader m_reader;
Writer m_writer;
Mutex m_commonMutex;
Condition m_readerQueue;
Condition m_writerQueue;
int m_numQueuedReaders;
int m_numActiveReaders;
int m_numQueuedWriters;
int m_numActiveWriters;
}
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
debug( UnitTest )
{
private import tango.core.Thread;
void testRead( ReadWriteMutex.Policy policy )
{
auto mutex = new ReadWriteMutex( policy );
auto synInfo = new Object;
int numThreads = 10;
int numReaders = 0;
int maxReaders = 0;
void readerFn()
{
synchronized( mutex.reader() )
{
synchronized( synInfo )
{
if( ++numReaders > maxReaders )
maxReaders = numReaders;
}
Thread.sleep( Time.seconds(0.001) );
synchronized( synInfo )
{
--numReaders;
}
}
}
auto group = new ThreadGroup;
for( int i = 0; i < numThreads; ++i )
{
group.create( &readerFn );
}
group.joinAll();
assert( numReaders < 1 && maxReaders > 1 );
}
void testReadWrite( ReadWriteMutex.Policy policy )
{
auto mutex = new ReadWriteMutex( policy );
auto synInfo = new Object;
int numThreads = 10;
int numReaders = 0;
int numWriters = 0;
int maxReaders = 0;
int maxWriters = 0;
int numTries = 20;
void readerFn()
{
for( int i = 0; i < numTries; ++i )
{
synchronized( mutex.reader() )
{
synchronized( synInfo )
{
if( ++numReaders > maxReaders )
maxReaders = numReaders;
}
Thread.sleep( Time.seconds(0.001) );
synchronized( synInfo )
{
--numReaders;
}
}
}
}
void writerFn()
{
for( int i = 0; i < numTries; ++i )
{
synchronized( mutex.writer() )
{
synchronized( synInfo )
{
if( ++numWriters > maxWriters )
maxWriters = numWriters;
}
Thread.sleep( Time.seconds(0.001) );
synchronized( synInfo )
{
--numWriters;
}
}
}
}
auto group = new ThreadGroup;
for( int i = 0; i < numThreads; ++i )
{
group.create( &readerFn );
group.create( &writerFn );
}
group.joinAll();
assert( numReaders < 1 && maxReaders > 1 &&
numWriters < 1 && maxWriters < 2 );
}
unittest
{
testRead( ReadWriteMutex.Policy.PREFER_READERS );
testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
}
}
|