/**
* This module provides an implementation of the classical thread-pool model.
*
* Copyright: Copyright (C) 2007-2008 Anders Halager. All rights reserved.
* License: BSD style: $(LICENSE)
* Author: Anders Halager
*/
module tango.core.ThreadPool;
private import tango.core.Thread,
tango.core.sync.Atomic,
tango.core.sync.Mutex,
tango.core.sync.Condition,
tango.core.Exception : ThreadPoolException;
private import tango.stdc.string: memmove;
private version = Queued;
/**
* A thread pool is a way to process multiple jobs in parallel without creating
* a new thread for each job. The overhead of creating a thread is paid once
* per worker rather than once per job, and the number of threads active at
* any one time is bounded by the size of the pool.
*
* In this case a "job" is simply a delegate and some parameters the delegate
* will be called with after having been added to the thread pool's queue.
*
* Example:
* --------------------
* // create a new pool with two threads
* auto pool = new ThreadPool!(int)(2);
* void delegate(int) f = (int x) { Log(x); };
*
* // Now we have three ways of telling the pool to execute our jobs
* // First we can say we just want it done at some later point
* pool.append(f, 1);
* // Secondly we can ask for a job to be done as soon as possible, blocking
* // until it is started by some thread
* pool.assign(f, 2);
* // Finally we can say we either want it done immediately or not at all
* if (pool.tryAssign(f, 3))
* Log("Someone took the job!");
* else
* Log("No one was available to do the job right now");
* // After giving the pool some jobs to do, we need to give it a chance to
* // finish, so we can do one of two things.
* // Choice no. 1 is to finish what has already been assigned to the threads,
* // but ignore any remaining queued jobs
* // pool.shutdown();
* // The other choice is to finish all jobs currently executing or in queue:
* pool.finish();
* --------------------
*
* If append isn't called there should be no additional heap allocations after
* initialization.
*/
class ThreadPool(Args...)
{
/// An alias for the type of delegates this thread pool considers a job
alias void delegate(Args) JobD;
/**
* Create a new ThreadPool.
*
* Params:
* workers = The number of threads to spawn
* q_size = The expected size of the queue (how many elements are
* preallocated)
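*
* Example (a minimal sketch; the worker count and queue size are arbitrary
* values chosen for illustration):
* --------------------
* // Four worker threads, with queue storage reserved for 64 pending jobs
* auto pool = new ThreadPool!(int)(4, 64);
* --------------------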
*/
this(size_t workers, size_t q_size = 0)
{
// pre-allocate memory for q_size jobs in the queue
q.length = q_size;
q.length = 0;
m = new Mutex;
poolActivity = new Condition(m);
workerActivity = new Condition(m);
flagSet(priority_job, cast(Job*) null);
flagSet(active_jobs, cast(size_t) 0);
flagSet(done, false);
for (size_t i = 0; i < workers; i++)
{
auto thread = new Thread(&doJob);
// Allow the OS to kill the threads if we exit the program without
// handling them ourselves
thread.isDaemon = true;
thread.start();
pool ~= thread;
}
}
/**
Assign the given job to a thread immediately or block until one is
available
*/
void assign(JobD job, Args args)
{
if(this.pool.length == 0)
{
throw new ThreadPoolException("No workers available!");
}
m.lock();
scope(exit) m.unlock();
auto j = Job(job, args);
flagSet(priority_job, &j);
poolActivity.notify();
// Wait until someone has taken the job
while (flagGet(priority_job) !is null)
workerActivity.wait();
}
/**
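Assign the given job to a thread immediately, or return false if all workers
are busy. Returns true if the job was handed to a worker.
Note: the availability check and the hand-off are not one atomic step, so
under contention this call may still block in assign until a worker takes
the job.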
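
Example (a sketch; assumes `pool` is a ThreadPool!(int) and `f` is a
hypothetical delegate of type JobD, as in the class-level example):
--------------------
// Hand the job to an idle worker if there is one, otherwise queue it
if (!pool.tryAssign(f, 42))
    pool.append(f, 42);
--------------------
Falling back to append keeps the caller from blocking in most cases, at the
cost of a possible heap allocation when the queue grows.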
*/
bool tryAssign(JobD job, Args args)
{
if (flagGet(active_jobs) >= pool.length)
return false;
assign(job, args);
return true;
}
/**
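Put a job into the pool for eventual execution.
Note: this module sets version = Queued, so pending jobs are taken from the
front of the queue (FIFO). Without that version they are consumed as a stack
(LIFO).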
*/
void append(JobD job, Args args)
{
if(this.pool.length == 0)
{
throw new ThreadPoolException("No workers available!");
}
m.lock();
q ~= Job(job, args);
m.unlock();
poolActivity.notify();
}
/// Get the number of jobs waiting to be executed
size_t pendingJobs()
{
m.lock(); scope(exit) m.unlock();
return q.length;
}
/// Get the number of jobs being executed
size_t activeJobs()
{
return flagGet(active_jobs);
}
/// Block until all pending jobs complete, but do not shut down. This allows more tasks to be added later.
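///
/// Example (a sketch; assumes `pool` and the delegate `f` from the class-level
/// example):
/// --------------------
/// pool.append(f, 1);
/// pool.wait();       // waits for queued and running jobs
/// pool.append(f, 2); // the pool is still usable after wait()
/// pool.finish();     // drain the remaining work, then stop the workers
/// --------------------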
void wait()
{
m.lock();
while (q.length > 0 || flagGet(active_jobs) > 0)
workerActivity.wait();
m.unlock();
}
/// Finish currently executing jobs and drop all pending jobs.
void shutdown()
{
flagSet(done, true);
m.lock();
q.length = 0;
m.unlock();
poolActivity.notifyAll();
foreach (thread; pool)
thread.join();
pool.length = 0;
m.lock();
m.unlock();
}
/// Complete all pending jobs and shut down.
void finish()
{
wait();
shutdown();
}
private:
// Our list of threads -- only used during startup and shutdown
Thread[] pool;
struct Job
{
JobD dg;
Args args;
}
// Used for storing queued jobs that will be executed eventually
Job[] q;
// This is to store a single job for immediate execution, which hopefully
// means that any program using only assign and tryAssign won't need any
// heap allocations after startup.
Job* priority_job;
// This should be used when accessing the job queue
Mutex m;
// Notify is called on this condition whenever we have activity in the pool
// that the workers might want to know about.
Condition poolActivity;
// Worker threads call notify on this when they are done with a job or are
// completely done.
// This allows a graceful shut down and is necessary since assign has to
// wait for a job to become available
Condition workerActivity;
// Are we in the shutdown phase?
bool done;
// Counter for the number of jobs currently being executed
size_t active_jobs;
// Thread delegate:
void doJob()
{
while (!flagGet(done))
{
m.lock();
while (q.length == 0 && flagGet(priority_job) is null && !flagGet(done))
poolActivity.wait();
if (flagGet(done)) {
m.unlock(); // not using scope(exit), need to manually unlock
break;
}
Job job;
Job* jobPtr = flagGet(priority_job);
if (jobPtr !is null)
{
job = *jobPtr;
flagSet(priority_job, cast(Job*)null);
workerActivity.notify();
}
else
{
version (Queued) // #1896
{
job = q[0];
memmove(q.ptr, q.ptr + 1, (q.length - 1) * typeof(*(q.ptr)).sizeof);
q.length = q.length - 1;
}
else
{
// A stack -- should be a queue
job = q[$ - 1];
q.length = q.length - 1;
}
}
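// Count the job as active while we still hold the lock, so that wait()
// cannot observe an empty queue and zero active jobs in between
flagAdd!(size_t)(active_jobs, 1);
// Make sure we unlock before we start doing the calculations
m.unlock();
// Do the actual job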
try {
job.dg(job.args);
} catch (Exception ex) {
// Exceptions thrown by a job are swallowed so the worker keeps running
}
flagAdd!(size_t)(active_jobs, -1);
// Tell the pool that we are done with something
m.lock();
workerActivity.notify();
m.unlock();
}
// Tell the pool that we are now done
m.lock();
workerActivity.notify();
m.unlock();
}
}
/*******************************************************************************
Invoke as "threadpool 1 2 3 4 5 6 7 10 20" or similar
*******************************************************************************/
debug (ThreadPool)
{
import tango.util.log.Trace;
import Integer = tango.text.convert.Integer;
void main(char[][] args)
{
long job(long val)
{
// a 'big job'
Thread.sleep (3.0/val);
return val;
}
void hashJob(char[] file)
{
// If we don't catch exceptions the thread-pool will still
// work, but the job will fail silently
try {
long n = Integer.parse(file);
Trace.formatln("job({}) = {}", n, job(n));
} catch (Exception ex) {
Trace.formatln("Exception: {}", ex.msg);
}
}
// Create a new thread pool with one worker thread per command-line argument
auto thread_pool = new ThreadPool!(char[])(args.length - 1);
Thread.sleep(1);
Trace.formatln ("starting");
foreach (file; args[1 .. args.length])
thread_pool.assign(&hashJob, file);
thread_pool.finish();
}
}