diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 267140a0089..94e3d5ce2d3 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -280,6 +280,7 @@ mod select; mod oneshot; mod stream; mod shared; +mod sync; // Use a power of 2 to allow LLVM to optimize to something that's not a // division, this is hit pretty regularly. @@ -301,8 +302,8 @@ pub struct Messages<'a, T> { priv rx: &'a Receiver<T> } -/// The sending-half of Rust's channel type. This half can only be owned by one -/// task +/// The sending-half of Rust's asynchronous channel type. This half can only be +/// owned by one task, but it can be cloned to send to other tasks. pub struct Sender<T> { priv inner: Flavor<T>, priv sends: Cell<uint>, @@ -310,6 +311,14 @@ pub struct Sender<T> { priv marker: marker::NoShare, } +/// The sending-half of Rust's synchronous channel type. This half can only be +/// owned by one task, but it can be cloned to send to other tasks. +pub struct SyncSender<T> { + priv inner: UnsafeArc<sync::Packet<T>>, + // can't share in an arc + priv marker: marker::NoShare, +} + /// This enumeration is the list of the possible reasons that try_recv could not /// return data when called. #[deriving(Eq, Clone, Show)] @@ -324,10 +333,31 @@ pub enum TryRecvResult<T> { Data(T), } +/// This enumeration is the list of the possible outcomes for the +/// `SyncSender::try_send` method. +#[deriving(Eq, Clone, Show)] +pub enum TrySendResult<T> { + /// The data was successfully sent along the channel. This either means that + /// it was buffered in the channel, or handed off to a receiver. In either + /// case, the callee no longer has ownership of the data. + Sent, + /// The data could not be sent on the channel because it would require that + /// the callee block to send the data. + /// + /// If this is a buffered channel, then the buffer is full at this time. If + /// this is not a buffered channel, then there is no receiver available to + /// acquire the data. + Full(T), + /// This channel's receiving half has disconnected, so the data could not be + /// sent. The data is returned back to the callee in this case. + RecvDisconnected(T), +} + enum Flavor<T> { Oneshot(UnsafeArc<oneshot::Packet<T>>), Stream(UnsafeArc<stream::Packet<T>>), Shared(UnsafeArc<shared::Packet<T>>), + Sync(UnsafeArc<sync::Packet<T>>), } /// Creates a new channel, returning the sender/receiver halves. All data sent @@ -338,6 +368,46 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a))) } +/// Creates a new synchronous, bounded channel. +/// +/// Like asynchronous channels, the `Receiver` will block until a message +/// becomes available. These channels differ greatly in the semantics of the +/// sender from asynchronous channels, however. +/// +/// This channel has an internal buffer on which messages will be queued. When +/// the internal buffer becomes full, future sends will *block* waiting for the +/// buffer to open up. Note that a buffer size of 0 is valid, in which case this +/// becomes "rendezvous channel" where each send will not return until a recv +/// is paired with it. +/// +/// As with asynchronous channels, all senders will fail in `send` if the +/// `Receiver` has been destroyed. +/// +/// # Example +/// +/// ``` +/// let (tx, rx) = sync_channel(1); +/// +/// // this returns immediately +/// tx.send(1); +/// +/// spawn(proc() { +/// // this will block until the previous message has been received +/// tx.send(2); +/// }); +/// +/// assert_eq!(rx.recv(), 1); +/// assert_eq!(rx.recv(), 2); +/// ``` +pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) { + let (a, b) = UnsafeArc::new2(sync::Packet::new(bound)); + (SyncSender::new(a), Receiver::my_new(Sync(b))) +} + +//////////////////////////////////////////////////////////////////////////////// +// Sender +//////////////////////////////////////////////////////////////////////////////// + impl<T: Send> Sender<T> { fn my_new(inner: Flavor<T>) -> Sender<T> { Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare } @@ -422,6 +492,7 @@ impl<T: Send> Sender<T> { } Stream(ref p) => return unsafe { (*p.get()).send(t) }, Shared(ref p) => return unsafe { (*p.get()).send(t) }, + Sync(..) => unreachable!(), }; unsafe { @@ -453,6 +524,7 @@ impl<T: Send> Clone for Sender<T> { unsafe { (*p.get()).clone_chan(); } return Sender::my_new(Shared(p.clone())); } + Sync(..) => unreachable!(), }; unsafe { @@ -472,10 +544,100 @@ impl<T: Send> Drop for Sender<T> { Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Sync(..) => unreachable!(), } } } +//////////////////////////////////////////////////////////////////////////////// +// SyncSender +//////////////////////////////////////////////////////////////////////////////// + +impl<T: Send> SyncSender<T> { + fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> { + SyncSender { inner: inner, marker: marker::NoShare } + } + + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + /// + /// Note that a successful send does *not* guarantee that the receiver will + /// ever see the data if there is a buffer on this channel. Messages may be + /// enqueued in the internal buffer for the receiver to receive at a later + /// time. If the buffer size is 0, however, it can be guaranteed that the + /// receiver has indeed received the data if this function returns success. + /// + /// # Failure + /// + /// Similarly to `Sender::send`, this function will fail if the + /// corresponding `Receiver` for this channel has disconnected. This + /// behavior is used to propagate failure among tasks. + /// + /// If failure is not desired, you can achieve the same semantics with the + /// `SyncSender::send_opt` method which will not fail if the receiver + /// disconnects. + pub fn send(&self, t: T) { + if self.send_opt(t).is_some() { + fail!("sending on a closed channel"); + } + } + + /// Send a value on a channel, returning it back if the receiver + /// disconnected + /// + /// This method will *block* to send the value `t` on the channel, but if + /// the value could not be sent due to the receiver disconnecting, the value + /// is returned back to the callee. This function is similar to `try_send`, + /// except that it will block if the channel is currently full. + /// + /// # Failure + /// + /// This function cannot fail. + pub fn send_opt(&self, t: T) -> Option<T> { + match unsafe { (*self.inner.get()).send(t) } { + Ok(()) => None, + Err(t) => Some(t), + } + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method semantically differs from `Sender::try_send` because it can + /// fail if the receiver has not disconnected yet. If the buffer on this + /// channel is full, this function will immediately return the data back to + /// the callee. + /// + /// See `SyncSender::send` for notes about guarantees of whether the + /// receiver has received the data or not if this function is successful. + /// + /// # Failure + /// + /// This function cannot fail + pub fn try_send(&self, t: T) -> TrySendResult<T> { + unsafe { (*self.inner.get()).try_send(t) } + } +} + +impl<T: Send> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + unsafe { (*self.inner.get()).clone_chan(); } + return SyncSender::new(self.inner.clone()); + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for SyncSender<T> { + fn drop(&mut self) { + unsafe { (*self.inner.get()).drop_chan(); } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Receiver +//////////////////////////////////////////////////////////////////////////////// + impl<T: Send> Receiver<T> { fn my_new(inner: Flavor<T>) -> Receiver<T> { Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare } @@ -554,6 +716,13 @@ impl<T: Send> Receiver<T> { Err(shared::Disconnected) => return Disconnected, } } + Sync(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Data(t), + Err(sync::Empty) => return Empty, + Err(sync::Disconnected) => return Disconnected, + } + } }; unsafe { mem::swap(&mut cast::transmute_mut(self).inner, @@ -600,6 +769,7 @@ impl<T: Send> Receiver<T> { Err(shared::Disconnected) => return None, } } + Sync(ref p) => return unsafe { (*p.get()).recv() } }; unsafe { mem::swap(&mut cast::transmute_mut(self).inner, @@ -634,6 +804,9 @@ impl<T: Send> select::Packet for Receiver<T> { Shared(ref p) => { return unsafe { (*p.get()).can_recv() }; } + Sync(ref p) => { + return unsafe { (*p.get()).can_recv() }; + } }; unsafe { mem::swap(&mut cast::transmute_mut(self).inner, @@ -662,6 +835,9 @@ impl<T: Send> select::Packet for Receiver<T> { Shared(ref p) => { return unsafe { (*p.get()).start_selection(task) }; } + Sync(ref p) => { + return unsafe { (*p.get()).start_selection(task) }; + } }; task = t; unsafe { @@ -682,6 +858,9 @@ impl<T: Send> select::Packet for Receiver<T> { Shared(ref p) => return unsafe { (*p.get()).abort_selection(was_upgrade) }, + Sync(ref p) => return unsafe { + (*p.get()).abort_selection() + }, }; let mut new_port = match result { Ok(b) => return b, Err(p) => p }; was_upgrade = true; @@ -704,6 +883,7 @@ impl<T: Send> Drop for Receiver<T> { Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, } } } @@ -1243,3 +1423,517 @@ mod test { pdone.recv(); }) } + +#[cfg(test)] +mod sync_tests { + use prelude::*; + use os; + + pub fn stress_factor() -> uint { + match os::getenv("RUST_TEST_STRESS") { + Some(val) => from_str::<uint>(val).unwrap(), + None => 1, + } + } + + test!(fn smoke() { + let (tx, rx) = sync_channel(1); + tx.send(1); + assert_eq!(rx.recv(), 1); + }) + + test!(fn drop_full() { + let (tx, _rx) = sync_channel(1); + tx.send(~1); + }) + + test!(fn smoke_shared() { + let (tx, rx) = sync_channel(1); + tx.send(1); + assert_eq!(rx.recv(), 1); + let tx = tx.clone(); + tx.send(1); + assert_eq!(rx.recv(), 1); + }) + + test!(fn smoke_threads() { + let (tx, rx) = sync_channel(0); + spawn(proc() { + tx.send(1); + }); + assert_eq!(rx.recv(), 1); + }) + + test!(fn smoke_port_gone() { + let (tx, rx) = sync_channel(0); + drop(rx); + tx.send(1); + } #[should_fail]) + + test!(fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + tx2.send(1); + } #[should_fail]) + + test!(fn port_gone_concurrent() { + let (tx, rx) = sync_channel(0); + spawn(proc() { + rx.recv(); + }); + loop { tx.send(1) } + } #[should_fail]) + + test!(fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel(0); + let tx2 = tx.clone(); + spawn(proc() { + rx.recv(); + }); + loop { + tx.send(1); + tx2.send(1); + } + } #[should_fail]) + + test!(fn smoke_chan_gone() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + rx.recv(); + } #[should_fail]) + + test!(fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + rx.recv(); + } #[should_fail]) + + test!(fn chan_gone_concurrent() { + let (tx, rx) = sync_channel(0); + spawn(proc() { + tx.send(1); + tx.send(1); + }); + loop { rx.recv(); } + } #[should_fail]) + + test!(fn stress() { + let (tx, rx) = sync_channel(0); + spawn(proc() { + for _ in range(0, 10000) { tx.send(1); } + }); + for _ in range(0, 10000) { + assert_eq!(rx.recv(), 1); + } + }) + + test!(fn stress_shared() { + static AMT: uint = 1000; + static NTHREADS: uint = 8; + let (tx, rx) = sync_channel::<int>(0); + let (dtx, drx) = sync_channel::<()>(0); + + spawn(proc() { + for _ in range(0, AMT * NTHREADS) { + assert_eq!(rx.recv(), 1); + } + match rx.try_recv() { + Data(..) => fail!(), + _ => {} + } + dtx.send(()); + }); + + for _ in range(0, NTHREADS) { + let tx = tx.clone(); + spawn(proc() { + for _ in range(0, AMT) { tx.send(1); } + }); + } + drop(tx); + drx.recv(); + }) + + test!(fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::<int>(0); + drop(rx); + }) + + test!(fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::<int>(0); + drop(tx); + }) + + test!(fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::<~int>(0); + drop(rx); + tx.send(~0); + } #[should_fail]) + + test!(fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will fail + let res = task::try(proc() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + rx.recv(); + }); + // What is our res? + assert!(res.is_err()); + }) + + test!(fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::<~int>(1); + tx.send(~10); + assert!(rx.recv() == ~10); + }) + + test!(fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::<int>(1); + assert_eq!(tx.try_send(10), Sent); + assert!(rx.recv() == 10); + }) + + test!(fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::<int>(0); + drop(rx); + assert_eq!(tx.try_send(10), RecvDisconnected(10)); + }) + + test!(fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::<int>(0); + assert_eq!(tx.try_send(10), Full(10)); + }) + + test!(fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::<int>(1); + tx.send(10); + assert!(rx.recv_opt() == Some(10)); + }) + + test!(fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + assert!(rx.recv_opt() == None); + }) + + test!(fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::<int>(1); + assert_eq!(rx.try_recv(), Empty) + tx.send(10); + assert_eq!(rx.try_recv(), Data(10)); + }) + + test!(fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + assert_eq!(rx.try_recv(), Disconnected); + assert_eq!(rx.try_recv(), Disconnected); + }) + + test!(fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::<int>(0); + assert_eq!(rx.try_recv(), Empty); + }) + + test!(fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::<~int>(0); + spawn(proc() { + assert!(rx.recv() == ~10); + }); + + tx.send(~10); + }) + + test!(fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::<~int>(0); + spawn(proc() { + drop(tx); + }); + let res = task::try(proc() { + assert!(rx.recv() == ~10); + }); + assert!(res.is_err()); + }) + + test!(fn oneshot_multi_thread_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + drop(rx); + }); + drop(tx); + } + }) + + test!(fn oneshot_multi_thread_send_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + drop(rx); + }); + let _ = task::try(proc() { + tx.send(1); + }); + } + }) + + test!(fn oneshot_multi_thread_recv_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + let res = task::try(proc() { + rx.recv(); + }); + assert!(res.is_err()); + }); + spawn(proc() { + spawn(proc() { + drop(tx); + }); + }); + } + }) + + test!(fn oneshot_multi_thread_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel(0); + spawn(proc() { + tx.send(~10); + }); + spawn(proc() { + assert!(rx.recv() == ~10); + }); + } + }) + + test!(fn stream_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel(0); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: SyncSender<~int>, i: int) { + if i == 10 { return } + + spawn(proc() { + tx.send(~i); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<~int>, i: int) { + if i == 10 { return } + + spawn(proc() { + assert!(rx.recv() == ~i); + recv(rx, i + 1); + }); + } + } + }) + + test!(fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(10000); + for _ in range(0, 10000) { tx.send(()); } + for _ in range(0, 10000) { rx.recv(); } + }) + + test!(fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + for _ in range(0, total) { + let tx = tx.clone(); + spawn(proc() { + tx.send(()); + }); + } + + for _ in range(0, total) { + rx.recv(); + } + }) + + test!(fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::<int>(0); + let (total_tx, total_rx) = sync_channel::<int>(0); + + spawn(proc() { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc); + }); + + tx.send(3); + tx.send(1); + tx.send(2); + drop(tx); + assert_eq!(total_rx.recv(), 6); + }) + + test!(fn test_recv_iter_break() { + let (tx, rx) = sync_channel::<int>(0); + let (count_tx, count_rx) = sync_channel(0); + + spawn(proc() { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count); + }); + + tx.send(2); + tx.send(2); + tx.send(2); + tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv(), 4); + }) + + test!(fn try_recv_states() { + let (tx1, rx1) = sync_channel::<int>(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + spawn(proc() { + rx2.recv(); + tx1.send(1); + tx3.send(()); + rx2.recv(); + drop(tx1); + tx3.send(()); + }); + + assert_eq!(rx1.try_recv(), Empty); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Data(1)); + assert_eq!(rx1.try_recv(), Empty); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Disconnected); + }) + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + test!(fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel(0); + let (tx2, rx2) = sync_channel(0); + spawn(proc() { + rx.recv(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()); + }); + // make sure the other task has gone to sleep + for _ in range(0, 5000) { task::deschedule(); } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()); + + // wait for the child task to exit before we exit + rx2.recv(); + }) + + test!(fn try_recvs_off_the_runtime() { + use std::rt::thread::Thread; + + let (tx, rx) = sync_channel(0); + let (cdone, pdone) = channel(); + let t = Thread::start(proc() { + let mut hits = 0; + while hits < 10 { + match rx.try_recv() { + Data(()) => { hits += 1; } + Empty => { Thread::yield_now(); } + Disconnected => return, + } + } + cdone.send(()); + }); + for _ in range(0, 10) { + tx.send(()); + } + t.join(); + pdone.recv(); + }) + + test!(fn send_opt1() { + let (tx, rx) = sync_channel(0); + spawn(proc() { rx.recv(); }); + assert_eq!(tx.send_opt(1), None); + }) + + test!(fn send_opt2() { + let (tx, rx) = sync_channel(0); + spawn(proc() { drop(rx); }); + assert_eq!(tx.send_opt(1), Some(1)); + }) + + test!(fn send_opt3() { + let (tx, rx) = sync_channel(1); + assert_eq!(tx.send_opt(1), None); + spawn(proc() { drop(rx); }); + assert_eq!(tx.send_opt(1), Some(1)); + }) + + test!(fn send_opt4() { + let (tx, rx) = sync_channel(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + spawn(proc() { + assert_eq!(tx.send_opt(1), Some(1)); + done.send(()); + }); + spawn(proc() { + assert_eq!(tx2.send_opt(2), Some(2)); + done2.send(()); + }); + drop(rx); + donerx.recv(); + donerx.recv(); + }) + + test!(fn try_send1() { + let (tx, _rx) = sync_channel(0); + assert_eq!(tx.try_send(1), Full(1)); + }) + + test!(fn try_send2() { + let (tx, _rx) = sync_channel(1); + assert_eq!(tx.try_send(1), Sent); + assert_eq!(tx.try_send(1), Full(1)); + }) + + test!(fn try_send3() { + let (tx, rx) = sync_channel(1); + assert_eq!(tx.try_send(1), Sent); + drop(rx); + assert_eq!(tx.try_send(1), RecvDisconnected(1)); + }) + + test!(fn try_send4() { + let (tx, rx) = sync_channel(0); + spawn(proc() { + for _ in range(0, 1000) { task::deschedule(); } + assert_eq!(tx.try_send(1), Sent); + }); + assert_eq!(rx.recv(), 1); + }) +} diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 5872c308f93..1b2e79e02b4 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -648,4 +648,40 @@ mod test { tx1.send(()); rx2.recv(); }) + + test!(fn sync1() { + let (tx, rx) = sync_channel(1); + tx.send(1); + select! { + n = rx.recv() => { assert_eq!(n, 1); } + } + }) + + test!(fn sync2() { + let (tx, rx) = sync_channel(0); + spawn(proc() { + for _ in range(0, 100) { task::deschedule() } + tx.send(1); + }); + select! { + n = rx.recv() => { assert_eq!(n, 1); } + } + }) + + test!(fn sync3() { + let (tx1, rx1) = sync_channel(0); + let (tx2, rx2) = channel(); + spawn(proc() { tx1.send(1); }); + spawn(proc() { tx2.send(2); }); + select! { + n = rx1.recv() => { + assert_eq!(n, 1); + assert_eq!(rx2.recv(), 2); + }, + n = rx2.recv() => { + assert_eq!(n, 2); + assert_eq!(rx1.recv(), 1); + } + } + }) } diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs new file mode 100644 index 00000000000..b3591dad274 --- /dev/null +++ b/src/libstd/comm/sync.rs @@ -0,0 +1,485 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Synchronous channels/ports +/// +/// This channel implementation differs significantly from the asynchronous +/// implementations found next to it (oneshot/stream/share). This is an +/// implementation of a synchronous, bounded buffer channel. +/// +/// Each channel is created with some amount of backing buffer, and sends will +/// *block* until buffer space becomes available. A buffer size of 0 is valid, +/// which means that every successful send is paired with a successful recv. +/// +/// This flavor of channels defines a new `send_opt` method for channels which +/// is the method by which a message is sent but the task does not fail if it +/// cannot be delivered. +/// +/// Another major difference is that send() will *always* return back the data +/// if it couldn't be sent. This is because it is deterministically known when +/// the data is received and when it is not received. +/// +/// Implementation-wise, it can all be summed up with "use a mutex plus some +/// logic". The mutex used here is an OS native mutex, meaning that no user code +/// is run inside of the mutex (to prevent context switching). This +/// implementation shares almost all code for the buffered and unbuffered cases +/// of a synchronous channel. There are a few branches for the unbuffered case, +/// but they're mostly just relevant to blocking senders. + +use cast; +use container::Container; +use iter::Iterator; +use kinds::Send; +use mem; +use ops::Drop; +use option::{Some, None, Option}; +use ptr::RawPtr; +use result::{Result, Ok, Err}; +use rt::local::Local; +use rt::task::{Task, BlockedTask}; +use sync::atomics; +use ty::Unsafe; +use unstable::mutex::{NativeMutex, LockGuard}; +use vec::Vec; + +pub struct Packet<T> { + /// Only field outside of the mutex. Just done for kicks, but mainly because + /// the other shared channel already had the code implemented + channels: atomics::AtomicUint, + + /// The state field is protected by this mutex + lock: NativeMutex, + state: Unsafe<State<T>>, +} + +struct State<T> { + disconnected: bool, // Is the channel disconnected yet? + queue: Queue, // queue of senders waiting to send data + blocker: Blocker, // currently blocked task on this channel + buf: Buffer<T>, // storage for buffered messages + cap: uint, // capacity of this channel + + /// A curious flag used to indicate whether a sender failed or succeeded in + /// blocking. This is used to transmit information back to the task that it + /// must dequeue its message from the buffer because it was not received. + /// This is only relevant in the 0-buffer case. This obviously cannot be + /// safely constructed, but it's guaranteed to always have a valid pointer + /// value. + canceled: Option<&'static mut bool>, +} + +/// Possible flavors of tasks who can be blocked on this channel. +enum Blocker { + BlockedSender(BlockedTask), + BlockedReceiver(BlockedTask), + NoneBlocked +} + +/// Simple queue for threading tasks together. Nodes are stack-allocated, so +/// this structure is not safe at all +struct Queue { + head: *mut Node, + tail: *mut Node, +} + +struct Node { + task: Option<BlockedTask>, + next: *mut Node, +} + +/// A simple ring-buffer +struct Buffer<T> { + buf: Vec<Option<T>>, + start: uint, + size: uint, +} + +#[deriving(Show)] +pub enum Failure { + Empty, + Disconnected, +} + +/// Atomically blocks the current task, placing it into `slot`, unlocking `lock` +/// in the meantime. This re-locks the mutex upon returning. +fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker, + lock: &NativeMutex) { + let me: ~Task = Local::take(); + me.deschedule(1, |task| { + match mem::replace(slot, f(task)) { + NoneBlocked => {} + _ => unreachable!(), + } + unsafe { lock.unlock_noguard(); } + Ok(()) + }); + unsafe { lock.lock_noguard(); } +} + +/// Wakes up a task, dropping the lock at the correct time +fn wakeup(task: BlockedTask, guard: LockGuard) { + // We need to be careful to wake up the waiting task *outside* of the mutex + // in case it incurs a context switch. + mem::drop(guard); + task.wake().map(|t| t.reawaken()); +} + +impl<T: Send> Packet<T> { + pub fn new(cap: uint) -> Packet<T> { + Packet { + channels: atomics::AtomicUint::new(1), + lock: unsafe { NativeMutex::new() }, + state: Unsafe::new(State { + disconnected: false, + blocker: NoneBlocked, + cap: cap, + canceled: None, + queue: Queue { + head: 0 as *mut Node, + tail: 0 as *mut Node, + }, + buf: Buffer { + buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None), + start: 0, + size: 0, + }, + }), + } + } + + // Locks this channel, returning a guard for the state and the mutable state + // itself. Care should be taken to ensure that the state does not escape the + // guard! + // + // Note that we're ok promoting an & reference to an &mut reference because + // the lock ensures that we're the only ones in the world with a pointer to + // the state. + fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) { + unsafe { + let guard = self.lock.lock(); + (guard, &mut *self.state.get()) + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + let (guard, state) = self.lock(); + + // wait for a slot to become available, and enqueue the data + while !state.disconnected && state.buf.size() == state.buf.cap() { + state.queue.enqueue(&self.lock); + } + if state.disconnected { return Err(t) } + state.buf.enqueue(t); + + match mem::replace(&mut state.blocker, NoneBlocked) { + // if our capacity is 0, then we need to wait for a receiver to be + // available to take our data. After waiting, we check again to make + // sure the port didn't go away in the meantime. If it did, we need + // to hand back our data. + NoneBlocked if state.cap == 0 => { + let mut canceled = false; + assert!(state.canceled.is_none()); + state.canceled = Some(unsafe { cast::transmute(&mut canceled) }); + wait(&mut state.blocker, BlockedSender, &self.lock); + if canceled {Err(state.buf.dequeue())} else {Ok(())} + } + + // success, we buffered some data + NoneBlocked => Ok(()), + + // success, someone's about to receive our buffered data. + BlockedReceiver(task) => { wakeup(task, guard); Ok(()) } + + BlockedSender(..) => fail!("lolwut"), + } + } + + pub fn try_send(&self, t: T) -> super::TrySendResult<T> { + let (guard, state) = self.lock(); + if state.disconnected { + super::RecvDisconnected(t) + } else if state.buf.size() == state.buf.cap() { + super::Full(t) + } else if state.cap == 0 { + // With capacity 0, even though we have buffer space we can't + // transfer the data unless there's a receiver waiting. + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => super::Full(t), + BlockedSender(..) => unreachable!(), + BlockedReceiver(task) => { + state.buf.enqueue(t); + wakeup(task, guard); + super::Sent + } + } + } else { + // If the buffer has some space and the capacity isn't 0, then we + // just enqueue the data for later retrieval. + assert!(state.buf.size() < state.buf.cap()); + state.buf.enqueue(t); + super::Sent + } + } + + // Receives a message from this channel + // + // When reading this, remember that there can only ever be one receiver at + // time. + pub fn recv(&self) -> Option<T> { + let (guard, state) = self.lock(); + + // Wait for the buffer to have something in it. No need for a while loop + // because we're the only receiver. + let mut waited = false; + if !state.disconnected && state.buf.size() == 0 { + wait(&mut state.blocker, BlockedReceiver, &self.lock); + waited = true; + } + if state.disconnected && state.buf.size() == 0 { return None } + + // Pick up the data, wake up our neighbors, and carry on + assert!(state.buf.size() > 0); + let ret = state.buf.dequeue(); + self.wakeup_senders(waited, guard, state); + return Some(ret); + } + + pub fn try_recv(&self) -> Result<T, Failure> { + let (guard, state) = self.lock(); + + // Easy cases first + if state.disconnected { return Err(Disconnected) } + if state.buf.size() == 0 { return Err(Empty) } + + // Be sure to wake up neighbors + let ret = Ok(state.buf.dequeue()); + self.wakeup_senders(false, guard, state); + + return ret; + } + + // Wake up pending senders after some data has been received + // + // * `waited` - flag if the receiver blocked to receive some data, or if it + // just picked up some data on the way out + // * `guard` - the lock guard that is held over this channel's lock + fn wakeup_senders(&self, waited: bool, + guard: LockGuard, + state: &mut State<T>) { + let pending_sender1: Option<BlockedTask> = state.queue.dequeue(); + + // If this is a no-buffer channel (cap == 0), then if we didn't wait we + // need to ACK the sender. If we waited, then the sender waking us up + // was already the ACK. + let pending_sender2 = if state.cap == 0 && !waited { + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedReceiver(..) => unreachable!(), + BlockedSender(task) => { + state.canceled.take(); + Some(task) + } + } + } else { + None + }; + mem::drop((state, guard)); + + // only outside of the lock do we wake up the pending tasks + pending_sender1.map(|t| t.wake().map(|t| t.reawaken())); + pending_sender2.map(|t| t.wake().map(|t| t.reawaken())); + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&self) { + self.channels.fetch_add(1, atomics::SeqCst); + } + + pub fn drop_chan(&self) { + // Only flag the channel as disconnected if we're the last channel + match self.channels.fetch_sub(1, atomics::SeqCst) { + 1 => {} + _ => return + } + + // Not much to do other than wake up a receiver if one's there + let (guard, state) = self.lock(); + if state.disconnected { return } + state.disconnected = true; + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + BlockedReceiver(task) => wakeup(task, guard), + } + } + + pub fn drop_port(&self) { + let (guard, state) = self.lock(); + + if state.disconnected { return } + state.disconnected = true; + + // If the capacity is 0, then the sender may want its data back after + // we're disconnected. Otherwise it's now our responsibility to destroy + // the buffered data. As with many other portions of this code, this + // needs to be careful to destroy the data *outside* of the lock to + // prevent deadlock. + let _data = if state.cap != 0 { + mem::replace(&mut state.buf.buf, Vec::new()) + } else { + Vec::new() + }; + let mut queue = mem::replace(&mut state.queue, Queue { + head: 0 as *mut Node, + tail: 0 as *mut Node, + }); + + let waiter = match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedSender(task) => { + *state.canceled.take_unwrap() = true; + Some(task) + } + BlockedReceiver(..) => unreachable!(), + }; + mem::drop((state, guard)); + + loop { + match queue.dequeue() { + Some(task) => { task.wake().map(|t| t.reawaken()); } + None => break, + } + } + waiter.map(|t| t.wake().map(|t| t.reawaken())); + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // If Ok, the value is whether this port has data, if Err, then the upgraded + // port needs to be checked instead of this one. + pub fn can_recv(&self) -> bool { + let (_g, state) = self.lock(); + state.disconnected || state.buf.size() > 0 + } + + // Attempts to start selection on this port. This can either succeed or fail + // because there is data waiting. + pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{ + let (_g, state) = self.lock(); + if state.disconnected || state.buf.size() > 0 { + Err(task) + } else { + match mem::replace(&mut state.blocker, BlockedReceiver(task)) { + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + BlockedReceiver(..) => unreachable!(), + } + Ok(()) + } + } + + // Remove a previous selecting task from this port. This ensures that the + // blocked task will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. + pub fn abort_selection(&self) -> bool { + let (_g, state) = self.lock(); + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => true, + BlockedSender(task) => { + state.blocker = BlockedSender(task); + true + } + BlockedReceiver(task) => { task.trash(); false } + } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Packet<T> { + fn drop(&mut self) { + assert_eq!(self.channels.load(atomics::SeqCst), 0); + let (_g, state) = self.lock(); + assert!(state.queue.dequeue().is_none()); + assert!(state.canceled.is_none()); + } +} + + +//////////////////////////////////////////////////////////////////////////////// +// Buffer, a simple ring buffer backed by Vec<T> +//////////////////////////////////////////////////////////////////////////////// + +impl<T> Buffer<T> { + fn enqueue(&mut self, t: T) { + let pos = (self.start + self.size) % self.buf.len(); + self.size += 1; + let prev = mem::replace(self.buf.get_mut(pos), Some(t)); + assert!(prev.is_none()); + } + + fn dequeue(&mut self) -> T { + let start = self.start; + self.size -= 1; + self.start = (self.start + 1) % self.buf.len(); + self.buf.get_mut(start).take_unwrap() + } + + fn size(&self) -> uint { self.size } + fn cap(&self) -> uint { self.buf.len() } +} + +//////////////////////////////////////////////////////////////////////////////// +// Queue, a simple queue to enqueue tasks with (stack-allocated nodes) +//////////////////////////////////////////////////////////////////////////////// + +impl Queue { + fn enqueue(&mut self, lock: &NativeMutex) { + let task: ~Task = Local::take(); + let mut node = Node { + task: None, + next: 0 as *mut Node, + }; + task.deschedule(1, |task| { + node.task = Some(task); + if self.tail.is_null() { + self.head = &mut node as *mut Node; + self.tail = &mut node as *mut Node; + } else { + unsafe { + (*self.tail).next = &mut node as *mut Node; + self.tail = &mut node as *mut Node; + } + } + unsafe { lock.unlock_noguard(); } + Ok(()) + }); + unsafe { lock.lock_noguard(); } + assert!(node.next.is_null()); + } + + fn dequeue(&mut self) -> Option<BlockedTask> { + if self.head.is_null() { + return None + } + let node = self.head; + self.head = unsafe { (*node).next }; + if self.head.is_null() { + self.tail = 0 as *mut Node; + } + unsafe { + (*node).next = 0 as *mut Node; + Some((*node).task.take_unwrap()) + } + } +} diff --git a/src/libstd/io/fs.rs b/src/libstd/io/fs.rs index 8681ab21f10..e66aa8c0046 100644 --- a/src/libstd/io/fs.rs +++ b/src/libstd/io/fs.rs @@ -1282,10 +1282,10 @@ mod test { } iotest!(fn binary_file() { - use rand::{Rng, task_rng}; + use rand::{StdRng, Rng}; let mut bytes = [0, ..1024]; - task_rng().fill_bytes(bytes); + StdRng::new().fill_bytes(bytes); let tmpdir = tmpdir(); diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs index d487aa638ac..a42ee80b53a 100644 --- a/src/libstd/prelude.rs +++ b/src/libstd/prelude.rs @@ -62,7 +62,7 @@ pub use slice::{Vector, VectorVector, CloneableVector, ImmutableVector}; pub use vec::Vec; // Reexported runtime types -pub use comm::{channel, Sender, Receiver}; +pub use comm::{sync_channel, channel, SyncSender, Sender, Receiver}; pub use task::spawn; // Reexported statics diff --git a/src/libstd/rt/args.rs b/src/libstd/rt/args.rs index fc9e571b270..328de696914 100644 --- a/src/libstd/rt/args.rs +++ b/src/libstd/rt/args.rs @@ -69,6 +69,7 @@ mod imp { use iter::Iterator; use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; use mem; + #[cfg(not(test))] use ptr::RawPtr; static mut global_args_ptr: uint = 0; static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT; diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index cd047c815e9..ededc69c5a1 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -433,8 +433,8 @@ mod test { #[test] fn rng() { - use rand::{Rng, task_rng}; - let mut r = task_rng(); + use rand::{StdRng, Rng}; + let mut r = StdRng::new(); let _ = r.next_u32(); } diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 9802271e28f..6f5ef067e89 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -580,9 +580,9 @@ mod test { fn smoke_cond() { static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT; unsafe { - let mut guard = lock.lock(); + let guard = lock.lock(); let t = Thread::start(proc() { - let mut guard = lock.lock(); + let guard = lock.lock(); guard.signal(); }); guard.wait(); diff --git a/src/libstd/vec.rs b/src/libstd/vec.rs index 4b2953204d0..5809fca9682 100644 --- a/src/libstd/vec.rs +++ b/src/libstd/vec.rs @@ -1355,13 +1355,8 @@ impl<T> Drop for MoveItems<T> { #[cfg(test)] mod tests { - use super::Vec; - use iter::{Iterator, range, Extendable}; - use mem::{drop, size_of}; - use ops::Drop; - use option::{Some, None}; - use container::Container; - use slice::{Vector, MutableVector, ImmutableVector}; + use prelude::*; + use mem::size_of; #[test] fn test_small_vec_struct() { diff --git a/src/libsync/comm.rs b/src/libsync/comm.rs index aecea37cce8..628f6459bad 100644 --- a/src/libsync/comm.rs +++ b/src/libsync/comm.rs @@ -51,54 +51,9 @@ impl<S:Send,R:Send> DuplexStream<S, R> { } } -/// An extension of `pipes::stream` that provides synchronous message sending. -pub struct SyncSender<S> { priv duplex_stream: DuplexStream<S, ()> } -/// An extension of `pipes::stream` that acknowledges each message received. -pub struct SyncReceiver<R> { priv duplex_stream: DuplexStream<(), R> } - -impl<S: Send> SyncSender<S> { - pub fn send(&self, val: S) { - assert!(self.try_send(val), "SyncSender.send: receiving port closed"); - } - - /// Sends a message, or report if the receiver has closed the connection - /// before receiving. - pub fn try_send(&self, val: S) -> bool { - self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some() - } -} - -impl<R: Send> SyncReceiver<R> { - pub fn recv(&self) -> R { - self.recv_opt().expect("SyncReceiver.recv: sending channel closed") - } - - pub fn recv_opt(&self) -> Option<R> { - self.duplex_stream.recv_opt().map(|val| { - self.duplex_stream.try_send(()); - val - }) - } - - pub fn try_recv(&self) -> comm::TryRecvResult<R> { - match self.duplex_stream.try_recv() { - comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) } - state => state, - } - } -} - -/// Creates a stream whose channel, upon sending a message, blocks until the -/// message is received. -pub fn rendezvous<T: Send>() -> (SyncReceiver<T>, SyncSender<T>) { - let (chan_stream, port_stream) = duplex(); - (SyncReceiver { duplex_stream: port_stream }, - SyncSender { duplex_stream: chan_stream }) -} - #[cfg(test)] mod test { - use comm::{duplex, rendezvous}; + use comm::{duplex}; #[test] @@ -111,56 +66,4 @@ mod test { assert!(left.recv() == 123); assert!(right.recv() == ~"abc"); } - - #[test] - pub fn basic_rendezvous_test() { - let (port, chan) = rendezvous(); - - spawn(proc() { - chan.send("abc"); - }); - - assert!(port.recv() == "abc"); - } - - #[test] - fn recv_a_lot() { - // Rendezvous streams should be able to handle any number of messages being sent - let (port, chan) = rendezvous(); - spawn(proc() { - for _ in range(0, 10000) { chan.send(()); } - }); - for _ in range(0, 10000) { port.recv(); } - } - - #[test] - fn send_and_fail_and_try_recv() { - let (port, chan) = rendezvous(); - spawn(proc() { - chan.duplex_stream.send(()); // Can't access this field outside this module - fail!() - }); - port.recv() - } - - #[test] - fn try_send_and_recv_then_fail_before_ack() { - let (port, chan) = rendezvous(); - spawn(proc() { - port.duplex_stream.recv(); - fail!() - }); - chan.try_send(()); - } - - #[test] - #[should_fail] - fn send_and_recv_then_fail_before_ack() { - let (port, chan) = rendezvous(); - spawn(proc() { - port.duplex_stream.recv(); - fail!() - }); - chan.send(()); - } } diff --git a/src/libsync/lib.rs b/src/libsync/lib.rs index d166076e96e..4df644e3b23 100644 --- a/src/libsync/lib.rs +++ b/src/libsync/lib.rs @@ -25,7 +25,7 @@ #[cfg(test)] #[phase(syntax, link)] extern crate log; -pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex}; +pub use comm::{DuplexStream, duplex}; pub use task_pool::TaskPool; pub use future::Future; pub use arc::{Arc, Weak};