Allow more "error" values in try_recv()

This should allow callers to know whether the channel was empty or disconnected
without having to block.

Closes #11087
This commit is contained in:
Alex Crichton 2013-12-21 22:15:04 -08:00
parent 9008931125
commit adb895a34f
7 changed files with 114 additions and 35 deletions

View File

@ -16,6 +16,8 @@
#[allow(missing_doc)];
use std::comm;
/// An extension of `pipes::stream` that allows both sending and receiving.
pub struct DuplexStream<T, U> {
priv chan: Chan<T>,
@ -40,7 +42,7 @@ pub fn try_send(&self, x: T) -> bool {
pub fn recv(&self) -> U {
self.port.recv()
}
pub fn try_recv(&self) -> Option<U> {
pub fn try_recv(&self) -> comm::TryRecvResult<U> {
self.port.try_recv()
}
pub fn recv_opt(&self) -> Option<U> {
@ -77,11 +79,11 @@ pub fn recv_opt(&self) -> Option<T> {
})
}
pub fn try_recv(&self) -> Option<T> {
self.duplex_stream.try_recv().map(|val| {
self.duplex_stream.try_send(());
val
})
pub fn try_recv(&self) -> comm::TryRecvResult<T> {
match self.duplex_stream.try_recv() {
comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
state => state,
}
}
}

View File

@ -19,6 +19,7 @@
use std::borrow;
use std::comm;
use std::unstable::sync::Exclusive;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
@ -49,7 +50,7 @@ fn new() -> WaitQueue {
// Signals one live task from the queue.
fn signal(&self) -> bool {
match self.head.try_recv() {
Some(ch) => {
comm::Data(ch) => {
// Send a wakeup signal. If the waiter was killed, its port will
// have closed. Keep trying until we get a live task.
if ch.try_send_deferred(()) {
@ -58,7 +59,7 @@ fn signal(&self) -> bool {
self.signal()
}
}
None => false
_ => false
}
}
@ -66,12 +67,12 @@ fn broadcast(&self) -> uint {
let mut count = 0;
loop {
match self.head.try_recv() {
None => break,
Some(ch) => {
comm::Data(ch) => {
if ch.try_send_deferred(()) {
count += 1;
}
}
_ => break
}
}
count

View File

@ -958,6 +958,7 @@ fn new_sched_rng() -> XorShiftRng {
#[cfg(test)]
mod test {
use std::comm;
use std::task::TaskOpts;
use std::rt::Runtime;
use std::rt::task::Task;
@ -1376,7 +1377,7 @@ fn dont_starve_1() {
// This task should not be able to starve the sender;
// The sender should get stolen to another thread.
do spawn {
while port.try_recv().is_none() { }
while port.try_recv() != comm::Data(()) { }
}
chan.send(());
@ -1393,7 +1394,7 @@ fn dont_starve_2() {
// This task should not be able to starve the other task.
// The sends should eventually yield.
do spawn {
while port.try_recv().is_none() {
while port.try_recv() != comm::Data(()) {
chan2.send(());
}
}

View File

@ -251,6 +251,7 @@ mod $name {
#[allow(unused_imports)];
use native;
use comm::*;
use prelude::*;
use super::*;
use super::super::*;
@ -323,6 +324,20 @@ pub struct SharedChan<T> {
priv queue: mpsc::Producer<T, Packet>,
}
/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
#[deriving(Eq, Clone)]
pub enum TryRecvResult<T> {
/// This channel is currently empty, but the sender(s) have not yet
/// disconnected, so data may yet become available.
Empty,
/// This channel's sending half has become disconnected, and there will
/// never be any more data received on this channel
Disconnected,
/// The channel had some data and we successfully popped it
Data(T),
}
///////////////////////////////////////////////////////////////////////////////
// Internal struct definitions
///////////////////////////////////////////////////////////////////////////////
@ -739,11 +754,11 @@ pub fn recv(&self) -> T {
/// block on a port.
///
/// This function cannot fail.
pub fn try_recv(&self) -> Option<T> {
pub fn try_recv(&self) -> TryRecvResult<T> {
self.try_recv_inc(true)
}
fn try_recv_inc(&self, increment: bool) -> Option<T> {
fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> {
// This is a "best effort" situation, so if a queue is inconsistent just
// don't worry about it.
let this = unsafe { cast::transmute_mut(self) };
@ -807,7 +822,35 @@ fn try_recv_inc(&self, increment: bool) -> Option<T> {
if increment && ret.is_some() {
unsafe { (*this.queue.packet()).steals += 1; }
}
return ret;
match ret {
Some(t) => Data(t),
None => {
// It's possible that between the time that we saw the queue was
// empty and here the other side disconnected. It's also
// possible for us to see the disconnection here while there is
// data in the queue. It's pretty backwards-thinking to return
// Disconnected when there's actually data on the queue, so if
// we see a disconnected state be sure to check again to be 100%
// sure that there's no data in the queue.
let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) };
if cnt != DISCONNECTED { return Empty }
let ret = match this.queue {
SPSC(ref mut queue) => queue.pop(),
MPSC(ref mut queue) => match queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
mpsc::Inconsistent => {
fail!("inconsistent with no senders?!");
}
}
};
match ret {
Some(data) => Data(data),
None => Disconnected,
}
}
}
}
/// Attempt to wait for a value on this port, but does not fail if the
@ -824,7 +867,11 @@ fn try_recv_inc(&self, increment: bool) -> Option<T> {
/// the value found on the port is returned.
pub fn recv_opt(&self) -> Option<T> {
// optimistic preflight check (scheduling is expensive)
match self.try_recv() { None => {}, data => return data }
match self.try_recv() {
Empty => {},
Disconnected => return None,
Data(t) => return Some(t),
}
let packet;
let this;
@ -843,12 +890,11 @@ pub fn recv_opt(&self) -> Option<T> {
});
}
let data = self.try_recv_inc(false);
if data.is_none() &&
unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) });
match self.try_recv_inc(false) {
Data(t) => Some(t),
Empty => fail!("bug: woke up too soon"),
Disconnected => None,
}
return data;
}
/// Returns an iterator which will block waiting for messages, but never
@ -1005,7 +1051,10 @@ pub fn stress_factor() -> uint {
for _ in range(0, AMT * NTHREADS) {
assert_eq!(p.recv(), 1);
}
assert_eq!(p.try_recv(), None);
match p.try_recv() {
Data(..) => fail!(),
_ => {}
}
c1.send(());
}
@ -1129,7 +1178,7 @@ fn no_runtime() {
test!(fn oneshot_single_thread_try_recv_open() {
let (port, chan) = Chan::<int>::new();
chan.send(10);
assert!(port.try_recv() == Some(10));
assert!(port.recv_opt() == Some(10));
})
test!(fn oneshot_single_thread_try_recv_closed() {
@ -1140,21 +1189,21 @@ fn no_runtime() {
test!(fn oneshot_single_thread_peek_data() {
let (port, chan) = Chan::<int>::new();
assert!(port.try_recv().is_none());
assert_eq!(port.try_recv(), Empty)
chan.send(10);
assert!(port.try_recv().is_some());
assert_eq!(port.try_recv(), Data(10));
})
test!(fn oneshot_single_thread_peek_close() {
let (port, chan) = Chan::<int>::new();
{ let _c = chan; }
assert!(port.try_recv().is_none());
assert!(port.try_recv().is_none());
assert_eq!(port.try_recv(), Disconnected);
assert_eq!(port.try_recv(), Disconnected);
})
test!(fn oneshot_single_thread_peek_open() {
let (port, _) = Chan::<int>::new();
assert!(port.try_recv().is_none());
assert_eq!(port.try_recv(), Empty);
})
test!(fn oneshot_multi_task_recv_then_send() {
@ -1321,4 +1370,27 @@ fn recv(port: Port<~int>, i: int) {
drop(chan);
assert_eq!(count_port.recv(), 4);
})
test!(fn try_recv_states() {
let (p, c) = Chan::<int>::new();
let (p1, c1) = Chan::<()>::new();
let (p2, c2) = Chan::<()>::new();
do spawn {
p1.recv();
c.send(1);
c2.send(());
p1.recv();
drop(c);
c2.send(());
}
assert_eq!(p.try_recv(), Empty);
c1.send(());
p2.recv();
assert_eq!(p.try_recv(), Data(1));
assert_eq!(p.try_recv(), Empty);
c1.send(());
p2.recv();
assert_eq!(p.try_recv(), Disconnected);
})
}

View File

@ -45,6 +45,7 @@
#[allow(dead_code)];
use cast;
use comm;
use iter::Iterator;
use kinds::Send;
use ops::Drop;
@ -279,7 +280,9 @@ pub fn recv(&mut self) -> T { self.port.recv() }
pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
/// Immediately attempt to receive a value on a port, this function will
/// never block. Has the same semantics as `Port.try_recv`.
pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() }
pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
self.port.try_recv()
}
}
#[unsafe_destructor]
@ -409,8 +412,8 @@ mod test {
a = p1.recv() => { assert_eq!(a, 1); },
a = p2.recv() => { assert_eq!(a, 2); }
)
assert_eq!(p1.try_recv(), None);
assert_eq!(p2.try_recv(), None);
assert_eq!(p1.try_recv(), Empty);
assert_eq!(p2.try_recv(), Empty);
c3.send(());
})

View File

@ -144,6 +144,7 @@ pub fn unregister(&mut self, signum: Signum) {
#[cfg(test)]
mod test {
use libc;
use comm::Empty;
use io::timer;
use super::{Listener, Interrupt};
@ -194,7 +195,7 @@ fn test_io_signal_unregister() {
s2.unregister(Interrupt);
sigint();
timer::sleep(10);
assert!(s2.port.try_recv().is_none());
assert_eq!(s2.port.try_recv(), Empty);
}
#[cfg(windows)]

View File

@ -123,7 +123,7 @@ fn oneshot_twice() {
let port1 = timer.oneshot(10000);
let port = timer.oneshot(1);
port.recv();
assert_eq!(port1.try_recv(), None);
assert!(port1.recv_opt().is_none());
}
#[test]
@ -131,8 +131,7 @@ fn test_io_timer_oneshot_then_sleep() {
let mut timer = Timer::new().unwrap();
let port = timer.oneshot(100000000000);
timer.sleep(1); // this should invalidate the port
assert_eq!(port.try_recv(), None);
assert!(port.recv_opt().is_none());
}
#[test]