/**
 * This module provides an implementation of the classical thread-pool model.
 *
 * Copyright: Copyright (C) 2007-2008 Anders Halager. All rights reserved.
 * License:   BSD style: $(LICENSE)
 * Author:    Anders Halager
 */

module tango.core.ThreadPool;

private import tango.core.Thread,
                tango.core.sync.Atomic,
                tango.core.sync.Mutex,
                tango.core.sync.Condition,
                tango.core.Exception : ThreadPoolException;

private import  tango.stdc.string: memmove;

private version = Queued;

/**
 * A thread pool is a way to process multiple jobs in parallel without creating
 * a new thread for each job. The overhead of creating a thread is paid only
 * once per worker rather than once per job, and you can limit the maximum
 * number of threads active at any one point.
 *
 * In this case a "job" is simply a delegate and some parameters the delegate
 * will be called with after having been added to the thread pool's queue.
 *
 * Example:
 * --------------------
 * // create a new pool with two threads
 * auto pool = new ThreadPool!(int)(2);
 * void delegate(int) f = (int x) { Log(x); };
 *
 * // Now we have three ways of telling the pool to execute our jobs
 * // First we can say we just want it done at some later point
 * pool.append(f, 1);
 * // Secondly we can ask for a job to be done as soon as possible, blocking
 * // until it is started by some thread
 * pool.assign(f, 2);
 * // Finally we can say we either want it done immediately or not at all
 * if (pool.tryAssign(f, 3))
 *     Log("Someone took the job!");
 * else
 *     Log("No one was available to do the job right now");
 * // After giving the pool some jobs to do, we need to give it a chance to
 * // finish, so we can do one of two things.
 * // Choice no. 1 is to finish what has already been assigned to the threads,
 * // but ignore any remaining queued jobs
 * //   pool.shutdown();
 * // The other choice is to finish all jobs currently executing or in queue:
 * pool.finish();
 * --------------------
 *
 * If append isn't called there should be no additional heap allocations after
 * initialization.
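 *
 * A further sketch, reusing the placeholder Log from the example above:
 * wait() blocks until the queue is empty and no job is running but leaves
 * the workers alive, so a single pool can process several batches. The
 * batch sizes here are arbitrary.
 * --------------------
 * auto pool = new ThreadPool!(int)(2);
 * void delegate(int) f = (int x) { Log(x); };
 *
 * // first batch: queue four jobs and wait for all of them
 * for (int i = 0; i < 4; i++)
 *     pool.append(f, i);
 * pool.wait();
 *
 * // the workers are still running, so a second batch can be queued
 * for (int i = 4; i < 8; i++)
 *     pool.append(f, i);
 * // complete the second batch, then stop the workers
 * pool.finish();
 * --------------------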
 */

class ThreadPool(Args...)
{
    /// An alias for the type of delegates this thread pool considers a job
    alias void delegate(Args) JobD;

    /**
     * Create a new ThreadPool.
     *
     * Params:
     *   workers = The number of threads to spawn
     *   q_size  = The expected size of the queue (how many elements are
     *   preallocated)
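     *
     * Example:
     * A minimal sketch with arbitrary numbers, assuming q_size behaves as
     * described above.
     * --------------------
     * // four workers, with room for 64 queued jobs preallocated so that
     * // early calls to append should not need to grow the queue
     * auto pool = new ThreadPool!(int)(4, 64);
     * --------------------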
     */
    this(size_t workers, size_t q_size = 0)
    {
        // pre-allocate memory for q_size jobs in the queue
        q.length = q_size;
        q.length = 0;

        m = new Mutex;
        poolActivity = new Condition(m);
        workerActivity = new Condition(m);

        flagSet(priority_job, cast(Job*) null);
        flagSet(active_jobs, cast(size_t) 0);
        flagSet(done, false);

        for (size_t i = 0; i < workers; i++)
        {
            auto thread = new Thread(&doJob);
            // Allow the OS to kill the threads if we exit the program without
            // handling them ourselves
            thread.isDaemon = true;
            thread.start();
            pool ~= thread;
        }
    }

    /**
      Assign the given job to a thread immediately or block until one is
      available
     */
    void assign(JobD job, Args args)
    {
        if(this.pool.length == 0)
        {
            throw new ThreadPoolException("No workers available!");
        }

        m.lock();
        scope(exit) m.unlock();
        auto j = Job(job, args);
        flagSet(priority_job, &j);
        poolActivity.notify();
        // Wait until someone has taken the job
        while (flagGet(priority_job) !is null)
            workerActivity.wait();
    }

    /**
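      Assign the given job to a thread immediately or return false if none is
      available. (Returns true if one was available)

      Note: the availability check and the assignment are separate steps, so
      this can still block if another caller claims the last idle worker in
      between.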
     */
    bool tryAssign(JobD job, Args args)
    {
        if (flagGet(active_jobs) >= pool.length)
            return false;
        assign(job, args);
        return true;
    }

    /**
      Put a job into the pool for eventual execution.

      Note: Jobs are taken from the front of the queue (FIFO) because this
      module sets version Queued; without it the pool acts as a stack
     */
    void append(JobD job, Args args)
    {
        if(this.pool.length == 0)
        {
            throw new ThreadPoolException("No workers available!");        
        }

        m.lock();
        q ~= Job(job, args);
        m.unlock();
        poolActivity.notify();
    }

    /// Get the number of jobs waiting to be executed
    size_t pendingJobs()
    {
        m.lock(); scope(exit) m.unlock();
        return q.length;
    }

    /// Get the number of jobs being executed
    size_t activeJobs()
    {
        return flagGet(active_jobs);
    }

    /// Block until all pending and running jobs have completed, but do not
    /// shut down, so more jobs can be added later.
    void wait()
    {
        m.lock();
        scope(exit) m.unlock();
        while (q.length > 0 || flagGet(active_jobs) > 0)
            workerActivity.wait();
    }

    /// Finish currently executing jobs and drop all pending.
    void shutdown()
    {
        flagSet(done, true);
        m.lock();
        q.length = 0;
        m.unlock();
        poolActivity.notifyAll();
        foreach (thread; pool)
            thread.join();

        pool.length = 0;

        m.lock();
        m.unlock();
    }

    /// Complete all pending jobs and shutdown.
    void finish()
    {
        wait();
        shutdown();
    }

private:
    // Our list of threads -- only used during startup and shutdown
    Thread[] pool;
    struct Job
    {
        JobD dg;
        Args args;
    }
    // Used for storing queued jobs that will be executed eventually
    Job[] q;

    // This is to store a single job for immediate execution, which hopefully
    // means that any program using only assign and tryAssign won't need any
    // heap allocations after startup.
    Job* priority_job;

    // This should be used when accessing the job queue
    Mutex m;

    // Notify is called on this condition whenever we have activity in the pool
    // that the workers might want to know about.
    Condition poolActivity;

    // Worker threads call notify on this when they are done with a job or are
    // completely done.
    // This allows a graceful shut down and is necessary since assign has to
    // wait for a job to become available
    Condition workerActivity;

    // Are we in the shutdown phase?
    bool done;

    // Counter for the number of jobs currently being calculated
    size_t active_jobs;

    // Thread delegate:
    void doJob()
    {
        while (!flagGet(done))
        {
            m.lock();
            while (q.length == 0 && flagGet(priority_job) is null && !flagGet(done))
                poolActivity.wait();
            if (flagGet(done)) {
                m.unlock(); // not using scope(exit), need to manually unlock
                break;
            }
            Job job;
            Job* jobPtr = flagGet(priority_job);
            if (jobPtr !is null)
            {
                job = *jobPtr;
                flagSet(priority_job, cast(Job*)null);
                workerActivity.notify();
            }
            else
            {
                version (Queued) // #1896
                {
                    job = q[0];
                    memmove(q.ptr, q.ptr + 1, (q.length - 1) * typeof(*(q.ptr)).sizeof);
                    q.length = q.length - 1;
                }
                else
                {
                    // A stack -- should be a queue
                    job = q[$ - 1];
                    q.length = q.length - 1;
                }
            }

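            // Count the job as active while we still hold the lock, so that
            // wait() can never observe an empty queue and zero active jobs
            // between dequeuing a job and starting it
            flagAdd!(size_t)(active_jobs, 1);

            // Make sure we unlock before we start doing the calculations
            m.unlock();

            // Do the actual job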
            try {
                job.dg(job.args);
            } catch (Exception ex) {
                // Deliberately ignored: a failing job must not kill its worker
            }
            flagAdd!(size_t)(active_jobs, -1);

            // Tell the pool that we are done with something
            m.lock();
            workerActivity.notify();
            m.unlock();
        }
        // Tell the pool that we are now done
        m.lock();
        workerActivity.notify();
        m.unlock();
    }
}



/*******************************************************************************

        Invoke as "threadpool 1 2 3 4 5 6 7 10 20" or similar

*******************************************************************************/

debug (ThreadPool)
{
        import tango.util.log.Trace;
        import Integer = tango.text.convert.Integer;

        void main(char[][] args)
        {
                long job(long val)
                {
                        // a 'big job'
                        Thread.sleep (3.0/val);
                        return val;
                }

                void hashJob(char[] file)
                {
                        // If we don't catch exceptions the thread-pool will still
                        // work, but the job will fail silently
                        try {
                                long n = Integer.parse(file);
                                Trace.formatln("job({}) = {}", n, job(n));
                        } catch (Exception ex) {
                                Trace.formatln("Exception: {}", ex.msg);
                        }
                }

                // Create new thread pool with one worker thread per file given
                auto thread_pool = new ThreadPool!(char[])(args.length - 1);

                Thread.sleep(1);
                Trace.formatln ("starting");

                foreach (file; args[1 .. args.length])
                         thread_pool.assign(&hashJob, file);

                thread_pool.finish();
        }
}