auto merge of #11212 : alexcrichton/rust/local-task-count, r=brson

For libgreen, bookkeeping should not be global but rather per-pool.
Inside libnative, it's known that there must be a global counter with a
mutex/cvar.

The benefit of this strategy is that it removes this functionality from libstd,
allowing fine-grained control of it through libnative/libgreen. Notably, helper
threads in libnative can manually decrement the global count so they don't
count toward it. Also, the shutdown process of *all* sched pools now depends on
the pool's task count reaching 0, rather than this being a solution hardcoded
for the initial sched pool in libgreen.

This involved adding a Local::try_take() method to the Local trait so that the
channel wakeup can work inside libgreen. The channel send was happening from a
SchedTask, where no Task is available in TLS, and this now works (remote
wakeups are always possible, just a little slower).
bors 2014-01-01 13:21:48 -08:00
commit 48918fab72
9 changed files with 244 additions and 71 deletions
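
In outline, the per-pool scheme reduces to an atomic counter paired with a
channel that is signaled on each transition to zero. The following is a minimal
sketch of that idea in modern Rust, with hypothetical names; the patch's actual
implementation is the TaskState type in the libgreen diff below, written
against the 2014 runtime APIs:

use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

#[derive(Clone)]
struct TaskState {
    cnt: Arc<AtomicUsize>, // tasks currently alive in this pool
    done: Sender<()>,      // signaled each time the count returns to zero
}

impl TaskState {
    fn new() -> (Receiver<()>, TaskState) {
        let (tx, rx) = channel();
        (rx, TaskState { cnt: Arc::new(AtomicUsize::new(0)), done: tx })
    }
    fn increment(&self) {
        self.cnt.fetch_add(1, SeqCst);
    }
    fn active(&self) -> bool {
        self.cnt.load(SeqCst) != 0
    }
    fn decrement(&self) {
        // Only the task that takes the count to zero sends a message, so the
        // shutdown loop wakes at most once per zero-crossing (possibly buffered).
        if self.cnt.fetch_sub(1, SeqCst) == 1 {
            let _ = self.done.send(());
        }
    }
}

// Shutdown: drain buffered "reached zero" messages until the count is
// actually zero, mirroring SchedPool::shutdown in the patch.
fn wait_for_drain(state: &TaskState, tasks_done: &Receiver<()>) {
    while state.active() {
        let _ = tasks_done.recv();
    }
}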

View File

@@ -25,11 +25,12 @@
// NB this does *not* include globs, please keep it that way.
#[feature(macro_rules)];
// Allow check-stage0-green for now
#[cfg(test, stage0)] extern mod green;
use std::os;
use std::rt::crate_map;
use std::rt::local::Local;
use std::rt::rtio;
use std::rt::task::Task;
use std::rt::thread::Thread;
use std::rt;
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
@@ -37,6 +38,7 @@ use std::sync::deque;
use std::task::TaskOpts;
use std::util;
use std::vec;
use std::sync::arc::UnsafeArc;
use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
use sleeper_list::SleeperList;
@@ -118,14 +120,6 @@ pub fn run(main: proc()) -> int {
os::set_exit_status(rt::DEFAULT_ERROR_CODE);
}
// Once the main task has exited and we've set our exit code, wait for all
// spawned sub-tasks to finish running. This is done to allow all schedulers
// to remain active while there are still tasks possibly running.
unsafe {
let mut task = Local::borrow(None::<Task>);
task.get().wait_for_other_tasks();
}
// Now that we're sure all tasks are dead, shut down the pool of schedulers,
// waiting for them all to return.
pool.shutdown();
@@ -164,6 +158,17 @@ pub struct SchedPool {
priv deque_pool: deque::BufferPool<~task::GreenTask>,
priv sleepers: SleeperList,
priv factory: fn() -> ~rtio::EventLoop,
priv task_state: TaskState,
priv tasks_done: Port<()>,
}
/// This is an internal state shared among a pool of schedulers. It is used to
/// keep track of how many tasks are currently running in the pool and to send
/// on a channel once the entire pool has been drained of all tasks.
#[deriving(Clone)]
struct TaskState {
cnt: UnsafeArc<AtomicUint>,
done: SharedChan<()>,
}
impl SchedPool {
@@ -182,6 +187,7 @@ impl SchedPool {
assert!(nscheds > 0);
// The pool of schedulers that will be returned from this function
let (p, state) = TaskState::new();
let mut pool = SchedPool {
threads: ~[],
handles: ~[],
@@ -192,6 +198,8 @@ impl SchedPool {
deque_pool: deque::BufferPool::new(),
next_friend: 0,
factory: factory,
task_state: state,
tasks_done: p,
};
// Create a work queue for each scheduler, ntimes. Create an extra
@@ -210,21 +218,30 @@ impl SchedPool {
(pool.factory)(),
worker,
pool.stealers.clone(),
pool.sleepers.clone());
pool.sleepers.clone(),
pool.task_state.clone());
pool.handles.push(sched.make_handle());
let sched = sched;
pool.threads.push(do Thread::start {
sched.bootstrap();
});
pool.threads.push(do Thread::start { sched.bootstrap(); });
}
return pool;
}
/// Creates a new task configured to run inside of this pool of schedulers.
/// This is useful to create a task which can then be sent to a specific
/// scheduler created by `spawn_sched` (and possibly pin it to that
/// scheduler).
pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask {
GreenTask::configure(&mut self.stack_pool, opts, f)
}
/// Spawns a new task into this pool of schedulers, using the specified
/// options to configure the new task which is spawned.
///
/// New tasks are spawned in a round-robin fashion to the schedulers in this
/// pool, but tasks can certainly migrate among schedulers once they're in
/// the pool.
pub fn spawn(&mut self, opts: TaskOpts, f: proc()) {
let task = self.task(opts, f);
@@ -262,7 +279,8 @@ impl SchedPool {
(self.factory)(),
worker,
self.stealers.clone(),
self.sleepers.clone());
self.sleepers.clone(),
self.task_state.clone());
let ret = sched.make_handle();
self.handles.push(sched.make_handle());
let sched = sched;
@@ -271,9 +289,28 @@ impl SchedPool {
return ret;
}
/// Consumes the pool of schedulers, waiting for all tasks to exit and all
/// schedulers to shut down.
///
/// This function is required to be called in order to drop a pool of
/// schedulers; it is considered an error to drop a pool without calling
/// this method.
///
/// This only waits for all tasks in *this pool* of schedulers to exit; any
/// native tasks or extern pools will not be waited on.
pub fn shutdown(mut self) {
self.stealers = ~[];
// Wait for everyone to exit. We may have reached a 0-task count
// multiple times in the past, meaning there could be several buffered
// messages on the `tasks_done` port. We're guaranteed that after *some*
// message the current task count will be 0, so we just receive in a
// loop until everything is totally dead.
while self.task_state.active() {
self.tasks_done.recv();
}
// Now that everyone's gone, tell everything to shut down.
for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
handle.send(Shutdown);
}
@@ -283,6 +320,31 @@ impl SchedPool {
}
}
impl TaskState {
fn new() -> (Port<()>, TaskState) {
let (p, c) = SharedChan::new();
(p, TaskState {
cnt: UnsafeArc::new(AtomicUint::new(0)),
done: c,
})
}
fn increment(&mut self) {
unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
}
fn active(&self) -> bool {
unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
}
fn decrement(&mut self) {
let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
if prev == 1 {
self.done.send(());
}
}
}
impl Drop for SchedPool {
fn drop(&mut self) {
if self.threads.len() > 0 {

View File

@@ -19,6 +19,7 @@ use std::unstable::mutex::Mutex;
use std::unstable::raw;
use mpsc = std::sync::mpsc_queue;
use TaskState;
use context::Context;
use coroutine::Coroutine;
use sleeper_list::SleeperList;
@@ -85,6 +86,9 @@ pub struct Scheduler {
/// A flag to tell the scheduler loop it needs to do some stealing
/// in order to introduce randomness as part of a yield
steal_for_yield: bool,
/// Bookkeeping for the number of tasks which are currently running around
/// inside this pool of schedulers
task_state: TaskState,
// n.b. currently destructors of an object are run in top-to-bottom in order
// of field declaration. Due to its nature, the pausable idle callback
@@ -120,11 +124,12 @@ impl Scheduler {
event_loop: ~EventLoop,
work_queue: deque::Worker<~GreenTask>,
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList)
sleeper_list: SleeperList,
state: TaskState)
-> Scheduler {
Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
sleeper_list, true, None)
sleeper_list, true, None, state)
}
@@ -134,7 +139,8 @@ impl Scheduler {
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
friend: Option<SchedHandle>,
state: TaskState)
-> Scheduler {
let (consumer, producer) = mpsc::queue(());
@@ -156,7 +162,8 @@ impl Scheduler {
rng: new_sched_rng(),
idle_callback: None,
yield_check_count: 0,
steal_for_yield: false
steal_for_yield: false,
task_state: state,
};
sched.yield_check_count = reset_yield_check(&mut sched.rng);
@@ -756,6 +763,7 @@ impl Scheduler {
let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| {
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
sched.task_state.decrement();
});
fail!("should never return!");
}
@@ -955,11 +963,10 @@ mod test {
use std::rt::task::Task;
use std::rt::local::Local;
use {TaskState, PoolConfig, SchedPool};
use basic;
use sched::{TaskFromFriend, PinnedTask};
use task::{GreenTask, HomeSched};
use PoolConfig;
use SchedPool;
fn pool() -> SchedPool {
SchedPool::new(PoolConfig {
@@ -1078,6 +1085,7 @@ mod test {
let (normal_worker, normal_stealer) = pool.deque();
let (special_worker, special_stealer) = pool.deque();
let queues = ~[normal_stealer, special_stealer];
let (_p, state) = TaskState::new();
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
@@ -1085,7 +1093,8 @@
basic::event_loop(),
normal_worker,
queues.clone(),
sleepers.clone());
sleepers.clone(),
state.clone());
let normal_handle = normal_sched.make_handle();
let friend_handle = normal_sched.make_handle();
@@ -1098,7 +1107,8 @@
queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
Some(friend_handle),
state);
let special_handle = special_sched.make_handle();

View File

@@ -33,11 +33,32 @@ use stack::StackPool;
/// The fields needed to keep track of a green task (as opposed to a
/// 1:1 task).
pub struct GreenTask {
/// Coroutine that this task is running on, otherwise known as the register
/// context and the stack that this task owns. This field is optional to
/// relinquish ownership back to a scheduler to recycle stacks at a later
/// date.
coroutine: Option<Coroutine>,
/// Optional handle back into the home sched pool of this task. This field
/// is lazily initialized.
handle: Option<SchedHandle>,
/// Slot for maintaining ownership of a scheduler. If a task is running,
/// this value will be Some(sched) where the task is running on "sched".
sched: Option<~Scheduler>,
/// Temporary ownership slot of a std::rt::task::Task object. This is used
/// to squirrel that libstd task away while we're performing green task
/// operations.
task: Option<~Task>,
/// Dictates whether this is a sched task or a normal green task
task_type: TaskType,
/// Home pool that this task was spawned into. This field is lazily
/// initialized when the task is first scheduled, and is used to make sure
/// that tasks are always woken up in the correct pool of schedulers.
pool_id: uint,
// See the comments in the scheduler about why this is necessary
@@ -147,10 +168,15 @@ impl GreenTask {
// cleanup job after we have re-acquired ownership of the green
// task.
let mut task: ~GreenTask = unsafe { GreenTask::from_uint(ops) };
task.sched.get_mut_ref().run_cleanup_job();
task.pool_id = {
let sched = task.sched.get_mut_ref();
sched.run_cleanup_job();
sched.task_state.increment();
sched.pool_id
};
// Convert our green task to a libstd task and then execute the code
// requeted. This is the "try/catch" block for this green task and
// requested. This is the "try/catch" block for this green task and
// is the wrapper for *all* code run in the task.
let mut start = Some(start);
let task = task.swap().run(|| start.take_unwrap()());
@@ -350,6 +376,14 @@ impl Runtime for GreenTask {
self.put_task(to_wake);
assert!(self.sched.is_none());
// Optimistically look for a local task, but if one's not available to
// inspect (in order to see if it's in the same sched pool as we are),
// then just use our remote wakeup routine and carry on!
let mut running_task: ~Task = match Local::try_take() {
Some(task) => task,
None => return self.reawaken_remotely()
};
// Waking up a green thread is a bit of a tricky situation. We have no
// guarantee about where the current task is running. The options we
// have for where this current task is running are:
@@ -368,7 +402,6 @@
//
// In case 2 and 3, we need to remotely reawaken ourself in order to be
// transplanted back to the correct scheduler pool.
let mut running_task: ~Task = Local::take();
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
running_green_task.put_task(running_task);
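
The reawaken change above has an "optimistic local, fall back to remote"
shape: consult TLS for a task, and if none is present (or it belongs to
another pool), fall back to a remote wakeup. A minimal modern-Rust sketch of
that shape, assuming a remote wakeup is modeled as a channel send to the home
pool and a local wakeup as a push onto the current run queue (all names
illustrative, not the patch's API):

use std::sync::mpsc::Sender;

struct Task { pool_id: usize }

// Mirrors the logic in GreenTask::reawaken above: prefer a direct, local
// wakeup when the current task is in the same pool; otherwise go remote.
fn reawaken(to_wake: Task,
            local: Option<&Task>,        // what Local::try_take() found
            local_queue: &mut Vec<Task>, // stand-in for the scheduler's run queue
            home: &Sender<Task>) {       // stand-in for the home pool's handle
    match local {
        // Fast path: same pool, so the task can be woken up right here.
        Some(cur) if cur.pool_id == to_wake.pool_id => local_queue.push(to_wake),
        // No task in TLS, or a different pool: remote wakeup. Always
        // possible, just a little slower.
        _ => { let _ = home.send(to_wake); }
    }
}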

View File

@@ -0,0 +1,49 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! 1:1 Task bookkeeping
//!
//! This module keeps track of the number of running 1:1 tasks so that the
//! entry points of libnative know when it's possible to exit the program
//! (once all tasks have exited).
//!
//! The green counterpart for this is bookkeeping on sched pools.
use std::sync::atomics;
use std::unstable::mutex::{Mutex, MUTEX_INIT};
static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
static mut TASK_LOCK: Mutex = MUTEX_INIT;
pub fn increment() {
unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst); }
}
pub fn decrement() {
unsafe {
if TASK_COUNT.fetch_sub(1, atomics::SeqCst) == 1 {
TASK_LOCK.lock();
TASK_LOCK.signal();
TASK_LOCK.unlock();
}
}
}
/// Waits for all other native tasks in the system to exit. This is only used
/// by the entry points of native programs.
pub fn wait_for_other_tasks() {
unsafe {
TASK_LOCK.lock();
while TASK_COUNT.load(atomics::SeqCst) > 0 {
TASK_LOCK.wait();
}
TASK_LOCK.unlock();
}
}
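
For comparison, a hedged modern-Rust equivalent of the counter/mutex/cvar
pattern above (the patch uses the then-unstable std::unstable::mutex
primitives alongside a separate atomic; this sketch folds the count under the
lock for simplicity):

use std::sync::{Condvar, Mutex};

struct Bookkeeping {
    count: Mutex<usize>, // number of live 1:1 tasks
    zero: Condvar,       // signaled when the count reaches zero
}

impl Bookkeeping {
    fn new() -> Bookkeeping {
        Bookkeeping { count: Mutex::new(0), zero: Condvar::new() }
    }
    fn increment(&self) {
        *self.count.lock().unwrap() += 1;
    }
    fn decrement(&self) {
        let mut n = self.count.lock().unwrap();
        *n -= 1;
        if *n == 0 {
            self.zero.notify_all();
        }
    }
    // Block until every registered task has called decrement(), as the
    // native entry point does before returning the exit status.
    fn wait_for_other_tasks(&self) {
        let mut n = self.count.lock().unwrap();
        while *n > 0 {
            n = self.zero.wait(n).unwrap();
        }
    }
}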

View File

@@ -27,14 +27,12 @@
// answer is that you don't need them)
use std::os;
use std::rt::local::Local;
use std::rt::task::Task;
use std::rt;
mod bookeeping;
pub mod io;
pub mod task;
// XXX: this should not exist here
#[cfg(stage0)]
#[lang = "start"]
@@ -83,11 +81,7 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
/// This function has all of the same details as `start` except for a different
/// number of arguments.
pub fn run(main: proc()) -> int {
// Run the main procedure and then wait for everything to finish
main();
unsafe {
let mut task = Local::borrow(None::<Task>);
task.get().wait_for_other_tasks();
}
bookeeping::wait_for_other_tasks();
os::get_exit_status()
}

View File

@@ -27,6 +27,7 @@ use std::unstable::stack;
use io;
use task;
use bookeeping;
/// Creates a new Task which is ready to execute as a 1:1 task.
pub fn new() -> ~Task {
@@ -79,8 +80,10 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
}
bookeeping::increment();
let mut f = Some(f);
task.run(|| { f.take_unwrap()() });
bookeeping::decrement();
})
}
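
The increment()/run/decrement() bracket above stays balanced even on task
failure because Task::run catches unwinding internally (it is "the only
try/catch block in the world"; see the libstd diff below). The same invariant
can be expressed as a scoped guard; a hedged modern-Rust sketch with
illustrative names, not the patch's code:

use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

static TASK_COUNT: AtomicUsize = AtomicUsize::new(0);

struct CountGuard; // decrements on drop, however the task exits

impl CountGuard {
    fn new() -> CountGuard {
        TASK_COUNT.fetch_add(1, SeqCst);
        CountGuard
    }
}

impl Drop for CountGuard {
    fn drop(&mut self) {
        TASK_COUNT.fetch_sub(1, SeqCst);
    }
}

fn run_counted<F: FnOnce()>(f: F) {
    let _guard = CountGuard::new();
    f(); // the count is restored on return or unwind
}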

View File

@@ -16,6 +16,7 @@ use rt::local_ptr;
pub trait Local<Borrowed> {
fn put(value: ~Self);
fn take() -> ~Self;
fn try_take() -> Option<~Self>;
fn exists(unused_value: Option<Self>) -> bool;
fn borrow(unused_value: Option<Self>) -> Borrowed;
unsafe fn unsafe_take() -> ~Self;
@@ -28,6 +29,8 @@ impl Local<local_ptr::Borrowed<Task>> for Task {
fn put(value: ~Task) { unsafe { local_ptr::put(value) } }
#[inline]
fn take() -> ~Task { unsafe { local_ptr::take() } }
#[inline]
fn try_take() -> Option<~Task> { unsafe { local_ptr::try_take() } }
fn exists(_: Option<Task>) -> bool { local_ptr::exists() }
#[inline]
fn borrow(_: Option<Task>) -> local_ptr::Borrowed<Task> {
@@ -47,7 +50,7 @@ impl Local<local_ptr::Borrowed<Task>> for Task {
#[cfg(test)]
mod test {
use option::None;
use option::{None, Option};
use unstable::run_in_bare_thread;
use super::*;
use rt::task::Task;
@@ -56,7 +59,6 @@ mod test {
#[test]
fn thread_local_task_smoke_test() {
do run_in_bare_thread {
local_ptr::init();
let task = ~Task::new();
Local::put(task);
let task: ~Task = Local::take();
@@ -67,7 +69,6 @@
#[test]
fn thread_local_task_two_instances() {
do run_in_bare_thread {
local_ptr::init();
let task = ~Task::new();
Local::put(task);
let task: ~Task = Local::take();
@@ -83,7 +84,6 @@
#[test]
fn borrow_smoke_test() {
do run_in_bare_thread {
local_ptr::init();
let task = ~Task::new();
Local::put(task);
@@ -98,7 +98,6 @@
#[test]
fn borrow_with_return() {
do run_in_bare_thread {
local_ptr::init();
let task = ~Task::new();
Local::put(task);
@@ -111,6 +110,20 @@
}
}
#[test]
fn try_take() {
do run_in_bare_thread {
let task = ~Task::new();
Local::put(task);
let t: ~Task = Local::try_take().unwrap();
let u: Option<~Task> = Local::try_take();
assert!(u.is_none());
cleanup_task(t);
}
}
fn cleanup_task(mut t: ~Task) {
t.destroyed = true;
}

View File

@@ -117,6 +117,24 @@ pub mod compiled {
ptr
}
/// Optionally take ownership of a pointer from thread-local storage.
///
/// # Safety note
///
/// Does not validate the pointer type.
#[inline]
pub unsafe fn try_take<T>() -> Option<~T> {
let ptr = RT_TLS_PTR;
if ptr.is_null() {
None
} else {
let ptr: ~T = cast::transmute(ptr);
// can't use `as`, due to type not matching with `cfg(test)`
RT_TLS_PTR = cast::transmute(0);
Some(ptr)
}
}
/// Take ownership of a pointer from thread-local storage.
///
/// # Safety note
@@ -205,6 +223,28 @@ pub mod native {
return ptr;
}
/// Optionally take ownership of a pointer from thread-local storage.
///
/// # Safety note
///
/// Does not validate the pointer type.
#[inline]
pub unsafe fn try_take<T>() -> Option<~T> {
match maybe_tls_key() {
Some(key) => {
let void_ptr: *mut c_void = tls::get(key);
if void_ptr.is_null() {
None
} else {
let ptr: ~T = cast::transmute(void_ptr);
tls::set(key, ptr::mut_null());
Some(ptr)
}
}
None => None
}
}
/// Take ownership of a pointer from thread-local storage.
///
/// # Safety note
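
Both try_take implementations above have the same observable contract: consume
the pointer out of thread-local storage if one is present, and return None
(rather than failing like take()) if the slot is empty. A modern-Rust sketch of
that contract using thread_local!, purely to illustrate the semantics (not the
runtime's actual storage):

use std::cell::RefCell;

struct Task; // stand-in for the runtime's task structure

thread_local! {
    // One optional task pointer per OS thread, initially empty.
    static LOCAL_TASK: RefCell<Option<Box<Task>>> = RefCell::new(None);
}

fn put(task: Box<Task>) {
    LOCAL_TASK.with(|slot| *slot.borrow_mut() = Some(task));
}

// try_take(): returns None when no task is stored, which is what lets a
// remote wakeup proceed from a context with no Task in TLS.
fn try_take() -> Option<Box<Task>> {
    LOCAL_TASK.with(|slot| slot.borrow_mut().take())
}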

View File

@@ -34,21 +34,13 @@ use rt::rtio::LocalIo;
use rt::unwind::Unwinder;
use send_str::SendStr;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
use sync::atomics::{AtomicUint, SeqCst};
use task::{TaskResult, TaskOpts};
use unstable::finally::Finally;
use unstable::mutex::{Mutex, MUTEX_INIT};
#[cfg(stage0)]
pub use rt::unwind::begin_unwind;
// These two statics are used as bookkeeping to keep track of the rust runtime's
// count of threads. In 1:1 contexts, this is used to know when to return from
// the main function, and in M:N contexts this is used to know when to shut down
// the pool of schedulers.
static mut TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT;
static mut TASK_LOCK: Mutex = MUTEX_INIT;
// The Task struct represents all state associated with a rust
// task. There are at this point two primary "subtypes" of task,
// however instead of using a subtype we just have a "task_type" field
@@ -127,7 +119,6 @@ impl Task {
*cast::transmute::<&~Task, &*mut Task>(&self)
};
Local::put(self);
unsafe { TASK_COUNT.fetch_add(1, SeqCst); }
// The only try/catch block in the world. Attempt to run the task's
// client-specified code and catch any failures.
@@ -194,13 +185,6 @@ impl Task {
unsafe {
let me: *mut Task = Local::unsafe_borrow();
(*me).death.collect_failure((*me).unwinder.result());
// see comments on these statics for why they're used
if TASK_COUNT.fetch_sub(1, SeqCst) == 1 {
TASK_LOCK.lock();
TASK_LOCK.signal();
TASK_LOCK.unlock();
}
}
let mut me: ~Task = Local::take();
me.destroyed = true;
@@ -293,21 +277,6 @@ impl Task {
pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
self.imp.get_mut_ref().local_io()
}
/// The main function of all rust executables will by default use this
/// function. This function will *block* the OS thread (hence the `unsafe`)
/// waiting for all known tasks to complete. Once this function has
/// returned, it is guaranteed that no more user-defined code is still
/// running.
pub unsafe fn wait_for_other_tasks(&mut self) {
TASK_COUNT.fetch_sub(1, SeqCst); // don't count ourselves
TASK_LOCK.lock();
while TASK_COUNT.load(SeqCst) > 0 {
TASK_LOCK.wait();
}
TASK_LOCK.unlock();
TASK_COUNT.fetch_add(1, SeqCst); // add ourselves back in
}
}
impl Drop for Task {