green: Rip the bandaid off, introduce libgreen

This extracts everything related to green scheduling from libstd and introduces
a new libgreen crate. This mostly involves deleting most of std::rt and moving
it to libgreen.

Along with the movement of code, this commit rearchitects many functions in the
scheduler in order to adapt to the fact that Local::take now *only* works on a
Task, not a scheduler. This mostly just involved threading the current green
task through in a few locations, but there were one or two spots where things
got hairy.

There are a few repercussions of this commit:

* tube/rc have been removed (the runtime implementation of rc)
* There is no longer a "single threaded" spawning mode for tasks. This is now
  encompassed by 1:1 scheduling + communication. Convenience methods have been
  introduced that are specific to libgreen to assist in the spawning of pools of
  schedulers.
This commit is contained in:
Alex Crichton 2013-12-12 18:01:59 -08:00
parent 6aadc9d188
commit 51abdee5f1
28 changed files with 1715 additions and 2060 deletions

View File

@ -14,7 +14,6 @@
/// parallelism.
use std::task::SchedMode;
use std::task;
use std::vec;
@ -46,7 +45,6 @@ impl<T> TaskPool<T> {
/// returns a function which, given the index of the task, should return
/// local data to be kept around in that task.
pub fn new(n_tasks: uint,
opt_sched_mode: Option<SchedMode>,
init_fn_factory: || -> proc(uint) -> T)
-> TaskPool<T> {
assert!(n_tasks >= 1);
@ -65,18 +63,8 @@ impl<T> TaskPool<T> {
}
};
// Start the task.
match opt_sched_mode {
None => {
// Run on this scheduler.
task::spawn(task_body);
}
Some(sched_mode) => {
let mut task = task::task();
task.sched_mode(sched_mode);
task.spawn(task_body);
}
}
// Run on this scheduler.
task::spawn(task_body);
chan
});
@ -99,7 +87,7 @@ fn test_task_pool() {
let g: proc(uint) -> uint = proc(i) i;
g
};
let mut pool = TaskPool::new(4, Some(SingleThreaded), f);
let mut pool = TaskPool::new(4, f);
8.times(|| {
pool.execute(proc(i) println!("Hello from thread {}!", *i));
})

View File

@ -11,15 +11,15 @@
//! This is a basic event loop implementation not meant for any "real purposes"
//! other than testing the scheduler and proving that it's possible to have a
//! pluggable event loop.
//!
//! This implementation is also used as the fallback implementation of an event
//! loop if no other one is provided (and M:N scheduling is desired).
use prelude::*;
use cast;
use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausableIdleCallback,
Callback};
use unstable::sync::Exclusive;
use io::native;
use util;
use std::cast;
use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback,
Callback};
use std::unstable::sync::Exclusive;
use std::util;
/// This is the only exported function from this module.
pub fn event_loop() -> ~EventLoop {
@ -32,7 +32,6 @@ struct BasicLoop {
remotes: ~[(uint, ~Callback)],
next_remote: uint,
messages: Exclusive<~[Message]>,
io: ~IoFactory,
}
enum Message { RunRemote(uint), RemoveRemote(uint) }
@ -45,7 +44,6 @@ impl BasicLoop {
next_remote: 0,
remotes: ~[],
messages: Exclusive::new(~[]),
io: ~native::IoFactory as ~IoFactory,
}
}
@ -159,10 +157,7 @@ impl EventLoop for BasicLoop {
~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback
}
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> {
let factory: &mut IoFactory = self.io;
Some(factory)
}
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
}
struct BasicRemote {

View File

@ -8,14 +8,13 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use super::stack::StackSegment;
use libc::c_void;
use uint;
use cast::{transmute, transmute_mut_unsafe,
transmute_region, transmute_mut_region};
use std::libc::c_void;
use std::uint;
use std::cast::{transmute, transmute_mut_unsafe,
transmute_region, transmute_mut_region};
use std::unstable::stack;
pub static RED_ZONE: uint = 20 * 1024;
use stack::StackSegment;
// FIXME #7761: Registers is boxed so that it is 16-byte aligned, for storing
// SSE regs. It would be marginally better not to do this. In C++ we
@ -25,7 +24,7 @@ pub static RED_ZONE: uint = 20 * 1024;
// then misalign the regs again.
pub struct Context {
/// The context entry point, saved here for later destruction
priv start: Option<~proc()>,
priv start: ~Option<proc()>,
/// Hold the registers while the task or scheduler is suspended
priv regs: ~Registers,
/// Lower bound and upper bound for the stack
@ -35,7 +34,7 @@ pub struct Context {
impl Context {
pub fn empty() -> Context {
Context {
start: None,
start: ~None,
regs: new_regs(),
stack_bounds: None,
}
@ -43,32 +42,29 @@ impl Context {
/// Create a new context that will resume execution by running proc()
pub fn new(start: proc(), stack: &mut StackSegment) -> Context {
// FIXME #7767: Putting main into a ~ so it's a thin pointer and can
// be passed to the spawn function. Another unfortunate
// allocation
let start = ~start;
// The C-ABI function that is the task entry point
extern fn task_start_wrapper(f: &proc()) {
// XXX(pcwalton): This may be sketchy.
unsafe {
let f: &|| = transmute(f);
(*f)()
}
extern fn task_start_wrapper(f: &mut Option<proc()>) {
f.take_unwrap()()
}
let fp: *c_void = task_start_wrapper as *c_void;
let argp: *c_void = unsafe { transmute::<&proc(), *c_void>(&*start) };
let sp: *uint = stack.end();
let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) };
// Save and then immediately load the current context,
// which we will then modify to call the given function when restored
let mut regs = new_regs();
unsafe {
rust_swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs));
rust_swap_registers(transmute_mut_region(&mut *regs),
transmute_region(&*regs));
};
initialize_call_frame(&mut *regs, fp, argp, sp);
// FIXME #7767: Putting main into a ~ so it's a thin pointer and can
// be passed to the spawn function. Another unfortunate
// allocation
let box = ~Some(start);
initialize_call_frame(&mut *regs,
task_start_wrapper as *c_void,
unsafe { transmute(&*box) },
sp);
// Scheduler tasks don't have a stack in the "we allocated it" sense,
// but rather they run on pthreads stacks. We have complete control over
@ -82,7 +78,7 @@ impl Context {
Some((stack_base as uint, sp as uint))
};
return Context {
start: Some(start),
start: box,
regs: regs,
stack_bounds: bounds,
}
@ -113,17 +109,18 @@ impl Context {
// invalid for the current task. Lucky for us `rust_swap_registers`
// is a C function so we don't have to worry about that!
match in_context.stack_bounds {
Some((lo, hi)) => record_stack_bounds(lo, hi),
Some((lo, hi)) => stack::record_stack_bounds(lo, hi),
// If we're going back to one of the original contexts or
// something that's possibly not a "normal task", then reset
// the stack limit to 0 to make morestack never fail
None => record_stack_bounds(0, uint::max_value),
None => stack::record_stack_bounds(0, uint::max_value),
}
rust_swap_registers(out_regs, in_regs)
}
}
}
#[link(name = "rustrt", kind = "static")]
extern {
fn rust_swap_registers(out_regs: *mut Registers, in_regs: *Registers);
}
@ -282,182 +279,6 @@ fn align_down(sp: *mut uint) -> *mut uint {
// ptr::mut_offset is positive ints only
#[inline]
pub fn mut_offset<T>(ptr: *mut T, count: int) -> *mut T {
use mem::size_of;
use std::mem::size_of;
(ptr as int + count * (size_of::<T>() as int)) as *mut T
}
#[inline(always)]
pub unsafe fn record_stack_bounds(stack_lo: uint, stack_hi: uint) {
// When the old runtime had segmented stacks, it used a calculation that was
// "limit + RED_ZONE + FUDGE". The red zone was for things like dynamic
// symbol resolution, llvm function calls, etc. In theory this red zone
// value is 0, but it matters far less when we have gigantic stacks because
// we don't need to be so exact about our stack budget. The "fudge factor"
// was because LLVM doesn't emit a stack check for functions < 256 bytes in
// size. Again though, we have giant stacks, so we round all these
// calculations up to the nice round number of 20k.
record_sp_limit(stack_lo + RED_ZONE);
return target_record_stack_bounds(stack_lo, stack_hi);
#[cfg(not(windows))] #[cfg(not(target_arch = "x86_64"))] #[inline(always)]
unsafe fn target_record_stack_bounds(_stack_lo: uint, _stack_hi: uint) {}
#[cfg(windows, target_arch = "x86_64")] #[inline(always)]
unsafe fn target_record_stack_bounds(stack_lo: uint, stack_hi: uint) {
// Windows compiles C functions which may check the stack bounds. This
// means that if we want to perform valid FFI on windows, then we need
// to ensure that the stack bounds are what they truly are for this
// task. More info can be found at:
// https://github.com/mozilla/rust/issues/3445#issuecomment-26114839
//
// stack range is at TIB: %gs:0x08 (top) and %gs:0x10 (bottom)
asm!("mov $0, %gs:0x08" :: "r"(stack_hi) :: "volatile");
asm!("mov $0, %gs:0x10" :: "r"(stack_lo) :: "volatile");
}
}
/// Records the current limit of the stack as specified by `end`.
///
/// This is stored in an OS-dependent location, likely inside of the thread
/// local storage. The location that the limit is stored is a pre-ordained
/// location because it's where LLVM has emitted code to check.
///
/// Note that this cannot be called under normal circumstances. This function is
/// changing the stack limit, so upon returning any further function calls will
/// possibly be triggering the morestack logic if you're not careful.
///
/// Also note that this and all of the inside functions are all flagged as
/// "inline(always)" because they're messing around with the stack limits. This
/// would be unfortunate for the functions themselves to trigger a morestack
/// invocation (if they were an actual function call).
#[inline(always)]
pub unsafe fn record_sp_limit(limit: uint) {
return target_record_sp_limit(limit);
// x86-64
#[cfg(target_arch = "x86_64", target_os = "macos")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
asm!("movq $$0x60+90*8, %rsi
movq $0, %gs:(%rsi)" :: "r"(limit) : "rsi" : "volatile")
}
#[cfg(target_arch = "x86_64", target_os = "linux")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
asm!("movq $0, %fs:112" :: "r"(limit) :: "volatile")
}
#[cfg(target_arch = "x86_64", target_os = "win32")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
// see: http://en.wikipedia.org/wiki/Win32_Thread_Information_Block
// store this inside of the "arbitrary data slot", but double the size
// because this is 64 bit instead of 32 bit
asm!("movq $0, %gs:0x28" :: "r"(limit) :: "volatile")
}
#[cfg(target_arch = "x86_64", target_os = "freebsd")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
asm!("movq $0, %fs:24" :: "r"(limit) :: "volatile")
}
// x86
#[cfg(target_arch = "x86", target_os = "macos")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
asm!("movl $$0x48+90*4, %eax
movl $0, %gs:(%eax)" :: "r"(limit) : "eax" : "volatile")
}
#[cfg(target_arch = "x86", target_os = "linux")]
#[cfg(target_arch = "x86", target_os = "freebsd")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
asm!("movl $0, %gs:48" :: "r"(limit) :: "volatile")
}
#[cfg(target_arch = "x86", target_os = "win32")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
// see: http://en.wikipedia.org/wiki/Win32_Thread_Information_Block
// store this inside of the "arbitrary data slot"
asm!("movl $0, %fs:0x14" :: "r"(limit) :: "volatile")
}
// mips, arm - Some brave soul can port these to inline asm, but it's over
// my head personally
#[cfg(target_arch = "mips")]
#[cfg(target_arch = "arm")] #[inline(always)]
unsafe fn target_record_sp_limit(limit: uint) {
return record_sp_limit(limit as *c_void);
extern {
fn record_sp_limit(limit: *c_void);
}
}
}
/// The counterpart of the function above, this function will fetch the current
/// stack limit stored in TLS.
///
/// Note that all of these functions are meant to be exact counterparts of their
/// brethren above, except that the operands are reversed.
///
/// As with the setter, this function does not have a __morestack header and can
/// therefore be called in a "we're out of stack" situation.
#[inline(always)]
// currently only called by `rust_stack_exhausted`, which doesn't
// exist in a test build.
#[cfg(not(test))]
pub unsafe fn get_sp_limit() -> uint {
return target_get_sp_limit();
// x86-64
#[cfg(target_arch = "x86_64", target_os = "macos")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
let limit;
asm!("movq $$0x60+90*8, %rsi
movq %gs:(%rsi), $0" : "=r"(limit) :: "rsi" : "volatile");
return limit;
}
#[cfg(target_arch = "x86_64", target_os = "linux")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
let limit;
asm!("movq %fs:112, $0" : "=r"(limit) ::: "volatile");
return limit;
}
#[cfg(target_arch = "x86_64", target_os = "win32")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
let limit;
asm!("movq %gs:0x28, $0" : "=r"(limit) ::: "volatile");
return limit;
}
#[cfg(target_arch = "x86_64", target_os = "freebsd")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
let limit;
asm!("movq %fs:24, $0" : "=r"(limit) ::: "volatile");
return limit;
}
// x86
#[cfg(target_arch = "x86", target_os = "macos")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
let limit;
asm!("movl $$0x48+90*4, %eax
movl %gs:(%eax), $0" : "=r"(limit) :: "eax" : "volatile");
return limit;
}
#[cfg(target_arch = "x86", target_os = "linux")]
#[cfg(target_arch = "x86", target_os = "freebsd")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
let limit;
asm!("movl %gs:48, $0" : "=r"(limit) ::: "volatile");
return limit;
}
#[cfg(target_arch = "x86", target_os = "win32")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
let limit;
asm!("movl %fs:0x14, $0" : "=r"(limit) ::: "volatile");
return limit;
}
// mips, arm - Some brave soul can port these to inline asm, but it's over
// my head personally
#[cfg(target_arch = "mips")]
#[cfg(target_arch = "arm")] #[inline(always)]
unsafe fn target_get_sp_limit() -> uint {
return get_sp_limit() as uint;
extern {
fn get_sp_limit() -> *c_void;
}
}
}

62
src/libgreen/coroutine.rs Normal file
View File

@ -0,0 +1,62 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Coroutines represent nothing more than a context and a stack
// segment.
use std::rt::env;
use context::Context;
use stack::{StackPool, StackSegment};
/// A coroutine is nothing more than a (register context, stack) pair.
pub struct Coroutine {
/// The segment of stack on which the task is currently running or
/// if the task is blocked, on which the task will resume
/// execution.
///
/// Servo needs this to be public in order to tell SpiderMonkey
/// about the stack bounds.
current_stack_segment: StackSegment,
/// Always valid if the task is alive and not running.
saved_context: Context
}
impl Coroutine {
pub fn new(stack_pool: &mut StackPool,
stack_size: Option<uint>,
start: proc())
-> Coroutine {
let stack_size = match stack_size {
Some(size) => size,
None => env::min_stack()
};
let mut stack = stack_pool.take_segment(stack_size);
let initial_context = Context::new(start, &mut stack);
Coroutine {
current_stack_segment: stack,
saved_context: initial_context
}
}
pub fn empty() -> Coroutine {
Coroutine {
current_stack_segment: StackSegment::new(0),
saved_context: Context::empty()
}
}
/// Destroy coroutine and try to reuse std::stack segment.
pub fn recycle(self, stack_pool: &mut StackPool) {
let Coroutine { current_stack_segment, .. } = self;
stack_pool.give_segment(current_stack_segment);
}
}

351
src/libgreen/lib.rs Normal file
View File

