// Copyright 2012 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. /*! Message passing */ use cast; use either::{Either, Left, Right}; use kinds::Owned; use option::{Option, Some, None}; use uint; use unstable; use vec; use unstable::Exclusive; use pipes::{recv, try_recv, wait_many, peek, PacketHeader}; // FIXME #5160: Making this public exposes some plumbing from // pipes. Needs some refactoring pub use pipes::Selectable; /// A trait for things that can send multiple messages. pub trait GenericChan { /// Sends a message. fn send(&self, x: T); } /// Things that can send multiple messages and can detect when the receiver /// is closed pub trait GenericSmartChan { /// Sends a message, or report if the receiver has closed the connection. fn try_send(&self, x: T) -> bool; } /// A trait for things that can receive multiple messages. pub trait GenericPort { /// Receives a message, or fails if the connection closes. fn recv(&self) -> T; /** Receives a message, or returns `none` if the connection is closed or closes. */ fn try_recv(&self) -> Option; } /// Ports that can `peek` pub trait Peekable { /// Returns true if a message is available fn peek(&self) -> bool; } // Streams - Make pipes a little easier in general. /*proto! 
streamp ( Open:send { data(T) -> Open } )*/ #[allow(non_camel_case_types)] pub mod streamp { priv use core::kinds::Owned; pub fn init() -> (client::Open, server::Open) { pub use core::pipes::HasBuffer; ::core::pipes::entangle() } #[allow(non_camel_case_types)] pub enum Open { pub data(T, server::Open), } #[allow(non_camel_case_types)] pub mod client { priv use core::kinds::Owned; #[allow(non_camel_case_types)] pub fn try_data(pipe: Open, x_0: T) -> ::core::option::Option> { { use super::data; let (c, s) = ::core::pipes::entangle(); let message = data(x_0, s); if ::core::pipes::send(pipe, message) { ::core::pipes::rt::make_some(c) } else { ::core::pipes::rt::make_none() } } } #[allow(non_camel_case_types)] pub fn data(pipe: Open, x_0: T) -> Open { { use super::data; let (c, s) = ::core::pipes::entangle(); let message = data(x_0, s); ::core::pipes::send(pipe, message); c } } #[allow(non_camel_case_types)] pub type Open = ::core::pipes::SendPacket>; } #[allow(non_camel_case_types)] pub mod server { #[allow(non_camel_case_types)] pub type Open = ::core::pipes::RecvPacket>; } } /// An endpoint that can send many messages. pub struct Chan { mut endp: Option> } /// An endpoint that can receive many messages. pub struct Port { mut endp: Option>, } /** Creates a `(Port, Chan)` pair. These allow sending or receiving an unlimited number of messages. 
*/ pub fn stream() -> (Port, Chan) { let (c, s) = streamp::init(); (Port { endp: Some(s) }, Chan { endp: Some(c) }) } impl GenericChan for Chan { #[inline(always)] fn send(&self, x: T) { let mut endp = None; endp <-> self.endp; self.endp = Some( streamp::client::data(endp.unwrap(), x)) } } impl GenericSmartChan for Chan { #[inline(always)] fn try_send(&self, x: T) -> bool { let mut endp = None; endp <-> self.endp; match streamp::client::try_data(endp.unwrap(), x) { Some(next) => { self.endp = Some(next); true } None => false } } } impl GenericPort for Port { #[inline(always)] fn recv(&self) -> T { let mut endp = None; endp <-> self.endp; let streamp::data(x, endp) = recv(endp.unwrap()); self.endp = Some(endp); x } #[inline(always)] fn try_recv(&self) -> Option { let mut endp = None; endp <-> self.endp; match try_recv(endp.unwrap()) { Some(streamp::data(x, endp)) => { self.endp = Some(endp); Some(x) } None => None } } } impl Peekable for Port { #[inline(always)] fn peek(&self) -> bool { let mut endp = None; endp <-> self.endp; let peek = match &endp { &Some(ref endp) => peek(endp), &None => fail!(~"peeking empty stream") }; self.endp <-> endp; peek } } impl Selectable for Port { fn header(&self) -> *PacketHeader { unsafe { match self.endp { Some(ref endp) => endp.header(), None => fail!(~"peeking empty stream") } } } } /// Treat many ports as one. pub struct PortSet { mut ports: ~[Port], } pub impl PortSet { fn new() -> PortSet { PortSet { ports: ~[] } } fn add(&self, port: Port) { self.ports.push(port) } fn chan(&self) -> Chan { let (po, ch) = stream(); self.add(po); ch } } impl GenericPort for PortSet { fn try_recv(&self) -> Option { let mut result = None; // we have to swap the ports array so we aren't borrowing // aliasable mutable memory. let mut ports = ~[]; ports <-> self.ports; while result.is_none() && ports.len() > 0 { let i = wait_many(ports); match ports[i].try_recv() { Some(m) => { result = Some(m); } None => { // Remove this port. 
let _ = ports.swap_remove(i); } } } ports <-> self.ports; result } fn recv(&self) -> T { self.try_recv().expect("port_set: endpoints closed") } } impl Peekable for PortSet { fn peek(&self) -> bool { // It'd be nice to use self.port.each, but that version isn't // pure. for uint::range(0, vec::uniq_len(&const self.ports)) |i| { // XXX: Botch pending demuting. unsafe { let port: &Port = cast::transmute(&mut self.ports[i]); if port.peek() { return true } } } false } } /// A channel that can be shared between many senders. pub struct SharedChan { ch: Exclusive> } impl SharedChan { /// Converts a `chan` into a `shared_chan`. pub fn new(c: Chan) -> SharedChan { SharedChan { ch: unstable::exclusive(c) } } } impl GenericChan for SharedChan { fn send(&self, x: T) { let mut xx = Some(x); do self.ch.with_imm |chan| { let mut x = None; x <-> xx; chan.send(x.unwrap()) } } } impl GenericSmartChan for SharedChan { fn try_send(&self, x: T) -> bool { let mut xx = Some(x); do self.ch.with_imm |chan| { let mut x = None; x <-> xx; chan.try_send(x.unwrap()) } } } impl ::clone::Clone for SharedChan { fn clone(&self) -> SharedChan { SharedChan { ch: self.ch.clone() } } } /*proto! oneshot ( Oneshot:send { send(T) -> ! 
} )*/

