123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
/**
 * The read/write mutex module provides a primitive for maintaining shared read
 * access and mutually exclusive write access.
 *
 * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
 * License:   BSD style: $(LICENSE)
 * Author:    Sean Kelly
 */
module tango.core.sync.ReadWriteMutex;


public import tango.core.Exception : SyncException;
private import tango.core.sync.Condition;
private import tango.core.sync.Mutex;
private import Time = tango.core.Time;

version( Win32 )
{
    private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
    private import tango.stdc.posix.pthread;
}


////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a mutex that allows any number of readers to enter,
 * but when a writer enters, all other readers and writers are blocked.
 *
 * Please note that this mutex is not recursive and is intended to guard access
 * to data only.  Also, no deadlock checking is in place because doing so would
 * require dynamic memory allocation, which would reduce performance by an
 * unacceptable amount.  As a result, any attempt to recursively acquire this
 * mutex may well deadlock the caller, particularly if a write lock is acquired
 * while holding a read lock, or vice-versa.  In practice, this should not be
 * an issue however, because it is uncommon to call deeply into unknown code
 * while holding a lock that simply protects data.
 */
class ReadWriteMutex
{
    /**
     * Defines the policy used by this mutex.  Currently, two policies are
     * defined.
     *
     * The first will queue writers until no readers hold the mutex, then
     * pass the writers through one at a time.  If a reader acquires the mutex
     * while there are still writers queued, the reader will take precedence.
     *
     * The second will queue readers if there are any writers queued.  Writers
     * are passed through one at a time, and once there are no writers present,
     * all queued readers will be alerted.
     *
     * Future policies may offer a more even balance between reader and writer
     * precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a read/write mutex object with the supplied policy.
     *
     * Params:
     *  policy = The policy to use.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS )
    {
        m_commonMutex = new Mutex;
        if( !m_commonMutex )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_commonMutex;

        m_readerQueue = new Condition( m_commonMutex );
        if( !m_readerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_readerQueue;

        m_writerQueue = new Condition( m_commonMutex );
        if( !m_writerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_writerQueue;

        m_policy = policy;
        m_reader = new Reader;
        m_writer = new Writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the policy for the associated mutex.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    Policy policy()
    {
        return m_policy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets an object representing the reader lock for the associated mutex.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    @property Reader reader()
    {
        return m_reader;
    }


    /**
     * Gets an object representing the writer lock for the associated mutex.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    @property Writer writer()
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a read lock for the enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex reader proxy object.
         */
        this()
        {
            m_proxy.link = this;
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a read lock on the enclosing mutex.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                while( shouldQueueReader() )
                    m_readerQueue.wait();
                ++m_numActiveReaders;
            }
        }


        /**
         * Releases a read lock on the enclosing mutex.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveReaders < 1 )
                {
                    if( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }


        /**
         * Attempts to acquire a read lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueReader() )
                    return false;
                ++m_numActiveReaders;
                return true;
            }
        }


    private:
        bool shouldQueueReader()
        {
            if( m_numActiveWriters > 0 )
                return true;

            switch( m_policy )
            {
            case Policy.PREFER_WRITERS:
                 return m_numQueuedWriters > 0;

            case Policy.PREFER_READERS:
            default:
                 break;
            }

        return false;
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a write lock for the enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex writer proxy object.
         */
        this()
        {
            m_proxy.link = this;
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a write lock on the enclosing mutex.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                while( shouldQueueWriter() )
                    m_writerQueue.wait();
                ++m_numActiveWriters;
            }
        }


        /**
         * Releases a write lock on the enclosing mutex.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveWriters < 1 )
                {
                    switch( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }


        /**
         * Attempts to acquire a write lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueWriter() )
                    return false;
                ++m_numActiveWriters;
                return true;
            }
        }


    private:
        bool shouldQueueWriter()
        {
            if( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
                return true;
            switch( m_policy )
            {
            case Policy.PREFER_READERS:
                return m_numQueuedReaders > 0;

            case Policy.PREFER_WRITERS:
            default:
                 break;
            }

        return false;
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


private:
    Policy      m_policy;
    Reader      m_reader;
    Writer      m_writer;

    Mutex       m_commonMutex;
    Condition   m_readerQueue;
    Condition   m_writerQueue;

    int         m_numQueuedReaders;
    int         m_numActiveReaders;
    int         m_numQueuedWriters;
    int         m_numActiveWriters;
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


debug( UnitTest )
{
    private import tango.core.Thread;


    void testRead( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;
        int  numThreads = 10;
        int  numReaders = 0;
        int  maxReaders = 0;

        void readerFn()
        {
            synchronized( mutex.reader() )
            {
                synchronized( synInfo )
                {
                    if( ++numReaders > maxReaders )
                        maxReaders = numReaders;
                }
                Thread.sleep( Time.seconds(0.001) );
                synchronized( synInfo )
                {
                    --numReaders;
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 );
    }


    void testReadWrite( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;
        int  numThreads = 10;
        int  numReaders = 0;
        int  numWriters = 0;
        int  maxReaders = 0;
        int  maxWriters = 0;
        int  numTries   = 20;

        void readerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.reader() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numReaders > maxReaders )
                            maxReaders = numReaders;
                    }
                    Thread.sleep( Time.seconds(0.001) );
                    synchronized( synInfo )
                    {
                        --numReaders;
                    }
                }
            }
        }

        void writerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.writer() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numWriters > maxWriters )
                            maxWriters = numWriters;
                    }
                    Thread.sleep( Time.seconds(0.001) );
                    synchronized( synInfo )
                    {
                        --numWriters;
                    }
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
            group.create( &writerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 &&
                numWriters < 1 && maxWriters < 2 );
    }


    unittest
    {
        testRead( ReadWriteMutex.Policy.PREFER_READERS );
        testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
    }
}