Rewrite std::sync::TaskPool to be load-balancing and panic-resistant
The previous implementation was very likely to cause a panic during
unwinding, through this sequence:
- a child panics and drops its receiver
- the taskpool comes back around and sends another job to that child
- the child's receiver has hung up, so the taskpool panics on send
- during the resulting unwinding, the taskpool attempts to send a quit
message to the child, causing a second panic while unwinding
- a panic while unwinding aborts the process
This meant that TaskPool upgraded any child panic to a full process
abort. This came up in Iron when it caused crashes in long-running
servers.
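For illustration, a minimal sketch of the failing pattern using the same
pre-1.0 channel API; `WorkerHandle` is a made-up stand-in for the old
per-child plumbing, not the previous TaskPool's actual code:

```rust
use std::comm::{channel, Sender};

// Illustrative only: a handle whose destructor tells its child task to quit,
// roughly the shape of the old per-child quit channel.
struct WorkerHandle {
    quit: Sender<()>,
}

impl Drop for WorkerHandle {
    fn drop(&mut self) {
        // If the child already panicked and dropped its Receiver, this send
        // panics. When that happens while the pool is already unwinding from
        // the earlier failed job send, the second panic aborts the process.
        self.quit.send(());
    }
}

fn main() {
    let (tx, rx) = channel::<()>();
    let handle = WorkerHandle { quit: tx };
    drop(rx);       // the child hung up, e.g. because it panicked
    drop(handle);   // send panics; during unwinding this would be an abort
}
```
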
This implementation uses a single channel to communicate between
spawned tasks and the TaskPool, which significantly reduces the complexity
of the implementation and cuts down on allocation. The TaskPool uses
the channel as a single-producer-multiple-consumer queue.
Additionally, by using send_opt and recv_opt instead of send and recv,
this TaskPool is robust in the face of child panics, whether they occur
before, during, or after the TaskPool itself is dropped.
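For illustration, a minimal sketch of that single-channel,
single-producer-multiple-consumer arrangement in the same pre-1.0 APIs this
file uses; the worker and job counts are made up, and the closure syntax
assumes the unboxed-closure era `spawn`:

```rust
use std::sync::{Arc, Mutex};
use std::task::spawn;

fn main() {
    // One channel: the pool keeps the only Sender; all workers share the
    // Receiver behind an Arc<Mutex<..>>, turning it into an SPMC queue.
    let (tx, rx) = channel::<uint>();
    let rx = Arc::new(Mutex::new(rx));

    for _ in range(0u, 4u) {
        let rx = rx.clone();
        spawn(move |:| {
            loop {
                // Take the lock only long enough to pull one job off the
                // queue; recv_opt returns Err(..) once the Sender is gone,
                // where recv would panic.
                let job = {
                    let guard = rx.lock();
                    guard.recv_opt()
                };
                match job {
                    Ok(n) => println!("got job {}", n),
                    Err(..) => break
                }
            }
        });
    }

    for n in range(0u, 8u) {
        // send_opt returns Err(..) if every worker has hung up,
        // where send would panic.
        let _ = tx.send_opt(n);
    }
}
```
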
Due to the TaskPool no longer using an `init_fn_factory`, this is a
[breaking-change]
Otherwise, the API has not changed.
If you used `init_fn_factory` in your code and this change breaks it,
you can instead use an `AtomicUint` counter and a channel to move
information into child tasks, as in the sketch below.
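For example, a minimal sketch of that workaround against the new API in this
file; the counter, the config channel, and the task counts are illustrative:

```rust
use std::sync::{Arc, Mutex, TaskPool};
use std::sync::atomic::{AtomicUint, SeqCst};

fn main() {
    let pool = TaskPool::new(4u);

    // A shared counter hands each job a distinct index, standing in for the
    // per-task index that `init_fn_factory` used to provide.
    let counter = Arc::new(AtomicUint::new(0));

    // A channel carries any other per-task setup data into the pool.
    let (cfg_tx, cfg_rx) = channel::<String>();
    let cfg_rx = Arc::new(Mutex::new(cfg_rx));
    for _ in range(0u, 4u) {
        cfg_tx.send("per-task config".to_string());
    }

    for _ in range(0u, 4u) {
        let counter = counter.clone();
        let cfg_rx = cfg_rx.clone();
        pool.execute(move|| {
            let id = counter.fetch_add(1, SeqCst);
            let cfg = cfg_rx.lock().recv();
            println!("task {} got {}", id, cfg);
        });
    }
}
```
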
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Abstraction of a task pool for basic parallelism.

use core::prelude::*;

use task::spawn;
use comm::{channel, Sender, Receiver};
use sync::{Arc, Mutex};
use thunk::Thunk;
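
// A sentinel shares the job receiver with its worker task; if it is dropped
// while still active (that is, the worker panicked rather than shutting down
// cleanly), its destructor spawns a replacement worker on the same receiver.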
struct Sentinel<'a> {
    jobs: &'a Arc<Mutex<Receiver<Thunk>>>,
    active: bool
}

impl<'a> Sentinel<'a> {
    fn new(jobs: &Arc<Mutex<Receiver<Thunk>>>) -> Sentinel {
        Sentinel {
            jobs: jobs,
            active: true
        }
    }

    // Cancel and destroy this sentinel.
    fn cancel(mut self) {
        self.active = false;
    }
}
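
// NOTE: at this point in Rust's history, implementing Drop for a type with
// a lifetime (or type) parameter required opting in with this attribute.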
#[unsafe_destructor]
impl<'a> Drop for Sentinel<'a> {
    fn drop(&mut self) {
        if self.active {
            spawn_in_pool(self.jobs.clone())
        }
    }
}

/// A task pool used to execute functions in parallel.
///
/// Spawns `n` worker tasks and replenishes the pool if any worker tasks
/// panic.
///
/// # Example
///
/// ```rust
/// # use std::sync::TaskPool;
/// # use std::iter::AdditiveIterator;
///
/// let pool = TaskPool::new(4u);
///
/// let (tx, rx) = channel();
/// for _ in range(0, 8u) {
///     let tx = tx.clone();
///     pool.execute(move|| {
///         tx.send(1u);
///     });
/// }
///
/// assert_eq!(rx.iter().take(8u).sum(), 8u);
/// ```
pub struct TaskPool {
    // How the taskpool communicates with subtasks.
    //
    // This is the only such Sender, so when it is dropped all subtasks will
    // quit.
    jobs: Sender<Thunk>
}

impl TaskPool {
    /// Spawns a new task pool with `tasks` tasks.
    ///
    /// # Panics
    ///
    /// This function will panic if `tasks` is 0.
    pub fn new(tasks: uint) -> TaskPool {
        assert!(tasks >= 1);

        let (tx, rx) = channel::<Thunk>();
        let rx = Arc::new(Mutex::new(rx));

        // Taskpool tasks.
        for _ in range(0, tasks) {
            spawn_in_pool(rx.clone());
        }

        TaskPool { jobs: tx }
    }

    /// Executes the function `job` on a task in the pool.
    pub fn execute<F>(&self, job: F)
        where F : FnOnce(), F : Send
    {
        self.jobs.send(Thunk::new(job));
    }
}
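
// Runs one pool worker: repeatedly pulls a job off the shared receiver and
// invokes it; the worker's Sentinel spawns a replacement if a job panics.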
fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) {
    spawn(move |:| {
        // Will spawn a new task on panic unless it is cancelled.
        let sentinel = Sentinel::new(&jobs);

        loop {
            let message = {
                // Only lock jobs for the time it takes
                // to get a job, not run it.
                let lock = jobs.lock();
                lock.recv_opt()
            };

            match message {
                Ok(job) => job.invoke(()),

                // The Taskpool was dropped.
                Err(..) => break
            }
        }

        sentinel.cancel();
    })
}

#[cfg(test)]
mod test {
    use core::prelude::*;
    use super::*;
    use comm::channel;
    use iter::range;

    const TEST_TASKS: uint = 4u;

    #[test]
    fn test_works() {
        use iter::AdditiveIterator;

        let pool = TaskPool::new(TEST_TASKS);

        let (tx, rx) = channel();
        for _ in range(0, TEST_TASKS) {
            let tx = tx.clone();
            pool.execute(move|| {
                tx.send(1u);
            });
        }

        assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
    }

    #[test]
    #[should_fail]
    fn test_zero_tasks_panic() {
        TaskPool::new(0);
    }

    #[test]
    fn test_recovery_from_subtask_panic() {
        use iter::AdditiveIterator;

        let pool = TaskPool::new(TEST_TASKS);

        // Panic all the existing tasks.
        for _ in range(0, TEST_TASKS) {
            pool.execute(move|| { panic!() });
        }

        // Ensure new tasks were spawned to compensate.
        let (tx, rx) = channel();
        for _ in range(0, TEST_TASKS) {
            let tx = tx.clone();
            pool.execute(move|| {
                tx.send(1u);
            });
        }

        assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
    }

    #[test]
    fn test_should_not_panic_on_drop_if_subtasks_panic_after_drop() {
        use sync::{Arc, Barrier};

        let pool = TaskPool::new(TEST_TASKS);
        let waiter = Arc::new(Barrier::new(TEST_TASKS + 1));

        // Panic all the existing tasks in a bit.
        for _ in range(0, TEST_TASKS) {
            let waiter = waiter.clone();
            pool.execute(move|| {
                waiter.wait();
                panic!();
            });
        }

        drop(pool);

        // Kick off the failure.
        waiter.wait();
    }
}