123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
/*******************************************************************************

        copyright:      Copyright (c) 2008 Steven Schveighoffer.
                        All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Jun 2008: Initial release

        author:         schveiguy

*******************************************************************************/

module tango.io.device.ThreadPipe;

private import tango.core.Exception;

private import tango.io.device.Conduit;

private import tango.core.sync.Condition;

/**
 * Conduit to support a data stream between 2 threads.  One creates a
 * ThreadPipe, then uses the OutputStream and the InputStream from it to
 * communicate.  All traffic is automatically synchronized, so one just uses
 * the streams like they were normal device streams.
 *
 * It works by maintaining a circular buffer, where data is written to, and
 * read from, in a FIFO fashion.
 * ---
 * auto tc = new ThreadPipe;
 * void outFunc()
 * {
 *   Stdout.copy(tc.input);
 * }
 *
 * auto t = new Thread(&outFunc);
 * t.start();
 * tc.write("hello, thread!");
 * tc.close();
 * t.join();
 * ---
 */
class ThreadPipe : Conduit
{
    private bool _closed;
    private size_t _readIdx, _remaining;
    private void[] _buf;
    private Mutex _mutex;
    private Condition _condition;

    /**
     * Create a new ThreadPipe with the given buffer size.
     *
     * Params:
     * bufferSize = The size to allocate the buffer.
     */
    this(size_t bufferSize=(1024*16))
    {
        _buf = new ubyte[bufferSize];
        _closed = false;
        _readIdx = _remaining = 0;
        _mutex = new Mutex;
        _condition = new Condition(_mutex);
    }

    /**
     * Implements IConduit.bufferSize.
     *
     * Returns the appropriate buffer size that should be used to buffer the
     * ThreadPipe.  Note that this is simply the buffer size passed in, and
     * since all the ThreadPipe data is in memory, buffering doesn't make
     * much sense.
     */
    override const size_t bufferSize()
    {
        return _buf.length;
    }

    /**
     * Implements IConduit.toString
     *
     * Returns "<thread conduit>"
     */
    override string toString()
    {
        return "<threadpipe>";
    }

    /**
     * Returns true if there is data left to be read, and the write end isn't
     * closed.
     */
    override const bool isAlive()
    {
        synchronized(_mutex)
        {
            return !_closed || _remaining != 0;
        }
    }

    /**
     * Return the number of bytes remaining to be read in the circular buffer.
     */
    size_t remaining()
    {
        synchronized(_mutex)
            return _remaining;
    }

    /**
     * Return the number of bytes that can be written to the circular buffer.
     */
    size_t writable()
    {
        synchronized(_mutex)
            return _buf.length - _remaining;
    }

    /**
     * Close the write end of the conduit.  Writing to the conduit after it is
     * closed will return Eof.
     *
     * The read end is not closed until the buffer is empty.
     */
    void stop()
    {
        //
        // close write end.  The read end can stay open until the remaining
        // bytes are read.
        //
        synchronized(_mutex)
        {
            _closed = true;
            _condition.notifyAll();
        }
    }

    /**
     * This does nothing because we have no clue whether the members have been
     * collected, and detach is run in the destructor.  To stop communications,
     * use stop().
     *
     * TODO: move stop() functionality to detach when it becomes possible to
     * have fully-owned members
     */
    override void detach()
    {
    }

    /**
     * Implements InputStream.read.
     *
     * Read from the conduit into a target array.  The provided dst will be
     * populated with content from the stream.
     *
     * Returns the number of bytes read, which may be less than requested in
     * dst. Eof is returned whenever an end-of-flow condition arises.
     */
    override size_t read(void[] dst)
    {
        //
        // don't block for empty read
        //
        if(dst.length == 0)
            return 0;
        synchronized(_mutex)
        {
            //
            // see if any remaining data is present
            //
            size_t r;
            while((r = _remaining) == 0 && !_closed)
                _condition.wait();

            //
            // read all data that is available
            //
            if(r == 0)
                return Eof;
            if(r > dst.length)
                r = dst.length;

            auto result = r;

            //
            // handle wrapping
            //
            if(_readIdx + r >= _buf.length)
            {
                size_t x = _buf.length - _readIdx;
                dst[0..x] = _buf[_readIdx..$];
                _readIdx = 0;
                _remaining -= x;
                r -= x;
                dst = dst[x..$];
            }

            dst[0..r] = _buf[_readIdx..(_readIdx + r)];
            _readIdx = (_readIdx + r) % _buf.length;
            _remaining -= r;
            _condition.notifyAll();
            return result;
        }
    }

    /**
     * Implements InputStream.clear().
     *
     * Clear any buffered content.
     */
    ThreadPipe clear()
    {
        synchronized(_mutex)
        {
            if(_remaining != 0)
            {
                /*
                 * this isn't technically necessary, but we do it because it
                 * preserves the most recent data first
                 */
                _readIdx = (_readIdx + _remaining) % _buf.length;
                _remaining = 0;
                _condition.notifyAll();
            }
        }
        return this;
    }

    /**
     * Implements OutputStream.write.
     *
     * Write to stream from a source array. The provided src content will be
     * written to the stream.
     *
     * Returns the number of bytes written from src, which may be less than
     * the quantity provided. Eof is returned when an end-of-flow condition
     * arises.
     */
    override size_t write(const(void)[] src)
    {
        //
        // don't block for empty write
        //
        if(src.length == 0)
            return 0;
        synchronized(_mutex)
        {
            size_t w;
            while((w = _buf.length - _remaining) == 0 && !_closed)
                _condition.wait();

            if(_closed)
                return Eof;

            if(w > src.length)
                w = src.length;

            auto writeIdx = (_readIdx + _remaining) % _buf.length;

            auto result = w;

            if(w + writeIdx >= _buf.length)
            {
                auto x = _buf.length - writeIdx;
                _buf[writeIdx..$] = src[0..x];
                writeIdx = 0;
                w -= x;
                _remaining += x;
                src = src[x..$];
            }
            _buf[writeIdx..(writeIdx + w)] = src[0..w];
            _remaining += w;
            _condition.notifyAll();
            return result;
        }
    }
}

debug(UnitTest)
{
    import tango.core.Thread;

    unittest
    {
        auto source = new uint[1000];
        foreach(i, ref x; source)
            x = cast(uint)i;

        ThreadPipe tp = new ThreadPipe(16);
        void threadA()
        {
            void[] sourceBuf = source;
            while(sourceBuf.length > 0)
            {
                sourceBuf = sourceBuf[tp.write(sourceBuf)..$];
            }
            tp.stop();
        }
        Thread a = new Thread(&threadA);
        a.start();
        int readval;
        int last = -1;
        size_t nread;
        while((nread = tp.read((&readval)[0..1])) == readval.sizeof)
        {
            assert(readval == last + 1);
            last = readval;
        }
        assert(nread == tp.Eof);
        a.join();
    }
}