diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs index 776e25cac89..dc6f4964b31 100644 --- a/src/libextra/comm.rs +++ b/src/libextra/comm.rs @@ -1,4 +1,4 @@ -// Copyright 2012 The Rust Project Developers. See the COPYRIGHT +// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // @@ -90,9 +90,55 @@ pub fn DuplexStream() }) } +/// An extension of `pipes::stream` that provides synchronous message sending. +pub struct SyncChan { priv duplex_stream: DuplexStream } +/// An extension of `pipes::stream` that acknowledges each message received. +pub struct SyncPort { priv duplex_stream: DuplexStream<(), T> } + +impl GenericChan for SyncChan { + fn send(&self, val: T) { + assert!(self.try_send(val), "SyncChan.send: receiving port closed"); + } +} + +impl GenericSmartChan for SyncChan { + /// Sends a message, or report if the receiver has closed the connection before receiving. + fn try_send(&self, val: T) -> bool { + self.duplex_stream.try_send(val) && self.duplex_stream.try_recv().is_some() + } +} + +impl GenericPort for SyncPort { + fn recv(&self) -> T { + self.try_recv().expect("SyncPort.recv: sending channel closed") + } + + fn try_recv(&self) -> Option { + do self.duplex_stream.try_recv().map_move |val| { + self.duplex_stream.try_send(()); + val + } + } +} + +impl Peekable for SyncPort { + fn peek(&self) -> bool { + self.duplex_stream.peek() + } +} + +/// Creates a stream whose channel, upon sending a message, blocks until the message is received. +pub fn rendezvous() -> (SyncPort, SyncChan) { + let (chan_stream, port_stream) = DuplexStream(); + (SyncPort { duplex_stream: port_stream }, SyncChan { duplex_stream: chan_stream }) +} + #[cfg(test)] mod test { - use comm::DuplexStream; + use comm::{DuplexStream, rendezvous}; + use std::rt::test::run_in_newsched_task; + use std::task::spawn_unlinked; + #[test] pub fn DuplexStream1() { @@ -104,4 +150,58 @@ mod test { assert!(left.recv() == 123); assert!(right.recv() == ~"abc"); } + + #[test] + pub fn basic_rendezvous_test() { + let (port, chan) = rendezvous(); + + do spawn { + chan.send("abc"); + } + + assert!(port.recv() == "abc"); + } + + #[test] + fn recv_a_lot() { + // Rendezvous streams should be able to handle any number of messages being sent + do run_in_newsched_task { + let (port, chan) = rendezvous(); + do spawn { + do 1000000.times { chan.send(()) } + } + do 1000000.times { port.recv() } + } + } + + #[test] + fn send_and_fail_and_try_recv() { + let (port, chan) = rendezvous(); + do spawn_unlinked { + chan.duplex_stream.send(()); // Can't access this field outside this module + fail!() + } + port.recv() + } + + #[test] + fn try_send_and_recv_then_fail_before_ack() { + let (port, chan) = rendezvous(); + do spawn_unlinked { + port.duplex_stream.recv(); + fail!() + } + chan.try_send(()); + } + + #[test] + #[should_fail] + fn send_and_recv_then_fail_before_ack() { + let (port, chan) = rendezvous(); + do spawn_unlinked { + port.duplex_stream.recv(); + fail!() + } + chan.send(()); + } }