#[doc( brief = "Communication between tasks", desc = " Communication between tasks is facilitated by ports (in the receiving task), and channels (in the sending task). Any number of channels may feed into a single port. Ports and channels may only transmit values of unique types; that is, values that are statically guaranteed to be accessed by a single 'owner' at a time. Unique types include scalars, vectors, strings, and records, tags, tuples and unique boxes (`~T`) thereof. Most notably, shared boxes (`@T`) may not be transmitted across channels. Example: let p = comm::port(); task::spawn(comm::chan(p), fn (c: chan) { comm::send(c, \"Hello, World\"); }); io::println(comm::recv(p)); ")]; import sys; import task; export send; export recv; export peek; export select2; export chan::{}; export port::{}; enum rust_port {} #[abi = "cdecl"] native mod rustrt { fn chan_id_send(t: *sys::type_desc, target_task: task::task, target_port: port_id, data: T) -> ctypes::uintptr_t; fn new_port(unit_sz: ctypes::size_t) -> *rust_port; fn del_port(po: *rust_port); fn rust_port_detach(po: *rust_port); fn get_port_id(po: *rust_port) -> port_id; fn rust_port_size(po: *rust_port) -> ctypes::size_t; fn port_recv(dptr: *uint, po: *rust_port, yield: *ctypes::uintptr_t, killed: *ctypes::uintptr_t); fn rust_port_select(dptr: **rust_port, ports: **rust_port, n_ports: ctypes::size_t, yield: *ctypes::uintptr_t); } #[abi = "rust-intrinsic"] native mod rusti { // FIXME: This should probably not take a boxed closure fn call_with_retptr(&&f: fn@(*uint)) -> T; } type port_id = int; // It's critical that this only have one variant, so it has a record // layout, and will work in the rust_task structure in task.rs. #[doc( brief = "A communication endpoint that can send messages. \ Channels send messages to ports.", desc = "Each channel is bound to a port when the channel is \ constructed, so the destination port for a channel \ must exist before the channel itself. \ Channels are weak: a channel does not keep the port it \ is bound to alive. If a channel attempts to send data \ to a dead port that data will be silently dropped. \ Channels may be duplicated and themselves transmitted \ over other channels." )] enum chan { chan_t(task::task, port_id) } resource port_ptr(po: *rust_port) { // Once the port is detached it's guaranteed not to receive further // messages rustrt::rust_port_detach(po); // Drain the port so that all the still-enqueued items get dropped while rustrt::rust_port_size(po) > 0u { // FIXME: For some reason if we don't assign to something here // we end up with invalid reads in the drop glue. let _t = recv_::(po); } rustrt::del_port(po); } #[doc( brief = "A communication endpoint that can receive messages. \ Ports receive messages from channels.", desc = "Each port has a unique per-task identity and may not \ be replicated or transmitted. If a port value is \ copied, both copies refer to the same port. \ Ports may be associated with multiple s." )] enum port { port_t(@port_ptr) } #[doc( brief = "Sends data over a channel. The sent data is moved \ into the channel, whereupon the caller loses \ access to it." )] fn send(ch: chan, -data: T) { let chan_t(t, p) = ch; let res = rustrt::chan_id_send(sys::get_type_desc::(), t, p, data); if res != 0u unsafe { // Data sent successfully unsafe::leak(data); } task::yield(); } #[doc( brief = "Constructs a port." )] fn port() -> port { port_t(@port_ptr(rustrt::new_port(sys::size_of::()))) } #[doc( brief = "Receive from a port. \ If no data is available on the port then the task will \ block until data becomes available." )] fn recv(p: port) -> T { recv_(***p) } #[doc( brief = "Receive on a raw port pointer" )] fn recv_(p: *rust_port) -> T { // FIXME: Due to issue 1185 we can't use a return pointer when // calling C code, and since we can't create our own return // pointer on the stack, we're going to call a little intrinsic // that will grab the value of the return pointer, then call this // function, which we will then use to call the runtime. fn recv(dptr: *uint, port: *rust_port, yield: *ctypes::uintptr_t, killed: *ctypes::uintptr_t) unsafe { rustrt::port_recv(dptr, port, yield, killed); } let yield = 0u; let yieldp = ptr::addr_of(yield); let killed = 0u; let killedp = ptr::addr_of(killed); let res = rusti::call_with_retptr(bind recv(_, p, yieldp, killedp)); if killed != 0u { fail "killed"; } if yield != 0u { // Data isn't available yet, so res has not been initialized. task::yield(); } ret res; } #[doc = "Receive on one of two ports"] fn select2( p_a: port, p_b: port ) -> either::t unsafe { fn select(dptr: **rust_port, ports: **rust_port, n_ports: ctypes::size_t, yield: *ctypes::uintptr_t) { rustrt::rust_port_select(dptr, ports, n_ports, yield) } let ports = []; ports += [***p_a, ***p_b]; let n_ports = 2 as ctypes::size_t; let yield = 0u; let yieldp = ptr::addr_of(yield); let resport: *rust_port = vec::as_buf(ports) {|ports| rusti::call_with_retptr {|retptr| select(unsafe::reinterpret_cast(retptr), ports, n_ports, yieldp) } }; if yield != 0u { // Wait for data task::yield(); } // Now we know the port we're supposed to receive from assert resport != ptr::null(); if resport == ***p_a { either::left(recv(p_a)) } else if resport == ***p_b { either::right(recv(p_b)) } else { fail "unexpected result from rust_port_select"; } } #[doc = "Returns true if there are messages available"] fn peek(p: port) -> bool { rustrt::rust_port_size(***p) != 0u as ctypes::size_t } #[doc( brief = "Constructs a channel. The channel is bound to the \ port used to construct it." )] fn chan(p: port) -> chan { chan_t(task::get_task(), rustrt::get_port_id(***p)) } #[test] fn create_port_and_chan() { let p = port::(); chan(p); } #[test] fn send_int() { let p = port::(); let c = chan(p); send(c, 22); } #[test] fn send_recv_fn() { let p = port::(); let c = chan::(p); send(c, 42); assert (recv(p) == 42); } #[test] fn send_recv_fn_infer() { let p = port(); let c = chan(p); send(c, 42); assert (recv(p) == 42); } #[test] fn chan_chan_infer() { let p = port(), p2 = port::(); let c = chan(p); send(c, chan(p2)); recv(p); } #[test] fn chan_chan() { let p = port::>(), p2 = port::(); let c = chan(p); send(c, chan(p2)); recv(p); } #[test] fn test_peek() { let po = port(); let ch = chan(po); assert !peek(po); send(ch, ()); assert peek(po); recv(po); assert !peek(po); } #[test] fn test_select2_available() { let po_a = port(); let po_b = port(); let ch_a = chan(po_a); let ch_b = chan(po_b); send(ch_a, "a"); assert select2(po_a, po_b) == either::left("a"); send(ch_b, "b"); assert select2(po_a, po_b) == either::right("b"); } #[test] fn test_select2_rendezvous() { let po_a = port(); let po_b = port(); let ch_a = chan(po_a); let ch_b = chan(po_b); iter::repeat(10u) {|| task::spawn {|| iter::repeat(10u) {|| task::yield() } send(ch_a, "a"); }; assert select2(po_a, po_b) == either::left("a"); task::spawn {|| iter::repeat(10u) {|| task::yield() } send(ch_b, "b"); }; assert select2(po_a, po_b) == either::right("b"); } } #[test] fn test_select2_stress() { let po_a = port(); let po_b = port(); let ch_a = chan(po_a); let ch_b = chan(po_b); let msgs = 100u; let times = 4u; iter::repeat(times) {|| task::spawn {|| iter::repeat(msgs) {|| send(ch_a, "a") } }; task::spawn {|| iter::repeat(msgs) {|| send(ch_b, "b") } }; } let as = 0; let bs = 0; iter::repeat(msgs * times * 2u) {|| alt check select2(po_a, po_b) { either::left("a") { as += 1 } either::right("b") { bs += 1 } } } assert as == 400; assert bs == 400; }