/**
 * The read/write mutex module provides a primitive for maintaining shared read
 * access and mutually exclusive write access.
 *
 * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
 * License:   BSD style: $(LICENSE)
 * Author:    Sean Kelly
 */
module tango.core.sync.ReadWriteMutex;


public import tango.core.Exception : SyncException;
private import tango.core.sync.Condition;
private import tango.core.sync.Mutex;
private import Time = tango.core.Time;

version( Win32 )
{
    private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
    private import tango.stdc.posix.pthread;
}


////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a mutex that allows any number of readers to enter,
 * but when a writer enters, all other readers and writers are blocked.
 *
 * Please note that this mutex is not recursive and is intended to guard access
 * to data only.  Also, no deadlock checking is in place because doing so would
 * require dynamic memory allocation, which would reduce performance by an
 * unacceptable amount.  As a result, any attempt to recursively acquire this
 * mutex may well deadlock the caller, particularly if a write lock is acquired
 * while holding a read lock, or vice-versa.  In practice, this should not be
 * an issue however, because it is uncommon to call deeply into unknown code
 * while holding a lock that simply protects data.
 *
 * All bookkeeping (the four reader/writer counters) is read and modified only
 * while holding the single internal mutex; the two condition variables used
 * to park readers and writers are both bound to that same mutex.
 */
class ReadWriteMutex
{
    /**
     * Defines the policy used by this mutex.  Currently, two policies are
     * defined.
     *
     * The first will queue writers until no readers hold the mutex, then
     * pass the writers through one at a time.  If a reader acquires the mutex
     * while there are still writers queued, the reader will take precedence.
     *
     * The second will queue readers if there are any writers queued.  Writers
     * are passed through one at a time, and once there are no writers present,
     * all queued readers will be alerted.
     *
     * Future policies may offer a more even balance between reader and writer
     * precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a read/write mutex object with the supplied policy.
     *
     * The shared mutex is created first, followed by the reader and writer
     * condition variables (both bound to that mutex), and finally the Reader
     * and Writer proxy objects.  Each scope(failure) guard releases the object
     * created just before it if any later step throws.
     *
     * Params:
     *  policy = The policy to use.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS )
    {
        m_commonMutex = new Mutex;
        if( !m_commonMutex )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_commonMutex;

        m_readerQueue = new Condition( m_commonMutex );
        if( !m_readerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_readerQueue;

        m_writerQueue = new Condition( m_commonMutex );
        if( !m_writerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_writerQueue;

        m_policy = policy;
        m_reader = new Reader;
        m_writer = new Writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the policy for the associated mutex.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    Policy policy()
    {
        return m_policy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets an object representing the reader lock for the associated mutex.
     *
     * The same Reader instance, created in the constructor, is returned on
     * every call.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    @property Reader reader()
    {
        return m_reader;
    }


    /**
     * Gets an object representing the writer lock for the associated mutex.
     *
     * The same Writer instance, created in the constructor, is returned on
     * every call.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    @property Writer writer()
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a read lock for the enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex reader proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Stores the proxy's address in the second pointer-sized slot of
            // this object.  NOTE(review): presumably this is the runtime's
            // hidden monitor field, which would make `synchronized( reader )`
            // call lock()/unlock() below -- confirm against the object layout
            // of the targeted runtime.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a read lock on the enclosing mutex.
         *
         * The caller is counted as a queued reader for the whole duration of
         * this call, including when it never actually has to wait, and is
         * counted as an active reader once the call returns.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedReaders;
                // Runs on every exit path from this block, including an
                // exception thrown by wait().
                scope(exit) --m_numQueuedReaders;

                // Re-test the predicate after every wake-up.
                while( shouldQueueReader() )
                    m_readerQueue.wait();
                ++m_numActiveReaders;
            }
        }


        /**
         * Releases a read lock on the enclosing mutex.
         *
         * When the last active reader leaves and at least one writer is
         * queued, a single writer is notified.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveReaders < 1 )
                {
                    if( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }


        /**
         * Attempts to acquire a read lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Unlike lock(), this never touches the queued-reader count.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueReader() )
                    return false;
                ++m_numActiveReaders;
                return true;
            }
        }


    private:
        /**
         * Decides whether a reader must wait.  Every caller in this class
         * invokes it while holding m_commonMutex.
         *
         * Returns:
         *  true if a writer is active, or if the policy is PREFER_WRITERS and
         *  at least one writer is queued; false otherwise.
         */
        bool shouldQueueReader()
        {
            if( m_numActiveWriters > 0 )
                return true;

            switch( m_policy )
            {
            case Policy.PREFER_WRITERS:
                 return m_numQueuedWriters > 0;

            case Policy.PREFER_READERS:
            default:
                 break;
            }

            return false;
        }