// NOTE(review): the module below appears to be the hand-expanded form of the
// `proto!` declaration in the comment above. Generic parameter lists were
// missing from this copy of the file and have been restored; the `Owned`
// bound is inferred from the `kinds::Owned` imports -- confirm against
// upstream.

#[allow(non_camel_case_types)]
pub mod oneshot {
    priv use core::kinds::Owned;
    use ptr::to_unsafe_ptr;

    /// Builds a buffer holding a single `Oneshot` packet and entangles it,
    /// returning the `(client, server)` endpoint pair.
    pub fn init<T: Owned>() -> (client::Oneshot<T>, server::Oneshot<T>) {
        pub use core::pipes::HasBuffer;

        let buffer =
            ~::core::pipes::Buffer{
                header: ::core::pipes::BufferHeader(),
                data: __Buffer{
                    Oneshot: ::core::pipes::mk_packet::<Oneshot<T>>()
                },
            };
        do ::core::pipes::entangle_buffer(buffer) |buffer, data| {
            {
                data.Oneshot.set_buffer(buffer);
                to_unsafe_ptr(&data.Oneshot)
            }
        }
    }

    /// The only message in the protocol: a bare payload.
    #[allow(non_camel_case_types)]
    pub enum Oneshot<T> { pub send(T), }

    /// Buffer layout used by `init`: one `Oneshot` packet.
    #[allow(non_camel_case_types)]
    pub struct __Buffer<T> {
        Oneshot: ::core::pipes::Packet<Oneshot<T>>,
    }

    #[allow(non_camel_case_types)]
    pub mod client {

        priv use core::kinds::Owned;

        /// Sends `x_0` over `pipe`. Returns `make_some(())` when `send`
        /// reports success, `make_none()` otherwise.
        #[allow(non_camel_case_types)]
        pub fn try_send<T: Owned>(pipe: Oneshot<T>, x_0: T) ->
            ::core::option::Option<()> {
            {
                use super::send;
                let message = send(x_0);
                if ::core::pipes::send(pipe, message) {
                    ::core::pipes::rt::make_some(())
                } else {
                    ::core::pipes::rt::make_none()
                }
            }
        }

        /// Sends `x_0` over `pipe`, ignoring the result of `send`.
        #[allow(non_camel_case_types)]
        pub fn send<T: Owned>(pipe: Oneshot<T>, x_0: T) {
            {
                use super::send;
                let message = send(x_0);
                ::core::pipes::send(pipe, message);
            }
        }

        /// The sending endpoint of the oneshot protocol.
        #[allow(non_camel_case_types)]
        pub type Oneshot<T> =
            ::core::pipes::SendPacketBuffered<super::Oneshot<T>,
                                              super::__Buffer<T>>;
    }

    #[allow(non_camel_case_types)]
    pub mod server {
        /// The receiving endpoint of the oneshot protocol.
        #[allow(non_camel_case_types)]
        pub type Oneshot<T> =
            ::core::pipes::RecvPacketBuffered<super::Oneshot<T>,
                                              super::__Buffer<T>>;
    }
}

/// The send end of a oneshot pipe.
pub struct ChanOne<T> {
    contents: oneshot::client::Oneshot<T>
}

impl<T> ChanOne<T> {
    /// Wraps a raw oneshot client endpoint.
    pub fn new(contents: oneshot::client::Oneshot<T>) -> ChanOne<T> {
        ChanOne {
            contents: contents
        }
    }
}

