|| /******************************************************************************* copyright: Copyright (c) 2008 Steven Schveighoffer. All rights reserved license: BSD style: $(LICENSE) version: Jun 2008: Initial release author: schveiguy *******************************************************************************/ module tango.io.device.ThreadPipe; private import tango.core.Exception; private import tango.io.device.Conduit; private import tango.core.sync.Condition; /** * Conduit to support a data stream between 2 threads. One creates a * ThreadPipe, then uses the OutputStream and the InputStream from it to * communicate. All traffic is automatically synchronized, so one just uses * the streams like they were normal device streams. * * It works by maintaining a circular buffer, where data is written to, and * read from, in a FIFO fashion. * --- * auto tc = new ThreadPipe; * void outFunc() * { * Stdout.copy(tc.input); * } * * auto t = new Thread(&outFunc); * t.start(); * tc.write("hello, thread!"); * tc.close(); * t.join(); * --- */ class ThreadPipe : Conduit { private bool _closed; private size_t _readIdx, _remaining; private void[] _buf; private Mutex _mutex; private Condition _condition; /** * Create a new ThreadPipe with the given buffer size. * * Params: * bufferSize = The size to allocate the buffer. */ this(size_t bufferSize=(1024*16)) { _buf = new ubyte[bufferSize]; _closed = false; _readIdx = _remaining = 0; _mutex = new Mutex; _condition = new Condition(_mutex); } /** * Implements IConduit.bufferSize. * * Returns the appropriate buffer size that should be used to buffer the * ThreadPipe. Note that this is simply the buffer size passed in, and * since all the ThreadPipe data is in memory, buffering doesn't make * much sense. */ override const size_t bufferSize() { return _buf.length; } /** * Implements IConduit.toString * * Returns "<thread conduit>" */ override string toString() { return "<threadpipe>"; } /** * Returns true if there is data left to be read, and the write end isn't * closed. */ override const bool isAlive() { synchronized(_mutex) { return !_closed || _remaining != 0; } } /** * Return the number of bytes remaining to be read in the circular buffer. */ size_t remaining() { synchronized(_mutex) return _remaining; } /** * Return the number of bytes that can be written to the circular buffer. */ size_t writable() { synchronized(_mutex) return _buf.length - _remaining; } /** * Close the write end of the conduit. Writing to the conduit after it is * closed will return Eof. * * The read end is not closed until the buffer is empty. */ void stop() { // // close write end. The read end can stay open until the remaining // bytes are read. // synchronized(_mutex) { _closed = true; _condition.notifyAll(); } } /** * This does nothing because we have no clue whether the members have been * collected, and detach is run in the destructor. To stop communications, * use stop(). * * TODO: move stop() functionality to detach when it becomes possible to * have fully-owned members */ override void detach() { } /** * Implements InputStream.read. * * Read from the conduit into a target array. The provided dst will be * populated with content from the stream. * * Returns the number of bytes read, which may be less than requested in * dst. Eof is returned whenever an end-of-flow condition arises. */ override size_t read(void[] dst) { // // don't block for empty read // if(dst.length == 0) return 0; synchronized(_mutex) { // // see if any remaining data is present // size_t r; while((r = _remaining) == 0 && !_closed) _condition.wait(); // // read all data that is available // if(r == 0) return Eof; if(r > dst.length) r = dst.length; auto result = r; // // handle wrapping // if(_readIdx + r >= _buf.length) { size_t x = _buf.length - _readIdx; dst[0..x] = _buf[_readIdx..$]; _readIdx = 0; _remaining -= x; r -= x; dst = dst[x..$]; } dst[0..r] = _buf[_readIdx..(_readIdx + r)]; _readIdx = (_readIdx + r) % _buf.length; _remaining -= r; _condition.notifyAll(); return result; } } /** * Implements InputStream.clear(). * * Clear any buffered content. */ ThreadPipe clear() { synchronized(_mutex) { if(_remaining != 0) { /* * this isn't technically necessary, but we do it because it * preserves the most recent data first */ _readIdx = (_readIdx + _remaining) % _buf.length; _remaining = 0; _condition.notifyAll(); } } return this; } /** * Implements OutputStream.write. * * Write to stream from a source array. The provided src content will be * written to the stream. * * Returns the number of bytes written from src, which may be less than * the quantity provided. Eof is returned when an end-of-flow condition * arises. */ override size_t write(const(void)[] src) { // // don't block for empty write // if(src.length == 0) return 0; synchronized(_mutex) { size_t w; while((w = _buf.length - _remaining) == 0 && !_closed) _condition.wait(); if(_closed) return Eof; if(w > src.length) w = src.length; auto writeIdx = (_readIdx + _remaining) % _buf.length; auto result = w; if(w + writeIdx >= _buf.length) { auto x = _buf.length - writeIdx; _buf[writeIdx..$] = src[0..x]; writeIdx = 0; w -= x; _remaining += x; src = src[x..$]; } _buf[writeIdx..(writeIdx + w)] = src[0..w]; _remaining += w; _condition.notifyAll(); return result; } } } debug(UnitTest) { import tango.core.Thread; unittest { auto source = new uint[1000]; foreach(i, ref x; source) x = cast(uint)i; ThreadPipe tp = new ThreadPipe(16); void threadA() { void[] sourceBuf = source; while(sourceBuf.length > 0) { sourceBuf = sourceBuf[tp.write(sourceBuf)..$]; } tp.stop(); } Thread a = new Thread(&threadA); a.start(); int readval; int last = -1; size_t nread; while((nread = tp.read((&readval)[0..1])) == readval.sizeof) { assert(readval == last + 1); last = readval; } assert(nread == tp.Eof); a.join(); } } |