123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665 |
|
/*******************************************************************************
copyright: Copyright (c) 2004 Kris Bell. All rights reserved
license: BSD style: $(LICENSE)
version: Mar 2004: Initial release
author: Kris
*******************************************************************************/
module tango.io.device.Conduit;
private import tango.core.Thread,
tango.core.Exception;
public import tango.io.model.IConduit;
/*******************************************************************************
Conduit abstract base-class, implementing interface IConduit.
Only the conduit-specific read(), write(), detach(), bufferSize()
and toString() need to be implemented for a concrete conduit
implementation. See File for an example.
Conduits provide virtualized access to external content, and
represent things like files or Internet connections. Conduits
expose a pair of streams, are modelled by tango.io.model.IConduit,
and are implemented via classes such as File & SocketConduit.
Additional kinds of conduit are easy to construct: either subclass
tango.io.device.Conduit, or implement tango.io.model.IConduit. A
conduit typically reads and writes via a Buffer in large chunks,
typically the entire buffer. Alternatively, one can invoke method
read(dst[]) and/or write(src[]) directly.
*******************************************************************************/
class Conduit : IConduit
{
        version (TangoRuntime)
        {
                protected Fiber.Scheduler scheduler;    // optional scheduler
        }

        // Timeout applied to IO calls, in milliseconds. uint.max is the
        // same value the former "-1" initializer produced.
        private uint duration = uint.max;

        /***********************************************************************

                Test for asynchronous capability. This will be eligible
                for scheduling where (a) it is created within a fiber and
                (b) there is a scheduler attached to the fiber at the time
                this is invoked.

                Note that fibers may schedule just one outstanding I/O
                request at a time.

        ***********************************************************************/

        this ()
        {
                version (TangoRuntime)
                {
                        // Only query the current fiber when the result is
                        // actually used; other builds have no scheduler field.
                        if (auto f = Fiber.getThis())
                            scheduler = f.event.scheduler;
                }
        }

        /***********************************************************************

                Clean up when collected: invokes detach(). See method
                detach() regarding repeated invocation.

        ***********************************************************************/

        ~this ()
        {
                detach();
        }

        /***********************************************************************

                Return the name of this conduit.

        ***********************************************************************/

        abstract override string toString ();

        /***********************************************************************

                Return a preferred size for buffering conduit I/O.

        ***********************************************************************/

        abstract const size_t bufferSize ();

        /***********************************************************************

                Read from conduit into a target array. The provided dst
                will be populated with content from the conduit.

                Returns the number of bytes read, which may be less than
                requested in dst. Eof is returned whenever an end-of-flow
                condition arises.

        ***********************************************************************/

        abstract size_t read (void[] dst);

        /***********************************************************************

                Write to conduit from a source array. The provided src
                content will be written to the conduit.

                Returns the number of bytes written from src, which may
                be less than the quantity provided. Eof is returned when
                an end-of-flow condition arises.

        ***********************************************************************/

        abstract size_t write (const(void)[] src);

        /***********************************************************************

                Disconnect this conduit. Note that this may be invoked
                both explicitly by the user (see close()), and implicitly
                by the GC (see the destructor).

                Be sure to manage multiple detachment requests correctly:
                set a flag, or sentinel value as necessary.

        ***********************************************************************/

        abstract void detach ();

        /***********************************************************************

                Set the active timeout period for IO calls (in milliseconds.)

        ***********************************************************************/

        @property final void timeout (uint millisec)
        {
                duration = millisec;
        }

        /***********************************************************************

                Get the active timeout period for IO calls (in milliseconds.)
                The initial value is uint.max.

        ***********************************************************************/

        @property final const uint timeout ()
        {
                return duration;
        }

        /***********************************************************************

                Is the conduit alive? Default behaviour returns true.

        ***********************************************************************/

        @property const bool isAlive ()
        {
                return true;
        }

        /***********************************************************************

                Return the host. This is part of the Stream interface.
                A conduit is its own host.

        ***********************************************************************/

        @property final IConduit conduit ()
        {
                return this;
        }

        /***********************************************************************

                Emit buffered output or reset buffered input. This base
                implementation does nothing other than return this.

        ***********************************************************************/

        IOStream flush ()
        {
                return this;
        }

        /***********************************************************************

                Close this conduit by invoking detach().

                Both input and output are detached, and are no longer usable.

        ***********************************************************************/

        void close ()
        {
                this.detach();
        }

        /***********************************************************************

                Throw an IOException, with a copy of the provided message.
                This method never returns normally.

        ***********************************************************************/

        final void error (const(char[]) msg)
        {
                throw new IOException (msg.idup);
        }

        /***********************************************************************

                Return the input stream (this conduit itself.)

        ***********************************************************************/

        @property final InputStream input ()
        {
                return this;
        }

        /***********************************************************************

                Return the output stream (this conduit itself.)

        ***********************************************************************/

        @property final OutputStream output ()
        {
                return this;
        }

