// Copyright 2012 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. /*! Deprecated communication between tasks Communication between tasks is facilitated by ports (in the receiving task), and channels (in the sending task). Any number of channels may feed into a single port. Ports and channels may only transmit values of unique types; that is, values that are statically guaranteed to be accessed by a single 'owner' at a time. Unique types include scalars, vectors, strings, and records, tags, tuples and unique boxes (`~T`) thereof. Most notably, shared boxes (`@T`) may not be transmitted across channels. # Example ~~~ let po = comm::Port(); let ch = comm::Chan(po); do task::spawn { comm::send(ch, "Hello, World"); } io::println(comm::recv(p)); ~~~ # Note Use of this module is deprecated in favor of `core::pipes`. In the `core::comm` will likely be rewritten with pipes, at which point it will once again be the preferred module for intertask communication. */ // NB: transitionary, de-mode-ing. #[forbid(deprecated_mode)]; #[forbid(deprecated_pattern)]; use cast; use either; use either::Either; use iter; use libc; use libc::size_t; use prelude::*; use ptr; use result; use sys; use task; use vec; // After snapshot, change p2::addr_of => addr_of /** * A communication endpoint that can receive messages * * Each port has a unique per-task identity and may not be replicated or * transmitted. If a port value is copied, both copies refer to the same * port. Ports may be associated with multiple `chan`s. */ pub enum Port { Port_(@PortPtr) } // It's critical that this only have one variant, so it has a record // layout, and will work in the rust_task structure in task.rs. /** * A communication endpoint that can send messages * * Each channel is bound to a port when the channel is constructed, so * the destination port for a channel must exist before the channel * itself. Channels are weak: a channel does not keep the port it is * bound to alive. If a channel attempts to send data to a dead port that * data will be silently dropped. Channels may be duplicated and * themselves transmitted over other channels. */ pub enum Chan { Chan_(port_id) } /// Constructs a port pub fn Port() -> Port { unsafe { Port_(@PortPtr(rustrt::new_port(sys::nonzero_size_of::() as size_t))) } } impl Port { fn chan() -> Chan { Chan(&self) } fn send(v: T) { self.chan().send(move v) } fn recv() -> T { recv(self) } fn peek() -> bool { peek(self) } } impl Chan { fn chan() -> Chan { self } fn send(v: T) { send(self, move v) } fn recv() -> T { recv_chan(self) } fn peek() -> bool { peek_chan(self) } } /// Open a new receiving channel for the duration of a function pub fn listen(f: fn(Chan) -> U) -> U { let po = Port(); f(po.chan()) } struct PortPtr { po: *rust_port, drop unsafe { do task::unkillable { // Once the port is detached it's guaranteed not to receive further // messages let yield = 0; let yieldp = ptr::addr_of(&yield); rustrt::rust_port_begin_detach(self.po, yieldp); if yield != 0 { // Need to wait for the port to be detached task::yield(); } rustrt::rust_port_end_detach(self.po); // Drain the port so that all the still-enqueued items get dropped while rustrt::rust_port_size(self.po) > 0 as size_t { recv_::(self.po); } rustrt::del_port(self.po); } } } fn PortPtr(po: *rust_port) -> PortPtr { PortPtr { po: po } } /** * Internal function for converting from a channel to a port * * # Failure * * Fails if the port is detached or dead. Fails if the port * is owned by a different task. */ fn as_raw_port(ch: Chan, f: fn(*rust_port) -> U) -> U { struct PortRef { p: *rust_port, drop { unsafe { if !ptr::is_null(self.p) { rustrt::rust_port_drop(self.p); } } } } fn PortRef(p: *rust_port) -> PortRef { PortRef { p: p } } unsafe { let p = PortRef(rustrt::rust_port_take(*ch)); if ptr::is_null(p.p) { fail ~"unable to locate port for channel" } else if rustrt::get_task_id() != rustrt::rust_port_task(p.p) { fail ~"unable to access unowned port" } f(p.p) } } /** * Constructs a channel. The channel is bound to the port used to * construct it. */ pub fn Chan(p: &Port) -> Chan { unsafe { Chan_(rustrt::get_port_id((**p).po)) } } /** * Sends data over a channel. The sent data is moved into the channel, * whereupon the caller loses access to it. */ pub fn send(ch: Chan, data: T) { unsafe { let Chan_(p) = ch; let data_ptr = ptr::addr_of(&data) as *(); let res = rustrt::rust_port_id_send(p, data_ptr); if res != 0 unsafe { // Data sent successfully cast::forget(move data); } task::yield(); } } /** * Receive from a port. If no data is available on the port then the * task will block until data becomes available. */ pub fn recv(p: Port) -> T { recv_((**p).po) } /// Returns true if there are messages available pub fn peek(p: Port) -> bool { peek_((**p).po) } #[doc(hidden)] pub fn recv_chan(ch: Chan) -> T { as_raw_port(ch, |x|recv_(x)) } fn peek_chan(ch: Chan) -> bool { as_raw_port(ch, |x|peek_(x)) } /// Receive on a raw port pointer fn recv_(p: *rust_port) -> T { unsafe { let yield = 0; let yieldp = ptr::addr_of(&yield); let mut res; res = rusti::init::(); rustrt::port_recv(ptr::addr_of(&res) as *uint, p, yieldp); if yield != 0 { // Data isn't available yet, so res has not been initialized. task::yield(); } else { // In the absence of compiler-generated preemption points // this is a good place to yield task::yield(); } move res } } fn peek_(p: *rust_port) -> bool { unsafe { // Yield here before we check to see if someone sent us a message // FIXME #524, if the compiler generates yields, we don't need this task::yield(); rustrt::rust_port_size(p) != 0 as libc::size_t } } /// Receive on one of two ports pub fn select2(p_a: Port, p_b: Port) -> Either { unsafe { let ports = ~[(**p_a).po, (**p_b).po]; let yield = 0, yieldp = ptr::addr_of(&yield); let mut resport: *rust_port; resport = rusti::init::<*rust_port>(); do vec::as_imm_buf(ports) |ports, n_ports| { rustrt::rust_port_select(ptr::addr_of(&resport), ports, n_ports as size_t, yieldp); } if yield != 0 { // Wait for data task::yield(); } else { // As in recv, this is a good place to yield anyway until // the compiler generates yield calls task::yield(); } // Now we know the port we're supposed to receive from assert resport != ptr::null(); if resport == (**p_a).po { either::Left(recv(p_a)) } else if resport == (**p_b).po { either::Right(recv(p_b)) } else { fail ~"unexpected result from rust_port_select"; } } } /* Implementation details */ #[allow(non_camel_case_types)] // runtime type enum rust_port {} #[allow(non_camel_case_types)] // runtime type type port_id = int; #[abi = "cdecl"] extern mod rustrt { unsafe fn rust_port_id_send(target_port: port_id, data: *()) -> libc::uintptr_t; unsafe fn new_port(unit_sz: libc::size_t) -> *rust_port; unsafe fn del_port(po: *rust_port); unsafe fn rust_port_begin_detach(po: *rust_port, yield: *libc::uintptr_t); unsafe fn rust_port_end_detach(po: *rust_port); unsafe fn get_port_id(po: *rust_port) -> port_id; unsafe fn rust_port_size(po: *rust_port) -> libc::size_t; unsafe fn port_recv(dptr: *uint, po: *rust_port, yield: *libc::uintptr_t); unsafe fn rust_port_select(dptr: **rust_port, ports: **rust_port, n_ports: libc::size_t, yield: *libc::uintptr_t); unsafe fn rust_port_take(port_id: port_id) -> *rust_port; unsafe fn rust_port_drop(p: *rust_port); unsafe fn rust_port_task(p: *rust_port) -> libc::uintptr_t; unsafe fn get_task_id() -> libc::uintptr_t; } #[abi = "rust-intrinsic"] extern mod rusti { fn init() -> T; } /* Tests */ #[test] fn create_port_and_chan() { let p = Port::(); Chan(&p); } #[test] fn send_int() { let p = Port::(); let c = Chan(&p); send(c, 22); } #[test] fn send_recv_fn() { let p = Port::(); let c = Chan::(&p); send(c, 42); assert (recv(p) == 42); } #[test] fn send_recv_fn_infer() { let p = Port(); let c = Chan(&p); send(c, 42); assert (recv(p) == 42); } #[test] fn chan_chan_infer() { let p = Port(), p2 = Port::(); let c = Chan(&p); send(c, Chan(&p2)); recv(p); } #[test] fn chan_chan() { let p = Port::>(), p2 = Port::(); let c = Chan(&p); send(c, Chan(&p2)); recv(p); } #[test] fn test_peek() { let po = Port(); let ch = Chan(&po); assert !peek(po); send(ch, ()); assert peek(po); recv(po); assert !peek(po); } #[test] fn test_select2_available() { let po_a = Port(); let po_b = Port(); let ch_a = Chan(&po_a); let ch_b = Chan(&po_b); send(ch_a, ~"a"); assert select2(po_a, po_b) == either::Left(~"a"); send(ch_b, ~"b"); assert select2(po_a, po_b) == either::Right(~"b"); } #[test] fn test_select2_rendezvous() { let po_a = Port(); let po_b = Port(); let ch_a = Chan(&po_a); let ch_b = Chan(&po_b); for iter::repeat(10) { do task::spawn { for iter::repeat(10) { task::yield() } send(ch_a, ~"a"); }; assert select2(po_a, po_b) == either::Left(~"a"); do task::spawn { for iter::repeat(10) { task::yield() } send(ch_b, ~"b"); }; assert select2(po_a, po_b) == either::Right(~"b"); } } #[test] fn test_select2_stress() { let po_a = Port(); let po_b = Port(); let ch_a = Chan(&po_a); let ch_b = Chan(&po_b); let msgs = 100; let times = 4u; for iter::repeat(times) { do task::spawn { for iter::repeat(msgs) { send(ch_a, ~"a") } }; do task::spawn { for iter::repeat(msgs) { send(ch_b, ~"b") } }; } let mut as_ = 0; let mut bs = 0; for iter::repeat(msgs * times * 2u) { match select2(po_a, po_b) { either::Left(~"a") => as_ += 1, either::Right(~"b") => bs += 1, _ => fail ~"test_select_2_stress failed" } } assert as_ == 400; assert bs == 400; } #[test] fn test_recv_chan() { let po = Port(); let ch = Chan(&po); send(ch, ~"flower"); assert recv_chan(ch) == ~"flower"; } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_recv_chan_dead() { let ch = Chan(&Port()); send(ch, ~"flower"); recv_chan(ch); } #[test] #[ignore(cfg(windows))] fn test_recv_chan_wrong_task() { let po = Port(); let ch = Chan(&po); send(ch, ~"flower"); assert result::is_err(&task::try(|| recv_chan(ch) )) } #[test] fn test_port_send() { let po = Port(); po.send(()); po.recv(); } #[test] fn test_chan_peek() { let po = Port(); let ch = po.chan(); ch.send(()); assert ch.peek(); } #[test] fn test_listen() { do listen |parent| { do task::spawn { parent.send(~"oatmeal-salad"); } assert parent.recv() == ~"oatmeal-salad"; } } #[test] #[ignore(cfg(windows))] fn test_port_detach_fail() { for iter::repeat(100) { do task::spawn_unlinked { let po = Port(); let ch = po.chan(); do task::spawn { fail; } do task::spawn { ch.send(()); } } } }