rust/src/libstd/sync/task_pool.rs

215 lines
5.2 KiB
Rust
Raw Normal View History

// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
2014-12-26 16:04:27 -05:00
//! Abstraction of a thread pool for basic parallelism.
#![unstable(feature = "std_misc",
reason = "the semantics of a failing task and whether a thread is \
re-attached to a thread pool are somewhat unclear, and the \
utility of this type in `std::sync` is questionable with \
respect to the jobs of other primitives")]
2014-12-29 15:03:01 -08:00
use core::prelude::*;
use sync::{Arc, Mutex};
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
use sync::mpsc::{channel, Sender, Receiver};
use thread::Thread;
use thunk::Thunk;
struct Sentinel<'a> {
jobs: &'a Arc<Mutex<Receiver<Thunk>>>,
active: bool
}
impl<'a> Sentinel<'a> {
fn new(jobs: &Arc<Mutex<Receiver<Thunk>>>) -> Sentinel {
Sentinel {
jobs: jobs,
active: true
}
}
// Cancel and destroy this sentinel.
fn cancel(mut self) {
self.active = false;
}
}
#[unsafe_destructor]
impl<'a> Drop for Sentinel<'a> {
2013-09-16 21:18:07 -04:00
fn drop(&mut self) {
if self.active {
spawn_in_pool(self.jobs.clone())
}
}
}
2014-12-26 16:04:27 -05:00
/// A thread pool used to execute functions in parallel.
///
2014-12-26 16:04:27 -05:00
/// Spawns `n` worker threads and replenishes the pool if any worker threads
/// panic.
///
/// # Example
///
/// ```rust
/// use std::sync::TaskPool;
/// use std::iter::AdditiveIterator;
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
/// use std::sync::mpsc::channel;
///
/// let pool = TaskPool::new(4u);
///
/// let (tx, rx) = channel();
/// for _ in 0..8u {
/// let tx = tx.clone();
/// pool.execute(move|| {
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
/// tx.send(1u).unwrap();
/// });
/// }
///
/// assert_eq!(rx.iter().take(8u).sum(), 8u);
/// ```
pub struct TaskPool {
2014-12-26 16:04:27 -05:00
// How the threadpool communicates with subthreads.
//
2014-12-26 16:04:27 -05:00
// This is the only such Sender, so when it is dropped all subthreads will
// quit.
jobs: Sender<Thunk>
}
impl TaskPool {
2014-12-26 16:04:27 -05:00
/// Spawns a new thread pool with `threads` threads.
///
/// # Panics
///
2014-12-26 16:04:27 -05:00
/// This function will panic if `threads` is 0.
pub fn new(threads: uint) -> TaskPool {
assert!(threads >= 1);
let (tx, rx) = channel::<Thunk>();
let rx = Arc::new(Mutex::new(rx));
2014-12-26 16:04:27 -05:00
// Threadpool threads
for _ in 0..threads {
spawn_in_pool(rx.clone());
}
TaskPool { jobs: tx }
}
2014-12-26 16:04:27 -05:00
/// Executes the function `job` on a thread in the pool.
pub fn execute<F>(&self, job: F)
where F : FnOnce(), F : Send
{
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
self.jobs.send(Thunk::new(job)).unwrap();
}
}
fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) {
Thread::spawn(move || {
2014-12-26 16:04:27 -05:00
// Will spawn a new thread on panic unless it is cancelled.
let sentinel = Sentinel::new(&jobs);
loop {
let message = {
// Only lock jobs for the time it takes
// to get a job, not run it.
std: Return Result from RWLock/Mutex methods All of the current std::sync primitives have poisoning enable which means that when a task fails inside of a write-access lock then all future attempts to acquire the lock will fail. This strategy ensures that stale data whose invariants are possibly not upheld are never viewed by other tasks to help propagate unexpected panics (bugs in a program) among tasks. Currently there is no way to test whether a mutex or rwlock is poisoned. One method would be to duplicate all the methods with a sister foo_catch function, for example. This pattern is, however, against our [error guidelines][errors]. As a result, this commit exposes the fact that a task has failed internally through the return value of a `Result`. [errors]: https://github.com/rust-lang/rfcs/blob/master/text/0236-error-conventions.md#do-not-provide-both-result-and-fail-variants All methods now return a `LockResult<T>` or a `TryLockResult<T>` which communicates whether the lock was poisoned or not. In a `LockResult`, both the `Ok` and `Err` variants contains the `MutexGuard<T>` that is being returned in order to allow access to the data if poisoning is not desired. This also means that the lock is *always* held upon returning from `.lock()`. A new type, `PoisonError`, was added with one method `into_guard` which can consume the assertion that a lock is poisoned to gain access to the underlying data. This is a breaking change because the signatures of these methods have changed, often incompatible ways. One major difference is that the `wait` methods on a condition variable now consume the guard and return it in as a `LockResult` to indicate whether the lock was poisoned while waiting. Most code can be updated by calling `.unwrap()` on the return value of `.lock()`. [breaking-change]
2014-12-08 20:20:03 -08:00
let lock = jobs.lock().unwrap();
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
lock.recv()
};
match message {
Ok(job) => job.invoke(()),
// The Taskpool was dropped.
Err(..) => break
}
}
sentinel.cancel();
2015-01-05 21:59:45 -08:00
});
}
#[cfg(test)]
mod test {
use prelude::v1::*;
use super::*;
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
use sync::mpsc::channel;
const TEST_TASKS: uint = 4u;
#[test]
fn test_works() {
use iter::AdditiveIterator;
let pool = TaskPool::new(TEST_TASKS);
let (tx, rx) = channel();
for _ in 0..TEST_TASKS {
let tx = tx.clone();
pool.execute(move|| {
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
tx.send(1u).unwrap();
});
}
assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
}
#[test]
#[should_fail]
fn test_zero_tasks_panic() {
TaskPool::new(0);
}
#[test]
fn test_recovery_from_subtask_panic() {
use iter::AdditiveIterator;
let pool = TaskPool::new(TEST_TASKS);
2014-12-26 16:04:27 -05:00
// Panic all the existing threads.
for _ in 0..TEST_TASKS {
pool.execute(move|| -> () { panic!() });
}
2014-12-26 16:04:27 -05:00
// Ensure new threads were spawned to compensate.
let (tx, rx) = channel();
for _ in 0..TEST_TASKS {
let tx = tx.clone();
pool.execute(move|| {
std: Second pass stabilization for `comm` This commit is a second pass stabilization for the `std::comm` module, performing the following actions: * The entire `std::comm` module was moved under `std::sync::mpsc`. This movement reflects that channels are just yet another synchronization primitive, and they don't necessarily deserve a special place outside of the other concurrency primitives that the standard library offers. * The `send` and `recv` methods have all been removed. * The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`. This means that all send/receive operations return a `Result` now indicating whether the operation was successful or not. * The error type of `send` is now a `SendError` to implement a custom error message and allow for `unwrap()`. The error type contains an `into_inner` method to extract the value. * The error type of `recv` is now `RecvError` for the same reasons as `send`. * The `TryRecvError` and `TrySendError` types have had public reexports removed of their variants and the variant names have been tweaked with enum namespacing rules. * The `Messages` iterator is renamed to `Iter` This functionality is now all `#[stable]`: * `Sender` * `SyncSender` * `Receiver` * `std::sync::mpsc` * `channel` * `sync_channel` * `Iter` * `Sender::send` * `Sender::clone` * `SyncSender::send` * `SyncSender::try_send` * `SyncSender::clone` * `Receiver::recv` * `Receiver::try_recv` * `Receiver::iter` * `SendError` * `RecvError` * `TrySendError::{mod, Full, Disconnected}` * `TryRecvError::{mod, Empty, Disconnected}` * `SendError::into_inner` * `TrySendError::into_inner` This is a breaking change due to the modification of where this module is located, as well as the changing of the semantics of `send` and `recv`. Most programs just need to rename imports of `std::comm` to `std::sync::mpsc` and add calls to `unwrap` after a send or a receive operation. [breaking-change]
2014-12-23 11:53:35 -08:00
tx.send(1u).unwrap();
});
}
assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
}
#[test]
fn test_should_not_panic_on_drop_if_subtasks_panic_after_drop() {
use sync::{Arc, Barrier};
let pool = TaskPool::new(TEST_TASKS);
let waiter = Arc::new(Barrier::new(TEST_TASKS + 1));
2014-12-26 16:04:27 -05:00
// Panic all the existing threads in a bit.
for _ in 0..TEST_TASKS {
let waiter = waiter.clone();
pool.execute(move|| {
waiter.wait();
panic!();
});
}
drop(pool);
// Kick off the failure.
waiter.wait();
}
}