/*******************************************************************************

        copyright:      Copyright (c) 2004 Kris Bell. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Mar 2004: Initial release
        version:        Jan 2005: RedShodan patch for timeout query
        version:        Dec 2006: Outback release
        version:        Apr 2009: revised for asynchronous IO

        author:         Kris

*******************************************************************************/

module tango.net.device.Socket;

private import tango.sys.Common;

private import tango.io.device.Conduit;

package import tango.net.device.Berkeley;

/*******************************************************************************

*******************************************************************************/

version (Windows)
{
         private import tango.sys.win32.WsaSock;
}

/*******************************************************************************

        A wrapper around the Berkeley API to implement the IConduit 
        abstraction and add stream-specific functionality.

*******************************************************************************/

class Socket : Conduit, ISelectable
{
        public alias native socket;             // backward compatibility

        private SocketSet pending;              // synchronous timeouts   
        private Berkeley  berkeley;             // wrap a berkeley socket


        /// Deprecated shim: scales the given value by 1000, truncates it to
        /// a uint and assigns the result to timeout. See super.timeout(int)
        deprecated void setTimeout (double t) 
        {
                auto millis = cast(uint) (t * 1000);
                timeout = millis;
        }

        /// Deprecated: unconditionally returns false, whatever happened on
        /// the socket
        deprecated bool hadTimeout ()
        {
                return false;
        }

        /***********************************************************************
        
                Create a streaming Internet socket. Delegates to the
                three-argument constructor with AddressFamily.INET,
                SocketType.STREAM and ProtocolType.TCP

        ***********************************************************************/

