// Copyright 2012 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. /*! Message passing */ #[allow(missing_doc)]; use clone::Clone; use iter::Iterator; use kinds::Send; use option::Option; use rtcomm = rt::comm; /// A trait for things that can send multiple messages. pub trait GenericChan { /// Sends a message. fn send(&self, x: T); } /// Things that can send multiple messages and can detect when the receiver /// is closed pub trait GenericSmartChan { /// Sends a message, or report if the receiver has closed the connection. fn try_send(&self, x: T) -> bool; } /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. pub trait SendDeferred { fn send_deferred(&self, val: T); fn try_send_deferred(&self, val: T) -> bool; } /// A trait for things that can receive multiple messages. pub trait GenericPort { /// Receives a message, or fails if the connection closes. fn recv(&self) -> T; /// Receives a message, or returns `none` if /// the connection is closed or closes. fn try_recv(&self) -> Option; /// Returns an iterator that breaks once the connection closes. /// /// # Example /// /// ~~~rust /// do spawn { /// for x in port.recv_iter() { /// if pred(x) { break; } /// println!("{}", x); /// } /// } /// ~~~ fn recv_iter<'a>(&'a self) -> RecvIterator<'a, Self> { RecvIterator { port: self } } } pub struct RecvIterator<'a, P> { priv port: &'a P, } impl<'a, T, P: GenericPort> Iterator for RecvIterator<'a, P> { fn next(&mut self) -> Option { self.port.try_recv() } } /// Ports that can `peek` pub trait Peekable { /// Returns true if a message is available fn peek(&self) -> bool; } pub struct PortOne { priv x: rtcomm::PortOne } pub struct ChanOne { priv x: rtcomm::ChanOne } pub fn oneshot() -> (PortOne, ChanOne) { let (p, c) = rtcomm::oneshot(); (PortOne { x: p }, ChanOne { x: c }) } pub struct Port { priv x: rtcomm::Port } pub struct Chan { priv x: rtcomm::Chan } pub fn stream() -> (Port, Chan) { let (p, c) = rtcomm::stream(); (Port { x: p }, Chan { x: c }) } impl ChanOne { pub fn send(self, val: T) { let ChanOne { x: c } = self; c.send(val) } pub fn try_send(self, val: T) -> bool { let ChanOne { x: c } = self; c.try_send(val) } pub fn send_deferred(self, val: T) { let ChanOne { x: c } = self; c.send_deferred(val) } pub fn try_send_deferred(self, val: T) -> bool { let ChanOne{ x: c } = self; c.try_send_deferred(val) } } impl PortOne { pub fn recv(self) -> T { let PortOne { x: p } = self; p.recv() } pub fn try_recv(self) -> Option { let PortOne { x: p } = self; p.try_recv() } } impl Peekable for PortOne { fn peek(&self) -> bool { let &PortOne { x: ref p } = self; p.peek() } } impl GenericChan for Chan { fn send(&self, val: T) { let &Chan { x: ref c } = self; c.send(val) } } impl GenericSmartChan for Chan { fn try_send(&self, val: T) -> bool { let &Chan { x: ref c } = self; c.try_send(val) } } impl SendDeferred for Chan { fn send_deferred(&self, val: T) { let &Chan { x: ref c } = self; c.send_deferred(val) } fn try_send_deferred(&self, val: T) -> bool { let &Chan { x: ref c } = self; c.try_send_deferred(val) } } impl GenericPort for Port { fn recv(&self) -> T { let &Port { x: ref p } = self; p.recv() } fn try_recv(&self) -> Option { let &Port { x: ref p } = self; p.try_recv() } } impl Peekable for Port { fn peek(&self) -> bool { let &Port { x: ref p } = self; p.peek() } } pub struct SharedChan { priv x: rtcomm::SharedChan } impl SharedChan { pub fn new(c: Chan) -> SharedChan { let Chan { x: c } = c; SharedChan { x: rtcomm::SharedChan::new(c) } } } impl GenericChan for SharedChan { fn send(&self, val: T) { let &SharedChan { x: ref c } = self; c.send(val) } } impl GenericSmartChan for SharedChan { fn try_send(&self, val: T) -> bool { let &SharedChan { x: ref c } = self; c.try_send(val) } } impl SendDeferred for SharedChan { fn send_deferred(&self, val: T) { let &SharedChan { x: ref c } = self; c.send_deferred(val) } fn try_send_deferred(&self, val: T) -> bool { let &SharedChan { x: ref c } = self; c.try_send_deferred(val) } } impl Clone for SharedChan { fn clone(&self) -> SharedChan { let &SharedChan { x: ref c } = self; SharedChan { x: c.clone() } } } pub struct SharedPort { priv x: rtcomm::SharedPort } impl SharedPort { pub fn new(p: Port) -> SharedPort { let Port { x: p } = p; SharedPort { x: rtcomm::SharedPort::new(p) } } } impl GenericPort for SharedPort { fn recv(&self) -> T { let &SharedPort { x: ref p } = self; p.recv() } fn try_recv(&self) -> Option { let &SharedPort { x: ref p } = self; p.try_recv() } } impl Clone for SharedPort { fn clone(&self) -> SharedPort { let &SharedPort { x: ref p } = self; SharedPort { x: p.clone() } } } #[cfg(test)] mod tests { use comm::*; use prelude::*; #[test] fn test_nested_recv_iter() { let (port, chan) = stream::(); let (total_port, total_chan) = oneshot::(); do spawn { let mut acc = 0; for x in port.recv_iter() { acc += x; for x in port.recv_iter() { acc += x; for x in port.try_recv().move_iter() { acc += x; total_chan.send(acc); } } } } chan.send(3); chan.send(1); chan.send(2); assert_eq!(total_port.recv(), 6); } #[test] fn test_recv_iter_break() { let (port, chan) = stream::(); let (count_port, count_chan) = oneshot::(); do spawn { let mut count = 0; for x in port.recv_iter() { if count >= 3 { count_chan.send(count); break; } else { count += x; } } } chan.send(2); chan.send(2); chan.send(2); chan.send(2); assert_eq!(count_port.recv(), 4); } }