
        copyright:      Copyright (c) 2005 Kris Bell. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Mar 2005: Initial release

        author:         Kris


module tango.net.util.MemCache;

private import  tango.io.Console;

private import  Thr = tango.core.Thread, /* Because the druntime folks in their infinite wisdom decided to publically import core.time in thread */

private import  tango.io.stream.Lines,

private import  tango.net.device.Socket,

private import  Integer = tango.text.convert.Integer;



class MemCache : Thr.Thread
        private shared(Connection)[] hosts;
        private bool            active;
        private uint            watchdog;



        this (const(char[])[] hosts, uint watchdog = 3)
                super (&run);
                setHosts (hosts);

                // save configuration
                this.watchdog = watchdog;

                // start the watchdog
                active = true;



        final void close ()
                if (hosts)
                   foreach (shared Connection server; hosts)
                   hosts = null;


                Store the key and value


        final bool set (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0)
                return select(key).put("set", key, value, flags, timeout);


                Store the value if key does not already exist


        final bool add (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0)
                return select(key).put("add", key, value, flags, timeout);


                Store the value only if key exists


        final bool replace (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0)
                return select(key).put("replace", key, value, flags, timeout);


                Remove the specified key and make key "invalid" for the
                duration of timeout, causing add(), get() and remove() on
                the same key to fail within that period


        final bool remove (const(void)[][] key, int timeout=0)
                return select(key).remove(key, timeout);


                VALUE <key> <flags> <bytes>\r\n
                <data block>\r\n


        final bool get (const(void)[][] key, Buffer buffer)
                return select(key).get(key, buffer);



        final bool incr (const(void)[][] key, uint value)
                uint result;
                return incr (key, value, result);



        final bool decr (const(void)[][] key, uint value)
                uint result;
                return decr (key, value, result);



        final bool incr (const(void)[][] key, uint value, ref uint result)
                return select(key).bump ("incr", key, value, result);



        final bool decr (const(void)[][] key, uint value, ref uint result)
                return select(key).bump ("decr", key, value, result);



        final void status (void delegate (const(char)[], const(char[])[] list) dg)
                foreach (shared Connection server; hosts)
                         server.status (dg);



        final Buffer buffer (uint size)
                return new Buffer (size);



        final void setHosts (const(char[])[] hosts)
                auto conn = new shared(Connection) [hosts.length];

                foreach (int i, const(char)[] host; hosts)
                         conn[i] = new shared(Connection) (host);

                // set new list of connections
                this.hosts = conn;
                connect (conn);


                Connection watchdog thread


        private void run ()
                while (active)
                       try {
                           Thr.Thread.sleep (seconds(watchdog));
                           debug(TangoMemCache) Cout ("testing connections ...").newline;
                           connect (hosts);
                           } catch (Exception e)
                                    debug(TangoMemCache) Cout ("memcache watchdog: ") (e.toString).newline;



        private shared(Connection) select (const(void)[][] key)
                return hosts[jhash(key) % hosts.length];



        private void connect (shared(Connection)[] hosts)
                foreach (Connection c; cast(Connection[])hosts)



        static class Buffer
                private size_t    extent;
                private void[]  content;



                private this (size_t size)
                        this.content = new byte [size];



                bool expand (size_t size)
                        if (size > content.length)
                            content.length = size;
                        return true;



                void[] set (size_t size)
                        extent = size;
                        return get();



                void[] get ()
                        return content [0..extent];


	        jhash() -- hash a variable-length key into a 32-bit value

	          k     : the key (the unaligned variable-length array of bytes)
	          len   : the length of the key, counting by bytes
	          level : can be any 4-byte value

	        Returns a 32-bit value.  Every bit of the key affects every bit of
	        the return value.  Every 1-bit and 2-bit delta achieves avalanche.

	        About 4.3*len + 80 X86 instructions, with excellent pipelining

	        The best hash table sizes are powers of 2.  There is no need to do
	        mod a prime (mod is sooo slow!).  If you need less than 32 bits,
	        use a bitmask.  For example, if you need only 10 bits, do

	                    h = (h & hashmask(10));

	        In which case, the hash table should have hashsize(10) elements.
	        If you are hashing n strings (ub1 **)k, do it like this:

	                    for (i=0, h=0; i<n; ++i) h = hash( k[i], len[i], h);

	        By Bob Jenkins, 1996.  bob_jenkins@burtleburtle.net.  You may use
	        this code any way you wish, private, educational, or commercial.
	        It's free.

	        See http://burlteburtle.net/bob/hash/evahash.html
	        Use for hash table lookup, or anything where one collision in 2^32
	        is acceptable. Do NOT use for cryptographic purposes.


	static final uint jhash (const(void)[][] x, uint c = 0)
	        uint    a,

	        a = b = 0x9e3779b9;

            auto len = x.length;
	        ubyte* k = cast(ubyte *) x.ptr;

	        // handle most of the key
	        while (len >= 12)
	              a += *cast(uint *)(k+0);
	              b += *cast(uint *)(k+4);
	              c += *cast(uint *)(k+8);

	              a -= b; a -= c; a ^= (c>>13);
	              b -= c; b -= a; b ^= (a<<8);
	              c -= a; c -= b; c ^= (b>>13);
	              a -= b; a -= c; a ^= (c>>12);
	              b -= c; b -= a; b ^= (a<<16);
	              c -= a; c -= b; c ^= (b>>5);
	              a -= b; a -= c; a ^= (c>>3);
	              b -= c; b -= a; b ^= (a<<10);
	              c -= a; c -= b; c ^= (b>>15);
	              k += 12; len -= 12;

	        // handle the last 11 bytes
	        c += x.length;
	        switch (len)
	               case 11: c += (cast(uint)k[10]<<24); goto case;
	               case 10: c += (cast(uint)k[9]<<16); goto case;
	               case 9 : c += (cast(uint)k[8]<<8); goto case;
	               case 8 : b += (cast(uint)k[7]<<24); goto case;
	               case 7 : b += (cast(uint)k[6]<<16); goto case;
	               case 6 : b += (cast(uint)k[5]<<8); goto case;
	               case 5 : b += k[4]; goto case;
	               case 4 : a += (cast(uint)k[3]<<24); goto case;
	               case 3 : a += (cast(uint)k[2]<<16); goto case;
	               case 2 : a += (cast(uint)k[1]<<8); goto case;
	               case 1 : a += k[0]; break;

	        a -= b; a -= c; a ^= (c>>13);
	        b -= c; b -= a; b ^= (a<<8);
	        c -= a; c -= b; c ^= (b>>13);
	        a -= b; a -= c; a ^= (c>>12);
	        b -= c; b -= a; b ^= (a<<16);
	        c -= a; c -= b; c ^= (b>>5);
	        a -= b; a -= c; a ^= (c>>3);
	        b -= c; b -= a; b ^= (a<<10);
	        c -= a; c -= b; c ^= (b>>15);

	        return c;



