
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Abstraction of a thread pool for basic parallelism.
use core::prelude::*;
use thread::Thread;
use comm::{channel, Sender, Receiver};
use sync::{Arc, Mutex};
use thunk::Thunk;
// Guard dropped at the end of each worker loop. Unless explicitly cancelled,
// its `Drop` impl respawns a replacement worker, which is how the pool
// recovers from a panicking job (drop runs during unwinding too).
struct Sentinel<'a> {
    // Shared receiving end of the job queue, cloned into the replacement
    // worker if this sentinel fires.
    jobs: &'a Arc<Mutex<Receiver<Thunk>>>,
    // Set to false by `cancel()` on a clean exit; checked in `drop`.
    active: bool
}
impl<'a> Sentinel<'a> {
    /// Creates a sentinel in the active state, watching `jobs`.
    fn new(jobs: &Arc<Mutex<Receiver<Thunk>>>) -> Sentinel {
        Sentinel {
            active: true,
            jobs: jobs
        }
    }

    /// Deactivates and consumes this sentinel so its `Drop` impl
    /// will not respawn a worker (the clean-shutdown path).
    fn cancel(mut self) {
        self.active = false;
    }
}
#[unsafe_destructor]
impl<'a> Drop for Sentinel<'a> {
2013-09-16 21:18:07 -04:00
fn drop(&mut self) {
if self.active {
spawn_in_pool(self.jobs.clone())
}
}
}
2014-12-26 16:04:27 -05:00
/// A thread pool used to execute functions in parallel.
///
2014-12-26 16:04:27 -05:00
/// Spawns `n` worker threads and replenishes the pool if any worker threads
/// panic.
///
/// # Example
///
/// ```rust
2014-11-16 12:22:40 +01:00
/// # use std::sync::TaskPool;
/// # use std::iter::AdditiveIterator;
///
/// let pool = TaskPool::new(4u);
///
/// let (tx, rx) = channel();
/// for _ in range(0, 8u) {
/// let tx = tx.clone();
/// pool.execute(move|| {
/// tx.send(1u);
/// });
/// }
///
/// assert_eq!(rx.iter().take(8u).sum(), 8u);
/// ```
pub struct TaskPool {
2014-12-26 16:04:27 -05:00
// How the threadpool communicates with subthreads.
//
2014-12-26 16:04:27 -05:00
// This is the only such Sender, so when it is dropped all subthreads will
// quit.
jobs: Sender<Thunk>
}
impl TaskPool {
2014-12-26 16:04:27 -05:00
/// Spawns a new thread pool with `threads` threads.
///
/// # Panics
///
2014-12-26 16:04:27 -05:00
/// This function will panic if `threads` is 0.
pub fn new(threads: uint) -> TaskPool {
assert!(threads >= 1);
let (tx, rx) = channel::<Thunk>();
let rx = Arc::new(Mutex::new(rx));
2014-12-26 16:04:27 -05:00
// Threadpool threads
for _ in range(0, threads) {
spawn_in_pool(rx.clone());
}
TaskPool { jobs: tx }
}
2014-12-26 16:04:27 -05:00
/// Executes the function `job` on a thread in the pool.
pub fn execute<F>(&self, job: F)
where F : FnOnce(), F : Send
{
self.jobs.send(Thunk::new(job));
}
}
fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) {
2014-12-06 18:34:37 -08:00
Thread::spawn(move |:| {
2014-12-26 16:04:27 -05:00
// Will spawn a new thread on panic unless it is cancelled.
let sentinel = Sentinel::new(&jobs);
loop {
let message = {
// Only lock jobs for the time it takes
// to get a job, not run it.
std: Return Result from RWLock/Mutex methods All of the current std::sync primitives have poisoning enable which means that when a task fails inside of a write-access lock then all future attempts to acquire the lock will fail. This strategy ensures that stale data whose invariants are possibly not upheld are never viewed by other tasks to help propagate unexpected panics (bugs in a program) among tasks. Currently there is no way to test whether a mutex or rwlock is poisoned. One method would be to duplicate all the methods with a sister foo_catch function, for example. This pattern is, however, against our [error guidelines][errors]. As a result, this commit exposes the fact that a task has failed internally through the return value of a `Result`. [errors]: https://github.com/rust-lang/rfcs/blob/master/text/0236-error-conventions.md#do-not-provide-both-result-and-fail-variants All methods now return a `LockResult<T>` or a `TryLockResult<T>` which communicates whether the lock was poisoned or not. In a `LockResult`, both the `Ok` and `Err` variants contains the `MutexGuard<T>` that is being returned in order to allow access to the data if poisoning is not desired. This also means that the lock is *always* held upon returning from `.lock()`. A new type, `PoisonError`, was added with one method `into_guard` which can consume the assertion that a lock is poisoned to gain access to the underlying data. This is a breaking change because the signatures of these methods have changed, often incompatible ways. One major difference is that the `wait` methods on a condition variable now consume the guard and return it in as a `LockResult` to indicate whether the lock was poisoned while waiting. Most code can be updated by calling `.unwrap()` on the return value of `.lock()`. [breaking-change]
2014-12-08 20:20:03 -08:00
let lock = jobs.lock().unwrap();
lock.recv_opt()
};
match message {
Ok(job) => job.invoke(()),
// The Taskpool was dropped.
Err(..) => break
}
}
sentinel.cancel();
}).detach();
}
#[cfg(test)]
mod test {
    use prelude::*;
    use super::*;

    const TEST_TASKS: uint = 4u;

    // Basic smoke test: every queued job runs and reports back.
    #[test]
    fn test_works() {
        use iter::AdditiveIterator;

        let pool = TaskPool::new(TEST_TASKS);

        let (tx, rx) = channel();
        for _ in range(0, TEST_TASKS) {
            let tx = tx.clone();
            pool.execute(move|| {
                tx.send(1u);
            });
        }

        assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
    }

    // `TaskPool::new` documents a panic on zero threads.
    #[test]
    #[should_fail]
    fn test_zero_tasks_panic() {
        TaskPool::new(0);
    }

    // The sentinel must respawn workers killed by panicking jobs.
    #[test]
    fn test_recovery_from_subtask_panic() {
        use iter::AdditiveIterator;

        let pool = TaskPool::new(TEST_TASKS);

        // Panic all the existing threads.
        for _ in range(0, TEST_TASKS) {
            pool.execute(move|| -> () { panic!() });
        }

        // Ensure new threads were spawned to compensate.
        let (tx, rx) = channel();
        for _ in range(0, TEST_TASKS) {
            let tx = tx.clone();
            pool.execute(move|| {
                tx.send(1u);
            });
        }

        assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
    }

    // Panics that occur after the pool itself is dropped must not cascade.
    #[test]
    fn test_should_not_panic_on_drop_if_subtasks_panic_after_drop() {
        use sync::{Arc, Barrier};

        let pool = TaskPool::new(TEST_TASKS);
        let waiter = Arc::new(Barrier::new(TEST_TASKS + 1));

        // Panic all the existing threads in a bit.
        for _ in range(0, TEST_TASKS) {
            let waiter = waiter.clone();
            pool.execute(move|| {
                waiter.wait();
                panic!();
            });
        }

        drop(pool);

        // Kick off the failure.
        waiter.wait();
    }
}