2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
|
|
|
brief = "Communication between tasks",
|
2012-01-23 18:07:05 -08:00
|
|
|
desc = "
|
|
|
|
|
|
|
|
Communication between tasks is facilitated by ports (in the receiving
|
|
|
|
task), and channels (in the sending task). Any number of channels may
|
|
|
|
feed into a single port. Ports and channels may only transmit values
|
|
|
|
of unique types; that is, values that are statically guaranteed to be
|
|
|
|
accessed by a single 'owner' at a time. Unique types include scalars,
|
|
|
|
vectors, strings, and records, tags, tuples and unique boxes (`~T`)
|
|
|
|
thereof. Most notably, shared boxes (`@T`) may not be transmitted
|
|
|
|
across channels.
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
|
|
|
let p = comm::port();
|
|
|
|
task::spawn(comm::chan(p), fn (c: chan<str>) {
|
|
|
|
comm::send(c, \"Hello, World\");
|
|
|
|
});
|
|
|
|
io::println(comm::recv(p));
|
|
|
|
|
|
|
|
")];
|
2011-12-13 16:25:51 -08:00
|
|
|
|
|
|
|
import sys;
|
|
|
|
import task;
|
|
|
|
|
|
|
|
export send;
|
|
|
|
export recv;
|
2012-02-14 14:07:06 -08:00
|
|
|
export peek;
|
2012-02-14 22:25:05 -08:00
|
|
|
export select2;
|
2012-01-23 08:57:51 -08:00
|
|
|
export chan::{};
|
|
|
|
export port::{};
|
2011-12-13 16:25:51 -08:00
|
|
|
|
2012-02-01 11:45:23 +01:00
|
|
|
// Opaque stand-in for the runtime's port structure. It has no variants, so
// no value of this type can be built here; this file only ever handles it
// behind raw pointers (`*rust_port`) exchanged with `rustrt`.
enum rust_port {}
|
|
|
|
|
2011-12-13 16:25:51 -08:00
|
|
|
#[abi = "cdecl"]
|
|
|
|
native mod rustrt {
|
2012-02-18 16:34:42 -08:00
|
|
|
fn get_task_id() -> task_id;
|
2012-01-05 15:35:37 +01:00
|
|
|
fn chan_id_send<T: send>(t: *sys::type_desc,
|
2012-02-18 16:34:42 -08:00
|
|
|
target_task: task_id, target_port: port_id,
|
2011-12-13 16:25:51 -08:00
|
|
|
data: T) -> ctypes::uintptr_t;
|
|
|
|
|
2012-01-16 18:21:01 +08:00
|
|
|
fn new_port(unit_sz: ctypes::size_t) -> *rust_port;
|
2011-12-13 16:25:51 -08:00
|
|
|
fn del_port(po: *rust_port);
|
2012-03-04 16:52:19 -08:00
|
|
|
fn rust_port_begin_detach(po: *rust_port,
|
|
|
|
yield: *ctypes::uintptr_t);
|
2012-03-05 20:02:25 -08:00
|
|
|
fn rust_port_end_detach(po: *rust_port);
|
2011-12-13 16:25:51 -08:00
|
|
|
fn get_port_id(po: *rust_port) -> port_id;
|
|
|
|
fn rust_port_size(po: *rust_port) -> ctypes::size_t;
|
|
|
|
fn port_recv(dptr: *uint, po: *rust_port,
|
2012-03-02 00:31:14 -08:00
|
|
|
yield: *ctypes::uintptr_t);
|
2012-02-14 22:25:05 -08:00
|
|
|
fn rust_port_select(dptr: **rust_port, ports: **rust_port,
|
|
|
|
n_ports: ctypes::size_t,
|
|
|
|
yield: *ctypes::uintptr_t);
|
2011-12-13 16:25:51 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[abi = "rust-intrinsic"]
|
|
|
|
native mod rusti {
|
2012-02-14 22:25:05 -08:00
|
|
|
// FIXME: This should probably not take a boxed closure
|
2012-01-05 15:35:37 +01:00
|
|
|
fn call_with_retptr<T: send>(&&f: fn@(*uint)) -> T;
|
2011-12-13 16:25:51 -08:00
|
|
|
}
|
|
|
|
|
2012-02-18 16:34:42 -08:00
|
|
|
// Task identifier: the result type of `rustrt::get_task_id` and the first
// field of `chan_t`.
type task_id = int;
|
2011-12-13 16:25:51 -08:00
|
|
|
// Port identifier: the result type of `rustrt::get_port_id` and the second
// field of `chan_t`.
type port_id = int;
|
|
|
|
|
|
|
|
// It's critical that this only have one variant, so it has a record
|
|
|
|
// layout, and will work in the rust_task structure in task.rs.
|
2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
2012-01-16 22:41:56 -08:00
|
|
|
brief = "A communication endpoint that can send messages. \
|
2012-01-16 18:34:03 -08:00
|
|
|
Channels send messages to ports.",
|
2012-01-16 22:41:56 -08:00
|
|
|
desc = "Each channel is bound to a port when the channel is \
|
|
|
|
constructed, so the destination port for a channel \
|
|
|
|
must exist before the channel itself. \
|
|
|
|
Channels are weak: a channel does not keep the port it \
|
|
|
|
is bound to alive. If a channel attempts to send data \
|
|
|
|
to a dead port that data will be silently dropped. \
|
|
|
|
Channels may be duplicated and themselves transmitted \
|
2012-01-16 18:34:03 -08:00
|
|
|
over other channels."
|
|
|
|
)]
|
2012-01-19 15:56:54 -08:00
|
|
|
enum chan<T: send> {
|
2012-02-18 16:34:42 -08:00
|
|
|
chan_t(task_id, port_id)
|
2011-12-13 16:25:51 -08:00
|
|
|
}
|
|
|
|
|
2012-02-01 11:45:23 +01:00
|
|
|
// Owner of a raw runtime port. The body below is the resource's cleanup
// code: detach the port, discard whatever is still queued, then delete it.
resource port_ptr<T: send>(po: *rust_port) {
    // Once the port is detached it's guaranteed not to receive further
    // messages
    let must_wait = 0u;
    rustrt::rust_port_begin_detach(po, ptr::addr_of(must_wait));
    if must_wait != 0u {
        // The runtime set the flag: the detach has not completed yet, so
        // give up the processor until it has.
        // FIXME: If this fails then we're going to leave our port
        // in a bogus state.
        task::yield();
    }
    rustrt::rust_port_end_detach(po);

    // Receive and discard every item that is still enqueued so that each
    // one gets dropped.
    while rustrt::rust_port_size(po) > 0u {
        // FIXME: For some reason if we don't assign to something here
        // we end up with invalid reads in the drop glue.
        let _t = recv_::<T>(po);
    }
    rustrt::del_port(po);
}
|
|
|
|
|
2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
2012-01-16 22:24:56 -08:00
|
|
|
brief = "A communication endpoint that can receive messages. \
|
2012-01-16 18:34:03 -08:00
|
|
|
Ports receive messages from channels.",
|
2012-01-16 22:24:56 -08:00
|
|
|
desc = "Each port has a unique per-task identity and may not \
|
|
|
|
be replicated or transmitted. If a port value is \
|
|
|
|
copied, both copies refer to the same port. \
|
2012-01-17 11:56:13 -08:00
|
|
|
Ports may be associated with multiple <chan>s."
|
2012-01-16 18:34:03 -08:00
|
|
|
)]
|
2012-01-19 19:29:21 -08:00
|
|
|
// Wraps a shared box of the `port_ptr` resource that owns the runtime port.
// Dereferencing a `port` three times (`***p`) yields the raw `*rust_port`.
enum port<T: send> { port_t(@port_ptr<T>) }
|
2011-12-13 16:25:51 -08:00
|
|
|
|
2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
2012-01-16 22:24:56 -08:00
|
|
|
brief = "Sends data over a channel. The sent data is moved \
|
|
|
|
into the channel, whereupon the caller loses \
|
2012-01-16 18:34:03 -08:00
|
|
|
access to it."
|
|
|
|
)]
|
2012-01-05 15:35:37 +01:00
|
|
|
fn send<T: send>(ch: chan<T>, -data: T) {
|
2011-12-13 16:25:51 -08:00
|
|
|
let chan_t(t, p) = ch;
|
|
|
|
let res = rustrt::chan_id_send(sys::get_type_desc::<T>(), t, p, data);
|
|
|
|
if res != 0u unsafe {
|
|
|
|
// Data sent successfully
|
|
|
|
unsafe::leak(data);
|
|
|
|
}
|
|
|
|
task::yield();
|
|
|
|
}
|
|
|
|
|
2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
|
|
|
brief = "Constructs a port."
|
|
|
|
)]
|
2012-01-05 15:35:37 +01:00
|
|
|
fn port<T: send>() -> port<T> {
|
2011-12-13 16:25:51 -08:00
|
|
|
port_t(@port_ptr(rustrt::new_port(sys::size_of::<T>())))
|
|
|
|
}
|
|
|
|
|
2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
2012-01-16 22:24:56 -08:00
|
|
|
brief = "Receive from a port. \
|
|
|
|
If no data is available on the port then the task will \
|
2012-01-16 18:34:03 -08:00
|
|
|
block until data becomes available."
|
|
|
|
)]
|
2012-01-05 15:35:37 +01:00
|
|
|
fn recv<T: send>(p: port<T>) -> T { recv_(***p) }
|
2011-12-13 16:25:51 -08:00
|
|
|
|
2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
|
|
|
brief = "Receive on a raw port pointer"
|
|
|
|
)]
|
2012-02-01 11:45:23 +01:00
|
|
|
fn recv_<T: send>(p: *rust_port) -> T {
|
2011-12-13 16:25:51 -08:00
|
|
|
// FIXME: Due to issue 1185 we can't use a return pointer when
|
|
|
|
// calling C code, and since we can't create our own return
|
|
|
|
// pointer on the stack, we're going to call a little intrinsic
|
|
|
|
// that will grab the value of the return pointer, then call this
|
|
|
|
// function, which we will then use to call the runtime.
|
2012-02-01 11:45:23 +01:00
|
|
|
fn recv(dptr: *uint, port: *rust_port,
|
2012-03-02 00:31:14 -08:00
|
|
|
yield: *ctypes::uintptr_t) unsafe {
|
|
|
|
rustrt::port_recv(dptr, port, yield);
|
2011-12-13 16:25:51 -08:00
|
|
|
}
|
|
|
|
let yield = 0u;
|
|
|
|
let yieldp = ptr::addr_of(yield);
|
2012-03-02 00:31:14 -08:00
|
|
|
let res = rusti::call_with_retptr(bind recv(_, p, yieldp));
|
2011-12-13 16:25:51 -08:00
|
|
|
if yield != 0u {
|
|
|
|
// Data isn't available yet, so res has not been initialized.
|
|
|
|
task::yield();
|
2012-03-02 00:31:14 -08:00
|
|
|
} else {
|
|
|
|
// In the absense of compiler-generated preemption points
|
|
|
|
// this is a good place to yield
|
|
|
|
task::yield();
|
2011-12-13 16:25:51 -08:00
|
|
|
}
|
|
|
|
ret res;
|
|
|
|
}
|
|
|
|
|
2012-02-14 22:25:05 -08:00
|
|
|
#[doc = "Receive on one of two ports"]
|
|
|
|
fn select2<A: send, B: send>(
|
|
|
|
p_a: port<A>, p_b: port<B>
|
|
|
|
) -> either::t<A, B> unsafe {
|
|
|
|
|
|
|
|
fn select(dptr: **rust_port, ports: **rust_port,
|
|
|
|
n_ports: ctypes::size_t, yield: *ctypes::uintptr_t) {
|
|
|
|
rustrt::rust_port_select(dptr, ports, n_ports, yield)
|
|
|
|
}
|
|
|
|
|
|
|
|
let ports = [];
|
|
|
|
ports += [***p_a, ***p_b];
|
|
|
|
let n_ports = 2 as ctypes::size_t;
|
|
|
|
let yield = 0u;
|
|
|
|
let yieldp = ptr::addr_of(yield);
|
|
|
|
|
|
|
|
let resport: *rust_port = vec::as_buf(ports) {|ports|
|
|
|
|
rusti::call_with_retptr {|retptr|
|
|
|
|
select(unsafe::reinterpret_cast(retptr), ports, n_ports, yieldp)
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if yield != 0u {
|
|
|
|
// Wait for data
|
|
|
|
task::yield();
|
2012-03-02 00:31:14 -08:00
|
|
|
} else {
|
|
|
|
// As in recv, this is a good place to yield anyway until
|
|
|
|
// the compiler generates yield calls
|
|
|
|
task::yield();
|
2012-02-14 22:25:05 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
// Now we know the port we're supposed to receive from
|
|
|
|
assert resport != ptr::null();
|
|
|
|
|
|
|
|
if resport == ***p_a {
|
|
|
|
either::left(recv(p_a))
|
|
|
|
} else if resport == ***p_b {
|
|
|
|
either::right(recv(p_b))
|
|
|
|
} else {
|
|
|
|
fail "unexpected result from rust_port_select";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-02-14 14:07:06 -08:00
|
|
|
#[doc = "Returns true if there are messages available"]
|
|
|
|
fn peek<T: send>(p: port<T>) -> bool {
|
|
|
|
rustrt::rust_port_size(***p) != 0u as ctypes::size_t
|
|
|
|
}
|
|
|
|
|
2012-01-16 18:34:03 -08:00
|
|
|
#[doc(
|
2012-01-16 22:41:56 -08:00
|
|
|
brief = "Constructs a channel. The channel is bound to the \
|
2012-01-16 18:34:03 -08:00
|
|
|
port used to construct it."
|
|
|
|
)]
|
2012-01-05 15:35:37 +01:00
|
|
|
fn chan<T: send>(p: port<T>) -> chan<T> {
|
2012-02-18 16:34:42 -08:00
|
|
|
chan_t(rustrt::get_task_id(), rustrt::get_port_id(***p))
|
2011-12-13 16:25:51 -08:00
|
|
|
}
|
2012-01-17 17:28:21 -08:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
// Smoke test: a port and a channel bound to it can be constructed.
fn create_port_and_chan() {
    let po = port::<int>();
    chan(po);
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn send_int() {
|
|
|
|
let p = port::<int>();
|
|
|
|
let c = chan(p);
|
|
|
|
send(c, 22);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn send_recv_fn() {
|
|
|
|
let p = port::<int>();
|
|
|
|
let c = chan::<int>(p);
|
|
|
|
send(c, 42);
|
|
|
|
assert (recv(p) == 42);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn send_recv_fn_infer() {
|
|
|
|
let p = port();
|
|
|
|
let c = chan(p);
|
|
|
|
send(c, 42);
|
|
|
|
assert (recv(p) == 42);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn chan_chan_infer() {
|
|
|
|
let p = port(), p2 = port::<int>();
|
|
|
|
let c = chan(p);
|
|
|
|
send(c, chan(p2));
|
|
|
|
recv(p);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn chan_chan() {
|
|
|
|
let p = port::<chan<int>>(), p2 = port::<int>();
|
|
|
|
let c = chan(p);
|
|
|
|
send(c, chan(p2));
|
|
|
|
recv(p);
|
|
|
|
}
|
|
|
|
|
2012-02-14 14:07:06 -08:00
|
|
|
#[test]
|
|
|
|
fn test_peek() {
|
|
|
|
let po = port();
|
|
|
|
let ch = chan(po);
|
|
|
|
assert !peek(po);
|
|
|
|
send(ch, ());
|
|
|
|
assert peek(po);
|
|
|
|
recv(po);
|
|
|
|
assert !peek(po);
|
2012-02-14 22:25:05 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_select2_available() {
|
|
|
|
let po_a = port();
|
|
|
|
let po_b = port();
|
|
|
|
let ch_a = chan(po_a);
|
|
|
|
let ch_b = chan(po_b);
|
|
|
|
|
|
|
|
send(ch_a, "a");
|
|
|
|
|
|
|
|
assert select2(po_a, po_b) == either::left("a");
|
|
|
|
|
|
|
|
send(ch_b, "b");
|
|
|
|
|
|
|
|
assert select2(po_a, po_b) == either::right("b");
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_select2_rendezvous() {
|
|
|
|
let po_a = port();
|
|
|
|
let po_b = port();
|
|
|
|
let ch_a = chan(po_a);
|
|
|
|
let ch_b = chan(po_b);
|
|
|
|
|
|
|
|
iter::repeat(10u) {||
|
|
|
|
task::spawn {||
|
|
|
|
iter::repeat(10u) {|| task::yield() }
|
|
|
|
send(ch_a, "a");
|
|
|
|
};
|
|
|
|
|
|
|
|
assert select2(po_a, po_b) == either::left("a");
|
|
|
|
|
|
|
|
task::spawn {||
|
|
|
|
iter::repeat(10u) {|| task::yield() }
|
|
|
|
send(ch_b, "b");
|
|
|
|
};
|
|
|
|
|
|
|
|
assert select2(po_a, po_b) == either::right("b");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_select2_stress() {
|
|
|
|
let po_a = port();
|
|
|
|
let po_b = port();
|
|
|
|
let ch_a = chan(po_a);
|
|
|
|
let ch_b = chan(po_b);
|
|
|
|
|
|
|
|
let msgs = 100u;
|
|
|
|
let times = 4u;
|
|
|
|
|
|
|
|
iter::repeat(times) {||
|
|
|
|
task::spawn {||
|
|
|
|
iter::repeat(msgs) {||
|
|
|
|
send(ch_a, "a")
|
|
|
|
}
|
|
|
|
};
|
|
|
|
task::spawn {||
|
|
|
|
iter::repeat(msgs) {||
|
|
|
|
send(ch_b, "b")
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
let as = 0;
|
|
|
|
let bs = 0;
|
|
|
|
iter::repeat(msgs * times * 2u) {||
|
|
|
|
alt check select2(po_a, po_b) {
|
|
|
|
either::left("a") { as += 1 }
|
|
|
|
either::right("b") { bs += 1 }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
assert as == 400;
|
|
|
|
assert bs == 400;
|
|
|
|
}
|