        /***********************************************************************

                Emit fixed-length content from 'src' into this conduit,
                throwing an IOException upon Eof.

                'src' is only read, so it is accepted as const(void)[];
                mutable and immutable arrays both convert implicitly.

                Returns this conduit, to support call chaining.

        ***********************************************************************/

        final Conduit put (const(void)[] src)
        {
                put (src, this);
                return this;
        }

        /***********************************************************************

                Consume fixed-length content into 'dst' from this conduit,
                throwing an IOException upon Eof.

                Returns this conduit, to support call chaining.

        ***********************************************************************/

        final Conduit get (void[] dst)
        {
                get (dst, this);
                return this;
        }

        /***********************************************************************

                Rewind to beginning of file, via seek(0). Conduits which
                do not override seek() will throw an IOException here.

        ***********************************************************************/

        final Conduit rewind ()
        {
                seek (0);
                return this;
        }

        /***********************************************************************

                Transfer the content of another stream to this one, moving
                at most 'max' bytes. Returns this conduit as the dst
                OutputStream.

                Exceptions raised by the underlying read/write calls
                propagate to the caller. Note that the result of transfer()
                is not inspected here, so an Eof reported while writing to
                this conduit is not signalled: invoke transfer() directly
                where that condition must be detected.

        ***********************************************************************/

        OutputStream copy (InputStream src, size_t max = -1)
        {
                transfer (src, this, max);
                return this;
        }

        /***********************************************************************

                Seek on this stream. This base implementation does not
                support seeking and always throws an IOException; conduits
                which can seek are expected to override it.

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                error (this.toString() ~ " does not support seek requests");
                // not reached: error() always throws
                return 0;
        }

        /***********************************************************************

                Load text from a stream, and return it all in an array
                of T (char by default.) At most 'max' bytes are loaded.

                The loaded bytes are reinterpreted as T[] via a cast; no
                transcoding is performed.

                Returns an array representing the content, and throws
                IOException on error.

        ***********************************************************************/

        T[] text(T=char) (size_t max = -1)
        {
                // return type follows T so that non-char instantiations compile
                return cast(T[]) load (max);
        }

        /***********************************************************************

                Load the bits from this conduit, and return them all in an
                array. At most 'max' bytes are loaded; by default content
                is consumed until Eof.

                Returns an array representing the content.

        ***********************************************************************/

        void[] load (size_t max = -1)
        {
                return load (this, max);
        }

        /***********************************************************************

                Load the bits from the 'src' stream, and return them all
                in an array. At most 'max' bytes are loaded; by default
                content is consumed until src reports Eof.

                When 'max' is specified the destination is sized to 'max'
                bytes up front. Otherwise it grows in increments of the
                bufferSize of the conduit hosting 'src'.

                Returns a slice holding the bytes actually read.

        ***********************************************************************/

        static void[] load (InputStream src, size_t max=-1)
        {
                void[]  dst;
                size_t  i,
                        len,
                        chunk;

                // growth increment: the whole request when bounded,
                // else the preferred buffer size of the hosting conduit
                if (max != -1)
                    chunk = max;
                else
                   chunk = src.conduit.bufferSize;

                while (len < max)
                      {
                      // destination exhausted? Extend it by one chunk
                      if (dst.length - len is 0)
                          dst.length = len + chunk;

                      if ((i = src.read (dst[len .. $])) is Eof)
                           break;
                      len += i;
                      }

                // trim any unused tail of the final chunk
                return dst [0 .. len];
        }

        /***********************************************************************

                Emit fixed-length content from 'src' into 'output',
                throwing an IOException upon Eof.

                Writing is repeated until all of 'src' has been accepted,
                since each write() may consume only part of it. 'src' is
                only read, hence const(void)[].

        ***********************************************************************/

        static void put (const(void)[] src, OutputStream output)
        {
                while (src.length)
                      {
                      auto i = output.write (src);
                      if (i is Eof)
                          output.conduit.error ("Conduit.put :: eof while writing");
                      // drop what was written, retry with the remainder
                      src = src [i..$];
                      }
        }

        /***********************************************************************

                Consume fixed-length content into 'dst' from 'input',
                throwing an IOException upon Eof.

                Reading is repeated until 'dst' has been filled, since
                each read() may provide only part of it.

        ***********************************************************************/

        static void get (void[] dst, InputStream input)
        {
                while (dst.length)
                      {
                      auto i = input.read (dst);
                      if (i is Eof)
                          input.conduit.error ("Conduit.get :: eof while reading");
                      // advance past what was read, continue with the rest
                      dst = dst [i..$];
                      }
        }

        /***********************************************************************

                Low-level data transfer, where max represents the maximum
                number of bytes to transfer. Content is moved through a
                fixed 8192 byte intermediate buffer.

                Returns Eof where a write to 'dst' reports Eof, otherwise
                the number of bytes read from 'src'. An Eof from 'src'
                simply terminates the transfer.

        ***********************************************************************/

