12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903 |
|
/*******************************************************************************
copyright: Copyright (c) 2006 Juan Jose Comellas. All rights reserved
license: BSD style: $(LICENSE)
author: Juan Jose Comellas $(EMAIL juanjo@comellas.com.ar)
*******************************************************************************/
module tango.io.selector.SelectSelector;
public import tango.io.model.IConduit;
private import Time = tango.core.Time;
public import tango.io.selector.model.ISelector;
private import tango.io.selector.AbstractSelector;
private import tango.io.selector.SelectorException;
private import tango.sys.Common;
private import tango.stdc.errno;
debug (selector)
{
private import tango.io.Stdout;
private import tango.text.convert.Integer;
}
version (Windows)
{
// Thread.sleep() is used by select() when there are no handles to wait on.
private import tango.core.Thread;
private
{
// Opaque struct: it is only ever used through pointers. HandleSet buffers
// are cast to fd_set* before being handed to the native select() call.
struct fd_set
{
}
// Binding for the native select() call, declared with Windows linkage.
extern (Windows) int select(int nfds, fd_set* readfds, fd_set* writefds,
fd_set* errorfds, timeval* timeout);
}
}
version (Posix)
{
private import tango.core.BitArray;
}
/**
* Selector that uses the select() system call to receive I/O events for
* the registered conduits. To use this class you would normally do
* something like this:
*
* Examples:
* ---
* import tango.io.selector.SelectSelector;
*
* Socket socket;
* ISelector selector = new SelectSelector();
*
* selector.open(100, 10);
*
* // Register to read from socket
* selector.register(socket, Event.Read);
*
* int eventCount = selector.select(0.1); // 0.1 seconds
* if (eventCount > 0)
* {
* // We can now read from the socket
* socket.read();
* }
* else if (eventCount == 0)
* {
* // Timeout
* }
* else if (eventCount == -1)
* {
* // Another thread called the wakeup() method.
* }
* else
* {
* // Error: should never happen.
* }
*
* selector.close();
* ---
*/
public class SelectSelector: AbstractSelector
{
/**
* Alias for the select() method as we're not reimplementing it in
* this class.
*/
alias AbstractSelector.select select;
// Size given to open(); passed to HandleSet.setup() when a set is first
// allocated. Reset to 0 by close().
uint _size;
// Registered selection keys, indexed by the conduit's file handle.
private SelectionKey[ISelectable.Handle] _keys;
// Handles registered for Read/Hangup events.
private HandleSet _readSet;
// Handles registered for Write events.
private HandleSet _writeSet;
// Handles registered for Error events.
private HandleSet _exceptionSet;
// Working copies of the three sets above. select() refreshes them from the
// registration sets and passes them to the system call; SelectSelectionSet
// later inspects them to find out which handles have pending events.
private HandleSet _selectedReadSet;
private HandleSet _selectedWriteSet;
private HandleSet _selectedExceptionSet;
// Result of the last select(): the value returned by the system call, or 0
// when the Windows code path only slept.
int _eventCount;
version (Posix)
{
private ISelectable.Handle _maxfd = cast(ISelectable.Handle) -1;
/**
* Default number of SelectionKey's that will be handled by the
* SelectSelector.
*/
public const uint DefaultSize = 1024;
}
else
{
/**
* Default number of SelectionKey's that will be handled by the
* SelectSelector.
*/
public const uint DefaultSize = 63;
}
/**
 * Prepare the select()-based selector for use.
 *
 * Params:
 * size      = expected number of conduits that will be registered; must
 *             be greater than zero. The handle sets grow on demand when
 *             more conduits are added.
 * maxEvents = maximum number of conduit events returned per call to
 *             select(); accepted for interface compatibility only, this
 *             selector does not use it.
 */
override public void open(uint size = DefaultSize, uint maxEvents = DefaultSize)
in
{
    assert(size != 0);
}
body
{
    // Only the size is recorded here; the handle sets themselves are
    // allocated lazily by allocateSet() on first registration.
    this._size = size;
}
/**
 * Close the selector, dropping every registration and any cached result
 * of a previous select().
 *
 * Remarks:
 * It can be called multiple times without harmful side-effects.
 */
override public void close()
{
    _size = 0;
    _keys = null;

    _readSet = HandleSet.init;
    _writeSet = HandleSet.init;
    _exceptionSet = HandleSet.init;

    _selectedReadSet = HandleSet.init;
    _selectedWriteSet = HandleSet.init;
    _selectedExceptionSet = HandleSet.init;

    // Forget the outcome of the last select() so that selectedSet() cannot
    // build a selection set on top of the state that was just released.
    _eventCount = 0;

    version (Posix)
    {
        // No handle is registered any more. Keeping the old maximum would
        // make a later select() pass an nfds value that can exceed the
        // size of freshly allocated (smaller) handle sets.
        _maxfd = cast(ISelectable.Handle) -1;
    }
}
/**
 * Lazily allocate a registration set together with its "selected"
 * counterpart.
 *
 * Params:
 * set         = registration set; set up with the selector's size if it
 *               has not been initialized yet.
 * selectedSet = working set that is set up alongside `set`.
 *
 * Returns:
 * A pointer to `set`, so that callers can chain a call on the result.
 */
private HandleSet *allocateSet(ref HandleSet set, ref HandleSet selectedSet)
{
    // Nothing to do when the set is already in use.
    if (set.initialized)
    {
        return &set;
    }

    set.setup(_size);
    selectedSet.setup(_size);
    return &set;
}
/**
 * Associate a conduit to the selector and track specific I/O events.
 * If the conduit is already associated with the selector, its events and
 * attachment are updated.
 *
 * Params:
 * conduit    = conduit that will be associated to the selector;
 *              must be a valid conduit (i.e. not null and open).
 * events     = bit mask of Event values that represent the events
 *              that will be tracked for the conduit.
 * attachment = optional object with application-specific data that
 *              will be available when an event is triggered for the
 *              conduit.
 *
 * Remarks:
 * Event.Read and Event.Hangup are both tracked through the read set.
 * Registering an already registered conduit does not throw: the existing
 * registration is replaced by the new event mask and attachment.
 *
 * Examples:
 * ---
 * selector.register(conduit, Event.Read | Event.Write, object);
 * ---
 */
override public void register(ISelectable conduit, Event events, Object attachment = null)
in
{
    assert(conduit !is null && conduit.fileHandle());
}
body
{
    ISelectable.Handle handle = conduit.fileHandle();

    debug (selector)
        Stdout.format("--- SelectSelector.register(handle={0}, events=0x{1:x})\n",
                      cast(int) handle, cast(uint) events);

    SelectionKey *key = (handle in _keys);

    // Bring the three handle sets in line with the requested event mask.
    // For a new registration the "clear" side is a no-op because the handle
    // is not present in any set yet.
    updateInterest(_readSet, _selectedReadSet, handle,
                   (events & Event.Read) || (events & Event.Hangup));
    updateInterest(_writeSet, _selectedWriteSet, handle,
                   (events & Event.Write) != 0);
    updateInterest(_exceptionSet, _selectedExceptionSet, handle,
                   (events & Event.Error) != 0);

    version (Posix)
    {
        // select() needs the highest registered descriptor (plus one).
        if (handle > _maxfd)
            _maxfd = handle;
    }

    if (key !is null)
    {
        // Already registered: only the mask and the attachment change.
        key.events = events;
        key.attachment = attachment;
    }
    else
    {
        // Keep record of the conduits for which we're tracking events.
        _keys[handle] = SelectionKey(conduit, events, attachment);
    }
}

/**
 * Add the handle to a registration set, or remove it from the set,
 * depending on whether the matching event is wanted. The set (and its
 * "selected" counterpart) is only allocated when a handle has to be added.
 */
private void updateInterest(ref HandleSet set, ref HandleSet selectedSet,
                            ISelectable.Handle handle, bool wanted)
{
    if (wanted)
    {
        allocateSet(set, selectedSet).set(handle);
    }
    else if (set.initialized)
    {
        set.clear(handle);
    }
}
/**
 * Stop tracking a conduit.
 *
 * Params:
 * conduit = conduit that had been previously associated to the
 *           selector; it can be null.
 *
 * Remarks:
 * Passing a null conduit is allowed; the call is then a no-op and no
 * exception is thrown.
 *
 * Throws:
 * UnregisteredConduitException if the conduit had not been previously
 * registered to the selector.
 */
override public void unregister(ISelectable conduit)
{
    if (conduit is null)
    {
        return;
    }

    ISelectable.Handle handle = conduit.fileHandle();

    debug (selector)
        Stdout.format("--- SelectSelector.unregister(handle={0})\n",
                      cast(int) handle);

    SelectionKey* removed = (handle in _keys);
    if (removed is null)
    {
        debug (selector)
            Stdout.format("--- SelectSelector.unregister(handle={0}): conduit was not found\n",
                          cast(int) conduit.fileHandle());
        throw new UnregisteredConduitException(__FILE__, __LINE__);
    }

    // Only touch the sets the handle was actually added to at registration.
    Event tracked = removed.events;
    if ((tracked & Event.Read) || (tracked & Event.Hangup))
    {
        _readSet.clear(handle);
    }
    if (tracked & Event.Write)
    {
        _writeSet.clear(handle);
    }
    if (tracked & Event.Error)
    {
        _exceptionSet.clear(handle);
    }

    _keys.remove(handle);

    version (Posix)
    {
        // If the biggest handle entered so far was removed, walk downwards
        // until a handle that is still present in some set is found. The
        // value ends up at -1 when nothing is registered any more.
        if (handle == _maxfd)
        {
            --_maxfd;
            while (_maxfd >= 0 &&
                   !_readSet.isSet(_maxfd) &&
                   !_writeSet.isSet(_maxfd) &&
                   !_exceptionSet.isSet(_maxfd))
            {
                --_maxfd;
            }
        }
    }
}
/**
 * Wait for I/O events from the registered conduits for a specified
 * amount of time.
 *
 * Params:
 * timeout = TimeSpan with the maximum amount of time that the selector
 *           will wait for events; it is relative to the current system
 *           time. TimeSpan.max makes the system call wait without a
 *           timeout.
 *
 * Returns:
 * The value reported by the system select() call: the amount of events
 * that are pending, or 0 if nothing happened within the timeout. On
 * Windows, 0 is also returned after sleeping for the timeout when no
 * handle is registered. Support for wakeup() is not implemented here
 * (see the FIXME in the body).
 *
 * Throws:
 * InterruptedSystemCallException if the underlying system call was
 * interrupted by a signal and the 'restartInterruptedSystemCall'
 * property was set to false; SelectorException if there were no
 * resources available to wait for events from the conduits.
 */
override public int select(TimeSpan timeout)
{
    fd_set* readPtr;
    fd_set* writePtr;
    fd_set* errorPtr;
    timeval tv;

    version (Windows)
        bool handlesAvailable = false;

    debug (selector)
        Stdout.format("--- SelectSelector.select(timeout={0} msec)\n", timeout.millis);

    // Refresh each working set from its registration set. A set that was
    // never allocated is passed to the system call as a null pointer.
    if (_readSet.initialized)
    {
        debug (selector)
            _readSet.dump("_readSet");
        version (Windows)
        {
            if (_readSet.length > 0)
                handlesAvailable = true;
        }
        readPtr = cast(fd_set*) _selectedReadSet.copy(_readSet);
    }

    if (_writeSet.initialized)
    {
        debug (selector)
            _writeSet.dump("_writeSet");
        version (Windows)
        {
            if (_writeSet.length > 0)
                handlesAvailable = true;
        }
        writePtr = cast(fd_set*) _selectedWriteSet.copy(_writeSet);
    }

    if (_exceptionSet.initialized)
    {
        debug (selector)
            _exceptionSet.dump("_exceptionSet");
        version (Windows)
        {
            if (_exceptionSet.length > 0)
                handlesAvailable = true;
        }
        errorPtr = cast(fd_set*) _selectedExceptionSet.copy(_exceptionSet);
    }

    version (Posix)
    {
        for (;;)
        {
            // Rebuilt on every attempt, so a restarted call waits for the
            // full timeout again.
            toTimeval(&tv, timeout);

            // FIXME: add support for the wakeup() call.
            _eventCount = .select(_maxfd + 1, readPtr, writePtr, errorPtr,
                                  timeout is TimeSpan.max ? null : &tv);

            debug (selector)
                Stdout.format("--- .select() returned {0} (maxfd={1})\n",
                              _eventCount, cast(int) _maxfd);

            if (_eventCount >= 0)
            {
                break;
            }

            // Anything but an interruption that we are allowed to restart
            // is reported to the caller.
            if (!_restartInterruptedSystemCall || errno != EINTR)
            {
                // checkErrno() always throws an exception
                checkErrno(__FILE__, __LINE__);
            }

            debug (selector)
                Stdout.print("--- Restarting select() after being interrupted\n");
        }
    }
    else
    {
        // Windows returns an error when select() is called with all three
        // handle sets empty, so we emulate the POSIX behavior by calling
        // Thread.sleep().
        if (!handlesAvailable)
        {
            Thread.sleep(Time.seconds(timeout.interval()));
            _eventCount = 0;
        }
        else
        {
            toTimeval(&tv, timeout);

            // FIXME: Can a system call be interrupted on Windows?
            _eventCount = .select(uint.max, readPtr, writePtr, errorPtr,
                                  timeout is TimeSpan.max ? null : &tv);

            debug (selector)
                Stdout.format("--- .select() returned {0}\n", _eventCount);
        }
    }

    return _eventCount;
}
/**
 * Build the selection set for the most recent call to select().
 *
 * Remarks:
 * Null is returned when the last select() was unsuccessful or did not
 * report any events.
 */
override public ISelectionSet selectedSet()
{
    if (_eventCount <= 0)
    {
        return null;
    }

    return new SelectSelectionSet(_keys, cast(uint) _eventCount, _selectedReadSet,
                                  _selectedWriteSet, _selectedExceptionSet);
}
/**
 * Look up the selection key that was stored when a conduit was
 * registered to the selector.
 *
 * Remarks:
 * SelectionKey.init is returned when the conduit is null or is not
 * registered to the selector. No exception is thrown by this method.
 */
override public SelectionKey key(ISelectable conduit)
{
    if (conduit is null)
    {
        return SelectionKey.init;
    }

    auto found = conduit.fileHandle in _keys;
    return (found !is null) ? *found : SelectionKey.init;
}
/**
 * Number of selection keys currently held by the selector, i.e. the
 * number of registered conduits.
 */
override public size_t count()
{
    auto registered = _keys.length;
    return registered;
}
/**
 * Visit every currently registered selection key. Do not add or remove
 * items from the selector while iterating, although existing conduits
 * may be registered again.
 *
 * The delegate receives a copy of each stored key. Iteration stops as
 * soon as the delegate returns a non-zero value, which is then returned.
 */
int opApply(scope int delegate(ref SelectionKey) dg)
{
    foreach (current; _keys)
    {
        int rc = dg(current);
        if (rc != 0)
        {
            return rc;
        }
    }
    return 0;
}
}
/**
 * SelectionSet for the select()-based Selector. It pairs the selector's
 * registered keys with the three handle sets that were handed to the
 * system select() call, and reports the keys whose handle is present in
 * at least one of those sets.
 */
