From 3b30377e14f60e6381dc1536bd53b5f9c7a3d7c7 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 22 Oct 2013 14:59:21 -0700 Subject: [PATCH 1/3] Fix a bug with the scheduler and destructor order The PausibleIdleCallback must have some handle into the event loop, and because struct destructors are run in order of top-to-bottom in order of fields, this meant that the event loop was getting destroyed before the idle callback was getting destroyed. I can't confirm that this fixes a problem in how we use libuv, but it does semantically fix a problem for usage with other event loops. --- src/libstd/rt/sched.rs | 12 ++++- src/test/run-pass/field-destruction-order.rs | 52 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 src/test/run-pass/field-destruction-order.rs diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 6e661884616..1a6529dab18 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -85,7 +85,17 @@ pub struct Scheduler { priv yield_check_count: uint, /// A flag to tell the scheduler loop it needs to do some stealing /// in order to introduce randomness as part of a yield - priv steal_for_yield: bool + priv steal_for_yield: bool, + + // n.b. currently destructors of an object are run in top-to-bottom in order + // of field declaration. Due to its nature, the pausible idle callback + // must have some sort of handle to the event loop, so it needs to get + // destroyed before the event loop itself. For this reason, we destroy + // the event loop last to ensure that any unsafe references to it are + // destroyed before it's actually destroyed. + + /// The event loop used to drive the scheduler and perform I/O + event_loop: ~EventLoopObject, } /// An indication of how hard to work on a given operation, the difference diff --git a/src/test/run-pass/field-destruction-order.rs b/src/test/run-pass/field-destruction-order.rs new file mode 100644 index 00000000000..1d4c08f0bb5 --- /dev/null +++ b/src/test/run-pass/field-destruction-order.rs @@ -0,0 +1,52 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// In theory, it doesn't matter what order destructors are run in for rust +// because we have explicit ownership of values meaning that there's no need to +// run one before another. With unsafe code, however, there may be a safe +// interface which relies on fields having their destructors run in a particular +// order. At the time of this writing, std::rt::sched::Scheduler is an example +// of a structure which contains unsafe handles to FFI-like types, and the +// destruction order of the fields matters in the sense that some handles need +// to get destroyed before others. +// +// In C++, destruction order happens bottom-to-top in order of field +// declarations, but we currently run them top-to-bottom. I don't think the +// order really matters that much as long as we define what it is. + +struct A; +struct B; +struct C { + a: A, + b: B, +} + +static mut hit: bool = false; + +impl Drop for A { + fn drop(&mut self) { + unsafe { + assert!(!hit); + hit = true; + } + } +} + +impl Drop for B { + fn drop(&mut self) { + unsafe { + assert!(hit); + } + } +} + +pub fn main() { + let _c = C { a: A, b: B }; +} From 3ee5ef12fb71be95d6e7f679900a497a2580d25e Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 24 Oct 2013 11:30:35 -0700 Subject: [PATCH 2/3] Remove the 'callback_ms' function from EventLoop This is a peculiar function to require event loops to implement, and it's only used in one spot during tests right now. Instead, a possibly more robust apis for timers should be used rather than requiring all event loops to implement a curious-looking function. --- src/libstd/rt/rtio.rs | 1 - src/libstd/rt/sched.rs | 13 +++---------- src/libstd/rt/uv/uvio.rs | 9 --------- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 66a0676a2f4..29f728a5e0c 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -28,7 +28,6 @@ pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback; - fn callback_ms(&mut self, ms: u64, ~fn()); fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback; /// The asynchronous I/O services. Not all event loops may provide one diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 1a6529dab18..0e993a3564f 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -1147,22 +1147,15 @@ mod test { #[test] fn test_io_callback() { + use rt::io::timer; + // This is a regression test that when there are no schedulable tasks // in the work queue, but we are performing I/O, that once we do put // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { do spawntask { - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |sched, task| { - let task = Cell::new(task); - do sched.event_loop.callback_ms(10) { - rtdebug!("in callback"); - let mut sched: ~Scheduler = Local::take(); - sched.enqueue_blocked_task(task.take()); - Local::put(sched); - } - } + timer::sleep(10); } } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 29370c484eb..eee89365fb5 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -222,15 +222,6 @@ impl EventLoop for UvEventLoop { } as ~PausibleIdleCallback } - fn callback_ms(&mut self, ms: u64, f: ~fn()) { - let mut timer = TimerWatcher::new(self.uvio.uv_loop()); - do timer.start(ms, 0) |timer, status| { - assert!(status.is_none()); - timer.close(||()); - f(); - } - } - fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback { ~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback } From 64a5c3bc1ee869990f8205374f9dac837a475dbd Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 22 Oct 2013 15:00:37 -0700 Subject: [PATCH 3/3] Implement a basic event loop built on LittleLock It's not guaranteed that there will always be an event loop to run, and this implementation will serve as an incredibly basic one which does not provide any I/O, but allows the scheduler to still run. cc #9128 --- src/libextra/comm.rs | 4 +- src/libstd/rt/basic.rs | 256 ++++++++++++++++++++++++++++++++++++ src/libstd/rt/io/mod.rs | 7 + src/libstd/rt/mod.rs | 3 + src/libstd/rt/sched.rs | 16 +-- src/libstd/rt/task.rs | 4 +- src/libstd/rt/test.rs | 44 ++++++- src/libstd/select.rs | 20 +-- src/libstd/task/mod.rs | 84 ++++++------ src/libstd/unstable/sync.rs | 47 +++++++ src/rt/rust_builtin.cpp | 10 ++ src/rt/rustrt.def.in | 2 + 12 files changed, 429 insertions(+), 68 deletions(-) create mode 100644 src/libstd/rt/basic.rs diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs index 4a3801827a2..5cc5c140fd5 100644 --- a/src/libextra/comm.rs +++ b/src/libextra/comm.rs @@ -136,7 +136,7 @@ pub fn rendezvous() -> (SyncPort, SyncChan) { #[cfg(test)] mod test { use comm::{DuplexStream, rendezvous}; - use std::rt::test::run_in_newsched_task; + use std::rt::test::run_in_uv_task; use std::task::spawn_unlinked; @@ -165,7 +165,7 @@ mod test { #[test] fn recv_a_lot() { // Rendezvous streams should be able to handle any number of messages being sent - do run_in_newsched_task { + do run_in_uv_task { let (port, chan) = rendezvous(); do spawn { do 1000000.times { chan.send(()) } diff --git a/src/libstd/rt/basic.rs b/src/libstd/rt/basic.rs new file mode 100644 index 00000000000..86d3f8a52ba --- /dev/null +++ b/src/libstd/rt/basic.rs @@ -0,0 +1,256 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! This is a basic event loop implementation not meant for any "real purposes" +//! other than testing the scheduler and proving that it's possible to have a +//! pluggable event loop. + +use prelude::*; + +use cast; +use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback}; +use unstable::sync::Exclusive; +use util; + +/// This is the only exported function from this module. +pub fn event_loop() -> ~EventLoop { + ~BasicLoop::new() as ~EventLoop +} + +struct BasicLoop { + work: ~[~fn()], // pending work + idle: Option<*BasicPausible>, // only one is allowed + remotes: ~[(uint, ~fn())], + next_remote: uint, + messages: Exclusive<~[Message]> +} + +enum Message { RunRemote(uint), RemoveRemote(uint) } + +struct Time { + sec: u64, + nsec: u64, +} + +impl Ord for Time { + fn lt(&self, other: &Time) -> bool { + self.sec < other.sec || self.nsec < other.nsec + } +} + +impl BasicLoop { + fn new() -> BasicLoop { + BasicLoop { + work: ~[], + idle: None, + next_remote: 0, + remotes: ~[], + messages: Exclusive::new(~[]), + } + } + + /// Process everything in the work queue (continually) + fn work(&mut self) { + while self.work.len() > 0 { + for work in util::replace(&mut self.work, ~[]).move_iter() { + work(); + } + } + } + + fn remote_work(&mut self) { + let messages = unsafe { + do self.messages.with |messages| { + if messages.len() > 0 { + Some(util::replace(messages, ~[])) + } else { + None + } + } + }; + let messages = match messages { + Some(m) => m, None => return + }; + for message in messages.iter() { + self.message(*message); + } + } + + fn message(&mut self, message: Message) { + match message { + RunRemote(i) => { + match self.remotes.iter().find(|& &(id, _)| id == i) { + Some(&(_, ref f)) => (*f)(), + None => unreachable!() + } + } + RemoveRemote(i) => { + match self.remotes.iter().position(|&(id, _)| id == i) { + Some(i) => { self.remotes.remove(i); } + None => unreachable!() + } + } + } + } + + /// Run the idle callback if one is registered + fn idle(&mut self) { + unsafe { + match self.idle { + Some(idle) => { + if (*idle).active { + (*(*idle).work.get_ref())(); + } + } + None => {} + } + } + } + + fn has_idle(&self) -> bool { + unsafe { self.idle.is_some() && (**self.idle.get_ref()).active } + } +} + +impl EventLoop for BasicLoop { + fn run(&mut self) { + // Not exactly efficient, but it gets the job done. + while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() { + + self.work(); + self.remote_work(); + + if self.has_idle() { + self.idle(); + continue + } + + unsafe { + // We block here if we have no messages to process and we may + // receive a message at a later date + do self.messages.hold_and_wait |messages| { + self.remotes.len() > 0 && + messages.len() == 0 && + self.work.len() == 0 + } + } + } + } + + fn callback(&mut self, f: ~fn()) { + self.work.push(f); + } + + // XXX: Seems like a really weird requirement to have an event loop provide. + fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback { + let callback = ~BasicPausible::new(self); + rtassert!(self.idle.is_none()); + unsafe { + let cb_ptr: &*BasicPausible = cast::transmute(&callback); + self.idle = Some(*cb_ptr); + } + return callback as ~PausibleIdleCallback; + } + + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback { + let id = self.next_remote; + self.next_remote += 1; + self.remotes.push((id, f)); + ~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback + } + + /// This has no bindings for local I/O + fn io<'a>(&'a mut self, _: &fn(&'a mut IoFactory)) {} +} + +struct BasicRemote { + queue: Exclusive<~[Message]>, + id: uint, +} + +impl BasicRemote { + fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote { + BasicRemote { queue: queue, id: id } + } +} + +impl RemoteCallback for BasicRemote { + fn fire(&mut self) { + unsafe { + do self.queue.hold_and_signal |queue| { + queue.push(RunRemote(self.id)); + } + } + } +} + +impl Drop for BasicRemote { + fn drop(&mut self) { + unsafe { + do self.queue.hold_and_signal |queue| { + queue.push(RemoveRemote(self.id)); + } + } + } +} + +struct BasicPausible { + eloop: *mut BasicLoop, + work: Option<~fn()>, + active: bool, +} + +impl BasicPausible { + fn new(eloop: &mut BasicLoop) -> BasicPausible { + BasicPausible { + active: false, + work: None, + eloop: eloop, + } + } +} + +impl PausibleIdleCallback for BasicPausible { + fn start(&mut self, f: ~fn()) { + rtassert!(!self.active && self.work.is_none()); + self.active = true; + self.work = Some(f); + } + fn pause(&mut self) { + self.active = false; + } + fn resume(&mut self) { + self.active = true; + } + fn close(&mut self) { + self.active = false; + self.work = None; + } +} + +impl Drop for BasicPausible { + fn drop(&mut self) { + unsafe { + (*self.eloop).idle = None; + } + } +} + +fn time() -> Time { + #[fixed_stack_segment]; #[inline(never)]; + extern { + fn get_time(sec: &mut i64, nsec: &mut i32); + } + let mut sec = 0; + let mut nsec = 0; + unsafe { get_time(&mut sec, &mut nsec) } + + Time { sec: sec as u64, nsec: nsec as u64 } +} diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 758c9779165..decf801d592 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -606,6 +606,13 @@ pub fn standard_error(kind: IoErrorKind) -> IoError { detail: None } } + IoUnavailable => { + IoError { + kind: IoUnavailable, + desc: "I/O is unavailable", + detail: None + } + } _ => fail!() } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 66d7a6bf488..5113c28aa08 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -102,6 +102,9 @@ pub mod shouldnt_be_public { // Internal macros used by the runtime. mod macros; +/// Basic implementation of an EventLoop, provides no I/O interfaces +mod basic; + /// The global (exchange) heap. pub mod global_heap; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 0e993a3564f..b008a8a74f2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -62,8 +62,6 @@ pub struct Scheduler { /// no longer try to go to sleep, but exit instead. no_sleep: bool, stack_pool: StackPool, - /// The event loop used to drive the scheduler and perform I/O - event_loop: ~EventLoop, /// The scheduler runs on a special task. When it is not running /// it is stored here instead of the work queue. priv sched_task: Option<~Task>, @@ -95,7 +93,7 @@ pub struct Scheduler { // destroyed before it's actually destroyed. /// The event loop used to drive the scheduler and perform I/O - event_loop: ~EventLoopObject, + event_loop: ~EventLoop, } /// An indication of how hard to work on a given operation, the difference @@ -915,7 +913,7 @@ mod test { use cell::Cell; use rt::thread::Thread; use rt::task::{Task, Sched}; - use rt::rtio::EventLoop; + use rt::basic; use rt::util; use option::{Some}; @@ -1015,7 +1013,6 @@ mod test { #[test] fn test_schedule_home_states() { - use rt::uv::uvio::UvEventLoop; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; use rt::sched::Shutdown; @@ -1031,7 +1028,7 @@ mod test { // Our normal scheduler let mut normal_sched = ~Scheduler::new( - ~UvEventLoop::new() as ~EventLoop, + basic::event_loop(), normal_queue, queues.clone(), sleepers.clone()); @@ -1042,7 +1039,7 @@ mod test { // Our special scheduler let mut special_sched = ~Scheduler::new_special( - ~UvEventLoop::new() as ~EventLoop, + basic::event_loop(), special_queue.clone(), queues.clone(), sleepers.clone(), @@ -1153,7 +1150,7 @@ mod test { // in the work queue, but we are performing I/O, that once we do put // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue - do run_in_newsched_task { + do run_in_uv_task { do spawntask { timer::sleep(10); } @@ -1195,7 +1192,6 @@ mod test { use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; use rt::stack::StackPool; - use rt::uv::uvio::UvEventLoop; use rt::sched::{Shutdown, TaskFromFriend}; use util; @@ -1206,7 +1202,7 @@ mod test { let queues = ~[queue.clone()]; let mut sched = ~Scheduler::new( - ~UvEventLoop::new() as ~EventLoop, + basic::event_loop(), queue, queues.clone(), sleepers.clone()); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 1ea68bb52d7..7bf124ad312 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -637,7 +637,7 @@ mod test { #[test] fn rng() { - do run_in_newsched_task() { + do run_in_uv_task() { use rand::{rng, Rng}; let mut r = rng(); let _ = r.next_u32(); @@ -646,7 +646,7 @@ mod test { #[test] fn logging() { - do run_in_newsched_task() { + do run_in_uv_task() { info!("here i am. logging in a newsched task"); } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index c238b1dfba1..e4bbfe0a5a3 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -21,6 +21,7 @@ use iter::{Iterator, range}; use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; use vec::{OwnedVector, MutableVector, ImmutableVector}; use path::GenericPath; +use rt::basic; use rt::sched::Scheduler; use rt::rtio::EventLoop; use unstable::{run_in_bare_thread}; @@ -48,6 +49,28 @@ pub fn new_test_uv_sched() -> Scheduler { } +pub fn new_test_sched() -> Scheduler { + + let queue = WorkQueue::new(); + let queues = ~[queue.clone()]; + + let mut sched = Scheduler::new(basic::event_loop(), + queue, + queues, + SleeperList::new()); + + // Don't wait for the Shutdown message + sched.no_sleep = true; + return sched; +} + +pub fn run_in_uv_task(f: ~fn()) { + let f = Cell::new(f); + do run_in_bare_thread { + run_in_uv_task_core(f.take()); + } +} + pub fn run_in_newsched_task(f: ~fn()) { let f = Cell::new(f); do run_in_bare_thread { @@ -55,7 +78,7 @@ pub fn run_in_newsched_task(f: ~fn()) { } } -pub fn run_in_newsched_task_core(f: ~fn()) { +pub fn run_in_uv_task_core(f: ~fn()) { use rt::sched::Shutdown; @@ -72,6 +95,23 @@ pub fn run_in_newsched_task_core(f: ~fn()) { sched.bootstrap(task); } +pub fn run_in_newsched_task_core(f: ~fn()) { + + use rt::sched::Shutdown; + + let mut sched = ~new_test_sched(); + let exit_handle = Cell::new(sched.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + exit_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); +} + #[cfg(target_os="macos")] #[allow(non_camel_case_types)] mod darwin_fd_limit { @@ -310,7 +350,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread { /// Get a ~Task for testing purposes other than actually scheduling it. pub fn with_test_task(blk: ~fn(~Task) -> ~Task) { do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); + let mut sched = ~new_test_sched(); let task = blk(~Task::new_root(&mut sched.stack_pool, None, ||{})); cleanup_task(task); } diff --git a/src/libstd/select.rs b/src/libstd/select.rs index 62a09984794..75b09187f04 100644 --- a/src/libstd/select.rs +++ b/src/libstd/select.rs @@ -183,7 +183,7 @@ mod test { #[test] fn select_one() { - do run_in_newsched_task { select_helper(1, [0]) } + do run_in_uv_task { select_helper(1, [0]) } } #[test] @@ -191,14 +191,14 @@ mod test { // NB. I would like to have a test that tests the first one that is // ready is the one that's returned, but that can't be reliably tested // with the randomized behaviour of optimistic_check. - do run_in_newsched_task { select_helper(2, [1]) } - do run_in_newsched_task { select_helper(2, [0]) } - do run_in_newsched_task { select_helper(2, [1,0]) } + do run_in_uv_task { select_helper(2, [1]) } + do run_in_uv_task { select_helper(2, [0]) } + do run_in_uv_task { select_helper(2, [1,0]) } } #[test] fn select_a_lot() { - do run_in_newsched_task { select_helper(12, [7,8,9]) } + do run_in_uv_task { select_helper(12, [7,8,9]) } } #[test] @@ -208,7 +208,7 @@ mod test { // Sends 10 buffered packets, and uses select to retrieve them all. // Puts the port in a different spot in the vector each time. - do run_in_newsched_task { + do run_in_uv_task { let (ports, _) = unzip(range(0u, 10).map(|_| stream::())); let (port, chan) = stream(); do 10.times { chan.send(31337); } @@ -229,7 +229,7 @@ mod test { #[test] fn select_unkillable() { - do run_in_newsched_task { + do run_in_uv_task { do task::unkillable { select_helper(2, [1]) } } } @@ -242,7 +242,7 @@ mod test { select_blocking_helper(false); fn select_blocking_helper(killable: bool) { - do run_in_newsched_task { + do run_in_uv_task { let (p1,_c) = oneshot(); let (p2,c2) = oneshot(); let mut ports = [p1,p2]; @@ -287,7 +287,7 @@ mod test { fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) { use rt::test::spawntask_random; - do run_in_newsched_task { + do run_in_uv_task { // A bit of stress, since ordinarily this is just smoke and mirrors. do 4.times { let send_on_chans = send_on_chans.clone(); @@ -318,7 +318,7 @@ mod test { #[test] fn select_killed() { - do run_in_newsched_task { + do run_in_uv_task { let (success_p, success_c) = oneshot::(); let success_c = Cell::new(success_c); do task::try { diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 30c99c62885..b72d6773ec5 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -645,7 +645,7 @@ fn test_kill_unkillable_task() { // CPU, *after* the spawner is already switched-back-to (and passes the // killed check at the start of its timeslice). As far as I know, it's not // possible to make this race deterministic, or even more likely to happen. - do run_in_newsched_task { + do run_in_uv_task { do task::try { do task::spawn { fail!(); @@ -662,7 +662,7 @@ fn test_kill_rekillable_task() { // Tests that when a kill signal is received, 'rekillable' and // 'unkillable' unwind correctly in conjunction with each other. - do run_in_newsched_task { + do run_in_uv_task { do task::try { do task::unkillable { do task::rekillable { @@ -730,8 +730,8 @@ fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); } #[ignore(reason = "linked failure")] #[test] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let (po, ch) = stream(); let ch = SharedChan::new(ch); do spawn_unlinked { @@ -749,16 +749,16 @@ fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port #[ignore(reason = "linked failure")] #[test] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { do spawn_unlinked { fail!(); } } } #[ignore(reason = "linked failure")] #[test] fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { do spawn_supervised { fail!(); } // Give child a chance to fail-but-not-kill-us. do 16.times { task::deschedule(); } @@ -767,8 +767,8 @@ fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails #[ignore(reason = "linked failure")] #[test] fn test_spawn_unlinked_sup_fail_down() { - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { do spawn_supervised { block_forever(); } fail!(); // Shouldn't leave a child hanging around. @@ -780,8 +780,8 @@ fn test_spawn_unlinked_sup_fail_down() { #[ignore(reason = "linked failure")] #[test] fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // Unidirectional "parenting" shouldn't override bidirectional linked. // We have to cheat with opts - the interface doesn't support them because @@ -801,8 +801,8 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails #[ignore(reason = "linked failure")] #[test] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). @@ -818,8 +818,8 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails #[ignore(reason = "linked failure")] #[test] fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // Default options are to spawn linked & unsupervised. do spawn { fail!(); } @@ -831,8 +831,8 @@ fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails #[ignore(reason = "linked failure")] #[test] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // Default options are to spawn linked & unsupervised. do spawn { block_forever(); } @@ -844,8 +844,8 @@ fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails #[ignore(reason = "linked failure")] #[test] fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // Make sure the above test is the same as this one. let mut builder = task(); @@ -863,8 +863,8 @@ fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails #[ignore(reason = "linked failure")] #[test] fn test_spawn_failure_propagate_grandchild() { - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // Middle task exits; does grandparent's failure propagate across the gap? do spawn_supervised { @@ -880,8 +880,8 @@ fn test_spawn_failure_propagate_grandchild() { #[ignore(reason = "linked failure")] #[test] fn test_spawn_failure_propagate_secondborn() { - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // First-born child exits; does parent's failure propagate to sibling? do spawn_supervised { @@ -897,8 +897,8 @@ fn test_spawn_failure_propagate_secondborn() { #[ignore(reason = "linked failure")] #[test] fn test_spawn_failure_propagate_nephew_or_niece() { - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // Our sibling exits; does our failure propagate to sibling's child? do spawn { // linked @@ -914,8 +914,8 @@ fn test_spawn_failure_propagate_nephew_or_niece() { #[ignore(reason = "linked failure")] #[test] fn test_spawn_linked_sup_propagate_sibling() { - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result: Result<(),()> = do try { // Middle sibling exits - does eldest's failure propagate to youngest? do spawn { // linked @@ -930,9 +930,9 @@ fn test_spawn_linked_sup_propagate_sibling() { #[test] fn test_unnamed_task() { - use rt::test::run_in_newsched_task; + use rt::test::run_in_uv_task; - do run_in_newsched_task { + do run_in_uv_task { do spawn { do with_task_name |name| { assert!(name.is_none()); @@ -943,9 +943,9 @@ fn test_unnamed_task() { #[test] fn test_owned_named_task() { - use rt::test::run_in_newsched_task; + use rt::test::run_in_uv_task; - do run_in_newsched_task { + do run_in_uv_task { let mut t = task(); t.name(~"ada lovelace"); do t.spawn { @@ -958,9 +958,9 @@ fn test_owned_named_task() { #[test] fn test_static_named_task() { - use rt::test::run_in_newsched_task; + use rt::test::run_in_uv_task; - do run_in_newsched_task { + do run_in_uv_task { let mut t = task(); t.name("ada lovelace"); do t.spawn { @@ -973,9 +973,9 @@ fn test_static_named_task() { #[test] fn test_send_named_task() { - use rt::test::run_in_newsched_task; + use rt::test::run_in_uv_task; - do run_in_newsched_task { + do run_in_uv_task { let mut t = task(); t.name("ada lovelace".into_send_str()); do t.spawn { @@ -1326,9 +1326,9 @@ fn test_child_doesnt_ref_parent() { #[test] fn test_simple_newsched_spawn() { - use rt::test::run_in_newsched_task; + use rt::test::run_in_uv_task; - do run_in_newsched_task { + do run_in_uv_task { spawn(||()) } } @@ -1336,8 +1336,8 @@ fn test_simple_newsched_spawn() { #[ignore(reason = "linked failure")] #[test] fn test_spawn_watched() { - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result = do try { let mut t = task(); t.unlinked(); @@ -1359,8 +1359,8 @@ fn test_spawn_watched() { #[ignore(reason = "linked failure")] #[test] fn test_indestructible() { - use rt::test::run_in_newsched_task; - do run_in_newsched_task { + use rt::test::run_in_uv_task; + do run_in_uv_task { let result = do try { let mut t = task(); t.watched(); diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 9d15dd031e0..2b036c318ba 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -334,6 +334,23 @@ impl LittleLock { } } } + + pub unsafe fn signal(&self) { + rust_signal_little_lock(self.l); + } + + pub unsafe fn lock_and_wait(&self, f: &fn() -> bool) { + do atomically { + rust_lock_little_lock(self.l); + do (|| { + if f() { + rust_wait_little_lock(self.l); + } + }).finally { + rust_unlock_little_lock(self.l); + } + } + } } struct ExData { @@ -402,6 +419,34 @@ impl Exclusive { } } + #[inline] + pub unsafe fn hold_and_signal(&self, f: &fn(x: &mut T)) { + let rec = self.x.get(); + do (*rec).lock.lock { + if (*rec).failed { + fail!("Poisoned Exclusive::new - another task failed inside!"); + } + (*rec).failed = true; + f(&mut (*rec).data); + (*rec).failed = false; + (*rec).lock.signal(); + } + } + + #[inline] + pub unsafe fn hold_and_wait(&self, f: &fn(x: &T) -> bool) { + let rec = self.x.get(); + do (*rec).lock.lock_and_wait { + if (*rec).failed { + fail!("Poisoned Exclusive::new - another task failed inside!"); + } + (*rec).failed = true; + let result = f(&(*rec).data); + (*rec).failed = false; + result + } + } + pub fn unwrap(self) -> T { let Exclusive { x: x } = self; // Someday we might need to unkillably unwrap an Exclusive, but not today. @@ -415,6 +460,8 @@ externfn!(fn rust_create_little_lock() -> rust_little_lock) externfn!(fn rust_destroy_little_lock(lock: rust_little_lock)) externfn!(fn rust_lock_little_lock(lock: rust_little_lock)) externfn!(fn rust_unlock_little_lock(lock: rust_little_lock)) +externfn!(fn rust_signal_little_lock(lock: rust_little_lock)) +externfn!(fn rust_wait_little_lock(lock: rust_little_lock)) #[cfg(test)] mod tests { diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 77020537661..a8eec52943e 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -377,6 +377,16 @@ rust_unlock_little_lock(lock_and_signal *lock) { lock->unlock(); } +extern "C" void +rust_wait_little_lock(lock_and_signal *lock) { + lock->wait(); +} + +extern "C" void +rust_signal_little_lock(lock_and_signal *lock) { + lock->signal(); +} + typedef void(startfn)(void*, void*); class raw_thread: public rust_thread { diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 269da8e7882..06f4c0006f1 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -128,6 +128,8 @@ rust_create_little_lock rust_destroy_little_lock rust_lock_little_lock rust_unlock_little_lock +rust_signal_little_lock +rust_wait_little_lock tdefl_compress_mem_to_heap tinfl_decompress_mem_to_heap rust_uv_ip4_port