private class Connection
        private alias Lines!(char) Line;

        private const(char)[]          host;           // original host address
        private __gshared Line         line;           // reading lines from server
        private __gshared Bin          input;          // input stream
        private __gshared Bout         output;         // output stream
        private __gshared Socket       conduit;        // socket to server
        private InternetAddress address;        // where server is listening
        private bool            connected;      // currently connected?



        this (const(char)[] host)
                this.host = host;
                conduit = new Socket;
                output = new Bout (conduit);
                input = new Bin (conduit);
                line = new Line (input);
                address = new InternetAddress (host);



        private void connect ()
                if (! connected)
                      try {
                          conduit.connect (address);
                          connected = true;
                          debug(TangoMemCache) Cout ("connected to ") (host).newline;
                          } catch (Throwable th)
                                   debug(TangoMemCache) Cout ("failed to connect to ")(host).newline;



        private synchronized void close ()
                bool alive = connected;
                connected = false;

                if (alive)



        private synchronized void error ()
                // close this dead socket

                // open another one for next attempt to connect



        private synchronized bool put (const(char)[] cmd, const(void)[][] key, const(void)[][] value, int flags, int timeout)
                if (connected)
                    try {
                        char[16] tmp;

                        output.append ("delete ")
                              .append (key)
                              .append (" ")
                              .append (Integer.format (tmp, timeout))
                              .append ("\r\n")

                        if (line.next)
                            return line.get() == "DELETED";
                        } catch (IOException e)
                return false;


                VALUE <key> <flags> <bytes>\r\n
                <data block>\r\n


        private synchronized bool get (const(void)[][] key, MemCache.Buffer buffer)
                if (connected)
                    try {
                        output.append ("get ")
                              .append (key)
                              .append ("\r\n")

                        if (line.next)
                           const(char)[] content = line.get();
                           if (content.length > 4 && content[0..5] == "VALUE")
                              size_t i = 0;

                              // parse the incoming content-length
                              for (i=content.length; content[--i] != ' ';)
                              i = cast(size_t) Integer.parse (content[i .. $]);

                              // ensure output buffer has enough space
                              buffer.expand (i);
                              void[] dst = buffer.set (i);

                              // fill the buffer content
                              if (! input.fill (dst))
                                    return false;

                              // eat the CR and test terminator
                              return line.get() == "END";
                        } catch (IOException e)
                return false;


                Remove the specified key and make key "invalid" for the
                duration of timeout, causing add(), get() and remove() on
                the same key to fail within that period


        private synchronized bool remove (const(void)[][] key, int timeout=0)
                if (connected)
                    try {
                        char[16] tmp;

                        output.append ("delete ")
                              .append (key)
                              .append (" ")
                              .append (Integer.format (tmp, timeout))
                              .append ("\r\n")

                        if (line.next)
                            return line.get() == "DELETED";
                        } catch (IOException e)
                return false;



        private synchronized bool bump (const(char)[] cmd, const(void)[][] key, uint value,
                                        ref uint result)
                if (connected)
                    try {
                        char[16] tmp;

                        output.append (cmd)
                              .append (" ")
                              .append (key)
                              .append (" ")
                              .append (Integer.format (tmp, value))
                              .append ("\r\n")

                        if (line.next)
                            if (line.get() != "NOT_FOUND")
                               result = cast(uint)Integer.parse (line.get());
                               return true;
                        } catch (IOException e)
                return false;



        private synchronized void status (scope void delegate (const(char)[], const(char[])[] list) dg)
                if (connected)
                    try {
                        const(char[])[] list;

                        output.write ("stats\r\n");

                        while (line.next)
                               if (line.get() == "END")
                                  dg (cast(char[])host, list);
                                  list ~= line.get();

                        } catch (IOException e)


debug (TangoMemCache)


void main()
        static const(char[])[] hosts = [""];

        auto cache = new MemCache (hosts);

        cache.set ("foo", "bar");
        cache.set ("foo", "wumpus");

        auto buffer = cache.buffer (1024);
        if (cache.get ("foo", buffer))
            Cout ("value: ") (cast(const(char)[]) buffer.get).newline;

        void stat (const(char)[] host, const(char[])[] list)
                foreach (const(char)[] line; list)
                         Cout (host) (" ") (line).newline;

        while (true)
              cache.status (&stat);
              Thr.Thread.sleep (seconds(1.0));
        Cout ("exiting");