rust/src/libstd/rt/comm.rs

898 lines
27 KiB
Rust

// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Ports and channels.
use option::*;
use cast;
use util;
use ops::Drop;
use rt::kill::BlockedTask;
use kinds::Send;
use rt::sched::Scheduler;
use rt::local::Local;
use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
/// A combined refcount / BlockedTask-as-uint pointer.
///
/// Can be equal to the following values:
///
/// * 2 - both endpoints are alive
/// * 1 - either the sender or the receiver is dead, determined by context
/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
type State = uint;
static STATE_BOTH: State = 2;
static STATE_ONE: State = 1;
/// The heap-allocated structure shared between two endpoints.
struct Packet<T> {
state: AtomicUint,
payload: Option<T>,
}
/// A one-shot channel.
pub struct ChanOne<T> {
void_packet: *mut Void,
suppress_finalize: bool
}
/// A one-shot port.
pub struct PortOne<T> {
void_packet: *mut Void,
suppress_finalize: bool
}
pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
let packet: ~Packet<T> = ~Packet {
state: AtomicUint::new(STATE_BOTH),
payload: None
};
unsafe {
let packet: *mut Void = cast::transmute(packet);
let port = PortOne {
void_packet: packet,
suppress_finalize: false
};
let chan = ChanOne {
void_packet: packet,
suppress_finalize: false
};
return (port, chan);
}
}
impl<T> ChanOne<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
pub fn send(self, val: T) {
self.try_send(val);
}
pub fn try_send(self, val: T) -> bool {
let mut this = self;
let mut recvr_active = true;
let packet = this.packet();
unsafe {
// Install the payload
assert!((*packet).payload.is_none());
(*packet).payload = Some(val);
// Atomically swap out the old state to figure out what
// the port's up to, issuing a release barrier to prevent
// reordering of the payload write. This also issues an
// acquire barrier that keeps the subsequent access of the
// ~Task pointer from being reordered.
let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port is not waiting yet. Nothing to do
do Local::borrow::<Scheduler, ()> |sched| {
rtdebug!("non-rendezvous send");
sched.metrics.non_rendezvous_sends += 1;
}
}
STATE_ONE => {
do Local::borrow::<Scheduler, ()> |sched| {
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
}
// Port has closed. Need to clean up.
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
recvr_active = false;
}
task_as_state => {
// Port is blocked. Wake it up.
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(woken_task);
};
}
}
}
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
this.suppress_finalize = true;
return recvr_active;
}
}
impl<T> PortOne<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
pub fn recv(self) -> T {
match self.try_recv() {
Some(val) => val,
None => {
fail!("receiving on closed channel");
}
}
}
pub fn try_recv(self) -> Option<T> {
let mut this = self;
let packet = this.packet();
// XXX: Optimize this to not require the two context switches when data is available
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |sched, task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent reordering
// of any previous writes to the task structure.
let task_as_state = task.cast_to_uint();
let oldstate = (*packet).state.swap(task_as_state, SeqCst);
match oldstate {
STATE_BOTH => {
// Data has not been sent. Now we're blocked.
rtdebug!("non-rendezvous recv");
sched.metrics.non_rendezvous_recvs += 1;
}
STATE_ONE => {
rtdebug!("rendezvous recv");
sched.metrics.rendezvous_recvs += 1;
// Channel is closed. Switch back and check the data.
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let recvr = BlockedTask::cast_from_uint(task_as_state);
sched.enqueue_blocked_task(recvr);
}
_ => util::unreachable()
}
}
}
// Task resumes.
// No further memory barrier is needed here to access the
// payload. Some scenarios:
//
// 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
// 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
// and ran on its thread. The sending task issued a read barrier when taking the
// pointer to the receiving task.
// 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
// is pinned to some other scheduler, so the sending task had to give us to
// a different scheduler for resuming. That send synchronized memory.
unsafe {
let payload = util::replace(&mut (*packet).payload, None);
// The sender has closed up shop. Drop the packet.
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
this.suppress_finalize = true;
return payload;
}
}
}
impl<T> Peekable<T> for PortOne<T> {
fn peek(&self) -> bool {
unsafe {
let packet: *mut Packet<T> = self.packet();
let oldstate = (*packet).state.load(SeqCst);
match oldstate {
STATE_BOTH => false,
STATE_ONE => (*packet).payload.is_some(),
_ => util::unreachable()
}
}
}
}
#[unsafe_destructor]
impl<T> Drop for ChanOne<T> {
fn drop(&self) {
if self.suppress_finalize { return }
unsafe {
let this = cast::transmute_mut(self);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port still active. It will destroy the Packet.
},
STATE_ONE => {
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
},
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let sched = Local::take::<Scheduler>();
sched.schedule_task(woken_task);
};
}
}
}
}
}
#[unsafe_destructor]
impl<T> Drop for PortOne<T> {
fn drop(&self) {
if self.suppress_finalize { return }
unsafe {
let this = cast::transmute_mut(self);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Chan still active. It will destroy the packet.
},
STATE_ONE => {
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
}
task_as_state => {
// This case occurs during unwinding, when the blocked
// receiver was killed awake. The task can't still be
// blocked (we are it), but we need to free the handle.
let recvr = BlockedTask::cast_from_uint(task_as_state);
// FIXME(#7554)(bblum): Make this cfg(test) dependent.
// in a later commit.
assert!(recvr.wake().is_none());
}
}
}
}
}
struct StreamPayload<T> {
val: T,
next: PortOne<StreamPayload<T>>
}
type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
type StreamPortOne<T> = PortOne<StreamPayload<T>>;
/// A channel with unbounded size.
pub struct Chan<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<StreamChanOne<T>>
}
/// An port with unbounded size.
pub struct Port<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<StreamPortOne<T>>
}
pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
let (pone, cone) = oneshot();
let port = Port { next: Cell::new(pone) };
let chan = Chan { next: Cell::new(cone) };
return (port, chan);
}
impl<T: Send> GenericChan<T> for Chan<T> {
fn send(&self, val: T) {
self.try_send(val);
}
}
impl<T: Send> GenericSmartChan<T> for Chan<T> {
fn try_send(&self, val: T) -> bool {
let (next_pone, next_cone) = oneshot();
let cone = self.next.take();
self.next.put_back(next_cone);
cone.try_send(StreamPayload { val: val, next: next_pone })
}
}
impl<T> GenericPort<T> for Port<T> {
fn recv(&self) -> T {
match self.try_recv() {
Some(val) => val,
None => {
fail!("receiving on closed channel");
}
}
}
fn try_recv(&self) -> Option<T> {
let pone = self.next.take();
match pone.try_recv() {
Some(StreamPayload { val, next }) => {
self.next.put_back(next);
Some(val)
}
None => None
}
}
}
impl<T> Peekable<T> for Port<T> {
fn peek(&self) -> bool {
self.next.with_mut_ref(|p| p.peek())
}
}
pub struct SharedChan<T> {
// Just like Chan, but a shared AtomicOption instead of Cell
priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
}
impl<T> SharedChan<T> {
pub fn new(chan: Chan<T>) -> SharedChan<T> {
let next = chan.next.take();
let next = AtomicOption::new(~next);
SharedChan { next: UnsafeAtomicRcBox::new(next) }
}
}
impl<T: Send> GenericChan<T> for SharedChan<T> {
fn send(&self, val: T) {
self.try_send(val);
}
}
impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, val: T) -> bool {
unsafe {
let (next_pone, next_cone) = oneshot();
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
}
}
}
impl<T> Clone for SharedChan<T> {
fn clone(&self) -> SharedChan<T> {
SharedChan {
next: self.next.clone()
}
}
}
pub struct SharedPort<T> {
// The next port on which we will receive the next port on which we will receive T
priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
}
impl<T> SharedPort<T> {
pub fn new(port: Port<T>) -> SharedPort<T> {
// Put the data port into a new link pipe
let next_data_port = port.next.take();
let (next_link_port, next_link_chan) = oneshot();
next_link_chan.send(next_data_port);
let next_link = AtomicOption::new(~next_link_port);
SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
}
}
impl<T: Send> GenericPort<T> for SharedPort<T> {
fn recv(&self) -> T {
match self.try_recv() {
Some(val) => val,
None => {
fail!("receiving on a closed channel");
}
}
}
fn try_recv(&self) -> Option<T> {
unsafe {
let (next_link_port, next_link_chan) = oneshot();
let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
let link_port = link_port.unwrap();
let data_port = link_port.recv();
let (next_data_port, res) = match data_port.try_recv() {
Some(StreamPayload { val, next }) => {
(next, Some(val))
}
None => {
let (next_data_port, _) = oneshot();
(next_data_port, None)
}
};
next_link_chan.send(next_data_port);
return res;
}
}
}
impl<T> Clone for SharedPort<T> {
fn clone(&self) -> SharedPort<T> {
SharedPort {
next_link: self.next_link.clone()
}
}
}
// XXX: Need better name
type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
pub fn megapipe<T: Send>() -> MegaPipe<T> {
let (port, chan) = stream();
(SharedPort::new(port), SharedChan::new(chan))
}
impl<T: Send> GenericChan<T> for MegaPipe<T> {
fn send(&self, val: T) {
match *self {
(_, ref c) => c.send(val)
}
}
}
impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
fn try_send(&self, val: T) -> bool {
match *self {
(_, ref c) => c.try_send(val)
}
}
}
impl<T: Send> GenericPort<T> for MegaPipe<T> {
fn recv(&self) -> T {
match *self {
(ref p, _) => p.recv()
}
}
fn try_recv(&self) -> Option<T> {
match *self {
(ref p, _) => p.try_recv()
}
}
}
#[cfg(test)]
mod test {
use super::*;
use option::*;
use rt::test::*;
use cell::Cell;
use iter::Times;
#[test]
fn oneshot_single_thread_close_port_first() {
// Simple test of closing without sending
do run_in_newsched_task {
let (port, _chan) = oneshot::<int>();
{ let _p = port; }
}
}
#[test]
fn oneshot_single_thread_close_chan_first() {
// Simple test of closing without sending
do run_in_newsched_task {
let (_port, chan) = oneshot::<int>();
{ let _c = chan; }
}
}
#[test]
fn oneshot_single_thread_send_port_close() {
// Testing that the sender cleans up the payload if receiver is closed
do run_in_newsched_task {
let (port, chan) = oneshot::<~int>();
{ let _p = port; }
chan.send(~0);
}
}
#[test]
fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will fail
do run_in_newsched_task {
let res = do spawntask_try {
let (port, chan) = oneshot::<~int>();
{ let _c = chan; }
port.recv();
};
// What is our res?
rtdebug!("res is: %?", res.is_err());
assert!(res.is_err());
}
}
#[test]
fn oneshot_single_thread_send_then_recv() {
do run_in_newsched_task {
let (port, chan) = oneshot::<~int>();
chan.send(~10);
assert!(port.recv() == ~10);
}
}
#[test]
fn oneshot_single_thread_try_send_open() {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
assert!(chan.try_send(10));
assert!(port.recv() == 10);
}
}
#[test]
fn oneshot_single_thread_try_send_closed() {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
{ let _p = port; }
assert!(!chan.try_send(10));
}
}
#[test]
fn oneshot_single_thread_try_recv_open() {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
chan.send(10);
assert!(port.try_recv() == Some(10));
}
}
#[test]
fn oneshot_single_thread_try_recv_closed() {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
{ let _c = chan; }
assert!(port.try_recv() == None);
}
}
#[test]
fn oneshot_single_thread_peek_data() {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
assert!(!port.peek());
chan.send(10);
assert!(port.peek());
}
}
#[test]
fn oneshot_single_thread_peek_close() {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
{ let _c = chan; }
assert!(!port.peek());
assert!(!port.peek());
}
}
#[test]
fn oneshot_single_thread_peek_open() {
do run_in_newsched_task {
let (port, _) = oneshot::<int>();
assert!(!port.peek());
}
}
#[test]
fn oneshot_multi_task_recv_then_send() {
do run_in_newsched_task {
let (port, chan) = oneshot::<~int>();
let port_cell = Cell::new(port);
do spawntask_immediately {
assert!(port_cell.take().recv() == ~10);
}
chan.send(~10);
}
}
#[test]
fn oneshot_multi_task_recv_then_close() {
do run_in_newsched_task {
let (port, chan) = oneshot::<~int>();
let port_cell = Cell::new(port);
let chan_cell = Cell::new(chan);
do spawntask_later {
let _cell = chan_cell.take();
}
let res = do spawntask_try {
assert!(port_cell.take().recv() == ~10);
};
assert!(res.is_err());
}
}
#[test]
fn oneshot_multi_thread_close_stress() {
for stress_factor().times {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
let port_cell = Cell::new(port);
let _thread = do spawntask_thread {
let _p = port_cell.take();
};
let _chan = chan;
}
}
}
#[test]
fn oneshot_multi_thread_send_close_stress() {
for stress_factor().times {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
let chan_cell = Cell::new(chan);
let port_cell = Cell::new(port);
let _thread1 = do spawntask_thread {
let _p = port_cell.take();
};
let _thread2 = do spawntask_thread {
let c = chan_cell.take();
c.send(1);
};
}
}
}
#[test]
fn oneshot_multi_thread_recv_close_stress() {
for stress_factor().times {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
let chan_cell = Cell::new(chan);
let port_cell = Cell::new(port);
let _thread1 = do spawntask_thread {
let port_cell = Cell::new(port_cell.take());
let res = do spawntask_try {
port_cell.take().recv();
};
assert!(res.is_err());
};
let _thread2 = do spawntask_thread {
let chan_cell = Cell::new(chan_cell.take());
do spawntask {
chan_cell.take();
}
};
}
}
}
#[test]
fn oneshot_multi_thread_send_recv_stress() {
for stress_factor().times {
do run_in_newsched_task {
let (port, chan) = oneshot::<~int>();
let chan_cell = Cell::new(chan);
let port_cell = Cell::new(port);
let _thread1 = do spawntask_thread {
chan_cell.take().send(~10);
};
let _thread2 = do spawntask_thread {
assert!(port_cell.take().recv() == ~10);
};
}
}
}
#[test]
fn stream_send_recv_stress() {
for stress_factor().times {
do run_in_mt_newsched_task {
let (port, chan) = stream::<~int>();
send(chan, 0);
recv(port, 0);
fn send(chan: Chan<~int>, i: int) {
if i == 10 { return }
let chan_cell = Cell::new(chan);
do spawntask_random {
let chan = chan_cell.take();
chan.send(~i);
send(chan, i + 1);
}
}
fn recv(port: Port<~int>, i: int) {
if i == 10 { return }
let port_cell = Cell::new(port);
do spawntask_random {
let port = port_cell.take();
assert!(port.recv() == ~i);
recv(port, i + 1);
};
}
}
}
}
#[test]
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
do run_in_newsched_task {
let (port, chan) = stream();
for 10000.times { chan.send(()) }
for 10000.times { port.recv() }
}
}
#[test]
fn shared_chan_stress() {
do run_in_mt_newsched_task {
let (port, chan) = stream();
let chan = SharedChan::new(chan);
let total = stress_factor() + 100;
for total.times {
let chan_clone = chan.clone();
do spawntask_random {
chan_clone.send(());
}
}
for total.times {
port.recv();
}
}
}
#[test]
fn shared_port_stress() {
do run_in_mt_newsched_task {
// XXX: Removing these type annotations causes an ICE
let (end_port, end_chan) = stream::<()>();
let (port, chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let port = SharedPort::new(port);
let total = stress_factor() + 100;
for total.times {
let end_chan_clone = end_chan.clone();
let port_clone = port.clone();
do spawntask_random {
port_clone.recv();
end_chan_clone.send(());
}
}
for total.times {
chan.send(());
}
for total.times {
end_port.recv();
}
}
}
#[test]
fn shared_port_close_simple() {
do run_in_mt_newsched_task {
let (port, chan) = stream::<()>();
let port = SharedPort::new(port);
{ let _chan = chan; }
assert!(port.try_recv().is_none());
}
}
#[test]
fn shared_port_close() {
do run_in_mt_newsched_task {
let (end_port, end_chan) = stream::<bool>();
let (port, chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let port = SharedPort::new(port);
let chan = SharedChan::new(chan);
let send_total = 10;
let recv_total = 20;
do spawntask_random {
for send_total.times {
let chan_clone = chan.clone();
do spawntask_random {
chan_clone.send(());
}
}
}
let end_chan_clone = end_chan.clone();
do spawntask_random {
for recv_total.times {
let port_clone = port.clone();
let end_chan_clone = end_chan_clone.clone();
do spawntask_random {
let recvd = port_clone.try_recv().is_some();
end_chan_clone.send(recvd);
}
}
}
let mut recvd = 0;
for recv_total.times {
recvd += if end_port.recv() { 1 } else { 0 };
}
assert!(recvd == send_total);
}
}
#[test]
fn megapipe_stress() {
use rand;
use rand::RngUtil;
do run_in_mt_newsched_task {
let (end_port, end_chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let pipe = megapipe();
let total = stress_factor() + 10;
let mut rng = rand::rng();
for total.times {
let msgs = rng.gen_uint_range(0, 10);
let pipe_clone = pipe.clone();
let end_chan_clone = end_chan.clone();
do spawntask_random {
for msgs.times {
pipe_clone.send(());
}
for msgs.times {
pipe_clone.recv();
}
}
end_chan_clone.send(());
}
for total.times {
end_port.recv();
}
}
}
}