Fix select() in light of the deschedule...and then race. Close #8347.
This commit is contained in:
parent
31f9b51592
commit
ce48e71d28
@ -114,7 +114,9 @@ impl<T> ChanOne<T> {
|
||||
// 'do_resched' configures whether the scheduler immediately switches to
|
||||
// the receiving task, or leaves the sending task still running.
|
||||
fn try_send_inner(self, val: T, do_resched: bool) -> bool {
|
||||
rtassert!(!rt::in_sched_context());
|
||||
if do_resched {
|
||||
rtassert!(!rt::in_sched_context());
|
||||
}
|
||||
|
||||
let mut this = self;
|
||||
let mut recvr_active = true;
|
||||
|
@ -8,6 +8,8 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use cell::Cell;
|
||||
use comm;
|
||||
use container::Container;
|
||||
use iterator::Iterator;
|
||||
use option::*;
|
||||
@ -16,6 +18,8 @@ use option::*;
|
||||
use rt::sched::Scheduler;
|
||||
use rt::select::{SelectInner, SelectPortInner};
|
||||
use rt::local::Local;
|
||||
use rt::rtio::EventLoop;
|
||||
use task;
|
||||
use vec::{OwnedVector, MutableVector};
|
||||
|
||||
/// Trait for message-passing primitives that can be select()ed on.
|
||||
@ -45,6 +49,14 @@ pub fn select<A: Select>(ports: &mut [A]) -> uint {
|
||||
// (If not, we need to unblock from all of them. Length is a placeholder.)
|
||||
let mut ready_index = ports.len();
|
||||
|
||||
// XXX: We're using deschedule...and_then in an unsafe way here (see #8132),
|
||||
// in that we need to continue mutating the ready_index in the environment
|
||||
// after letting the task get woken up. The and_then closure needs to delay
|
||||
// the task from resuming until all ports have become blocked_on.
|
||||
let (p,c) = comm::oneshot();
|
||||
let p = Cell::new(p);
|
||||
let c = Cell::new(c);
|
||||
|
||||
let sched = Local::take::<Scheduler>();
|
||||
do sched.deschedule_running_task_and_then |sched, task| {
|
||||
let task_handles = task.make_selectable(ports.len());
|
||||
@ -57,8 +69,16 @@ pub fn select<A: Select>(ports: &mut [A]) -> uint {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let c = Cell::new(c.take());
|
||||
do sched.event_loop.callback { c.take().send_deferred(()) }
|
||||
}
|
||||
|
||||
// Unkillable is necessary not because getting killed is dangerous here,
|
||||
// but to force the recv not to use the same kill-flag that we used for
|
||||
// selecting. Otherwise a user-sender could spuriously wakeup us here.
|
||||
do task::unkillable { p.take().recv(); }
|
||||
|
||||
// Task resumes. Now unblock ourselves from all the ports we blocked on.
|
||||
// If the success index wasn't reset, 'take' will just take all of them.
|
||||
// Iterate in reverse so the 'earliest' index that's ready gets returned.
|
||||
|
Loading…
x
Reference in New Issue
Block a user