        this ()
        {
                this (AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
        }

        /***********************************************************************
        
                Create a streaming TCP socket whose address family is
                taken from the provided address. 

                Only addr.addressFamily is consulted here: the socket is
                neither bound nor connected to addr by this constructor

        ***********************************************************************/

        this (Address addr) 
        { 
                this (addr.addressFamily, SocketType.STREAM, ProtocolType.TCP); 
        }
                                
        /***********************************************************************
        
                Create a socket with the provided family, type and
                protocol by opening the wrapped berkeley socket.

                On Windows builds with TangoRuntime, and where a scheduler
                is present, the new handle is also passed to
                scheduler.open() along with the device name

        ***********************************************************************/

        this (AddressFamily family, SocketType type, ProtocolType protocol)
        {
                berkeley.open (family, type, protocol);
                // NOTE(review): presumably registers the handle for
                // asynchronous IO -- confirm against the scheduler
                version (Windows) version(TangoRuntime)
                         if (scheduler)
                             scheduler.open (fileHandle, toString);
        }

        /***********************************************************************

                Return the name of this device, which is the fixed text
                "<socket>". The name is used as a prefix by error()

        ***********************************************************************/

        override string toString()
        {
                return "<socket>";
        }

        /***********************************************************************

                Models a handle-oriented device. 

                Returns the descriptor of the wrapped berkeley socket,
                cast to a Handle

                TODO: figure out how to avoid exposing this in the general
                case

        ***********************************************************************/

        @property Handle fileHandle ()
        {
                return cast(Handle) berkeley.sock;
        }

        /***********************************************************************

                Return the socket wrapper, as a pointer to the Berkeley 
                instance held within this object. Also reachable via the
                'socket' alias, retained for backward compatibility
                
        ***********************************************************************/

        @property Berkeley* native ()
        {
                return &berkeley;
        }

        /***********************************************************************

                Return a preferred size for buffering conduit I/O. This
                is a fixed 8192 bytes (8KB)

        ***********************************************************************/

        @property override const size_t bufferSize ()
        {
                return 1024 * 8;
        }

        /***********************************************************************

                Connect to the provided endpoint

                Params:
                address = host to connect to, handed to IPv4Address as is
                port    = port number, which must fit within a ushort 
                          (0 through 65535 inclusive)

                Returns this socket, to support call chaining
        
        ***********************************************************************/

        Socket connect (const(char)[] address, uint port)
        {
                // ushort.max (65535) is itself a legal port, so the bound
                // has to be inclusive
                assert (port <= ushort.max, "port out of range");
                scope addr = new IPv4Address (address, cast(ushort) port);
                return connect (addr);
        }

        /***********************************************************************

                Connect to the provided endpoint. 

                With TangoRuntime and an active scheduler the connection
                is made through asyncConnect(); in every other case the
                wrapped berkeley socket connects directly.

                Returns this socket, to support call chaining
        
        ***********************************************************************/

        Socket connect (Address addr)
        {
                version (TangoRuntime)
                {
                    if (scheduler)
                        asyncConnect (addr);
                    else
                        native.connect (addr);
                }
                else
                    native.connect (addr);

                return this;
        }

        /***********************************************************************

                Bind this socket. This is typically used to configure a
                listening socket (such as a server or multicast socket).
                The address given should describe a local adapter, or
                specify the port alone (ADDR_ANY) to have the OS assign
                a local adapter address.

                The request is forwarded to the wrapped berkeley socket.

                Returns this socket, to support call chaining
        
        ***********************************************************************/

        Socket bind (Address address)
        {
                berkeley.bind (address);
                return this;
        }

        /***********************************************************************

                Inform other end of a connected socket that we're no longer
                available. In general, this should be invoked before close()
        
                The shutdown function shuts down the connection of the socket: 

                    -   stops receiving data for this socket. If further data 
                        arrives, it is rejected.

                    -   stops trying to transmit data from this socket. Also
                        discards any data waiting to be sent. Stop looking for 
                        acknowledgement of data already sent; don't retransmit 
                        if any data is lost.

                Both directions are always shut down together, since
                SocketShutdown.BOTH is passed to the wrapped socket.

                Returns this socket, to support call chaining

        ***********************************************************************/

        Socket shutdown ()
        {
                berkeley.shutdown (SocketShutdown.BOTH);
                return this;
        }

        /***********************************************************************

                Release this Socket by detaching the wrapped berkeley 
                socket

                Note that one should always disconnect a Socket under 
                normal conditions, and generally invoke shutdown on all 
                connected sockets beforehand

        ***********************************************************************/

        override void detach ()
        {
                berkeley.detach();
        }
        
       /***********************************************************************

                Read content from the socket. Note that the operation 
                may timeout if a timeout value other than -1 has been
                configured.

                With TangoRuntime and an active scheduler the request is
                handed to asyncRead() instead.

                Returns the number of bytes read from the socket, or
                IConduit.Eof where there's no more content available.
                Eof is also returned where wait() reports failure, and
                where the native receive yields zero or a negative value

        ***********************************************************************/

        override size_t read (void[] dst)
        {
                version (TangoRuntime)
                    if (scheduler)
                        return asyncRead (dst);

                // nothing became readable
                if (! wait (true))
                      return Eof;

                // test the native result in its own type: once stored in
                // a size_t, "<= 0" can only ever mean "== 0" and a 
                // negative error code is no longer recognizable as such
                auto count = native.receive (dst);
                if (count <= 0)
                    return Eof;
                return count;
        }
        
        /***********************************************************************

                Write content to the socket. Note that the operation 
                may timeout if a timeout value other than -1 has been
                configured.

                With TangoRuntime and an active scheduler the request is
                handed to asyncWrite() instead.

                Returns the number of bytes sent, or IConduit.Eof where 
                wait() reports failure or the native send yields a 
                negative value

        ***********************************************************************/

        override size_t write (const(void)[] src)
        {
                version (TangoRuntime)
                    if (scheduler)
                        return asyncWrite (src);

                // socket did not become writable
                if (! wait (false))
                      return Eof;

                // test the native result in its own type: "< 0" applied
                // to a size_t is never true, so the check would be dead
                auto count = native.send (src);
                if (count < 0)
                    return Eof;
                return count;
        }

        /***********************************************************************

                Transfer the content of another conduit to this one. Returns
                the dst OutputStream (this socket), or throws IOException 
                on failure.

                Does optimized transfers: with TangoRuntime, an active
                scheduler and a source implementing ISelectable, the
                source handle is passed to asyncCopy(). Note that max is 
                not consulted on that path. Everything else goes through 
                super.copy()

        ***********************************************************************/

        override OutputStream copy (InputStream src, size_t max = -1)
        {
                version (TangoRuntime)
                {
                    // only a selectable source has a handle to hand over
                    if (auto selectable = cast(ISelectable) src)
                        if (scheduler)
                           {
                           asyncCopy (selectable.fileHandle);
                           return this;
                           }
                }

                super.copy (src, max);
                return this;
        }

        /***********************************************************************
 
                Manage socket IO under a timeout

                Params:
                reading = true to wait for the socket to become readable,
                          false to wait for it to become writable

                Returns true where IO may proceed, which includes the case
                of timeout checks being disabled (timeout is -1). Returns
                false where select reports a negative result. A select 
                result of zero is reported through super.error() as a
                request timeout

        ***********************************************************************/

        package final bool wait (bool reading)
        {
                // did user enable timeout checks?
                if (timeout != -1)
                   {
                   SocketSet read, write;

                   // yes, ensure we have a SocketSet
                   if (pending is null)
                       pending = new SocketSet (1);
                   pending.reset().add (native.sock);

                   // wait until IO is available, or a timeout occurs
                   if (reading)
                       read = pending;
                   else
                      write = pending;

                   // widen before scaling: a 32bit product wraps around 
                   // once timeout exceeds roughly 4.29 million, which
                   // would silently shorten long timeouts.
                   // NOTE(review): assumes timeout is in milliseconds and 
                   // select accepts a long microsecond count -- confirm
                   auto period = cast(long) timeout * 1000;
                   int i = pending.select (read, write, null, period);
                   if (i <= 0)
                      {
                      if (i is 0)
                          super.error ("Socket :: request timeout");
                      return false;
                      }
                   }       
                return true;
        }

        /***********************************************************************

                Throw an IOException noting the last error. The message
                is the device name, followed by " :: " and then the text 
                of SysError.lastMsg
        
        ***********************************************************************/

        final void error ()
        {
                auto msg = this.toString() ~ " :: " ~ SysError.lastMsg;
                super.error (msg);
        }

        /***********************************************************************
 
        ***********************************************************************/

        version (Win32)
        {
                private OVERLAPPED overlapped;
        
                /***************************************************************
        
                        Connect to the provided endpoint using an
                        overlapped ConnectEx request.

                        The socket is first bound to a default-initialized
                        local sockaddr_in, the connect is then issued, 
                        its completion awaited (TangoRuntime only), and 
                        finally SO_UPDATE_CONNECT_CONTEXT is applied.

                        The results of bind and ConnectEx are not examined
                        here; failures surface via wait() and patch()
                
                ***************************************************************/
        
                private void asyncConnect (Address addr)
                {
                        IPv4Address.sockaddr_in local;
        
                        // NOTE(review): presumably ConnectEx demands a 
                        // socket that is already bound -- confirm
                        auto handle = berkeley.sock;
                        .bind (handle, cast(Address.sockaddr*)&local, local.sizeof);
        
                        ConnectEx (handle, addr.name, addr.nameLen, null, 0, null, &overlapped);
                        version(TangoRuntime)
                           wait (scheduler.Type.Connect);
                        patch (handle, SO_UPDATE_CONNECT_CONTEXT);
                }
        
                /***************************************************************
        
                        Send the content behind the given handle over 
                        this socket with an overlapped TransmitFile 
                        request.

                        With TangoRuntime the transfer is awaited, and an
                        exception prefixed "Socket.copy :: " is raised 
                        where wait() yields Eof
        
                ***************************************************************/
        
                private void asyncCopy (Handle handle)
                {
                        TransmitFile (berkeley.sock, cast(HANDLE) handle, 
                                      0, 0, &overlapped, null, 0);
                        version(TangoRuntime)
                        if (wait (scheduler.Type.Transfer) is Eof)
                            berkeley.exception ("Socket.copy :: ");
                }

                /***************************************************************

                        Read a chunk of bytes from the socket into the 
                        provided array, via an overlapped WSARecv request.
                        Returns the number of bytes read, or Eof where 
                        there is no further data.

                        A completed read of zero bytes is reported as Eof,
                        unless dst itself is empty.

                        Operates asynchronously where the hosting thread is
                        configured in that manner.

                ***************************************************************/

                private size_t asyncRead (void[] dst)
                {
                        DWORD flags;
                        DWORD bytes;
                        WSABUF buf = {dst.length, dst.ptr};

                        WSARecv (cast(HANDLE) berkeley.sock, &buf, 1, &bytes, &flags, &overlapped, null);
                        // wait() reads the last socket error, so it has to
                        // directly follow the WSARecv call
                        version(TangoRuntime)
                        if ((bytes = wait (scheduler.Type.Read, bytes)) is Eof)
                             return Eof;

                        // read of zero means Eof
                        if (bytes is 0 && dst.length > 0)
                            return Eof;
                        return bytes;
                }

                /***************************************************************

                        Write a chunk of bytes to the socket from the 
                        provided array, via an overlapped WSASend request.
                        Returns the number of bytes written, or Eof if 
                        the output is no longer available.

                        Operates asynchronously where the hosting thread is
                        configured in that manner.

                ***************************************************************/

                private size_t asyncWrite (const(void)[] src)
                {
                        DWORD bytes;
                        // const is cast away since WSABUF holds a mutable 
                        // pointer
                        WSABUF buf = {src.length, cast(void*)src.ptr};

                        WSASend (cast(HANDLE) berkeley.sock, &buf, 1, &bytes, 0, &overlapped, null);
                        // wait() reads the last socket error, so it has to
                        // directly follow the WSASend call
                        version(TangoRuntime)
                        if ((bytes = wait (scheduler.Type.Write, bytes)) is Eof)
                             return Eof;
                        return bytes;
                }

                /***************************************************************

                        Wait for the outstanding overlapped request on 
                        this socket to finish, and return the number of
                        bytes it transferred.

                        Params:
                        type  = the kind of operation being awaited, which 
                                is passed through to scheduler.await()
                        bytes = count reported by the initiating call. It
                                is returned as is where no scheduler is
                                present and the last error is ERROR_SUCCESS

                        Returns Eof where the last socket error is either
                        ERROR_HANDLE_EOF or ERROR_BROKEN_PIPE. All other
                        unexpected codes are raised through error()

                        NOTE(review): relies on WSAGetLastError still 
                        reflecting the request issued by the caller, so
                        no other socket call should sit between the two 
                        -- confirm

                ***************************************************************/

                version(TangoRuntime)
                {
                   private size_t wait (scheduler.Type type, uint bytes=0)
                   {
                           while (true)
                                 {
                                 auto code = WSAGetLastError;
                                 if (code is ERROR_HANDLE_EOF ||
                                     code is ERROR_BROKEN_PIPE)
                                     return Eof;

                                 if (scheduler)
                                    {
                                    if (code is ERROR_SUCCESS || 
                                        code is ERROR_IO_PENDING || 
                                        code is ERROR_IO_INCOMPLETE)
                                       {
                                       DWORD flags;

                                       // an incomplete request is reported
                                       // as a timeout
                                       if (code is ERROR_IO_INCOMPLETE)
                                           super.error ("timeout"); 

                                       auto handle = fileHandle;
                                       scheduler.await (handle, type, timeout);
                                       // where the result is not yet there 
                                       // we go around again and inspect 
                                       // the last error afresh
                                       if (WSAGetOverlappedResult (handle, &overlapped, &bytes, false, &flags))
                                           return bytes;
                                       }
                                    else
                                       error;
                                    }
                                 else
                                    if (code is ERROR_SUCCESS)
                                        return bytes;
                                    else
                                       error;
                                 }
                           // should never get here
                           assert (false);
                   }
                }
        
                /***************************************************************

                        Apply a socket-level option to dst, raising an
                        exception prefixed "patch :: " where setsockopt 
                        reports failure.

                        Params:
                        dst = the socket to configure
                        how = the option to set, such as 
                              SO_UPDATE_CONNECT_CONTEXT
                        src = optional socket passed as the option value.
                              Where null, a zero length is supplied
        
                ***************************************************************/
        
                private static void patch (socket_t dst, uint how, socket_t* src=null)
                {
                        // the option value is the socket_t being pointed 
                        // at, so measure that type rather than the pointer
                        auto len = src ? socket_t.sizeof : 0;
                        if (setsockopt (dst, SocketOptionLevel.SOCKET, how, src, len))
                            berkeley.exception ("patch :: ");
                }
        }


        /***********************************************************************
 
        ***********************************************************************/

        version (Posix)
        {
                /***************************************************************
        
                        Connect to the provided endpoint

                        Not implemented for Posix: this is a stub which 
                        fails unconditionally via assert (false)
                
                ***************************************************************/
        
                private void asyncConnect (Address addr)
                {
                        assert (false);
                }
        
                /***************************************************************

                        Not implemented for Posix: this is a stub which 
                        fails unconditionally via assert (false)

                        NOTE(review): unlike the Win32 variant, this one
                        is not private and declares a Socket return type
        
                ***************************************************************/
        
                Socket asyncCopy (Handle file)
                {
                        assert (false);
                }

                /***************************************************************

                        Read a chunk of bytes from the socket into the 
                        provided array. Returns the number of bytes read, 
                        or Eof where there is no further data.

                        Not implemented for Posix: this is a stub which 
                        fails unconditionally via assert (false)

                ***************************************************************/

                private size_t asyncRead (void[] dst)
                {
                        assert (false);
                }

                /***************************************************************

                        Write a chunk of bytes to the socket from the 
                        provided array. Returns the number of bytes 
                        written, or Eof if the output is no longer 
                        available.

                        Not implemented for Posix: this is a stub which 
                        fails unconditionally via assert (false)

                ***************************************************************/

                private size_t asyncWrite (const(void)[] src)
                {
                        assert (false);
                }
        }
}



/*******************************************************************************


*******************************************************************************/

class ServerSocket : Socket
{      
        /***********************************************************************

                Create a listening socket upon the given port, without
                naming a specific local address

                Params:
                port    = port to listen upon, which must fit within a 
                          ushort (0 through 65535 inclusive)
                backlog = passed on to listen()
                reuse   = whether to enable address reuse before binding

        ***********************************************************************/

