// Copyright 2012 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. /*! Message passing */ #[allow(missing_doc)]; use cast::{transmute, transmute_mut}; use container::Container; use either::{Either, Left, Right}; use kinds::Owned; use option::{Option, Some, None}; use uint; use vec; use vec::OwnedVector; use util::replace; use unstable::sync::{Exclusive, exclusive}; use rtcomm = rt::comm; use rt; use pipes::{wait_many, PacketHeader}; // FIXME #5160: Making this public exposes some plumbing from // pipes. Needs some refactoring pub use pipes::Selectable; /// A trait for things that can send multiple messages. pub trait GenericChan { /// Sends a message. fn send(&self, x: T); } /// Things that can send multiple messages and can detect when the receiver /// is closed pub trait GenericSmartChan { /// Sends a message, or report if the receiver has closed the connection. fn try_send(&self, x: T) -> bool; } /// A trait for things that can receive multiple messages. pub trait GenericPort { /// Receives a message, or fails if the connection closes. fn recv(&self) -> T; /** Receives a message, or returns `none` if the connection is closed or closes. */ fn try_recv(&self) -> Option; } /// Ports that can `peek` pub trait Peekable { /// Returns true if a message is available fn peek(&self) -> bool; } /// An endpoint that can send many messages. pub struct Chan { inner: Either, rtcomm::Chan> } /// An endpoint that can receive many messages. pub struct Port { inner: Either, rtcomm::Port> } /** Creates a `(Port, Chan)` pair. These allow sending or receiving an unlimited number of messages. */ pub fn stream() -> (Port, Chan) { let (port, chan) = match rt::context() { rt::OldTaskContext => match pipesy::stream() { (p, c) => (Left(p), Left(c)) }, _ => match rtcomm::stream() { (p, c) => (Right(p), Right(c)) } }; let port = Port { inner: port }; let chan = Chan { inner: chan }; return (port, chan); } impl GenericChan for Chan { fn send(&self, x: T) { match self.inner { Left(ref chan) => chan.send(x), Right(ref chan) => chan.send(x) } } } impl GenericSmartChan for Chan { fn try_send(&self, x: T) -> bool { match self.inner { Left(ref chan) => chan.try_send(x), Right(ref chan) => chan.try_send(x) } } } impl GenericPort for Port { fn recv(&self) -> T { match self.inner { Left(ref port) => port.recv(), Right(ref port) => port.recv() } } fn try_recv(&self) -> Option { match self.inner { Left(ref port) => port.try_recv(), Right(ref port) => port.try_recv() } } } impl Peekable for Port { fn peek(&self) -> bool { match self.inner { Left(ref port) => port.peek(), Right(ref port) => port.peek() } } } impl Selectable for Port { fn header(&mut self) -> *mut PacketHeader { match self.inner { Left(ref mut port) => port.header(), Right(_) => fail!("can't select on newsched ports") } } } /// Treat many ports as one. #[unsafe_mut_field(ports)] pub struct PortSet { ports: ~[pipesy::Port], } pub impl PortSet { fn new() -> PortSet { PortSet { ports: ~[] } } fn add(&self, port: Port) { let Port { inner } = port; let port = match inner { Left(p) => p, Right(_) => fail!("PortSet not implemented") }; unsafe { let self_ports = transmute_mut(&self.ports); self_ports.push(port) } } fn chan(&self) -> Chan { let (po, ch) = stream(); self.add(po); ch } } impl GenericPort for PortSet { fn try_recv(&self) -> Option { unsafe { let self_ports = transmute_mut(&self.ports); let mut result = None; // we have to swap the ports array so we aren't borrowing // aliasable mutable memory. let mut ports = replace(self_ports, ~[]); while result.is_none() && ports.len() > 0 { let i = wait_many(ports); match ports[i].try_recv() { Some(m) => { result = Some(m); } None => { // Remove this port. let _ = ports.swap_remove(i); } } } *self_ports = ports; result } } fn recv(&self) -> T { self.try_recv().expect("port_set: endpoints closed") } } impl Peekable for PortSet { fn peek(&self) -> bool { // It'd be nice to use self.port.each, but that version isn't // pure. for uint::range(0, vec::uniq_len(&const self.ports)) |i| { let port: &pipesy::Port = &self.ports[i]; if port.peek() { return true; } } false } } /// A channel that can be shared between many senders. pub struct SharedChan { ch: Exclusive> } impl SharedChan { /// Converts a `chan` into a `shared_chan`. pub fn new(c: Chan) -> SharedChan { let Chan { inner } = c; let c = match inner { Left(c) => c, Right(_) => fail!("SharedChan not implemented") }; SharedChan { ch: exclusive(c) } } } impl GenericChan for SharedChan { fn send(&self, x: T) { unsafe { let mut xx = Some(x); do self.ch.with_imm |chan| { let x = replace(&mut xx, None); chan.send(x.unwrap()) } } } } impl GenericSmartChan for SharedChan { fn try_send(&self, x: T) -> bool { unsafe { let mut xx = Some(x); do self.ch.with_imm |chan| { let x = replace(&mut xx, None); chan.try_send(x.unwrap()) } } } } impl ::clone::Clone for SharedChan { fn clone(&self) -> SharedChan { SharedChan { ch: self.ch.clone() } } } pub struct PortOne { inner: Either, rtcomm::PortOne> } pub struct ChanOne { inner: Either, rtcomm::ChanOne> } pub fn oneshot() -> (PortOne, ChanOne) { let (port, chan) = match rt::context() { rt::OldTaskContext => match pipesy::oneshot() { (p, c) => (Left(p), Left(c)), }, _ => match rtcomm::oneshot() { (p, c) => (Right(p), Right(c)) } }; let port = PortOne { inner: port }; let chan = ChanOne { inner: chan }; return (port, chan); } impl PortOne { pub fn recv(self) -> T { let PortOne { inner } = self; match inner { Left(p) => p.recv(), Right(p) => p.recv() } } pub fn try_recv(self) -> Option { let PortOne { inner } = self; match inner { Left(p) => p.try_recv(), Right(p) => p.try_recv() } } } impl ChanOne { pub fn send(self, data: T) { let ChanOne { inner } = self; match inner { Left(p) => p.send(data), Right(p) => p.send(data) } } pub fn try_send(self, data: T) -> bool { let ChanOne { inner } = self; match inner { Left(p) => p.try_send(data), Right(p) => p.try_send(data) } } } pub fn recv_one(port: PortOne) -> T { let PortOne { inner } = port; match inner { Left(p) => pipesy::recv_one(p), Right(p) => p.recv() } } pub fn try_recv_one(port: PortOne) -> Option { let PortOne { inner } = port; match inner { Left(p) => pipesy::try_recv_one(p), Right(p) => p.try_recv() } } pub fn send_one(chan: ChanOne, data: T) { let ChanOne { inner } = chan; match inner { Left(c) => pipesy::send_one(c, data), Right(c) => c.send(data) } } pub fn try_send_one(chan: ChanOne, data: T) -> bool { let ChanOne { inner } = chan; match inner { Left(c) => pipesy::try_send_one(c, data), Right(c) => c.try_send(data) } } mod pipesy { use kinds::Owned; use option::{Option, Some, None}; use pipes::{recv, try_recv, peek, PacketHeader}; use super::{GenericChan, GenericSmartChan, GenericPort, Peekable, Selectable}; use cast::transmute_mut; use util::replace; /*proto! oneshot ( Oneshot:send { send(T) -> ! } )*/ #[allow(non_camel_case_types)] pub mod oneshot { priv use core::kinds::Owned; use ptr::to_mut_unsafe_ptr; pub fn init() -> (client::Oneshot, server::Oneshot) { pub use core::pipes::HasBuffer; let buffer = ~::core::pipes::Buffer { header: ::core::pipes::BufferHeader(), data: __Buffer { Oneshot: ::core::pipes::mk_packet::>() }, }; do ::core::pipes::entangle_buffer(buffer) |buffer, data| { data.Oneshot.set_buffer(buffer); to_mut_unsafe_ptr(&mut data.Oneshot) } } #[allow(non_camel_case_types)] pub enum Oneshot { pub send(T), } #[allow(non_camel_case_types)] pub struct __Buffer { Oneshot: ::core::pipes::Packet>, } #[allow(non_camel_case_types)] pub mod client { priv use core::kinds::Owned; #[allow(non_camel_case_types)] pub fn try_send(pipe: Oneshot, x_0: T) -> ::core::option::Option<()> { { use super::send; let message = send(x_0); if ::core::pipes::send(pipe, message) { ::core::pipes::rt::make_some(()) } else { ::core::pipes::rt::make_none() } } } #[allow(non_camel_case_types)] pub fn send(pipe: Oneshot, x_0: T) { { use super::send; let message = send(x_0); ::core::pipes::send(pipe, message); } } #[allow(non_camel_case_types)] pub type Oneshot = ::core::pipes::SendPacketBuffered, super::__Buffer>; } #[allow(non_camel_case_types)] pub mod server { #[allow(non_camel_case_types)] pub type Oneshot = ::core::pipes::RecvPacketBuffered, super::__Buffer>; } } /// The send end of a oneshot pipe. pub struct ChanOne { contents: oneshot::client::Oneshot } impl ChanOne { pub fn new(contents: oneshot::client::Oneshot) -> ChanOne { ChanOne { contents: contents } } } /// The receive end of a oneshot pipe. pub struct PortOne { contents: oneshot::server::Oneshot } impl PortOne { pub fn new(contents: oneshot::server::Oneshot) -> PortOne { PortOne { contents: contents } } } /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. pub fn oneshot() -> (PortOne, ChanOne) { let (chan, port) = oneshot::init(); (PortOne::new(port), ChanOne::new(chan)) } pub impl PortOne { fn recv(self) -> T { recv_one(self) } fn try_recv(self) -> Option { try_recv_one(self) } fn unwrap(self) -> oneshot::server::Oneshot { match self { PortOne { contents: s } => s } } } pub impl ChanOne { fn send(self, data: T) { send_one(self, data) } fn try_send(self, data: T) -> bool { try_send_one(self, data) } fn unwrap(self) -> oneshot::client::Oneshot { match self { ChanOne { contents: s } => s } } } /** * Receive a message from a oneshot pipe, failing if the connection was * closed. */ pub fn recv_one(port: PortOne) -> T { match port { PortOne { contents: port } => { let oneshot::send(message) = recv(port); message } } } /// Receive a message from a oneshot pipe unless the connection was closed. pub fn try_recv_one (port: PortOne) -> Option { match port { PortOne { contents: port } => { let message = try_recv(port); if message.is_none() { None } else { let oneshot::send(message) = message.unwrap(); Some(message) } } } } /// Send a message on a oneshot pipe, failing if the connection was closed. pub fn send_one(chan: ChanOne, data: T) { match chan { ChanOne { contents: chan } => oneshot::client::send(chan, data), } } /** * Send a message on a oneshot pipe, or return false if the connection was * closed. */ pub fn try_send_one(chan: ChanOne, data: T) -> bool { match chan { ChanOne { contents: chan } => { oneshot::client::try_send(chan, data).is_some() } } } // Streams - Make pipes a little easier in general. /*proto! streamp ( Open:send { data(T) -> Open } )*/ #[allow(non_camel_case_types)] pub mod streamp { priv use core::kinds::Owned; pub fn init() -> (client::Open, server::Open) { pub use core::pipes::HasBuffer; ::core::pipes::entangle() } #[allow(non_camel_case_types)] pub enum Open { pub data(T, server::Open), } #[allow(non_camel_case_types)] pub mod client { priv use core::kinds::Owned; #[allow(non_camel_case_types)] pub fn try_data(pipe: Open, x_0: T) -> ::core::option::Option> { { use super::data; let (c, s) = ::core::pipes::entangle(); let message = data(x_0, s); if ::core::pipes::send(pipe, message) { ::core::pipes::rt::make_some(c) } else { ::core::pipes::rt::make_none() } } } #[allow(non_camel_case_types)] pub fn data(pipe: Open, x_0: T) -> Open { { use super::data; let (c, s) = ::core::pipes::entangle(); let message = data(x_0, s); ::core::pipes::send(pipe, message); c } } #[allow(non_camel_case_types)] pub type Open = ::core::pipes::SendPacket>; } #[allow(non_camel_case_types)] pub mod server { #[allow(non_camel_case_types)] pub type Open = ::core::pipes::RecvPacket>; } } /// An endpoint that can send many messages. #[unsafe_mut_field(endp)] pub struct Chan { endp: Option> } /// An endpoint that can receive many messages. #[unsafe_mut_field(endp)] pub struct Port { endp: Option>, } /** Creates a `(Port, Chan)` pair. These allow sending or receiving an unlimited number of messages. */ pub fn stream() -> (Port, Chan) { let (c, s) = streamp::init(); (Port { endp: Some(s) }, Chan { endp: Some(c) }) } impl GenericChan for Chan { #[inline(always)] fn send(&self, x: T) { unsafe { let self_endp = transmute_mut(&self.endp); let endp = replace(self_endp, None); *self_endp = Some(streamp::client::data(endp.unwrap(), x)) } } } impl GenericSmartChan for Chan { #[inline(always)] fn try_send(&self, x: T) -> bool { unsafe { let self_endp = transmute_mut(&self.endp); let endp = replace(self_endp, None); match streamp::client::try_data(endp.unwrap(), x) { Some(next) => { *self_endp = Some(next); true } None => false } } } } impl GenericPort for Port { #[inline(always)] fn recv(&self) -> T { unsafe { let self_endp = transmute_mut(&self.endp); let endp = replace(self_endp, None); let streamp::data(x, endp) = recv(endp.unwrap()); *self_endp = Some(endp); x } } #[inline(always)] fn try_recv(&self) -> Option { unsafe { let self_endp = transmute_mut(&self.endp); let endp = replace(self_endp, None); match try_recv(endp.unwrap()) { Some(streamp::data(x, endp)) => { *self_endp = Some(endp); Some(x) } None => None } } } } impl Peekable for Port { #[inline(always)] fn peek(&self) -> bool { unsafe { let self_endp = transmute_mut(&self.endp); let mut endp = replace(self_endp, None); let peek = match endp { Some(ref mut endp) => peek(endp), None => fail!("peeking empty stream") }; *self_endp = endp; peek } } } impl Selectable for Port { fn header(&mut self) -> *mut PacketHeader { match self.endp { Some(ref mut endp) => endp.header(), None => fail!("peeking empty stream") } } } } /// Returns the index of an endpoint that is ready to receive. pub fn selecti(endpoints: &mut [T]) -> uint { wait_many(endpoints) } /// Returns 0 or 1 depending on which endpoint is ready to receive pub fn select2i(a: &mut A, b: &mut B) -> Either<(), ()> { let mut endpoints = [ a.header(), b.header() ]; match wait_many(endpoints) { 0 => Left(()), 1 => Right(()), _ => fail!("wait returned unexpected index"), } } /// Receive a message from one of two endpoints. pub trait Select2 { /// Receive a message or return `None` if a connection closes. fn try_select(&mut self) -> Either, Option>; /// Receive a message or fail if a connection closes. fn select(&mut self) -> Either; } impl, Right:Selectable + GenericPort> Select2 for (Left, Right) { fn select(&mut self) -> Either { // XXX: Bad borrow check workaround. unsafe { let this: &(Left, Right) = transmute(self); match *this { (ref lp, ref rp) => { let lp: &mut Left = transmute(lp); let rp: &mut Right = transmute(rp); match select2i(lp, rp) { Left(()) => Left(lp.recv()), Right(()) => Right(rp.recv()), } } } } } fn try_select(&mut self) -> Either, Option> { // XXX: Bad borrow check workaround. unsafe { let this: &(Left, Right) = transmute(self); match *this { (ref lp, ref rp) => { let lp: &mut Left = transmute(lp); let rp: &mut Right = transmute(rp); match select2i(lp, rp) { Left(()) => Left (lp.try_recv()), Right(()) => Right(rp.try_recv()), } } } } } } #[cfg(test)] mod test { use either::Right; use super::{Chan, Port, oneshot, stream}; #[test] fn test_select2() { let (p1, c1) = stream(); let (p2, c2) = stream(); c1.send(~"abc"); let mut tuple = (p1, p2); match tuple.select() { Right(_) => fail!(), _ => (), } c2.send(123); } #[test] fn test_oneshot() { let (p, c) = oneshot(); c.send(()); p.recv() } #[test] fn test_peek_terminated() { let (port, chan): (Port, Chan) = stream(); { // Destroy the channel let _chan = chan; } assert!(!port.peek()); } }