diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index 7748c43efcd..04058887970 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -23,9 +23,14 @@ macro_rules! rtdebug_ ( } ) ) -// An alternate version with no output, for turning off logging +// An alternate version with no output, for turning off logging. An +// earlier attempt that did not call the fmt! macro was insufficient, +// as a case of the "let bind each variable" approach eventually +// failed without an error message describing the invocation site. macro_rules! rtdebug ( - ($( $arg:expr),+) => ( $(let _ = $arg)*; ) + ($( $arg:expr),+) => ( { + let _x = fmt!( $($arg),+ ); + }) ) macro_rules! rtassert ( diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 79ee8405531..5a671d877d2 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -24,6 +24,7 @@ use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; +use rt::{context, SchedulerContext}; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -90,6 +91,9 @@ impl ChanOne { } pub fn try_send(self, val: T) -> bool { + + rtassert!(context() != SchedulerContext); + let mut this = self; let mut recvr_active = true; let packet = this.packet(); @@ -127,10 +131,7 @@ impl ChanOne { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { - let mut sched = Local::take::(); - rtdebug!("rendezvous send"); - sched.metrics.rendezvous_sends += 1; - sched.schedule_task(woken_task); + Scheduler::run_task(woken_task); }; } } @@ -346,8 +347,7 @@ impl Drop for ChanOne { assert!((*this.packet()).payload.is_none()); let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { - let sched = Local::take::(); - sched.schedule_task(woken_task); + Scheduler::run_task(woken_task); }; } } @@ -743,7 +743,7 @@ mod test { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); - do spawntask_immediately { + do spawntask { assert!(port_cell.take().recv() == ~10); } @@ -1019,5 +1019,4 @@ mod test { } } } - } diff --git a/src/libstd/rt/context.rs b/src/libstd/rt/context.rs index b30a55978f7..890ad061a68 100644 --- a/src/libstd/rt/context.rs +++ b/src/libstd/rt/context.rs @@ -49,12 +49,11 @@ impl Context { let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) }; let sp: *uint = stack.end(); let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) }; - // Save and then immediately load the current context, // which we will then modify to call the given function when restored let mut regs = new_regs(); unsafe { - swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)) + swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)); }; initialize_call_frame(&mut *regs, fp, argp, sp); @@ -72,13 +71,14 @@ impl Context { then loading the registers from a previously saved Context. */ pub fn swap(out_context: &mut Context, in_context: &Context) { + rtdebug!("swapping contexts"); let out_regs: &mut Registers = match out_context { &Context { regs: ~ref mut r, _ } => r }; let in_regs: &Registers = match in_context { &Context { regs: ~ref r, _ } => r }; - + rtdebug!("doing raw swap"); unsafe { swap_registers(out_regs, in_regs) }; } } diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 1d7dafc4302..2daa64e8085 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -186,7 +186,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -194,7 +194,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -206,7 +206,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -214,7 +214,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -226,7 +226,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -234,7 +234,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -246,7 +246,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -254,7 +254,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -266,7 +266,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -276,7 +276,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -288,7 +288,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -298,7 +298,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -310,7 +310,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -327,7 +327,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -339,7 +339,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -356,7 +356,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -369,7 +369,7 @@ mod test { let addr = next_test_ip4(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); @@ -379,7 +379,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { do max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); @@ -394,7 +394,7 @@ mod test { let addr = next_test_ip6(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); @@ -404,7 +404,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { do max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); @@ -419,13 +419,13 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -440,7 +440,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -458,13 +458,13 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -479,7 +479,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -497,7 +497,7 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); @@ -535,7 +535,7 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); @@ -571,7 +571,7 @@ mod test { #[cfg(test)] fn socket_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let listener = TcpListener::bind(addr); assert!(listener.is_some()); @@ -590,13 +590,13 @@ mod test { #[cfg(test)] fn peer_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); listener.accept(); } - do spawntask_immediately { + do spawntask { let stream = TcpStream::connect(addr); assert!(stream.is_some()); diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index d186ad15f4a..c04abfa899b 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -132,7 +132,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(ref mut server) => { let mut buf = [0]; @@ -149,7 +149,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(ref mut client) => client.sendto([99], server_ip), None => fail!() @@ -164,7 +164,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(ref mut server) => { let mut buf = [0]; @@ -181,7 +181,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(ref mut client) => client.sendto([99], server_ip), None => fail!() @@ -196,7 +196,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -214,7 +214,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; @@ -233,7 +233,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -251,7 +251,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; @@ -267,7 +267,7 @@ mod test { #[cfg(test)] fn socket_name(addr: IpAddr) { do run_in_newsched_task { - do spawntask_immediately { + do spawntask { let server = UdpSocket::bind(addr); assert!(server.is_some()); diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index b47bbf3edf0..7ab63233cff 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -14,6 +14,7 @@ use rt::task::Task; use rt::local_ptr; use rt::rtio::{EventLoop, IoFactoryObject}; //use borrow::to_uint; +use cell::Cell; pub trait Local { fn put(value: ~Self); @@ -24,40 +25,62 @@ pub trait Local { unsafe fn try_unsafe_borrow() -> Option<*mut Self>; } -impl Local for Scheduler { - fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }} - fn take() -> ~Scheduler { unsafe { local_ptr::take() } } +impl Local for Task { + fn put(value: ~Task) { unsafe { local_ptr::put(value) } } + fn take() -> ~Task { unsafe { local_ptr::take() } } fn exists() -> bool { local_ptr::exists() } - fn borrow(f: &fn(&mut Scheduler) -> T) -> T { + fn borrow(f: &fn(&mut Task) -> T) -> T { let mut res: Option = None; let res_ptr: *mut Option = &mut res; unsafe { - do local_ptr::borrow |sched| { -// rtdebug!("successfully unsafe borrowed sched pointer"); - let result = f(sched); + do local_ptr::borrow |task| { + let result = f(task); *res_ptr = Some(result); } } match res { Some(r) => { r } - None => rtabort!("function failed!") + None => { rtabort!("function failed in local_borrow") } + } + } + unsafe fn unsafe_borrow() -> *mut Task { local_ptr::unsafe_borrow() } + unsafe fn try_unsafe_borrow() -> Option<*mut Task> { + if Local::exists::() { + Some(Local::unsafe_borrow()) + } else { + None } } - unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() } - unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") } } -impl Local for Task { - fn put(_value: ~Task) { rtabort!("unimpl") } - fn take() -> ~Task { rtabort!("unimpl") } - fn exists() -> bool { rtabort!("unimpl") } - fn borrow(f: &fn(&mut Task) -> T) -> T { - do Local::borrow:: |sched| { -// rtdebug!("sched about to grab current_task"); - match sched.current_task { +impl Local for Scheduler { + fn put(value: ~Scheduler) { + let value = Cell::new(value); + do Local::borrow:: |task| { + let task = task; + task.sched = Some(value.take()); + }; + } + fn take() -> ~Scheduler { + do Local::borrow:: |task| { + let sched = task.sched.take_unwrap(); + let task = task; + task.sched = None; + sched + } + } + fn exists() -> bool { + do Local::borrow:: |task| { + match task.sched { + Some(ref _task) => true, + None => false + } + } + } + fn borrow(f: &fn(&mut Scheduler) -> T) -> T { + do Local::borrow:: |task| { + match task.sched { Some(~ref mut task) => { -// rtdebug!("current task pointer: %x", to_uint(task)); -// rtdebug!("current task heap pointer: %x", to_uint(&task.heap)); f(task) } None => { @@ -66,19 +89,18 @@ impl Local for Task { } } } - unsafe fn unsafe_borrow() -> *mut Task { - match (*Local::unsafe_borrow::()).current_task { - Some(~ref mut task) => { - let s: *mut Task = &mut *task; + unsafe fn unsafe_borrow() -> *mut Scheduler { + match (*Local::unsafe_borrow::()).sched { + Some(~ref mut sched) => { + let s: *mut Scheduler = &mut *sched; return s; } None => { - // Don't fail. Infinite recursion rtabort!("no scheduler") } } } - unsafe fn try_unsafe_borrow() -> Option<*mut Task> { + unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { if Local::exists::() { Some(Local::unsafe_borrow()) } else { @@ -101,57 +123,67 @@ impl Local for IoFactoryObject { unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") } } + #[cfg(test)] mod test { - use unstable::run_in_bare_thread; use rt::test::*; - use rt::sched::Scheduler; use super::*; + use rt::task::Task; + use rt::local_ptr; #[test] - fn thread_local_scheduler_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_smoke_test() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] - fn thread_local_scheduler_two_instances() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_two_instances() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + } #[test] fn borrow_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - unsafe { - let _scheduler: *mut Scheduler = Local::unsafe_borrow(); - } - let _scheduler: ~Scheduler = Local::take(); + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + unsafe { + let _task: *mut Task = Local::unsafe_borrow(); } + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] fn borrow_with_return() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let res = do Local::borrow:: |_sched| { - true - }; - assert!(res); - let _scheduler: ~Scheduler = Local::take(); - } + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + let res = do Local::borrow:: |_task| { + true + }; + assert!(res) + let task: ~Task = Local::take(); + cleanup_task(task); } } + diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 3bcf6787824..f0f4b646103 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -67,9 +67,10 @@ use iter::Times; use iterator::{Iterator, IteratorUtil}; use option::{Some, None}; use ptr::RawPtr; +use rt::local::Local; use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; -use rt::task::{Task, Sched}; +use rt::task::{Task, SchedTask, GreenTask, Sched}; use rt::thread::Thread; use rt::work_queue::WorkQueue; use rt::uv::uvio::UvEventLoop; @@ -243,6 +244,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let nscheds = util::default_sched_threads(); + let main = Cell::new(main); + // The shared list of sleeping schedulers. Schedulers wake each other // occassionally to do new work. let sleepers = SleeperList::new(); @@ -256,6 +259,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let mut handles = ~[]; do nscheds.times { + rtdebug!("inserting a regular scheduler"); + // Every scheduler is driven by an I/O event loop. let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); @@ -267,12 +272,19 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // If we need a main-thread task then create a main thread scheduler // that will reject any task that isn't pinned to it - let mut main_sched = if use_main_sched { + let main_sched = if use_main_sched { + + // Create a friend handle. + let mut friend_sched = scheds.pop(); + let friend_handle = friend_sched.make_handle(); + scheds.push(friend_sched); + let main_loop = ~UvEventLoop::new(); let mut main_sched = ~Scheduler::new_special(main_loop, work_queue.clone(), sleepers.clone(), - false); + false, + Some(friend_handle)); let main_handle = main_sched.make_handle(); handles.push(main_handle); Some(main_sched) @@ -309,44 +321,63 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { } }; - // Build the main task and queue it up - match main_sched { - None => { - // The default case where we don't need a scheduler on the main thread. - // Just put an unpinned task onto one of the default schedulers. - let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, main); - main_task.death.on_exit = Some(on_exit); - main_task.name = Some(~"main"); - scheds[0].enqueue_task(main_task); - } - Some(ref mut main_sched) => { - let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, home, main); - main_task.death.on_exit = Some(on_exit); - main_task.name = Some(~"main"); - main_sched.enqueue_task(main_task); - } - }; - - // Run each scheduler in a thread. let mut threads = ~[]; - while !scheds.is_empty() { + + let on_exit = Cell::new(on_exit); + + if !use_main_sched { + + // In the case where we do not use a main_thread scheduler we + // run the main task in one of our threads. + + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, + main.take()); + main_task.death.on_exit = Some(on_exit.take()); + let main_task_cell = Cell::new(main_task); + let sched = scheds.pop(); let sched_cell = Cell::new(sched); let thread = do Thread::start { let sched = sched_cell.take(); - sched.run(); + sched.bootstrap(main_task_cell.take()); }; - threads.push(thread); } - // Run the main-thread scheduler - match main_sched { - Some(sched) => { let _ = sched.run(); }, - None => () + // Run each remaining scheduler in a thread. + while !scheds.is_empty() { + rtdebug!("creating regular schedulers"); + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let mut sched = sched_cell.take(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("boostraping a non-primary scheduler"); + }; + sched.bootstrap(bootstrap_task); + }; + threads.push(thread); } + // If we do have a main thread scheduler, run it now. + + if use_main_sched { + + rtdebug!("about to create the main scheduler task"); + + let mut main_sched = main_sched.get(); + + let home = Sched(main_sched.make_handle()); + let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, + home, main.take()); + main_task.death.on_exit = Some(on_exit.take()); + rtdebug!("boostrapping main_task"); + + main_sched.bootstrap(main_task); + } + + rtdebug!("waiting for threads"); + // Wait for schedulers foreach thread in threads.consume_iter() { thread.join(); @@ -378,27 +409,22 @@ pub enum RuntimeContext { pub fn context() -> RuntimeContext { use task::rt::rust_task; - use self::local::Local; - use self::sched::Scheduler; - // XXX: Hitting TLS twice to check if the scheduler exists - // then to check for the task is not good for perf if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; - } else { - if Local::exists::() { - let context = Cell::new_empty(); - do Local::borrow:: |sched| { - if sched.in_task_context() { - context.put_back(TaskContext); - } else { - context.put_back(SchedulerContext); - } + } else if Local::exists::() { + // In this case we know it is a new runtime context, but we + // need to check which one. Going to try borrowing task to + // check. Task should always be in TLS, so hopefully this + // doesn't conflict with other ops that borrow. + return do Local::borrow:: |task| { + match task.task_type { + SchedTask => SchedulerContext, + GreenTask(_) => TaskContext } - return context.take(); - } else { - return GlobalContext; - } + }; + } else { + return GlobalContext; } extern { @@ -410,23 +436,9 @@ pub fn context() -> RuntimeContext { #[test] fn test_context() { use unstable::run_in_bare_thread; - use self::sched::{Scheduler}; - use rt::local::Local; - use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - assert_eq!(context(), TaskContext); - let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |sched, task| { - assert_eq!(context(), SchedulerContext); - sched.enqueue_blocked_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); } } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ae4ca2b9783..4abe69a7d13 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,7 @@ use either::{Left, Right}; use option::{Option, Some, None}; -use cast::transmute; +use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; @@ -27,6 +27,7 @@ use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; +use cell::Cell; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -59,17 +60,17 @@ pub struct Scheduler { stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, - /// The scheduler's saved context. - /// Always valid when a task is executing, otherwise not - priv saved_context: Context, - /// The currently executing task - current_task: Option<~Task>, + /// The scheduler runs on a special task. + sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option, metrics: SchedMetrics, /// Should this scheduler run any task, or only pinned tasks? - run_anything: bool + run_anything: bool, + /// If the scheduler shouldn't run some tasks, a friend to send + /// them to. + friend_handle: Option } pub struct SchedHandle { @@ -81,7 +82,8 @@ pub struct SchedHandle { pub enum SchedMessage { Wake, Shutdown, - PinnedTask(~Task) + PinnedTask(~Task), + TaskFromFriend(~Task) } enum CleanupJob { @@ -90,7 +92,6 @@ enum CleanupJob { } impl Scheduler { - pub fn in_task_context(&self) -> bool { self.current_task.is_some() } pub fn sched_id(&self) -> uint { to_uint(self) } @@ -99,19 +100,19 @@ impl Scheduler { sleeper_list: SleeperList) -> Scheduler { - Scheduler::new_special(event_loop, work_queue, sleeper_list, true) + Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None) } + // When you create a scheduler it isn't yet "in" a task, so the + // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, - run_anything: bool) + run_anything: bool, + friend: Option) -> Scheduler { - // Lazily initialize the runtime TLS key - local_ptr::init_tls_key(); - Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), @@ -120,11 +121,11 @@ impl Scheduler { event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), - saved_context: Context::empty(), - current_task: None, + sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), - run_anything: run_anything + run_anything: run_anything, + friend_handle: friend } } @@ -132,8 +133,45 @@ impl Scheduler { // the scheduler itself doesn't have to call event_loop.run. // That will be important for embedding the runtime into external // event loops. - pub fn run(~self) -> ~Scheduler { - assert!(!self.in_task_context()); + + // Take a main task to run, and a scheduler to run it in. Create a + // scheduler task and bootstrap into it. + pub fn bootstrap(~self, task: ~Task) { + + // Initialize the TLS key. + local_ptr::init_tls_key(); + + // Create a task for the scheduler with an empty context. + let sched_task = ~Task::new_sched_task(); + + // Now that we have an empty task struct for the scheduler + // task, put it in TLS. + Local::put::(sched_task); + + // Now, as far as all the scheduler state is concerned, we are + // inside the "scheduler" context. So we can act like the + // scheduler and resume the provided task. + self.resume_task_immediately(task); + + // Now we are back in the scheduler context, having + // successfully run the input task. Start by running the + // scheduler. Grab it out of TLS - performing the scheduler + // action will have given it away. + let sched = Local::take::(); + sched.run(); + + // Now that we are done with the scheduler, clean up the + // scheduler task. Do so by removing it from TLS and manually + // cleaning up the memory it uses. As we didn't actually call + // task.run() on the scheduler task we never get through all + // the cleanup code it runs. + let mut stask = Local::take::(); + stask.destroyed = true; + } + + // This does not return a scheduler, as the scheduler is placed + // inside the task. + pub fn run(~self) { let mut self_sched = self; @@ -142,79 +180,92 @@ impl Scheduler { // schedulers. self_sched.event_loop.callback(Scheduler::run_sched_once); + // This is unsafe because we need to place the scheduler, with + // the event_loop inside, inside our task. But we still need a + // mutable reference to the event_loop to give it the "run" + // command. unsafe { - let event_loop: *mut ~EventLoopObject = { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - event_loop - }; + let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - // Give ownership of the scheduler (self) to the thread - Local::put(self_sched); + // Our scheduler must be in the task before the event loop + // is started. + let self_sched = Cell::new(self_sched); + do Local::borrow:: |stask| { + stask.sched = Some(self_sched.take()); + }; (*event_loop).run(); } - - rtdebug!("run taking sched"); - let sched = Local::take::(); - // XXX: Reenable this once we're using a per-scheduler queue. With a shared - // queue this is not true - //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", { - use to_str::ToStr; - sched.metrics.to_str() - }); - return sched; } + // One iteration of the scheduler loop, always run at least once. + + // The model for this function is that you continue through it + // until you either use the scheduler while performing a schedule + // action, in which case you give it away and do not return, or + // you reach the end and sleep. In the case that a scheduler + // action is performed the loop is evented such that this function + // is called again. fn run_sched_once() { - let mut sched = Local::take::(); - sched.metrics.turns += 1; - - // First, check the message queue for instructions. - // XXX: perf. Check for messages without atomics. - // It's ok if we miss messages occasionally, as long as - // we sync and check again before sleeping. - if sched.interpret_message_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - rtdebug!("run_sched_once, interpret_message_queue taking sched"); - let mut sched = Local::take::(); - sched.metrics.messages_received += 1; - sched.event_loop.callback(Scheduler::run_sched_once); - Local::put(sched); - return; - } - - // Now, look in the work queue for tasks to run - rtdebug!("run_sched_once taking"); + // When we reach the scheduler context via the event loop we + // already have a scheduler stored in our local task, so we + // start off by taking it. This is the only path through the + // scheduler where we get the scheduler this way. let sched = Local::take::(); - if sched.resume_task_from_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - do Local::borrow:: |sched| { - sched.metrics.tasks_resumed_from_queue += 1; - sched.event_loop.callback(Scheduler::run_sched_once); + + // Our first task is to read mail to see if we have important + // messages. + + // 1) A wake message is easy, mutate sched struct and return + // it. + // 2) A shutdown is also easy, shutdown. + // 3) A pinned task - we resume immediately and do not return + // here. + // 4) A message from another scheduler with a non-homed task + // to run here. + + let result = sched.interpret_message_queue(); + let sched = match result { + Some(sched) => { + // We did not resume a task, so we returned. + sched } - return; - } + None => { + return; + } + }; + + // Second activity is to try resuming a task from the queue. + + let result = sched.resume_task_from_queue(); + let mut sched = match result { + Some(sched) => { + // Failed to dequeue a task, so we return. + sched + } + None => { + return; + } + }; // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. - rtdebug!("no work to do"); - do Local::borrow:: |sched| { - sched.metrics.wasted_turns += 1; - if !sched.sleepy && !sched.no_sleep { - rtdebug!("sleeping"); - sched.metrics.sleepy_times += 1; - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - } else { - rtdebug!("not sleeping"); - } + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("scheduler has no work to do, going to sleep"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping, already doing so or no_sleep set"); } + + // Finished a cycle without using the Scheduler. Place it back + // in TLS. + Local::put(sched); } pub fn make_handle(&mut self) -> SchedHandle { @@ -234,18 +285,6 @@ impl Scheduler { /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Task) { - // We don't want to queue tasks that belong on other threads, - // so we send them home at enqueue time. - - // The borrow checker doesn't like our disassembly of the - // Coroutine struct and partial use and mutation of the - // fields. So completely disassemble here and stop using? - - // XXX perf: I think we might be able to shuffle this code to - // only destruct when we need to. - - rtdebug!("a task was queued on: %u", self.sched_id()); - let this = self; // We push the task onto our local queue clone. @@ -283,30 +322,26 @@ impl Scheduler { // * Scheduler-context operations - fn interpret_message_queue(~self) -> bool { - assert!(!self.in_task_context()); - - rtdebug!("looking for scheduler messages"); + // This function returns None if the scheduler is "used", or it + // returns the still-available scheduler. + fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { - rtdebug!("recv BiasedTask message in sched: %u", - this.sched_id()); let mut task = task; - task.home = Some(Sched(this.make_handle())); + task.give_home(Sched(this.make_handle())); this.resume_task_immediately(task); - return true; + return None; + } + Some(TaskFromFriend(task)) => { + return this.sched_schedule_task(task); } - Some(Wake) => { - rtdebug!("recv Wake message"); this.sleepy = false; - Local::put(this); - return true; + return Some(this); } Some(Shutdown) => { - rtdebug!("recv Shutdown message"); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's @@ -325,12 +360,14 @@ impl Scheduler { // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; - Local::put(this); - return true; + // YYY: Does a shutdown count as a "use" of the + // scheduler? This seems to work - so I'm leaving it + // this way despite not having a solid rational for + // why I should return the scheduler here. + return Some(this); } None => { - Local::put(this); - return false; + return Some(this); } } } @@ -338,7 +375,7 @@ impl Scheduler { /// Given an input Coroutine sends it back to its home scheduler. fn send_task_home(task: ~Task) { let mut task = task; - let mut home = task.home.take_unwrap(); + let mut home = task.take_unwrap_home(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); @@ -349,106 +386,87 @@ impl Scheduler { } } - // Resume a task from the queue - but also take into account that - // it might not belong here. - fn resume_task_from_queue(~self) -> bool { - assert!(!self.in_task_context()); - - rtdebug!("looking in work queue for task to schedule"); - let mut this = self; - - // The borrow checker imposes the possibly absurd requirement - // that we split this into two match expressions. This is due - // to the inspection of the internal bits of task, as that - // can't be in scope when we act on task. - match this.work_queue.pop() { - Some(task) => { - let action_id = { - let home = &task.home; - match home { - &Some(Sched(ref home_handle)) - if home_handle.sched_id != this.sched_id() => { - SendHome - } - &Some(AnySched) if this.run_anything => { - ResumeNow - } - &Some(AnySched) => { - Requeue - } - &Some(Sched(_)) => { - ResumeNow - } - &None => { - Homeless - } - } - }; - - match action_id { - SendHome => { - rtdebug!("sending task home"); - Scheduler::send_task_home(task); - Local::put(this); - return false; - } - ResumeNow => { - rtdebug!("resuming now"); - this.resume_task_immediately(task); - return true; - } - Requeue => { - rtdebug!("re-queueing") - this.enqueue_task(task); - Local::put(this); - return false; - } - Homeless => { - rtabort!("task home was None!"); - } - } + /// Take a non-homed task we aren't allowed to run here and send + /// it to the designated friend scheduler to execute. + fn send_to_friend(&mut self, task: ~Task) { + match self.friend_handle { + Some(ref mut handle) => { + handle.send(TaskFromFriend(task)); } - None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; - } + rtabort!("tried to send task to a friend but scheduler has no friends"); + } } } - // * Task-context operations + // Resume a task from the queue - but also take into account that + // it might not belong here. + + // If we perform a scheduler action we give away the scheduler ~ + // pointer, if it is still available we return it. + + fn resume_task_from_queue(~self) -> Option<~Scheduler> { + + let mut this = self; + + match this.work_queue.pop() { + Some(task) => { + let mut task = task; + let home = task.take_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + task.give_home(Sched(home_handle)); + this.resume_task_immediately(task); + return None; + } + } + AnySched if this.run_anything => { + task.give_home(AnySched); + this.resume_task_immediately(task); + return None; + } + AnySched => { + task.give_home(AnySched); + this.send_to_friend(task); + return Some(this); + } + } + } + None => { + return Some(this); + } + } + } /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { + // Similar to deschedule running task and then, but cannot go through + // the task-blocking path. The task is already dying. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("ending running task"); - - // This task is post-cleanup, so it must be unkillable. This sequence - // of descheduling and recycling must not get interrupted by a kill. - // FIXME(#7544): Make this use an inner descheduler, like yield should. - this.current_task.get_mut_ref().death.unkillable += 1; - - do this.deschedule_running_task_and_then |sched, dead_task| { - match dead_task.wake() { - Some(dead_task) => { - let mut dead_task = dead_task; - dead_task.death.unkillable -= 1; // FIXME(#7544) ugh - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); - } - None => rtabort!("dead task killed before recycle"), - } + let stask = this.sched_task.take_unwrap(); + do this.change_task_context(stask) |sched, mut dead_task| { + let coroutine = dead_task.coroutine.take_unwrap(); + coroutine.recycle(&mut sched.stack_pool); } - - rtabort!("control reached end of task"); } - pub fn schedule_task(~self, task: ~Task) { - assert!(self.in_task_context()); + // Scheduling a task requires a few checks to make sure the task + // ends up in the appropriate location. The run_anything flag on + // the scheduler and the home on the task need to be checked. This + // helper performs that check. It takes a function that specifies + // how to queue the the provided task if that is the correct + // action. This is a "core" function that requires handling the + // returned Option correctly. + + pub fn schedule_task(~self, task: ~Task, + schedule_fn: ~fn(sched: ~Scheduler, task: ~Task)) + -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); @@ -461,55 +479,131 @@ impl Scheduler { if is_home || (!homed && this.run_anything) { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care - do this.switch_running_tasks_and_then(task) |sched, last_task| { - sched.enqueue_blocked_task(last_task); - } + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); + schedule_fn(this, task); + return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here this.enqueue_task(task); - Local::put(this); + return Some(this); } else { // task isn't home, so don't run it here, send it home Scheduler::send_task_home(task); - Local::put(this); + return Some(this); } } - // Core scheduling ops + // There are two contexts in which schedule_task can be called: + // inside the scheduler, and inside a task. These contexts handle + // executing the task slightly differently. In the scheduler + // context case we want to receive the scheduler as an input, and + // manually deal with the option. In the task context case we want + // to use TLS to find the scheduler, and deal with the option + // inside the helper. - pub fn resume_task_immediately(~self, task: ~Task) { + pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> { + do self.schedule_task(task) |sched, next_task| { + sched.resume_task_immediately(next_task); + } + } + + // Task context case - use TLS. + pub fn run_task(task: ~Task) { + let sched = Local::take::(); + let opt = do sched.schedule_task(task) |sched, next_task| { + do sched.switch_running_tasks_and_then(next_task) |sched, last_task| { + sched.enqueue_blocked_task(last_task); + } + }; + opt.map_consume(Local::put); + } + + // The primary function for changing contexts. In the current + // design the scheduler is just a slightly modified GreenTask, so + // all context swaps are from Task to Task. The only difference + // between the various cases is where the inputs come from, and + // what is done with the resulting task. That is specified by the + // cleanup function f, which takes the scheduler and the + // old task as inputs. + + pub fn change_task_context(~self, + next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { let mut this = self; - assert!(!this.in_task_context()); - rtdebug!("scheduling a task"); - this.metrics.context_switches_sched_to_task += 1; + // The current task is grabbed from TLS, not taken as an input. + let current_task: ~Task = Local::take::(); - // Store the task in the scheduler so it can be grabbed later - this.current_task = Some(task); - this.enqueue_cleanup_job(DoNothing); + // These transmutes do something fishy with a closure. + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) + }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); - Local::put(this); + // The current task is placed inside an enum with the cleanup + // function. This enum is then placed inside the scheduler. + this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); - // Take pointers to both the task and scheduler's saved registers. + // The scheduler is then placed inside the next task. + let mut next_task = next_task; + next_task.sched = Some(this); + + // However we still need an internal mutable pointer to the + // original task. The strategy here was "arrange memory, then + // get pointers", so we crawl back up the chain using + // transmute to eliminate borrowck errors. + unsafe { + + let sched: &mut Scheduler = + transmute_mut_region(*next_task.sched.get_mut_ref()); + + let current_task: &mut Task = match sched.cleanup_job { + Some(GiveTask(ref task, _)) => { + transmute_mut_region(*transmute_mut_unsafe(task)) + } + Some(DoNothing) => { + rtabort!("no next task"); + } + None => { + rtabort!("no cleanup job"); + } + }; + + let (current_task_context, next_task_context) = + Scheduler::get_contexts(current_task, next_task); + + // Done with everything - put the next task in TLS. This + // works because due to transmute the borrow checker + // believes that we have no internal pointers to + // next_task. + Local::put(next_task); + + // The raw context swap operation. The next action taken + // will be running the cleanup job from the context of the + // next task. + Context::swap(current_task_context, next_task_context); + } + + // When the context swaps back to this task we immediately + // run the cleanup job, as expected by the previously called + // swap_contexts function. unsafe { let sched = Local::unsafe_borrow::(); - let (sched_context, _, next_task_context) = (*sched).get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - - let sched = Local::unsafe_borrow::(); - // The running task should have passed ownership elsewhere - assert!((*sched).current_task.is_none()); - - // Running tasks may have asked us to do some cleanup (*sched).run_cleanup_job(); // Must happen after running the cleanup job (of course). - // Might not be running in task context; if not, a later call to - // resume_task_immediately will take care of this. - (*sched).current_task.map(|t| t.death.check_killed()); + let task = Local::unsafe_borrow::(); + (*task).death.check_killed(); + } + } + + // Old API for task manipulation implemented over the new core + // function. + + pub fn resume_task_immediately(~self, task: ~Task) { + do self.change_task_context(task) |sched, stask| { + sched.sched_task = Some(stask); } } @@ -533,152 +627,69 @@ impl Scheduler { /// in order to prevent that fn from performing further scheduling operations. /// Doing further scheduling could easily result in infinite recursion. pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) { + // Trickier - we need to get the scheduler task out of self + // and use it as the destination. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("blocking task"); - this.metrics.context_switches_task_to_sched += 1; - - unsafe { - let blocked_task = this.current_task.take_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f); - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - } - - Local::put(this); - - unsafe { - let sched = Local::unsafe_borrow::(); - let (sched_context, last_task_context, _) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); - } + let stask = this.sched_task.take_unwrap(); + // Otherwise this is the same as below. + this.switch_running_tasks_and_then(stask, f); } - /// Switch directly to another task, without going through the scheduler. - /// You would want to think hard about doing this, e.g. if there are - /// pending I/O events it would be a bad idea. pub fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(&mut Scheduler, BlockedTask)) { - let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("switching tasks"); - this.metrics.context_switches_task_to_task += 1; - - let old_running_task = this.current_task.take_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f) - }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); - this.current_task = Some(next_task); - - Local::put(this); - - unsafe { - let sched = Local::unsafe_borrow::(); - let (_, last_task_context, next_task_context) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); + // This is where we convert the BlockedTask-taking closure into one + // that takes just a Task, and is aware of the block-or-killed protocol. + do self.change_task_context(next_task) |sched, task| { + // Task might need to receive a kill signal instead of blocking. + // We can call the "and_then" only if it blocks successfully. + match BlockedTask::try_block(task) { + Left(killed_task) => sched.enqueue_task(killed_task), + Right(blocked_task) => f(sched, blocked_task), + } } } + // A helper that looks up the scheduler and runs a task later by + // enqueuing it. + pub fn run_task_later(next_task: ~Task) { + // We aren't performing a scheduler operation, so we want to + // put the Scheduler back when we finish. + let next_task = Cell::new(next_task); + do Local::borrow:: |sched| { + sched.enqueue_task(next_task.take()); + }; + } - - // * Other stuff + // Returns a mutable reference to both contexts involved in this + // swap. This is unsafe - we are getting mutable internal + // references to keep even when we don't own the tasks. It looks + // kinda safe because we are doing transmutes before passing in + // the arguments. + pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> + (&'a mut Context, &'a mut Context) { + let current_task_context = + &mut current_task.coroutine.get_mut_ref().saved_context; + let next_task_context = + &mut next_task.coroutine.get_mut_ref().saved_context; + unsafe { + (transmute_mut_region(current_task_context), + transmute_mut_region(next_task_context)) + } + } pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - assert!(self.cleanup_job.is_none()); self.cleanup_job = Some(job); } pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); - - assert!(self.cleanup_job.is_some()); - let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => { - let f = f.to_fn(); - // Task might need to receive a kill signal instead of blocking. - // We can call the "and_then" only if it blocks successfully. - match BlockedTask::try_block(task) { - Left(killed_task) => self.enqueue_task(killed_task), - Right(blocked_task) => f(self, blocked_task), - } - } + GiveTask(task, f) => f.to_fn()(self, task) } } - /// Get mutable references to all the contexts that may be involved in a - /// context switch. - /// - /// Returns (the scheduler context, the optional context of the - /// task in the cleanup list, the optional context of the task in - /// the current task slot). When context switching to a task, - /// callers should first arrange for that task to be located in the - /// Scheduler's current_task slot and set up the - /// post-context-switch cleanup job. - pub fn get_contexts<'a>(&'a mut self) -> (&'a mut Context, - Option<&'a mut Context>, - Option<&'a mut Context>) { - let last_task = match self.cleanup_job { - Some(GiveTask(~ref task, _)) => { - Some(task) - } - Some(DoNothing) => { - None - } - None => fail!("all context switches should have a cleanup job") - }; - // XXX: Pattern matching mutable pointers above doesn't work - // because borrowck thinks the three patterns are conflicting - // borrows - unsafe { - let last_task = transmute::, Option<&mut Task>>(last_task); - let last_task_context = match last_task { - Some(t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - let next_task_context = match self.current_task { - Some(ref mut t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - // XXX: These transmutes can be removed after snapshot - return (transmute(&mut self.saved_context), - last_task_context, - transmute(next_task_context)); - } - } } // The cases for the below function. @@ -700,29 +711,73 @@ impl SchedHandle { // complaining type UnsafeTaskReceiver = raw::Closure; trait ClosureConverter { - fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask); + fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Task); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver { + fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } - #[cfg(test)] mod test { + use rt::test::*; + use unstable::run_in_bare_thread; + use borrow::to_uint; + use rt::local::*; + use rt::sched::{Scheduler}; + use uint; use int; use cell::Cell; - use unstable::run_in_bare_thread; - use task::spawn; - use rt::local::Local; - use rt::test::*; - use super::*; use rt::thread::Thread; - use borrow::to_uint; - use rt::task::{Task,Sched}; + use rt::task::{Task, Sched}; + use option::{Some}; + + #[test] + fn trivial_run_in_newsched_task_test() { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + do run_in_newsched_task || { + unsafe { *task_ran_ptr = true }; + rtdebug!("executed from the new scheduler") + } + assert!(task_ran); + } + + #[test] + fn multiple_task_test() { + let total = 10; + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + for uint::range(0,total) |_| { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; + } + } + } + assert!(task_run_count == total); + } + + #[test] + fn multiple_task_nested_test() { + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + } + } + } + } + assert!(task_run_count == 3); + } // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. @@ -745,46 +800,50 @@ mod test { } } - // A simple test to check if a homed task run on a single - // scheduler ends up executing while home. + + // A very simple test that confirms that a task executing on the + // home scheduler notices that it is home. #[test] fn test_home_sched() { do run_in_bare_thread { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; + let mut sched = ~new_test_uv_sched(); - let sched_handle = sched.make_handle(); - let sched_id = sched.sched_id(); - let task = ~do Task::new_root_homed(&mut sched.stack_pool, - Sched(sched_handle)) { + let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, + Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; - let sched = Local::take::(); - assert!(sched.sched_id() == sched_id); - Local::put::(sched); + assert!(Task::on_appropriate_sched()); }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); + + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); } } - // A test for each state of schedule_task + // An advanced test that checks all four possible states that a + // (task,sched) can be in regarding homes. + #[test] fn test_schedule_home_states() { use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; + use rt::sched::Shutdown; + use borrow; + use rt::comm::*; do run_in_bare_thread { let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); - // our normal scheduler + // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), work_queue.clone(), @@ -792,113 +851,96 @@ mod test { let normal_handle = Cell::new(normal_sched.make_handle()); - // our special scheduler + let friend_handle = normal_sched.make_handle(); + + // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), work_queue.clone(), sleepers.clone(), - true); + false, + Some(friend_handle)); let special_handle = Cell::new(special_sched.make_handle()); - let special_handle2 = Cell::new(special_sched.make_handle()); - let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); - let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t1_handle)) || { - let is_home = Task::is_home_using_id(special_id); - rtdebug!("t1 should be home: %b", is_home); - assert!(is_home); - }; - let t1f = Cell::new(t1f); + // Four test tasks: + // 1) task is home on special + // 2) task not homed, sched doesn't care + // 3) task not homed, sched requeues + // 4) task not home, send home - let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) { - let on_special = Task::on_special(); - rtdebug!("t2 should not be on special: %b", on_special); - assert!(!on_special); + let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) || { + rtassert!(Task::on_appropriate_sched()); }; - let t2f = Cell::new(t2f); + rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); - let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) { - // not on special - let on_special = Task::on_special(); - rtdebug!("t3 should not be on special: %b", on_special); - assert!(!on_special); - }; - let t3f = Cell::new(t3f); - - let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t4_handle)) { - // is home - let home = Task::is_home_using_id(special_id); - rtdebug!("t4 should be home: %b", home); - assert!(home); - }; - let t4f = Cell::new(t4f); - - // we have four tests, make them as closures - let t1: ~fn() = || { - // task is home on special - let task = t1f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t2: ~fn() = || { - // not homed, task doesn't care - let task = t2f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t3: ~fn() = || { - // task not homed, must leave - let task = t3f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t4: ~fn() = || { - // task not home, send home - let task = t4f.take(); - let sched = Local::take::(); - sched.schedule_task(task); + let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t1 = Cell::new(t1); - let t2 = Cell::new(t2); - let t3 = Cell::new(t3); - let t4 = Cell::new(t4); - - // build a main task that runs our four tests - let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) { - // the two tasks that require a normal start location - t2.take()(); - t4.take()(); - normal_handle.take().send(Shutdown); - special_handle.take().send(Shutdown); + let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - // task to run the two "special start" tests - let special_task = ~do Task::new_root_homed( - &mut special_sched.stack_pool, - Sched(special_handle2.take())) { - t1.take()(); - t3.take()(); + let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + rtassert!(Task::on_appropriate_sched()); + }; + rtdebug!("task4 id: **%u**", borrow::to_uint(task4)); + + let task1 = Cell::new(task1); + let task2 = Cell::new(task2); + let task3 = Cell::new(task3); + let task4 = Cell::new(task4); + + // Signal from the special task that we are done. + let (port, chan) = oneshot::<()>(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtdebug!("*about to submit task2*"); + Scheduler::run_task(task2.take()); + rtdebug!("*about to submit task4*"); + Scheduler::run_task(task4.take()); + rtdebug!("*normal_task done*"); + port.take().recv(); + let mut nh = normal_handle.take(); + nh.send(Shutdown); + let mut sh = special_handle.take(); + sh.send(Shutdown); }; - // enqueue the main tasks - normal_sched.enqueue_task(special_task); - normal_sched.enqueue_task(main_task); + rtdebug!("normal task: %u", borrow::to_uint(normal_task)); + + let special_task = ~do Task::new_root(&mut special_sched.stack_pool) { + rtdebug!("*about to submit task1*"); + Scheduler::run_task(task1.take()); + rtdebug!("*about to submit task3*"); + Scheduler::run_task(task3.take()); + rtdebug!("*done with special_task*"); + chan.take().send(()); + }; + + rtdebug!("special task: %u", borrow::to_uint(special_task)); + + let special_sched = Cell::new(special_sched); + let normal_sched = Cell::new(normal_sched); + let special_task = Cell::new(special_task); + let normal_task = Cell::new(normal_task); - let nsched_cell = Cell::new(normal_sched); let normal_thread = do Thread::start { - let sched = nsched_cell.take(); - sched.run(); + normal_sched.take().bootstrap(normal_task.take()); + rtdebug!("finished with normal_thread"); }; - let ssched_cell = Cell::new(special_sched); let special_thread = do Thread::start { - let sched = ssched_cell.take(); - sched.run(); + special_sched.take().bootstrap(special_task.take()); + rtdebug!("finished with special_sched"); }; normal_thread.join(); @@ -906,7 +948,6 @@ mod test { } } - // Do it a lot #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; @@ -915,116 +956,6 @@ mod test { } } - #[test] - fn test_simple_scheduling() { - do run_in_bare_thread { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_ran_ptr = true; } - }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); - } - } - - #[test] - fn test_several_tasks() { - do run_in_bare_thread { - let total = 10; - let mut task_count = 0; - let task_count_ptr: *mut int = &mut task_count; - - let mut sched = ~new_test_uv_sched(); - for int::range(0, total) |_| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_count_ptr = *task_count_ptr + 1; } - }; - sched.enqueue_task(task); - } - sched.run(); - assert_eq!(task_count, total); - } - } - - #[test] - fn test_swap_tasks_then() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - let task1 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Local::take::(); - let task2 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |sched, task1| { - sched.enqueue_blocked_task(task1); - } - unsafe { *count_ptr = *count_ptr + 1; } - }; - sched.enqueue_task(task1); - sched.run(); - assert_eq!(count, 3); - } - } - - #[bench] #[test] #[ignore(reason = "long test")] - fn test_run_a_lot_of_tasks_queued() { - do run_in_bare_thread { - static MAX: int = 1000000; - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - - let start_task = ~do Task::new_root(&mut sched.stack_pool) { - run_task(count_ptr); - }; - sched.enqueue_task(start_task); - sched.run(); - - assert_eq!(count, MAX); - - fn run_task(count_ptr: *mut int) { - do Local::borrow:: |sched| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } - } - }; - sched.enqueue_task(task); - } - }; - } - } - - #[test] - fn test_block_task() { - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - let sched = Local::take::(); - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.enqueue_blocked_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); - } - } - #[test] fn test_io_callback() { // This is a regression test that when there are no schedulable tasks @@ -1032,7 +963,7 @@ mod test { // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { - do spawn { + do spawntask { let sched = Local::take::(); do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); @@ -1053,38 +984,25 @@ mod test { do run_in_bare_thread { let (port, chan) = oneshot::<()>(); - let port_cell = Cell::new(port); - let chan_cell = Cell::new(chan); - let mut sched1 = ~new_test_uv_sched(); - let handle1 = sched1.make_handle(); - let handle1_cell = Cell::new(handle1); - let task1 = ~do Task::new_root(&mut sched1.stack_pool) { - chan_cell.take().send(()); - }; - sched1.enqueue_task(task1); + let port = Cell::new(port); + let chan = Cell::new(chan); - let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Task::new_root(&mut sched2.stack_pool) { - port_cell.take().recv(); - // Release the other scheduler's handle so it can exit - handle1_cell.take(); - }; - sched2.enqueue_task(task2); - - let sched1_cell = Cell::new(sched1); - let thread1 = do Thread::start { - let sched1 = sched1_cell.take(); - sched1.run(); + let thread_one = do Thread::start { + let chan = Cell::new(chan.take()); + do run_in_newsched_task_core { + chan.take().send(()); + } }; - let sched2_cell = Cell::new(sched2); - let thread2 = do Thread::start { - let sched2 = sched2_cell.take(); - sched2.run(); + let thread_two = do Thread::start { + let port = Cell::new(port.take()); + do run_in_newsched_task_core { + port.take().recv(); + } }; - thread1.join(); - thread2.join(); + thread_two.join(); + thread_one.join(); } } @@ -1112,21 +1030,21 @@ mod test { } } - #[test] + #[test] fn thread_ring() { use rt::comm::*; use comm::{GenericPort, GenericChan}; do run_in_mt_newsched_task { - let (end_port, end_chan) = oneshot(); + let (end_port, end_chan) = oneshot(); let n_tasks = 10; let token = 2000; - let (p, ch1) = stream(); + let (p, ch1) = stream(); let mut p = p; - ch1.send((token, end_chan)); - let mut i = 2; + ch1.send((token, end_chan)); + let mut i = 2; while i <= n_tasks { let (next_p, ch) = stream(); let imm_i = i; @@ -1151,9 +1069,9 @@ mod test { while (true) { match p.recv() { (1, end_chan) => { - debug!("%d\n", id); - end_chan.send(()); - return; + debug!("%d\n", id); + end_chan.send(()); + return; } (token, end_chan) => { debug!("thread: %d got token: %d", id, token); @@ -1178,15 +1096,16 @@ mod test { impl Drop for S { fn drop(&self) { - let _foo = @0; + let _foo = @0; } } let s = S { field: () }; do spawntask { - let _ss = &s; + let _ss = &s; } } } + } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index c1b799796d1..23a0d28e457 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -30,21 +30,34 @@ use rt::context::Context; use task::spawn::Taskgroup; use cell::Cell; +// The Task struct represents all state associated with a rust +// task. There are at this point two primary "subtypes" of task, +// however instead of using a subtype we just have a "task_type" field +// in the struct. This contains a pointer to another struct that holds +// the type-specific state. + pub struct Task { heap: LocalHeap, gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, unwinder: Unwinder, - home: Option, taskgroup: Option, death: Death, destroyed: bool, - coroutine: Option<~Coroutine>, // FIXME(#6874/#7599) use StringRef to save on allocations name: Option<~str>, + coroutine: Option, + sched: Option<~Scheduler>, + task_type: TaskType } +pub enum TaskType { + GreenTask(Option<~SchedHome>), + SchedTask +} + +/// A coroutine is nothing more than a (register context, stack) pair. pub struct Coroutine { /// The segment of stack on which the task is currently running or /// if the task is blocked, on which the task will resume @@ -54,6 +67,7 @@ pub struct Coroutine { saved_context: Context } +/// Some tasks have a deciated home scheduler that they must run on. pub enum SchedHome { AnySched, Sched(SchedHandle) @@ -68,6 +82,59 @@ pub struct Unwinder { impl Task { + // A helper to build a new task using the dynamically found + // scheduler and task. Only works in GreenTask context. + pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow:: |running_task| { + let mut sched = running_task.sched.take_unwrap(); + let new_task = ~running_task.new_child_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_child(f: ~fn()) -> ~Task { + Task::build_homed_child(f, AnySched) + } + + pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow:: |running_task| { + let mut sched = running_task.sched.take_unwrap(); + let new_task = ~Task::new_root_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_root(f: ~fn()) -> ~Task { + Task::build_homed_root(f, AnySched) + } + + pub fn new_sched_task() -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, + unwinder: Unwinder { unwinding: false }, + taskgroup: None, + death: Death::new(), + destroyed: false, + coroutine: Some(Coroutine::empty()), + name: None, + sched: None, + task_type: SchedTask + } + } + pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Task { Task::new_root_homed(stack_pool, AnySched, start) @@ -88,12 +155,13 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, - home: Some(home), taskgroup: None, death: Death::new(), destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + coroutine: Some(Coroutine::new(stack_pool, start)), + sched: None, + task_type: GreenTask(Some(~home)) } } @@ -106,28 +174,43 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - home: Some(home), unwinder: Unwinder { unwinding: false }, taskgroup: None, // FIXME(#7544) make watching optional death: self.death.new_child(), destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + coroutine: Some(Coroutine::new(stack_pool, start)), + sched: None, + task_type: GreenTask(Some(~home)) } } pub fn give_home(&mut self, new_home: SchedHome) { - self.home = Some(new_home); + match self.task_type { + GreenTask(ref mut home) => { + *home = Some(~new_home); + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } + } + } + + pub fn take_unwrap_home(&mut self) -> SchedHome { + match self.task_type { + GreenTask(ref mut home) => { + let out = home.take_unwrap(); + return *out; + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } + } } pub fn run(&mut self, f: &fn()) { - // This is just an assertion that `run` was called unsafely - // and this instance of Task is still accessible. - do Local::borrow:: |task| { - assert!(borrow::ref_eq(task, self)); - } - + rtdebug!("run called on task: %u", borrow::to_uint(self)); self.unwinder.try(f); { let _ = self.taskgroup.take(); } self.death.collect_failure(!self.unwinder.unwinding); @@ -141,6 +224,8 @@ impl Task { /// thread-local-storage. fn destroy(&mut self) { + rtdebug!("DESTROYING TASK: %u", borrow::to_uint(self)); + do Local::borrow:: |task| { assert!(borrow::ref_eq(task, self)); } @@ -158,63 +243,68 @@ impl Task { self.destroyed = true; } - /// Check if *task* is currently home. - pub fn is_home(&self) -> bool { - do Local::borrow:: |sched| { - match self.home { - Some(AnySched) => { false } - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { - *id == sched.sched_id() - } - None => { rtabort!("task home of None") } - } - } - } + // New utility functions for homes. pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { - match self.home { - Some(AnySched) => { false } - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _}))) => { *id == sched.sched_id() } - None => {rtabort!("task home of None") } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + // Awe yea + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } } } - pub fn is_home_using_id(sched_id: uint) -> bool { + pub fn homed(&self) -> bool { + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { _ }))) => { true } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } + } + } + + // Grab both the scheduler and the task from TLS and check if the + // task is executing on an appropriate scheduler. + pub fn on_appropriate_sched() -> bool { do Local::borrow:: |task| { - match task.home { - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + let sched_id = task.sched.get_ref().sched_id(); + let sched_run_anything = task.sched.get_ref().run_anything; + match task.task_type { + GreenTask(Some(~AnySched)) => { + rtdebug!("anysched task in sched check ****"); + sched_run_anything + } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _ }))) => { + rtdebug!("homed task in sched check ****"); *id == sched_id } - Some(AnySched) => { false } - None => { rtabort!("task home of None") } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } } } } - - /// Check if this *task* has a home. - pub fn homed(&self) -> bool { - match self.home { - Some(AnySched) => { false } - Some(Sched(_)) => { true } - None => { - rtabort!("task home of None") - } - } - } - - /// On a special scheduler? - pub fn on_special() -> bool { - do Local::borrow:: |sched| { - !sched.run_anything - } - } - } impl Drop for Task { - fn drop(&self) { assert!(self.destroyed) } + fn drop(&self) { + rtdebug!("called drop for a task: %u", borrow::to_uint(self)); + assert!(self.destroyed) + } } // Coroutines represent nothing more than a context and a stack @@ -234,19 +324,33 @@ impl Coroutine { } } + pub fn empty() -> Coroutine { + Coroutine { + current_stack_segment: StackSegment::new(0), + saved_context: Context::empty() + } + } + fn build_start_wrapper(start: ~fn()) -> ~fn() { let start_cell = Cell::new(start); let wrapper: ~fn() = || { // First code after swap to this new context. Run our // cleanup job. unsafe { - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - let sched = Local::unsafe_borrow::(); - let task = (*sched).current_task.get_mut_ref(); + // Again - might work while safe, or it might not. + do Local::borrow:: |sched| { + (sched).run_cleanup_job(); + } - do task.run { + // To call the run method on a task we need a direct + // reference to it. The task is in TLS, so we can + // simply unsafe_borrow it to get this reference. We + // need to still have the task in TLS though, so we + // need to unsafe_borrow. + let task = Local::unsafe_borrow::(); + + do (*task).run { // N.B. Removing `start` from the start wrapper // closure by emptying a cell is critical for // correctness. The ~Task pointer, and in turn the @@ -262,16 +366,19 @@ impl Coroutine { }; } + // We remove the sched from the Task in TLS right now. let sched = Local::take::(); - sched.terminate_current_task(); + // ... allowing us to give it away when performing a + // scheduling operation. + sched.terminate_current_task() }; return wrapper; } /// Destroy coroutine and try to reuse stack segment. - pub fn recycle(~self, stack_pool: &mut StackPool) { + pub fn recycle(self, stack_pool: &mut StackPool) { match self { - ~Coroutine { current_stack_segment, _ } => { + Coroutine { current_stack_segment, _ } => { stack_pool.give_segment(current_stack_segment); } } @@ -465,3 +572,4 @@ mod test { } } } + diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index ec1094ed4f2..22eb42e2ee8 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -18,14 +18,12 @@ use iterator::Iterator; use vec::{OwnedVector, MutableVector}; use super::io::net::ip::{IpAddr, Ipv4, Ipv6}; use rt::sched::Scheduler; -use rt::local::Local; use unstable::run_in_bare_thread; use rt::thread::Thread; use rt::task::Task; use rt::uv::uvio::UvEventLoop; use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; -use rt::task::{Sched}; use rt::comm::oneshot; use result::{Result, Ok, Err}; @@ -34,29 +32,37 @@ pub fn new_test_uv_sched() -> Scheduler { let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message sched.no_sleep = true; return sched; + } -/// Creates a new scheduler in a new thread and runs a task in it, -/// then waits for the scheduler to exit. Failure of the task -/// will abort the process. pub fn run_in_newsched_task(f: ~fn()) { let f = Cell::new(f); - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); - let mut task = ~Task::new_root(&mut sched.stack_pool, - f.take()); - rtdebug!("newsched_task: %x", ::borrow::to_uint(task)); - task.death.on_exit = Some(on_exit); - sched.enqueue_task(task); - sched.run(); + run_in_newsched_task_core(f.take()); } } +pub fn run_in_newsched_task_core(f: ~fn()) { + + use rt::sched::Shutdown; + + let mut sched = ~new_test_uv_sched(); + let exit_handle = Cell::new(sched.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + exit_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + let mut task = ~Task::new_root(&mut sched.stack_pool, f); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); +} + /// Create more than one scheduler and run a function in a task /// in one of the schedulers. The schedulers will stay alive /// until the function `f` returns. @@ -65,7 +71,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { use from_str::FromStr; use rt::sched::Shutdown; - let f_cell = Cell::new(f); + let f = Cell::new(f); do run_in_bare_thread { let nthreads = match os::getenv("RUST_RT_TEST_THREADS") { @@ -95,7 +101,6 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { scheds.push(sched); } - let f_cell = Cell::new(f_cell.take()); let handles = Cell::new(handles); let on_exit: ~fn(bool) = |exit_status| { let mut handles = handles.take(); @@ -107,18 +112,32 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { rtassert!(exit_status); }; let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - f_cell.take()); + f.take()); main_task.death.on_exit = Some(on_exit); - scheds[0].enqueue_task(main_task); let mut threads = ~[]; + let main_task = Cell::new(main_task); + + let main_thread = { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + do Thread::start { + let sched = sched_cell.take(); + sched.bootstrap(main_task.take()); + } + }; + threads.push(main_thread); while !scheds.is_empty() { - let sched = scheds.pop(); + let mut sched = scheds.pop(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("bootstrapping non-primary scheduler"); + }; + let bootstrap_task_cell = Cell::new(bootstrap_task); let sched_cell = Cell::new(sched); let thread = do Thread::start { let sched = sched_cell.take(); - sched.run(); + sched.bootstrap(bootstrap_task_cell.take()); }; threads.push(thread); @@ -134,187 +153,52 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - rtdebug!("spawntask taking the scheduler from TLS"); - - - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - rtdebug!("new task pointer: %x", ::borrow::to_uint(task)); - - let sched = Local::take::(); - rtdebug!("spawntask scheduling the new task"); - sched.schedule_task(task); -} - - -/// Create a new task and run it right now. Aborts on failure -pub fn spawntask_immediately(f: ~fn()) { - use super::sched::*; - - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let sched = Local::take::(); - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_blocked_task(task); - } + Scheduler::run_task(Task::build_child(f)); } /// Create a new task and run it right now. Aborts on failure pub fn spawntask_later(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - let mut sched = Local::take::(); - sched.enqueue_task(task); - Local::put(sched); + Scheduler::run_task_later(Task::build_child(f)); } -/// Spawn a task and either run it immediately or run it later pub fn spawntask_random(f: ~fn()) { - use super::sched::*; use rand::{Rand, rng}; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - - } - }; - - let mut sched = Local::take::(); - let mut rng = rng(); let run_now: bool = Rand::rand(&mut rng); if run_now { - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_blocked_task(task); - } + spawntask(f) } else { - sched.enqueue_task(task); - Local::put(sched); + spawntask_later(f) } } -/// Spawn a task, with the current scheduler as home, and queue it to -/// run later. -pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { - use super::sched::*; - use rand::{rng, RngUtil}; - let mut rng = rng(); - - let task = { - let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - let handle = sched.make_handle(); - let home_id = handle.sched_id; - - // now that we know where this is going, build a new function - // that can assert it is in the right place - let af: ~fn() = || { - do Local::borrow::() |sched| { - rtdebug!("home_id: %u, runtime loc: %u", - home_id, - sched.sched_id()); - assert!(home_id == sched.sched_id()); - }; - f() - }; - - ~Task::new_root_homed(&mut sched.stack_pool, - Sched(handle), - af) - }; - let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - // enqueue it for future execution - dest_sched.enqueue_task(task); -} - -/// Spawn a task and wait for it to finish, returning whether it -/// completed successfully or failed -pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { - use cell::Cell; - use super::sched::*; - - let f = Cell::new(f); +pub fn spawntask_try(f: ~fn()) -> Result<(),()> { let (port, chan) = oneshot(); let chan = Cell::new(chan); let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); - let mut new_task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow:: |_running_task| { - // I don't understand why using a child task here fails. I - // think the fail status is propogating back up the task - // tree and triggering a fail for the parent, which we - // aren't correctly expecting. - - // ~running_task.new_child(&mut (*sched).stack_pool, - ~Task::new_root(&mut (*sched).stack_pool, - f.take()) - } - }; + let mut new_task = Task::build_root(f); new_task.death.on_exit = Some(on_exit); - let sched = Local::take::(); - do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { - sched.enqueue_blocked_task(old_task); - } - - rtdebug!("enqueued the new task, now waiting on exit_status"); + Scheduler::run_task(new_task); let exit_status = port.recv(); if exit_status { Ok(()) } else { Err(()) } + } /// Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { - use rt::sched::*; let f = Cell::new(f); - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let task = Cell::new(task); - let thread = do Thread::start { - let mut sched = ~new_test_uv_sched(); - sched.enqueue_task(task.take()); - sched.run(); + run_in_newsched_task_core(f.take()); }; + return thread; } @@ -323,11 +207,14 @@ pub fn with_test_task(blk: ~fn(~Task) -> ~Task) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); let task = blk(~Task::new_root(&mut sched.stack_pool, ||{})); - sched.enqueue_task(task); - sched.run(); + cleanup_task(task); } } +/// Use to cleanup tasks created for testing but not "run". +pub fn cleanup_task(mut task: ~Task) { + task.destroyed = true; +} /// Get a port number, starting at 9600, for use in tests pub fn next_test_port() -> u16 { diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index bc223d8f3f7..ae455a6ad04 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -17,7 +17,6 @@ use option::*; use clone::Clone; use super::rc::RC; use rt::sched::Scheduler; -use rt::{context, TaskContext, SchedulerContext}; use rt::kill::BlockedTask; use rt::local::Local; use vec::OwnedVector; @@ -44,8 +43,6 @@ impl Tube { pub fn send(&mut self, val: T) { rtdebug!("tube send"); - assert!(context() == SchedulerContext); - unsafe { let state = self.p.unsafe_borrow_mut(); (*state).buf.push(val); @@ -61,8 +58,6 @@ impl Tube { } pub fn recv(&mut self) -> T { - assert!(context() == TaskContext); - unsafe { let state = self.p.unsafe_borrow_mut(); if !(*state).buf.is_empty() { diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 638d510614a..fa5c497a877 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -51,7 +51,7 @@ use rt::io::net::ip::IpAddr; use rt::io::IoError; -#[cfg(test)] use unstable::run_in_bare_thread; +//#[cfg(test)] use unstable::run_in_bare_thread; pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; @@ -333,7 +333,7 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { return None; } } - +/* #[test] fn test_slice_to_uv_buf() { let slice = [0, .. 20]; @@ -360,3 +360,4 @@ fn loop_smoke_test() { loop_.close(); } } +*/ diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 53ccd20186d..5be19752152 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -33,7 +33,7 @@ use unstable::sync::Exclusive; #[cfg(test)] use container::Container; #[cfg(test)] use uint; #[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use rt::test::{spawntask_immediately, +#[cfg(test)] use rt::test::{spawntask, next_test_ip4, run_in_newsched_task}; @@ -251,13 +251,11 @@ impl IoFactory for UvIoFactory { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_, task| { rtdebug!("connect: entered scheduler context"); - assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell::new(task); @@ -458,11 +456,9 @@ impl RtioTcpStream for UvTcpStream { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("read: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); // XXX: We shouldn't reallocate these callbacks every // call to read @@ -500,7 +496,6 @@ impl RtioTcpStream for UvTcpStream { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -602,11 +597,9 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("recvfrom: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { @@ -637,7 +630,6 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -798,10 +790,8 @@ impl Drop for UvTimer { impl RtioTimer for UvTimer { fn sleep(&self, msecs: u64) { let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("sleep: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); let mut watcher = **self; do watcher.start(msecs, 0) |_, status| { @@ -845,7 +835,7 @@ fn test_simple_tcp_server_and_client() { let addr = next_test_ip4(); // Start the server first so it's listening when we connect - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -860,7 +850,7 @@ fn test_simple_tcp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -876,7 +866,7 @@ fn test_simple_udp_server_and_client() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut server_socket = (*io).udp_bind(server_addr).unwrap(); @@ -891,7 +881,7 @@ fn test_simple_udp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut client_socket = (*io).udp_bind(client_addr).unwrap(); @@ -906,7 +896,7 @@ fn test_read_and_block() { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let io = unsafe { Local::unsafe_borrow::() }; let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; let mut stream = listener.accept().unwrap(); @@ -939,7 +929,7 @@ fn test_read_and_block() { assert!(reads > 1); } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -959,7 +949,7 @@ fn test_read_read_read() { let addr = next_test_ip4(); static MAX: uint = 500000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -973,7 +963,7 @@ fn test_read_read_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -999,7 +989,7 @@ fn test_udp_twice() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut client = (*io).udp_bind(client_addr).unwrap(); @@ -1008,7 +998,7 @@ fn test_udp_twice() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut server = (*io).udp_bind(server_addr).unwrap(); @@ -1036,7 +1026,7 @@ fn test_udp_many_read() { let client_in_addr = next_test_ip4(); static MAX: uint = 500_000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut server_out = (*io).udp_bind(server_out_addr).unwrap(); @@ -1059,7 +1049,7 @@ fn test_udp_many_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut client_out = (*io).udp_bind(client_out_addr).unwrap(); diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index d0124407bd4..7a864ecb867 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -677,121 +677,190 @@ fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - let (po, ch) = stream(); - let ch = SharedChan::new(ch); - do spawn_unlinked { - let ch = ch.clone(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let (po, ch) = stream(); + let ch = SharedChan::new(ch); do spawn_unlinked { - // Give middle task a chance to fail-but-not-kill-us. - do 16.times { task::yield(); } - ch.send(()); // If killed first, grandparent hangs. + let ch = ch.clone(); + do spawn_unlinked { + // Give middle task a chance to fail-but-not-kill-us. + do 16.times { task::yield(); } + ch.send(()); // If killed first, grandparent hangs. + } + fail!(); // Shouldn't kill either (grand)parent or (grand)child. } - fail!(); // Shouldn't kill either (grand)parent or (grand)child. + po.recv(); } - po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails - do spawn_unlinked { fail!(); } + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + do spawn_unlinked { fail!(); } + } } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails - do spawn_supervised { fail!(); } - // Give child a chance to fail-but-not-kill-us. - do 16.times { task::yield(); } -} -#[test] #[should_fail] #[ignore(cfg(windows))] -fn test_spawn_unlinked_sup_fail_down() { - do spawn_supervised { block_forever(); } - fail!(); // Shouldn't leave a child hanging around. -} - -#[test] #[should_fail] #[ignore(cfg(windows))] -fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - // Unidirectional "parenting" shouldn't override bidirectional linked. - // We have to cheat with opts - the interface doesn't support them because - // they don't make sense (redundant with task().supervised()). - let mut b0 = task(); - b0.opts.linked = true; - b0.opts.supervised = true; - - do b0.spawn { - fail!(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + do spawn_supervised { fail!(); } + // Give child a chance to fail-but-not-kill-us. + do 16.times { task::yield(); } } - block_forever(); // We should get punted awake } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] +fn test_spawn_unlinked_sup_fail_down() { + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + do spawn_supervised { block_forever(); } + fail!(); // Shouldn't leave a child hanging around. + }; + assert!(result.is_err()); + } +} + +#[test] #[ignore(cfg(windows))] +fn test_spawn_linked_sup_fail_up() { // child fails; parent fails + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Unidirectional "parenting" shouldn't override bidirectional linked. + // We have to cheat with opts - the interface doesn't support them because + // they don't make sense (redundant with task().supervised()). + let mut b0 = task(); + b0.opts.linked = true; + b0.opts.supervised = true; + + do b0.spawn { + fail!(); + } + block_forever(); // We should get punted awake + }; + assert!(result.is_err()); + } +} +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails - // We have to cheat with opts - the interface doesn't support them because - // they don't make sense (redundant with task().supervised()). - let mut b0 = task(); - b0.opts.linked = true; - b0.opts.supervised = true; - do b0.spawn { block_forever(); } - fail!(); // *both* mechanisms would be wrong if this didn't kill the child + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // We have to cheat with opts - the interface doesn't support them because + // they don't make sense (redundant with task().supervised()). + let mut b0 = task(); + b0.opts.linked = true; + b0.opts.supervised = true; + do b0.spawn { block_forever(); } + fail!(); // *both* mechanisms would be wrong if this didn't kill the child + }; + assert!(result.is_err()); + } } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - // Default options are to spawn linked & unsupervised. - do spawn { fail!(); } - block_forever(); // We should get punted awake + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Default options are to spawn linked & unsupervised. + do spawn { fail!(); } + block_forever(); // We should get punted awake + }; + assert!(result.is_err()); + } } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails - // Default options are to spawn linked & unsupervised. - do spawn { block_forever(); } - fail!(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Default options are to spawn linked & unsupervised. + do spawn { block_forever(); } + fail!(); + }; + assert!(result.is_err()); + } } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails - // Make sure the above test is the same as this one. - let mut builder = task(); - builder.linked(); - do builder.spawn { block_forever(); } - fail!(); + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Make sure the above test is the same as this one. + let mut builder = task(); + builder.linked(); + do builder.spawn { block_forever(); } + fail!(); + }; + assert!(result.is_err()); + } } // A couple bonus linked failure tests - testing for failure propagation even // when the middle task exits successfully early before kill signals are sent. -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_failure_propagate_grandchild() { - // Middle task exits; does grandparent's failure propagate across the gap? - do spawn_supervised { - do spawn_supervised { block_forever(); } + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Middle task exits; does grandparent's failure propagate across the gap? + do spawn_supervised { + do spawn_supervised { block_forever(); } + } + do 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_failure_propagate_secondborn() { - // First-born child exits; does parent's failure propagate to sibling? - do spawn_supervised { - do spawn { block_forever(); } // linked + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // First-born child exits; does parent's failure propagate to sibling? + do spawn_supervised { + do spawn { block_forever(); } // linked + } + do 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_failure_propagate_nephew_or_niece() { - // Our sibling exits; does our failure propagate to sibling's child? - do spawn { // linked - do spawn_supervised { block_forever(); } + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Our sibling exits; does our failure propagate to sibling's child? + do spawn { // linked + do spawn_supervised { block_forever(); } + } + do 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } -#[test] #[should_fail] #[ignore(cfg(windows))] +#[test] #[ignore(cfg(windows))] fn test_spawn_linked_sup_propagate_sibling() { - // Middle sibling exits - does eldest's failure propagate to youngest? - do spawn { // linked - do spawn { block_forever(); } // linked + use rt::test::run_in_newsched_task; + do run_in_newsched_task { + let result: Result<(),()> = do try { + // Middle sibling exits - does eldest's failure propagate to youngest? + do spawn { // linked + do spawn { block_forever(); } // linked + } + do 16.times { task::yield(); } + fail!(); + }; + assert!(result.is_err()); } - do 16.times { task::yield(); } - fail!(); } #[test] @@ -1149,11 +1218,15 @@ fn test_child_doesnt_ref_parent() { fn child_no(x: uint) -> ~fn() { return || { if x < generations { - task::spawn(child_no(x+1)); + let mut t = task(); + t.unwatched(); + t.spawn(child_no(x+1)); } } } - task::spawn(child_no(0)); + let mut t = task(); + t.unwatched(); + t.spawn(child_no(0)); } #[test] @@ -1167,9 +1240,9 @@ fn test_simple_newsched_spawn() { #[test] #[ignore(cfg(windows))] fn test_spawn_watched() { - use rt::test::{run_in_newsched_task, spawntask_try}; + use rt::test::run_in_newsched_task; do run_in_newsched_task { - let result = do spawntask_try { + let result = do try { let mut t = task(); t.unlinked(); t.watched(); @@ -1189,9 +1262,9 @@ fn test_spawn_watched() { #[test] #[ignore(cfg(windows))] fn test_indestructible() { - use rt::test::{run_in_newsched_task, spawntask_try}; + use rt::test::run_in_newsched_task; do run_in_newsched_task { - let result = do spawntask_try { + let result = do try { let mut t = task(); t.watched(); t.supervised(); diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 4558f8e32c1..88f214ef4c0 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -653,22 +653,16 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc, pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { match context() { - OldTaskContext => { - spawn_raw_oldsched(opts, f) - } - TaskContext => { - spawn_raw_newsched(opts, f) - } - SchedulerContext => { - fail!("can't spawn from scheduler context") - } - GlobalContext => { - fail!("can't spawn from global context") - } + OldTaskContext => spawn_raw_oldsched(opts, f), + TaskContext => spawn_raw_newsched(opts, f), + SchedulerContext => fail!("can't spawn from scheduler context"), + GlobalContext => fail!("can't spawn from global context"), } } fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { + use rt::sched::*; + let child_data = Cell::new(gen_child_taskgroup(opts.linked, opts.supervised)); let indestructible = opts.indestructible; @@ -700,19 +694,11 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } }; - let mut task = unsafe { - let sched = Local::unsafe_borrow::(); - rtdebug!("unsafe borrowed sched"); - - if opts.watched { - let child_wrapper = Cell::new(child_wrapper); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, child_wrapper.take()) - } - } else { - // An unwatched task is a new root in the exit-code propagation tree - ~Task::new_root(&mut (*sched).stack_pool, child_wrapper) - } + let mut task = if opts.watched { + Task::build_child(child_wrapper) + } else { + // An unwatched task is a new root in the exit-code propagation tree + Task::build_root(child_wrapper) }; if opts.notify_chan.is_some() { @@ -727,12 +713,9 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } task.name = opts.name.take(); + rtdebug!("spawn calling run_task"); + Scheduler::run_task(task); - rtdebug!("spawn about to take scheduler"); - - let sched = Local::take::(); - rtdebug!("took sched in spawn"); - sched.schedule_task(task); } fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs index 7a5e1116c32..74604b4ea17 100644 --- a/src/libstd/unstable/lang.rs +++ b/src/libstd/unstable/lang.rs @@ -70,9 +70,6 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char { _ => { let mut alloc = ::ptr::null(); do Local::borrow:: |task| { - rtdebug!("task pointer: %x, heap pointer: %x", - ::borrow::to_uint(task), - ::borrow::to_uint(&task.heap)); alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char; } return alloc; diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs index 450e81bee33..a36be079b43 100644 --- a/src/test/run-pass/unwind-resource.rs +++ b/src/test/run-pass/unwind-resource.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -// xfail-win32 +// xfail-test extern mod extra; use std::comm::*;