From 8aeb9303e954502f67f4af7c5c7e79f6d4f706eb Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Fri, 8 Jul 2016 22:12:36 +1000 Subject: [PATCH 1/5] add a non blocking iterator for the mpsc::Receiver Currently, the `mpsc::Receiver` offers methods for receiving values in both blocking (`recv`) and non-blocking (`try_recv`) flavours. However only blocking iteration over values is supported. This commit adds a non-blocking iterator to complement the `try_recv` method, just as the blocking iterator complements the `recv` method. --- src/libstd/sync/mpsc/mod.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 34bc210b3c8..30ce9c3f382 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -311,6 +311,16 @@ pub struct Iter<'a, T: 'a> { rx: &'a Receiver } +/// An iterator that attempts to yield all pending values for a receiver. +/// `None` will be returned when there are no pending values remaining or +/// if the corresponding channel has hung up. +/// +/// This Iterator will never block the caller in order to wait for data to +/// become available. Instead, it will return `None`. +pub struct TryIter<'a, T: 'a> { + rx: &'a Receiver +} + /// An owning iterator over messages on a receiver, this iterator will block /// whenever `next` is called, waiting for a new message, and `None` will be /// returned when the corresponding channel has hung up. @@ -982,6 +992,15 @@ impl Receiver { pub fn iter(&self) -> Iter { Iter { rx: self } } + + /// Returns an iterator that will attempt to yield all pending values. + /// It will return `None` if there are no more pending values or if the + /// channel has hung up. The iterator will never `panic!` or block the + /// user by waiting for values. + pub fn try_iter(&self) -> TryIter { + TryIter { rx: self } + } + } impl select::Packet for Receiver { @@ -1077,6 +1096,12 @@ impl<'a, T> Iterator for Iter<'a, T> { fn next(&mut self) -> Option { self.rx.recv().ok() } } +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { self.rx.try_recv().ok() } +} + #[stable(feature = "receiver_into_iter", since = "1.1.0")] impl<'a, T> IntoIterator for &'a Receiver { type Item = T; From b354887180b665441dd6e3ea51b2651085de88f9 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 9 Jul 2016 00:21:26 +1000 Subject: [PATCH 2/5] Add a test for Receiver::try_iter --- src/libstd/sync/mpsc/mod.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 30ce9c3f382..51a820b2e91 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -1839,6 +1839,34 @@ mod tests { assert_eq!(count_rx.recv().unwrap(), 4); } + #[test] + fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move|| { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + drop(response_rx); + drop(request_tx); + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + response_tx.send(2).unwrap(); + } + + assert_eq!(t.join().unwrap(), 6); + } + #[test] fn test_recv_into_iter_owned() { let mut iter = { From b02b38e1c4f7c8075c92d41e9e08bf2a20982f66 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 9 Jul 2016 00:35:08 +1000 Subject: [PATCH 3/5] Add the unstable attribute to the new mpsc::Receiver::try_iter API --- src/libstd/sync/mpsc/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 51a820b2e91..2d2bded9f60 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -317,6 +317,7 @@ pub struct Iter<'a, T: 'a> { /// /// This Iterator will never block the caller in order to wait for data to /// become available. Instead, it will return `None`. +#[unstable(feature = "receiver_try_iter")] pub struct TryIter<'a, T: 'a> { rx: &'a Receiver } @@ -997,6 +998,7 @@ impl Receiver { /// It will return `None` if there are no more pending values or if the /// channel has hung up. The iterator will never `panic!` or block the /// user by waiting for values. + #[unstable(feature = "receiver_try_iter")] pub fn try_iter(&self) -> TryIter { TryIter { rx: self } } @@ -1096,6 +1098,7 @@ impl<'a, T> Iterator for Iter<'a, T> { fn next(&mut self) -> Option { self.rx.recv().ok() } } +#[unstable(feature = "receiver_try_iter")] impl<'a, T> Iterator for TryIter<'a, T> { type Item = T; From aed2e5c1e5d1774fba671ba60ee6347a1d524f2e Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Wed, 20 Jul 2016 14:49:40 +1000 Subject: [PATCH 4/5] Add the missing tracking issue field for #34931 to the receiver_try_iter stability attributes --- src/libstd/sync/mpsc/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 2d2bded9f60..b862a594ed2 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -317,7 +317,7 @@ pub struct Iter<'a, T: 'a> { /// /// This Iterator will never block the caller in order to wait for data to /// become available. Instead, it will return `None`. -#[unstable(feature = "receiver_try_iter")] +#[unstable(feature = "receiver_try_iter", issue = "34931")] pub struct TryIter<'a, T: 'a> { rx: &'a Receiver } @@ -998,7 +998,7 @@ impl Receiver { /// It will return `None` if there are no more pending values or if the /// channel has hung up. The iterator will never `panic!` or block the /// user by waiting for values. - #[unstable(feature = "receiver_try_iter")] + #[unstable(feature = "receiver_try_iter", issue = "34931")] pub fn try_iter(&self) -> TryIter { TryIter { rx: self } } @@ -1098,7 +1098,7 @@ impl<'a, T> Iterator for Iter<'a, T> { fn next(&mut self) -> Option { self.rx.recv().ok() } } -#[unstable(feature = "receiver_try_iter")] +#[unstable(feature = "receiver_try_iter", issue = "34931")] impl<'a, T> Iterator for TryIter<'a, T> { type Item = T; From 05af033b7fec63638497a9780e6b323d327d1e17 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Thu, 21 Jul 2016 19:32:24 +1000 Subject: [PATCH 5/5] Fix issue in receiver_try_iter test where response sender would panic instead of break from the loop --- src/libstd/sync/mpsc/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index b862a594ed2..6fe1b2a3b47 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -1854,8 +1854,6 @@ mod tests { for x in response_rx.try_iter() { count += x; if count == 6 { - drop(response_rx); - drop(request_tx); return count; } } @@ -1864,7 +1862,9 @@ mod tests { }); for _ in request_rx.iter() { - response_tx.send(2).unwrap(); + if response_tx.send(2).is_err() { + break; + } } assert_eq!(t.join().unwrap(), 6);