From 5b2dc520340103491088616ba4f58095948f5821 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 18 Jun 2013 00:17:14 -0700 Subject: [PATCH] std::rt: Turn on multithreaded scheduling --- src/libstd/rt/mod.rs | 92 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 76 insertions(+), 16 deletions(-) diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 0a269aa8767..581e3addff0 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -60,7 +60,21 @@ Several modules in `core` are clients of `rt`: #[deny(unused_variable)]; use cell::Cell; +use clone::Clone; +use container::Container; +use from_str::FromStr; +use iterator::IteratorUtil; +use option::{Some, None}; +use os; use ptr::RawPtr; +use uint; +use rt::sched::{Scheduler, Coroutine, Shutdown}; +use rt::sleeper_list::SleeperList; +use rt::task::Task; +use rt::thread::Thread; +use rt::work_queue::WorkQueue; +use rt::uv::uvio::UvEventLoop; +use vec::{OwnedVector, MutableVector}; /// The global (exchange) heap. pub mod global_heap; @@ -159,23 +173,8 @@ pub mod util; /// The return value is used as the process return code. 0 on success, 101 on error. pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { - use self::sched::{Scheduler, Coroutine}; - use self::work_queue::WorkQueue; - use self::uv::uvio::UvEventLoop; - use self::sleeper_list::SleeperList; - init(crate_map); - - let loop_ = ~UvEventLoop::new(); - let work_queue = WorkQueue::new(); - let sleepers = SleeperList::new(); - let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); - sched.no_sleep = true; - let main_task = ~Coroutine::new_root(&mut sched.stack_pool, main); - - sched.enqueue_task(main_task); - sched.run(); - + run(main); cleanup(); return 0; @@ -191,6 +190,67 @@ pub fn cleanup() { global_heap::cleanup(); } +pub fn run(main: ~fn()) { + let nthreads = match os::getenv("RUST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code + // to force the OS to preempt them frequently. + // Assuming that this help stress test concurrent types. + util::num_cpus() * 2 + } + }; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, nthreads) |_| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let handle = sched.make_handle(); + + handles.push(handle); + scheds.push(sched); + } + + let main_cell = Cell::new(main); + let handles = Cell::new(handles); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| { + + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.mut_iter().advance |handle| { + handle.send(Shutdown); + } + + rtassert!(exit_status); + }; + new_task.on_exit = Some(on_exit); + let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool, + new_task, main_cell.take()); + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.run(); + }; + + threads.push(thread); + } + + // Wait for schedulers + let _threads = threads; +} + /// Possible contexts in which Rust code may be executing. /// Different runtime services are available depending on context. /// Mostly used for determining if we're using the new scheduler