@ -0,0 +1,351 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! The "green scheduling" library
//!
//! This library provides M:N threading for rust programs. Internally this has
//! the implementation of a green scheduler along with context switching and a
//! stack-allocation strategy.
//!
//! This can be optionally linked in to rust programs in order to provide M:N
//! functionality inside of 1:1 programs.
#[link(name = "green",
package_id = "green",
vers = "0.9-pre",
uuid = "20c38f8c-bfea-83ed-a068-9dc05277be26",
url = "https://github.com/mozilla/rust/tree/master/src/libgreen")];
#[license = "MIT/ASL2"];
#[crate_type = "rlib"];
#[crate_type = "dylib"];
// NB this does *not* include globs, please keep it that way.
#[feature(macro_rules)];
use std::cast;
use std::os;
use std::rt::thread::Thread;
use std::rt;
use std::rt::crate_map;
use std::rt::task::Task;
use std::rt::rtio;
use std::sync::deque;
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
use std::task::TaskResult;
use std::vec;
use std::util;
use sched::{Shutdown, Scheduler, SchedHandle};
use sleeper_list::SleeperList;
use task::{GreenTask, HomeSched};
mod macros;
pub mod basic;
pub mod context;
pub mod coroutine;
pub mod sched;
pub mod sleeper_list;
pub mod stack;
pub mod task;
#[cfg(stage0)]
#[lang = "start"]
pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
do start(argc, argv) {
let main: extern "Rust" fn() = unsafe { cast::transmute(main) };
main();
}
}
/// Set up a default runtime configuration, given compiler-supplied arguments.
///
/// This function will block the current thread of execution until the entire
/// pool of M:N schedulers have exited.
///
/// # Arguments
///
/// * `argc` & `argv` - The argument vector. On Unix this information is used
/// by os::args.
/// * `main` - The initial procedure to run inside of the M:N scheduling pool.
/// Once this procedure exits, the scheduling pool will begin to shut
/// down. The entire pool (and this function) will only return once
/// all child tasks have finished executing.
///
/// # Return value
///
/// The return value is used as the process return code. 0 on success, 101 on
/// error.
pub fn start(argc: int, argv: **u8, main: proc()) -> int {
rt::init(argc, argv);
let exit_code = run(main);
// unsafe is ok b/c we're sure that the runtime is gone
unsafe { rt::cleanup() }
exit_code
}
/// Execute the main function in a pool of M:N schedulers.
///
/// Configures the runtime according to the environment, by default
/// using a task scheduler with the same number of threads as cores.
/// Returns a process exit code.
///
/// This function will not return until all schedulers in the associated pool
/// have returned.
pub fn run(main: proc()) -> int {
let config = Config {
shutdown_after_main_exits: true,
..Config::new()
};
Pool::spawn(config, main).wait();
os::get_exit_status()
}
/// Configuration of how an M:N pool of schedulers is spawned.
pub struct Config {
/// If this flag is set, then when schedulers are spawned via the `start`
/// and `run` functions the thread invoking `start` and `run` will have a
/// scheduler spawned on it. This scheduler will be "special" in that the
/// main task will be pinned to the scheduler and it will not participate in
/// work stealing.
///
/// If the `spawn` function is used to create a pool of schedulers, then
/// this option has no effect.
use_main_thread: bool,
/// The number of schedulers (OS threads) to spawn into this M:N pool.
threads: uint,
/// When the main function exits, this flag will dictate whether a shutdown
/// is requested of all schedulers. If this flag is `true`, this means that
/// schedulers will shut down as soon as possible after the main task exits
/// (but some may stay alive longer for things like I/O or other tasks).
///
/// If this flag is `false`, then no action is taken when the `main` task
/// exits. The scheduler pool is then shut down via the `wait()` function.
shutdown_after_main_exits: bool,
}
impl Config {
/// Returns the default configuration, as determined the the environment
/// variables of this process.
pub fn new() -> Config {
Config {
use_main_thread: false,
threads: rt::default_sched_threads(),
shutdown_after_main_exits: false,
}
}
}
/// A structure representing a handle to a pool of schedulers. This handle is
/// used to keep the pool alive and also reap the status from the pool.
pub struct Pool {
priv threads: ~[Thread<()>],
priv handles: Option<~[SchedHandle]>,
}
impl Pool {
/// Execute the main function in a pool of M:N schedulers.
///
/// This will configure the pool according to the `config` parameter, and
/// initially run `main` inside the pool of schedulers.
pub fn spawn(config: Config, main: proc()) -> Pool {
static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
let Config {
threads: nscheds,
use_main_thread: use_main_sched,
shutdown_after_main_exits
} = config;
let mut main = Some(main);
let pool_id = unsafe { POOL_ID.fetch_add(1, SeqCst) };
// The shared list of sleeping schedulers.
let sleepers = SleeperList::new();
// Create a work queue for each scheduler, ntimes. Create an extra
// for the main thread if that flag is set. We won't steal from it.
let mut pool = deque::BufferPool::new();
let arr = vec::from_fn(nscheds, |_| pool.deque());
let (workers, stealers) = vec::unzip(arr.move_iter());
// The schedulers.
let mut scheds = ~[];
// Handles to the schedulers. When the main task ends these will be
// sent the Shutdown message to terminate the schedulers.
let mut handles = ~[];
for worker in workers.move_iter() {
rtdebug!("inserting a regular scheduler");
// Every scheduler is driven by an I/O event loop.
let loop_ = new_event_loop();
let mut sched = ~Scheduler::new(pool_id,
loop_,
worker,
stealers.clone(),
sleepers.clone());
let handle = sched.make_handle();
scheds.push(sched);
handles.push(handle);
}
// If we need a main-thread task then create a main thread scheduler
// that will reject any task that isn't pinned to it
let main_sched = if use_main_sched {
// Create a friend handle.
let mut friend_sched = scheds.pop();
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);
// This scheduler needs a queue that isn't part of the stealee
// set.
let (worker, _) = pool.deque();
let main_loop = new_event_loop();
let mut main_sched = ~Scheduler::new_special(pool_id,
main_loop,
worker,
stealers.clone(),
sleepers.clone(),
false,
Some(friend_handle));
let mut main_handle = main_sched.make_handle();
// Allow the scheduler to exit when the main task exits.
// Note: sending the shutdown message also prevents the scheduler
// from pushing itself to the sleeper list, which is used for
// waking up schedulers for work stealing; since this is a
// non-work-stealing scheduler it should not be adding itself
// to the list.
main_handle.send(Shutdown);
Some(main_sched)
} else {
None
};
// The pool of schedulers that will be returned from this function
let mut pool = Pool { threads: ~[], handles: None };
// When the main task exits, after all the tasks in the main
// task tree, shut down the schedulers and set the exit code.
let mut on_exit = if shutdown_after_main_exits {
let handles = handles;
Some(proc(exit_success: TaskResult) {
let mut handles = handles;
for handle in handles.mut_iter() {
handle.send(Shutdown);
}
if exit_success.is_err() {
os::set_exit_status(rt::DEFAULT_ERROR_CODE);
}
})
} else {
pool.handles = Some(handles);
None
};
if !use_main_sched {
// In the case where we do not use a main_thread scheduler we
// run the main task in one of our threads.
let mut main = GreenTask::new(&mut scheds[0].stack_pool, None,
main.take_unwrap());
let mut main_task = ~Task::new();
main_task.name = Some(SendStrStatic("<main>"));
main_task.death.on_exit = on_exit.take();
main.put_task(main_task);
let sched = scheds.pop();
let main = main;
let thread = do Thread::start {
sched.bootstrap(main);
};
pool.threads.push(thread);
}
// Run each remaining scheduler in a thread.
for sched in scheds.move_rev_iter() {
rtdebug!("creating regular schedulers");
let thread = do Thread::start {
let mut sched = sched;
let mut task = do GreenTask::new(&mut sched.stack_pool, None) {
rtdebug!("boostraping a non-primary scheduler");
};
task.put_task(~Task::new());
sched.bootstrap(task);
};
pool.threads.push(thread);
}
// If we do have a main thread scheduler, run it now.
if use_main_sched {
rtdebug!("about to create the main scheduler task");
let mut main_sched = main_sched.unwrap();
let home = HomeSched(main_sched.make_handle());
let mut main = GreenTask::new_homed(&mut main_sched.stack_pool, None,
home, main.take_unwrap());
let mut main_task = ~Task::new();
main_task.name = Some(SendStrStatic("<main>"));
main_task.death.on_exit = on_exit.take();
main.put_task(main_task);
rtdebug!("bootstrapping main_task");
main_sched.bootstrap(main);
}
return pool;
}
/// Waits for the pool of schedulers to exit. If the pool was spawned to
/// shutdown after the main task exits, this will simply wait for all the
/// scheudlers to exit. If the pool was not spawned like that, this function
/// will trigger shutdown of all the active schedulers. The schedulers will
/// exit once all tasks in this pool of schedulers has exited.
pub fn wait(&mut self) {
match self.handles.take() {
Some(mut handles) => {
for handle in handles.mut_iter() {
handle.send(Shutdown);
}
}
None => {}
}
for thread in util::replace(&mut self.threads, ~[]).move_iter() {
thread.join();
}
}
}
fn new_event_loop() -> ~rtio::EventLoop {
match crate_map::get_crate_map() {
None => {}
Some(map) => {
match map.event_loop_factory {
None => {}
Some(factory) => return factory()
}
}
}
// If the crate map didn't specify a factory to create an event loop, then
// instead just use a basic event loop missing all I/O services to at least
// get the scheduler running.
return basic::event_loop();
}

130
src/libgreen/macros.rs Normal file
View File

@ -0,0 +1,130 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// XXX: this file probably shouldn't exist
#[macro_escape];
use std::fmt;
use std::libc;
// Indicates whether we should perform expensive sanity checks, including rtassert!
// XXX: Once the runtime matures remove the `true` below to turn off rtassert, etc.
pub static ENFORCE_SANITY: bool = true || !cfg!(rtopt) || cfg!(rtdebug) || cfg!(rtassert);
macro_rules! rterrln (
($($arg:tt)*) => ( {
format_args!(::macros::dumb_println, $($arg)*)
} )
)
// Some basic logging. Enabled by passing `--cfg rtdebug` to the libstd build.
macro_rules! rtdebug (
($($arg:tt)*) => ( {
if cfg!(rtdebug) {
rterrln!($($arg)*)
}
})
)
macro_rules! rtassert (
( $arg:expr ) => ( {
if ::macros::ENFORCE_SANITY {
if !$arg {
rtabort!(" assertion failed: {}", stringify!($arg));
}
}
} )
)
macro_rules! rtabort (
($($arg:tt)*) => ( {
::macros::abort(format!($($arg)*));
} )
)
pub fn dumb_println(args: &fmt::Arguments) {
use std::io;
use std::libc;
use std::vec;
struct Stderr;
impl io::Writer for Stderr {
fn write(&mut self, data: &[u8]) {
unsafe {
libc::write(libc::STDERR_FILENO,
vec::raw::to_ptr(data) as *libc::c_void,
data.len() as libc::size_t);
}
}
}
let mut w = Stderr;
fmt::writeln(&mut w as &mut io::Writer, args);
}
pub fn abort(msg: &str) -> ! {
let msg = if !msg.is_empty() { msg } else { "aborted" };
let hash = msg.chars().fold(0, |accum, val| accum + (val as uint) );
let quote = match hash % 10 {
0 => "
It was from the artists and poets that the pertinent answers came, and I
know that panic would have broken loose had they been able to compare notes.
As it was, lacking their original letters, I half suspected the compiler of
having asked leading questions, or of having edited the correspondence in
corroboration of what he had latently resolved to see.",
1 => "
There are not many persons who know what wonders are opened to them in the
stories and visions of their youth; for when as children we listen and dream,
we think but half-formed thoughts, and when as men we try to remember, we are
dulled and prosaic with the poison of life. But some of us awake in the night
with strange phantasms of enchanted hills and gardens, of fountains that sing
in the sun, of golden cliffs overhanging murmuring seas, of plains that stretch
down to sleeping cities of bronze and stone, and of shadowy companies of heroes
that ride caparisoned white horses along the edges of thick forests; and then
we know that we have looked back through the ivory gates into that world of
wonder which was ours before we were wise and unhappy.",
2 => "
Instead of the poems I had hoped for, there came only a shuddering blackness
and ineffable loneliness; and I saw at last a fearful truth which no one had
ever dared to breathe before the unwhisperable secret of secrets The fact
that this city of stone and stridor is not a sentient perpetuation of Old New
York as London is of Old London and Paris of Old Paris, but that it is in fact
quite dead, its sprawling body imperfectly embalmed and infested with queer
animate things which have nothing to do with it as it was in life.",
3 => "
The ocean ate the last of the land and poured into the smoking gulf, thereby
giving up all it had ever conquered. From the new-flooded lands it flowed
again, uncovering death and decay; and from its ancient and immemorial bed it
trickled loathsomely, uncovering nighted secrets of the years when Time was
young and the gods unborn. Above the waves rose weedy remembered spires. The
moon laid pale lilies of light on dead London, and Paris stood up from its damp
grave to be sanctified with star-dust. Then rose spires and monoliths that were
weedy but not remembered; terrible spires and monoliths of lands that men never
knew were lands...",
4 => "
There was a night when winds from unknown spaces whirled us irresistibly into
limitless vacuum beyond all thought and entity. Perceptions of the most
maddeningly untransmissible sort thronged upon us; perceptions of infinity
which at the time convulsed us with joy, yet which are now partly lost to my
memory and partly incapable of presentation to others.",
_ => "You've met with a terrible fate, haven't you?"
};
rterrln!("{}", "");
rterrln!("{}", quote);
rterrln!("{}", "");
rterrln!("fatal runtime error: {}", msg);
abort();
fn abort() -> ! {
unsafe { libc::abort() }
}
}

View File

