From f7eed223873a4280c9abea937e60ef1aaedf0162 Mon Sep 17 00:00:00 2001 From: toddaaro Date: Fri, 19 Jul 2013 14:25:05 -0700 Subject: [PATCH 1/7] A major refactoring that changes the way the runtime uses TLS. In the old design the TLS held the scheduler struct, and the scheduler struct held the active task. This posed all sorts of weird problems due to how we wanted to use the contents of TLS. The cleaner approach is to leave the active task in TLS and have the task hold the scheduler. To make this work out the scheduler has to run inside a regular task, and then once that is the case the context switching code is massively simplified, as instead of three possible paths there is only one. The logical flow is also easier to follow, as the scheduler struct acts somewhat like a "token" indicating what is active. These changes also necessitated changing a large number of runtime tests, and rewriting most of the runtime testing helpers. Polish level is "low", as I will very soon start on more scheduler changes that will require wiping the polish off. That being said there should be sufficient comments around anything complex to make this entirely respectable as a standalone commit. --- src/libstd/macros.rs | 9 +- src/libstd/rt/comm.rs | 3 +- src/libstd/rt/context.rs | 6 +- src/libstd/rt/io/net/tcp.rs | 60 +- src/libstd/rt/io/net/udp.rs | 16 +- src/libstd/rt/local.rs | 146 +++-- src/libstd/rt/mod.rs | 110 ++-- src/libstd/rt/sched.rs | 1039 ++++++++++++++++------------------- src/libstd/rt/task.rs | 219 ++++++-- src/libstd/rt/test.rs | 219 ++------ src/libstd/rt/tube.rs | 5 - src/libstd/rt/uv/mod.rs | 5 +- src/libstd/rt/uv/uvio.rs | 40 +- src/libstd/task/spawn.rs | 43 +- src/libstd/unstable/lang.rs | 3 - 15 files changed, 894 insertions(+), 1029 deletions(-) diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index 7748c43efcd..04058887970 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -23,9 +23,14 @@ macro_rules! rtdebug_ ( } ) ) -// An alternate version with no output, for turning off logging +// An alternate version with no output, for turning off logging. An +// earlier attempt that did not call the fmt! macro was insufficient, +// as a case of the "let bind each variable" approach eventually +// failed without an error message describing the invocation site. macro_rules! rtdebug ( - ($( $arg:expr),+) => ( $(let _ = $arg)*; ) + ($( $arg:expr),+) => ( { + let _x = fmt!( $($arg),+ ); + }) ) macro_rules! rtassert ( diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 79ee8405531..491bdbe9b06 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -743,7 +743,7 @@ mod test { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); - do spawntask_immediately { + do spawntask { assert!(port_cell.take().recv() == ~10); } @@ -1019,5 +1019,4 @@ mod test { } } } - } diff --git a/src/libstd/rt/context.rs b/src/libstd/rt/context.rs index b30a55978f7..890ad061a68 100644 --- a/src/libstd/rt/context.rs +++ b/src/libstd/rt/context.rs @@ -49,12 +49,11 @@ impl Context { let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) }; let sp: *uint = stack.end(); let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) }; - // Save and then immediately load the current context, // which we will then modify to call the given function when restored let mut regs = new_regs(); unsafe { - swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)) + swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)); }; initialize_call_frame(&mut *regs, fp, argp, sp); @@ -72,13 +71,14 @@ impl Context { then loading the registers from a previously saved Context. */ pub fn swap(out_context: &mut Context, in_context: &Context) { + rtdebug!("swapping contexts"); let out_regs: &mut Registers = match out_context { &Context { regs: ~ref mut r, _ } => r }; let in_regs: &Registers = match in_context { &Context { regs: ~ref r, _ } => r }; - + rtdebug!("doing raw swap"); unsafe { swap_registers(out_regs, in_regs) }; } } diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 1d7dafc4302..edfd3a92b5f 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -186,7 +186,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -194,7 +194,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -206,7 +206,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -214,7 +214,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -226,7 +226,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -234,7 +234,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -246,7 +246,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -254,7 +254,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -266,7 +266,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -276,7 +276,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -288,7 +288,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -298,7 +298,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -310,7 +310,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -327,7 +327,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -339,7 +339,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -356,7 +356,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -369,7 +369,7 @@ mod test { let addr = next_test_ip4(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); @@ -379,8 +379,8 @@ mod test { } } - do spawntask_immediately { - do max.times { + do spawntask { + for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -394,7 +394,7 @@ mod test { let addr = next_test_ip6(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); @@ -404,8 +404,8 @@ mod test { } } - do spawntask_immediately { - do max.times { + do spawntask { + for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -419,13 +419,13 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -440,7 +440,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -458,13 +458,13 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -479,7 +479,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -497,7 +497,7 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); @@ -535,7 +535,7 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index d186ad15f4a..76200d6f86e 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -132,7 +132,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(ref mut server) => { let mut buf = [0]; @@ -149,7 +149,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(ref mut client) => client.sendto([99], server_ip), None => fail!() @@ -164,7 +164,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(ref mut server) => { let mut buf = [0]; @@ -181,7 +181,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(ref mut client) => client.sendto([99], server_ip), None => fail!() @@ -196,7 +196,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -214,7 +214,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; @@ -233,7 +233,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -251,7 +251,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index b47bbf3edf0..34e3a0241a9 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -14,6 +14,7 @@ use rt::task::Task; use rt::local_ptr; use rt::rtio::{EventLoop, IoFactoryObject}; //use borrow::to_uint; +use cell::Cell; pub trait Local { fn put(value: ~Self); @@ -24,40 +25,56 @@ pub trait Local { unsafe fn try_unsafe_borrow() -> Option<*mut Self>; } -impl Local for Scheduler { - fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }} - fn take() -> ~Scheduler { unsafe { local_ptr::take() } } +impl Local for Task { + fn put(value: ~Task) { unsafe { local_ptr::put(value) } } + fn take() -> ~Task { unsafe { local_ptr::take() } } fn exists() -> bool { local_ptr::exists() } - fn borrow(f: &fn(&mut Scheduler) -> T) -> T { + fn borrow(f: &fn(&mut Task) -> T) -> T { let mut res: Option = None; let res_ptr: *mut Option = &mut res; unsafe { - do local_ptr::borrow |sched| { -// rtdebug!("successfully unsafe borrowed sched pointer"); - let result = f(sched); + do local_ptr::borrow |task| { + let result = f(task); *res_ptr = Some(result); } } match res { Some(r) => { r } - None => rtabort!("function failed!") + None => { rtabort!("function failed in local_borrow") } } } - unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() } - unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") } + unsafe fn unsafe_borrow() -> *mut Task { local_ptr::unsafe_borrow() } + unsafe fn try_unsafe_borrow() -> Option<*mut Task> { rtabort!("unimpl task try_unsafe_borrow") } } -impl Local for Task { - fn put(_value: ~Task) { rtabort!("unimpl") } - fn take() -> ~Task { rtabort!("unimpl") } - fn exists() -> bool { rtabort!("unimpl") } - fn borrow(f: &fn(&mut Task) -> T) -> T { - do Local::borrow:: |sched| { -// rtdebug!("sched about to grab current_task"); - match sched.current_task { +impl Local for Scheduler { + fn put(value: ~Scheduler) { + let value = Cell::new(value); + do Local::borrow:: |task| { + let task = task; + task.sched = Some(value.take()); + }; + } + fn take() -> ~Scheduler { + do Local::borrow:: |task| { + let sched = task.sched.take_unwrap(); + let task = task; + task.sched = None; + sched + } + } + fn exists() -> bool { + do Local::borrow:: |task| { + match task.sched { + Some(ref _task) => true, + None => false + } + } + } + fn borrow(f: &fn(&mut Scheduler) -> T) -> T { + do Local::borrow:: |task| { + match task.sched { Some(~ref mut task) => { -// rtdebug!("current task pointer: %x", to_uint(task)); -// rtdebug!("current task heap pointer: %x", to_uint(&task.heap)); f(task) } None => { @@ -66,20 +83,19 @@ impl Local for Task { } } } - unsafe fn unsafe_borrow() -> *mut Task { - match (*Local::unsafe_borrow::()).current_task { - Some(~ref mut task) => { - let s: *mut Task = &mut *task; + unsafe fn unsafe_borrow() -> *mut Scheduler { + match (*Local::unsafe_borrow::()).sched { + Some(~ref mut sched) => { + let s: *mut Scheduler = &mut *sched; return s; } None => { - // Don't fail. Infinite recursion rtabort!("no scheduler") } } } - unsafe fn try_unsafe_borrow() -> Option<*mut Task> { - if Local::exists::() { + unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { + if Local::exists::() { Some(Local::unsafe_borrow()) } else { None @@ -101,57 +117,69 @@ impl Local for IoFactoryObject { unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") } } + #[cfg(test)] mod test { use unstable::run_in_bare_thread; use rt::test::*; - use rt::sched::Scheduler; +// use rt::sched::Scheduler; use super::*; + use rt::task::Task; + use rt::local_ptr; #[test] - fn thread_local_scheduler_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_smoke_test() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] - fn thread_local_scheduler_two_instances() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_two_instances() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + } #[test] fn borrow_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - unsafe { - let _scheduler: *mut Scheduler = Local::unsafe_borrow(); - } - let _scheduler: ~Scheduler = Local::take(); + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + unsafe { + let _task: *mut Task = Local::unsafe_borrow(); } + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] fn borrow_with_return() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let res = do Local::borrow:: |_sched| { - true - }; - assert!(res); - let _scheduler: ~Scheduler = Local::take(); - } + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + let res = do Local::borrow:: |_task| { + true + }; + assert!(res) + let task: ~Task = Local::take(); + cleanup_task(task); } } + diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 3bcf6787824..268d402adf5 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -67,9 +67,10 @@ use iter::Times; use iterator::{Iterator, IteratorUtil}; use option::{Some, None}; use ptr::RawPtr; +use rt::local::Local; use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; -use rt::task::{Task, Sched}; +use rt::task::{Task, SchedTask, GreenTask}; use rt::thread::Thread; use rt::work_queue::WorkQueue; use rt::uv::uvio::UvEventLoop; @@ -309,44 +310,53 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { } }; - // Build the main task and queue it up - match main_sched { - None => { - // The default case where we don't need a scheduler on the main thread. - // Just put an unpinned task onto one of the default schedulers. - let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, main); - main_task.death.on_exit = Some(on_exit); - main_task.name = Some(~"main"); - scheds[0].enqueue_task(main_task); - } - Some(ref mut main_sched) => { - let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, home, main); - main_task.death.on_exit = Some(on_exit); - main_task.name = Some(~"main"); - main_sched.enqueue_task(main_task); - } - }; - - // Run each scheduler in a thread. let mut threads = ~[]; - while !scheds.is_empty() { + + if !use_main_sched { + + // In the case where we do not use a main_thread scheduler we + // run the main task in one of our threads. + + let main_cell = Cell::new(main); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, + main_cell.take()); + main_task.death.on_exit = Some(on_exit); + let main_task_cell = Cell::new(main_task); + let sched = scheds.pop(); let sched_cell = Cell::new(sched); let thread = do Thread::start { let sched = sched_cell.take(); - sched.run(); + sched.bootstrap(main_task_cell.take()); }; - threads.push(thread); } - // Run the main-thread scheduler - match main_sched { - Some(sched) => { let _ = sched.run(); }, - None => () + // Run each remaining scheduler in a thread. + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let mut sched = sched_cell.take(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("boostraping a non-primary scheduler"); + }; + sched.bootstrap(bootstrap_task); + }; + threads.push(thread); } + // If we do have a main thread scheduler, run it now. + + if use_main_sched { + let home = Sched(main_sched.make_handle()); + let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, + home, main); + main_task.death.on_exit = Some(on_exit); + let main_task_cell = Cell::new(main_task); + sched.bootstrap(main_task); + } + // Wait for schedulers foreach thread in threads.consume_iter() { thread.join(); @@ -378,27 +388,23 @@ pub enum RuntimeContext { pub fn context() -> RuntimeContext { use task::rt::rust_task; - use self::local::Local; - use self::sched::Scheduler; - // XXX: Hitting TLS twice to check if the scheduler exists - // then to check for the task is not good for perf if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; - } else { - if Local::exists::() { - let context = Cell::new_empty(); - do Local::borrow:: |sched| { - if sched.in_task_context() { - context.put_back(TaskContext); - } else { - context.put_back(SchedulerContext); - } + } else if Local::exists::() { + rtdebug!("either task or scheduler context in newrt"); + // In this case we know it is a new runtime context, but we + // need to check which one. Going to try borrowing task to + // check. Task should always be in TLS, so hopefully this + // doesn't conflict with other ops that borrow. + return do Local::borrow:: |task| { + match task.task_type { + SchedTask => SchedulerContext, + GreenTask(_) => TaskContext } - return context.take(); - } else { - return GlobalContext; - } + }; + } else { + return GlobalContext; } extern { @@ -410,23 +416,9 @@ pub fn context() -> RuntimeContext { #[test] fn test_context() { use unstable::run_in_bare_thread; - use self::sched::{Scheduler}; - use rt::local::Local; - use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - assert_eq!(context(), TaskContext); - let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |sched, task| { - assert_eq!(context(), SchedulerContext); - sched.enqueue_blocked_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); } } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ae4ca2b9783..0326c2cbfe5 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,8 @@ use either::{Left, Right}; use option::{Option, Some, None}; -use cast::transmute; +use sys; +use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; @@ -27,6 +28,7 @@ use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; +use cell::Cell; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -59,11 +61,8 @@ pub struct Scheduler { stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, - /// The scheduler's saved context. - /// Always valid when a task is executing, otherwise not - priv saved_context: Context, - /// The currently executing task - current_task: Option<~Task>, + /// The scheduler runs on a special task. + sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option, @@ -90,7 +89,6 @@ enum CleanupJob { } impl Scheduler { - pub fn in_task_context(&self) -> bool { self.current_task.is_some() } pub fn sched_id(&self) -> uint { to_uint(self) } @@ -103,15 +101,14 @@ impl Scheduler { } + // When you create a scheduler it isn't yet "in" a task, so the + // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, run_anything: bool) -> Scheduler { - // Lazily initialize the runtime TLS key - local_ptr::init_tls_key(); - Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), @@ -120,8 +117,7 @@ impl Scheduler { event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), - saved_context: Context::empty(), - current_task: None, + sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything @@ -132,8 +128,47 @@ impl Scheduler { // the scheduler itself doesn't have to call event_loop.run. // That will be important for embedding the runtime into external // event loops. - pub fn run(~self) -> ~Scheduler { - assert!(!self.in_task_context()); + + // Take a main task to run, and a scheduler to run it in. Create a + // scheduler task and bootstrap into it. + pub fn bootstrap(~self, task: ~Task) { + + // Initialize the TLS key. + local_ptr::init_tls_key(); + + // Create a task for the scheduler with an empty context. + let sched_task = Task::new_sched_task(); + + // Now that we have an empty task struct for the scheduler + // task, put it in TLS. + Local::put::(~sched_task); + + // Now, as far as all the scheduler state is concerned, we are + // inside the "scheduler" context. So we can act like the + // scheduler and resume the provided task. + self.resume_task_immediately(task); + + // Now we are back in the scheduler context, having + // successfully run the input task. Start by running the + // scheduler. Grab it out of TLS - performing the scheduler + // action will have given it away. + let sched = Local::take::(); + sched.run(); + + // Now that we are done with the scheduler, clean up the + // scheduler task. Do so by removing it from TLS and manually + // cleaning up the memory it uses. As we didn't actually call + // task.run() on the scheduler task we never get through all + // the cleanup code it runs. + + rtdebug!("post sched.run(), cleaning up scheduler task"); + let mut stask = Local::take::(); + stask.destroyed = true; + } + + // This does not return a scheduler, as the scheduler is placed + // inside the task. + pub fn run(~self) { let mut self_sched = self; @@ -142,79 +177,88 @@ impl Scheduler { // schedulers. self_sched.event_loop.callback(Scheduler::run_sched_once); + // This is unsafe because we need to place the scheduler, with + // the event_loop inside, inside our task. But we still need a + // mutable reference to the event_loop to give it the "run" + // command. unsafe { - let event_loop: *mut ~EventLoopObject = { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - event_loop - }; + let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - // Give ownership of the scheduler (self) to the thread - Local::put(self_sched); + // Our scheduler must be in the task before the event loop + // is started. + let self_sched = Cell::new(self_sched); + do Local::borrow:: |stask| { + stask.sched = Some(self_sched.take()); + }; (*event_loop).run(); } - - rtdebug!("run taking sched"); - let sched = Local::take::(); - // XXX: Reenable this once we're using a per-scheduler queue. With a shared - // queue this is not true - //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", { - use to_str::ToStr; - sched.metrics.to_str() - }); - return sched; } + // One iteration of the scheduler loop, always run at least once. + + // The model for this function is that you continue through it + // until you either use the scheduler while performing a schedule + // action, in which case you give it away and do not return, or + // you reach the end and sleep. In the case that a scheduler + // action is performed the loop is evented such that this function + // is called again. fn run_sched_once() { - let mut sched = Local::take::(); - sched.metrics.turns += 1; - - // First, check the message queue for instructions. - // XXX: perf. Check for messages without atomics. - // It's ok if we miss messages occasionally, as long as - // we sync and check again before sleeping. - if sched.interpret_message_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - rtdebug!("run_sched_once, interpret_message_queue taking sched"); - let mut sched = Local::take::(); - sched.metrics.messages_received += 1; - sched.event_loop.callback(Scheduler::run_sched_once); - Local::put(sched); - return; - } - - // Now, look in the work queue for tasks to run - rtdebug!("run_sched_once taking"); + // When we reach the scheduler context via the event loop we + // already have a scheduler stored in our local task, so we + // start off by taking it. This is the only path through the + // scheduler where we get the scheduler this way. let sched = Local::take::(); - if sched.resume_task_from_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - do Local::borrow:: |sched| { - sched.metrics.tasks_resumed_from_queue += 1; - sched.event_loop.callback(Scheduler::run_sched_once); + + // Our first task is to read mail to see if we have important + // messages. + + // 1) A wake message is easy, mutate sched struct and return + // it. + // 2) A shutdown is also easy, shutdown. + // 3) A pinned task - we resume immediately and do not return + // here. + + let result = sched.interpret_message_queue(); + let sched = match result { + Some(sched) => { + // We did not resume a task, so we returned. + sched } - return; - } + None => { + return; + } + }; + + let result = sched.resume_task_from_queue(); + let mut sched = match result { + Some(sched) => { + // Failed to dequeue a task, so we return. + sched + } + None => { + return; + } + }; // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. - rtdebug!("no work to do"); - do Local::borrow:: |sched| { - sched.metrics.wasted_turns += 1; - if !sched.sleepy && !sched.no_sleep { - rtdebug!("sleeping"); - sched.metrics.sleepy_times += 1; - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - } else { - rtdebug!("not sleeping"); - } + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("scheduler has no work to do, going to sleep"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping, already doing so or no_sleep set"); } + + // Finished a cycle without using the Scheduler. Place it back + // in TLS. + Local::put(sched); } pub fn make_handle(&mut self) -> SchedHandle { @@ -234,18 +278,6 @@ impl Scheduler { /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Task) { - // We don't want to queue tasks that belong on other threads, - // so we send them home at enqueue time. - - // The borrow checker doesn't like our disassembly of the - // Coroutine struct and partial use and mutation of the - // fields. So completely disassemble here and stop using? - - // XXX perf: I think we might be able to shuffle this code to - // only destruct when we need to. - - rtdebug!("a task was queued on: %u", self.sched_id()); - let this = self; // We push the task onto our local queue clone. @@ -283,30 +315,23 @@ impl Scheduler { // * Scheduler-context operations - fn interpret_message_queue(~self) -> bool { - assert!(!self.in_task_context()); - - rtdebug!("looking for scheduler messages"); + // This function returns None if the scheduler is "used", or it + // returns the still-available scheduler. + fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { - rtdebug!("recv BiasedTask message in sched: %u", - this.sched_id()); let mut task = task; - task.home = Some(Sched(this.make_handle())); + task.give_home(Sched(this.make_handle())); this.resume_task_immediately(task); - return true; + return None; } - Some(Wake) => { - rtdebug!("recv Wake message"); this.sleepy = false; - Local::put(this); - return true; + return Some(this); } Some(Shutdown) => { - rtdebug!("recv Shutdown message"); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's @@ -325,12 +350,14 @@ impl Scheduler { // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; - Local::put(this); - return true; + // YYY: Does a shutdown count as a "use" of the + // scheduler? This seems to work - so I'm leaving it + // this way despite not having a solid rational for + // why I should return the scheduler here. + return Some(this); } None => { - Local::put(this); - return false; + return Some(this); } } } @@ -338,7 +365,7 @@ impl Scheduler { /// Given an input Coroutine sends it back to its home scheduler. fn send_task_home(task: ~Task) { let mut task = task; - let mut home = task.home.take_unwrap(); + let mut home = task.take_unwrap_home(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); @@ -351,69 +378,45 @@ impl Scheduler { // Resume a task from the queue - but also take into account that // it might not belong here. - fn resume_task_from_queue(~self) -> bool { - assert!(!self.in_task_context()); - rtdebug!("looking in work queue for task to schedule"); + // If we perform a scheduler action we give away the scheduler ~ + // pointer, if it is still available we return it. + + fn resume_task_from_queue(~self) -> Option<~Scheduler> { + let mut this = self; - // The borrow checker imposes the possibly absurd requirement - // that we split this into two match expressions. This is due - // to the inspection of the internal bits of task, as that - // can't be in scope when we act on task. match this.work_queue.pop() { Some(task) => { - let action_id = { - let home = &task.home; - match home { - &Some(Sched(ref home_handle)) - if home_handle.sched_id != this.sched_id() => { - SendHome - } - &Some(AnySched) if this.run_anything => { - ResumeNow - } - &Some(AnySched) => { - Requeue - } - &Some(Sched(_)) => { - ResumeNow - } - &None => { - Homeless + let mut task = task; + let home = task.take_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + task.give_home(Sched(home_handle)); + this.resume_task_immediately(task); + return None; } } - }; - - match action_id { - SendHome => { - rtdebug!("sending task home"); - Scheduler::send_task_home(task); - Local::put(this); - return false; - } - ResumeNow => { - rtdebug!("resuming now"); + AnySched if this.run_anything => { + task.give_home(AnySched); this.resume_task_immediately(task); - return true; + return None; } - Requeue => { - rtdebug!("re-queueing") + AnySched => { + task.give_home(AnySched); this.enqueue_task(task); - Local::put(this); - return false; - } - Homeless => { - rtabort!("task home was None!"); + return Some(this); } } } - None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; - } + return Some(this); + } } } @@ -422,33 +425,20 @@ impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { + // Similar to deschedule running task and then, but cannot go through + // the task-blocking path. The task is already dying. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("ending running task"); - - // This task is post-cleanup, so it must be unkillable. This sequence - // of descheduling and recycling must not get interrupted by a kill. - // FIXME(#7544): Make this use an inner descheduler, like yield should. - this.current_task.get_mut_ref().death.unkillable += 1; - - do this.deschedule_running_task_and_then |sched, dead_task| { - match dead_task.wake() { - Some(dead_task) => { - let mut dead_task = dead_task; - dead_task.death.unkillable -= 1; // FIXME(#7544) ugh - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); - } - None => rtabort!("dead task killed before recycle"), - } + let stask = this.sched_task.take_unwrap(); + do this.change_task_context(stask) |sched, mut dead_task| { + let coroutine = dead_task.coroutine.take_unwrap(); + coroutine.recycle(&mut sched.stack_pool); } - - rtabort!("control reached end of task"); } - pub fn schedule_task(~self, task: ~Task) { - assert!(self.in_task_context()); + // If a scheduling action is performed, return None. If not, + // return Some(sched). + + pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); @@ -461,55 +451,115 @@ impl Scheduler { if is_home || (!homed && this.run_anything) { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); do this.switch_running_tasks_and_then(task) |sched, last_task| { sched.enqueue_blocked_task(last_task); } + return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here this.enqueue_task(task); - Local::put(this); + return Some(this); } else { // task isn't home, so don't run it here, send it home Scheduler::send_task_home(task); - Local::put(this); + return Some(this); } } - // Core scheduling ops + // The primary function for changing contexts. In the current + // design the scheduler is just a slightly modified GreenTask, so + // all context swaps are from Task to Task. The only difference + // between the various cases is where the inputs come from, and + // what is done with the resulting task. That is specified by the + // cleanup function f, which takes the scheduler and the + // old task as inputs. - pub fn resume_task_immediately(~self, task: ~Task) { + pub fn change_task_context(~self, + next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { let mut this = self; - assert!(!this.in_task_context()); - rtdebug!("scheduling a task"); - this.metrics.context_switches_sched_to_task += 1; + // The current task is grabbed from TLS, not taken as an input. + let current_task: ~Task = Local::take::(); - // Store the task in the scheduler so it can be grabbed later - this.current_task = Some(task); - this.enqueue_cleanup_job(DoNothing); + // These transmutes do something fishy with a closure. + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) + }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); - Local::put(this); + // The current task is placed inside an enum with the cleanup + // function. This enum is then placed inside the scheduler. + this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); - // Take pointers to both the task and scheduler's saved registers. + // The scheduler is then placed inside the next task. + let mut next_task = next_task; + next_task.sched = Some(this); + + // However we still need an internal mutable pointer to the + // original task. The strategy here was "arrange memory, then + // get pointers", so we crawl back up the chain using + // transmute to eliminate borrowck errors. + unsafe { + + let sched: &mut Scheduler = + transmute_mut_region(*next_task.sched.get_mut_ref()); + + let current_task: &mut Task = match sched.cleanup_job { + Some(GiveTask(ref task, _)) => { + transmute_mut_region(*transmute_mut_unsafe(task)) + } + Some(DoNothing) => { + rtabort!("no next task"); + } + None => { + rtabort!("no cleanup job"); + } + }; + + let (current_task_context, next_task_context) = + Scheduler::get_contexts(current_task, next_task); + + // Done with everything - put the next task in TLS. This + // works because due to transmute the borrow checker + // believes that we have no internal pointers to + // next_task. + Local::put(next_task); + + // The raw context swap operation. The next action taken + // will be running the cleanup job from the context of the + // next task. + Context::swap(current_task_context, next_task_context); + } + + // When the context swaps back to the scheduler we immediately + // run the cleanup job, as expected by the previously called + // swap_contexts function. unsafe { let sched = Local::unsafe_borrow::(); - let (sched_context, _, next_task_context) = (*sched).get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - - let sched = Local::unsafe_borrow::(); - // The running task should have passed ownership elsewhere - assert!((*sched).current_task.is_none()); - - // Running tasks may have asked us to do some cleanup (*sched).run_cleanup_job(); // Must happen after running the cleanup job (of course). - // Might not be running in task context; if not, a later call to - // resume_task_immediately will take care of this. - (*sched).current_task.map(|t| t.death.check_killed()); + let task = Local::unsafe_borrow::(); + (*task).death.check_killed(); + } + } + + // There are a variety of "obvious" functions to be passed to + // change_task_context, so we can make a few "named cases". + + // Enqueue the old task on the current scheduler. + pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) { + sched.enqueue_task(task); + } + + // Sometimes we just want the old API though. + + pub fn resume_task_immediately(~self, task: ~Task) { + do self.change_task_context(task) |sched, stask| { + sched.sched_task = Some(stask); } } @@ -533,152 +583,78 @@ impl Scheduler { /// in order to prevent that fn from performing further scheduling operations. /// Doing further scheduling could easily result in infinite recursion. pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) { + // Trickier - we need to get the scheduler task out of self + // and use it as the destination. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("blocking task"); - this.metrics.context_switches_task_to_sched += 1; - - unsafe { - let blocked_task = this.current_task.take_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f); - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - } - - Local::put(this); - - unsafe { - let sched = Local::unsafe_borrow::(); - let (sched_context, last_task_context, _) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); - } + let stask = this.sched_task.take_unwrap(); + // Otherwise this is the same as below. + this.switch_running_tasks_and_then(stask, f); } - /// Switch directly to another task, without going through the scheduler. - /// You would want to think hard about doing this, e.g. if there are - /// pending I/O events it would be a bad idea. pub fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(&mut Scheduler, BlockedTask)) { - let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("switching tasks"); - this.metrics.context_switches_task_to_task += 1; - - let old_running_task = this.current_task.take_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f) - }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); - this.current_task = Some(next_task); - - Local::put(this); - - unsafe { - let sched = Local::unsafe_borrow::(); - let (_, last_task_context, next_task_context) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); + // This is where we convert the BlockedTask-taking closure into one + // that takes just a Task, and is aware of the block-or-killed protocol. + do self.change_task_context(next_task) |sched, task| { + // Task might need to receive a kill signal instead of blocking. + // We can call the "and_then" only if it blocks successfully. + match BlockedTask::try_block(task) { + Left(killed_task) => sched.enqueue_task(killed_task), + Right(blocked_task) => f(sched, blocked_task), + } } } + // A helper that looks up the scheduler and runs a task later by + // enqueuing it. + pub fn run_task_later(next_task: ~Task) { + // We aren't performing a scheduler operation, so we want to + // put the Scheduler back when we finish. + let next_task = Cell::new(next_task); + do Local::borrow:: |sched| { + sched.enqueue_task(next_task.take()); + }; + } + // A helper that looks up the scheduler and runs a task. If it can + // be run now it is run now. + pub fn run_task(new_task: ~Task) { + let sched = Local::take::(); + sched.schedule_task(new_task).map_consume(Local::put); + } + + // Returns a mutable reference to both contexts involved in this + // swap. This is unsafe - we are getting mutable internal + // references to keep even when we don't own the tasks. It looks + // kinda safe because we are doing transmutes before passing in + // the arguments. + pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> + (&'a mut Context, &'a mut Context) { + let current_task_context = + &mut current_task.coroutine.get_mut_ref().saved_context; + let next_task_context = + &mut next_task.coroutine.get_mut_ref().saved_context; + unsafe { + (transmute_mut_region(current_task_context), + transmute_mut_region(next_task_context)) + } + } // * Other stuff pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - assert!(self.cleanup_job.is_none()); self.cleanup_job = Some(job); } pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); - - assert!(self.cleanup_job.is_some()); - let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => { - let f = f.to_fn(); - // Task might need to receive a kill signal instead of blocking. - // We can call the "and_then" only if it blocks successfully. - match BlockedTask::try_block(task) { - Left(killed_task) => self.enqueue_task(killed_task), - Right(blocked_task) => f(self, blocked_task), - } - } + GiveTask(task, f) => f.to_fn()(self, task) } } - /// Get mutable references to all the contexts that may be involved in a - /// context switch. - /// - /// Returns (the scheduler context, the optional context of the - /// task in the cleanup list, the optional context of the task in - /// the current task slot). When context switching to a task, - /// callers should first arrange for that task to be located in the - /// Scheduler's current_task slot and set up the - /// post-context-switch cleanup job. - pub fn get_contexts<'a>(&'a mut self) -> (&'a mut Context, - Option<&'a mut Context>, - Option<&'a mut Context>) { - let last_task = match self.cleanup_job { - Some(GiveTask(~ref task, _)) => { - Some(task) - } - Some(DoNothing) => { - None - } - None => fail!("all context switches should have a cleanup job") - }; - // XXX: Pattern matching mutable pointers above doesn't work - // because borrowck thinks the three patterns are conflicting - // borrows - unsafe { - let last_task = transmute::, Option<&mut Task>>(last_task); - let last_task_context = match last_task { - Some(t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - let next_task_context = match self.current_task { - Some(ref mut t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - // XXX: These transmutes can be removed after snapshot - return (transmute(&mut self.saved_context), - last_task_context, - transmute(next_task_context)); - } - } } // The cases for the below function. @@ -700,29 +676,73 @@ impl SchedHandle { // complaining type UnsafeTaskReceiver = raw::Closure; trait ClosureConverter { - fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask); + fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Task); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver { + fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } - #[cfg(test)] mod test { + use rt::test::*; + use unstable::run_in_bare_thread; + use borrow::to_uint; + use rt::local::*; + use rt::sched::{Scheduler}; + use uint; use int; use cell::Cell; - use unstable::run_in_bare_thread; - use task::spawn; - use rt::local::Local; - use rt::test::*; - use super::*; use rt::thread::Thread; - use borrow::to_uint; - use rt::task::{Task,Sched}; + use rt::task::{Task, Sched}; + use option::{Some}; + + #[test] + fn trivial_run_in_newsched_task_test() { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + do run_in_newsched_task || { + unsafe { *task_ran_ptr = true }; + rtdebug!("executed from the new scheduler") + } + assert!(task_ran); + } + + #[test] + fn multiple_task_test() { + let total = 10; + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + for uint::range(0,total) |_| { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; + } + } + } + assert!(task_run_count == total); + } + + #[test] + fn multiple_task_nested_test() { + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + } + } + } + } + assert!(task_run_count == 3); + } // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. @@ -745,46 +765,50 @@ mod test { } } - // A simple test to check if a homed task run on a single - // scheduler ends up executing while home. + + // A very simple test that confirms that a task executing on the + // home scheduler notices that it is home. #[test] fn test_home_sched() { do run_in_bare_thread { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; + let mut sched = ~new_test_uv_sched(); - let sched_handle = sched.make_handle(); - let sched_id = sched.sched_id(); - let task = ~do Task::new_root_homed(&mut sched.stack_pool, - Sched(sched_handle)) { + let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, + Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; - let sched = Local::take::(); - assert!(sched.sched_id() == sched_id); - Local::put::(sched); + assert!(Task::on_appropriate_sched()); }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); + + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); } } - // A test for each state of schedule_task + // An advanced test that checks all four possible states that a + // (task,sched) can be in regarding homes. + #[test] fn test_schedule_home_states() { use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; + use rt::sched::Shutdown; + use borrow; + use rt::comm::*; do run_in_bare_thread { let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); - // our normal scheduler + // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), work_queue.clone(), @@ -792,113 +816,93 @@ mod test { let normal_handle = Cell::new(normal_sched.make_handle()); - // our special scheduler + // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), work_queue.clone(), sleepers.clone(), - true); + false); let special_handle = Cell::new(special_sched.make_handle()); - let special_handle2 = Cell::new(special_sched.make_handle()); - let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); - let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t1_handle)) || { - let is_home = Task::is_home_using_id(special_id); - rtdebug!("t1 should be home: %b", is_home); - assert!(is_home); - }; - let t1f = Cell::new(t1f); + // Four test tasks: + // 1) task is home on special + // 2) task not homed, sched doesn't care + // 3) task not homed, sched requeues + // 4) task not home, send home - let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) { - let on_special = Task::on_special(); - rtdebug!("t2 should not be on special: %b", on_special); - assert!(!on_special); + let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) || { + rtassert!(Task::on_appropriate_sched()); }; - let t2f = Cell::new(t2f); + rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); - let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) { - // not on special - let on_special = Task::on_special(); - rtdebug!("t3 should not be on special: %b", on_special); - assert!(!on_special); - }; - let t3f = Cell::new(t3f); - - let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t4_handle)) { - // is home - let home = Task::is_home_using_id(special_id); - rtdebug!("t4 should be home: %b", home); - assert!(home); - }; - let t4f = Cell::new(t4f); - - // we have four tests, make them as closures - let t1: ~fn() = || { - // task is home on special - let task = t1f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t2: ~fn() = || { - // not homed, task doesn't care - let task = t2f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t3: ~fn() = || { - // task not homed, must leave - let task = t3f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t4: ~fn() = || { - // task not home, send home - let task = t4f.take(); - let sched = Local::take::(); - sched.schedule_task(task); + let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t1 = Cell::new(t1); - let t2 = Cell::new(t2); - let t3 = Cell::new(t3); - let t4 = Cell::new(t4); - - // build a main task that runs our four tests - let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) { - // the two tasks that require a normal start location - t2.take()(); - t4.take()(); - normal_handle.take().send(Shutdown); - special_handle.take().send(Shutdown); + let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - // task to run the two "special start" tests - let special_task = ~do Task::new_root_homed( - &mut special_sched.stack_pool, - Sched(special_handle2.take())) { - t1.take()(); - t3.take()(); + let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + rtassert!(Task::on_appropriate_sched()); + }; + rtdebug!("task4 id: **%u**", borrow::to_uint(task4)); + + let task1 = Cell::new(task1); + let task2 = Cell::new(task2); + let task3 = Cell::new(task3); + let task4 = Cell::new(task4); + + // Signal from the special task that we are done. + let (port, chan) = oneshot::<()>(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtdebug!("*about to submit task2*"); + Scheduler::run_task(task2.take()); + rtdebug!("*about to submit task4*"); + Scheduler::run_task(task4.take()); + rtdebug!("*normal_task done*"); + port.take().recv(); + let mut nh = normal_handle.take(); + nh.send(Shutdown); + let mut sh = special_handle.take(); + sh.send(Shutdown); }; - // enqueue the main tasks - normal_sched.enqueue_task(special_task); - normal_sched.enqueue_task(main_task); + rtdebug!("normal task: %u", borrow::to_uint(normal_task)); + + let special_task = ~do Task::new_root(&mut special_sched.stack_pool) { + rtdebug!("*about to submit task1*"); + Scheduler::run_task(task1.take()); + rtdebug!("*about to submit task3*"); + Scheduler::run_task(task3.take()); + rtdebug!("*done with special_task*"); + chan.take().send(()); + }; + + rtdebug!("special task: %u", borrow::to_uint(special_task)); + + let special_sched = Cell::new(special_sched); + let normal_sched = Cell::new(normal_sched); + let special_task = Cell::new(special_task); + let normal_task = Cell::new(normal_task); - let nsched_cell = Cell::new(normal_sched); let normal_thread = do Thread::start { - let sched = nsched_cell.take(); - sched.run(); + normal_sched.take().bootstrap(normal_task.take()); + rtdebug!("finished with normal_thread"); }; - let ssched_cell = Cell::new(special_sched); let special_thread = do Thread::start { - let sched = ssched_cell.take(); - sched.run(); + special_sched.take().bootstrap(special_task.take()); + rtdebug!("finished with special_sched"); }; normal_thread.join(); @@ -906,7 +910,6 @@ mod test { } } - // Do it a lot #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; @@ -915,116 +918,6 @@ mod test { } } - #[test] - fn test_simple_scheduling() { - do run_in_bare_thread { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_ran_ptr = true; } - }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); - } - } - - #[test] - fn test_several_tasks() { - do run_in_bare_thread { - let total = 10; - let mut task_count = 0; - let task_count_ptr: *mut int = &mut task_count; - - let mut sched = ~new_test_uv_sched(); - for int::range(0, total) |_| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_count_ptr = *task_count_ptr + 1; } - }; - sched.enqueue_task(task); - } - sched.run(); - assert_eq!(task_count, total); - } - } - - #[test] - fn test_swap_tasks_then() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - let task1 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Local::take::(); - let task2 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |sched, task1| { - sched.enqueue_blocked_task(task1); - } - unsafe { *count_ptr = *count_ptr + 1; } - }; - sched.enqueue_task(task1); - sched.run(); - assert_eq!(count, 3); - } - } - - #[bench] #[test] #[ignore(reason = "long test")] - fn test_run_a_lot_of_tasks_queued() { - do run_in_bare_thread { - static MAX: int = 1000000; - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - - let start_task = ~do Task::new_root(&mut sched.stack_pool) { - run_task(count_ptr); - }; - sched.enqueue_task(start_task); - sched.run(); - - assert_eq!(count, MAX); - - fn run_task(count_ptr: *mut int) { - do Local::borrow:: |sched| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } - } - }; - sched.enqueue_task(task); - } - }; - } - } - - #[test] - fn test_block_task() { - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - let sched = Local::take::(); - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.enqueue_blocked_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); - } - } - #[test] fn test_io_callback() { // This is a regression test that when there are no schedulable tasks @@ -1032,7 +925,7 @@ mod test { // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { - do spawn { + do spawntask { let sched = Local::take::(); do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); @@ -1053,34 +946,21 @@ mod test { do run_in_bare_thread { let (port, chan) = oneshot::<()>(); - let port_cell = Cell::new(port); - let chan_cell = Cell::new(chan); - let mut sched1 = ~new_test_uv_sched(); - let handle1 = sched1.make_handle(); - let handle1_cell = Cell::new(handle1); - let task1 = ~do Task::new_root(&mut sched1.stack_pool) { - chan_cell.take().send(()); - }; - sched1.enqueue_task(task1); + let port = Cell::new(port); + let chan = Cell::new(chan); - let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Task::new_root(&mut sched2.stack_pool) { - port_cell.take().recv(); - // Release the other scheduler's handle so it can exit - handle1_cell.take(); - }; - sched2.enqueue_task(task2); - - let sched1_cell = Cell::new(sched1); - let thread1 = do Thread::start { - let sched1 = sched1_cell.take(); - sched1.run(); + let _thread_one = do Thread::start { + let chan = Cell::new(chan.take()); + do run_in_newsched_task_core { + chan.take().send(()); + } }; - let sched2_cell = Cell::new(sched2); - let thread2 = do Thread::start { - let sched2 = sched2_cell.take(); - sched2.run(); + let _thread_two = do Thread::start { + let port = Cell::new(port.take()); + do run_in_newsched_task_core { + port.take().recv(); + } }; thread1.join(); @@ -1112,21 +992,21 @@ mod test { } } - #[test] + #[test] fn thread_ring() { use rt::comm::*; use comm::{GenericPort, GenericChan}; do run_in_mt_newsched_task { - let (end_port, end_chan) = oneshot(); + let (end_port, end_chan) = oneshot(); let n_tasks = 10; let token = 2000; - let (p, ch1) = stream(); + let (p, ch1) = stream(); let mut p = p; - ch1.send((token, end_chan)); - let mut i = 2; + ch1.send((token, end_chan)); + let mut i = 2; while i <= n_tasks { let (next_p, ch) = stream(); let imm_i = i; @@ -1151,9 +1031,9 @@ mod test { while (true) { match p.recv() { (1, end_chan) => { - debug!("%d\n", id); - end_chan.send(()); - return; + debug!("%d\n", id); + end_chan.send(()); + return; } (token, end_chan) => { debug!("thread: %d got token: %d", id, token); @@ -1178,15 +1058,16 @@ mod test { impl Drop for S { fn drop(&self) { - let _foo = @0; + let _foo = @0; } } let s = S { field: () }; do spawntask { - let _ss = &s; + let _ss = &s; } } } + } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index c1b799796d1..bc603bede97 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -30,21 +30,34 @@ use rt::context::Context; use task::spawn::Taskgroup; use cell::Cell; +// The Task struct represents all state associated with a rust +// task. There are at this point two primary "subtypes" of task, +// however instead of using a subtype we just have a "task_type" field +// in the struct. This contains a pointer to another struct that holds +// the type-specific state. + pub struct Task { heap: LocalHeap, gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, unwinder: Unwinder, - home: Option, taskgroup: Option, death: Death, destroyed: bool, coroutine: Option<~Coroutine>, // FIXME(#6874/#7599) use StringRef to save on allocations name: Option<~str>, + sched: Option<~Scheduler>, + task_type: TaskType } +pub enum TaskType { + GreenTask(Option<~SchedHome>), + SchedTask +} + +/// A coroutine is nothing more than a (register context, stack) pair. pub struct Coroutine { /// The segment of stack on which the task is currently running or /// if the task is blocked, on which the task will resume @@ -54,6 +67,7 @@ pub struct Coroutine { saved_context: Context } +/// Some tasks have a deciated home scheduler that they must run on. pub enum SchedHome { AnySched, Sched(SchedHandle) @@ -68,6 +82,58 @@ pub struct Unwinder { impl Task { + // A helper to build a new task using the dynamically found + // scheduler and task. Only works in GreenTask context. + pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow:: |running_task| { + let mut sched = running_task.sched.take_unwrap(); + let new_task = ~running_task.new_child_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_child(f: ~fn()) -> ~Task { + Task::build_homed_child(f, AnySched) + } + + pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow:: |running_task| { + let mut sched = running_task.sched.take_unwrap(); + let new_task = ~Task::new_root_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_root(f: ~fn()) -> ~Task { + Task::build_homed_root(f, AnySched) + } + + pub fn new_sched_task() -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, + unwinder: Unwinder { unwinding: false }, + taskgroup: None, + death: Death::new(), + destroyed: false, + coroutine: Some(~Coroutine::empty()), + sched: None, + task_type: SchedTask + } + } + pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Task { Task::new_root_homed(stack_pool, AnySched, start) @@ -88,12 +154,13 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, - home: Some(home), taskgroup: None, death: Death::new(), destroyed: false, coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + sched: None, + task_type: GreenTask(Some(~home)) } } @@ -106,7 +173,6 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - home: Some(home), unwinder: Unwinder { unwinding: false }, taskgroup: None, // FIXME(#7544) make watching optional @@ -114,19 +180,35 @@ impl Task { destroyed: false, coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + sched: None, + task_type: GreenTask(Some(~home)) } } pub fn give_home(&mut self, new_home: SchedHome) { - self.home = Some(new_home); + match self.task_type { + GreenTask(ref mut home) => { + *home = Some(~new_home); + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } + } + } + + pub fn take_unwrap_home(&mut self) -> SchedHome { + match self.task_type { + GreenTask(ref mut home) => { + let out = home.take_unwrap(); + return *out; + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } + } } pub fn run(&mut self, f: &fn()) { - // This is just an assertion that `run` was called unsafely - // and this instance of Task is still accessible. - do Local::borrow:: |task| { - assert!(borrow::ref_eq(task, self)); - } self.unwinder.try(f); { let _ = self.taskgroup.take(); } @@ -141,6 +223,8 @@ impl Task { /// thread-local-storage. fn destroy(&mut self) { + rtdebug!("DESTROYING TASK: %u", borrow::to_uint(self)); + do Local::borrow:: |task| { assert!(borrow::ref_eq(task, self)); } @@ -158,63 +242,68 @@ impl Task { self.destroyed = true; } - /// Check if *task* is currently home. - pub fn is_home(&self) -> bool { - do Local::borrow:: |sched| { - match self.home { - Some(AnySched) => { false } - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { - *id == sched.sched_id() - } - None => { rtabort!("task home of None") } - } - } - } + // New utility functions for homes. pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { - match self.home { - Some(AnySched) => { false } - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _}))) => { *id == sched.sched_id() } - None => {rtabort!("task home of None") } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + // Awe yea + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } } } - pub fn is_home_using_id(sched_id: uint) -> bool { + pub fn homed(&self) -> bool { + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { _ }))) => { true } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } + } + } + + // Grab both the scheduler and the task from TLS and check if the + // task is executing on an appropriate scheduler. + pub fn on_appropriate_sched() -> bool { do Local::borrow:: |task| { - match task.home { - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + let sched_id = task.sched.get_ref().sched_id(); + let sched_run_anything = task.sched.get_ref().run_anything; + match task.task_type { + GreenTask(Some(~AnySched)) => { + rtdebug!("anysched task in sched check ****"); + sched_run_anything + } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _ }))) => { + rtdebug!("homed task in sched check ****"); *id == sched_id } - Some(AnySched) => { false } - None => { rtabort!("task home of None") } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } } } } - - /// Check if this *task* has a home. - pub fn homed(&self) -> bool { - match self.home { - Some(AnySched) => { false } - Some(Sched(_)) => { true } - None => { - rtabort!("task home of None") - } - } - } - - /// On a special scheduler? - pub fn on_special() -> bool { - do Local::borrow:: |sched| { - !sched.run_anything - } - } - } impl Drop for Task { - fn drop(&self) { assert!(self.destroyed) } + fn drop(&self) { + rtdebug!("called drop for a task"); + assert!(self.destroyed) + } } // Coroutines represent nothing more than a context and a stack @@ -234,19 +323,33 @@ impl Coroutine { } } + pub fn empty() -> Coroutine { + Coroutine { + current_stack_segment: StackSegment::new(0), + saved_context: Context::empty() + } + } + fn build_start_wrapper(start: ~fn()) -> ~fn() { let start_cell = Cell::new(start); let wrapper: ~fn() = || { // First code after swap to this new context. Run our // cleanup job. unsafe { - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - let sched = Local::unsafe_borrow::(); - let task = (*sched).current_task.get_mut_ref(); + // Again - might work while safe, or it might not. + do Local::borrow:: |sched| { + (sched).run_cleanup_job(); + } - do task.run { + // To call the run method on a task we need a direct + // reference to it. The task is in TLS, so we can + // simply unsafe_borrow it to get this reference. We + // need to still have the task in TLS though, so we + // need to unsafe_borrow. + let task = Local::unsafe_borrow::(); + + do (*task).run { // N.B. Removing `start` from the start wrapper // closure by emptying a cell is critical for // correctness. The ~Task pointer, and in turn the @@ -262,8 +365,11 @@ impl Coroutine { }; } + // We remove the sched from the Task in TLS right now. let sched = Local::take::(); - sched.terminate_current_task(); + // ... allowing us to give it away when performing a + // scheduling operation. + sched.terminate_current_task() }; return wrapper; } @@ -465,3 +571,4 @@ mod test { } } } + diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index ec1094ed4f2..d0970ec5866 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -18,14 +18,12 @@ use iterator::Iterator; use vec::{OwnedVector, MutableVector}; use super::io::net::ip::{IpAddr, Ipv4, Ipv6}; use rt::sched::Scheduler; -use rt::local::Local; use unstable::run_in_bare_thread; use rt::thread::Thread; use rt::task::Task; use rt::uv::uvio::UvEventLoop; use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; -use rt::task::{Sched}; use rt::comm::oneshot; use result::{Result, Ok, Err}; @@ -34,29 +32,37 @@ pub fn new_test_uv_sched() -> Scheduler { let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message sched.no_sleep = true; return sched; + } -/// Creates a new scheduler in a new thread and runs a task in it, -/// then waits for the scheduler to exit. Failure of the task -/// will abort the process. pub fn run_in_newsched_task(f: ~fn()) { let f = Cell::new(f); - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); - let mut task = ~Task::new_root(&mut sched.stack_pool, - f.take()); - rtdebug!("newsched_task: %x", ::borrow::to_uint(task)); - task.death.on_exit = Some(on_exit); - sched.enqueue_task(task); - sched.run(); + run_in_newsched_task_core(f.take()); } } +pub fn run_in_newsched_task_core(f: ~fn()) { + + use rt::sched::Shutdown; + + let mut sched = ~new_test_uv_sched(); + let exit_handle = Cell::new(sched.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + exit_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + let mut task = ~Task::new_root(&mut sched.stack_pool, f); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); +} + /// Create more than one scheduler and run a function in a task /// in one of the schedulers. The schedulers will stay alive /// until the function `f` returns. @@ -65,7 +71,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { use from_str::FromStr; use rt::sched::Shutdown; - let f_cell = Cell::new(f); + let f = Cell::new(f); do run_in_bare_thread { let nthreads = match os::getenv("RUST_RT_TEST_THREADS") { @@ -95,7 +101,6 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { scheds.push(sched); } - let f_cell = Cell::new(f_cell.take()); let handles = Cell::new(handles); let on_exit: ~fn(bool) = |exit_status| { let mut handles = handles.take(); @@ -107,18 +112,30 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { rtassert!(exit_status); }; let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - f_cell.take()); + f.take()); main_task.death.on_exit = Some(on_exit); - scheds[0].enqueue_task(main_task); let mut threads = ~[]; + let main_task = Cell::new(main_task); + + let main_thread = { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + do Thread::start { + let sched = sched_cell.take(); + sched.bootstrap(main_task.take()); + } + }; + threads.push(main_thread); while !scheds.is_empty() { - let sched = scheds.pop(); + let mut sched = scheds.pop(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {}; + let bootstrap_task_cell = Cell::new(bootstrap_task); let sched_cell = Cell::new(sched); let thread = do Thread::start { let sched = sched_cell.take(); - sched.run(); + sched.bootstrap(bootstrap_task_cell.take()); }; threads.push(thread); @@ -134,187 +151,52 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - rtdebug!("spawntask taking the scheduler from TLS"); - - - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - rtdebug!("new task pointer: %x", ::borrow::to_uint(task)); - - let sched = Local::take::(); - rtdebug!("spawntask scheduling the new task"); - sched.schedule_task(task); -} - - -/// Create a new task and run it right now. Aborts on failure -pub fn spawntask_immediately(f: ~fn()) { - use super::sched::*; - - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let sched = Local::take::(); - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_blocked_task(task); - } + Scheduler::run_task(Task::build_child(f)); } /// Create a new task and run it right now. Aborts on failure pub fn spawntask_later(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - let mut sched = Local::take::(); - sched.enqueue_task(task); - Local::put(sched); + Scheduler::run_task_later(Task::build_child(f)); } -/// Spawn a task and either run it immediately or run it later pub fn spawntask_random(f: ~fn()) { - use super::sched::*; use rand::{Rand, rng}; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - - } - }; - - let mut sched = Local::take::(); - let mut rng = rng(); let run_now: bool = Rand::rand(&mut rng); if run_now { - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_blocked_task(task); - } + spawntask(f) } else { - sched.enqueue_task(task); - Local::put(sched); + spawntask_later(f) } } -/// Spawn a task, with the current scheduler as home, and queue it to -/// run later. -pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { - use super::sched::*; - use rand::{rng, RngUtil}; - let mut rng = rng(); - - let task = { - let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - let handle = sched.make_handle(); - let home_id = handle.sched_id; - - // now that we know where this is going, build a new function - // that can assert it is in the right place - let af: ~fn() = || { - do Local::borrow::() |sched| { - rtdebug!("home_id: %u, runtime loc: %u", - home_id, - sched.sched_id()); - assert!(home_id == sched.sched_id()); - }; - f() - }; - - ~Task::new_root_homed(&mut sched.stack_pool, - Sched(handle), - af) - }; - let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - // enqueue it for future execution - dest_sched.enqueue_task(task); -} - -/// Spawn a task and wait for it to finish, returning whether it -/// completed successfully or failed -pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { - use cell::Cell; - use super::sched::*; - - let f = Cell::new(f); +pub fn spawntask_try(f: ~fn()) -> Result<(),()> { let (port, chan) = oneshot(); let chan = Cell::new(chan); let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); - let mut new_task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow:: |_running_task| { - // I don't understand why using a child task here fails. I - // think the fail status is propogating back up the task - // tree and triggering a fail for the parent, which we - // aren't correctly expecting. - - // ~running_task.new_child(&mut (*sched).stack_pool, - ~Task::new_root(&mut (*sched).stack_pool, - f.take()) - } - }; + let mut new_task = Task::build_root(f); new_task.death.on_exit = Some(on_exit); - let sched = Local::take::(); - do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { - sched.enqueue_blocked_task(old_task); - } - - rtdebug!("enqueued the new task, now waiting on exit_status"); + Scheduler::run_task(new_task); let exit_status = port.recv(); if exit_status { Ok(()) } else { Err(()) } + } /// Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { - use rt::sched::*; let f = Cell::new(f); - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let task = Cell::new(task); - let thread = do Thread::start { - let mut sched = ~new_test_uv_sched(); - sched.enqueue_task(task.take()); - sched.run(); + run_in_newsched_task_core(f.take()); }; + return thread; } @@ -323,11 +205,14 @@ pub fn with_test_task(blk: ~fn(~Task) -> ~Task) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); let task = blk(~Task::new_root(&mut sched.stack_pool, ||{})); - sched.enqueue_task(task); - sched.run(); + cleanup_task(task); } } +/// Use to cleanup tasks created for testing but not "run". +pub fn cleanup_task(mut task: ~Task) { + task.destroyed = true; +} /// Get a port number, starting at 9600, for use in tests pub fn next_test_port() -> u16 { diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index bc223d8f3f7..ae455a6ad04 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -17,7 +17,6 @@ use option::*; use clone::Clone; use super::rc::RC; use rt::sched::Scheduler; -use rt::{context, TaskContext, SchedulerContext}; use rt::kill::BlockedTask; use rt::local::Local; use vec::OwnedVector; @@ -44,8 +43,6 @@ impl Tube { pub fn send(&mut self, val: T) { rtdebug!("tube send"); - assert!(context() == SchedulerContext); - unsafe { let state = self.p.unsafe_borrow_mut(); (*state).buf.push(val); @@ -61,8 +58,6 @@ impl Tube { } pub fn recv(&mut self) -> T { - assert!(context() == TaskContext); - unsafe { let state = self.p.unsafe_borrow_mut(); if !(*state).buf.is_empty() { diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 638d510614a..fa5c497a877 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -51,7 +51,7 @@ use rt::io::net::ip::IpAddr; use rt::io::IoError; -#[cfg(test)] use unstable::run_in_bare_thread; +//#[cfg(test)] use unstable::run_in_bare_thread; pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; @@ -333,7 +333,7 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { return None; } } - +/* #[test] fn test_slice_to_uv_buf() { let slice = [0, .. 20]; @@ -360,3 +360,4 @@ fn loop_smoke_test() { loop_.close(); } } +*/ diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 53ccd20186d..27970cc52af 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -33,7 +33,7 @@ use unstable::sync::Exclusive; #[cfg(test)] use container::Container; #[cfg(test)] use uint; #[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use rt::test::{spawntask_immediately, +#[cfg(test)] use rt::test::{spawntask, next_test_ip4, run_in_newsched_task}; @@ -251,13 +251,11 @@ impl IoFactory for UvIoFactory { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("connect: entered scheduler context"); - assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell::new(task); @@ -458,11 +456,9 @@ impl RtioTcpStream for UvTcpStream { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("read: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); // XXX: We shouldn't reallocate these callbacks every // call to read @@ -500,7 +496,6 @@ impl RtioTcpStream for UvTcpStream { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -602,11 +597,9 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("recvfrom: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { @@ -637,7 +630,6 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -845,7 +837,7 @@ fn test_simple_tcp_server_and_client() { let addr = next_test_ip4(); // Start the server first so it's listening when we connect - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -860,7 +852,7 @@ fn test_simple_tcp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -876,7 +868,7 @@ fn test_simple_udp_server_and_client() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut server_socket = (*io).udp_bind(server_addr).unwrap(); @@ -891,7 +883,7 @@ fn test_simple_udp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut client_socket = (*io).udp_bind(client_addr).unwrap(); @@ -906,7 +898,7 @@ fn test_read_and_block() { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let io = unsafe { Local::unsafe_borrow::() }; let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; let mut stream = listener.accept().unwrap(); @@ -939,7 +931,7 @@ fn test_read_and_block() { assert!(reads > 1); } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -959,7 +951,7 @@ fn test_read_read_read() { let addr = next_test_ip4(); static MAX: uint = 500000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -973,7 +965,7 @@ fn test_read_read_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -999,7 +991,7 @@ fn test_udp_twice() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut client = (*io).udp_bind(client_addr).unwrap(); @@ -1008,7 +1000,7 @@ fn test_udp_twice() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut server = (*io).udp_bind(server_addr).unwrap(); @@ -1036,7 +1028,7 @@ fn test_udp_many_read() { let client_in_addr = next_test_ip4(); static MAX: uint = 500_000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut server_out = (*io).udp_bind(server_out_addr).unwrap(); @@ -1059,7 +1051,7 @@ fn test_udp_many_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut client_out = (*io).udp_bind(client_out_addr).unwrap(); diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 4558f8e32c1..88f214ef4c0 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -653,22 +653,16 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc, pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { match context() { - OldTaskContext => { - spawn_raw_oldsched(opts, f) - } - TaskContext => { - spawn_raw_newsched(opts, f) - } - SchedulerContext => { - fail!("can't spawn from scheduler context") - } - GlobalContext => { - fail!("can't spawn from global context") - } + OldTaskContext => spawn_raw_oldsched(opts, f), + TaskContext => spawn_raw_newsched(opts, f), + SchedulerContext => fail!("can't spawn from scheduler context"), + GlobalContext => fail!("can't spawn from global context"), } } fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { + use rt::sched::*; + let child_data = Cell::new(gen_child_taskgroup(opts.linked, opts.supervised)); let indestructible = opts.indestructible; @@ -700,19 +694,11 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } }; - let mut task = unsafe { - let sched = Local::unsafe_borrow::(); - rtdebug!("unsafe borrowed sched"); - - if opts.watched { - let child_wrapper = Cell::new(child_wrapper); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, child_wrapper.take()) - } - } else { - // An unwatched task is a new root in the exit-code propagation tree - ~Task::new_root(&mut (*sched).stack_pool, child_wrapper) - } + let mut task = if opts.watched { + Task::build_child(child_wrapper) + } else { + // An unwatched task is a new root in the exit-code propagation tree + Task::build_root(child_wrapper) }; if opts.notify_chan.is_some() { @@ -727,12 +713,9 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } task.name = opts.name.take(); + rtdebug!("spawn calling run_task"); + Scheduler::run_task(task); - rtdebug!("spawn about to take scheduler"); - - let sched = Local::take::(); - rtdebug!("took sched in spawn"); - sched.schedule_task(task); } fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs index 7a5e1116c32..74604b4ea17 100644 --- a/src/libstd/unstable/lang.rs +++ b/src/libstd/unstable/lang.rs @@ -70,9 +70,6 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char { _ => { let mut alloc = ::ptr::null(); do Local::borrow:: |task| { - rtdebug!("task pointer: %x, heap pointer: %x", - ::borrow::to_uint(task), - ::borrow::to_uint(&task.heap)); alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char; } return alloc; From 37f385e44bbe31a693a665a74b6b493e11b7cf1c Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 26 Jul 2013 17:20:26 -0400 Subject: [PATCH 2/7] Have linked failure tests run on the new scheduler instead of requiring RUST_NEWRT to test. --- src/libstd/task/mod.rs | 241 +++++++++++++++++++++++++++-------------- 1 file changed, 157 insertions(+), 84 deletions(-) diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index d0124407bd4..5e4d48403cc 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -677,121 +677,190 @@ fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - let (po, ch) = stream(); - let ch = SharedChan::new(ch); - do spawn_unlinked { - let ch = ch.clone(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let (po, ch) = stream(); + let ch = SharedChan::new(ch); do spawn_unlinked { - // Give middle task a chance to fail-but-not-kill-us. - do 16.times { task::yield(); } - ch.send(()); // If killed first, grandparent hangs. + let ch = ch.clone(); + do spawn_unlinked { + // Give middle task a chance to fail-but-not-kill-us. + for 16.times { task::yield(); } + ch.send(()); // If killed first, grandparent hangs. + } + fail!(); // Shouldn't kill either (grand)parent or (grand)child. } - fail!(); // Shouldn't kill either (grand)parent or (grand)child. + po.recv(); } - po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails - do spawn_unlinked { fail!(); } + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + do spawn_unlinked { fail!(); } + } } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails - do spawn_supervised { fail!(); } - // Give child a chance to fail-but-not-kill-us. - do 16.times { task::yield(); } -} -#[test] #[should_fail] #[ignore(cfg(windows))] -fn test_spawn_unlinked_sup_fail_down() { - do spawn_supervised { block_forever(); } - fail!(); // Shouldn't leave a child hanging around. -} - -#[test] #[should_fail] #[ignore(cfg(windows))] -fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - // Unidirectional "parenting" shouldn't override bidirectional linked. - // We have to cheat with opts - the interface doesn't support them because - // they don't make sense (redundant with task().supervised()). - let mut b0 = task(); - b0.opts.linked = true; - b0.opts.supervised = true; - - do b0.spawn { - fail!(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + do spawn_supervised { fail!(); } + // Give child a chance to fail-but-not-kill-us. + for 16.times { task::yield(); } } - block_forever(); // We should get punted awake } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] +fn test_spawn_unlinked_sup_fail_down() { + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + do spawn_supervised { block_forever(); } + fail!(); // Shouldn't leave a child hanging around. + }; + assert!(result.is_err()); + } +} + +#[test] #[ignore(cfg(windows))] +fn test_spawn_linked_sup_fail_up() { // child fails; parent fails + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Unidirectional "parenting" shouldn't override bidirectional linked. + // We have to cheat with opts - the interface doesn't support them because + // they don't make sense (redundant with task().supervised()). + let mut b0 = task(); + b0.opts.linked = true; + b0.opts.supervised = true; + + do b0.spawn { + fail!(); + } + block_forever(); // We should get punted awake + }; + assert!(result.is_err()); + } +} +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails - // We have to cheat with opts - the interface doesn't support them because - // they don't make sense (redundant with task().supervised()). - let mut b0 = task(); - b0.opts.linked = true; - b0.opts.supervised = true; - do b0.spawn { block_forever(); } - fail!(); // *both* mechanisms would be wrong if this didn't kill the child + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // We have to cheat with opts - the interface doesn't support them because + // they don't make sense (redundant with task().supervised()). + let mut b0 = task(); + b0.opts.linked = true; + b0.opts.supervised = true; + do b0.spawn { block_forever(); } + fail!(); // *both* mechanisms would be wrong if this didn't kill the child + }; + assert!(result.is_err()); + } } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - // Default options are to spawn linked & unsupervised. - do spawn { fail!(); } - block_forever(); // We should get punted awake + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Default options are to spawn linked & unsupervised. + do spawn { fail!(); } + block_forever(); // We should get punted awake + }; + assert!(result.is_err()); + } } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails - // Default options are to spawn linked & unsupervised. - do spawn { block_forever(); } - fail!(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Default options are to spawn linked & unsupervised. + do spawn { block_forever(); } + fail!(); + }; + assert!(result.is_err()); + } } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails - // Make sure the above test is the same as this one. - let mut builder = task(); - builder.linked(); - do builder.spawn { block_forever(); } - fail!(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Make sure the above test is the same as this one. + let mut builder = task(); + builder.linked(); + do builder.spawn { block_forever(); } + fail!(); + }; + assert!(result.is_err()); + } } // A couple bonus linked failure tests - testing for failure propagation even // when the middle task exits successfully early before kill signals are sent. -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_failure_propagate_grandchild() { - // Middle task exits; does grandparent's failure propagate across the gap? - do spawn_supervised { - do spawn_supervised { block_forever(); } + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Middle task exits; does grandparent's failure propagate across the gap? + do spawn_supervised { + do spawn_supervised { block_forever(); } + } + for 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_failure_propagate_secondborn() { - // First-born child exits; does parent's failure propagate to sibling? - do spawn_supervised { - do spawn { block_forever(); } // linked + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // First-born child exits; does parent's failure propagate to sibling? + do spawn_supervised { + do spawn { block_forever(); } // linked + } + for 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_failure_propagate_nephew_or_niece() { - // Our sibling exits; does our failure propagate to sibling's child? - do spawn { // linked - do spawn_supervised { block_forever(); } + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Our sibling exits; does our failure propagate to sibling's child? + do spawn { // linked + do spawn_supervised { block_forever(); } + } + for 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_sup_propagate_sibling() { - // Middle sibling exits - does eldest's failure propagate to youngest? - do spawn { // linked - do spawn { block_forever(); } // linked + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Middle sibling exits - does eldest's failure propagate to youngest? + do spawn { // linked + do spawn { block_forever(); } // linked + } + for 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } #[test] @@ -1149,11 +1218,15 @@ fn test_child_doesnt_ref_parent() { fn child_no(x: uint) -> ~fn() { return || { if x < generations { - task::spawn(child_no(x+1)); + let mut t = task(); + t.unwatched(); + t.spawn(child_no(x+1)); } } } - task::spawn(child_no(0)); + let mut t = task(); + t.unwatched(); + t.spawn(child_no(0)); } #[test] @@ -1167,9 +1240,9 @@ fn test_simple_newsched_spawn() { #[test] #[ignore(cfg(windows))] fn test_spawn_watched() { - use rt::test::{run_in_newsched_task, spawntask_try}; + use rt::test::run_in_newsched_task; do run_in_newsched_task { - let result = do spawntask_try { + let result = do try { let mut t = task(); t.unlinked(); t.watched(); @@ -1189,9 +1262,9 @@ fn test_spawn_watched() { #[test] #[ignore(cfg(windows))] fn test_indestructible() { - use rt::test::{run_in_newsched_task, spawntask_try}; + use rt::test::run_in_newsched_task; do run_in_newsched_task { - let result = do spawntask_try { + let result = do try { let mut t = task(); t.watched(); t.supervised(); From 997719c13d449821691fcae927d3b94960b5bc03 Mon Sep 17 00:00:00 2001 From: toddaaro Date: Mon, 29 Jul 2013 12:06:36 -0700 Subject: [PATCH 3/7] Fixed a race where a scheduler configured to only run tasks pinned to it would "bounch" a regular task in and out of the work queue without allowing a different scheduler to run it. --- src/libstd/rt/local.rs | 2 +- src/libstd/rt/sched.rs | 41 ++++++++++++++++++++++++++++++++++------- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 34e3a0241a9..2bfe0fafdd8 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -120,7 +120,7 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { - use unstable::run_in_bare_thread; +// use unstable::run_in_bare_thread; use rt::test::*; // use rt::sched::Scheduler; use super::*; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 0326c2cbfe5..d22c5857a19 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -68,7 +68,10 @@ pub struct Scheduler { priv cleanup_job: Option, metrics: SchedMetrics, /// Should this scheduler run any task, or only pinned tasks? - run_anything: bool + run_anything: bool, + /// If the scheduler shouldn't run some tasks, a friend to send + /// them to. + friend_handle: Option } pub struct SchedHandle { @@ -80,7 +83,8 @@ pub struct SchedHandle { pub enum SchedMessage { Wake, Shutdown, - PinnedTask(~Task) + PinnedTask(~Task), + TaskFromFriend(~Task) } enum CleanupJob { @@ -97,7 +101,7 @@ impl Scheduler { sleeper_list: SleeperList) -> Scheduler { - Scheduler::new_special(event_loop, work_queue, sleeper_list, true) + Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None) } @@ -106,7 +110,8 @@ impl Scheduler { pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, - run_anything: bool) + run_anything: bool, + friend: Option) -> Scheduler { Scheduler { @@ -120,7 +125,8 @@ impl Scheduler { sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), - run_anything: run_anything + run_anything: run_anything, + friend_handle: friend } } @@ -327,6 +333,10 @@ impl Scheduler { this.resume_task_immediately(task); return None; } + Some(TaskFromFriend(task)) => { + this.resume_task_immediately(task); + return None; + } Some(Wake) => { this.sleepy = false; return Some(this); @@ -376,6 +386,19 @@ impl Scheduler { } } + /// Take a non-homed task we aren't allowed to run here and send + /// it to the designated friend scheduler to execute. + fn send_to_friend(&mut self, task: ~Task) { + match self.friend_handle { + Some(ref mut handle) => { + handle.send(TaskFromFriend(task)); + } + None => { + rtabort!("tried to send task to a friend but scheduler has no friends"); + } + } + } + // Resume a task from the queue - but also take into account that // it might not belong here. @@ -409,7 +432,8 @@ impl Scheduler { } AnySched => { task.give_home(AnySched); - this.enqueue_task(task); +// this.enqueue_task(task); + this.send_to_friend(task); return Some(this); } } @@ -816,12 +840,15 @@ mod test { let normal_handle = Cell::new(normal_sched.make_handle()); + let friend_handle = normal_sched.make_handle(); + // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), work_queue.clone(), sleepers.clone(), - false); + false, + Some(friend_handle)); let special_handle = Cell::new(special_sched.make_handle()); From a5f55b3ead06886190d905cfc826bf1d072ff675 Mon Sep 17 00:00:00 2001 From: toddaaro Date: Mon, 29 Jul 2013 13:34:08 -0700 Subject: [PATCH 4/7] minor tweaks - unboxed the coroutine so that it is no longer a ~ pointer inside the task struct, and also added an assert to verify that send is never called inside scheduler context as it is undefined (BROKEN) if that happens --- src/libstd/rt/comm.rs | 4 ++++ src/libstd/rt/io/net/tcp.rs | 6 +++--- src/libstd/rt/io/net/udp.rs | 2 +- src/libstd/rt/local.rs | 2 -- src/libstd/rt/mod.rs | 40 ++++++++++++++++++++++++------------- src/libstd/rt/sched.rs | 34 ++++++++++++++++++++++++++++--- src/libstd/rt/task.rs | 12 +++++------ src/libstd/rt/uv/uvio.rs | 4 +--- 8 files changed, 72 insertions(+), 32 deletions(-) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 491bdbe9b06..bb106edad94 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -24,6 +24,7 @@ use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; +use rt::{context, SchedulerContext}; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -90,6 +91,9 @@ impl ChanOne { } pub fn try_send(self, val: T) -> bool { + + rtassert!(context() != SchedulerContext); + let mut this = self; let mut recvr_active = true; let packet = this.packet(); diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index edfd3a92b5f..449df8cddea 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -571,7 +571,7 @@ mod test { #[cfg(test)] fn socket_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let listener = TcpListener::bind(addr); assert!(listener.is_some()); @@ -590,13 +590,13 @@ mod test { #[cfg(test)] fn peer_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); listener.accept(); } - do spawntask_immediately { + do spawntask { let stream = TcpStream::connect(addr); assert!(stream.is_some()); diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index 76200d6f86e..c04abfa899b 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -267,7 +267,7 @@ mod test { #[cfg(test)] fn socket_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let server = UdpSocket::bind(addr); assert!(server.is_some()); diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 2bfe0fafdd8..71e60a6a923 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -120,9 +120,7 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { -// use unstable::run_in_bare_thread; use rt::test::*; -// use rt::sched::Scheduler; use super::*; use rt::task::Task; use rt::local_ptr; diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 268d402adf5..73c30e5779c 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -70,7 +70,7 @@ use ptr::RawPtr; use rt::local::Local; use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; -use rt::task::{Task, SchedTask, GreenTask}; +use rt::task::{Task, SchedTask, GreenTask, Sched}; use rt::thread::Thread; use rt::work_queue::WorkQueue; use rt::uv::uvio::UvEventLoop; @@ -244,6 +244,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let nscheds = util::default_sched_threads(); + let main = Cell::new(main); + // The shared list of sleeping schedulers. Schedulers wake each other // occassionally to do new work. let sleepers = SleeperList::new(); @@ -268,12 +270,19 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // If we need a main-thread task then create a main thread scheduler // that will reject any task that isn't pinned to it - let mut main_sched = if use_main_sched { + let main_sched = if use_main_sched { + + // Create a friend handle. + let mut friend_sched = scheds.pop(); + let friend_handle = friend_sched.make_handle(); + scheds.push(friend_sched); + let main_loop = ~UvEventLoop::new(); let mut main_sched = ~Scheduler::new_special(main_loop, work_queue.clone(), sleepers.clone(), - false); + false, + Some(friend_handle)); let main_handle = main_sched.make_handle(); handles.push(main_handle); Some(main_sched) @@ -312,15 +321,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let mut threads = ~[]; + let on_exit = Cell::new(on_exit); + if !use_main_sched { // In the case where we do not use a main_thread scheduler we // run the main task in one of our threads. - - let main_cell = Cell::new(main); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - main_cell.take()); - main_task.death.on_exit = Some(on_exit); + main.take()); + main_task.death.on_exit = Some(on_exit.take()); let main_task_cell = Cell::new(main_task); let sched = scheds.pop(); @@ -347,16 +357,18 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { } // If we do have a main thread scheduler, run it now. - + if use_main_sched { + + let mut main_sched = main_sched.get(); + let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, - home, main); - main_task.death.on_exit = Some(on_exit); - let main_task_cell = Cell::new(main_task); - sched.bootstrap(main_task); + let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, + home, main.take()); + main_task.death.on_exit = Some(on_exit.take()); + main_sched.bootstrap(main_task); } - + // Wait for schedulers foreach thread in threads.consume_iter() { thread.join(); diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index d22c5857a19..816c963ad18 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,6 @@ use either::{Left, Right}; use option::{Option, Some, None}; -use sys; use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; @@ -334,7 +333,7 @@ impl Scheduler { return None; } Some(TaskFromFriend(task)) => { - this.resume_task_immediately(task); + this.schedule_task_sched_context(task); return None; } Some(Wake) => { @@ -432,7 +431,6 @@ impl Scheduler { } AnySched => { task.give_home(AnySched); -// this.enqueue_task(task); this.send_to_friend(task); return Some(this); } @@ -491,6 +489,36 @@ impl Scheduler { } } + // BAD BAD BAD BAD BAD + // Do something instead of just copy-pasting this. + pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> { + + // is the task home? + let is_home = task.is_home_no_tls(&self); + + // does the task have a home? + let homed = task.homed(); + + let mut this = self; + + if is_home || (!homed && this.run_anything) { + // here we know we are home, execute now OR we know we + // aren't homed, and that this sched doesn't care + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); + this.resume_task_immediately(task); + return None; + } else if !homed && !this.run_anything { + // the task isn't homed, but it can't be run here + this.enqueue_task(task); + return Some(this); + } else { + // task isn't home, so don't run it here, send it home + Scheduler::send_task_home(task); + return Some(this); + } + } + + // The primary function for changing contexts. In the current // design the scheduler is just a slightly modified GreenTask, so // all context swaps are from Task to Task. The only difference diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index bc603bede97..13fdaded84b 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -45,9 +45,9 @@ pub struct Task { taskgroup: Option, death: Death, destroyed: bool, - coroutine: Option<~Coroutine>, // FIXME(#6874/#7599) use StringRef to save on allocations name: Option<~str>, + coroutine: Option, sched: Option<~Scheduler>, task_type: TaskType } @@ -128,7 +128,7 @@ impl Task { taskgroup: None, death: Death::new(), destroyed: false, - coroutine: Some(~Coroutine::empty()), + coroutine: Some(Coroutine::empty()), sched: None, task_type: SchedTask } @@ -157,8 +157,8 @@ impl Task { taskgroup: None, death: Death::new(), destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + coroutine: Some(Coroutine::new(stack_pool, start)), sched: None, task_type: GreenTask(Some(~home)) } @@ -178,8 +178,8 @@ impl Task { // FIXME(#7544) make watching optional death: self.death.new_child(), destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + coroutine: Some(Coroutine::new(stack_pool, start)), sched: None, task_type: GreenTask(Some(~home)) } @@ -375,9 +375,9 @@ impl Coroutine { } /// Destroy coroutine and try to reuse stack segment. - pub fn recycle(~self, stack_pool: &mut StackPool) { + pub fn recycle(self, stack_pool: &mut StackPool) { match self { - ~Coroutine { current_stack_segment, _ } => { + Coroutine { current_stack_segment, _ } => { stack_pool.give_segment(current_stack_segment); } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 27970cc52af..e93333661cf 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -790,10 +790,8 @@ impl Drop for UvTimer { impl RtioTimer for UvTimer { fn sleep(&self, msecs: u64) { let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("sleep: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); let mut watcher = **self; do watcher.start(msecs, 0) |_, status| { From 1d82fe5aea71b1c265634f32716b268972141efb Mon Sep 17 00:00:00 2001 From: toddaaro Date: Wed, 31 Jul 2013 13:52:22 -0700 Subject: [PATCH 5/7] fixed incorrect handling of returned scheduler option and restructed scheduler functions slightly --- src/libstd/rt/comm.rs | 8 +-- src/libstd/rt/mod.rs | 12 +++- src/libstd/rt/sched.rs | 133 +++++++++++++++++---------------------- src/libstd/rt/task.rs | 4 +- src/libstd/rt/test.rs | 4 +- src/libstd/rt/uv/uvio.rs | 2 +- 6 files changed, 76 insertions(+), 87 deletions(-) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index bb106edad94..5a671d877d2 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -131,10 +131,7 @@ impl ChanOne { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { - let mut sched = Local::take::(); - rtdebug!("rendezvous send"); - sched.metrics.rendezvous_sends += 1; - sched.schedule_task(woken_task); + Scheduler::run_task(woken_task); }; } } @@ -350,8 +347,7 @@ impl Drop for ChanOne { assert!((*this.packet()).payload.is_none()); let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { - let sched = Local::take::(); - sched.schedule_task(woken_task); + Scheduler::run_task(woken_task); }; } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 73c30e5779c..f0f4b646103 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -259,6 +259,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let mut handles = ~[]; do nscheds.times { + rtdebug!("inserting a regular scheduler"); + // Every scheduler is driven by an I/O event loop. let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); @@ -344,6 +346,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // Run each remaining scheduler in a thread. while !scheds.is_empty() { + rtdebug!("creating regular schedulers"); let sched = scheds.pop(); let sched_cell = Cell::new(sched); let thread = do Thread::start { @@ -360,15 +363,21 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { if use_main_sched { + rtdebug!("about to create the main scheduler task"); + let mut main_sched = main_sched.get(); let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, + let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, home, main.take()); main_task.death.on_exit = Some(on_exit.take()); + rtdebug!("boostrapping main_task"); + main_sched.bootstrap(main_task); } + rtdebug!("waiting for threads"); + // Wait for schedulers foreach thread in threads.consume_iter() { thread.join(); @@ -404,7 +413,6 @@ pub fn context() -> RuntimeContext { if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; } else if Local::exists::() { - rtdebug!("either task or scheduler context in newrt"); // In this case we know it is a new runtime context, but we // need to check which one. Going to try borrowing task to // check. Task should always be in TLS, so hopefully this diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 816c963ad18..4abe69a7d13 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -142,11 +142,11 @@ impl Scheduler { local_ptr::init_tls_key(); // Create a task for the scheduler with an empty context. - let sched_task = Task::new_sched_task(); + let sched_task = ~Task::new_sched_task(); // Now that we have an empty task struct for the scheduler // task, put it in TLS. - Local::put::(~sched_task); + Local::put::(sched_task); // Now, as far as all the scheduler state is concerned, we are // inside the "scheduler" context. So we can act like the @@ -165,8 +165,6 @@ impl Scheduler { // cleaning up the memory it uses. As we didn't actually call // task.run() on the scheduler task we never get through all // the cleanup code it runs. - - rtdebug!("post sched.run(), cleaning up scheduler task"); let mut stask = Local::take::(); stask.destroyed = true; } @@ -224,6 +222,8 @@ impl Scheduler { // 2) A shutdown is also easy, shutdown. // 3) A pinned task - we resume immediately and do not return // here. + // 4) A message from another scheduler with a non-homed task + // to run here. let result = sched.interpret_message_queue(); let sched = match result { @@ -236,6 +236,8 @@ impl Scheduler { } }; + // Second activity is to try resuming a task from the queue. + let result = sched.resume_task_from_queue(); let mut sched = match result { Some(sched) => { @@ -333,8 +335,7 @@ impl Scheduler { return None; } Some(TaskFromFriend(task)) => { - this.schedule_task_sched_context(task); - return None; + return this.sched_schedule_task(task); } Some(Wake) => { this.sleepy = false; @@ -442,8 +443,6 @@ impl Scheduler { } } - // * Task-context operations - /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { @@ -457,10 +456,17 @@ impl Scheduler { } } - // If a scheduling action is performed, return None. If not, - // return Some(sched). + // Scheduling a task requires a few checks to make sure the task + // ends up in the appropriate location. The run_anything flag on + // the scheduler and the home on the task need to be checked. This + // helper performs that check. It takes a function that specifies + // how to queue the the provided task if that is the correct + // action. This is a "core" function that requires handling the + // returned Option correctly. - pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> { + pub fn schedule_task(~self, task: ~Task, + schedule_fn: ~fn(sched: ~Scheduler, task: ~Task)) + -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); @@ -474,51 +480,44 @@ impl Scheduler { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care rtdebug!("task: %u is on ok sched, executing", to_uint(task)); - do this.switch_running_tasks_and_then(task) |sched, last_task| { + schedule_fn(this, task); + return None; + } else if !homed && !this.run_anything { + // the task isn't homed, but it can't be run here + this.enqueue_task(task); + return Some(this); + } else { + // task isn't home, so don't run it here, send it home + Scheduler::send_task_home(task); + return Some(this); + } + } + + // There are two contexts in which schedule_task can be called: + // inside the scheduler, and inside a task. These contexts handle + // executing the task slightly differently. In the scheduler + // context case we want to receive the scheduler as an input, and + // manually deal with the option. In the task context case we want + // to use TLS to find the scheduler, and deal with the option + // inside the helper. + + pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> { + do self.schedule_task(task) |sched, next_task| { + sched.resume_task_immediately(next_task); + } + } + + // Task context case - use TLS. + pub fn run_task(task: ~Task) { + let sched = Local::take::(); + let opt = do sched.schedule_task(task) |sched, next_task| { + do sched.switch_running_tasks_and_then(next_task) |sched, last_task| { sched.enqueue_blocked_task(last_task); } - return None; - } else if !homed && !this.run_anything { - // the task isn't homed, but it can't be run here - this.enqueue_task(task); - return Some(this); - } else { - // task isn't home, so don't run it here, send it home - Scheduler::send_task_home(task); - return Some(this); - } + }; + opt.map_consume(Local::put); } - // BAD BAD BAD BAD BAD - // Do something instead of just copy-pasting this. - pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> { - - // is the task home? - let is_home = task.is_home_no_tls(&self); - - // does the task have a home? - let homed = task.homed(); - - let mut this = self; - - if is_home || (!homed && this.run_anything) { - // here we know we are home, execute now OR we know we - // aren't homed, and that this sched doesn't care - rtdebug!("task: %u is on ok sched, executing", to_uint(task)); - this.resume_task_immediately(task); - return None; - } else if !homed && !this.run_anything { - // the task isn't homed, but it can't be run here - this.enqueue_task(task); - return Some(this); - } else { - // task isn't home, so don't run it here, send it home - Scheduler::send_task_home(task); - return Some(this); - } - } - - // The primary function for changing contexts. In the current // design the scheduler is just a slightly modified GreenTask, so // all context swaps are from Task to Task. The only difference @@ -586,7 +585,7 @@ impl Scheduler { Context::swap(current_task_context, next_task_context); } - // When the context swaps back to the scheduler we immediately + // When the context swaps back to this task we immediately // run the cleanup job, as expected by the previously called // swap_contexts function. unsafe { @@ -599,15 +598,8 @@ impl Scheduler { } } - // There are a variety of "obvious" functions to be passed to - // change_task_context, so we can make a few "named cases". - - // Enqueue the old task on the current scheduler. - pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) { - sched.enqueue_task(task); - } - - // Sometimes we just want the old API though. + // Old API for task manipulation implemented over the new core + // function. pub fn resume_task_immediately(~self, task: ~Task) { do self.change_task_context(task) |sched, stask| { @@ -668,13 +660,6 @@ impl Scheduler { }; } - // A helper that looks up the scheduler and runs a task. If it can - // be run now it is run now. - pub fn run_task(new_task: ~Task) { - let sched = Local::take::(); - sched.schedule_task(new_task).map_consume(Local::put); - } - // Returns a mutable reference to both contexts involved in this // swap. This is unsafe - we are getting mutable internal // references to keep even when we don't own the tasks. It looks @@ -692,8 +677,6 @@ impl Scheduler { } } - // * Other stuff - pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { self.cleanup_job = Some(job); } @@ -1004,22 +987,22 @@ mod test { let port = Cell::new(port); let chan = Cell::new(chan); - let _thread_one = do Thread::start { + let thread_one = do Thread::start { let chan = Cell::new(chan.take()); do run_in_newsched_task_core { chan.take().send(()); } }; - let _thread_two = do Thread::start { + let thread_two = do Thread::start { let port = Cell::new(port.take()); do run_in_newsched_task_core { port.take().recv(); } }; - thread1.join(); - thread2.join(); + thread_two.join(); + thread_one.join(); } } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 13fdaded84b..fcc6ebeada6 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -209,7 +209,7 @@ impl Task { } pub fn run(&mut self, f: &fn()) { - + rtdebug!("run called on task: %u", borrow::to_uint(self)); self.unwinder.try(f); { let _ = self.taskgroup.take(); } self.death.collect_failure(!self.unwinder.unwinding); @@ -301,7 +301,7 @@ impl Task { impl Drop for Task { fn drop(&self) { - rtdebug!("called drop for a task"); + rtdebug!("called drop for a task: %u", borrow::to_uint(self)); assert!(self.destroyed) } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index d0970ec5866..22eb42e2ee8 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -130,7 +130,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { while !scheds.is_empty() { let mut sched = scheds.pop(); - let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {}; + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("bootstrapping non-primary scheduler"); + }; let bootstrap_task_cell = Cell::new(bootstrap_task); let sched_cell = Cell::new(sched); let thread = do Thread::start { diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index e93333661cf..5be19752152 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -253,7 +253,7 @@ impl IoFactory for UvIoFactory { let scheduler = Local::take::(); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |_sched, task| { + do scheduler.deschedule_running_task_and_then |_, task| { rtdebug!("connect: entered scheduler context"); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); From 8e98eabce52b29d2e49ce63713ad2e1e04115bab Mon Sep 17 00:00:00 2001 From: toddaaro Date: Thu, 1 Aug 2013 15:08:51 -0700 Subject: [PATCH 6/7] modified local to include an implementation for try_unsafe_borrow:: so that the log methods will work --- src/libstd/rt/io/net/tcp.rs | 4 ++-- src/libstd/rt/local.rs | 10 ++++++++-- src/libstd/rt/task.rs | 1 + src/libstd/task/mod.rs | 12 ++++++------ 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 449df8cddea..2daa64e8085 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -380,7 +380,7 @@ mod test { } do spawntask { - for max.times { + do max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -405,7 +405,7 @@ mod test { } do spawntask { - for max.times { + do max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 71e60a6a923..7ab63233cff 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -44,7 +44,13 @@ impl Local for Task { } } unsafe fn unsafe_borrow() -> *mut Task { local_ptr::unsafe_borrow() } - unsafe fn try_unsafe_borrow() -> Option<*mut Task> { rtabort!("unimpl task try_unsafe_borrow") } + unsafe fn try_unsafe_borrow() -> Option<*mut Task> { + if Local::exists::() { + Some(Local::unsafe_borrow()) + } else { + None + } + } } impl Local for Scheduler { @@ -95,7 +101,7 @@ impl Local for Scheduler { } } unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { - if Local::exists::() { + if Local::exists::() { Some(Local::unsafe_borrow()) } else { None diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index fcc6ebeada6..23a0d28e457 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -129,6 +129,7 @@ impl Task { death: Death::new(), destroyed: false, coroutine: Some(Coroutine::empty()), + name: None, sched: None, task_type: SchedTask } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 5e4d48403cc..7a864ecb867 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -685,7 +685,7 @@ fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port let ch = ch.clone(); do spawn_unlinked { // Give middle task a chance to fail-but-not-kill-us. - for 16.times { task::yield(); } + do 16.times { task::yield(); } ch.send(()); // If killed first, grandparent hangs. } fail!(); // Shouldn't kill either (grand)parent or (grand)child. @@ -706,7 +706,7 @@ fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails do run_in_newsched_task { do spawn_supervised { fail!(); } // Give child a chance to fail-but-not-kill-us. - for 16.times { task::yield(); } + do 16.times { task::yield(); } } } #[test] #[ignore(cfg(windows))] @@ -808,7 +808,7 @@ fn test_spawn_failure_propagate_grandchild() { do spawn_supervised { do spawn_supervised { block_forever(); } } - for 16.times { task::yield(); } + do 16.times { task::yield(); } fail!(); }; assert!(result.is_err()); @@ -824,7 +824,7 @@ fn test_spawn_failure_propagate_secondborn() { do spawn_supervised { do spawn { block_forever(); } // linked } - for 16.times { task::yield(); } + do 16.times { task::yield(); } fail!(); }; assert!(result.is_err()); @@ -840,7 +840,7 @@ fn test_spawn_failure_propagate_nephew_or_niece() { do spawn { // linked do spawn_supervised { block_forever(); } } - for 16.times { task::yield(); } + do 16.times { task::yield(); } fail!(); }; assert!(result.is_err()); @@ -856,7 +856,7 @@ fn test_spawn_linked_sup_propagate_sibling() { do spawn { // linked do spawn { block_forever(); } // linked } - for 16.times { task::yield(); } + do 16.times { task::yield(); } fail!(); }; assert!(result.is_err()); From ce761f4980e6cdc6115608e6db551bdf1c49387f Mon Sep 17 00:00:00 2001 From: toddaaro Date: Thu, 1 Aug 2013 17:23:55 -0700 Subject: [PATCH 7/7] xfailed the myserious failing test --- src/test/run-pass/unwind-resource.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs index 450e81bee33..a36be079b43 100644 --- a/src/test/run-pass/unwind-resource.rs +++ b/src/test/run-pass/unwind-resource.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -// xfail-win32 +// xfail-test extern mod extra; use std::comm::*;