        static size_t transfer (InputStream src, OutputStream dst, size_t max=-1)
        {
                byte[8192] tmp;
                size_t     done;

                while (max)
                      {
                      // never request more than the buffer can hold
                      auto len = max;
                      if (len > tmp.length)
                          len = tmp.length;

                      if ((len = src.read(tmp[0 .. len])) is Eof)
                           max = 0;             // source exhausted: stop
                      else
                         {
                         max -= len;
                         done += len;

                         // write() may accept only part of the content,
                         // so repeat until this chunk is fully emitted
                         auto p = tmp.ptr;
                         for (size_t j=0; len > 0; len -= j, p += j)
                              if ((j = dst.write (p[0 .. len])) is Eof)
                                   return Eof;
                         }
                      }

                return done;
        }
}
/*******************************************************************************
Base class for input stream filtering. The provided source stream
should generally never be null, though some filters have a need to
set this lazily.
*******************************************************************************/
class InputFilter : InputStream
{
        // the upstream stream to which every request is forwarded
        protected InputStream source;

        /***********************************************************************

                Construct a filter on top of the given stream. A null
                source is tolerated here (some filters assign it later),
                but every other method dereferences it.

        ***********************************************************************/

        this (InputStream source)
        {
                this.source = source;
        }

        /***********************************************************************

                The conduit hosting the upstream source.

        ***********************************************************************/

        @property IConduit conduit ()
        {
                auto host = this.source.conduit;
                return host;
        }

        /***********************************************************************

                Populate 'dst' with content taken from the upstream
                source.

                Yields whatever the source's read() yields: the number
                of bytes placed in dst (possibly fewer than dst.length),
                or Eof upon an end-of-flow condition.

        ***********************************************************************/

        size_t read (void[] dst)
        {
                auto count = this.source.read (dst);
                return count;
        }

        /***********************************************************************

                Gather up to 'max' bytes from this filter into a newly
                allocated array, by way of Conduit.load(). Content is
                pulled through this filter's own read().

        ***********************************************************************/

        void[] load (size_t max = -1)
        {
                void[] content = Conduit.load (this, max);
                return content;
        }

        /***********************************************************************

                Forward the flush request upstream, then return this
                filter.

        ***********************************************************************/

        IOStream flush ()
        {
                this.source.flush();
                return this;
        }

        /***********************************************************************

                Forward a seek request to the upstream source, returning
                its result. Sources unable to seek are expected to throw
                an IOException.

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                auto position = this.source.seek (offset, anchor);
                return position;
        }

        /***********************************************************************

                The stream immediately upstream of this filter.

        ***********************************************************************/

        @property InputStream input ()
        {
                return this.source;
        }

        /***********************************************************************

                Close the upstream source.

        ***********************************************************************/

        void close ()
        {
                this.source.close();
        }
}
/*******************************************************************************
Base class for output stream filtering. The provided sink stream
should generally never be null, though some filters have a need to
set this lazily.
*******************************************************************************/
class OutputFilter : OutputStream
{
        // the downstream stream to which every request is forwarded
        protected OutputStream sink;

        /***********************************************************************

                Construct a filter on top of the given stream. Every
                method other than this constructor dereferences the
                sink.

        ***********************************************************************/

        this (OutputStream sink)
        {
                this.sink = sink;
        }

        /***********************************************************************

                The conduit hosting the downstream sink.

        ***********************************************************************/

        @property IConduit conduit ()
        {
                auto host = this.sink.conduit;
                return host;
        }

        /***********************************************************************

                Hand the content of 'src' to the downstream sink.

                Yields whatever the sink's write() yields: the number of
                bytes consumed from src (possibly fewer than src.length),
                or Eof upon an end-of-flow condition.

        ***********************************************************************/

        size_t write (const(void)[] src)
        {
                auto written = this.sink.write (src);
                return written;
        }

        /***********************************************************************

                Move up to 'max' bytes from 'src' into this filter using
                Conduit.transfer(), and return this filter. Content is
                pushed through this filter's own write().

                The value returned by transfer() is not examined here.

        ***********************************************************************/

        OutputStream copy (InputStream src, size_t max = -1)
        {
                OutputStream self = this;
                Conduit.transfer (src, self, max);
                return self;
        }

        /***********************************************************************

                Forward the flush request downstream, then return this
                filter.

        ***********************************************************************/

        IOStream flush ()
        {
                this.sink.flush();
                return this;
        }

        /***********************************************************************

                Forward a seek request to the downstream sink, returning
                its result. Sinks unable to seek are expected to throw
                an IOException.

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                auto position = this.sink.seek (offset, anchor);
                return position;
        }

        /***********************************************************************

                The stream immediately downstream of this filter.

        ***********************************************************************/

        @property OutputStream output ()
        {
                return this.sink;
        }

        /***********************************************************************

                Close the downstream sink.

        ***********************************************************************/

        void close ()
        {
                this.sink.close();
        }
}
|