@ -8,27 +8,22 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::{Option, Some, None};
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;
use super::sleeper_list::SleeperList;
use super::stack::{StackPool};
use super::rtio::EventLoop;
use super::context::Context;
use super::task::{Task, AnySched, Sched};
use rt::kill::BlockedTask;
use rt::deque;
use rt::local_ptr;
use rt::local::Local;
use rt::rtio::{RemoteCallback, PausableIdleCallback, Callback};
use borrow::{to_uint};
use rand::{XorShiftRng, Rng, Rand};
use iter::range;
use unstable::mutex::Mutex;
use vec::{OwnedVector};
use std::cast;
use std::rand::{XorShiftRng, Rng, Rand};
use std::rt::local::Local;
use std::rt::rtio::{RemoteCallback, PausibleIdleCallback, Callback, EventLoop};
use std::rt::task::BlockedTask;
use std::rt::task::Task;
use std::sync::deque;
use std::unstable::mutex::Mutex;
use std::unstable::raw;
use mpsc = std::sync::mpsc_queue;
use mpsc = super::mpsc_queue;
use context::Context;
use coroutine::Coroutine;
use sleeper_list::SleeperList;
use stack::StackPool;
use task::{TypeSched, GreenTask, HomeSched, AnySched};
/// A scheduler is responsible for coordinating the execution of Tasks
/// on a single thread. The scheduler runs inside a slightly modified
@ -39,11 +34,15 @@ use mpsc = super::mpsc_queue;
/// XXX: This creates too many callbacks to run_sched_once, resulting
/// in too much allocation and too many events.
pub struct Scheduler {
/// ID number of the pool that this scheduler is a member of. When
/// reawakening green tasks, this is used to ensure that tasks aren't
/// reawoken on the wrong pool of schedulers.
pool_id: uint,
/// There are N work queues, one per scheduler.
work_queue: deque::Worker<~Task>,
work_queue: deque::Worker<~GreenTask>,
/// Work queues for the other schedulers. These are created by
/// cloning the core work queues.
work_queues: ~[deque::Stealer<~Task>],
work_queues: ~[deque::Stealer<~GreenTask>],
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
@ -66,15 +65,15 @@ pub struct Scheduler {
stack_pool: StackPool,
/// The scheduler runs on a special task. When it is not running
/// it is stored here instead of the work queue.
sched_task: Option<~Task>,
sched_task: Option<~GreenTask>,
/// An action performed after a context switch on behalf of the
/// code running before the context switch
cleanup_job: Option<CleanupJob>,
/// Should this scheduler run any task, or only pinned tasks?
run_anything: bool,
/// If the scheduler shouldn't run some tasks, a friend to send
/// them to.
friend_handle: Option<SchedHandle>,
/// Should this scheduler run any task, or only pinned tasks?
run_anything: bool,
/// A fast XorShift rng for scheduler use
rng: XorShiftRng,
/// A togglable idle callback
@ -117,21 +116,22 @@ impl Scheduler {
// * Initialization Functions
pub fn new(event_loop: ~EventLoop,
work_queue: deque::Worker<~Task>,
work_queues: ~[deque::Stealer<~Task>],
pub fn new(pool_id: uint,
event_loop: ~EventLoop,
work_queue: deque::Worker<~GreenTask>,
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList)
-> Scheduler {
Scheduler::new_special(event_loop, work_queue,
work_queues,
Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
sleeper_list, true, None)
}
pub fn new_special(event_loop: ~EventLoop,
work_queue: deque::Worker<~Task>,
work_queues: ~[deque::Stealer<~Task>],
pub fn new_special(pool_id: uint,
event_loop: ~EventLoop,
work_queue: deque::Worker<~GreenTask>,
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
@ -139,6 +139,7 @@ impl Scheduler {
let (consumer, producer) = mpsc::queue(());
let mut sched = Scheduler {
pool_id: pool_id,
sleeper_list: sleeper_list,
message_queue: consumer,
message_producer: producer,
@ -170,66 +171,59 @@ impl Scheduler {
// Take a main task to run, and a scheduler to run it in. Create a
// scheduler task and bootstrap into it.
pub fn bootstrap(mut ~self, task: ~Task) {
pub fn bootstrap(mut ~self, task: ~GreenTask) {
// Build an Idle callback.
let cb = ~SchedRunner as ~Callback;
self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb));
// Initialize the TLS key.
local_ptr::init();
// Create a task for the scheduler with an empty context.
let sched_task = ~Task::new_sched_task();
// Now that we have an empty task struct for the scheduler
// task, put it in TLS.
Local::put(sched_task);
let mut sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
TypeSched);
sched_task.put_task(~Task::new());
// Before starting our first task, make sure the idle callback
// is active. As we do not start in the sleep state this is
// important.
self.idle_callback.get_mut_ref().resume();
// Now, as far as all the scheduler state is concerned, we are
// inside the "scheduler" context. So we can act like the
// scheduler and resume the provided task.
self.resume_task_immediately(task);
// Now, as far as all the scheduler state is concerned, we are inside
// the "scheduler" context. So we can act like the scheduler and resume
// the provided task. Let it think that the currently running task is
// actually the sched_task so it knows where to squirrel it away.
let mut sched_task = self.resume_task_immediately(sched_task, task);
// Now we are back in the scheduler context, having
// successfully run the input task. Start by running the
// scheduler. Grab it out of TLS - performing the scheduler
// action will have given it away.
let sched: ~Scheduler = Local::take();
let sched = sched_task.sched.take_unwrap();
rtdebug!("starting scheduler {}", sched.sched_id());
sched.run();
let mut sched_task = sched.run(sched_task);
// Close the idle callback.
let mut sched: ~Scheduler = Local::take();
let mut sched = sched_task.sched.take_unwrap();
sched.idle_callback.take();
// Make one go through the loop to run the close callback.
sched.run();
let mut stask = sched.run(sched_task);
// Now that we are done with the scheduler, clean up the
// scheduler task. Do so by removing it from TLS and manually
// cleaning up the memory it uses. As we didn't actually call
// task.run() on the scheduler task we never get through all
// the cleanup code it runs.
let mut stask: ~Task = Local::take();
rtdebug!("stopping scheduler {}", stask.sched.get_ref().sched_id());
// Should not have any messages
let message = stask.sched.get_mut_ref().message_queue.pop();
rtassert!(match message { mpsc::Empty => true, _ => false });
stask.destroyed = true;
stask.task.get_mut_ref().destroyed = true;
}
// This does not return a scheduler, as the scheduler is placed
// inside the task.
pub fn run(mut ~self) {
pub fn run(mut ~self, stask: ~GreenTask) -> ~GreenTask {
// This is unsafe because we need to place the scheduler, with
// the event_loop inside, inside our task. But we still need a
@ -237,16 +231,24 @@ impl Scheduler {
// command.
unsafe {
let event_loop: *mut ~EventLoop = &mut self.event_loop;
{
// Our scheduler must be in the task before the event loop
// is started.
let mut stask = Local::borrow(None::<Task>);
stask.get().sched = Some(self);
}
// Our scheduler must be in the task before the event loop
// is started.
stask.put_with_sched(self);
(*event_loop).run();
}
// This is a serious code smell, but this function could be done away
// with if necessary. The ownership of `stask` was transferred into
// local storage just before the event loop ran, so it is possible to
// transmute `stask` as a uint across the running of the event loop to
// re-acquire ownership here.
//
// This would involve removing the Task from TLS, removing the runtime,
// forgetting the runtime, and then putting the task into `stask`. For
// now, because we have `GreenTask::convert`, I chose to take this
// method for cleanliness. This function is *not* a fundamental reason
// why this function should exist.
GreenTask::convert(Local::take())
}
// * Execution Functions - Core Loop Logic
@ -257,38 +259,37 @@ impl Scheduler {
// you reach the end and sleep. In the case that a scheduler
// action is performed the loop is evented such that this function
// is called again.
fn run_sched_once() {
// When we reach the scheduler context via the event loop we
// already have a scheduler stored in our local task, so we
// start off by taking it. This is the only path through the
// scheduler where we get the scheduler this way.
let mut sched: ~Scheduler = Local::take();
fn run_sched_once(mut ~self, stask: ~GreenTask) {
// Make sure that we're not lying in that the `stask` argument is indeed
// the scheduler task for this scheduler.
assert!(self.sched_task.is_none());
// Assume that we need to continue idling unless we reach the
// end of this function without performing an action.
sched.idle_callback.get_mut_ref().resume();
self.idle_callback.get_mut_ref().resume();
// First we check for scheduler messages, these are higher
// priority than regular tasks.
let sched = match sched.interpret_message_queue(DontTryTooHard) {
Some(sched) => sched,
None => return
};
let (sched, stask) =
match self.interpret_message_queue(stask, DontTryTooHard) {
Some(pair) => pair,
None => return
};
// This helper will use a randomized work-stealing algorithm
// to find work.
let sched = match sched.do_work() {
Some(sched) => sched,
let (sched, stask) = match sched.do_work(stask) {
Some(pair) => pair,
None => return
};
// Now, before sleeping we need to find out if there really
// were any messages. Give it your best!
let mut sched = match sched.interpret_message_queue(GiveItYourBest) {
Some(sched) => sched,
None => return
};
let (mut sched, stask) =
match sched.interpret_message_queue(stask, GiveItYourBest) {
Some(pair) => pair,
None => return
};
// If we got here then there was no work to do.
// Generate a SchedHandle and push it to the sleeper list so
@ -309,14 +310,17 @@ impl Scheduler {
// Finished a cycle without using the Scheduler. Place it back
// in TLS.
Local::put(sched);
stask.put_with_sched(sched);
}
// This function returns None if the scheduler is "used", or it
// returns the still-available scheduler. At this point all
// message-handling will count as a turn of work, and as a result
// return None.
fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> {
fn interpret_message_queue(mut ~self, stask: ~GreenTask,
effort: EffortLevel)
-> Option<(~Scheduler, ~GreenTask)>
{
let msg = if effort == DontTryTooHard {
self.message_queue.casual_pop()
@ -345,24 +349,25 @@ impl Scheduler {
match msg {
Some(PinnedTask(task)) => {
let mut task = task;
task.give_home(Sched(self.make_handle()));
self.resume_task_immediately(task);
task.give_home(HomeSched(self.make_handle()));
self.resume_task_immediately(stask, task).put();
return None;
}
Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!");
self.process_task(task, Scheduler::resume_task_immediately_cl);
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
return None;
}
Some(RunOnce(task)) => {
// bypass the process_task logic to force running this task once
// on this home scheduler. This is often used for I/O (homing).
Scheduler::resume_task_immediately_cl(self, task);
self.resume_task_immediately(stask, task).put();
return None;
}
Some(Wake) => {
self.sleepy = false;
Local::put(self);
stask.put_with_sched(self);
return None;
}
Some(Shutdown) => {
@ -385,26 +390,27 @@ impl Scheduler {
// event loop references we will shut down.
self.no_sleep = true;
self.sleepy = false;
Local::put(self);
stask.put_with_sched(self);
return None;
}
None => {
return Some(self);
return Some((self, stask));
}
}
}
fn do_work(mut ~self) -> Option<~Scheduler> {
fn do_work(mut ~self, stask: ~GreenTask) -> Option<(~Scheduler, ~GreenTask)> {
rtdebug!("scheduler calling do work");
match self.find_work() {
Some(task) => {
rtdebug!("found some work! processing the task");
self.process_task(task, Scheduler::resume_task_immediately_cl);
rtdebug!("found some work! running the task");
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
return None;
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some(self);
return Some((self, stask));
}
}
}
@ -418,7 +424,7 @@ impl Scheduler {
// First step in the process is to find a task. This function does
// that by first checking the local queue, and if there is no work
// there, trying to steal from the remote work queues.
fn find_work(&mut self) -> Option<~Task> {
fn find_work(&mut self) -> Option<~GreenTask> {
rtdebug!("scheduler looking for work");
if !self.steal_for_yield {
match self.work_queue.pop() {
@ -456,7 +462,7 @@ impl Scheduler {
// Try stealing from all queues the scheduler knows about. This
// naive implementation can steal from our own queue or from other
// special schedulers.
fn try_steals(&mut self) -> Option<~Task> {
fn try_steals(&mut self) -> Option<~GreenTask> {
let work_queues = &mut self.work_queues;
let len = work_queues.len();
let start_index = self.rng.gen_range(0, len);
@ -476,53 +482,48 @@ impl Scheduler {
// * Task Routing Functions - Make sure tasks send up in the right
// place.
fn process_task(mut ~self, mut task: ~Task, schedule_fn: SchedulingFn) {
fn process_task(mut ~self, cur: ~GreenTask,
mut next: ~GreenTask, schedule_fn: SchedulingFn) {
rtdebug!("processing a task");
let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
match next.take_unwrap_home() {
HomeSched(home_handle) => {
if home_handle.sched_id != self.sched_id() {
rtdebug!("sending task home");
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
Local::put(self);
next.give_home(HomeSched(home_handle));
Scheduler::send_task_home(next);
cur.put_with_sched(self);
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
schedule_fn(self, task);
next.give_home(HomeSched(home_handle));
schedule_fn(self, cur, next);
}
}
AnySched if self.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
schedule_fn(self, task);
next.give_home(AnySched);
schedule_fn(self, cur, next);
}
AnySched => {
rtdebug!("sending task to friend");
task.give_home(AnySched);
self.send_to_friend(task);
Local::put(self);
next.give_home(AnySched);
self.send_to_friend(next);
cur.put_with_sched(self);
}
}
}
fn send_task_home(task: ~Task) {
fn send_task_home(task: ~GreenTask) {
let mut task = task;
let mut home = task.take_unwrap_home();
match home {
Sched(ref mut home_handle) => {
home_handle.send(PinnedTask(task));
}
AnySched => {
rtabort!("error: cannot send anysched task home");
}
match task.take_unwrap_home() {
HomeSched(mut home_handle) => home_handle.send(PinnedTask(task)),
AnySched => rtabort!("error: cannot send anysched task home"),
}
}
/// Take a non-homed task we aren't allowed to run here and send
/// it to the designated friend scheduler to execute.
fn send_to_friend(&mut self, task: ~Task) {
fn send_to_friend(&mut self, task: ~GreenTask) {
rtdebug!("sending a task to friend");
match self.friend_handle {
Some(ref mut handle) => {
@ -539,9 +540,10 @@ impl Scheduler {
/// Pushes the task onto the work stealing queue and tells the
/// event loop to run it later. Always use this instead of pushing
/// to the work queue directly.
pub fn enqueue_task(&mut self, task: ~Task) {
pub fn enqueue_task(&mut self, task: ~GreenTask) {
// We push the task onto our local queue clone.
assert!(!task.is_sched());
self.work_queue.push(task);
self.idle_callback.get_mut_ref().resume();
@ -557,47 +559,31 @@ impl Scheduler {
};
}
/// As enqueue_task, but with the possibility for the blocked task to
/// already have been killed.
pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
blocked_task.wake().map(|task| self.enqueue_task(task));
}
// * Core Context Switching Functions
// The primary function for changing contexts. In the current
// design the scheduler is just a slightly modified GreenTask, so
// all context swaps are from Task to Task. The only difference
// all context swaps are from GreenTask to GreenTask. The only difference
// between the various cases is where the inputs come from, and
// what is done with the resulting task. That is specified by the
// cleanup function f, which takes the scheduler and the
// old task as inputs.
pub fn change_task_context(mut ~self,
next_task: ~Task,
f: |&mut Scheduler, ~Task|) {
// The current task is grabbed from TLS, not taken as an input.
// Doing an unsafe_take to avoid writing back a null pointer -
// We're going to call `put` later to do that.
let current_task: ~Task = unsafe { Local::unsafe_take() };
current_task: ~GreenTask,
mut next_task: ~GreenTask,
f: |&mut Scheduler, ~GreenTask|) -> ~GreenTask {
let f_opaque = ClosureConverter::from_fn(f);
// Check that the task is not in an atomically() section (e.g.,
// holding a pthread mutex, which could deadlock the scheduler).
current_task.death.assert_may_sleep();
// These transmutes do something fishy with a closure.
let f_fake_region = unsafe {
transmute::<|&mut Scheduler, ~Task|,
|&mut Scheduler, ~Task|>(f)
let current_task_dupe = unsafe {
*cast::transmute::<&~GreenTask, &uint>(&current_task)
};
let f_opaque = ClosureConverter::from_fn(f_fake_region);
// The current task is placed inside an enum with the cleanup
// function. This enum is then placed inside the scheduler.
self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
// The scheduler is then placed inside the next task.
let mut next_task = next_task;
next_task.sched = Some(self);
// However we still need an internal mutable pointer to the
@ -607,12 +593,12 @@ impl Scheduler {
unsafe {
let sched: &mut Scheduler =
transmute_mut_region(*next_task.sched.get_mut_ref());
cast::transmute_mut_region(*next_task.sched.get_mut_ref());
let current_task: &mut Task = match sched.cleanup_job {
let current_task: &mut GreenTask = match sched.cleanup_job {
Some(CleanupJob { task: ref task, .. }) => {
let task_ptr: *~Task = task;
transmute_mut_region(*transmute_mut_unsafe(task_ptr))
let task_ptr: *~GreenTask = task;
cast::transmute_mut_region(*cast::transmute_mut_unsafe(task_ptr))
}
None => {
rtabort!("no cleanup job");
@ -626,7 +612,7 @@ impl Scheduler {
// works because due to transmute the borrow checker
// believes that we have no internal pointers to
// next_task.
Local::put(next_task);
cast::forget(next_task);
// The raw context swap operation. The next action taken
// will be running the cleanup job from the context of the
@ -637,16 +623,19 @@ impl Scheduler {
// When the context swaps back to this task we immediately
// run the cleanup job, as expected by the previously called
// swap_contexts function.
unsafe {
let task: *mut Task = Local::unsafe_borrow();
(*task).sched.get_mut_ref().run_cleanup_job();
let mut current_task: ~GreenTask = unsafe {
cast::transmute(current_task_dupe)
};
current_task.sched.get_mut_ref().run_cleanup_job();
// See the comments in switch_running_tasks_and_then for why a lock
// is acquired here. This is the resumption points and the "bounce"
// that it is referring to.
(*task).nasty_deschedule_lock.lock();
(*task).nasty_deschedule_lock.unlock();
// See the comments in switch_running_tasks_and_then for why a lock
// is acquired here. This is the resumption points and the "bounce"
// that it is referring to.
unsafe {
current_task.nasty_deschedule_lock.lock();
current_task.nasty_deschedule_lock.unlock();
}
return current_task;
}
// Returns a mutable reference to both contexts involved in this
@ -654,37 +643,33 @@ impl Scheduler {
// references to keep even when we don't own the tasks. It looks
// kinda safe because we are doing transmutes before passing in
// the arguments.
pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
pub fn get_contexts<'a>(current_task: &mut GreenTask, next_task: &mut GreenTask) ->
(&'a mut Context, &'a mut Context) {
let current_task_context =
&mut current_task.coroutine.get_mut_ref().saved_context;
let next_task_context =
&mut next_task.coroutine.get_mut_ref().saved_context;
unsafe {
(transmute_mut_region(current_task_context),
transmute_mut_region(next_task_context))
(cast::transmute_mut_region(current_task_context),
cast::transmute_mut_region(next_task_context))
}
}
// * Context Swapping Helpers - Here be ugliness!
pub fn resume_task_immediately(~self, task: ~Task) {
self.change_task_context(task, |sched, stask| {
pub fn resume_task_immediately(~self, cur: ~GreenTask,
next: ~GreenTask) -> ~GreenTask {
assert!(cur.is_sched());
self.change_task_context(cur, next, |sched, stask| {
assert!(sched.sched_task.is_none());
sched.sched_task = Some(stask);
})
}
fn resume_task_immediately_cl(sched: ~Scheduler,
task: ~Task) {
sched.resume_task_immediately(task)
}
pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
match blocked_task.wake() {
Some(task) => { self.resume_task_immediately(task); }
None => Local::put(self)
};
cur: ~GreenTask,
next: ~GreenTask) {
sched.resume_task_immediately(cur, next).put()
}
/// Block a running task, context switch to the scheduler, then pass the
@ -709,15 +694,18 @@ impl Scheduler {
/// guaranteed that this function will not return before the given closure
/// has returned.
pub fn deschedule_running_task_and_then(mut ~self,
cur: ~GreenTask,
f: |&mut Scheduler, BlockedTask|) {
// Trickier - we need to get the scheduler task out of self
// and use it as the destination.
let stask = self.sched_task.take_unwrap();
// Otherwise this is the same as below.
self.switch_running_tasks_and_then(stask, f);
self.switch_running_tasks_and_then(cur, stask, f)
}
pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
pub fn switch_running_tasks_and_then(~self,
cur: ~GreenTask,
next: ~GreenTask,
f: |&mut Scheduler, BlockedTask|) {
// And here comes one of the sad moments in which a lock is used in a
// core portion of the rust runtime. As always, this is highly
@ -733,80 +721,96 @@ impl Scheduler {
// task-local lock around this block. The resumption of the task in
// context switching will bounce on the lock, thereby waiting for this
// block to finish, eliminating the race mentioned above.
// fail!("should never return!");
//
// To actually maintain a handle to the lock, we use an unsafe pointer
// to it, but we're guaranteed that the task won't exit until we've
// unlocked the lock so there's no worry of this memory going away.
self.change_task_context(next_task, |sched, mut task| {
let cur = self.change_task_context(cur, next, |sched, mut task| {
let lock: *mut Mutex = &mut task.nasty_deschedule_lock;
unsafe { (*lock).lock() }
f(sched, BlockedTask::block(task));
f(sched, BlockedTask::block(task.swap()));
unsafe { (*lock).unlock() }
})
});
cur.put();
}
fn switch_task(sched: ~Scheduler, task: ~Task) {
sched.switch_running_tasks_and_then(task, |sched, last_task| {
sched.enqueue_blocked_task(last_task);
});
fn switch_task(sched: ~Scheduler, cur: ~GreenTask, next: ~GreenTask) {
sched.change_task_context(cur, next, |sched, last_task| {
if last_task.is_sched() {
assert!(sched.sched_task.is_none());
sched.sched_task = Some(last_task);
} else {
sched.enqueue_task(last_task);
}
}).put()
}
// * Task Context Helpers
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
pub fn terminate_current_task(mut ~self) {
pub fn terminate_current_task(mut ~self, cur: ~GreenTask) {
// Similar to deschedule running task and then, but cannot go through
// the task-blocking path. The task is already dying.
let stask = self.sched_task.take_unwrap();
self.change_task_context(stask, |sched, mut dead_task| {
let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| {
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
})
});
fail!("should never return!");
}
pub fn run_task(task: ~Task) {
let sched: ~Scheduler = Local::take();
sched.process_task(task, Scheduler::switch_task);
pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) {
self.process_task(cur, next, Scheduler::switch_task);
}
pub fn run_task_later(next_task: ~Task) {
let mut sched = Local::borrow(None::<Scheduler>);
sched.get().enqueue_task(next_task);
pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) {
let mut sched = cur.sched.take_unwrap();
sched.enqueue_task(next);
cur.put_with_sched(sched);
}
/// Yield control to the scheduler, executing another task. This is guaranteed
/// to introduce some amount of randomness to the scheduler. Currently the
/// randomness is a result of performing a round of work stealing (which
/// may end up stealing from the current scheduler).
pub fn yield_now(mut ~self) {
self.yield_check_count = reset_yield_check(&mut self.rng);
// Tell the scheduler to start stealing on the next iteration
self.steal_for_yield = true;
self.deschedule_running_task_and_then(|sched, task| {
sched.enqueue_blocked_task(task);
})
pub fn yield_now(mut ~self, cur: ~GreenTask) {
if cur.is_sched() {
assert!(self.sched_task.is_none());
self.run_sched_once(cur);
} else {
self.yield_check_count = reset_yield_check(&mut self.rng);
// Tell the scheduler to start stealing on the next iteration
self.steal_for_yield = true;
let stask = self.sched_task.take_unwrap();
let cur = self.change_task_context(cur, stask, |sched, task| {
sched.enqueue_task(task);
});
cur.put()
}
}
pub fn maybe_yield(mut ~self) {
// The number of times to do the yield check before yielding, chosen arbitrarily.
pub fn maybe_yield(mut ~self, cur: ~GreenTask) {
// The number of times to do the yield check before yielding, chosen
// arbitrarily.
rtassert!(self.yield_check_count > 0);
self.yield_check_count -= 1;
if self.yield_check_count == 0 {
self.yield_now();
self.yield_now(cur);
} else {
Local::put(self);
cur.put_with_sched(self);
}
}
// * Utility Functions
pub fn sched_id(&self) -> uint { to_uint(self) }
pub fn sched_id(&self) -> uint { unsafe { cast::transmute(self) } }
pub fn run_cleanup_job(&mut self) {
let cleanup_job = self.cleanup_job.take_unwrap();
cleanup_job.run(self);
cleanup_job.run(self)
}
pub fn make_handle(&mut self) -> SchedHandle {
@ -816,20 +820,20 @@ impl Scheduler {
remote: remote,
queue: self.message_producer.clone(),
sched_id: self.sched_id()
};
}
}
}
// Supporting types
type SchedulingFn = extern "Rust" fn (~Scheduler, ~Task);
type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask);
pub enum SchedMessage {
Wake,
Shutdown,
PinnedTask(~Task),
TaskFromFriend(~Task),
RunOnce(~Task),
PinnedTask(~GreenTask),
TaskFromFriend(~GreenTask),
RunOnce(~GreenTask),
}
pub struct SchedHandle {
@ -849,17 +853,28 @@ struct SchedRunner;
impl Callback for SchedRunner {
fn call(&mut self) {
Scheduler::run_sched_once();
// In theory, this function needs to invoke the `run_sched_once`
// function on the scheduler. Sadly, we have no context here, except for
// knowledge of the local `Task`. In order to avoid a call to
// `GreenTask::convert`, we just call `yield_now` and the scheduler will
// detect when a sched task performs a yield vs a green task performing
// a yield (and act accordingly).
//
// This function could be converted to `GreenTask::convert` if
// absolutely necessary, but for cleanliness it is much better to not
// use the conversion function.
let task: ~Task = Local::take();
task.yield_now();
}
}
struct CleanupJob {
task: ~Task,
task: ~GreenTask,
f: UnsafeTaskReceiver
}
impl CleanupJob {
pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
pub fn new(task: ~GreenTask, f: UnsafeTaskReceiver) -> CleanupJob {
CleanupJob {
task: task,
f: f
@ -876,14 +891,16 @@ impl CleanupJob {
// complaining
type UnsafeTaskReceiver = raw::Closure;
trait ClosureConverter {
fn from_fn(|&mut Scheduler, ~Task|) -> Self;
fn to_fn(self) -> |&mut Scheduler, ~Task|;
fn from_fn(|&mut Scheduler, ~GreenTask|) -> Self;
fn to_fn(self) -> |&mut Scheduler, ~GreenTask|;
}
impl ClosureConverter for UnsafeTaskReceiver {
fn from_fn(f: |&mut Scheduler, ~Task|) -> UnsafeTaskReceiver {
unsafe { transmute(f) }
fn from_fn(f: |&mut Scheduler, ~GreenTask|) -> UnsafeTaskReceiver {
unsafe { cast::transmute(f) }
}
fn to_fn(self) -> |&mut Scheduler, ~GreenTask| {
unsafe { cast::transmute(self) }
}
fn to_fn(self) -> |&mut Scheduler, ~Task| { unsafe { transmute(self) } }
}
// On unix, we read randomness straight from /dev/urandom, but the
@ -897,12 +914,9 @@ fn new_sched_rng() -> XorShiftRng {
}
#[cfg(unix)]
fn new_sched_rng() -> XorShiftRng {
use libc;
use mem;
use c_str::ToCStr;
use vec::MutableVector;
use iter::Iterator;
use rand::SeedableRng;
use std::libc;
use std::mem;
use std::rand::SeedableRng;
let fd = "/dev/urandom".with_c_str(|name| {
unsafe { libc::open(name, libc::O_RDONLY, 0) }
@ -933,14 +947,11 @@ fn new_sched_rng() -> XorShiftRng {
#[cfg(test)]
mod test {
use prelude::*;
use borrow::to_uint;
use rt::deque::BufferPool;
use rt::basic;
use rt::sched::{Scheduler};
use rt::task::{Task, Sched};
use rt::test::*;
use rt::task::{GreenTask, Sched};
use rt::thread::Thread;
use rt::util;
use task::TaskResult;
@ -1023,10 +1034,10 @@ mod test {
let mut sched = ~new_test_uv_sched();
let sched_handle = sched.make_handle();
let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None,
let mut task = ~do GreenTask::new_root_homed(&mut sched.stack_pool, None,
Sched(sched_handle)) {
unsafe { *task_ran_ptr = true };
assert!(Task::on_appropriate_sched());
assert!(GreenTask::on_appropriate_sched());
};
let on_exit: proc(TaskResult) = proc(exit_status) {
@ -1064,8 +1075,6 @@ mod test {
let normal_handle = normal_sched.make_handle();
let friend_handle = normal_sched.make_handle();
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
basic::event_loop(),
@ -1086,30 +1095,30 @@ mod test {
// 3) task not homed, sched requeues
// 4) task not home, send home
let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
let task1 = ~do GreenTask::new_root_homed(&mut special_sched.stack_pool, None,
Sched(t1_handle)) || {
rtassert!(Task::on_appropriate_sched());
rtassert!(GreenTask::on_appropriate_sched());
};
rtdebug!("task1 id: **{}**", borrow::to_uint(task1));
let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
rtassert!(Task::on_appropriate_sched());
let task2 = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
rtassert!(GreenTask::on_appropriate_sched());
};
let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
rtassert!(Task::on_appropriate_sched());
let task3 = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
rtassert!(GreenTask::on_appropriate_sched());
};
let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
let task4 = ~do GreenTask::new_root_homed(&mut special_sched.stack_pool, None,
Sched(t4_handle)) {
rtassert!(Task::on_appropriate_sched());
rtassert!(GreenTask::on_appropriate_sched());
};
rtdebug!("task4 id: **{}**", borrow::to_uint(task4));
// Signal from the special task that we are done.
let (port, chan) = Chan::<()>::new();
let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
let normal_task = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
rtdebug!("*about to submit task2*");
Scheduler::run_task(task2);
rtdebug!("*about to submit task4*");
@ -1124,7 +1133,7 @@ mod test {
rtdebug!("normal task: {}", borrow::to_uint(normal_task));
let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) {
let special_task = ~do GreenTask::new_root(&mut special_sched.stack_pool, None) {
rtdebug!("*about to submit task1*");
Scheduler::run_task(task1);
rtdebug!("*about to submit task3*");
@ -1226,14 +1235,14 @@ mod test {
let thread = do Thread::start {
let mut sched = sched;
let bootstrap_task =
~Task::new_root(&mut sched.stack_pool,
~GreenTask::new_root(&mut sched.stack_pool,
None,
proc()());
sched.bootstrap(bootstrap_task);
};
let mut stack_pool = StackPool::new();
let task = ~Task::new_root(&mut stack_pool, None, proc()());
let task = ~GreenTask::new_root(&mut stack_pool, None, proc()());
handle.send(TaskFromFriend(task));
handle.send(Shutdown);

View File

@ -11,10 +11,9 @@
//! Maintains a shared list of sleeping schedulers. Schedulers
//! use this to wake each other up.
use rt::sched::SchedHandle;
use rt::mpmc_bounded_queue::Queue;
use option::*;
use clone::Clone;
use std::sync::mpmc_bounded_queue::Queue;
use sched::SchedHandle;
pub struct SleeperList {
priv q: Queue<SchedHandle>,

View File

@ -8,11 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use container::Container;
use ptr::RawPtr;
use vec;
use ops::Drop;
use libc::{c_uint, uintptr_t};
use std::vec;
use std::libc::{c_uint, uintptr_t};
pub struct StackSegment {
priv buf: ~[u8],

505
src/libgreen/task.rs Normal file
View File

@ -0,0 +1,505 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! The Green Task implementation
//!
//! This module contains the glue to the libstd runtime necessary to integrate
//! M:N scheduling. This GreenTask structure is hidden as a trait object in all
//! rust tasks and virtual calls are made in order to interface with it.
//!
//! Each green task contains a scheduler if it is currently running, and it also
//! contains the rust task itself in order to juggle around ownership of the
//! values.
use std::cast;
use std::rt::Runtime;
use std::rt::rtio;
use std::rt::local::Local;
use std::rt::task::{Task, BlockedTask};
use std::task::TaskOpts;
use std::unstable::mutex::Mutex;
use coroutine::Coroutine;
use sched::{Scheduler, SchedHandle, RunOnce};
use stack::StackPool;
/// The necessary fields needed to keep track of a green task (as opposed to a
/// 1:1 task).
pub struct GreenTask {
coroutine: Option<Coroutine>,
handle: Option<SchedHandle>,
sched: Option<~Scheduler>,
task: Option<~Task>,
task_type: TaskType,
pool_id: uint,
// See the comments in the scheduler about why this is necessary
nasty_deschedule_lock: Mutex,
}
pub enum TaskType {
TypeGreen(Option<Home>),
TypeSched,
}
pub enum Home {
AnySched,
HomeSched(SchedHandle),
}
impl GreenTask {
pub fn new(stack_pool: &mut StackPool,
stack_size: Option<uint>,
start: proc()) -> ~GreenTask {
GreenTask::new_homed(stack_pool, stack_size, AnySched, start)
}
pub fn new_homed(stack_pool: &mut StackPool,
stack_size: Option<uint>,
home: Home,
start: proc()) -> ~GreenTask {
let mut ops = GreenTask::new_typed(None, TypeGreen(Some(home)));
let start = GreenTask::build_start_wrapper(start, ops.as_uint());
ops.coroutine = Some(Coroutine::new(stack_pool, stack_size, start));
return ops;
}
pub fn new_typed(coroutine: Option<Coroutine>,
task_type: TaskType) -> ~GreenTask {
~GreenTask {
pool_id: 0,
coroutine: coroutine,
task_type: task_type,
sched: None,
handle: None,
nasty_deschedule_lock: unsafe { Mutex::new() },
task: None,
}
}
/// Just like the `maybe_take_runtime` function, this function should *not*
/// exist. Usage of this function is _strongly_ discouraged. This is an
/// absolute last resort necessary for converting a libstd task to a green
/// task.
///
/// This function will assert that the task is indeed a green task before
/// returning (and will kill the entire process if this is wrong).
pub fn convert(mut task: ~Task) -> ~GreenTask {
match task.maybe_take_runtime::<GreenTask>() {
Some(mut green) => {
green.put_task(task);
green
}
None => rtabort!("not a green task any more?"),
}
}
/// Builds a function which is the actual starting execution point for a
/// rust task. This function is the glue necessary to execute the libstd
/// task and then clean up the green thread after it exits.
///
/// The second argument to this function is actually a transmuted copy of
/// the `GreenTask` pointer. Context switches in the scheduler silently
/// transfer ownership of the `GreenTask` to the other end of the context
/// switch, so because this is the first code that is running in this task,
/// it must first re-acquire ownership of the green task.
pub fn build_start_wrapper(start: proc(), ops: uint) -> proc() {
proc() {
// First code after swap to this new context. Run our
// cleanup job after we have re-acquired ownership of the green
// task.
let mut task: ~GreenTask = unsafe { GreenTask::from_uint(ops) };
task.sched.get_mut_ref().run_cleanup_job();
// Convert our green task to a libstd task and then execute the code
// requeted. This is the "try/catch" block for this green task and
// is the wrapper for *all* code run in the task.
let mut start = Some(start);
let task = task.swap().run(|| start.take_unwrap()());
// Once the function has exited, it's time to run the termination
// routine. This means we need to context switch one more time but
// clean ourselves up on the other end. Since we have no way of
// preserving a handle to the GreenTask down to this point, this
// unfortunately must call `GreenTask::convert`. In order to avoid
// this we could add a `terminate` function to the `Runtime` trait
// in libstd, but that seems less appropriate since the coversion
// method exists.
GreenTask::convert(task).terminate();
}
}
pub fn give_home(&mut self, new_home: Home) {
match self.task_type {
TypeGreen(ref mut home) => { *home = Some(new_home); }
TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
}
}
pub fn take_unwrap_home(&mut self) -> Home {
match self.task_type {
TypeGreen(ref mut home) => home.take_unwrap(),
TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
}
}
// New utility functions for homes.
pub fn is_home_no_tls(&self, sched: &Scheduler) -> bool {
match self.task_type {
TypeGreen(Some(AnySched)) => { false }
TypeGreen(Some(HomeSched(SchedHandle { sched_id: ref id, .. }))) => {
*id == sched.sched_id()
}
TypeGreen(None) => { rtabort!("task without home"); }
TypeSched => {
// Awe yea
rtabort!("type error: expected: TypeGreen, found: TaskSched");
}
}
}
pub fn homed(&self) -> bool {
match self.task_type {
TypeGreen(Some(AnySched)) => { false }
TypeGreen(Some(HomeSched(SchedHandle { .. }))) => { true }
TypeGreen(None) => {
rtabort!("task without home");
}
TypeSched => {
rtabort!("type error: expected: TypeGreen, found: TaskSched");
}
}
}
pub fn is_sched(&self) -> bool {
match self.task_type {
TypeGreen(..) => false, TypeSched => true,
}
}
// Unsafe functions for transferring ownership of this GreenTask across
// context switches
pub fn as_uint(&self) -> uint {
unsafe { cast::transmute(self) }
}
pub unsafe fn from_uint(val: uint) -> ~GreenTask { cast::transmute(val) }
// Runtime glue functions and helpers
pub fn put_with_sched(mut ~self, sched: ~Scheduler) {
assert!(self.sched.is_none());
self.sched = Some(sched);
self.put();
}
pub fn put_task(&mut self, task: ~Task) {
assert!(self.task.is_none());
self.task = Some(task);
}
pub fn swap(mut ~self) -> ~Task {
let mut task = self.task.take_unwrap();
task.put_runtime(self as ~Runtime);
return task;
}
pub fn put(~self) {
assert!(self.sched.is_some());
Local::put(self.swap());
}
fn terminate(mut ~self) {
let sched = self.sched.take_unwrap();
sched.terminate_current_task(self);
}
// This function is used to remotely wakeup this green task back on to its
// original pool of schedulers. In order to do so, each tasks arranges a
// SchedHandle upon descheduling to be available for sending itself back to
// the original pool.
//
// Note that there is an interesting transfer of ownership going on here. We
// must relinquish ownership of the green task, but then also send the task
// over the handle back to the original scheduler. In order to safely do
// this, we leverage the already-present "nasty descheduling lock". The
// reason for doing this is that each task will bounce on this lock after
// resuming after a context switch. By holding the lock over the enqueueing
// of the task, we're guaranteed that the SchedHandle's memory will be valid
// for this entire function.
//
// An alternative would include having incredibly cheaply cloneable handles,
// but right now a SchedHandle is something like 6 allocations, so it is
// *not* a cheap operation to clone a handle. Until the day comes that we
// need to optimize this, a lock should do just fine (it's completely
// uncontended except for when the task is rescheduled).
fn reawaken_remotely(mut ~self) {
unsafe {
let mtx = &mut self.nasty_deschedule_lock as *mut Mutex;
let handle = self.handle.get_mut_ref() as *mut SchedHandle;
(*mtx).lock();
(*handle).send(RunOnce(self));
(*mtx).unlock();
}
}
}
impl Runtime for GreenTask {
fn yield_now(mut ~self, cur_task: ~Task) {
self.put_task(cur_task);
let sched = self.sched.take_unwrap();
sched.yield_now(self);
}
fn maybe_yield(mut ~self, cur_task: ~Task) {
self.put_task(cur_task);
let sched = self.sched.take_unwrap();
sched.maybe_yield(self);
}
fn deschedule(mut ~self, times: uint, cur_task: ~Task,
f: |BlockedTask| -> Result<(), BlockedTask>) {
self.put_task(cur_task);
let mut sched = self.sched.take_unwrap();
// In order for this task to be reawoken in all possible contexts, we
// may need a handle back in to the current scheduler. When we're woken
// up in anything other than the local scheduler pool, this handle is
// used to send this task back into the scheduler pool.
if self.handle.is_none() {
self.handle = Some(sched.make_handle());
self.pool_id = sched.pool_id;
}
// This code is pretty standard, except for the usage of
// `GreenTask::convert`. Right now if we use `reawaken` directly it will
// expect for there to be a task in local TLS, but that is not true for
// this deschedule block (because the scheduler must retain ownership of
// the task while the cleanup job is running). In order to get around
// this for now, we invoke the scheduler directly with the converted
// Task => GreenTask structure.
if times == 1 {
sched.deschedule_running_task_and_then(self, |sched, task| {
match f(task) {
Ok(()) => {}
Err(t) => {
t.wake().map(|t| {
sched.enqueue_task(GreenTask::convert(t))
});
}
}
});
} else {
sched.deschedule_running_task_and_then(self, |sched, task| {
for task in task.make_selectable(times) {
match f(task) {
Ok(()) => {},
Err(task) => {
task.wake().map(|t| {
sched.enqueue_task(GreenTask::convert(t))
});
break
}
}
}
});
}
}
fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) {
self.put_task(to_wake);
assert!(self.sched.is_none());
// Waking up a green thread is a bit of a tricky situation. We have no
// guarantee about where the current task is running. The options we
// have for where this current task is running are:
//
// 1. Our original scheduler pool
// 2. Some other scheduler pool
// 3. Something that isn't a scheduler pool
//
// In order to figure out what case we're in, this is the reason that
// the `maybe_take_runtime` function exists. Using this function we can
// dynamically check to see which of these cases is the current
// situation and then dispatch accordingly.
//
// In case 1, we just use the local scheduler to resume ourselves
// immediately (if a rescheduling is possible).
//
// In case 2 and 3, we need to remotely reawaken ourself in order to be
// transplanted back to the correct scheduler pool.
let mut running_task: ~Task = Local::take();
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
let mut sched = running_green_task.sched.take_unwrap();
if sched.pool_id == self.pool_id {
running_green_task.put_task(running_task);
if can_resched {
sched.run_task(running_green_task, self);
} else {
sched.enqueue_task(self);
running_green_task.put_with_sched(sched);
}
} else {
self.reawaken_remotely();
// put that thing back where it came from!
running_task.put_runtime(running_green_task as ~Runtime);
Local::put(running_task);
}
}
None => {
self.reawaken_remotely();
Local::put(running_task);
}
}
}
fn spawn_sibling(mut ~self, cur_task: ~Task, opts: TaskOpts, f: proc()) {
self.put_task(cur_task);
let TaskOpts {
watched: _watched,
notify_chan, name, stack_size
} = opts;
// Spawns a task into the current scheduler. We allocate the new task's
// stack from the scheduler's stack pool, and then configure it
// accordingly to `opts`. Afterwards we bootstrap it immediately by
// switching to it.
//
// Upon returning, our task is back in TLS and we're good to return.
let mut sched = self.sched.take_unwrap();
let mut sibling = GreenTask::new(&mut sched.stack_pool, stack_size, f);
let mut sibling_task = ~Task::new();
sibling_task.name = name;
match notify_chan {
Some(chan) => {
let on_exit = proc(task_result) { chan.send(task_result) };
sibling_task.death.on_exit = Some(on_exit);
}
None => {}
}
sibling.task = Some(sibling_task);
sched.run_task(self, sibling)
}
// Local I/O is provided by the scheduler's event loop
fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
match self.sched.get_mut_ref().event_loop.io() {
Some(io) => Some(rtio::LocalIo::new(io)),
None => None,
}
}
fn wrap(~self) -> ~Any { self as ~Any }
}
impl Drop for GreenTask {
fn drop(&mut self) {
unsafe { self.nasty_deschedule_lock.destroy(); }
}
}
#[cfg(test)]
mod test {
#[test]
fn local_heap() {
do run_in_newsched_task() {
let a = @5;
let b = a;
assert!(*a == 5);
assert!(*b == 5);
}
}
#[test]
fn tls() {
use std::local_data;
do run_in_newsched_task() {
local_data_key!(key: @~str)
local_data::set(key, @~"data");
assert!(*local_data::get(key, |k| k.map(|k| *k)).unwrap() == ~"data");
local_data_key!(key2: @~str)
local_data::set(key2, @~"data");
assert!(*local_data::get(key2, |k| k.map(|k| *k)).unwrap() == ~"data");
}
}
#[test]
fn unwind() {
do run_in_newsched_task() {
let result = spawntask_try(proc()());
rtdebug!("trying first assert");
assert!(result.is_ok());
let result = spawntask_try(proc() fail!());
rtdebug!("trying second assert");
assert!(result.is_err());
}
}
#[test]
fn rng() {
do run_in_uv_task() {
use std::rand::{rng, Rng};
let mut r = rng();
let _ = r.next_u32();
}
}
#[test]
fn logging() {
do run_in_uv_task() {
info!("here i am. logging in a newsched task");
}
}
#[test]
fn comm_stream() {
do run_in_newsched_task() {
let (port, chan) = Chan::new();
chan.send(10);
assert!(port.recv() == 10);
}
}
#[test]
fn comm_shared_chan() {
do run_in_newsched_task() {
let (port, chan) = SharedChan::new();
chan.send(10);
assert!(port.recv() == 10);
}
}
//#[test]
//fn heap_cycles() {
// use std::option::{Option, Some, None};
// do run_in_newsched_task {
// struct List {
// next: Option<@mut List>,
// }
// let a = @mut List { next: None };
// let b = @mut List { next: Some(a) };
// a.next = Some(b);
// }
//}
#[test]
#[should_fail]
fn test_begin_unwind() { begin_unwind("cause", file!(), line!()) }
}

View File

@ -12,9 +12,8 @@ use c_str::{ToCStr, CString};
use libc::{c_char, size_t};
use option::{Option, None, Some};
use ptr::RawPtr;
use rt::env;
use rt;
use rt::local::Local;
use rt::task;
use rt::task::Task;
use str::OwnedStr;
use str;
@ -62,7 +61,7 @@ unsafe fn fail_borrowed(alloc: *mut raw::Box<()>, file: *c_char, line: size_t)
match try_take_task_borrow_list() {
None => { // not recording borrows
let msg = "borrowed";
msg.with_c_str(|msg_p| task::begin_unwind_raw(msg_p, file, line))
msg.with_c_str(|msg_p| rt::begin_unwind_raw(msg_p, file, line))
}
Some(borrow_list) => { // recording borrows
let mut msg = ~"borrowed";
@ -76,7 +75,7 @@ unsafe fn fail_borrowed(alloc: *mut raw::Box<()>, file: *c_char, line: size_t)
sep = " and at ";
}
}
msg.with_c_str(|msg_p| task::begin_unwind_raw(msg_p, file, line))
msg.with_c_str(|msg_p| rt::begin_unwind_raw(msg_p, file, line))
}
}
}
@ -95,7 +94,7 @@ unsafe fn debug_borrow<T,P:RawPtr<T>>(tag: &'static str,
//! A useful debugging function that prints a pointer + tag + newline
//! without allocating memory.
if ENABLE_DEBUG && env::debug_borrow() {
if ENABLE_DEBUG && rt::env::debug_borrow() {
debug_borrow_slow(tag, p, old_bits, new_bits, filename, line);
}
@ -180,7 +179,7 @@ pub unsafe fn unrecord_borrow(a: *u8,
if br.alloc != a || br.file != file || br.line != line {
let err = format!("wrong borrow found, br={:?}", br);
err.with_c_str(|msg_p| {
task::begin_unwind_raw(msg_p, file, line)
rt::begin_unwind_raw(msg_p, file, line)
})
}
borrow_list

View File

@ -17,7 +17,7 @@ use os;
// Note that these are all accessed without any synchronization.
// They are expected to be initialized once then left alone.
static mut MIN_STACK: uint = 2000000;
static mut MIN_STACK: uint = 2 * 1024 * 1024;
static mut DEBUG_BORROW: bool = false;
static mut POISON_ON_FREE: bool = false;

View File

@ -1,318 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
Task death: asynchronous killing, linked failure, exit code propagation.
This file implements two orthogonal building-blocks for communicating failure
between tasks. One is 'linked failure' or 'task killing', that is, a failing
task causing other tasks to fail promptly (even those that are blocked on
pipes or I/O). The other is 'exit code propagation', which affects the result
observed by the parent of a task::try task that itself spawns child tasks
(such as any #[test] function). In both cases the data structures live in
KillHandle.
I. Task killing.
The model for killing involves two atomic flags, the "kill flag" and the
"unkillable flag". Operations on the kill flag include:
- In the taskgroup code (task/spawn.rs), tasks store a clone of their
KillHandle in their shared taskgroup. Another task in the group that fails
will use that handle to call kill().
- When a task blocks, it turns its ~Task into a BlockedTask by storing a
the transmuted ~Task pointer inside the KillHandle's kill flag. A task
trying to block and a task trying to kill it can simultaneously access the
kill flag, after which the task will get scheduled and fail (no matter who
wins the race). Likewise, a task trying to wake a blocked task normally and
a task trying to kill it can simultaneously access the flag; only one will
get the task to reschedule it.
Operations on the unkillable flag include:
- When a task becomes unkillable, it swaps on the flag to forbid any killer
from waking it up while it's blocked inside the unkillable section. If a
kill was already pending, the task fails instead of becoming unkillable.
- When a task is done being unkillable, it restores the flag to the normal
running state. If a kill was received-but-blocked during the unkillable
section, the task fails at this later point.
- When a task tries to kill another task, before swapping on the kill flag, it
first swaps on the unkillable flag, to see if it's "allowed" to wake up the
task. If it isn't, the killed task will receive the signal when it becomes
killable again. (Of course, a task trying to wake the task normally (e.g.
sending on a channel) does not access the unkillable flag at all.)
Why do we not need acquire/release barriers on any of the kill flag swaps?
This is because barriers establish orderings between accesses on different
memory locations, but each kill-related operation is only a swap on a single
location, so atomicity is all that matters. The exception is kill(), which
does a swap on both flags in sequence. kill() needs no barriers because it
does not matter if its two accesses are seen reordered on another CPU: if a
killer does perform both writes, it means it saw a KILL_RUNNING in the
unkillable flag, which means an unkillable task will see KILL_KILLED and fail
immediately (rendering the subsequent write to the kill flag unnecessary).
II. Exit code propagation.
The basic model for exit code propagation, which is used with the "watched"
spawn mode (on by default for linked spawns, off for supervised and unlinked
spawns), is that a parent will wait for all its watched children to exit
before reporting whether it succeeded or failed. A watching parent will only
report success if it succeeded and all its children also reported success;
otherwise, it will report failure. This is most useful for writing test cases:
```
#[test]
fn test_something_in_another_task {
do spawn {
assert!(collatz_conjecture_is_false());
}
}
```
Here, as the child task will certainly outlive the parent task, we might miss
the failure of the child when deciding whether or not the test case passed.
The watched spawn mode avoids this problem.
In order to propagate exit codes from children to their parents, any
'watching' parent must wait for all of its children to exit before it can
report its final exit status. We achieve this by using an UnsafeArc, using the
reference counting to track how many children are still alive, and using the
unwrap() operation in the parent's exit path to wait for all children to exit.
The UnsafeArc referred to here is actually the KillHandle itself.
This also works transitively, as if a "middle" watched child task is itself
watching a grandchild task, the "middle" task will do unwrap() on its own
KillHandle (thereby waiting for the grandchild to exit) before dropping its
reference to its watching parent (which will alert the parent).
While UnsafeArc::unwrap() accomplishes the synchronization, there remains the
matter of reporting the exit codes themselves. This is easiest when an exiting
watched task has no watched children of its own:
- If the task with no watched children exits successfully, it need do nothing.
- If the task with no watched children has failed, it sets a flag in the
parent's KillHandle ("any_child_failed") to false. It then stays false forever.
However, if a "middle" watched task with watched children of its own exits
before its child exits, we need to ensure that the grandparent task may still
see a failure from the grandchild task. While we could achieve this by having
each intermediate task block on its handle, this keeps around the other resources
the task was using. To be more efficient, this is accomplished via "tombstones".
A tombstone is a closure, proc() -> bool, which will perform any waiting necessary
to collect the exit code of descendant tasks. In its environment is captured
the KillHandle of whichever task created the tombstone, and perhaps also any
tombstones that that task itself had, and finally also another tombstone,
effectively creating a lazy-list of heap closures.
When a child wishes to exit early and leave tombstones behind for its parent,
it must use a LittleLock (pthread mutex) to synchronize with any possible
sibling tasks which are trying to do the same thing with the same parent.
However, on the other side, when the parent is ready to pull on the tombstones,
it need not use this lock, because the unwrap() serves as a barrier that ensures
no children will remain with references to the handle.
The main logic for creating and assigning tombstones can be found in the
function reparent_children_to() in the impl for KillHandle.
IIA. Issues with exit code propagation.
There are two known issues with the current scheme for exit code propagation.
- As documented in issue #8136, the structure mandates the possibility for stack
overflow when collecting tombstones that are very deeply nested. This cannot
be avoided with the closure representation, as tombstones end up structured in
a sort of tree. However, notably, the tombstones do not actually need to be
collected in any particular order, and so a doubly-linked list may be used.
However we do not do this yet because DList is in libextra.
- A discussion with Graydon made me realize that if we decoupled the exit code
propagation from the parents-waiting action, this could result in a simpler
implementation as the exit codes themselves would not have to be propagated,
and could instead be propagated implicitly through the taskgroup mechanism
that we already have. The tombstoning scheme would still be required. I have
not implemented this because currently we can't receive a linked failure kill
signal during the task cleanup activity, as that is currently "unkillable",
and occurs outside the task's unwinder's "try" block, so would require some
restructuring.
*/
use cast;
use option::{Option, Some, None};
use prelude::*;
use iter;
use task::TaskResult;
use rt::task::Task;
use unstable::atomics::{AtomicUint, SeqCst};
use unstable::sync::UnsafeArc;
/// A handle to a blocked task. Usually this means having the ~Task pointer by
/// ownership, but if the task is killable, a killer can steal it at any time.
pub enum BlockedTask {
Owned(~Task),
Shared(UnsafeArc<AtomicUint>),
}
/// Per-task state related to task death, killing, failure, etc.
pub struct Death {
// Action to be done with the exit code. If set, also makes the task wait
// until all its watched children exit before collecting the status.
on_exit: Option<proc(TaskResult)>,
// nesting level counter for unstable::atomically calls (0 == can deschedule).
priv wont_sleep: int,
}
pub struct BlockedTaskIterator {
priv inner: UnsafeArc<AtomicUint>,
}
impl Iterator<BlockedTask> for BlockedTaskIterator {
fn next(&mut self) -> Option<BlockedTask> {
Some(Shared(self.inner.clone()))
}
}
impl BlockedTask {
/// Returns Some if the task was successfully woken; None if already killed.
pub fn wake(self) -> Option<~Task> {
match self {
Owned(task) => Some(task),
Shared(arc) => unsafe {
match (*arc.get()).swap(0, SeqCst) {
0 => None,
n => cast::transmute(n),
}
}
}
}
/// Create a blocked task, unless the task was already killed.
pub fn block(task: ~Task) -> BlockedTask {
Owned(task)
}
/// Converts one blocked task handle to a list of many handles to the same.
pub fn make_selectable(self, num_handles: uint)
-> iter::Take<BlockedTaskIterator>
{
let arc = match self {
Owned(task) => {
let flag = unsafe { AtomicUint::new(cast::transmute(task)) };
UnsafeArc::new(flag)
}
Shared(arc) => arc.clone(),
};
BlockedTaskIterator{ inner: arc }.take(num_handles)
}
// This assertion has two flavours because the wake involves an atomic op.
// In the faster version, destructors will fail dramatically instead.
#[inline] #[cfg(not(test))]
pub fn assert_already_awake(self) { }
#[inline] #[cfg(test)]
pub fn assert_already_awake(self) { assert!(self.wake().is_none()); }
/// Convert to an unsafe uint value. Useful for storing in a pipe's state flag.
#[inline]
pub unsafe fn cast_to_uint(self) -> uint {
match self {
Owned(task) => {
let blocked_task_ptr: uint = cast::transmute(task);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr
}
Shared(arc) => {
let blocked_task_ptr: uint = cast::transmute(~arc);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr | 0x1
}
}
}
/// Convert from an unsafe uint value. Useful for retrieving a pipe's state flag.
#[inline]
pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
if blocked_task_ptr & 0x1 == 0 {
Owned(cast::transmute(blocked_task_ptr))
} else {
let ptr: ~UnsafeArc<AtomicUint> = cast::transmute(blocked_task_ptr & !1);
Shared(*ptr)
}
}
}
impl Death {
pub fn new() -> Death {
Death {
on_exit: None,
wont_sleep: 0,
}
}
/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, result: TaskResult) {
match self.on_exit.take() {
Some(f) => f(result),
None => {}
}
}
/// Enter a possibly-nested "atomic" section of code. Just for assertions.
/// All calls must be paired with a subsequent call to allow_deschedule.
#[inline]
pub fn inhibit_deschedule(&mut self) {
self.wont_sleep += 1;
}
/// Exit a possibly-nested "atomic" section of code. Just for assertions.
/// All calls must be paired with a preceding call to inhibit_deschedule.
#[inline]
pub fn allow_deschedule(&mut self) {
rtassert!(self.wont_sleep != 0);
self.wont_sleep -= 1;
}
/// Ensure that the task is allowed to become descheduled.
#[inline]
pub fn assert_may_sleep(&self) {
if self.wont_sleep != 0 {
rtabort!("illegal atomic-sleep: attempt to reschedule while \
using an Exclusive or LittleLock");
}
}
}
impl Drop for Death {
fn drop(&mut self) {
// Mustn't be in an atomic or unkillable section at task death.
rtassert!(self.wont_sleep == 0);
}
}
#[cfg(test)]
mod test {
use rt::test::*;
use super::*;
// Task blocking tests
#[test]
fn block_and_wake() {
do with_test_task |task| {
BlockedTask::block(task).wake().unwrap()
}
}
}

View File

@ -8,8 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::{Option, Some, None};
use rt::sched::Scheduler;
use option::Option;
use rt::task::Task;
use rt::local_ptr;
@ -46,82 +45,6 @@ impl Local<local_ptr::Borrowed<Task>> for Task {
}
}
/// Encapsulates a temporarily-borrowed scheduler.
pub struct BorrowedScheduler {
priv task: local_ptr::Borrowed<Task>,
}
impl BorrowedScheduler {
fn new(mut task: local_ptr::Borrowed<Task>) -> BorrowedScheduler {
if task.get().sched.is_none() {
rtabort!("no scheduler")
} else {
BorrowedScheduler {
task: task,
}
}
}
#[inline]
pub fn get<'a>(&'a mut self) -> &'a mut ~Scheduler {
match self.task.get().sched {
None => rtabort!("no scheduler"),
Some(ref mut sched) => sched,
}
}
}
impl Local<BorrowedScheduler> for Scheduler {
fn put(value: ~Scheduler) {
let mut task = Local::borrow(None::<Task>);
task.get().sched = Some(value);
}
#[inline]
fn take() -> ~Scheduler {
unsafe {
// XXX: Unsafe for speed
let task: *mut Task = Local::unsafe_borrow();
(*task).sched.take_unwrap()
}
}
fn exists(_: Option<Scheduler>) -> bool {
let mut task = Local::borrow(None::<Task>);
task.get().sched.is_some()
}
#[inline]
fn borrow(_: Option<Scheduler>) -> BorrowedScheduler {
BorrowedScheduler::new(Local::borrow(None::<Task>))
}
unsafe fn unsafe_take() -> ~Scheduler { rtabort!("unimpl") }
unsafe fn unsafe_borrow() -> *mut Scheduler {
let task: *mut Task = Local::unsafe_borrow();
match (*task).sched {
Some(~ref mut sched) => {
let s: *mut Scheduler = &mut *sched;
return s;
}
None => {
rtabort!("no scheduler")
}
}
}
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> {
let task_opt: Option<*mut Task> = Local::try_unsafe_borrow();
match task_opt {
Some(task) => {
match (*task).sched {
Some(~ref mut sched) => {
let s: *mut Scheduler = &mut *sched;
Some(s)
}
None => None
}
}
None => None
}
}
}
#[cfg(test)]
mod test {
use option::None;

View File

@ -57,27 +57,17 @@ Several modules in `core` are clients of `rt`:
// XXX: this should not be here.
#[allow(missing_doc)];
use any::Any;
use clone::Clone;
use container::Container;
use iter::Iterator;
use option::{Option, None, Some};
use option::Option;
use ptr::RawPtr;
use rt::local::Local;
use rt::sched::{Scheduler, Shutdown};
use rt::sleeper_list::SleeperList;
use task::TaskResult;
use rt::task::{Task, SchedTask, GreenTask, Sched};
use send_str::SendStrStatic;
use unstable::atomics::{AtomicInt, AtomicBool, SeqCst};
use unstable::sync::UnsafeArc;
use result::Result;
use task::TaskOpts;
use vec::{OwnedVector, MutableVector, ImmutableVector};
use vec;
use self::thread::Thread;
// the os module needs to reach into this helper, so allow general access
// through this reexport.
pub use self::util::set_exit_status;
use self::task::{Task, BlockedTask};
// this is somewhat useful when a program wants to spawn a "reasonable" number
// of workers based on the constraints of the system that it's running on.
@ -85,8 +75,8 @@ pub use self::util::set_exit_status;
// method...
pub use self::util::default_sched_threads;
// Re-export of the functionality in the kill module
pub use self::kill::BlockedTask;
// Export unwinding facilities used by the failure macros
pub use self::unwind::{begin_unwind, begin_unwind_raw};
// XXX: these probably shouldn't be public...
#[doc(hidden)]
@ -99,21 +89,12 @@ pub mod shouldnt_be_public {
// Internal macros used by the runtime.
mod macros;
/// Basic implementation of an EventLoop, provides no I/O interfaces
mod basic;
/// The global (exchange) heap.
pub mod global_heap;
/// Implementations of language-critical runtime features like @.
pub mod task;
/// Facilities related to task failure, killing, and death.
mod kill;
/// The coroutine task scheduler, built on the `io` event loop.
pub mod sched;
/// The EventLoop and internal synchronous I/O interface.
pub mod rtio;
@ -121,27 +102,6 @@ pub mod rtio;
/// or task-local storage.
pub mod local;
/// A mostly lock-free multi-producer, single consumer queue.
pub mod mpsc_queue;
/// A lock-free single-producer, single consumer queue.
pub mod spsc_queue;
/// A lock-free multi-producer, multi-consumer bounded queue.
mod mpmc_bounded_queue;
/// A parallel work-stealing deque
pub mod deque;
/// A parallel data structure for tracking sleeping schedulers.
pub mod sleeper_list;
/// Stack segments and caching.
pub mod stack;
/// CPU context swapping.
mod context;
/// Bindings to system threading libraries.
pub mod thread;
@ -157,16 +117,6 @@ pub mod logging;
/// Crate map
pub mod crate_map;
/// Tools for testing the runtime
pub mod test;
/// Reference counting
pub mod rc;
/// A simple single-threaded channel type for passing buffered data between
/// scheduler and task context
pub mod tube;
/// The runtime needs to be able to put a pointer into thread-local storage.
mod local_ptr;

View File

@ -1,139 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! An owned, task-local, reference counted type
//!
//! # Safety note
//!
//! XXX There is currently no type-system mechanism for enforcing that
//! reference counted types are both allocated on the exchange heap
//! and also non-sendable
//!
//! This doesn't prevent borrowing multiple aliasable mutable pointers
use ops::Drop;
use clone::Clone;
use libc::c_void;
use cast;
pub struct RC<T> {
priv p: *c_void // ~(uint, T)
}
impl<T> RC<T> {
pub fn new(val: T) -> RC<T> {
unsafe {
let v = ~(1, val);
let p: *c_void = cast::transmute(v);
RC { p: p }
}
}
fn get_mut_state(&mut self) -> *mut (uint, T) {
unsafe {
let p: &mut ~(uint, T) = cast::transmute(&mut self.p);
let p: *mut (uint, T) = &mut **p;
return p;
}
}
fn get_state(&self) -> *(uint, T) {
unsafe {
let p: &~(uint, T) = cast::transmute(&self.p);
let p: *(uint, T) = &**p;
return p;
}
}
pub fn unsafe_borrow_mut(&mut self) -> *mut T {
unsafe {
match *self.get_mut_state() {
(_, ref mut p) => {
let p: *mut T = p;
return p;
}
}
}
}
pub fn refcount(&self) -> uint {
unsafe {
match *self.get_state() {
(count, _) => count
}
}
}
}
#[unsafe_destructor]
impl<T> Drop for RC<T> {
fn drop(&mut self) {
assert!(self.refcount() > 0);
unsafe {
match *self.get_mut_state() {
(ref mut count, _) => {
*count = *count - 1
}
}
if self.refcount() == 0 {
let _: ~(uint, T) = cast::transmute(self.p);
}
}
}
}
impl<T> Clone for RC<T> {
fn clone(&self) -> RC<T> {
unsafe {
// XXX: Mutable clone
let this: &mut RC<T> = cast::transmute_mut(self);
match *this.get_mut_state() {
(ref mut count, _) => {
*count = *count + 1;
}
}
}
RC { p: self.p }
}
}
#[cfg(test)]
mod test {
use super::RC;
#[test]
fn smoke_test() {
unsafe {
let mut v1 = RC::new(100);
assert!(*v1.unsafe_borrow_mut() == 100);
assert!(v1.refcount() == 1);
let mut v2 = v1.clone();
assert!(*v2.unsafe_borrow_mut() == 100);
assert!(v2.refcount() == 2);
*v2.unsafe_borrow_mut() = 200;
assert!(*v2.unsafe_borrow_mut() == 200);
assert!(*v1.unsafe_borrow_mut() == 200);
let v3 = v2.clone();
assert!(v3.refcount() == 3);
{
let _v1 = v1;
let _v2 = v2;
}
assert!(v3.refcount() == 1);
}
}
}

View File

@ -14,14 +14,15 @@ use comm::{SharedChan, Port};
use libc::c_int;
use libc;
use ops::Drop;
use option::*;
use option::{Option, Some, None};
use path::Path;
use result::*;
use result::{Result, Ok, Err};
use rt::task::Task;
use rt::local::Local;
use ai = io::net::addrinfo;
use io;
use io::IoError;
use io::native::NATIVE_IO_FACTORY;
use io::native;
use io::net::ip::{IpAddr, SocketAddr};
use io::process::{ProcessConfig, ProcessExit};
use io::signal::Signum;
@ -149,6 +150,8 @@ impl<'a> LocalIo<'a> {
}
pub trait IoFactory {
fn id(&self) -> uint;
// networking
fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError>;
fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError>;

View File

@ -13,29 +13,31 @@
//! local storage, and logging. Even a 'freestanding' Rust would likely want
//! to implement this.
use super::local_heap::LocalHeap;
use prelude::*;
use any::AnyOwnExt;
use borrow;
use cleanup;
use io::Writer;
use libc::{c_char, size_t};
use local_data;
use ops::Drop;
use option::{Option, Some, None};
use prelude::drop;
use result::{Result, Ok, Err};
use rt::Runtime;
use rt::borrowck::BorrowRecord;
use rt::borrowck;
use rt::context::Context;
use rt::env;
use rt::kill::Death;
use rt::local::Local;
use rt::local_heap::LocalHeap;
use rt::logging::StdErrLogger;
use rt::sched::{Scheduler, SchedHandle};
use rt::stack::{StackSegment, StackPool};
use rt::rtio::LocalIo;
use rt::unwind::Unwinder;
use send_str::SendStr;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicUint, SeqCst};
use task::{TaskResult, TaskOpts};
use unstable::finally::Finally;
use unstable::mutex::Mutex;
#[cfg(stage0)] pub use rt::unwind::begin_unwind;
// The Task struct represents all state associated with a rust
// task. There are at this point two primary "subtypes" of task,
@ -45,201 +47,89 @@ use unstable::mutex::Mutex;
pub struct Task {
heap: LocalHeap,
priv gc: GarbageCollector,
gc: GarbageCollector,
storage: LocalStorage,
logger: Option<StdErrLogger>,
unwinder: Unwinder,
death: Death,
destroyed: bool,
name: Option<SendStr>,
coroutine: Option<Coroutine>,
sched: Option<~Scheduler>,
task_type: TaskType,
// Dynamic borrowck debugging info
borrow_list: Option<~[BorrowRecord]>,
logger: Option<StdErrLogger>,
stdout_handle: Option<~Writer>,
// See the comments in the scheduler about why this is necessary
nasty_deschedule_lock: Mutex,
}
pub enum TaskType {
GreenTask(Option<SchedHome>),
SchedTask
}
/// A coroutine is nothing more than a (register context, stack) pair.
pub struct Coroutine {
/// The segment of stack on which the task is currently running or
/// if the task is blocked, on which the task will resume
/// execution.
///
/// Servo needs this to be public in order to tell SpiderMonkey
/// about the stack bounds.
current_stack_segment: StackSegment,
/// Always valid if the task is alive and not running.
saved_context: Context
}
/// Some tasks have a dedicated home scheduler that they must run on.
pub enum SchedHome {
AnySched,
Sched(SchedHandle)
priv imp: Option<~Runtime>,
}
pub struct GarbageCollector;
pub struct LocalStorage(Option<local_data::Map>);
/// A handle to a blocked task. Usually this means having the ~Task pointer by
/// ownership, but if the task is killable, a killer can steal it at any time.
pub enum BlockedTask {
Owned(~Task),
Shared(UnsafeArc<AtomicUint>),
}
/// Per-task state related to task death, killing, failure, etc.
pub struct Death {
// Action to be done with the exit code. If set, also makes the task wait
// until all its watched children exit before collecting the status.
on_exit: Option<proc(TaskResult)>,
}
pub struct BlockedTaskIterator {
priv inner: UnsafeArc<AtomicUint>,
}
impl Task {
// A helper to build a new task using the dynamically found
// scheduler and task. Only works in GreenTask context.
pub fn build_homed_child(stack_size: Option<uint>,
f: proc(),
home: SchedHome)
-> ~Task {
let mut running_task = Local::borrow(None::<Task>);
let mut sched = running_task.get().sched.take_unwrap();
let new_task = ~running_task.get()
.new_child_homed(&mut sched.stack_pool,
stack_size,
home,
f);
running_task.get().sched = Some(sched);
new_task
}
pub fn build_child(stack_size: Option<uint>, f: proc()) -> ~Task {
Task::build_homed_child(stack_size, f, AnySched)
}
pub fn build_homed_root(stack_size: Option<uint>,
f: proc(),
home: SchedHome)
-> ~Task {
let mut running_task = Local::borrow(None::<Task>);
let mut sched = running_task.get().sched.take_unwrap();
let new_task = ~Task::new_root_homed(&mut sched.stack_pool,
stack_size,
home,
f);
running_task.get().sched = Some(sched);
new_task
}
pub fn build_root(stack_size: Option<uint>, f: proc()) -> ~Task {
Task::build_homed_root(stack_size, f, AnySched)
}
pub fn new_sched_task() -> Task {
pub fn new() -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(None),
logger: None,
unwinder: Unwinder { unwinding: false, cause: None },
death: Death::new(),
destroyed: false,
coroutine: Some(Coroutine::empty()),
name: None,
sched: None,
task_type: SchedTask,
borrow_list: None,
stdout_handle: None,
nasty_deschedule_lock: unsafe { Mutex::new() },
}
}
pub fn new_root(stack_pool: &mut StackPool,
stack_size: Option<uint>,
start: proc()) -> Task {
Task::new_root_homed(stack_pool, stack_size, AnySched, start)
}
pub fn new_child(&mut self,
stack_pool: &mut StackPool,
stack_size: Option<uint>,
start: proc()) -> Task {
self.new_child_homed(stack_pool, stack_size, AnySched, start)
}
pub fn new_root_homed(stack_pool: &mut StackPool,
stack_size: Option<uint>,
home: SchedHome,
start: proc()) -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(None),
logger: None,
unwinder: Unwinder { unwinding: false, cause: None },
unwinder: Unwinder::new(),
death: Death::new(),
destroyed: false,
name: None,
coroutine: Some(Coroutine::new(stack_pool, stack_size, start)),
sched: None,
task_type: GreenTask(Some(home)),
borrow_list: None,
stdout_handle: None,
nasty_deschedule_lock: unsafe { Mutex::new() },
}
}
pub fn new_child_homed(&mut self,
stack_pool: &mut StackPool,
stack_size: Option<uint>,
home: SchedHome,
start: proc()) -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(None),
logger: None,
unwinder: Unwinder { unwinding: false, cause: None },
death: Death::new(),
destroyed: false,
name: None,
coroutine: Some(Coroutine::new(stack_pool, stack_size, start)),
sched: None,
task_type: GreenTask(Some(home)),
borrow_list: None,
stdout_handle: None,
nasty_deschedule_lock: unsafe { Mutex::new() },
imp: None,
}
}
pub fn give_home(&mut self, new_home: SchedHome) {
match self.task_type {
GreenTask(ref mut home) => {
*home = Some(new_home);
}
SchedTask => {
rtabort!("type error: used SchedTask as GreenTask");
}
}
}
pub fn take_unwrap_home(&mut self) -> SchedHome {
match self.task_type {
GreenTask(ref mut home) => {
let out = home.take_unwrap();
return out;
}
SchedTask => {
rtabort!("type error: used SchedTask as GreenTask");
}
}
}
pub fn run(&mut self, f: ||) {
rtdebug!("run called on task: {}", borrow::to_uint(self));
/// Executes the given closure as if it's running inside this task. The task
/// is consumed upon entry, and the destroyed task is returned from this
/// function in order for the caller to free. This function is guaranteed to
/// not unwind because the closure specified is run inside of a `rust_try`
/// block. (this is the only try/catch block in the world).
///
/// This function is *not* meant to be abused as a "try/catch" block. This
/// is meant to be used at the absolute boundaries of a task's lifetime, and
/// only for that purpose.
pub fn run(~self, f: ||) -> ~Task {
// Need to put ourselves into TLS, but also need access to the unwinder.
// Unsafely get a handle to the task so we can continue to use it after
// putting it in tls (so we can invoke the unwinder).
let handle: *mut Task = unsafe {
*cast::transmute::<&~Task, &*mut Task>(&self)
};
Local::put(self);
// The only try/catch block in the world. Attempt to run the task's
// client-specified code and catch any failures.
self.unwinder.try(|| {
let try_block = || {
// Run the task main function, then do some cleanup.
f.finally(|| {
fn flush(w: Option<~Writer>) {
match w {
Some(mut w) => { w.flush(); }
None => {}
}
}
// First, destroy task-local storage. This may run user dtors.
//
@ -260,7 +150,10 @@ impl Task {
// TLS, or possibly some destructors for those objects being
// annihilated invoke TLS. Sadly these two operations seemed to
// be intertwined, and miraculously work for now...
self.storage.take();
let mut task = Local::borrow(None::<Task>);
let storage = task.get().storage.take();
drop(task);
drop(storage);
// Destroy remaining boxes. Also may run user dtors.
unsafe { cleanup::annihilate(); }
@ -268,77 +161,112 @@ impl Task {
// Finally flush and destroy any output handles which the task
// owns. There are no boxes here, and no user destructors should
// run after this any more.
match self.stdout_handle.take() {
Some(handle) => {
let mut handle = handle;
handle.flush();
}
None => {}
}
self.logger.take();
let mut task = Local::borrow(None::<Task>);
let stdout = task.get().stdout_handle.take();
let logger = task.get().logger.take();
drop(task);
flush(stdout);
drop(logger);
})
});
};
unsafe { (*handle).unwinder.try(try_block); }
// Cleanup the dynamic borrowck debugging info
borrowck::clear_task_borrow_list();
self.death.collect_failure(self.unwinder.result());
self.destroyed = true;
let mut me: ~Task = Local::take();
me.death.collect_failure(me.unwinder.result());
me.destroyed = true;
return me;
}
// New utility functions for homes.
/// Inserts a runtime object into this task, transferring ownership to the
/// task. It is illegal to replace a previous runtime object in this task
/// with this argument.
pub fn put_runtime(&mut self, ops: ~Runtime) {
assert!(self.imp.is_none());
self.imp = Some(ops);
}
pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool {
match self.task_type {
GreenTask(Some(AnySched)) => { false }
GreenTask(Some(Sched(SchedHandle { sched_id: ref id, .. }))) => {
*id == sched.sched_id()
}
GreenTask(None) => {
rtabort!("task without home");
}
SchedTask => {
// Awe yea
rtabort!("type error: expected: GreenTask, found: SchedTask");
/// Attempts to extract the runtime as a specific type. If the runtime does
/// not have the provided type, then the runtime is not removed. If the
/// runtime does have the specified type, then it is removed and returned
/// (transfer of ownership).
///
/// It is recommended to only use this method when *absolutely necessary*.
/// This function may not be available in the future.
pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<~T> {
// This is a terrible, terrible function. The general idea here is to
// take the runtime, cast it to ~Any, check if it has the right type,
// and then re-cast it back if necessary. The method of doing this is
// pretty sketchy and involves shuffling vtables of trait objects
// around, but it gets the job done.
//
// XXX: This function is a serious code smell and should be avoided at
// all costs. I have yet to think of a method to avoid this
// function, and I would be saddened if more usage of the function
// crops up.
unsafe {
let imp = self.imp.take_unwrap();
let &(vtable, _): &(uint, uint) = cast::transmute(&imp);
match imp.wrap().move::<T>() {
Ok(t) => Some(t),
Err(t) => {
let (_, obj): (uint, uint) = cast::transmute(t);
let obj: ~Runtime = cast::transmute((vtable, obj));
self.put_runtime(obj);
None
}
}
}
}
pub fn homed(&self) -> bool {
match self.task_type {
GreenTask(Some(AnySched)) => { false }
GreenTask(Some(Sched(SchedHandle { .. }))) => { true }
GreenTask(None) => {
rtabort!("task without home");
}
SchedTask => {
rtabort!("type error: expected: GreenTask, found: SchedTask");
}
}
/// Spawns a sibling to this task. The newly spawned task is configured with
/// the `opts` structure and will run `f` as the body of its code.
pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc()) {
let ops = self.imp.take_unwrap();
ops.spawn_sibling(self, opts, f)
}
// Grab both the scheduler and the task from TLS and check if the
// task is executing on an appropriate scheduler.
pub fn on_appropriate_sched() -> bool {
let mut task = Local::borrow(None::<Task>);
let sched_id = task.get().sched.get_ref().sched_id();
let sched_run_anything = task.get().sched.get_ref().run_anything;
match task.get().task_type {
GreenTask(Some(AnySched)) => {
rtdebug!("anysched task in sched check ****");
sched_run_anything
}
GreenTask(Some(Sched(SchedHandle { sched_id: ref id, ..}))) => {
rtdebug!("homed task in sched check ****");
*id == sched_id
}
GreenTask(None) => {
rtabort!("task without home");
}
SchedTask => {
rtabort!("type error: expected: GreenTask, found: SchedTask");
}
}
/// Deschedules the current task, invoking `f` `amt` times. It is not
/// recommended to use this function directly, but rather communication
/// primitives in `std::comm` should be used.
pub fn deschedule(mut ~self, amt: uint,
f: |BlockedTask| -> Result<(), BlockedTask>) {
let ops = self.imp.take_unwrap();
ops.deschedule(amt, self, f)
}
/// Wakes up a previously blocked task, optionally specifiying whether the
/// current task can accept a change in scheduling. This function can only
/// be called on tasks that were previously blocked in `deschedule`.
pub fn reawaken(mut ~self, can_resched: bool) {
let ops = self.imp.take_unwrap();
ops.reawaken(self, can_resched);
}
/// Yields control of this task to another task. This function will
/// eventually return, but possibly not immediately. This is used as an
/// opportunity to allow other tasks a chance to run.
pub fn yield_now(mut ~self) {
let ops = self.imp.take_unwrap();
ops.yield_now(self);
}
/// Similar to `yield_now`, except that this function may immediately return
/// without yielding (depending on what the runtime decides to do).
pub fn maybe_yield(mut ~self) {
let ops = self.imp.take_unwrap();
ops.maybe_yield(self);
}
/// Acquires a handle to the I/O factory that this task contains, normally
/// stored in the task's runtime. This factory may not always be available,
/// which is why the return type is `Option`
pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
self.imp.get_mut_ref().local_io()
}
}
@ -346,253 +274,101 @@ impl Drop for Task {
fn drop(&mut self) {
rtdebug!("called drop for a task: {}", borrow::to_uint(self));
rtassert!(self.destroyed);
unsafe { self.nasty_deschedule_lock.destroy(); }
}
}
// Coroutines represent nothing more than a context and a stack
// segment.
impl Coroutine {
pub fn new(stack_pool: &mut StackPool,
stack_size: Option<uint>,
start: proc())
-> Coroutine {
let stack_size = match stack_size {
Some(size) => size,
None => env::min_stack()
};
let start = Coroutine::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(stack_size);
let initial_context = Context::new(start, &mut stack);
Coroutine {
current_stack_segment: stack,
saved_context: initial_context
}
impl Iterator<BlockedTask> for BlockedTaskIterator {
fn next(&mut self) -> Option<BlockedTask> {
Some(Shared(self.inner.clone()))
}
}
pub fn empty() -> Coroutine {
Coroutine {
current_stack_segment: StackSegment::new(0),
saved_context: Context::empty()
}
}
fn build_start_wrapper(start: proc()) -> proc() {
let wrapper: proc() = proc() {
// First code after swap to this new context. Run our
// cleanup job.
unsafe {
// Again - might work while safe, or it might not.
{
let mut sched = Local::borrow(None::<Scheduler>);
sched.get().run_cleanup_job();
}
// To call the run method on a task we need a direct
// reference to it. The task is in TLS, so we can
// simply unsafe_borrow it to get this reference. We
// need to still have the task in TLS though, so we
// need to unsafe_borrow.
let task: *mut Task = Local::unsafe_borrow();
let mut start_cell = Some(start);
(*task).run(|| {
// N.B. Removing `start` from the start wrapper
// closure by emptying a cell is critical for
// correctness. The ~Task pointer, and in turn the
// closure used to initialize the first call
// frame, is destroyed in the scheduler context,
// not task context. So any captured closures must
// not contain user-definable dtors that expect to
// be in task context. By moving `start` out of
// the closure, all the user code goes our of
// scope while the task is still running.
let start = start_cell.take_unwrap();
start();
});
}
// We remove the sched from the Task in TLS right now.
let sched: ~Scheduler = Local::take();
// ... allowing us to give it away when performing a
// scheduling operation.
sched.terminate_current_task()
};
return wrapper;
}
/// Destroy coroutine and try to reuse stack segment.
pub fn recycle(self, stack_pool: &mut StackPool) {
impl BlockedTask {
/// Returns Some if the task was successfully woken; None if already killed.
pub fn wake(self) -> Option<~Task> {
match self {
Coroutine { current_stack_segment, .. } => {
stack_pool.give_segment(current_stack_segment);
}
}
}
}
/// This function is invoked from rust's current __morestack function. Segmented
/// stacks are currently not enabled as segmented stacks, but rather one giant
/// stack segment. This means that whenever we run out of stack, we want to
/// truly consider it to be stack overflow rather than allocating a new stack.
#[no_mangle] // - this is called from C code
#[no_split_stack] // - it would be sad for this function to trigger __morestack
#[doc(hidden)] // - Function must be `pub` to get exported, but it's
// irrelevant for documentation purposes.
#[cfg(not(test))] // in testing, use the original libstd's version
pub extern "C" fn rust_stack_exhausted() {
use rt::context;
use rt::in_green_task_context;
use rt::task::Task;
use rt::local::Local;
use unstable::intrinsics;
unsafe {
// We're calling this function because the stack just ran out. We need
// to call some other rust functions, but if we invoke the functions
// right now it'll just trigger this handler being called again. In
// order to alleviate this, we move the stack limit to be inside of the
// red zone that was allocated for exactly this reason.
let limit = context::get_sp_limit();
context::record_sp_limit(limit - context::RED_ZONE / 2);
// This probably isn't the best course of action. Ideally one would want
// to unwind the stack here instead of just aborting the entire process.
// This is a tricky problem, however. There's a few things which need to
// be considered:
//
// 1. We're here because of a stack overflow, yet unwinding will run
// destructors and hence arbitrary code. What if that code overflows
// the stack? One possibility is to use the above allocation of an
// extra 10k to hope that we don't hit the limit, and if we do then
// abort the whole program. Not the best, but kind of hard to deal
// with unless we want to switch stacks.
//
// 2. LLVM will optimize functions based on whether they can unwind or
// not. It will flag functions with 'nounwind' if it believes that
// the function cannot trigger unwinding, but if we do unwind on
// stack overflow then it means that we could unwind in any function
// anywhere. We would have to make sure that LLVM only places the
// nounwind flag on functions which don't call any other functions.
//
// 3. The function that overflowed may have owned arguments. These
// arguments need to have their destructors run, but we haven't even
// begun executing the function yet, so unwinding will not run the
// any landing pads for these functions. If this is ignored, then
// the arguments will just be leaked.
//
// Exactly what to do here is a very delicate topic, and is possibly
// still up in the air for what exactly to do. Some relevant issues:
//
// #3555 - out-of-stack failure leaks arguments
// #3695 - should there be a stack limit?
// #9855 - possible strategies which could be taken
// #9854 - unwinding on windows through __morestack has never worked
// #2361 - possible implementation of not using landing pads
if in_green_task_context() {
let mut task = Local::borrow(None::<Task>);
let n = task.get()
.name
.as_ref()
.map(|n| n.as_slice())
.unwrap_or("<unnamed>");
// See the message below for why this is not emitted to the
// task's logger. This has the additional conundrum of the
// logger may not be initialized just yet, meaning that an FFI
// call would happen to initialized it (calling out to libuv),
// and the FFI call needs 2MB of stack when we just ran out.
rterrln!("task '{}' has overflowed its stack", n);
} else {
rterrln!("stack overflow in non-task context");
}
intrinsics::abort();
}
}
/// This is the entry point of unwinding for things like lang items and such.
/// The arguments are normally generated by the compiler, and need to
/// have static lifetimes.
pub fn begin_unwind_raw(msg: *c_char, file: *c_char, line: size_t) -> ! {
use c_str::CString;
use cast::transmute;
#[inline]
fn static_char_ptr(p: *c_char) -> &'static str {
let s = unsafe { CString::new(p, false) };
match s.as_str() {
Some(s) => unsafe { transmute::<&str, &'static str>(s) },
None => rtabort!("message wasn't utf8?")
}
}
let msg = static_char_ptr(msg);
let file = static_char_ptr(file);
begin_unwind(msg, file, line as uint)
}
/// This is the entry point of unwinding for fail!() and assert!().
pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> ! {
use any::AnyRefExt;
use rt::in_green_task_context;
use rt::local::Local;
use rt::task::Task;
use str::Str;
use unstable::intrinsics;
unsafe {
let task: *mut Task;
// Note that this should be the only allocation performed in this block.
// Currently this means that fail!() on OOM will invoke this code path,
// but then again we're not really ready for failing on OOM anyway. If
// we do start doing this, then we should propagate this allocation to
// be performed in the parent of this task instead of the task that's
// failing.
let msg = ~msg as ~Any;
{
//let msg: &Any = msg;
let msg_s = match msg.as_ref::<&'static str>() {
Some(s) => *s,
None => match msg.as_ref::<~str>() {
Some(s) => s.as_slice(),
None => "~Any",
Owned(task) => Some(task),
Shared(arc) => unsafe {
match (*arc.get()).swap(0, SeqCst) {
0 => None,
n => Some(cast::transmute(n)),
}
};
if !in_green_task_context() {
rterrln!("failed in non-task context at '{}', {}:{}",
msg_s, file, line);
intrinsics::abort();
}
task = Local::unsafe_borrow();
let n = (*task).name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>");
// XXX: this should no get forcibly printed to the console, this should
// either be sent to the parent task (ideally), or get printed to
// the task's logger. Right now the logger is actually a uvio
// instance, which uses unkillable blocks internally for various
// reasons. This will cause serious trouble if the task is failing
// due to mismanagment of its own kill flag, so calling our own
// logger in its current state is a bit of a problem.
rterrln!("task '{}' failed at '{}', {}:{}", n, msg_s, file, line);
if (*task).unwinder.unwinding {
rtabort!("unwinding again");
}
}
}
(*task).unwinder.begin_unwind(msg);
// This assertion has two flavours because the wake involves an atomic op.
// In the faster version, destructors will fail dramatically instead.
#[cfg(not(test))] pub fn trash(self) { }
#[cfg(test)] pub fn trash(self) { assert!(self.wake().is_none()); }
/// Create a blocked task, unless the task was already killed.
pub fn block(task: ~Task) -> BlockedTask {
Owned(task)
}
/// Converts one blocked task handle to a list of many handles to the same.
pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTaskIterator>
{
let arc = match self {
Owned(task) => {
let flag = unsafe { AtomicUint::new(cast::transmute(task)) };
UnsafeArc::new(flag)
}
Shared(arc) => arc.clone(),
};
BlockedTaskIterator{ inner: arc }.take(num_handles)
}
/// Convert to an unsafe uint value. Useful for storing in a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_to_uint(self) -> uint {
match self {
Owned(task) => {
let blocked_task_ptr: uint = cast::transmute(task);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr
}
Shared(arc) => {
let blocked_task_ptr: uint = cast::transmute(~arc);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr | 0x1
}
}
}
/// Convert from an unsafe uint value. Useful for retrieving a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
if blocked_task_ptr & 0x1 == 0 {
Owned(cast::transmute(blocked_task_ptr))
} else {
let ptr: ~UnsafeArc<AtomicUint> =
cast::transmute(blocked_task_ptr & !1);
Shared(*ptr)
}
}
}
impl Death {
pub fn new() -> Death {
Death { on_exit: None, }
}
/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, result: TaskResult) {
match self.on_exit.take() {
Some(f) => f(result),
None => {}
}
}
}
impl Drop for Death {
fn drop(&mut self) {
// make this type noncopyable
}
}
@ -690,4 +466,13 @@ mod test {
#[test]
#[should_fail]
fn test_begin_unwind() { begin_unwind("cause", file!(), line!()) }
// Task blocking tests
#[test]
fn block_and_wake() {
do with_test_task |task| {
BlockedTask::block(task).wake().unwrap()
}
}
}

View File

@ -69,6 +69,12 @@ impl Thread<()> {
/// called, when the `Thread` falls out of scope its destructor will block
/// waiting for the OS thread.
pub fn start<T: Send>(main: proc() -> T) -> Thread<T> {
Thread::start_stack(DEFAULT_STACK_SIZE, main)
}
/// Performs the same functionality as `start`, but specifies an explicit
/// stack size for the new thread.
pub fn start_stack<T: Send>(stack: uint, main: proc() -> T) -> Thread<T> {
// We need the address of the packet to fill in to be stable so when
// `main` fills it in it's still valid, so allocate an extra ~ box to do

View File

@ -1,170 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A very simple unsynchronized channel type for sending buffered data from
//! scheduler context to task context.
//!
//! XXX: This would be safer to use if split into two types like Port/Chan
use option::*;
use clone::Clone;
use super::rc::RC;
use rt::sched::Scheduler;
use rt::kill::BlockedTask;
use rt::local::Local;
use vec::OwnedVector;
use container::Container;
struct TubeState<T> {
blocked_task: Option<BlockedTask>,
buf: ~[T]
}
pub struct Tube<T> {
priv p: RC<TubeState<T>>
}
impl<T> Tube<T> {
pub fn new() -> Tube<T> {
Tube {
p: RC::new(TubeState {
blocked_task: None,
buf: ~[]
})
}
}
pub fn send(&mut self, val: T) {
rtdebug!("tube send");
unsafe {
let state = self.p.unsafe_borrow_mut();
(*state).buf.push(val);
if (*state).blocked_task.is_some() {
// There's a waiting task. Wake it up
rtdebug!("waking blocked tube");
let task = (*state).blocked_task.take_unwrap();
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(task);
}
}
}
pub fn recv(&mut self) -> T {
unsafe {
let state = self.p.unsafe_borrow_mut();
if !(*state).buf.is_empty() {
return (*state).buf.shift();
} else {
// Block and wait for the next message
rtdebug!("blocking on tube recv");
assert!(self.p.refcount() > 1); // There better be somebody to wake us up
assert!((*state).blocked_task.is_none());
let sched: ~Scheduler = Local::take();
sched.deschedule_running_task_and_then(|_, task| {
(*state).blocked_task = Some(task);
});
rtdebug!("waking after tube recv");
let buf = &mut (*state).buf;
assert!(!buf.is_empty());
return buf.shift();
}
}
}
}
impl<T> Clone for Tube<T> {
fn clone(&self) -> Tube<T> {
Tube { p: self.p.clone() }
}
}
#[cfg(test)]
mod test {
use rt::test::*;
use rt::rtio::EventLoop;
use rt::sched::Scheduler;
use rt::local::Local;
use super::*;
use prelude::*;
#[test]
fn simple_test() {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let mut tube_clone = Some(tube.clone());
let sched: ~Scheduler = Local::take();
sched.deschedule_running_task_and_then(|sched, task| {
let mut tube_clone = tube_clone.take_unwrap();
tube_clone.send(1);
sched.enqueue_blocked_task(task);
});
assert!(tube.recv() == 1);
}
}
#[test]
fn blocking_test() {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let mut tube_clone = Some(tube.clone());
let sched: ~Scheduler = Local::take();
sched.deschedule_running_task_and_then(|sched, task| {
let tube_clone = tube_clone.take_unwrap();
do sched.event_loop.callback {
let mut tube_clone = tube_clone;
// The task should be blocked on this now and
// sending will wake it up.
tube_clone.send(1);
}
sched.enqueue_blocked_task(task);
});
assert!(tube.recv() == 1);
}
}
#[test]
fn many_blocking_test() {
static MAX: int = 100;
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let mut tube_clone = Some(tube.clone());
let sched: ~Scheduler = Local::take();
sched.deschedule_running_task_and_then(|sched, task| {
callback_send(tube_clone.take_unwrap(), 0);
fn callback_send(tube: Tube<int>, i: int) {
if i == 100 {
return
}
let mut sched = Local::borrow(None::<Scheduler>);
do sched.get().event_loop.callback {
let mut tube = tube;
// The task should be blocked on this now and
// sending will wake it up.
tube.send(i);
callback_send(tube, i + 1);
}
}
sched.enqueue_blocked_task(task);
});
for i in range(0, MAX) {
let j = tube.recv();
assert!(j == i);
}
}
}
}

View File

@ -8,7 +8,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Implementation of Rust stack unwinding
//
// For background on exception handling and stack unwinding please see "Exception Handling in LLVM"
@ -254,3 +253,74 @@ pub extern "C" fn rust_eh_personality_catch(version: c_int,
}
}
}
/// This is the entry point of unwinding for things like lang items and such.
/// The arguments are normally generated by the compiler, and need to
/// have static lifetimes.
pub fn begin_unwind_raw(msg: *c_char, file: *c_char, line: size_t) -> ! {
#[inline]
fn static_char_ptr(p: *c_char) -> &'static str {
let s = unsafe { CString::new(p, false) };
match s.as_str() {
Some(s) => unsafe { cast::transmute::<&str, &'static str>(s) },
None => rtabort!("message wasn't utf8?")
}
}
let msg = static_char_ptr(msg);
let file = static_char_ptr(file);
begin_unwind(msg, file, line as uint)
}
/// This is the entry point of unwinding for fail!() and assert!().
pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> ! {
unsafe {
let task: *mut Task;
// Note that this should be the only allocation performed in this block.
// Currently this means that fail!() on OOM will invoke this code path,
// but then again we're not really ready for failing on OOM anyway. If
// we do start doing this, then we should propagate this allocation to
// be performed in the parent of this task instead of the task that's
// failing.
let msg = ~msg as ~Any;
{
let msg_s = match msg.as_ref::<&'static str>() {
Some(s) => *s,
None => match msg.as_ref::<~str>() {
Some(s) => s.as_slice(),
None => "~Any",
}
};
// It is assumed that all reasonable rust code will have a local
// task at all times. This means that this `try_unsafe_borrow` will
// succeed almost all of the time. There are border cases, however,
// when the runtime has *almost* set up the local task, but hasn't
// quite gotten there yet. In order to get some better diagnostics,
// we print on failure and immediately abort the whole process if
// there is no local task available.
match Local::try_unsafe_borrow() {
Some(t) => {
task = t;
let n = (*task).name.as_ref()
.map(|n| n.as_slice()).unwrap_or("<unnamed>");
println!("task '{}' failed at '{}', {}:{}", n, msg_s,
file, line);
}
None => {
println!("failed at '{}', {}:{}", msg_s, file, line);
intrinsics::abort();
}
}
if (*task).unwinder.unwinding {
rtabort!("unwinding again");
}
}
(*task).unwinder.begin_unwind(msg);
}
}

View File

@ -15,7 +15,6 @@ use libc;
use option::{Some, None, Option};
use os;
use str::StrSlice;
use unstable::atomics::{AtomicInt, INIT_ATOMIC_INT, SeqCst};
use unstable::running_on_valgrind;
// Indicates whether we should perform expensive sanity checks, including rtassert!
@ -144,13 +143,3 @@ memory and partly incapable of presentation to others.",
unsafe { libc::abort() }
}
}
static mut EXIT_STATUS: AtomicInt = INIT_ATOMIC_INT;
pub fn set_exit_status(code: int) {
unsafe { EXIT_STATUS.store(code, SeqCst) }
}
pub fn get_exit_status() -> int {
unsafe { EXIT_STATUS.load(SeqCst) }
}

View File

@ -338,8 +338,8 @@ mod tests {
use str;
use task::spawn;
use unstable::running_on_valgrind;
use io::native::file;
use io::{FileNotFound, Reader, Writer, io_error};
use io::pipe::PipeStream;
use io::{Writer, Reader, io_error, FileNotFound, OtherIoError};
#[test]
#[cfg(not(target_os="android"))] // FIXME(#10380)
@ -426,13 +426,13 @@ mod tests {
}
fn writeclose(fd: c_int, s: &str) {
let mut writer = file::FileDesc::new(fd, true);
let mut writer = PipeStream::open(fd as int);
writer.write(s.as_bytes());
}
fn readclose(fd: c_int) -> ~str {
let mut res = ~[];
let mut reader = file::FileDesc::new(fd, true);
let mut reader = PipeStream::open(fd as int);
let mut buf = [0, ..1024];
loop {
match reader.read(buf) {

View File

@ -53,22 +53,21 @@
#[allow(missing_doc)];
use prelude::*;
use any::Any;
use comm::{Chan, Port};
use kinds::Send;
use option::{None, Some, Option};
use result::{Result, Ok, Err};
use rt::in_green_task_context;
use rt::local::Local;
use rt::task::Task;
use send_str::{SendStr, IntoSendStr};
use str::Str;
use util;
#[cfg(test)] use any::Any;
#[cfg(test)] use comm::SharedChan;
#[cfg(test)] use ptr;
#[cfg(test)] use result;
pub mod spawn;
/// Indicates the manner in which a task exited.
///
/// A task that completes without failing is considered to exit successfully.
@ -80,27 +79,6 @@ pub mod spawn;
/// children tasks complete, recommend using a result future.
pub type TaskResult = Result<(), ~Any>;
/// Scheduler modes
#[deriving(Eq)]
pub enum SchedMode {
/// Run task on the default scheduler
DefaultScheduler,
/// All tasks run in the same OS thread
SingleThreaded,
}
/**
* Scheduler configuration options
*
* # Fields
*
* * sched_mode - The operating mode of the scheduler
*
*/
pub struct SchedOpts {
priv mode: SchedMode,
}
/**
* Task configuration options
*
@ -121,10 +99,9 @@ pub struct SchedOpts {
* scheduler other tasks will be impeded or even blocked indefinitely.
*/
pub struct TaskOpts {
priv watched: bool,
priv notify_chan: Option<Chan<TaskResult>>,
watched: bool,
notify_chan: Option<Chan<TaskResult>>,
name: Option<SendStr>,
sched: SchedOpts,
stack_size: Option<uint>
}
@ -169,7 +146,6 @@ impl TaskBuilder {
watched: self.opts.watched,
notify_chan: notify_chan,
name: name,
sched: self.opts.sched,
stack_size: self.opts.stack_size
},
gen_body: gen_body,
@ -229,11 +205,6 @@ impl TaskBuilder {
self.opts.name = Some(name.into_send_str());
}
/// Configure a custom scheduler mode for the task.
pub fn sched_mode(&mut self, mode: SchedMode) {
self.opts.sched.mode = mode;
}
/**
* Add a wrapper to the body of the spawned task.
*
@ -285,7 +256,6 @@ impl TaskBuilder {
watched: x.opts.watched,
notify_chan: notify_chan,
name: name,
sched: x.opts.sched,
stack_size: x.opts.stack_size
};
let f = match gen_body {
@ -296,7 +266,9 @@ impl TaskBuilder {
f
}
};
spawn::spawn_raw(opts, f);
let t: ~Task = Local::take();
t.spawn_sibling(opts, f);
}
/**
@ -343,9 +315,6 @@ pub fn default_task_opts() -> TaskOpts {
watched: true,
notify_chan: None,
name: None,
sched: SchedOpts {
mode: DefaultScheduler,
},
stack_size: None
}
}
@ -363,24 +332,6 @@ pub fn spawn(f: proc()) {
task.spawn(f)
}
pub fn spawn_sched(mode: SchedMode, f: proc()) {
/*!
* Creates a new task on a new or existing scheduler.
*
* When there are no more tasks to execute the
* scheduler terminates.
*
* # Failure
*
* In manual threads mode the number of threads requested must be
* greater than zero.
*/
let mut task = task();
task.sched_mode(mode);
task.spawn(f)
}
pub fn try<T:Send>(f: proc() -> T) -> Result<T, ~Any> {
/*!
* Execute a function in another task and return either the return value
@ -400,14 +351,10 @@ pub fn try<T:Send>(f: proc() -> T) -> Result<T, ~Any> {
pub fn with_task_name<U>(blk: |Option<&str>| -> U) -> U {
use rt::task::Task;
if in_green_task_context() {
let mut task = Local::borrow(None::<Task>);
match task.get().name {
Some(ref name) => blk(Some(name.as_slice())),
None => blk(None)
}
} else {
fail!("no task name exists in non-green task context")
let mut task = Local::borrow(None::<Task>);
match task.get().name {
Some(ref name) => blk(Some(name.as_slice())),
None => blk(None)
}
}
@ -415,11 +362,10 @@ pub fn deschedule() {
//! Yield control to the task scheduler
use rt::local::Local;
use rt::sched::Scheduler;
// FIXME(#7544): Optimize this, since we know we won't block.
let sched: ~Scheduler = Local::take();
sched.yield_now();
let task: ~Task = Local::take();
task.yield_now();
}
pub fn failing() -> bool {
@ -428,7 +374,7 @@ pub fn failing() -> bool {
use rt::task::Task;
let mut local = Local::borrow(None::<Task>);
local.get().unwinder.unwinding
local.get().unwinder.unwinding()
}
// The following 8 tests test the following 2^3 combinations:

View File

@ -1,233 +0,0 @@
// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!**************************************************************************
*
* WARNING: linked failure has been removed since this doc comment was written,
* but it was so pretty that I didn't want to remove it.
*
* Spawning & linked failure
*
* Several data structures are involved in task management to allow properly
* propagating failure across linked/supervised tasks.
*
* (1) The "taskgroup_arc" is an unsafe::exclusive which contains a hashset of
* all tasks that are part of the group. Some tasks are 'members', which
* means if they fail, they will kill everybody else in the taskgroup.
* Other tasks are 'descendants', which means they will not kill tasks
* from this group, but can be killed by failing members.
*
* A new one of these is created each spawn_linked or spawn_supervised.
*
* (2) The "taskgroup" is a per-task control structure that tracks a task's
* spawn configuration. It contains a reference to its taskgroup_arc, a
* reference to its node in the ancestor list (below), and an optionally
* configured notification port. These are stored in TLS.
*
* (3) The "ancestor_list" is a cons-style list of unsafe::exclusives which
* tracks 'generations' of taskgroups -- a group's ancestors are groups
* which (directly or transitively) spawn_supervised-ed them. Each task
* is recorded in the 'descendants' of each of its ancestor groups.
*
* Spawning a supervised task is O(n) in the number of generations still
* alive, and exiting (by success or failure) that task is also O(n).
*
* This diagram depicts the references between these data structures:
*
* linked_________________________________
* ___/ _________ \___
* / \ | group X | / \
* ( A ) - - - - - - - > | {A,B} {}|< - - -( B )
* \___/ |_________| \___/
* unlinked
* | __ (nil)
* | //| The following code causes this:
* |__ // /\ _________
* / \ // || | group Y | fn taskA() {
* ( C )- - - ||- - - > |{C} {D,E}| spawn(taskB);
* \___/ / \=====> |_________| spawn_unlinked(taskC);
* supervise /gen \ ...
* | __ \ 00 / }
* | //| \__/ fn taskB() { ... }
* |__ // /\ _________ fn taskC() {
* / \/ || | group Z | spawn_supervised(taskD);
* ( D )- - - ||- - - > | {D} {E} | ...
* \___/ / \=====> |_________| }
* supervise /gen \ fn taskD() {
* | __ \ 01 / spawn_supervised(taskE);
* | //| \__/ ...
* |__ // _________ }
* / \/ | group W | fn taskE() { ... }
* ( E )- - - - - - - > | {E} {} |
* \___/ |_________|
*
* "tcb" "taskgroup_arc"
* "ancestor_list"
*
****************************************************************************/
#[doc(hidden)];
use prelude::*;
use comm::Chan;
use rt::local::Local;
use rt::sched::{Scheduler, Shutdown, TaskFromFriend};
use rt::task::{Task, Sched};
use rt::thread::Thread;
use rt::{in_green_task_context, new_event_loop};
use task::{SingleThreaded, TaskOpts, TaskResult};
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use task;
pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
assert!(in_green_task_context());
let mut task = if opts.sched.mode != SingleThreaded {
if opts.watched {
Task::build_child(opts.stack_size, f)
} else {
Task::build_root(opts.stack_size, f)
}
} else {
unsafe {
// Creating a 1:1 task:thread ...
let sched: *mut Scheduler = Local::unsafe_borrow();
let sched_handle = (*sched).make_handle();
// Since this is a 1:1 scheduler we create a queue not in
// the stealee set. The run_anything flag is set false
// which will disable stealing.
let (worker, _stealer) = (*sched).work_queue.pool().deque();
// Create a new scheduler to hold the new task
let mut new_sched = ~Scheduler::new_special(new_event_loop(),
worker,
(*sched).work_queues.clone(),
(*sched).sleeper_list.clone(),
false,
Some(sched_handle));
let mut new_sched_handle = new_sched.make_handle();
// Allow the scheduler to exit when the pinned task exits
new_sched_handle.send(Shutdown);
// Pin the new task to the new scheduler
let new_task = if opts.watched {
Task::build_homed_child(opts.stack_size, f, Sched(new_sched_handle))
} else {
Task::build_homed_root(opts.stack_size, f, Sched(new_sched_handle))
};
// Create a task that will later be used to join with the new scheduler
// thread when it is ready to terminate
let (thread_port, thread_chan) = Chan::new();
let join_task = do Task::build_child(None) {
debug!("running join task");
let thread: Thread<()> = thread_port.recv();
thread.join();
};
// Put the scheduler into another thread
let orig_sched_handle = (*sched).make_handle();
let new_sched = new_sched;
let thread = do Thread::start {
let mut new_sched = new_sched;
let mut orig_sched_handle = orig_sched_handle;
let bootstrap_task = ~do Task::new_root(&mut new_sched.stack_pool, None) || {
debug!("boostrapping a 1:1 scheduler");
};
new_sched.bootstrap(bootstrap_task);
// Now tell the original scheduler to join with this thread
// by scheduling a thread-joining task on the original scheduler
orig_sched_handle.send(TaskFromFriend(join_task));
// NB: We can't simply send a message from here to another task
// because this code isn't running in a task and message passing doesn't
// work outside of tasks. Hence we're sending a scheduler message
// to execute a new task directly to a scheduler.
};
// Give the thread handle to the join task
thread_chan.send(thread);
// When this task is enqueued on the current scheduler it will then get
// forwarded to the scheduler to which it is pinned
new_task
}
};
if opts.notify_chan.is_some() {
let notify_chan = opts.notify_chan.take_unwrap();
let on_exit: proc(TaskResult) = proc(task_result) {
notify_chan.try_send(task_result);
};
task.death.on_exit = Some(on_exit);
}
task.name = opts.name.take();
debug!("spawn calling run_task");
Scheduler::run_task(task);
}
#[test]
fn test_spawn_raw_simple() {
let (po, ch) = Chan::new();
do spawn_raw(default_task_opts()) {
ch.send(());
}
po.recv();
}
#[test]
fn test_spawn_raw_unsupervise() {
let opts = task::TaskOpts {
watched: false,
notify_chan: None,
.. default_task_opts()
};
do spawn_raw(opts) {
fail!();
}
}
#[test]
fn test_spawn_raw_notify_success() {
let (notify_po, notify_ch) = Chan::new();
let opts = task::TaskOpts {
notify_chan: Some(notify_ch),
.. default_task_opts()
};
do spawn_raw(opts) {
}
assert!(notify_po.recv().is_ok());
}
#[test]
fn test_spawn_raw_notify_failure() {
// New bindings for these
let (notify_po, notify_ch) = Chan::new();
let opts = task::TaskOpts {
watched: false,
notify_chan: Some(notify_ch),
.. default_task_opts()
};
do spawn_raw(opts) {
fail!();
}
assert!(notify_po.recv().is_err());
}

View File

@ -11,15 +11,13 @@
//! Runtime calls emitted by the compiler.
use c_str::ToCStr;
use cast::transmute;
use libc::{c_char, size_t, uintptr_t};
use rt::task;
use rt::borrowck;
#[cold]
#[lang="fail_"]
pub fn fail_(expr: *c_char, file: *c_char, line: size_t) -> ! {
task::begin_unwind_raw(expr, file, line);
::rt::begin_unwind_raw(expr, file, line);
}
#[cold]
@ -81,15 +79,3 @@ pub unsafe fn check_not_borrowed(a: *u8,
line: size_t) {
borrowck::check_not_borrowed(a, file, line)
}
#[lang="start"]
pub fn start(main: *u8, argc: int, argv: **c_char) -> int {
use rt;
unsafe {
return do rt::start(argc, argv as **u8) {
let main: extern "Rust" fn() = transmute(main);
main();
};
}
}

View File

@ -23,6 +23,7 @@ pub mod lang;
pub mod sync;
pub mod mutex;
pub mod raw;
pub mod stack;
/**

View File

@ -740,10 +740,10 @@ pub fn std_macros() -> @str {
fail!("explicit failure")
);
($msg:expr) => (
::std::rt::task::begin_unwind($msg, file!(), line!())
::std::rt::begin_unwind($msg, file!(), line!())
);
($fmt:expr, $($arg:tt)*) => (
::std::rt::task::begin_unwind(format!($fmt, $($arg)*), file!(), line!())
::std::rt::begin_unwind(format!($fmt, $($arg)*), file!(), line!())
)
)