        /// Holder whose address is installed into the object by the constructor.
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a write lock for the enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex writer proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Stores the proxy's address in the second pointer-sized slot of
            // this object.  NOTE(review): presumably this is the runtime's
            // hidden monitor field, which would make `synchronized( writer )`
            // call lock()/unlock() below -- confirm against the object layout
            // of the targeted runtime.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a write lock on the enclosing mutex.
         *
         * The caller is counted as a queued writer for the whole duration of
         * this call, including when it never actually has to wait, and is
         * counted as the active writer once the call returns.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedWriters;
                // Runs on every exit path from this block, including an
                // exception thrown by wait().
                scope(exit) --m_numQueuedWriters;

                // Re-test the predicate after every wake-up.
                while( shouldQueueWriter() )
                    m_writerQueue.wait();
                ++m_numActiveWriters;
            }
        }


        /**
         * Releases a write lock on the enclosing mutex.
         *
         * Once no writer is active, the policy decides who is woken first:
         * PREFER_READERS wakes all queued readers and only falls back to a
         * single writer when no reader is queued; PREFER_WRITERS wakes a
         * single queued writer and only falls back to all readers when no
         * writer is queued.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveWriters < 1 )
                {
                    switch( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        // Explicit terminator so that a case added after this
                        // one cannot be fallen into by accident.
                        break;
                    }
                }
            }
        }


        /**
         * Attempts to acquire a write lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Unlike lock(), this never touches the queued-writer count.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueWriter() )
                    return false;
                ++m_numActiveWriters;
                return true;
            }
        }


    private:
        /**
         * Decides whether a writer must wait.  Every caller in this class
         * invokes it while holding m_commonMutex.
         *
         * Returns:
         *  true if any reader or writer is active, or if the policy is
         *  PREFER_READERS and at least one reader is queued; false otherwise.
         */
        bool shouldQueueWriter()
        {
            if( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
                return true;

            switch( m_policy )
            {
            case Policy.PREFER_READERS:
                return m_numQueuedReaders > 0;

            case Policy.PREFER_WRITERS:
            default:
                 break;
            }

            return false;
        }

        /// Holder whose address is installed into the object by the constructor.
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


private:
    Policy      m_policy;           // policy chosen at construction
    Reader      m_reader;           // proxy handed out by reader()
    Writer      m_writer;           // proxy handed out by writer()

    Mutex       m_commonMutex;      // guards every counter below
    Condition   m_readerQueue;      // readers wait here; bound to m_commonMutex
    Condition   m_writerQueue;      // writers wait here; bound to m_commonMutex

    int         m_numQueuedReaders; // threads currently inside Reader.lock()
    int         m_numActiveReaders; // read locks currently held
    int         m_numQueuedWriters; // threads currently inside Writer.lock()
    int         m_numActiveWriters; // write locks currently held
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


debug( UnitTest )
{
    private import tango.core.Thread;


    /**
     * Starts ten reader threads against a fresh mutex using the given policy
     * and checks that more than one of them held the read lock at once.
     *
     * Params:
     *  policy = The policy the mutex under test is created with.
     */
    void testRead( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;   // guards the two counters below
        int  numThreads = 10;
        int  numReaders = 0;
        int  maxReaders = 0;

        void readerFn()
        {
            synchronized( mutex.reader() )
            {
                synchronized( synInfo )
                {
                    if( ++numReaders > maxReaders )
                        maxReaders = numReaders;
                }
                // Hold the read lock briefly so that other readers can
                // overlap with this one.
                Thread.sleep( Time.seconds(0.001) );
                synchronized( synInfo )
                {
                    --numReaders;
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
        }
        group.joinAll();
        // NOTE(review): maxReaders > 1 requires that at least two readers
        // were inside the lock at the same time, which depends on thread
        // scheduling.
        assert( numReaders < 1 && maxReaders > 1 );
    }


    /**
     * Starts ten reader and ten writer threads, each taking its lock twenty
     * times, and checks that readers overlapped with each other while no two
     * writers were ever inside the lock together.
     *
     * Params:
     *  policy = The policy the mutex under test is created with.
     */
    void testReadWrite( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;   // guards the four counters below
        int  numThreads = 10;
        int  numReaders = 0;
        int  numWriters = 0;
        int  maxReaders = 0;
        int  maxWriters = 0;
        int  numTries   = 20;

        void readerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.reader() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numReaders > maxReaders )
                            maxReaders = numReaders;
                    }
                    Thread.sleep( Time.seconds(0.001) );
                    synchronized( synInfo )
                    {
                        --numReaders;
                    }
                }
            }
        }

        void writerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.writer() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numWriters > maxWriters )
                            maxWriters = numWriters;
                    }
                    Thread.sleep( Time.seconds(0.001) );
                    synchronized( synInfo )
                    {
                        --numWriters;
                    }
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
            group.create( &writerFn );
        }
        group.joinAll();
        // maxWriters < 2 is the mutual-exclusion check for writers.
        // NOTE(review): maxReaders > 1 is scheduling-dependent.
        assert( numReaders < 1 && maxReaders > 1 &&
                numWriters < 1 && maxWriters < 2 );
    }


    unittest
    {
        testRead( ReadWriteMutex.Policy.PREFER_READERS );
        testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
    }
}