        this (uint port, int backlog=32, bool reuse=false)
        {
                // the cast below would otherwise quietly fold an oversize
                // value onto some unrelated port
                assert (port <= ushort.max, "port out of range");
                scope addr = new IPv4Address (cast(ushort) port);
                this (addr, backlog, reuse);
        }

        /***********************************************************************

                Create a listening socket upon the given address

                The socket is constructed for the address family of addr,
                after which address reuse is configured, the socket is
                bound to addr, and listen is invoked with the backlog

                Params:
                addr    = the local address to bind to
                backlog = passed on to listen()
                reuse   = passed on to addressReuse() before binding

        ***********************************************************************/

        this (Address addr, int backlog=32, bool reuse=false)
        {
                super (addr);
                berkeley.addressReuse(reuse).bind(addr).listen(backlog);
        }

        /***********************************************************************

                Return the name of this device, which is the fixed text
                "<accept>"

        ***********************************************************************/

        override string toString()
        {
                return "<accept>";
        }

        /***********************************************************************

                Accept an incoming connection

                Params:
                recipient = socket to receive the connection. A new 
                            Socket is created where this is null

                With TangoRuntime and an active scheduler the request is
                made through asyncAccept(); otherwise the wrapped berkeley
                socket accepts directly. In both cases the timeout of this 
                server socket is copied onto the recipient.

                Returns the socket holding the accepted connection

        ***********************************************************************/

