/**
 * The semaphore module provides a general use semaphore for synchronization.
 *
 * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
 * License:   BSD style: $(LICENSE)
 * Author:    Sean Kelly
 */
module tango.core.sync.Semaphore;


public import tango.core.Exception : SyncException;

import Time = tango.core.Time;

version( Win32 )
{
    private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
    private import tango.core.sync.Config;
    private import tango.stdc.errno;
    private import tango.stdc.posix.pthread;
    private import tango.stdc.posix.semaphore;
}


////////////////////////////////////////////////////////////////////////////////
// Semaphore
//
// void wait();
// bool wait( double period );
// void notify();
// bool tryWait();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a general counting semaphore as conceived by Edsger
 * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
 * with "notify" to indicate that control is not transferred to the waiter when
 * a notification is sent.
 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncException on error.
     */
    this( uint count = 0 )
    {
        version( Win32 )
        {
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == m_hndl.init )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( darwin )
        {
            // The creating task is remembered because semaphore_destroy
            // needs it again in the destructor.
            creatingTask = mach_task_self();
            auto rc = semaphore_create( creatingTask,
                                        &m_hndl,
                                        MACH_SYNC_POLICY.SYNC_POLICY_FIFO,
                                        count );
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_init( &m_hndl, 0, count );
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the underlying operating system semaphore.  A failure to
     * destroy the semaphore is reported through an assert only; no exception
     * is thrown from the destructor.
     */
    ~this()
    {
        version( Win32 )
        {
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version( darwin )
        {
            auto rc = semaphore_destroy( creatingTask, m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.  On darwin and Posix an interrupted wait
     * (EINTR) is transparently restarted.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if( rc != WAIT_OBJECT_0 )
                throw new SyncException( "Unable to wait for semaphore" );
        }
        else version( darwin )
        {
            while( true )
            {
                auto rc = semaphore_wait( m_hndl );
                if( rc == KERN_RETURN.ABORTED )
                {
                    // NOTE(review): the abort is only retried when errno is
                    // EINTR; whether the Mach call sets errno should be
                    // confirmed against the platform documentation.
                    if( errno != EINTR )
                        throw new SyncException( "Unable to wait for semaphore (abort)" );
                    // wait again
                }
                else if( rc != 0 )
                {
                    throw new SyncException( "Unable to wait for semaphore" );
                }
                else
                {
                    break;
                }
            }
        }
        else version( Posix )
        {
            while( true )
            {
                if( !sem_wait( &m_hndl ) )
                    return;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.  The supplied period may be up to
     * a maximum of (uint.max - 1) milliseconds.
     *
     * On darwin an aborted wait (with errno set to EINTR) is not resumed, so
     * false may be returned before the full period has elapsed.  On Posix an
     * interrupted wait is restarted against the same absolute deadline.
     *
     * Params:
     *  period = The number of seconds to wait.
     *
     * In:
     *  period must be less than (uint.max - 1) milliseconds.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        assert( period * 1000 + 0.1 < uint.max - 1);
    }
    body
    {
        version( Win32 )
        {
            // Convert seconds to whole milliseconds; the 0.1 guards against
            // a value just below an integer being truncated downwards.
            DWORD t = cast(DWORD)(period * 1000 + 0.1);
            switch( WaitForSingleObject( m_hndl, t ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( darwin )
        {
            // Unlike the Posix branch, the timespec is not seeded with the
            // current time here: only the period itself is applied to it.
            timespec t;

            adjTimespec( t, period );
            auto rc = semaphore_timedwait( m_hndl, t );
            if( rc == 0 )
            {
                return true;
            }
            else if( rc == KERN_RETURN.OPERATION_TIMED_OUT )
            {
                return false;
            }
            else if( rc == KERN_RETURN.ABORTED )
            {
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore (abort)" );
                return false; // wait can be too short, wait is not resumed
            }
            else
            {
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            timespec t;

            // Build the deadline once, from the current time plus the period,
            // so that retries after EINTR do not extend the overall wait.
            getTimespec( t );
            adjTimespec( t, period );

            while( true )
            {
                if( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if( errno == ETIMEDOUT )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        //return false;
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            if( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( darwin )
        {
            // Check the return code like the other platforms do, so that a
            // failed notification is reported instead of silently dropped.
            auto rc = semaphore_signal( m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_post( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool tryWait()
    {
        version( Win32 )
        {
            switch( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( darwin )
        {
            // Implemented as a timed wait with a zero period.
            return wait( 0.0 );
        }
        else version( Posix )
        {
            while( true )
            {
                if( !sem_trywait( &m_hndl ) )
                    return true;
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        //return false;
    }


private:
    // Platform specific handle of the underlying semaphore.
    version( Win32 )
    {
        HANDLE  m_hndl;
    }
    else version( darwin )
    {
        // Task that created the semaphore; passed to semaphore_destroy.
        task_t creatingTask;
        semaphore_t m_hndl;
    }
    else version( Posix )
    {
        sem_t   m_hndl;
    }
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


debug( UnitTest )
{
    private import tango.core.Thread;


    /**
     * Exercises wait(), notify() and tryWait() with one producer thread and
     * numConsumers consumer threads sharing a single semaphore.
     *
     * The producer first posts numToProduce notifications, which the
     * consumers count in numConsumed, then sets allProduced and posts one
     * further notification per consumer so that every consumer wakes up,
     * sees the flag and terminates.  The test asserts that every consumer
     * completed, that exactly numToProduce notifications were consumed, and
     * that tryWait() reflects the semaphore count afterwards.
     */
    void testWait()
    {
        auto semaphore    = new Semaphore;
        int  numToProduce = 10;
        bool allProduced  = false;
        auto synProduced  = new Object;
        int  numConsumed  = 0;
        auto synConsumed  = new Object;
        int  numConsumers = 10;
        int  numComplete  = 0;
        auto synComplete  = new Object;

        // Consumer thread body: block on the semaphore, and after each
        // wake-up either count one consumed item or, once the producer has
        // flagged completion, leave the loop and record that this consumer
        // has finished.
        void consumer()
        {
            while( true )
            {
                semaphore.wait();

                // allProduced is read under the same lock the producer uses
                // to write it.
                synchronized( synProduced )
                {
                    if( allProduced )
                        break;
                }

                synchronized( synConsumed )
                {
                    ++numConsumed;
                }
            }

            synchronized( synComplete )
            {
                ++numComplete;
            }
        }

        // Producer thread body: drives the whole scenario and performs the
        // assertions.
        void producer()
        {
            // The semaphore is created with the default count of zero, so a
            // non-blocking wait must fail at this point.
            assert( !semaphore.tryWait() );

            for( int i = 0; i < numToProduce; ++i )
            {
                semaphore.notify();
                Thread.yield();
            }
            // Pause before raising the completion flag; presumably this gives
            // the consumers time to drain the notifications posted above.
            Thread.sleep(Time.seconds(1));
            synchronized( synProduced )
            {
                allProduced = true;
            }

            // One extra notification per consumer so that each one wakes up,
            // observes allProduced and exits its loop.
            for( int i = 0; i < numConsumers; ++i )
            {
                semaphore.notify();
                Thread.yield();
            }

            // Poll, for a bounded number of iterations, until every consumer
            // has reported completion.
            for( int i = numConsumers * 10000; i > 0; --i )
            {
                synchronized( synComplete )
                {
                    if( numComplete == numConsumers )
                        break;
                }
                Thread.yield();
            }

            synchronized( synComplete )
            {
                assert( numComplete == numConsumers );
            }

            synchronized( synConsumed )
            {
                assert( numConsumed == numToProduce );
            }

            // With all notifications consumed the count is back at zero; a
            // single notify() must allow exactly one successful tryWait().
            assert( !semaphore.tryWait() );
            semaphore.notify();
            assert( semaphore.tryWait() );
            assert( !semaphore.tryWait() );
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numConsumers; ++i )
            group.create( &consumer );
        group.create( &producer );
        group.joinAll();
    }


    /**
     * Exercises the timed wait( double ) overload.
     *
     * A helper thread announces that it is about to wait and then performs
     * two timed waits of 0.1 seconds each.  The calling thread posts exactly
     * one notification once the helper has announced itself, so the first
     * timed wait is expected to succeed and the second to time out.
     */
    void testWaitTimeout()
    {
        auto readyLock    = new Object;
        auto sem          = new Semaphore;
        bool isWaiting    = false;
        bool firstResult  = true;
        bool secondResult = true;

        // Helper thread body: flag readiness under the lock, then record the
        // outcome of two consecutive timed waits.
        void waiter()
        {
            synchronized( readyLock )
            {
                isWaiting = true;
            }
            firstResult  = sem.wait( 0.1 );
            secondResult = sem.wait( 0.1 );
        }

        auto worker = new Thread( &waiter );
        worker.start();

        // Spin, yielding between attempts, until the helper has flagged that
        // it is waiting; then post the single notification and stop.
        for( bool notified = false; !notified; )
        {
            synchronized( readyLock )
            {
                if( isWaiting )
                {
                    sem.notify();
                    notified = true;
                }
            }
            if( !notified )
                Thread.yield();
        }

        worker.join();
        assert( isWaiting && firstResult && !secondResult );
    }


    unittest
    {
        // Producer/consumer hand-off through wait(), notify() and tryWait().
        testWait();

        // Timed wait: one notified wake-up followed by one timeout.
        testWaitTimeout();
    }
}