From 3f11f8738201dcf230a1647e30c312c980513b37 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 30 Dec 2013 00:55:27 -0800 Subject: [PATCH] Move task count bookeeping out of libstd For libgreen, bookeeping should not be global but rather on a per-pool basis. Inside libnative, it's known that there must be a global counter with a mutex/cvar. The benefit of taking this strategy is to remove this functionality from libstd to allow fine-grained control of it through libnative/libgreen. Notably, helper threads in libnative can manually decrement the global count so they don't count towards the global count of threads. Also, the shutdown process of *all* sched pools is now dependent on the number of tasks in the pool being 0 rather than this only being a hardcoded solution for the initial sched pool in libgreen. This involved adding a Local::try_take() method on the Local trait in order for the channel wakeup to work inside of libgreen. The channel send was happening from a SchedTask when there is no Task available in TLS, and now this is possible to work (remote wakeups are always possible, just a little slower). --- src/libgreen/lib.rs | 92 +++++++++++++++++++++++++++++++------ src/libgreen/sched.rs | 26 +++++++---- src/libgreen/task.rs | 39 ++++++++++++++-- src/libnative/bookeeping.rs | 49 ++++++++++++++++++++ src/libnative/lib.rs | 10 +--- src/libnative/task.rs | 3 ++ src/libstd/rt/local.rs | 23 ++++++++-- src/libstd/rt/local_ptr.rs | 40 ++++++++++++++++ src/libstd/rt/task.rs | 33 +------------ 9 files changed, 244 insertions(+), 71 deletions(-) create mode 100644 src/libnative/bookeeping.rs diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index 9fdb8175e5c..3ddd1f05f25 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -25,11 +25,12 @@ // NB this does *not* include globs, please keep it that way. #[feature(macro_rules)]; +// Allow check-stage0-green for now +#[cfg(test, stage0)] extern mod green; + use std::os; use std::rt::crate_map; -use std::rt::local::Local; use std::rt::rtio; -use std::rt::task::Task; use std::rt::thread::Thread; use std::rt; use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; @@ -37,6 +38,7 @@ use std::sync::deque; use std::task::TaskOpts; use std::util; use std::vec; +use std::sync::arc::UnsafeArc; use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor}; use sleeper_list::SleeperList; @@ -118,14 +120,6 @@ pub fn run(main: proc()) -> int { os::set_exit_status(rt::DEFAULT_ERROR_CODE); } - // Once the main task has exited and we've set our exit code, wait for all - // spawned sub-tasks to finish running. This is done to allow all schedulers - // to remain active while there are still tasks possibly running. - unsafe { - let mut task = Local::borrow(None::); - task.get().wait_for_other_tasks(); - } - // Now that we're sure all tasks are dead, shut down the pool of schedulers, // waiting for them all to return. pool.shutdown(); @@ -164,6 +158,17 @@ pub struct SchedPool { priv deque_pool: deque::BufferPool<~task::GreenTask>, priv sleepers: SleeperList, priv factory: fn() -> ~rtio::EventLoop, + priv task_state: TaskState, + priv tasks_done: Port<()>, +} + +/// This is an internal state shared among a pool of schedulers. This is used to +/// keep track of how many tasks are currently running in the pool and then +/// sending on a channel once the entire pool has been drained of all tasks. 
+#[deriving(Clone)] +struct TaskState { + cnt: UnsafeArc, + done: SharedChan<()>, } impl SchedPool { @@ -182,6 +187,7 @@ impl SchedPool { assert!(nscheds > 0); // The pool of schedulers that will be returned from this function + let (p, state) = TaskState::new(); let mut pool = SchedPool { threads: ~[], handles: ~[], @@ -192,6 +198,8 @@ impl SchedPool { deque_pool: deque::BufferPool::new(), next_friend: 0, factory: factory, + task_state: state, + tasks_done: p, }; // Create a work queue for each scheduler, ntimes. Create an extra @@ -210,21 +218,30 @@ impl SchedPool { (pool.factory)(), worker, pool.stealers.clone(), - pool.sleepers.clone()); + pool.sleepers.clone(), + pool.task_state.clone()); pool.handles.push(sched.make_handle()); let sched = sched; - pool.threads.push(do Thread::start { - sched.bootstrap(); - }); + pool.threads.push(do Thread::start { sched.bootstrap(); }); } return pool; } + /// Creates a new task configured to run inside of this pool of schedulers. + /// This is useful to create a task which can then be sent to a specific + /// scheduler created by `spawn_sched` (and possibly pin it to that + /// scheduler). pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask { GreenTask::configure(&mut self.stack_pool, opts, f) } + /// Spawns a new task into this pool of schedulers, using the specified + /// options to configure the new task which is spawned. + /// + /// New tasks are spawned in a round-robin fashion to the schedulers in this + /// pool, but tasks can certainly migrate among schedulers once they're in + /// the pool. pub fn spawn(&mut self, opts: TaskOpts, f: proc()) { let task = self.task(opts, f); @@ -262,7 +279,8 @@ impl SchedPool { (self.factory)(), worker, self.stealers.clone(), - self.sleepers.clone()); + self.sleepers.clone(), + self.task_state.clone()); let ret = sched.make_handle(); self.handles.push(sched.make_handle()); let sched = sched; @@ -271,9 +289,28 @@ impl SchedPool { return ret; } + /// Consumes the pool of schedulers, waiting for all tasks to exit and all + /// schedulers to shut down. + /// + /// This function is required to be called in order to drop a pool of + /// schedulers, it is considered an error to drop a pool without calling + /// this method. + /// + /// This only waits for all tasks in *this pool* of schedulers to exit, any + /// native tasks or extern pools will not be waited on pub fn shutdown(mut self) { self.stealers = ~[]; + // Wait for everyone to exit. We may have reached a 0-task count + // multiple times in the past, meaning there could be several buffered + // messages on the `tasks_done` port. We're guaranteed that after *some* + // message the current task count will be 0, so we just receive in a + // loop until everything is totally dead. + while self.task_state.active() { + self.tasks_done.recv(); + } + + // Now that everyone's gone, tell everything to shut down. 
for mut handle in util::replace(&mut self.handles, ~[]).move_iter() { handle.send(Shutdown); } @@ -283,6 +320,31 @@ impl SchedPool { } } +impl TaskState { + fn new() -> (Port<()>, TaskState) { + let (p, c) = SharedChan::new(); + (p, TaskState { + cnt: UnsafeArc::new(AtomicUint::new(0)), + done: c, + }) + } + + fn increment(&mut self) { + unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); } + } + + fn active(&self) -> bool { + unsafe { (*self.cnt.get()).load(SeqCst) != 0 } + } + + fn decrement(&mut self) { + let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) }; + if prev == 1 { + self.done.send(()); + } + } +} + impl Drop for SchedPool { fn drop(&mut self) { if self.threads.len() > 0 { diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 984cc88a4c9..b0b88e4be79 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -19,6 +19,7 @@ use std::unstable::mutex::Mutex; use std::unstable::raw; use mpsc = std::sync::mpsc_queue; +use TaskState; use context::Context; use coroutine::Coroutine; use sleeper_list::SleeperList; @@ -85,6 +86,9 @@ pub struct Scheduler { /// A flag to tell the scheduler loop it needs to do some stealing /// in order to introduce randomness as part of a yield steal_for_yield: bool, + /// Bookeeping for the number of tasks which are currently running around + /// inside this pool of schedulers + task_state: TaskState, // n.b. currently destructors of an object are run in top-to-bottom in order // of field declaration. Due to its nature, the pausable idle callback @@ -120,11 +124,12 @@ impl Scheduler { event_loop: ~EventLoop, work_queue: deque::Worker<~GreenTask>, work_queues: ~[deque::Stealer<~GreenTask>], - sleeper_list: SleeperList) + sleeper_list: SleeperList, + state: TaskState) -> Scheduler { Scheduler::new_special(pool_id, event_loop, work_queue, work_queues, - sleeper_list, true, None) + sleeper_list, true, None, state) } @@ -134,7 +139,8 @@ impl Scheduler { work_queues: ~[deque::Stealer<~GreenTask>], sleeper_list: SleeperList, run_anything: bool, - friend: Option) + friend: Option, + state: TaskState) -> Scheduler { let (consumer, producer) = mpsc::queue(()); @@ -156,7 +162,8 @@ impl Scheduler { rng: new_sched_rng(), idle_callback: None, yield_check_count: 0, - steal_for_yield: false + steal_for_yield: false, + task_state: state, }; sched.yield_check_count = reset_yield_check(&mut sched.rng); @@ -756,6 +763,7 @@ impl Scheduler { let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| { let coroutine = dead_task.coroutine.take_unwrap(); coroutine.recycle(&mut sched.stack_pool); + sched.task_state.decrement(); }); fail!("should never return!"); } @@ -955,11 +963,10 @@ mod test { use std::rt::task::Task; use std::rt::local::Local; + use {TaskState, PoolConfig, SchedPool}; use basic; use sched::{TaskFromFriend, PinnedTask}; use task::{GreenTask, HomeSched}; - use PoolConfig; - use SchedPool; fn pool() -> SchedPool { SchedPool::new(PoolConfig { @@ -1078,6 +1085,7 @@ mod test { let (normal_worker, normal_stealer) = pool.deque(); let (special_worker, special_stealer) = pool.deque(); let queues = ~[normal_stealer, special_stealer]; + let (_p, state) = TaskState::new(); // Our normal scheduler let mut normal_sched = ~Scheduler::new( @@ -1085,7 +1093,8 @@ mod test { basic::event_loop(), normal_worker, queues.clone(), - sleepers.clone()); + sleepers.clone(), + state.clone()); let normal_handle = normal_sched.make_handle(); let friend_handle = normal_sched.make_handle(); @@ -1098,7 +1107,8 @@ mod test { queues.clone(), sleepers.clone(), 
false, - Some(friend_handle)); + Some(friend_handle), + state); let special_handle = special_sched.make_handle(); diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index eff80df2a11..fc4e1c08ba5 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -33,11 +33,32 @@ use stack::StackPool; /// The necessary fields needed to keep track of a green task (as opposed to a /// 1:1 task). pub struct GreenTask { + /// Coroutine that this task is running on, otherwise known as the register + /// context and the stack that this task owns. This field is optional to + /// relinquish ownership back to a scheduler to recycle stacks at a later + /// date. coroutine: Option, + + /// Optional handle back into the home sched pool of this task. This field + /// is lazily initialized. handle: Option, + + /// Slot for maintaining ownership of a scheduler. If a task is running, + /// this value will be Some(sched) where the task is running on "sched". sched: Option<~Scheduler>, + + /// Temporary ownership slot of a std::rt::task::Task object. This is used + /// to squirrel that libstd task away while we're performing green task + /// operations. task: Option<~Task>, + + /// Dictates whether this is a sched task or a normal green task task_type: TaskType, + + /// Home pool that this task was spawned into. This field is lazily + /// initialized until when the task is initially scheduled, and is used to + /// make sure that tasks are always woken up in the correct pool of + /// schedulers. pool_id: uint, // See the comments in the scheduler about why this is necessary @@ -147,10 +168,15 @@ impl GreenTask { // cleanup job after we have re-acquired ownership of the green // task. let mut task: ~GreenTask = unsafe { GreenTask::from_uint(ops) }; - task.sched.get_mut_ref().run_cleanup_job(); + task.pool_id = { + let sched = task.sched.get_mut_ref(); + sched.run_cleanup_job(); + sched.task_state.increment(); + sched.pool_id + }; // Convert our green task to a libstd task and then execute the code - // requeted. This is the "try/catch" block for this green task and + // requested. This is the "try/catch" block for this green task and // is the wrapper for *all* code run in the task. let mut start = Some(start); let task = task.swap().run(|| start.take_unwrap()()); @@ -350,6 +376,14 @@ impl Runtime for GreenTask { self.put_task(to_wake); assert!(self.sched.is_none()); + // Optimistically look for a local task, but if one's not available to + // inspect (in order to see if it's in the same sched pool as we are), + // then just use our remote wakeup routine and carry on! + let mut running_task: ~Task = match Local::try_take() { + Some(task) => task, + None => return self.reawaken_remotely() + }; + // Waking up a green thread is a bit of a tricky situation. We have no // guarantee about where the current task is running. The options we // have for where this current task is running are: @@ -368,7 +402,6 @@ impl Runtime for GreenTask { // // In case 2 and 3, we need to remotely reawaken ourself in order to be // transplanted back to the correct scheduler pool. - let mut running_task: ~Task = Local::take(); match running_task.maybe_take_runtime::() { Some(mut running_green_task) => { running_green_task.put_task(running_task); diff --git a/src/libnative/bookeeping.rs b/src/libnative/bookeeping.rs new file mode 100644 index 00000000000..ca40c1a1958 --- /dev/null +++ b/src/libnative/bookeeping.rs @@ -0,0 +1,49 @@ +// Copyright 2013 The Rust Project Developers. 
See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! 1:1 Task bookeeping +//! +//! This module keeps track of the number of running 1:1 tasks so that entry +//! points with libnative know when it's possible to exit the program (once all +//! tasks have exited). +//! +//! The green counterpart for this is bookeeping on sched pools. + +use std::sync::atomics; +use std::unstable::mutex::{Mutex, MUTEX_INIT}; + +static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; +static mut TASK_LOCK: Mutex = MUTEX_INIT; + +pub fn increment() { + unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst); } +} + +pub fn decrement() { + unsafe { + if TASK_COUNT.fetch_sub(1, atomics::SeqCst) == 1 { + TASK_LOCK.lock(); + TASK_LOCK.signal(); + TASK_LOCK.unlock(); + } + } +} + +/// Waits for all other native tasks in the system to exit. This is only used by +/// the entry points of native programs +pub fn wait_for_other_tasks() { + unsafe { + TASK_LOCK.lock(); + while TASK_COUNT.load(atomics::SeqCst) > 0 { + TASK_LOCK.wait(); + } + TASK_LOCK.unlock(); + } +} diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs index d92127cfbb2..498945a04cb 100644 --- a/src/libnative/lib.rs +++ b/src/libnative/lib.rs @@ -27,14 +27,12 @@ // answer is that you don't need them) use std::os; -use std::rt::local::Local; -use std::rt::task::Task; use std::rt; +mod bookeeping; pub mod io; pub mod task; - // XXX: this should not exist here #[cfg(stage0)] #[lang = "start"] @@ -83,11 +81,7 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { /// This function has all of the same details as `start` except for a different /// number of arguments. pub fn run(main: proc()) -> int { - // Run the main procedure and then wait for everything to finish main(); - unsafe { - let mut task = Local::borrow(None::); - task.get().wait_for_other_tasks(); - } + bookeeping::wait_for_other_tasks(); os::get_exit_status() } diff --git a/src/libnative/task.rs b/src/libnative/task.rs index 8f2dff42404..c4d3f651777 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -27,6 +27,7 @@ use std::unstable::stack; use io; use task; +use bookeeping; /// Creates a new Task which is ready to execute as a 1:1 task. 
pub fn new() -> ~Task { @@ -79,8 +80,10 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) { stack::record_stack_bounds(my_stack - stack + 1024, my_stack); } + bookeeping::increment(); let mut f = Some(f); task.run(|| { f.take_unwrap()() }); + bookeeping::decrement(); }) } diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 1c04b6b43ce..b4a6f06c2a4 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -16,6 +16,7 @@ use rt::local_ptr; pub trait Local { fn put(value: ~Self); fn take() -> ~Self; + fn try_take() -> Option<~Self>; fn exists(unused_value: Option) -> bool; fn borrow(unused_value: Option) -> Borrowed; unsafe fn unsafe_take() -> ~Self; @@ -28,6 +29,8 @@ impl Local> for Task { fn put(value: ~Task) { unsafe { local_ptr::put(value) } } #[inline] fn take() -> ~Task { unsafe { local_ptr::take() } } + #[inline] + fn try_take() -> Option<~Task> { unsafe { local_ptr::try_take() } } fn exists(_: Option) -> bool { local_ptr::exists() } #[inline] fn borrow(_: Option) -> local_ptr::Borrowed { @@ -47,7 +50,7 @@ impl Local> for Task { #[cfg(test)] mod test { - use option::None; + use option::{None, Option}; use unstable::run_in_bare_thread; use super::*; use rt::task::Task; @@ -56,7 +59,6 @@ mod test { #[test] fn thread_local_task_smoke_test() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); let task: ~Task = Local::take(); @@ -67,7 +69,6 @@ mod test { #[test] fn thread_local_task_two_instances() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); let task: ~Task = Local::take(); @@ -83,7 +84,6 @@ mod test { #[test] fn borrow_smoke_test() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); @@ -98,7 +98,6 @@ mod test { #[test] fn borrow_with_return() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); @@ -111,6 +110,20 @@ mod test { } } + #[test] + fn try_take() { + do run_in_bare_thread { + let task = ~Task::new(); + Local::put(task); + + let t: ~Task = Local::try_take().unwrap(); + let u: Option<~Task> = Local::try_take(); + assert!(u.is_none()); + + cleanup_task(t); + } + } + fn cleanup_task(mut t: ~Task) { t.destroyed = true; } diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index 42cce272e44..9921e742ba6 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -117,6 +117,24 @@ pub mod compiled { ptr } + /// Optionally take ownership of a pointer from thread-local storage. + /// + /// # Safety note + /// + /// Does not validate the pointer type. + #[inline] + pub unsafe fn try_take() -> Option<~T> { + let ptr = RT_TLS_PTR; + if ptr.is_null() { + None + } else { + let ptr: ~T = cast::transmute(ptr); + // can't use `as`, due to type not matching with `cfg(test)` + RT_TLS_PTR = cast::transmute(0); + Some(ptr) + } + } + /// Take ownership of a pointer from thread-local storage. /// /// # Safety note @@ -215,6 +233,28 @@ pub mod native { return ptr; } + /// Optionally take ownership of a pointer from thread-local storage. + /// + /// # Safety note + /// + /// Does not validate the pointer type. + #[inline] + pub unsafe fn try_take() -> Option<~T> { + match maybe_tls_key() { + Some(key) => { + let void_ptr: *mut c_void = tls::get(key); + if void_ptr.is_null() { + None + } else { + let ptr: ~T = cast::transmute(void_ptr); + tls::set(key, ptr::mut_null()); + Some(ptr) + } + } + None => None + } + } + /// Take ownership of a pointer from thread-local storage. 
/// /// # Safety note diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index e6ab159a769..583a1e0657c 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -34,21 +34,13 @@ use rt::rtio::LocalIo; use rt::unwind::Unwinder; use send_str::SendStr; use sync::arc::UnsafeArc; -use sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; +use sync::atomics::{AtomicUint, SeqCst}; use task::{TaskResult, TaskOpts}; use unstable::finally::Finally; -use unstable::mutex::{Mutex, MUTEX_INIT}; #[cfg(stage0)] pub use rt::unwind::begin_unwind; -// These two statics are used as bookeeping to keep track of the rust runtime's -// count of threads. In 1:1 contexts, this is used to know when to return from -// the main function, and in M:N contexts this is used to know when to shut down -// the pool of schedulers. -static mut TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; -static mut TASK_LOCK: Mutex = MUTEX_INIT; - // The Task struct represents all state associated with a rust // task. There are at this point two primary "subtypes" of task, // however instead of using a subtype we just have a "task_type" field @@ -127,7 +119,6 @@ impl Task { *cast::transmute::<&~Task, &*mut Task>(&self) }; Local::put(self); - unsafe { TASK_COUNT.fetch_add(1, SeqCst); } // The only try/catch block in the world. Attempt to run the task's // client-specified code and catch any failures. @@ -194,13 +185,6 @@ impl Task { unsafe { let me: *mut Task = Local::unsafe_borrow(); (*me).death.collect_failure((*me).unwinder.result()); - - // see comments on these statics for why they're used - if TASK_COUNT.fetch_sub(1, SeqCst) == 1 { - TASK_LOCK.lock(); - TASK_LOCK.signal(); - TASK_LOCK.unlock(); - } } let mut me: ~Task = Local::take(); me.destroyed = true; @@ -293,21 +277,6 @@ impl Task { pub fn local_io<'a>(&'a mut self) -> Option> { self.imp.get_mut_ref().local_io() } - - /// The main function of all rust executables will by default use this - /// function. This function will *block* the OS thread (hence the `unsafe`) - /// waiting for all known tasks to complete. Once this function has - /// returned, it is guaranteed that no more user-defined code is still - /// running. - pub unsafe fn wait_for_other_tasks(&mut self) { - TASK_COUNT.fetch_sub(1, SeqCst); // don't count ourselves - TASK_LOCK.lock(); - while TASK_COUNT.load(SeqCst) > 0 { - TASK_LOCK.wait(); - } - TASK_LOCK.unlock(); - TASK_COUNT.fetch_add(1, SeqCst); // add ourselves back in - } } impl Drop for Task {
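
As a rough guide to what the new TaskState in libgreen is doing, here is a minimal re-sketch of the same counting pattern in modern Rust. The pre-1.0 types used in the patch (UnsafeArc<AtomicUint>, SharedChan/Port) no longer exist, so Arc<AtomicUsize> and std::sync::mpsc stand in for them; the wait_for_drain helper below plays the role of the recv loop in SchedPool::shutdown and is named here purely for illustration.

    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
    use std::sync::mpsc::{channel, Receiver, Sender};
    use std::sync::Arc;

    // Shared per-pool state: a task count plus a channel that receives a
    // message every time the count falls back to zero.
    #[derive(Clone)]
    struct TaskState {
        cnt: Arc<AtomicUsize>,
        done: Sender<()>,
    }

    impl TaskState {
        fn new() -> (Receiver<()>, TaskState) {
            let (tx, rx) = channel();
            (rx, TaskState { cnt: Arc::new(AtomicUsize::new(0)), done: tx })
        }

        fn increment(&self) {
            self.cnt.fetch_add(1, SeqCst);
        }

        fn active(&self) -> bool {
            self.cnt.load(SeqCst) != 0
        }

        fn decrement(&self) {
            // A previous value of 1 means this decrement drained the pool;
            // notify whoever is blocked waiting for shutdown.
            if self.cnt.fetch_sub(1, SeqCst) == 1 {
                let _ = self.done.send(());
            }
        }
    }

    // Mirrors the loop in SchedPool::shutdown: zero may have been reached
    // (and signalled) several times already, so keep receiving until the
    // count is observed to be zero right now.
    fn wait_for_drain(state: &TaskState, done: &Receiver<()>) {
        while state.active() {
            let _ = done.recv();
        }
    }

    fn main() {
        let (done, state) = TaskState::new();
        let mut threads = Vec::new();
        for _ in 0..4 {
            let st = state.clone();
            st.increment(); // count the task before it starts running
            threads.push(std::thread::spawn(move || {
                // ... task body would run here ...
                st.decrement();
            }));
        }
        wait_for_drain(&state, &done);
        for t in threads {
            t.join().unwrap();
        }
        assert!(!state.active());
    }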
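
The libnative side keeps a single global count guarded by a mutex and condition variable (bookeeping.rs above). A modern sketch of that wait/notify shape follows, assuming a toolchain where Mutex::new and Condvar::new are const (1.63+); it folds the counter into the Mutex itself for brevity, whereas the patch keeps a separate AtomicUint next to the lock, so treat this as an illustration of the pattern rather than a line-for-line port.

    use std::sync::{Condvar, Mutex};

    // Global count of live 1:1 tasks and the condvar that the program's
    // entry point blocks on until the count drains to zero.
    static TASK_COUNT: Mutex<usize> = Mutex::new(0);
    static ALL_DONE: Condvar = Condvar::new();

    pub fn increment() {
        *TASK_COUNT.lock().unwrap() += 1;
    }

    pub fn decrement() {
        let mut cnt = TASK_COUNT.lock().unwrap();
        *cnt -= 1;
        if *cnt == 0 {
            // The last counted task just exited; wake the waiting entry point.
            ALL_DONE.notify_all();
        }
    }

    // What run() calls after main() returns: block until every counted task
    // has called decrement().
    pub fn wait_for_other_tasks() {
        let mut cnt = TASK_COUNT.lock().unwrap();
        while *cnt > 0 {
            cnt = ALL_DONE.wait(cnt).unwrap();
        }
    }

    fn main() {
        for _ in 0..3 {
            increment(); // count the task before spawning it
            std::thread::spawn(|| {
                // ... native task body would run here ...
                decrement();
            });
        }
        wait_for_other_tasks();
    }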
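
Local::try_take() is the piece that lets a green task's remote wakeup proceed when nothing is stored in thread-local storage (the SchedTask case mentioned in the commit message). The runtime's real TLS slot is a raw pointer managed by rt::local_ptr; the sketch below substitutes a thread_local! RefCell slot, so the Task type and the put/try_take helpers are simplified stand-ins rather than the libstd API.

    use std::cell::RefCell;

    struct Task {
        name: &'static str,
    }

    thread_local! {
        // Stand-in for the runtime's TLS task slot (RT_TLS_PTR / the tls key).
        static LOCAL_TASK: RefCell<Option<Box<Task>>> = RefCell::new(None);
    }

    fn put(task: Box<Task>) {
        LOCAL_TASK.with(|slot| *slot.borrow_mut() = Some(task));
    }

    // take() in the patch asserts a task is present; try_take() returns None
    // instead, which is what allows a remote wakeup to proceed from a context
    // (such as a sched task) that owns no libstd Task.
    fn try_take() -> Option<Box<Task>> {
        LOCAL_TASK.with(|slot| slot.borrow_mut().take())
    }

    fn main() {
        assert!(try_take().is_none());
        put(Box::new(Task { name: "green" }));
        let t = try_take().expect("a task was stored");
        assert_eq!(t.name, "green");
        assert!(try_take().is_none()); // second take finds an empty slot
    }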