private class SelectSelectionSet: ISelectionSet
{
    SelectionKey[ISelectable.Handle] _keys;
    uint _eventCount;
    HandleSet _readSet;
    HandleSet _writeSet;
    HandleSet _exceptionSet;

    /**
     * Store the registered keys, the event count reported by select() and
     * the read/write/exception sets as they are after the call.
     */
    this(SelectionKey[ISelectable.Handle] keys, uint eventCount,
         HandleSet readSet, HandleSet writeSet, HandleSet exceptionSet)
    {
        this._keys = keys;
        this._eventCount = eventCount;
        this._readSet = readSet;
        this._writeSet = writeSet;
        this._exceptionSet = exceptionSet;
    }

    /** Event count that was passed to the constructor. */
    @property size_t length()
    {
        return _eventCount;
    }

    /**
     * Call the delegate for every key that has a pending event. The key
     * passed to the delegate is a copy whose `events` field holds the
     * events found for it (Read, Write and/or Error). Iteration stops when
     * the delegate returns a non-zero value, which is then returned.
     */
    int opApply(scope int delegate(ref SelectionKey) dg)
    {
        debug (selector)
            Stdout.format("--- SelectSelectionSet.opApply() ({0} elements)\n", _eventCount);

        foreach (SelectionKey current; _keys)
        {
            ISelectable.Handle handle = current.conduit.fileHandle();

            // Collect the events by looking the handle up in each set.
            Event events = _readSet.isSet(handle) ? Event.Read : Event.None;
            if (_writeSet.isSet(handle))
            {
                events |= Event.Write;
            }
            if (_exceptionSet.isSet(handle))
            {
                events |= Event.Error;
            }

            // Keys without a pending event are skipped.
            if (events == Event.None)
            {
                debug (selector)
                    Stdout.format("--- Handle {0} doesn't have pending events\n",
                                  cast(int) handle);
                continue;
            }

            current.events = events;

            debug (selector)
                Stdout.format("--- Calling foreach delegate with selection key ({0}, 0x{1:x})\n",
                              cast(int) handle, cast(uint) events);

            int rc = dg(current);
            if (rc != 0)
            {
                return rc;
            }
        }

        return 0;
    }
}
version (Windows)
{
/**
 * Helper struct used by the select()-based Selector to store handles.
 * On Windows the handles are kept in an array of uints and the first
 * element of the array stores the array "length" (i.e. number of handles
 * in the array). Everything is stored so that the native select() API
 * can use the HandleSet without additional conversions by just casting it
 * to a fd_set*.
 */
private struct HandleSet
{
    /** Default number of handles that will be held in the HandleSet. */
    const uint DefaultSize = 63;

    // _buffer[0] is the number of handles; _buffer[1 .. count + 1] holds
    // the handles themselves. Elements past that are unused capacity.
    uint[] _buffer;

    /**
     * Constructor. Sets the initial number of handles that will be held
     * in the HandleSet.
     */
    void setup(uint size = DefaultSize)
    {
        _buffer = new uint[1 + size];
        _buffer[0] = 0;
    }

    /**
     * Return true if this handle set has been initialized.
     */
    @property bool initialized()
    {
        return _buffer.length > 0;
    }

    /**
     * Return the number of handles present in the HandleSet; 0 when the
     * set has not been initialized.
     */
    @property uint length()
    {
        return _buffer.length > 0 ? _buffer[0] : 0;
    }

    /**
     * Add the handle to the set. Adding a handle that is already present
     * is a no-op. The set is allocated with its default size if it has not
     * been set up yet.
     */
    void set(ISelectable.Handle handle)
    in
    {
        assert(handle);
    }
    body
    {
        // The count lives in _buffer[0], so there must be a buffer before
        // anything can be added.
        if (_buffer.length == 0)
        {
            setup();
        }

        if (!isSet(handle))
        {
            // If we added too many sockets we increment the size of the buffer
            if (++_buffer[0] >= _buffer.length)
            {
                _buffer.length = _buffer[0] + 1;
            }
            _buffer[_buffer[0]] = cast(uint) handle;
        }
    }

    /**
     * Remove the handle from the set. Nothing happens when the handle is
     * not present or the set has not been initialized.
     */
    void clear(ISelectable.Handle handle)
    {
        if (_buffer.length == 0)
        {
            return;
        }

        for (uint i = 1; i <= _buffer[0]; ++i)
        {
            if (_buffer[i] == cast(uint) handle)
            {
                // We don't need to keep the handles in the order in which
                // they were inserted, so we optimize the removal by
                // copying the last element to the position of the removed
                // element.
                if (i != _buffer[0])
                {
                    _buffer[i] = _buffer[_buffer[0]];
                }
                _buffer[0]--;
                return;
            }
        }
    }

    /**
     * Copy the contents of the HandleSet into this instance.
     *
     * Only the count and the handles in use are copied; this instance is
     * grown when it cannot hold them. Copying from an uninitialized set
     * leaves this instance empty.
     */
    HandleSet copy(HandleSet handleSet)
    {
        if (handleSet._buffer.length == 0)
        {
            if (_buffer.length > 0)
            {
                _buffer[0] = 0;
            }
            return this;
        }

        // Count slot plus the handles in use. Slicing by this amount never
        // reads past the end of the source, whatever the relative
        // capacities of the two sets are.
        size_t used = handleSet._buffer[0] + 1;

        if (_buffer.length < used)
        {
            _buffer.length = used;
        }
        _buffer[0 .. used] = handleSet._buffer[0 .. used];
        return this;
    }

    /**
     * Check whether the handle has been set.
     */
    public bool isSet(ISelectable.Handle handle)
    {
        if (_buffer.length == 0)
            return false;

        foreach (stored; _buffer[1 .. _buffer[0] + 1])
        {
            if (stored == cast(uint) handle)
                return true;
        }
        return false;
    }

    /**
     * Cast the current object to a pointer to an fd_set, to be used with the
     * select() system call.
     */
    public fd_set* opCast()
    {
        return cast(fd_set*) _buffer.ptr;
    }

    debug (selector)
    {
        /**
         * Dump the contents of a HandleSet into stdout.
         */
        void dump(const(char)[] name = null)
        {
            if (_buffer !is null && _buffer.length > 0 && _buffer[0] > 0)
            {
                // Scratch space for itoa(); it has to be writable.
                char[] handleStr = new char[16];
                const(char)[] handleListStr;
                bool isFirst = true;

                if (name is null)
                {
                    name = "HandleSet";
                }

                // Handles occupy indices 1 .. count inclusive.
                for (uint i = 1; i <= _buffer[0]; ++i)
                {
                    if (!isFirst)
                    {
                        handleListStr ~= ", ";
                    }
                    else
                    {
                        isFirst = false;
                    }
                    handleListStr ~= itoa(handleStr, _buffer[i]);
                }

                Stdout.formatln("--- {0}[{1}]: {2}", name, _buffer[0], handleListStr);
            }
        }
    }
}
}
else version (Posix)
{
private import tango.core.BitManip;
/**
 * Helper struct used by the select()-based Selector to store handles.
 * On POSIX-compatible platforms the handles are kept in an array of bits.
 * Everything is stored so that the native select() API can use the
 * HandleSet without additional conversions by casting it to a fd_set*.
 */
private struct HandleSet
{
    /** Default number of handles that will be held in the HandleSet. */
    const uint DefaultSize = 1024;

    // One bit per file descriptor; bit N is set when descriptor N is in
    // the set.
    BitArray _buffer;

    /**
     * Constructor. Sets the initial number of handles that will be held
     * in the HandleSet. Sizes below 1024 are raised to 1024.
     */
    void setup(uint size = DefaultSize)
    {
        if (size < 1024)
            size = 1024;

        _buffer.length = size;
    }

    /**
     * Return true if the handleset has been initialized
     */
    @property bool initialized()
    {
        return _buffer.length > 0;
    }

    /**
     * Add a handle to the set, growing the bit array when the handle does
     * not fit.
     */
    public void set(ISelectable.Handle handle)
    in
    {
        // A negative descriptor would turn into a huge index once it is
        // cast to uint below.
        assert(cast(int) handle >= 0);
    }
    body
    {
        // If we added too many sockets we increment the size of the buffer
        uint fd = cast(uint) handle;

        if (fd >= _buffer.length)
            _buffer.length = fd + 1;

        _buffer[fd] = true;
    }

    /**
     * Remove a handle from the set. Handles beyond the current size are
     * ignored.
     */
    public void clear(ISelectable.Handle handle)
    {
        auto fd = cast(uint) handle;

        if (fd < _buffer.length)
            _buffer[fd] = false;
    }

    /**
     * Copy the contents of the HandleSet into this instance.
     */
    HandleSet copy(HandleSet handleSet)
    {
        //
        // adjust the length if necessary
        //
        if (handleSet._buffer.length != _buffer.length)
            _buffer.length = handleSet._buffer.length;

        _buffer[] = handleSet._buffer;
        return this;
    }

    /**
     * Check whether the handle has been set. Handles beyond the current
     * size are reported as not set.
     */
    bool isSet(ISelectable.Handle handle)
    {
        auto fd = cast(uint) handle;

        if (fd < _buffer.length)
            return _buffer[fd];

        return false;
    }

    /**
     * Cast the current object to a pointer to an fd_set, to be used with the
     * select() system call.
     */
    fd_set* opCast()
    {
        return cast(fd_set*) _buffer.ptr;
    }

    debug (selector)
    {
        /**
         * Dump the contents of a HandleSet into stdout.
         */
        void dump(const(char)[] name = null)
        {
            if (_buffer.length > 0)
            {
                // Scratch space for itoa(); it has to be writable.
                char[] handleStr = new char[16];
                const(char)[] handleListStr;
                bool isFirst = true;

                if (name is null)
                {
                    name = "HandleSet";
                }

                // One bit per handle, so the bit count is the upper bound.
                for (uint i = 0; i < _buffer.length; ++i)
                {
                    if (isSet(cast(ISelectable.Handle) i))
                    {
                        if (!isFirst)
                        {
                            handleListStr ~= ", ";
                        }
                        else
                        {
                            isFirst = false;
                        }
                        handleListStr ~= itoa(handleStr, i);
                    }
                }

                Stdout.formatln("--- {0}: {1}", name, handleListStr);
            }
        }
    }
}
}
|