auto merge of #8411 : bblum/rust/assorted-fixes, r=brson
Each commit is pretty much what it says on the tin. r anybody.
This commit is contained in:
commit
f02cc6bf0a
@ -13,6 +13,7 @@
|
||||
#[missing_doc];
|
||||
|
||||
use cast::transmute_mut;
|
||||
use unstable::finally::Finally;
|
||||
use prelude::*;
|
||||
|
||||
/*
|
||||
@ -65,18 +66,17 @@ pub fn is_empty(&self) -> bool {
|
||||
|
||||
/// Calls a closure with a reference to the value.
|
||||
pub fn with_ref<R>(&self, op: &fn(v: &T) -> R) -> R {
|
||||
let v = self.take();
|
||||
let r = op(&v);
|
||||
self.put_back(v);
|
||||
r
|
||||
do self.with_mut_ref |ptr| { op(ptr) }
|
||||
}
|
||||
|
||||
/// Calls a closure with a mutable reference to the value.
|
||||
pub fn with_mut_ref<R>(&self, op: &fn(v: &mut T) -> R) -> R {
|
||||
let mut v = self.take();
|
||||
let r = op(&mut v);
|
||||
self.put_back(v);
|
||||
r
|
||||
let mut v = Some(self.take());
|
||||
do (|| {
|
||||
op(v.get_mut_ref())
|
||||
}).finally {
|
||||
self.put_back(v.take_unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,8 @@
|
||||
use rt;
|
||||
use rt::sched::Scheduler;
|
||||
use rt::local::Local;
|
||||
use rt::select::{Select, SelectPort};
|
||||
use rt::select::{SelectInner, SelectPortInner};
|
||||
use select::{Select, SelectPort};
|
||||
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
|
||||
use unstable::sync::UnsafeAtomicRcBox;
|
||||
use util::Void;
|
||||
@ -113,7 +114,9 @@ pub fn try_send_deferred(self, val: T) -> bool {
|
||||
// 'do_resched' configures whether the scheduler immediately switches to
|
||||
// the receiving task, or leaves the sending task still running.
|
||||
fn try_send_inner(self, val: T, do_resched: bool) -> bool {
|
||||
rtassert!(!rt::in_sched_context());
|
||||
if do_resched {
|
||||
rtassert!(!rt::in_sched_context());
|
||||
}
|
||||
|
||||
let mut this = self;
|
||||
let mut recvr_active = true;
|
||||
@ -215,7 +218,7 @@ pub fn try_recv(self) -> Option<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Select for PortOne<T> {
|
||||
impl<T> SelectInner for PortOne<T> {
|
||||
#[inline] #[cfg(not(test))]
|
||||
fn optimistic_check(&mut self) -> bool {
|
||||
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
|
||||
@ -318,7 +321,9 @@ fn unblock_from(&mut self) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> SelectPort<T> for PortOne<T> {
|
||||
impl<T> Select for PortOne<T> { }
|
||||
|
||||
impl<T> SelectPortInner<T> for PortOne<T> {
|
||||
fn recv_ready(self) -> Option<T> {
|
||||
let mut this = self;
|
||||
let packet = this.packet();
|
||||
@ -349,6 +354,8 @@ fn recv_ready(self) -> Option<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> SelectPort<T> for PortOne<T> { }
|
||||
|
||||
impl<T> Peekable<T> for PortOne<T> {
|
||||
fn peek(&self) -> bool {
|
||||
unsafe {
|
||||
@ -513,7 +520,7 @@ fn peek(&self) -> bool {
|
||||
// of them, but a &Port<T> should also be selectable so you can select2 on it
|
||||
// alongside a PortOne<U> without passing the port by value in recv_ready.
|
||||
|
||||
impl<'self, T> Select for &'self Port<T> {
|
||||
impl<'self, T> SelectInner for &'self Port<T> {
|
||||
#[inline]
|
||||
fn optimistic_check(&mut self) -> bool {
|
||||
do self.next.with_mut_ref |pone| { pone.optimistic_check() }
|
||||
@ -531,7 +538,9 @@ fn unblock_from(&mut self) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Select for Port<T> {
|
||||
impl<'self, T> Select for &'self Port<T> { }
|
||||
|
||||
impl<T> SelectInner for Port<T> {
|
||||
#[inline]
|
||||
fn optimistic_check(&mut self) -> bool {
|
||||
(&*self).optimistic_check()
|
||||
@ -548,7 +557,9 @@ fn unblock_from(&mut self) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'self, T> SelectPort<T> for &'self Port<T> {
|
||||
impl<T> Select for Port<T> { }
|
||||
|
||||
impl<'self, T> SelectPortInner<T> for &'self Port<T> {
|
||||
fn recv_ready(self) -> Option<T> {
|
||||
match self.next.take().recv_ready() {
|
||||
Some(StreamPayload { val, next }) => {
|
||||
@ -560,6 +571,8 @@ fn recv_ready(self) -> Option<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'self, T> SelectPort<T> for &'self Port<T> { }
|
||||
|
||||
pub struct SharedChan<T> {
|
||||
// Just like Chan, but a shared AtomicOption instead of Cell
|
||||
priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
|
||||
|
@ -488,8 +488,8 @@ pub fn collect_failure(&mut self, mut success: bool, group: Option<Taskgroup>) {
|
||||
rtassert!(self.unkillable == 0);
|
||||
self.unkillable = 1;
|
||||
|
||||
// FIXME(#7544): See corresponding fixme at the callsite in task.rs.
|
||||
// NB(#8192): Doesn't work with "let _ = ..."
|
||||
// NB. See corresponding comment at the callsite in task.rs.
|
||||
// FIXME(#8192): Doesn't work with "let _ = ..."
|
||||
{ use util; util::ignore(group); }
|
||||
|
||||
// Step 1. Decide if we need to collect child failures synchronously.
|
||||
|
@ -142,8 +142,7 @@
|
||||
/// Simple reimplementation of core::comm
|
||||
pub mod comm;
|
||||
|
||||
/// Routines for select()ing on pipes.
|
||||
pub mod select;
|
||||
mod select;
|
||||
|
||||
// FIXME #5248 shouldn't be pub
|
||||
/// The runtime needs to be able to put a pointer into thread-local storage.
|
||||
|
@ -8,14 +8,13 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use option::*;
|
||||
// use either::{Either, Left, Right};
|
||||
//! Module for private, abstraction-leaking select traits. Wrapped in std::select.
|
||||
|
||||
use rt::kill::BlockedTask;
|
||||
use rt::sched::Scheduler;
|
||||
use rt::local::Local;
|
||||
use option::Option;
|
||||
|
||||
/// Trait for message-passing primitives that can be select()ed on.
|
||||
pub trait Select {
|
||||
pub trait SelectInner {
|
||||
// Returns true if data was available.
|
||||
fn optimistic_check(&mut self) -> bool;
|
||||
// Returns true if data was available. If so, shall also wake() the task.
|
||||
@ -24,305 +23,6 @@ pub trait Select {
|
||||
fn unblock_from(&mut self) -> bool;
|
||||
}
|
||||
|
||||
/// Trait for message-passing primitives that can use the select2() convenience wrapper.
|
||||
// (This is separate from the above trait to enable heterogeneous lists of ports
|
||||
// that implement Select on different types to use select().)
|
||||
pub trait SelectPort<T> : Select {
|
||||
pub trait SelectPortInner<T> {
|
||||
fn recv_ready(self) -> Option<T>;
|
||||
}
|
||||
|
||||
/// Receive a message from any one of many ports at once.
|
||||
pub fn select<A: Select>(ports: &mut [A]) -> uint {
|
||||
if ports.is_empty() {
|
||||
fail!("can't select on an empty list");
|
||||
}
|
||||
|
||||
for (index, port) in ports.mut_iter().enumerate() {
|
||||
if port.optimistic_check() {
|
||||
return index;
|
||||
}
|
||||
}
|
||||
|
||||
// If one of the ports already contains data when we go to block on it, we
|
||||
// don't bother enqueueing on the rest of them, so we shouldn't bother
|
||||
// unblocking from it either. This is just for efficiency, not correctness.
|
||||
// (If not, we need to unblock from all of them. Length is a placeholder.)
|
||||
let mut ready_index = ports.len();
|
||||
|
||||
let sched = Local::take::<Scheduler>();
|
||||
do sched.deschedule_running_task_and_then |sched, task| {
|
||||
let task_handles = task.make_selectable(ports.len());
|
||||
|
||||
for (index, (port, task_handle)) in
|
||||
ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
|
||||
// If one of the ports has data by now, it will wake the handle.
|
||||
if port.block_on(sched, task_handle) {
|
||||
ready_index = index;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Task resumes. Now unblock ourselves from all the ports we blocked on.
|
||||
// If the success index wasn't reset, 'take' will just take all of them.
|
||||
// Iterate in reverse so the 'earliest' index that's ready gets returned.
|
||||
for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() {
|
||||
if port.unblock_from() {
|
||||
ready_index = index;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(ready_index < ports.len());
|
||||
return ready_index;
|
||||
}
|
||||
|
||||
/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
|
||||
|
||||
impl <'self> Select for &'self mut Select {
|
||||
fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
|
||||
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
|
||||
self.block_on(sched, task)
|
||||
}
|
||||
fn unblock_from(&mut self) -> bool { self.unblock_from() }
|
||||
}
|
||||
|
||||
pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
|
||||
-> Either<(Option<TA>, B), (A, Option<TB>)> {
|
||||
let result = {
|
||||
let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
|
||||
select(ports)
|
||||
};
|
||||
match result {
|
||||
0 => Left ((a.recv_ready(), b)),
|
||||
1 => Right((a, b.recv_ready())),
|
||||
x => fail!("impossible case in select2: %?", x)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use option::*;
|
||||
use rt::comm::*;
|
||||
use rt::test::*;
|
||||
use vec::*;
|
||||
use comm::GenericChan;
|
||||
use task;
|
||||
use cell::Cell;
|
||||
use iterator::{Iterator, range};
|
||||
|
||||
#[test] #[ignore(cfg(windows))] #[should_fail]
|
||||
fn select_doesnt_get_trolled() {
|
||||
select::<PortOne<()>>([]);
|
||||
}
|
||||
|
||||
/* non-blocking select tests */
|
||||
|
||||
#[cfg(test)]
|
||||
fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
|
||||
// Unfortunately this does not actually test the block_on early-break
|
||||
// codepath in select -- racing between the sender and the receiver in
|
||||
// separate tasks is necessary to get around the optimistic check.
|
||||
let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>()));
|
||||
let mut dead_chans = ~[];
|
||||
let mut ports = ports;
|
||||
for (i, chan) in chans.move_iter().enumerate() {
|
||||
if send_on_chans.contains(&i) {
|
||||
chan.send(());
|
||||
} else {
|
||||
dead_chans.push(chan);
|
||||
}
|
||||
}
|
||||
let ready_index = select(ports);
|
||||
assert!(send_on_chans.contains(&ready_index));
|
||||
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
|
||||
let _ = dead_chans;
|
||||
|
||||
// Same thing with streams instead.
|
||||
// FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
|
||||
let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>()));
|
||||
let mut dead_chans = ~[];
|
||||
let mut ports = ports;
|
||||
for (i, chan) in chans.move_iter().enumerate() {
|
||||
if send_on_chans.contains(&i) {
|
||||
chan.send(());
|
||||
} else {
|
||||
dead_chans.push(chan);
|
||||
}
|
||||
}
|
||||
let ready_index = select(ports);
|
||||
assert!(send_on_chans.contains(&ready_index));
|
||||
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
|
||||
let _ = dead_chans;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_one() {
|
||||
do run_in_newsched_task { select_helper(1, [0]) }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_two() {
|
||||
// NB. I would like to have a test that tests the first one that is
|
||||
// ready is the one that's returned, but that can't be reliably tested
|
||||
// with the randomized behaviour of optimistic_check.
|
||||
do run_in_newsched_task { select_helper(2, [1]) }
|
||||
do run_in_newsched_task { select_helper(2, [0]) }
|
||||
do run_in_newsched_task { select_helper(2, [1,0]) }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_a_lot() {
|
||||
do run_in_newsched_task { select_helper(12, [7,8,9]) }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_stream() {
|
||||
use util;
|
||||
use comm::GenericChan;
|
||||
use iter::Times;
|
||||
|
||||
// Sends 10 buffered packets, and uses select to retrieve them all.
|
||||
// Puts the port in a different spot in the vector each time.
|
||||
do run_in_newsched_task {
|
||||
let (ports, _) = unzip(from_fn(10, |_| stream()));
|
||||
let (port, chan) = stream();
|
||||
do 10.times { chan.send(31337); }
|
||||
let mut ports = ports;
|
||||
let mut port = Some(port);
|
||||
let order = [5u,0,4,3,2,6,9,8,7,1];
|
||||
for &index in order.iter() {
|
||||
// put the port in the vector at any index
|
||||
util::swap(port.get_mut_ref(), &mut ports[index]);
|
||||
assert!(select(ports) == index);
|
||||
// get it back out
|
||||
util::swap(port.get_mut_ref(), &mut ports[index]);
|
||||
// NB. Not recv(), because optimistic_check randomly fails.
|
||||
assert!(port.get_ref().recv_ready().unwrap() == 31337);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_unkillable() {
|
||||
do run_in_newsched_task {
|
||||
do task::unkillable { select_helper(2, [1]) }
|
||||
}
|
||||
}
|
||||
|
||||
/* blocking select tests */
|
||||
|
||||
#[test]
|
||||
fn select_blocking() {
|
||||
select_blocking_helper(true);
|
||||
select_blocking_helper(false);
|
||||
|
||||
fn select_blocking_helper(killable: bool) {
|
||||
do run_in_newsched_task {
|
||||
let (p1,_c) = oneshot();
|
||||
let (p2,c2) = oneshot();
|
||||
let mut ports = [p1,p2];
|
||||
|
||||
let (p3,c3) = oneshot();
|
||||
let (p4,c4) = oneshot();
|
||||
|
||||
let x = Cell::new((c2, p3, c4));
|
||||
do task::spawn {
|
||||
let (c2, p3, c4) = x.take();
|
||||
p3.recv(); // handshake parent
|
||||
c4.send(()); // normal receive
|
||||
task::yield();
|
||||
c2.send(()); // select receive
|
||||
}
|
||||
|
||||
// Try to block before child sends on c2.
|
||||
c3.send(());
|
||||
p4.recv();
|
||||
if killable {
|
||||
assert!(select(ports) == 1);
|
||||
} else {
|
||||
do task::unkillable { assert!(select(ports) == 1); }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_racing_senders() {
|
||||
static NUM_CHANS: uint = 10;
|
||||
|
||||
select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
|
||||
select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
|
||||
select_racing_senders_helper(true, ~[0,1,2]);
|
||||
select_racing_senders_helper(false, ~[0,1,2]);
|
||||
select_racing_senders_helper(true, ~[3,4,5,6]);
|
||||
select_racing_senders_helper(false, ~[3,4,5,6]);
|
||||
select_racing_senders_helper(true, ~[7,8,9]);
|
||||
select_racing_senders_helper(false, ~[7,8,9]);
|
||||
|
||||
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
|
||||
use rt::test::spawntask_random;
|
||||
use iter::Times;
|
||||
|
||||
do run_in_newsched_task {
|
||||
// A bit of stress, since ordinarily this is just smoke and mirrors.
|
||||
do 4.times {
|
||||
let send_on_chans = send_on_chans.clone();
|
||||
do task::spawn {
|
||||
let mut ports = ~[];
|
||||
for i in range(0u, NUM_CHANS) {
|
||||
let (p,c) = oneshot();
|
||||
ports.push(p);
|
||||
if send_on_chans.contains(&i) {
|
||||
let c = Cell::new(c);
|
||||
do spawntask_random {
|
||||
task::yield();
|
||||
c.take().send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
// nondeterministic result, but should succeed
|
||||
if killable {
|
||||
select(ports);
|
||||
} else {
|
||||
do task::unkillable { select(ports); }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test] #[ignore(cfg(windows))]
|
||||
fn select_killed() {
|
||||
do run_in_newsched_task {
|
||||
let (success_p, success_c) = oneshot::<bool>();
|
||||
let success_c = Cell::new(success_c);
|
||||
do task::try {
|
||||
let success_c = Cell::new(success_c.take());
|
||||
do task::unkillable {
|
||||
let (p,c) = oneshot();
|
||||
let c = Cell::new(c);
|
||||
do task::spawn {
|
||||
let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>()));
|
||||
let mut ports = dead_ps;
|
||||
select(ports); // should get killed; nothing should leak
|
||||
c.take().send(()); // must not happen
|
||||
// Make sure dead_cs doesn't get closed until after select.
|
||||
let _ = dead_cs;
|
||||
}
|
||||
do task::spawn {
|
||||
fail!(); // should kill sibling awake
|
||||
}
|
||||
|
||||
// wait for killed selector to close (NOT send on) its c.
|
||||
// hope to send 'true'.
|
||||
success_c.take().send(p.try_recv().is_none());
|
||||
}
|
||||
};
|
||||
assert!(success_p.recv());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -253,12 +253,10 @@ pub fn run(&mut self, f: &fn()) {
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#7544): We pass the taskgroup into death so that it can be
|
||||
// dropped while the unkillable counter is set. This should not be
|
||||
// necessary except for an extraneous clone() in task/spawn.rs that
|
||||
// causes a killhandle to get dropped, which mustn't receive a kill
|
||||
// signal since we're outside of the unwinder's try() scope.
|
||||
// { let _ = self.taskgroup.take(); }
|
||||
// NB. We pass the taskgroup into death so that it can be dropped while
|
||||
// the unkillable counter is set. This is necessary for when the
|
||||
// taskgroup destruction code drops references on KillHandles, which
|
||||
// might require using unkillable (to synchronize with an unwrapper).
|
||||
self.death.collect_failure(!self.unwinder.unwinding, self.taskgroup.take());
|
||||
self.destroyed = true;
|
||||
}
|
||||
|
344
src/libstd/select.rs
Normal file
344
src/libstd/select.rs
Normal file
@ -0,0 +1,344 @@
|
||||
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use cell::Cell;
|
||||
use comm;
|
||||
use container::Container;
|
||||
use iterator::Iterator;
|
||||
use option::*;
|
||||
// use either::{Either, Left, Right};
|
||||
// use rt::kill::BlockedTask;
|
||||
use rt::sched::Scheduler;
|
||||
use rt::select::{SelectInner, SelectPortInner};
|
||||
use rt::local::Local;
|
||||
use rt::rtio::EventLoop;
|
||||
use task;
|
||||
use vec::{OwnedVector, MutableVector};
|
||||
|
||||
/// Trait for message-passing primitives that can be select()ed on.
|
||||
pub trait Select : SelectInner { }
|
||||
|
||||
/// Trait for message-passing primitives that can use the select2() convenience wrapper.
|
||||
// (This is separate from the above trait to enable heterogeneous lists of ports
|
||||
// that implement Select on different types to use select().)
|
||||
pub trait SelectPort<T> : SelectPortInner<T> { }
|
||||
|
||||
/// Receive a message from any one of many ports at once. Returns the index of the
|
||||
/// port whose data is ready. (If multiple are ready, returns the lowest index.)
|
||||
pub fn select<A: Select>(ports: &mut [A]) -> uint {
|
||||
if ports.is_empty() {
|
||||
fail!("can't select on an empty list");
|
||||
}
|
||||
|
||||
for (index, port) in ports.mut_iter().enumerate() {
|
||||
if port.optimistic_check() {
|
||||
return index;
|
||||
}
|
||||
}
|
||||
|
||||
// If one of the ports already contains data when we go to block on it, we
|
||||
// don't bother enqueueing on the rest of them, so we shouldn't bother
|
||||
// unblocking from it either. This is just for efficiency, not correctness.
|
||||
// (If not, we need to unblock from all of them. Length is a placeholder.)
|
||||
let mut ready_index = ports.len();
|
||||
|
||||
// XXX: We're using deschedule...and_then in an unsafe way here (see #8132),
|
||||
// in that we need to continue mutating the ready_index in the environment
|
||||
// after letting the task get woken up. The and_then closure needs to delay
|
||||
// the task from resuming until all ports have become blocked_on.
|
||||
let (p,c) = comm::oneshot();
|
||||
let p = Cell::new(p);
|
||||
let c = Cell::new(c);
|
||||
|
||||
let sched = Local::take::<Scheduler>();
|
||||
do sched.deschedule_running_task_and_then |sched, task| {
|
||||
let task_handles = task.make_selectable(ports.len());
|
||||
|
||||
for (index, (port, task_handle)) in
|
||||
ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
|
||||
// If one of the ports has data by now, it will wake the handle.
|
||||
if port.block_on(sched, task_handle) {
|
||||
ready_index = index;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let c = Cell::new(c.take());
|
||||
do sched.event_loop.callback { c.take().send_deferred(()) }
|
||||
}
|
||||
|
||||
// Unkillable is necessary not because getting killed is dangerous here,
|
||||
// but to force the recv not to use the same kill-flag that we used for
|
||||
// selecting. Otherwise a user-sender could spuriously wakeup us here.
|
||||
do task::unkillable { p.take().recv(); }
|
||||
|
||||
// Task resumes. Now unblock ourselves from all the ports we blocked on.
|
||||
// If the success index wasn't reset, 'take' will just take all of them.
|
||||
// Iterate in reverse so the 'earliest' index that's ready gets returned.
|
||||
for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() {
|
||||
if port.unblock_from() {
|
||||
ready_index = index;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(ready_index < ports.len());
|
||||
return ready_index;
|
||||
}
|
||||
|
||||
/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
|
||||
|
||||
impl <'self> Select for &'self mut Select {
|
||||
fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
|
||||
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
|
||||
self.block_on(sched, task)
|
||||
}
|
||||
fn unblock_from(&mut self) -> bool { self.unblock_from() }
|
||||
}
|
||||
|
||||
pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
|
||||
-> Either<(Option<TA>, B), (A, Option<TB>)> {
|
||||
let result = {
|
||||
let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
|
||||
select(ports)
|
||||
};
|
||||
match result {
|
||||
0 => Left ((a.recv_ready(), b)),
|
||||
1 => Right((a, b.recv_ready())),
|
||||
x => fail!("impossible case in select2: %?", x)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use clone::Clone;
|
||||
use iter::Times;
|
||||
use option::*;
|
||||
use rt::comm::*;
|
||||
use rt::test::*;
|
||||
use vec::*;
|
||||
use comm::GenericChan;
|
||||
use task;
|
||||
use cell::Cell;
|
||||
use iterator::{Iterator, range};
|
||||
|
||||
#[test] #[ignore(cfg(windows))] #[should_fail]
|
||||
fn select_doesnt_get_trolled() {
|
||||
select::<PortOne<()>>([]);
|
||||
}
|
||||
|
||||
/* non-blocking select tests */
|
||||
|
||||
#[cfg(test)]
|
||||
fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
|
||||
// Unfortunately this does not actually test the block_on early-break
|
||||
// codepath in select -- racing between the sender and the receiver in
|
||||
// separate tasks is necessary to get around the optimistic check.
|
||||
let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>()));
|
||||
let mut dead_chans = ~[];
|
||||
let mut ports = ports;
|
||||
for (i, chan) in chans.move_iter().enumerate() {
|
||||
if send_on_chans.contains(&i) {
|
||||
chan.send(());
|
||||
} else {
|
||||
dead_chans.push(chan);
|
||||
}
|
||||
}
|
||||
let ready_index = select(ports);
|
||||
assert!(send_on_chans.contains(&ready_index));
|
||||
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
|
||||
let _ = dead_chans;
|
||||
|
||||
// Same thing with streams instead.
|
||||
// FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
|
||||
let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>()));
|
||||
let mut dead_chans = ~[];
|
||||
let mut ports = ports;
|
||||
for (i, chan) in chans.move_iter().enumerate() {
|
||||
if send_on_chans.contains(&i) {
|
||||
chan.send(());
|
||||
} else {
|
||||
dead_chans.push(chan);
|
||||
}
|
||||
}
|
||||
let ready_index = select(ports);
|
||||
assert!(send_on_chans.contains(&ready_index));
|
||||
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
|
||||
let _ = dead_chans;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_one() {
|
||||
do run_in_newsched_task { select_helper(1, [0]) }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_two() {
|
||||
// NB. I would like to have a test that tests the first one that is
|
||||
// ready is the one that's returned, but that can't be reliably tested
|
||||
// with the randomized behaviour of optimistic_check.
|
||||
do run_in_newsched_task { select_helper(2, [1]) }
|
||||
do run_in_newsched_task { select_helper(2, [0]) }
|
||||
do run_in_newsched_task { select_helper(2, [1,0]) }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_a_lot() {
|
||||
do run_in_newsched_task { select_helper(12, [7,8,9]) }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_stream() {
|
||||
use util;
|
||||
use comm::GenericChan;
|
||||
|
||||
// Sends 10 buffered packets, and uses select to retrieve them all.
|
||||
// Puts the port in a different spot in the vector each time.
|
||||
do run_in_newsched_task {
|
||||
let (ports, _) = unzip(from_fn(10, |_| stream()));
|
||||
let (port, chan) = stream();
|
||||
do 10.times { chan.send(31337); }
|
||||
let mut ports = ports;
|
||||
let mut port = Some(port);
|
||||
let order = [5u,0,4,3,2,6,9,8,7,1];
|
||||
for &index in order.iter() {
|
||||
// put the port in the vector at any index
|
||||
util::swap(port.get_mut_ref(), &mut ports[index]);
|
||||
assert!(select(ports) == index);
|
||||
// get it back out
|
||||
util::swap(port.get_mut_ref(), &mut ports[index]);
|
||||
// NB. Not recv(), because optimistic_check randomly fails.
|
||||
assert!(port.get_ref().recv_ready().unwrap() == 31337);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_unkillable() {
|
||||
do run_in_newsched_task {
|
||||
do task::unkillable { select_helper(2, [1]) }
|
||||
}
|
||||
}
|
||||
|
||||
/* blocking select tests */
|
||||
|
||||
#[test]
|
||||
fn select_blocking() {
|
||||
select_blocking_helper(true);
|
||||
select_blocking_helper(false);
|
||||
|
||||
fn select_blocking_helper(killable: bool) {
|
||||
do run_in_newsched_task {
|
||||
let (p1,_c) = oneshot();
|
||||
let (p2,c2) = oneshot();
|
||||
let mut ports = [p1,p2];
|
||||
|
||||
let (p3,c3) = oneshot();
|
||||
let (p4,c4) = oneshot();
|
||||
|
||||
let x = Cell::new((c2, p3, c4));
|
||||
do task::spawn {
|
||||
let (c2, p3, c4) = x.take();
|
||||
p3.recv(); // handshake parent
|
||||
c4.send(()); // normal receive
|
||||
task::yield();
|
||||
c2.send(()); // select receive
|
||||
}
|
||||
|
||||
// Try to block before child sends on c2.
|
||||
c3.send(());
|
||||
p4.recv();
|
||||
if killable {
|
||||
assert!(select(ports) == 1);
|
||||
} else {
|
||||
do task::unkillable { assert!(select(ports) == 1); }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn select_racing_senders() {
|
||||
static NUM_CHANS: uint = 10;
|
||||
|
||||
select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
|
||||
select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
|
||||
select_racing_senders_helper(true, ~[0,1,2]);
|
||||
select_racing_senders_helper(false, ~[0,1,2]);
|
||||
select_racing_senders_helper(true, ~[3,4,5,6]);
|
||||
select_racing_senders_helper(false, ~[3,4,5,6]);
|
||||
select_racing_senders_helper(true, ~[7,8,9]);
|
||||
select_racing_senders_helper(false, ~[7,8,9]);
|
||||
|
||||
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
|
||||
use rt::test::spawntask_random;
|
||||
|
||||
do run_in_newsched_task {
|
||||
// A bit of stress, since ordinarily this is just smoke and mirrors.
|
||||
do 4.times {
|
||||
let send_on_chans = send_on_chans.clone();
|
||||
do task::spawn {
|
||||
let mut ports = ~[];
|
||||
for i in range(0u, NUM_CHANS) {
|
||||
let (p,c) = oneshot();
|
||||
ports.push(p);
|
||||
if send_on_chans.contains(&i) {
|
||||
let c = Cell::new(c);
|
||||
do spawntask_random {
|
||||
task::yield();
|
||||
c.take().send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
// nondeterministic result, but should succeed
|
||||
if killable {
|
||||
select(ports);
|
||||
} else {
|
||||
do task::unkillable { select(ports); }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test] #[ignore(cfg(windows))]
|
||||
fn select_killed() {
|
||||
do run_in_newsched_task {
|
||||
let (success_p, success_c) = oneshot::<bool>();
|
||||
let success_c = Cell::new(success_c);
|
||||
do task::try {
|
||||
let success_c = Cell::new(success_c.take());
|
||||
do task::unkillable {
|
||||
let (p,c) = oneshot();
|
||||
let c = Cell::new(c);
|
||||
do task::spawn {
|
||||
let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>()));
|
||||
let mut ports = dead_ps;
|
||||
select(ports); // should get killed; nothing should leak
|
||||
c.take().send(()); // must not happen
|
||||
// Make sure dead_cs doesn't get closed until after select.
|
||||
let _ = dead_cs;
|
||||
}
|
||||
do task::spawn {
|
||||
fail!(); // should kill sibling awake
|
||||
}
|
||||
|
||||
// wait for killed selector to close (NOT send on) its c.
|
||||
// hope to send 'true'.
|
||||
success_c.take().send(p.try_recv().is_none());
|
||||
}
|
||||
};
|
||||
assert!(success_p.recv());
|
||||
}
|
||||
}
|
||||
}
|
@ -164,6 +164,7 @@ pub mod linkhack {
|
||||
|
||||
pub mod task;
|
||||
pub mod comm;
|
||||
pub mod select;
|
||||
pub mod local_data;
|
||||
|
||||
|
||||
|
@ -38,7 +38,6 @@
|
||||
use prelude::*;
|
||||
|
||||
use cell::Cell;
|
||||
use cmp::Eq;
|
||||
use comm::{stream, Chan, GenericChan, GenericPort, Port};
|
||||
use result::Result;
|
||||
use result;
|
||||
|
@ -22,10 +22,9 @@
|
||||
*
|
||||
* A new one of these is created each spawn_linked or spawn_supervised.
|
||||
*
|
||||
* (2) The "tcb" is a per-task control structure that tracks a task's spawn
|
||||
* configuration. It contains a reference to its taskgroup_arc, a
|
||||
* reference to its node in the ancestor list (below), a flag for
|
||||
* whether it's part of the 'main'/'root' taskgroup, and an optionally
|
||||
* (2) The "taskgroup" is a per-task control structure that tracks a task's
|
||||
* spawn configuration. It contains a reference to its taskgroup_arc, a
|
||||
* reference to its node in the ancestor list (below), and an optionally
|
||||
* configured notification port. These are stored in TLS.
|
||||
*
|
||||
* (3) The "ancestor_list" is a cons-style list of unsafe::exclusives which
|
||||
@ -84,7 +83,6 @@
|
||||
use task::{Failure, SingleThreaded};
|
||||
use task::{Success, TaskOpts, TaskResult};
|
||||
use task::unkillable;
|
||||
use to_bytes::IterBytes;
|
||||
use uint;
|
||||
use util;
|
||||
use unstable::sync::Exclusive;
|
||||
@ -101,29 +99,7 @@
|
||||
#[cfg(test)] use comm;
|
||||
#[cfg(test)] use task;
|
||||
|
||||
// Transitionary.
|
||||
#[deriving(Eq)]
|
||||
enum TaskHandle {
|
||||
NewTask(KillHandle),
|
||||
}
|
||||
|
||||
impl Clone for TaskHandle {
|
||||
fn clone(&self) -> TaskHandle {
|
||||
match *self {
|
||||
NewTask(ref x) => NewTask(x.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IterBytes for TaskHandle {
|
||||
fn iter_bytes(&self, lsb0: bool, f: &fn(buf: &[u8]) -> bool) -> bool {
|
||||
match *self {
|
||||
NewTask(ref x) => x.iter_bytes(lsb0, f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct TaskSet(HashSet<TaskHandle>);
|
||||
struct TaskSet(HashSet<KillHandle>);
|
||||
|
||||
impl TaskSet {
|
||||
#[inline]
|
||||
@ -131,17 +107,17 @@ fn new() -> TaskSet {
|
||||
TaskSet(HashSet::new())
|
||||
}
|
||||
#[inline]
|
||||
fn insert(&mut self, task: TaskHandle) {
|
||||
fn insert(&mut self, task: KillHandle) {
|
||||
let didnt_overwrite = (**self).insert(task);
|
||||
assert!(didnt_overwrite);
|
||||
}
|
||||
#[inline]
|
||||
fn remove(&mut self, task: &TaskHandle) {
|
||||
fn remove(&mut self, task: &KillHandle) {
|
||||
let was_present = (**self).remove(task);
|
||||
assert!(was_present);
|
||||
}
|
||||
#[inline]
|
||||
fn move_iter(self) -> HashSetMoveIterator<TaskHandle> {
|
||||
fn move_iter(self) -> HashSetMoveIterator<KillHandle> {
|
||||
(*self).move_iter()
|
||||
}
|
||||
}
|
||||
@ -291,7 +267,7 @@ fn iterate(ancestors: &mut AncestorList,
|
||||
None => nobe_is_dead
|
||||
};
|
||||
// Call iterator block. (If the group is dead, it's
|
||||
// safe to skip it. This will leave our TaskHandle
|
||||
// safe to skip it. This will leave our KillHandle
|
||||
// hanging around in the group even after it's freed,
|
||||
// but that's ok because, by virtue of the group being
|
||||
// dead, nobody will ever kill-all (for) over it.)
|
||||
@ -338,7 +314,6 @@ pub struct Taskgroup {
|
||||
tasks: TaskGroupArc, // 'none' means the group has failed.
|
||||
// Lists of tasks who will kill us if they fail, but whom we won't kill.
|
||||
ancestors: AncestorList,
|
||||
is_main: bool,
|
||||
notifier: Option<AutoNotify>,
|
||||
}
|
||||
|
||||
@ -355,14 +330,18 @@ fn drop(&self) {
|
||||
for x in this.notifier.mut_iter() {
|
||||
x.failed = true;
|
||||
}
|
||||
// Take everybody down with us.
|
||||
do access_group(&self.tasks) |tg| {
|
||||
kill_taskgroup(tg, &me, self.is_main);
|
||||
}
|
||||
// Take everybody down with us. After this point, every
|
||||
// other task in the group will see 'tg' as none, which
|
||||
// indicates the whole taskgroup is failing (and forbids
|
||||
// new spawns from succeeding).
|
||||
let tg = do access_group(&self.tasks) |tg| { tg.take() };
|
||||
// It's safe to send kill signals outside the lock, because
|
||||
// we have a refcount on all kill-handles in the group.
|
||||
kill_taskgroup(tg, me);
|
||||
} else {
|
||||
// Remove ourselves from the group(s).
|
||||
do access_group(&self.tasks) |tg| {
|
||||
leave_taskgroup(tg, &me, true);
|
||||
leave_taskgroup(tg, me, true);
|
||||
}
|
||||
}
|
||||
// It doesn't matter whether this happens before or after dealing
|
||||
@ -370,7 +349,7 @@ fn drop(&self) {
|
||||
// We remove ourself from every ancestor we can, so no cleanup; no
|
||||
// break.
|
||||
do each_ancestor(&mut this.ancestors, |_| {}) |ancestor_group| {
|
||||
leave_taskgroup(ancestor_group, &me, false);
|
||||
leave_taskgroup(ancestor_group, me, false);
|
||||
true
|
||||
};
|
||||
}
|
||||
@ -380,7 +359,6 @@ fn drop(&self) {
|
||||
|
||||
pub fn Taskgroup(tasks: TaskGroupArc,
|
||||
ancestors: AncestorList,
|
||||
is_main: bool,
|
||||
mut notifier: Option<AutoNotify>) -> Taskgroup {
|
||||
for x in notifier.mut_iter() {
|
||||
x.failed = false;
|
||||
@ -389,7 +367,6 @@ pub fn Taskgroup(tasks: TaskGroupArc,
|
||||
Taskgroup {
|
||||
tasks: tasks,
|
||||
ancestors: ancestors,
|
||||
is_main: is_main,
|
||||
notifier: notifier
|
||||
}
|
||||
}
|
||||
@ -413,7 +390,7 @@ fn AutoNotify(chan: Chan<TaskResult>) -> AutoNotify {
|
||||
}
|
||||
}
|
||||
|
||||
fn enlist_in_taskgroup(state: TaskGroupInner, me: TaskHandle,
|
||||
fn enlist_in_taskgroup(state: TaskGroupInner, me: KillHandle,
|
||||
is_member: bool) -> bool {
|
||||
let me = Cell::new(me); // :(
|
||||
// If 'None', the group was failing. Can't enlist.
|
||||
@ -428,8 +405,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: TaskHandle,
|
||||
}
|
||||
|
||||
// NB: Runs in destructor/post-exit context. Can't 'fail'.
|
||||
fn leave_taskgroup(state: TaskGroupInner, me: &TaskHandle,
|
||||
is_member: bool) {
|
||||
fn leave_taskgroup(state: TaskGroupInner, me: &KillHandle, is_member: bool) {
|
||||
let me = Cell::new(me); // :(
|
||||
// If 'None', already failing and we've already gotten a kill signal.
|
||||
do state.map_mut |group| {
|
||||
@ -442,43 +418,23 @@ fn leave_taskgroup(state: TaskGroupInner, me: &TaskHandle,
|
||||
}
|
||||
|
||||
// NB: Runs in destructor/post-exit context. Can't 'fail'.
|
||||
fn kill_taskgroup(state: TaskGroupInner, me: &TaskHandle, is_main: bool) {
|
||||
unsafe {
|
||||
// NB: We could do the killing iteration outside of the group arc, by
|
||||
// having "let mut newstate" here, swapping inside, and iterating
|
||||
// after. But that would let other exiting tasks fall-through and exit
|
||||
// while we were trying to kill them, causing potential
|
||||
// use-after-free. A task's presence in the arc guarantees it's alive
|
||||
// only while we hold the lock, so if we're failing, all concurrently
|
||||
// exiting tasks must wait for us. To do it differently, we'd have to
|
||||
// use the runtime's task refcounting, but that could leave task
|
||||
// structs around long after their task exited.
|
||||
let newstate = util::replace(state, None);
|
||||
// Might already be None, if Somebody is failing simultaneously.
|
||||
// That's ok; only one task needs to do the dirty work. (Might also
|
||||
// see 'None' if Somebody already failed and we got a kill signal.)
|
||||
if newstate.is_some() {
|
||||
let TaskGroupData { members: members, descendants: descendants } =
|
||||
newstate.unwrap();
|
||||
for sibling in members.move_iter() {
|
||||
// Skip self - killing ourself won't do much good.
|
||||
if &sibling != me {
|
||||
RuntimeGlue::kill_task(sibling);
|
||||
}
|
||||
fn kill_taskgroup(state: Option<TaskGroupData>, me: &KillHandle) {
|
||||
// Might already be None, if somebody is failing simultaneously.
|
||||
// That's ok; only one task needs to do the dirty work. (Might also
|
||||
// see 'None' if somebody already failed and we got a kill signal.)
|
||||
do state.map_move |TaskGroupData { members: members, descendants: descendants }| {
|
||||
for sibling in members.move_iter() {
|
||||
// Skip self - killing ourself won't do much good.
|
||||
if &sibling != me {
|
||||
RuntimeGlue::kill_task(sibling);
|
||||
}
|
||||
for child in descendants.move_iter() {
|
||||
assert!(&child != me);
|
||||
RuntimeGlue::kill_task(child);
|
||||
}
|
||||
// Only one task should ever do this.
|
||||
if is_main {
|
||||
RuntimeGlue::kill_all_tasks(me);
|
||||
}
|
||||
// Do NOT restore state to Some(..)! It stays None to indicate
|
||||
// that the whole taskgroup is failing, to forbid new spawns.
|
||||
}
|
||||
// (note: multiple tasks may reach this point)
|
||||
}
|
||||
for child in descendants.move_iter() {
|
||||
assert!(&child != me);
|
||||
RuntimeGlue::kill_task(child);
|
||||
}
|
||||
};
|
||||
// (note: multiple tasks may reach this point)
|
||||
}
|
||||
|
||||
// FIXME (#2912): Work around core-vs-coretest function duplication. Can't use
|
||||
@ -490,38 +446,23 @@ fn taskgroup_key() -> local_data::Key<@@mut Taskgroup> {
|
||||
// Transitionary.
|
||||
struct RuntimeGlue;
|
||||
impl RuntimeGlue {
|
||||
unsafe fn kill_task(task: TaskHandle) {
|
||||
match task {
|
||||
NewTask(handle) => {
|
||||
let mut handle = handle;
|
||||
do handle.kill().map_move |killed_task| {
|
||||
let killed_task = Cell::new(killed_task);
|
||||
do Local::borrow::<Scheduler, ()> |sched| {
|
||||
sched.enqueue_task(killed_task.take());
|
||||
}
|
||||
};
|
||||
fn kill_task(handle: KillHandle) {
|
||||
let mut handle = handle;
|
||||
do handle.kill().map_move |killed_task| {
|
||||
let killed_task = Cell::new(killed_task);
|
||||
do Local::borrow::<Scheduler, ()> |sched| {
|
||||
sched.enqueue_task(killed_task.take());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
unsafe fn kill_all_tasks(task: &TaskHandle) {
|
||||
match *task {
|
||||
// FIXME(#7544): Remove the kill_all feature entirely once the
|
||||
// oldsched goes away.
|
||||
NewTask(ref _handle) => rtabort!("can't kill_all in newsched"),
|
||||
}
|
||||
}
|
||||
|
||||
fn with_task_handle_and_failing(blk: &fn(TaskHandle, bool)) {
|
||||
fn with_task_handle_and_failing(blk: &fn(&KillHandle, bool)) {
|
||||
if in_green_task_context() {
|
||||
unsafe {
|
||||
// Can't use safe borrow, because the taskgroup destructor needs to
|
||||
// access the scheduler again to send kill signals to other tasks.
|
||||
let me = Local::unsafe_borrow::<Task>();
|
||||
// FIXME(#7544): Get rid of this clone by passing by-ref.
|
||||
// Will probably have to wait until the old rt is gone.
|
||||
blk(NewTask((*me).death.kill_handle.get_ref().clone()),
|
||||
(*me).unwinder.unwinding)
|
||||
blk((*me).death.kill_handle.get_ref(), (*me).unwinder.unwinding)
|
||||
}
|
||||
} else {
|
||||
rtabort!("task dying in bad context")
|
||||
@ -540,15 +481,12 @@ fn with_my_taskgroup<U>(blk: &fn(&Taskgroup) -> U) -> U {
|
||||
// Lazily initialize.
|
||||
let mut members = TaskSet::new();
|
||||
let my_handle = (*me).death.kill_handle.get_ref().clone();
|
||||
members.insert(NewTask(my_handle));
|
||||
members.insert(my_handle);
|
||||
let tasks = Exclusive::new(Some(TaskGroupData {
|
||||
members: members,
|
||||
descendants: TaskSet::new(),
|
||||
}));
|
||||
// FIXME(#7544): Remove the is_main flag entirely once
|
||||
// the newsched goes away. The main taskgroup has no special
|
||||
// behaviour.
|
||||
let group = Taskgroup(tasks, AncestorList(None), false, None);
|
||||
let group = Taskgroup(tasks, AncestorList(None), None);
|
||||
(*me).taskgroup = Some(group);
|
||||
(*me).taskgroup.get_ref()
|
||||
}
|
||||
@ -563,9 +501,7 @@ fn with_my_taskgroup<U>(blk: &fn(&Taskgroup) -> U) -> U {
|
||||
|
||||
// Returns 'None' in the case where the child's TG should be lazily initialized.
|
||||
fn gen_child_taskgroup(linked: bool, supervised: bool)
|
||||
-> Option<(TaskGroupArc, AncestorList, bool)> {
|
||||
// FIXME(#7544): Not safe to lazily initialize in the old runtime. Remove
|
||||
// this context check once 'spawn_raw_oldsched' is gone.
|
||||
-> Option<(TaskGroupArc, AncestorList)> {
|
||||
if linked || supervised {
|
||||
// with_my_taskgroup will lazily initialize the parent's taskgroup if
|
||||
// it doesn't yet exist. We don't want to call it in the unlinked case.
|
||||
@ -574,8 +510,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
|
||||
if linked {
|
||||
// Child is in the same group as spawner.
|
||||
// Child's ancestors are spawner's ancestors.
|
||||
// Propagate main-ness.
|
||||
Some((spawner_group.tasks.clone(), ancestors, spawner_group.is_main))
|
||||
Some((spawner_group.tasks.clone(), ancestors))
|
||||
} else {
|
||||
// Child is in a separate group from spawner.
|
||||
let g = Exclusive::new(Some(TaskGroupData {
|
||||
@ -596,7 +531,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
|
||||
// Child has no ancestors.
|
||||
AncestorList(None)
|
||||
};
|
||||
Some((g, a, false))
|
||||
Some((g, a))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -607,7 +542,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
|
||||
// Set up membership in taskgroup and descendantship in all ancestor
|
||||
// groups. If any enlistment fails, Some task was already failing, so
|
||||
// don't let the child task run, and undo every successful enlistment.
|
||||
fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc,
|
||||
fn enlist_many(child: &KillHandle, child_arc: &TaskGroupArc,
|
||||
ancestors: &mut AncestorList) -> bool {
|
||||
// Join this taskgroup.
|
||||
let mut result = do access_group(child_arc) |child_tg| {
|
||||
@ -615,7 +550,7 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc,
|
||||
};
|
||||
if result {
|
||||
// Unwinding function in case any ancestral enlisting fails
|
||||
let bail: &fn(TaskGroupInner) = |tg| { leave_taskgroup(tg, &child, false) };
|
||||
let bail: &fn(TaskGroupInner) = |tg| { leave_taskgroup(tg, child, false) };
|
||||
// Attempt to join every ancestor group.
|
||||
result = do each_ancestor(ancestors, bail) |ancestor_tg| {
|
||||
// Enlist as a descendant, not as an actual member.
|
||||
@ -625,7 +560,7 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc,
|
||||
// If any ancestor group fails, need to exit this group too.
|
||||
if !result {
|
||||
do access_group(child_arc) |child_tg| {
|
||||
leave_taskgroup(child_tg, &child, true); // member
|
||||
leave_taskgroup(child_tg, child, true); // member
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -653,15 +588,14 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
|
||||
let enlist_success = do child_data.take().map_move_default(true) |child_data| {
|
||||
let child_data = Cell::new(child_data); // :(
|
||||
do Local::borrow::<Task, bool> |me| {
|
||||
let (child_tg, ancestors, is_main) = child_data.take();
|
||||
let (child_tg, ancestors) = child_data.take();
|
||||
let mut ancestors = ancestors;
|
||||
// FIXME(#7544): Optimize out the xadd in this clone, somehow.
|
||||
let handle = me.death.kill_handle.get_ref().clone();
|
||||
let handle = me.death.kill_handle.get_ref();
|
||||
// Atomically try to get into all of our taskgroups.
|
||||
if enlist_many(NewTask(handle), &child_tg, &mut ancestors) {
|
||||
if enlist_many(handle, &child_tg, &mut ancestors) {
|
||||
// Got in. We can run the provided child body, and can also run
|
||||
// the taskgroup's exit-time-destructor afterward.
|
||||
me.taskgroup = Some(Taskgroup(child_tg, ancestors, is_main, None));
|
||||
me.taskgroup = Some(Taskgroup(child_tg, ancestors, None));
|
||||
true
|
||||
} else {
|
||||
false
|
||||
@ -678,14 +612,14 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
|
||||
}
|
||||
};
|
||||
|
||||
let mut task = unsafe {
|
||||
if opts.sched.mode != SingleThreaded {
|
||||
if opts.watched {
|
||||
Task::build_child(opts.stack_size, child_wrapper)
|
||||
} else {
|
||||
Task::build_root(opts.stack_size, child_wrapper)
|
||||
}
|
||||
let mut task = if opts.sched.mode != SingleThreaded {
|
||||
if opts.watched {
|
||||
Task::build_child(opts.stack_size, child_wrapper)
|
||||
} else {
|
||||
Task::build_root(opts.stack_size, child_wrapper)
|
||||
}
|
||||
} else {
|
||||
unsafe {
|
||||
// Creating a 1:1 task:thread ...
|
||||
let sched = Local::unsafe_borrow::<Scheduler>();
|
||||
let sched_handle = (*sched).make_handle();
|
||||
|
@ -229,20 +229,22 @@ fn drop(&self) {
|
||||
if self.data.is_null() {
|
||||
return; // Happens when destructing an unwrapper's handle.
|
||||
}
|
||||
do task::unkillable {
|
||||
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
|
||||
// Must be acquire+release, not just release, to make sure this
|
||||
// doesn't get reordered to after the unwrapper pointer load.
|
||||
let old_count = data.count.fetch_sub(1, SeqCst);
|
||||
assert!(old_count >= 1);
|
||||
if old_count == 1 {
|
||||
// Were we really last, or should we hand off to an
|
||||
// unwrapper? It's safe to not xchg because the unwrapper
|
||||
// will set the unwrap lock *before* dropping his/her
|
||||
// reference. In effect, being here means we're the only
|
||||
// *awake* task with the data.
|
||||
match data.unwrapper.take(Acquire) {
|
||||
Some(~(message,response)) => {
|
||||
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
|
||||
// Must be acquire+release, not just release, to make sure this
|
||||
// doesn't get reordered to after the unwrapper pointer load.
|
||||
let old_count = data.count.fetch_sub(1, SeqCst);
|
||||
assert!(old_count >= 1);
|
||||
if old_count == 1 {
|
||||
// Were we really last, or should we hand off to an
|
||||
// unwrapper? It's safe to not xchg because the unwrapper
|
||||
// will set the unwrap lock *before* dropping his/her
|
||||
// reference. In effect, being here means we're the only
|
||||
// *awake* task with the data.
|
||||
match data.unwrapper.take(Acquire) {
|
||||
Some(~(message,response)) => {
|
||||
let cell = Cell::new((message, response, data));
|
||||
do task::unkillable {
|
||||
let (message, response, data) = cell.take();
|
||||
// Send 'ready' and wait for a response.
|
||||
message.send(());
|
||||
// Unkillable wait. Message guaranteed to come.
|
||||
@ -253,13 +255,13 @@ fn drop(&self) {
|
||||
// Other task was killed. drop glue takes over.
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// drop glue takes over.
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cast::forget(data);
|
||||
None => {
|
||||
// drop glue takes over.
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cast::forget(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user