| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650 | /******************************************************************************* copyright: Copyright (c) 2005 Kris Bell. All rights reserved license: BSD style: $(LICENSE) version: Mar 2005: Initial release author: Kris *******************************************************************************/ module tango.net.util.MemCache; private import tango.io.Console; private import Thr = tango.core.Thread, /* Because the druntime folks in their infinite wisdom decided to publically import core.time in thread */ tango.core.Exception, tango.core.Time; private import tango.io.stream.Lines, tango.io.stream.Buffered; private import tango.net.device.Socket, tango.net.InternetAddress; private import Integer = tango.text.convert.Integer; /****************************************************************************** ******************************************************************************/ class MemCache : Thr.Thread { private shared(Connection)[] hosts; private bool active; private uint watchdog; /********************************************************************** **********************************************************************/ this (const(char[])[] hosts, uint watchdog = 3) { super (&run); setHosts (hosts); // save configuration this.watchdog = watchdog; // start the watchdog active = true; super.start(); } /********************************************************************** **********************************************************************/ final void close () { if (hosts) { foreach (shared Connection server; hosts) server.close(); hosts = null; } } /********************************************************************** Store the key and value **********************************************************************/ final bool set (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0) { return select(key).put("set", key, value, flags, timeout); } /********************************************************************** Store the value if key does not already exist **********************************************************************/ final bool add (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0) { return select(key).put("add", key, value, flags, timeout); } /********************************************************************** Store the value only if key exists **********************************************************************/ final bool replace (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0) { return select(key).put("replace", key, value, flags, timeout); } /********************************************************************** Remove the specified key and make key "invalid" for the duration of timeout, causing add(), get() and remove() on the same key to fail within that period **********************************************************************/ final bool remove (const(void)[][] key, int timeout=0) { return select(key).remove(key, timeout); } /********************************************************************** VALUE <key> <flags> <bytes>\r\n <data block>\r\n **********************************************************************/ final bool get (const(void)[][] key, Buffer buffer) { return select(key).get(key, buffer); } /********************************************************************** **********************************************************************/ final bool incr (const(void)[][] key, uint value) { uint result; return incr (key, value, result); } /********************************************************************** **********************************************************************/ final bool decr (const(void)[][] key, uint value) { uint result; return decr (key, value, result); } /********************************************************************** **********************************************************************/ final bool incr (const(void)[][] key, uint value, ref uint result) { return select(key).bump ("incr", key, value, result); } /********************************************************************** **********************************************************************/ final bool decr (const(void)[][] key, uint value, ref uint result) { return select(key).bump ("decr", key, value, result); } /********************************************************************** **********************************************************************/ final void status (void delegate (const(char)[], const(char[])[] list) dg) { foreach (shared Connection server; hosts) server.status (dg); } /********************************************************************** **********************************************************************/ final Buffer buffer (uint size) { return new Buffer (size); } /********************************************************************** **********************************************************************/ final void setHosts (const(char[])[] hosts) { auto conn = new shared(Connection) [hosts.length]; foreach (int i, const(char)[] host; hosts) conn[i] = new shared(Connection) (host); // set new list of connections this.hosts = conn; connect (conn); } /********************************************************************** Connection watchdog thread **********************************************************************/ private void run () { while (active) try { Thr.Thread.sleep (seconds(watchdog)); debug(TangoMemCache) Cout ("testing connections ...").newline; connect (hosts); } catch (Exception e) debug(TangoMemCache) Cout ("memcache watchdog: ") (e.toString).newline; } /********************************************************************** **********************************************************************/ private shared(Connection) select (const(void)[][] key) { return hosts[jhash(key) % hosts.length]; } /********************************************************************** **********************************************************************/ private void connect (shared(Connection)[] hosts) { foreach (Connection c; cast(Connection[])hosts) c.connect(); } /********************************************************************** **********************************************************************/ static class Buffer { private size_t extent; private void[] content; /************************************************************** **************************************************************/ private this (size_t size) { this.content = new byte [size]; } /************************************************************** **************************************************************/ bool expand (size_t size) { if (size > content.length) content.length = size; return true; } /************************************************************** **************************************************************/ void[] set (size_t size) { extent = size; return get(); } /************************************************************** **************************************************************/ void[] get () { return content [0..extent]; } } /********************************************************************** jhash() -- hash a variable-length key into a 32-bit value k : the key (the unaligned variable-length array of bytes) len : the length of the key, counting by bytes level : can be any 4-byte value Returns a 32-bit value. Every bit of the key affects every bit of the return value. Every 1-bit and 2-bit delta achieves avalanche. About 4.3*len + 80 X86 instructions, with excellent pipelining The best hash table sizes are powers of 2. There is no need to do mod a prime (mod is sooo slow!). If you need less than 32 bits, use a bitmask. For example, if you need only 10 bits, do h = (h & hashmask(10)); In which case, the hash table should have hashsize(10) elements. If you are hashing n strings (ub1 **)k, do it like this: for (i=0, h=0; i<n; ++i) h = hash( k[i], len[i], h); By Bob Jenkins, 1996. bob_jenkins@burtleburtle.net. You may use this code any way you wish, private, educational, or commercial. It's free. See http://burlteburtle.net/bob/hash/evahash.html Use for hash table lookup, or anything where one collision in 2^32 is acceptable. Do NOT use for cryptographic purposes. **********************************************************************/ static final uint jhash (const(void)[][] x, uint c = 0) { uint a, b; a = b = 0x9e3779b9; auto len = x.length; ubyte* k = cast(ubyte *) x.ptr; // handle most of the key while (len >= 12) { a += *cast(uint *)(k+0); b += *cast(uint *)(k+4); c += *cast(uint *)(k+8); a -= b; a -= c; a ^= (c>>13); b -= c; b -= a; b ^= (a<<8); c -= a; c -= b; c ^= (b>>13); a -= b; a -= c; a ^= (c>>12); b -= c; b -= a; b ^= (a<<16); c -= a; c -= b; c ^= (b>>5); a -= b; a -= c; a ^= (c>>3); b -= c; b -= a; b ^= (a<<10); c -= a; c -= b; c ^= (b>>15); k += 12; len -= 12; } // handle the last 11 bytes c += x.length; switch (len) { case 11: c += (cast(uint)k[10]<<24); goto case; case 10: c += (cast(uint)k[9]<<16); goto case; case 9 : c += (cast(uint)k[8]<<8); goto case; case 8 : b += (cast(uint)k[7]<<24); goto case; case 7 : b += (cast(uint)k[6]<<16); goto case; case 6 : b += (cast(uint)k[5]<<8); goto case; case 5 : b += k[4]; goto case; case 4 : a += (cast(uint)k[3]<<24); goto case; case 3 : a += (cast(uint)k[2]<<16); goto case; case 2 : a += (cast(uint)k[1]<<8); goto case; case 1 : a += k[0]; break; default: } a -= b; a -= c; a ^= (c>>13); b -= c; b -= a; b ^= (a<<8); c -= a; c -= b; c ^= (b>>13); a -= b; a -= c; a ^= (c>>12); b -= c; b -= a; b ^= (a<<16); c -= a; c -= b; c ^= (b>>5); a -= b; a -= c; a ^= (c>>3); b -= c; b -= a; b ^= (a<<10); c -= a; c -= b; c ^= (b>>15); return c; } } /****************************************************************************** ******************************************************************************/ private class Connection { private alias Lines!(char) Line; private const(char)[] host; // original host address private __gshared Line line; // reading lines from server private __gshared Bin input; // input stream private __gshared Bout output; // output stream private __gshared Socket conduit; // socket to server private InternetAddress address; // where server is listening private bool connected; // currently connected? /********************************************************************** **********************************************************************/ this (const(char)[] host) { this.host = host; conduit = new Socket; output = new Bout (conduit); input = new Bin (conduit); line = new Line (input); address = new InternetAddress (host); } /********************************************************************** **********************************************************************/ private void connect () { if (! connected) try { conduit.connect (address); connected = true; debug(TangoMemCache) Cout ("connected to ") (host).newline; } catch (Throwable th) debug(TangoMemCache) Cout ("failed to connect to ")(host).newline; } /********************************************************************** **********************************************************************/ private synchronized void close () { bool alive = connected; connected = false; if (alive) conduit.close(); } /********************************************************************** **********************************************************************/ private synchronized void error () { // close this dead socket close(); // open another one for next attempt to connect conduit.socket.reopen(); } /********************************************************************** **********************************************************************/ private synchronized bool put (const(char)[] cmd, const(void)[][] key, const(void)[][] value, int flags, int timeout) { if (connected) try { char[16] tmp; output.clear(); output.append ("delete ") .append (key) .append (" ") .append (Integer.format (tmp, timeout)) .append ("\r\n") .flush(); if (line.next) return line.get() == "DELETED"; } catch (IOException e) error(); return false; } /********************************************************************** VALUE <key> <flags> <bytes>\r\n <data block>\r\n **********************************************************************/ private synchronized bool get (const(void)[][] key, MemCache.Buffer buffer) { if (connected) try { output.clear(); output.append ("get ") .append (key) .append ("\r\n") .flush(); if (line.next) { const(char)[] content = line.get(); if (content.length > 4 && content[0..5] == "VALUE") { size_t i = 0; // parse the incoming content-length for (i=content.length; content[--i] != ' ';) {} i = cast(size_t) Integer.parse (content[i .. $]); // ensure output buffer has enough space buffer.expand (i); void[] dst = buffer.set (i); // fill the buffer content if (! input.fill (dst)) return false; // eat the CR and test terminator line.next; line.next; return line.get() == "END"; } } } catch (IOException e) error(); return false; } /********************************************************************** Remove the specified key and make key "invalid" for the duration of timeout, causing add(), get() and remove() on the same key to fail within that period **********************************************************************/ private synchronized bool remove (const(void)[][] key, int timeout=0) { if (connected) try { char[16] tmp; output.clear(); output.append ("delete ") .append (key) .append (" ") .append (Integer.format (tmp, timeout)) .append ("\r\n") .flush(); if (line.next) return line.get() == "DELETED"; } catch (IOException e) error(); return false; } /********************************************************************** **********************************************************************/ private synchronized bool bump (const(char)[] cmd, const(void)[][] key, uint value, ref uint result) { if (connected) try { char[16] tmp; output.clear(); output.append (cmd) .append (" ") .append (key) .append (" ") .append (Integer.format (tmp, value)) .append ("\r\n") .flush(); if (line.next) if (line.get() != "NOT_FOUND") { result = cast(uint)Integer.parse (line.get()); return true; } } catch (IOException e) error(); return false; } /********************************************************************** **********************************************************************/ private synchronized void status (scope void delegate (const(char)[], const(char[])[] list) dg) { if (connected) try { const(char[])[] list; output.clear(); output.write ("stats\r\n"); while (line.next) if (line.get() == "END") { dg (cast(char[])host, list); break; } else list ~= line.get(); } catch (IOException e) error(); } } debug (TangoMemCache) { /****************************************************************************** ******************************************************************************/ void main() { static const(char[])[] hosts = ["192.168.111.224:11211"]; auto cache = new MemCache (hosts); cache.set ("foo", "bar"); cache.set ("foo", "wumpus"); auto buffer = cache.buffer (1024); if (cache.get ("foo", buffer)) Cout ("value: ") (cast(const(char)[]) buffer.get).newline; void stat (const(char)[] host, const(char[])[] list) { foreach (const(char)[] line; list) Cout (host) (" ") (line).newline; } while (true) { cache.status (&stat); Thr.Thread.sleep (seconds(1.0)); } Cout ("exiting"); } } |