This commit is the final step in the libstd facade, #13851. The purpose of this commit is to move libsync underneath the standard library, behind the facade. This will allow core primitives like channels, queues, and atomics to all live in the same location. There were a few notable changes and a few breaking changes as part of this movement: * The `Vec` and `String` types are reexported at the top level of libcollections * The `unreachable!()` macro was copied to libcore * The `std::rt::thread` module was moved to librustrt, but it is still reexported at the same location. * The `std::comm` module was moved to libsync * The `sync::comm` module was moved under `sync::comm`, and renamed to `duplex`. It is now a private module with types/functions being reexported under `sync::comm`. This is a breaking change for any existing users of duplex streams. * All concurrent queues/deques were moved directly under libsync. They are also all marked with #![experimental] for now if they are public. * The `task_pool` and `future` modules no longer live in libsync, but rather live under `std::sync`. They will forever live at this location, but they may move to libsync if the `std::task` module moves as well. [breaking-change]
99 lines
2.7 KiB
Rust
99 lines
2.7 KiB
Rust
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
|
|
// file at the top-level directory of this distribution and at
|
|
// http://rust-lang.org/COPYRIGHT.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
// option. This file may not be copied, modified, or distributed
|
|
// except according to those terms.
|
|
|
|
#![allow(missing_doc)]
|
|
|
|
//! A task pool abstraction. Useful for achieving predictable CPU
//! parallelism.
|
|
|
|
use core::prelude::*;
|
|
|
|
use task;
|
|
use task::spawn;
|
|
use vec::Vec;
|
|
use comm::{channel, Sender};
|
|
|
|
enum Msg<T> {
|
|
Execute(proc(&T):Send),
|
|
Quit
|
|
}
|
|
|
|
pub struct TaskPool<T> {
|
|
channels: Vec<Sender<Msg<T>>>,
|
|
next_index: uint,
|
|
}
|
|
|
|
#[unsafe_destructor]
|
|
impl<T> Drop for TaskPool<T> {
|
|
fn drop(&mut self) {
|
|
for channel in self.channels.mut_iter() {
|
|
channel.send(Quit);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> TaskPool<T> {
|
|
/// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode`
|
|
/// is None, the tasks run on this scheduler; otherwise, they run on a
|
|
/// new scheduler with the given mode. The provided `init_fn_factory`
|
|
/// returns a function which, given the index of the task, should return
|
|
/// local data to be kept around in that task.
|
|
pub fn new(n_tasks: uint,
|
|
init_fn_factory: || -> proc(uint):Send -> T)
|
|
-> TaskPool<T> {
|
|
assert!(n_tasks >= 1);
|
|
|
|
let channels = Vec::from_fn(n_tasks, |i| {
|
|
let (tx, rx) = channel::<Msg<T>>();
|
|
let init_fn = init_fn_factory();
|
|
|
|
let task_body = proc() {
|
|
let local_data = init_fn(i);
|
|
loop {
|
|
match rx.recv() {
|
|
Execute(f) => f(&local_data),
|
|
Quit => break
|
|
}
|
|
}
|
|
};
|
|
|
|
// Run on this scheduler.
|
|
task::spawn(task_body);
|
|
|
|
tx
|
|
});
|
|
|
|
return TaskPool {
|
|
channels: channels,
|
|
next_index: 0,
|
|
};
|
|
}
|
|
|
|
/// Executes the function `f` on a task in the pool. The function
|
|
/// receives a reference to the local data returned by the `init_fn`.
|
|
pub fn execute(&mut self, f: proc(&T):Send) {
|
|
self.channels.get(self.next_index).send(Execute(f));
|
|
self.next_index += 1;
|
|
if self.next_index == self.channels.len() { self.next_index = 0; }
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_task_pool() {
|
|
let f: || -> proc(uint):Send -> uint = || {
|
|
let g: proc(uint):Send -> uint = proc(i) i;
|
|
g
|
|
};
|
|
let mut pool = TaskPool::new(4, f);
|
|
for _ in range(0, 8) {
|
|
pool.execute(proc(i) println!("Hello from thread {}!", *i));
|
|
}
|
|
}
|