/// The receive end of a oneshot pipe.
pub struct PortOne<T> {
    contents: oneshot::server::Oneshot<T>
}

impl<T> PortOne<T> {
    /// Wraps a raw oneshot server endpoint.
    pub fn new(contents: oneshot::server::Oneshot<T>) -> PortOne<T> {
        PortOne {
            contents: contents
        }
    }
}

/// Initialise a (recv-endpoint, send-endpoint) oneshot pipe pair.
pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
    // `oneshot::init` yields (client, server), i.e. (send, recv); the pair is
    // flipped here so the port comes first.
    let (chan, port) = oneshot::init();
    (PortOne::new(port), ChanOne::new(chan))
}

// NOTE(review): generic parameter lists were missing from this copy of the
// file and have been restored on every item in this section; the `Owned`
// bounds are inferred from the `kinds::Owned` import -- confirm against
// upstream.

pub impl<T: Owned> PortOne<T> {
    /// Method form of `recv_one`; consumes the port.
    fn recv(self) -> T { recv_one(self) }
    /// Method form of `try_recv_one`; consumes the port.
    fn try_recv(self) -> Option<T> { try_recv_one(self) }
    /// Returns the underlying raw server endpoint.
    fn unwrap(self) -> oneshot::server::Oneshot<T> {
        match self {
            PortOne { contents: s } => s
        }
    }
}

pub impl<T: Owned> ChanOne<T> {
    /// Method form of `send_one`; consumes the channel.
    fn send(self, data: T) { send_one(self, data) }
    /// Method form of `try_send_one`; consumes the channel.
    fn try_send(self, data: T) -> bool { try_send_one(self, data) }
    /// Returns the underlying raw client endpoint.
    fn unwrap(self) -> oneshot::client::Oneshot<T> {
        match self {
            ChanOne { contents: s } => s
        }
    }
}

/**
 * Receive a message from a oneshot pipe, failing if the connection was
 * closed.
 */
pub fn recv_one<T: Owned>(port: PortOne<T>) -> T {
    match port {
        PortOne { contents: port } => {
            let oneshot::send(message) = recv(port);
            message
        }
    }
}

/// Receive a message from a oneshot pipe unless the connection was closed.
pub fn try_recv_one<T: Owned>(port: PortOne<T>) -> Option<T> {
    match port {
        PortOne { contents: port } => {
            // Destructure the packet directly instead of testing
            // `is_none()` and then unwrapping.
            match try_recv(port) {
                Some(oneshot::send(message)) => Some(message),
                None => None
            }
        }
    }
}

/// Send a message on a oneshot pipe, failing if the connection was closed.
pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) {
    match chan {
        ChanOne { contents: chan } => oneshot::client::send(chan, data),
    }
}

/**
 * Send a message on a oneshot pipe, or return false if the connection was
 * closed.
 */
pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool {
    match chan {
        ChanOne { contents: chan } => {
            oneshot::client::try_send(chan, data).is_some()
        }
    }
}

/// Returns the index of an endpoint that is ready to receive.
pub fn selecti<T: Selectable>(endpoints: &[T]) -> uint {
    wait_many(endpoints)
}

/// Returns `Left(())` if `a` is ready to receive or `Right(())` if `b` is.
///
/// Fails if `wait_many` reports an index other than 0 or 1.
pub fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) ->
        Either<(), ()> {
    match wait_many([a.header(), b.header()]) {
        0 => Left(()),
        1 => Right(()),
        _ => fail!(~"wait returned unexpected index")
    }
}

/// Receive a message from one of two endpoints.
pub trait Select2 { /// Receive a message or return `None` if a connection closes. fn try_select(&self) -> Either, Option>; /// Receive a message or fail if a connection closes. fn select(&self) -> Either; } impl, Right: Selectable + GenericPort> Select2 for (Left, Right) { fn select(&self) -> Either { match *self { (ref lp, ref rp) => match select2i(lp, rp) { Left(()) => Left (lp.recv()), Right(()) => Right(rp.recv()) } } } fn try_select(&self) -> Either, Option> { match *self { (ref lp, ref rp) => match select2i(lp, rp) { Left(()) => Left (lp.try_recv()), Right(()) => Right(rp.try_recv()) } } } } #[cfg(test)] mod test { use either::Right; use super::{Chan, Port, oneshot, recv_one, stream}; #[test] fn test_select2() { let (p1, c1) = stream(); let (p2, c2) = stream(); c1.send(~"abc"); match (p1, p2).select() { Right(_) => fail!(), _ => () } c2.send(123); } #[test] fn test_oneshot() { let (p, c) = oneshot(); c.send(()); p.recv() } #[test] fn test_peek_terminated() { let (port, chan): (Port, Chan) = stream(); { // Destroy the channel let _chan = chan; } assert!(!port.peek()); } }