        Socket accept (Socket recipient = null)
        {
                auto target = (recipient is null) ? new Socket : recipient;

                version (TangoRuntime)
                {
                    if (scheduler)
                       {
                       asyncAccept (target);
                       target.timeout = timeout;
                       return target;
                       }
                }

                berkeley.accept (target.berkeley);
                target.timeout = timeout;
                return target;
        }

        /***********************************************************************

        ***********************************************************************/

        version (Windows)
        {
                /***************************************************************

                        Accept a connection into the recipient by way of
                        an overlapped AcceptEx request. With TangoRuntime
                        the completion is awaited, and the recipient is
                        then given SO_UPDATE_ACCEPT_CONTEXT, with the 
                        listening socket as the option value.

                        NOTE(review): this sits under version (Windows), 
                        yet the overlapped, wait and patch members used 
                        here are declared by Socket under version (Win32)
                        -- confirm the two are meant to differ

                ***************************************************************/

                private void asyncAccept (Socket recipient)
                {
                        // address scratch space for AcceptEx: two regions
                        // of 64 bytes each, and no receive data (the 0)
                        byte[128]      tmp;
                        DWORD          bytes;
                        DWORD          flags;

                        auto target = recipient.berkeley.sock;
                        AcceptEx (berkeley.sock, target, tmp.ptr, 0, 64, 64, &bytes, &overlapped);
                        version(TangoRuntime)
                           wait (scheduler.Type.Accept);
                        patch (target, SO_UPDATE_ACCEPT_CONTEXT, &berkeley.sock);
                }
        }

        /***********************************************************************

        ***********************************************************************/

        version (Posix)
        {
                /***************************************************************

                        Not implemented for Posix: this is a stub which 
                        fails unconditionally via assert (false)

                ***************************************************************/

                private void asyncAccept (Socket recipient)
                {
                        assert (false);
                }
        }
}