Carefully destroy channels at the right time.
When a channel is destroyed, it may attempt scheduler operations which could move a task off of it's I/O scheduler. This is obviously a bad interaction, and some finesse is required to make it work (making destructors run at the right time). Closes #10375
This commit is contained in:
parent
86a321b65d
commit
c5fdd69d3e
@ -77,3 +77,29 @@ impl Drop for SignalWatcher {
|
||||
self.close_async_();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use std::cell::Cell;
|
||||
use super::super::local_loop;
|
||||
use std::rt::io::signal;
|
||||
use std::comm::{SharedChan, stream};
|
||||
|
||||
#[test]
|
||||
fn closing_channel_during_drop_doesnt_kill_everything() {
|
||||
// see issue #10375, relates to timers as well.
|
||||
let (port, chan) = stream();
|
||||
let chan = SharedChan::new(chan);
|
||||
let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
|
||||
chan);
|
||||
|
||||
let port = Cell::new(port);
|
||||
do spawn {
|
||||
port.take().try_recv();
|
||||
}
|
||||
|
||||
// when we drop the SignalWatcher we're going to destroy the channel,
|
||||
// which must wake up the task on the other end
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ use std::rt::BlockedTask;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::RtioTimer;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::util;
|
||||
|
||||
use uvll;
|
||||
use super::{Loop, UvHandle, ForbidUnwind, ForbidSwitch};
|
||||
@ -82,9 +83,13 @@ impl RtioTimer for TimerWatcher {
|
||||
fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
|
||||
let (port, chan) = oneshot();
|
||||
|
||||
let _m = self.fire_homing_missile();
|
||||
self.action = Some(SendOnce(chan));
|
||||
self.start(msecs, 0);
|
||||
// similarly to the destructor, we must drop the previous action outside
|
||||
// of the homing missile
|
||||
let _prev_action = {
|
||||
let _m = self.fire_homing_missile();
|
||||
self.start(msecs, 0);
|
||||
util::replace(&mut self.action, Some(SendOnce(chan)))
|
||||
};
|
||||
|
||||
return port;
|
||||
}
|
||||
@ -93,8 +98,14 @@ impl RtioTimer for TimerWatcher {
|
||||
let (port, chan) = stream();
|
||||
|
||||
let _m = self.fire_homing_missile();
|
||||
self.action = Some(SendMany(chan));
|
||||
self.start(msecs, msecs);
|
||||
|
||||
// similarly to the destructor, we must drop the previous action outside
|
||||
// of the homing missile
|
||||
let _prev_action = {
|
||||
let _m = self.fire_homing_missile();
|
||||
self.start(msecs, msecs);
|
||||
util::replace(&mut self.action, Some(SendMany(chan)))
|
||||
};
|
||||
|
||||
return port;
|
||||
}
|
||||
@ -120,16 +131,24 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
|
||||
|
||||
impl Drop for TimerWatcher {
|
||||
fn drop(&mut self) {
|
||||
let _m = self.fire_homing_missile();
|
||||
self.action = None;
|
||||
self.stop();
|
||||
self.close_async_();
|
||||
// note that this drop is a little subtle. Dropping a channel which is
|
||||
// held internally may invoke some scheduling operations. We can't take
|
||||
// the channel unless we're on the home scheduler, but once we're on the
|
||||
// home scheduler we should never move. Hence, we take the timer's
|
||||
// action item and then move it outside of the homing block.
|
||||
let _action = {
|
||||
let _m = self.fire_homing_missile();
|
||||
self.stop();
|
||||
self.close_async_();
|
||||
self.action.take()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use std::cell::Cell;
|
||||
use std::rt::rtio::RtioTimer;
|
||||
use super::super::local_loop;
|
||||
|
||||
@ -205,6 +224,19 @@ mod test {
|
||||
// which must wake up the task on the other end
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reset_doesnt_switch_tasks() {
|
||||
// similar test to the one above.
|
||||
let mut timer = TimerWatcher::new(local_loop());
|
||||
let timer_port = Cell::new(timer.period(1000));
|
||||
|
||||
do spawn {
|
||||
timer_port.take().try_recv();
|
||||
}
|
||||
|
||||
timer.oneshot(1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sender_goes_away_oneshot() {
|
||||
let port = {
|
||||
|
Loading…
x
Reference in New Issue
Block a user