// Copyright 2012 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. /*! Message passing */ use cast; use either::{Either, Left, Right}; use kinds::Owned; use option; use option::{Option, Some, None, unwrap}; use uint; use unstable; use vec; use pipes::{recv, try_recv, wait_many, peek, PacketHeader}; // FIXME #5160: Making this public exposes some plumbing from // pipes. Needs some refactoring pub use pipes::Selectable; /// A trait for things that can send multiple messages. pub trait GenericChan { /// Sends a message. fn send(&self, x: T); } /// Things that can send multiple messages and can detect when the receiver /// is closed pub trait GenericSmartChan { /// Sends a message, or report if the receiver has closed the connection. fn try_send(&self, x: T) -> bool; } /// A trait for things that can receive multiple messages. pub trait GenericPort { /// Receives a message, or fails if the connection closes. fn recv(&self) -> T; /** Receives a message, or returns `none` if the connection is closed or closes. */ fn try_recv(&self) -> Option; } /// Ports that can `peek` pub trait Peekable { /// Returns true if a message is available fn peek(&self) -> bool; } /// Returns the index of an endpoint that is ready to receive. pub fn selecti(endpoints: &[T]) -> uint { wait_many(endpoints) } /// Returns 0 or 1 depending on which endpoint is ready to receive pub fn select2i(a: &A, b: &B) -> Either<(), ()> { match wait_many([a.header(), b.header()]) { 0 => Left(()), 1 => Right(()), _ => fail!(~"wait returned unexpected index") } } // Streams - Make pipes a little easier in general. proto! streamp ( Open:send { data(T) -> Open } ) #[doc(hidden)] struct Chan_ { mut endp: Option> } /// An endpoint that can send many messages. pub enum Chan { Chan_(Chan_) } struct Port_ { mut endp: Option>, } /// An endpoint that can receive many messages. pub enum Port { Port_(Port_) } /** Creates a `(chan, port)` pair. These allow sending or receiving an unlimited number of messages. */ pub fn stream() -> (Port, Chan) { let (c, s) = streamp::init(); (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) })) } // Add an inherent method so that imports of GenericChan are not // required. pub impl Chan { fn send(&self, x: T) { chan_send(self, x) } fn try_send(&self, x: T) -> bool { chan_try_send(self, x) } } impl GenericChan for Chan { fn send(&self, x: T) { chan_send(self, x) } } #[inline(always)] fn chan_send(self: &Chan, x: T) { let mut endp = None; endp <-> self.endp; self.endp = Some( streamp::client::data(unwrap(endp), x)) } impl GenericSmartChan for Chan { fn try_send(&self, x: T) -> bool { chan_try_send(self, x) } } #[inline(always)] fn chan_try_send(self: &Chan, x: T) -> bool { let mut endp = None; endp <-> self.endp; match streamp::client::try_data(unwrap(endp), x) { Some(next) => { self.endp = Some(next); true } None => false } } // Use an inherent impl so that imports are not required: pub impl Port { fn recv(&self) -> T { port_recv(self) } fn try_recv(&self) -> Option { port_try_recv(self) } fn peek(&self) -> bool { port_peek(self) } } impl GenericPort for Port { // These two calls will prefer the inherent versions above: fn recv(&self) -> T { port_recv(self) } fn try_recv(&self) -> Option { port_try_recv(self) } } #[inline(always)] fn port_recv(self: &Port) -> T { let mut endp = None; endp <-> self.endp; let streamp::data(x, endp) = recv(unwrap(endp)); self.endp = Some(endp); x } #[inline(always)] fn port_try_recv(self: &Port) -> Option { let mut endp = None; endp <-> self.endp; match try_recv(unwrap(endp)) { Some(streamp::data(x, endp)) => { self.endp = Some(endp); Some(x) } None => None } } impl Peekable for Port { fn peek(&self) -> bool { port_peek(self) } } #[inline(always)] fn port_peek(self: &Port) -> bool { unsafe { let mut endp = None; endp <-> self.endp; let peek = match &endp { &Some(ref endp) => peek(endp), &None => fail!(~"peeking empty stream") }; self.endp <-> endp; peek } } impl Selectable for Port { fn header(&self) -> *PacketHeader { unsafe { match self.endp { Some(ref endp) => endp.header(), None => fail!(~"peeking empty stream") } } } } /// Treat many ports as one. pub struct PortSet { mut ports: ~[Port], } pub fn PortSet() -> PortSet{ PortSet { ports: ~[] } } // Use an inherent impl so that imports are not required: pub impl PortSet { fn recv(&self) -> T { port_set_recv(self) } fn try_recv(&self) -> Option { port_set_try_recv(self) } fn peek(&self) -> bool { port_set_peek(self) } } pub impl PortSet { fn add(&self, port: Port) { self.ports.push(port) } fn chan(&self) -> Chan { let (po, ch) = stream(); self.add(po); ch } } impl GenericPort for PortSet { fn try_recv(&self) -> Option { port_set_try_recv(self) } fn recv(&self) -> T { port_set_recv(self) } } #[inline(always)] fn port_set_recv(self: &PortSet) -> T { port_set_try_recv(self).expect("port_set: endpoints closed") } #[inline(always)] fn port_set_try_recv(self: &PortSet) -> Option { let mut result = None; // we have to swap the ports array so we aren't borrowing // aliasable mutable memory. let mut ports = ~[]; ports <-> self.ports; while result.is_none() && ports.len() > 0 { let i = wait_many(ports); match ports[i].try_recv() { Some(m) => { result = Some(m); } None => { // Remove this port. let _ = ports.swap_remove(i); } } } ports <-> self.ports; result } impl Peekable for PortSet { fn peek(&self) -> bool { port_set_peek(self) } } #[inline(always)] fn port_set_peek(self: &PortSet) -> bool { // It'd be nice to use self.port.each, but that version isn't // pure. for uint::range(0, vec::uniq_len(&const self.ports)) |i| { // XXX: Botch pending demuting. unsafe { let port: &Port = cast::transmute(&mut self.ports[i]); if port.peek() { return true } } } false } /// A channel that can be shared between many senders. pub type SharedChan = unstable::Exclusive>; pub impl SharedChan { fn send(&self, x: T) { shared_chan_send(self, x) } fn try_send(&self, x: T) -> bool { shared_chan_try_send(self, x) } } impl GenericChan for SharedChan { fn send(&self, x: T) { shared_chan_send(self, x) } } #[inline(always)] fn shared_chan_send(self: &SharedChan, x: T) { let mut xx = Some(x); do self.with_imm |chan| { let mut x = None; x <-> xx; chan.send(option::unwrap(x)) } } impl GenericSmartChan for SharedChan { fn try_send(&self, x: T) -> bool { shared_chan_try_send(self, x) } } #[inline(always)] fn shared_chan_try_send(self: &SharedChan, x: T) -> bool { let mut xx = Some(x); do self.with_imm |chan| { let mut x = None; x <-> xx; chan.try_send(option::unwrap(x)) } } /// Converts a `chan` into a `shared_chan`. pub fn SharedChan(c: Chan) -> SharedChan { unstable::exclusive(c) } /// Receive a message from one of two endpoints. pub trait Select2 { /// Receive a message or return `None` if a connection closes. fn try_select(&self) -> Either, Option>; /// Receive a message or fail if a connection closes. fn select(&self) -> Either; } impl, Right: Selectable + GenericPort> Select2 for (Left, Right) { fn select(&self) -> Either { match *self { (ref lp, ref rp) => match select2i(lp, rp) { Left(()) => Left (lp.recv()), Right(()) => Right(rp.recv()) } } } fn try_select(&self) -> Either, Option> { match *self { (ref lp, ref rp) => match select2i(lp, rp) { Left(()) => Left (lp.try_recv()), Right(()) => Right(rp.try_recv()) } } } } proto! oneshot ( Oneshot:send { send(T) -> ! } ) /// The send end of a oneshot pipe. pub type ChanOne = oneshot::client::Oneshot; /// The receive end of a oneshot pipe. pub type PortOne = oneshot::server::Oneshot; /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. pub fn oneshot() -> (PortOne, ChanOne) { let (chan, port) = oneshot::init(); (port, chan) } pub impl PortOne { fn recv(self) -> T { recv_one(self) } fn try_recv(self) -> Option { try_recv_one(self) } } pub impl ChanOne { fn send(self, data: T) { send_one(self, data) } fn try_send(self, data: T) -> bool { try_send_one(self, data) } } /** * Receive a message from a oneshot pipe, failing if the connection was * closed. */ pub fn recv_one(port: PortOne) -> T { let oneshot::send(message) = recv(port); message } /// Receive a message from a oneshot pipe unless the connection was closed. pub fn try_recv_one (port: PortOne) -> Option { let message = try_recv(port); if message.is_none() { None } else { let oneshot::send(message) = option::unwrap(message); Some(message) } } /// Send a message on a oneshot pipe, failing if the connection was closed. pub fn send_one(chan: ChanOne, data: T) { oneshot::client::send(chan, data); } /** * Send a message on a oneshot pipe, or return false if the connection was * closed. */ pub fn try_send_one(chan: ChanOne, data: T) -> bool { oneshot::client::try_send(chan, data).is_some() } #[cfg(test)] pub mod test { use either::{Either, Left, Right}; use super::{Chan, Port, oneshot, recv_one, stream}; #[test] pub fn test_select2() { let (p1, c1) = stream(); let (p2, c2) = stream(); c1.send(~"abc"); match (p1, p2).select() { Right(_) => fail!(), _ => () } c2.send(123); } #[test] pub fn test_oneshot() { let (c, p) = oneshot::init(); oneshot::client::send(c, ()); recv_one(p) } #[test] fn test_peek_terminated() { let (port, chan): (Port, Chan) = stream(); { // Destroy the channel let _chan = chan; } fail_unless!(!port.peek()); } }