diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index 57e2a0bfe16..7318eaaf679 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -33,7 +33,9 @@ use std::os; use std::rt::crate_map; +use std::rt::local::Local; use std::rt::rtio; +use std::rt::task::Task; use std::rt::thread::Thread; use std::rt; use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; @@ -41,7 +43,6 @@ use std::sync::deque; use std::task::TaskOpts; use std::util; use std::vec; -use stdtask = std::rt::task; use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor}; use sleeper_list::SleeperList; @@ -49,6 +50,7 @@ use stack::StackPool; use task::GreenTask; mod macros; +mod simple; pub mod basic; pub mod context; @@ -61,16 +63,20 @@ pub mod task; #[lang = "start"] pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int { use std::cast; - do start(argc, argv) { - let main: extern "Rust" fn() = unsafe { cast::transmute(main) }; - main(); - } + let mut ret = None; + simple::task().run(|| { + ret = Some(do start(argc, argv) { + let main: extern "Rust" fn() = unsafe { cast::transmute(main) }; + main(); + }) + }); + ret.unwrap() } /// Set up a default runtime configuration, given compiler-supplied arguments. /// -/// This function will block the current thread of execution until the entire -/// pool of M:N schedulers have exited. +/// This function will block until the entire pool of M:N schedulers have +/// exited. This function also requires a local task to be available. /// /// # Arguments /// @@ -95,24 +101,37 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { /// Execute the main function in a pool of M:N schedulers. /// -/// Configures the runtime according to the environment, by default -/// using a task scheduler with the same number of threads as cores. -/// Returns a process exit code. +/// Configures the runtime according to the environment, by default using a task +/// scheduler with the same number of threads as cores. Returns a process exit +/// code. /// /// This function will not return until all schedulers in the associated pool /// have returned. pub fn run(main: proc()) -> int { + // Create a scheduler pool and spawn the main task into this pool. We will + // get notified over a channel when the main task exits. let mut pool = SchedPool::new(PoolConfig::new()); let (port, chan) = Chan::new(); let mut opts = TaskOpts::new(); opts.notify_chan = Some(chan); pool.spawn(opts, main); - do pool.spawn(TaskOpts::new()) { - if port.recv().is_err() { - os::set_exit_status(rt::DEFAULT_ERROR_CODE); - } + + // Wait for the main task to return, and set the process error code + // appropriately. + if port.recv().is_err() { + os::set_exit_status(rt::DEFAULT_ERROR_CODE); } - unsafe { stdtask::wait_for_completion(); } + + // Once the main task has exited and we've set our exit code, wait for all + // spawned sub-tasks to finish running. This is done to allow all schedulers + // to remain active while there are still tasks possibly running. + unsafe { + let mut task = Local::borrow(None::); + task.get().wait_for_other_tasks(); + } + + // Now that we're sure all tasks are dead, shut down the pool of schedulers, + // waiting for them all to return. pool.shutdown(); os::get_exit_status() } diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs new file mode 100644 index 00000000000..6fd2c436b2e --- /dev/null +++ b/src/libgreen/simple.rs @@ -0,0 +1,77 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A small module implementing a simple "runtime" used for bootstrapping a rust +//! scheduler pool and then interacting with it. + +use std::cast; +use std::rt::Runtime; +use std::task::TaskOpts; +use std::rt::rtio; +use std::rt::local::Local; +use std::rt::task::{Task, BlockedTask}; +use std::unstable::sync::LittleLock; + +struct SimpleTask { + lock: LittleLock, +} + +impl Runtime for SimpleTask { + // Implement the simple tasks of descheduling and rescheduling, but only in + // a simple number of cases. + fn deschedule(mut ~self, times: uint, mut cur_task: ~Task, + f: |BlockedTask| -> Result<(), BlockedTask>) { + assert!(times == 1); + + let my_lock: *mut LittleLock = &mut self.lock; + cur_task.put_runtime(self as ~Runtime); + + unsafe { + let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task); + let task = BlockedTask::block(cur_task); + + let mut guard = (*my_lock).lock(); + match f(task) { + Ok(()) => guard.wait(), + Err(task) => { cast::forget(task.wake()); } + } + drop(guard); + cur_task = cast::transmute::(cur_task_dupe); + } + Local::put(cur_task); + } + fn reawaken(mut ~self, mut to_wake: ~Task) { + let lock: *mut LittleLock = &mut self.lock; + to_wake.put_runtime(self as ~Runtime); + unsafe { + cast::forget(to_wake); + let _l = (*lock).lock(); + (*lock).signal(); + } + } + + // These functions are all unimplemented and fail as a result. This is on + // purpose. A "simple task" is just that, a very simple task that can't + // really do a whole lot. The only purpose of the task is to get us off our + // feet and running. + fn yield_now(~self, _cur_task: ~Task) { fail!() } + fn maybe_yield(~self, _cur_task: ~Task) { fail!() } + fn spawn_sibling(~self, _cur_task: ~Task, _opts: TaskOpts, _f: proc()) { + fail!() + } + fn local_io<'a>(&'a mut self) -> Option> { None } + fn wrap(~self) -> ~Any { fail!() } +} + +pub fn task() -> ~Task { + let mut task = ~Task::new(); + task.put_runtime(~SimpleTask { lock: LittleLock::new() } as ~Runtime); + return task; +} diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs index 44b66a7804d..60ae239ee97 100644 --- a/src/libnative/lib.rs +++ b/src/libnative/lib.rs @@ -33,15 +33,16 @@ // answer is that you don't need them) use std::os; +use std::rt::local::Local; +use std::rt::task::Task; use std::rt; -use stdtask = std::rt::task; pub mod io; pub mod task; // XXX: this should not exist here -#[cfg(stage0, notready)] +#[cfg(stage0)] #[lang = "start"] pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int { use std::cast; @@ -72,9 +73,13 @@ pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int { /// exited. pub fn start(argc: int, argv: **u8, main: proc()) -> int { rt::init(argc, argv); - let exit_code = run(main); + let mut exit_code = None; + let mut main = Some(main); + task::new().run(|| { + exit_code = Some(run(main.take_unwrap())); + }); unsafe { rt::cleanup(); } - return exit_code; + return exit_code.unwrap(); } /// Executes a procedure on the current thread in a Rust task context. @@ -82,11 +87,11 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { /// This function has all of the same details as `start` except for a different /// number of arguments. pub fn run(main: proc()) -> int { - // Create a task, run the procedure in it, and then wait for everything. - task::run(task::new(), main); - - // Block this OS task waiting for everything to finish. - unsafe { stdtask::wait_for_completion() } - + // Run the main procedure and then wait for everything to finish + main(); + unsafe { + let mut task = Local::borrow(None::); + task.get().wait_for_other_tasks(); + } os::get_exit_status() } diff --git a/src/libnative/task.rs b/src/libnative/task.rs index 48768def067..0d5e08979ca 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -77,17 +77,11 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) { stack::record_stack_bounds(my_stack - stack + 1024, my_stack); } - run(task, f); + let mut f = Some(f); + task.run(|| { f.take_unwrap()() }); }) } -/// Runs a task once, consuming the task. The given procedure is run inside of -/// the task. -pub fn run(t: ~Task, f: proc()) { - let mut f = Some(f); - t.run(|| { f.take_unwrap()(); }); -} - // This structure is the glue between channels and the 1:1 scheduling mode. This // structure is allocated once per task. struct Ops { diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index c0e1086483d..765f0b427cd 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -292,6 +292,21 @@ impl Task { pub fn local_io<'a>(&'a mut self) -> Option> { self.imp.get_mut_ref().local_io() } + + /// The main function of all rust executables will by default use this + /// function. This function will *block* the OS thread (hence the `unsafe`) + /// waiting for all known tasks to complete. Once this function has + /// returned, it is guaranteed that no more user-defined code is still + /// running. + pub unsafe fn wait_for_other_tasks(&mut self) { + TASK_COUNT.fetch_sub(1, SeqCst); // don't count ourselves + TASK_LOCK.lock(); + while TASK_COUNT.load(SeqCst) > 0 { + TASK_LOCK.wait(); + } + TASK_LOCK.unlock(); + TASK_COUNT.fetch_add(1, SeqCst); // add ourselves back in + } } impl Drop for Task { @@ -396,18 +411,6 @@ impl Drop for Death { } } -/// The main function of all rust executables will by default use this function. -/// This function will *block* the OS thread (hence the `unsafe`) waiting for -/// all known tasks to complete. Once this function has returned, it is -/// guaranteed that no more user-defined code is still running. -pub unsafe fn wait_for_completion() { - TASK_LOCK.lock(); - while TASK_COUNT.load(SeqCst) > 0 { - TASK_LOCK.wait(); - } - TASK_LOCK.unlock(); -} - #[cfg(test)] mod test { use super::*;