rust/src/libcore/comm.rs

507 lines
12 KiB
Rust
Raw Normal View History

/*!
Deprecated communication between tasks
Communication between tasks is facilitated by ports (in the receiving
task), and channels (in the sending task). Any number of channels may
feed into a single port. Ports and channels may only transmit values
of unique types; that is, values that are statically guaranteed to be
accessed by a single 'owner' at a time. Unique types include scalars,
vectors, strings, and records, tags, tuples and unique boxes (`~T`)
thereof. Most notably, shared boxes (`@T`) may not be transmitted
across channels.
# Example
~~~
let po = comm::Port();
let ch = comm::Chan(&po);
do task::spawn {
    comm::send(ch, "Hello, World");
}
io::println(comm::recv(po));
~~~
# Note
Use of this module is deprecated in favor of `core::pipes`. In the future,
`core::comm` will likely be rewritten with pipes, at which point it
will once again be the preferred module for intertask communication.
*/
2012-10-03 14:38:01 -07:00
// NB: transitionary, de-mode-ing.
#[forbid(deprecated_mode)];
#[forbid(deprecated_pattern)];
2012-09-04 11:12:17 -07:00
use either::Either;
use libc::size_t;
// After snapshot, change p2::addr_of => addr_of
2012-02-01 11:45:23 +01:00
/**
* A communication endpoint that can receive messages
*
* Each port has a unique per-task identity and may not be replicated or
* transmitted. If a port value is copied, both copies refer to the same
* port. Ports may be associated with multiple `Chan`s.
*
* The single variant holds a shared box (`@`) around a `PortPtr<T>`; the
* underlying runtime port is torn down by `PortPtr`'s destructor.
*/
2012-09-19 10:20:30 -07:00
pub enum Port<T: Send> {
2012-08-15 14:10:46 -07:00
Port_(@PortPtr<T>)
}
// It's critical that this only have one variant, so it has a record
// layout, and will work in the rust_task structure in task.rs.
/**
* A communication endpoint that can send messages
*
* Each channel is bound to a port when the channel is constructed, so
* the destination port for a channel must exist before the channel
* itself. Channels are weak: a channel does not keep the port it is
* bound to alive. If a channel attempts to send data to a dead port that
* data will be silently dropped. Channels may be duplicated and
* themselves transmitted over other channels.
*
* The variant stores only the `port_id` of the destination port, not a
* pointer to the port itself.
*/
2012-09-19 10:20:30 -07:00
pub enum Chan<T: Send> {
2012-08-15 14:10:46 -07:00
Chan_(port_id)
}
/// Constructs a port
///
/// Asks the runtime for a new port whose unit size is `size_of::<T>()` and
/// wraps the returned raw pointer in a boxed `PortPtr`.
2012-09-19 10:20:30 -07:00
pub fn Port<T: Send>() -> Port<T> {
2012-08-15 14:10:46 -07:00
Port_(@PortPtr(rustrt::new_port(sys::size_of::<T>() as size_t)))
}
// Method-call conveniences on `Port`; each forwards to a free function or
// to the equivalent `Chan` method.
impl<T: Send> Port<T> {
2012-10-03 14:38:01 -07:00
/// Creates a new channel bound to this port.
fn chan() -> Chan<T> { Chan(&self) }
/// Sends `v` to this port through a freshly created channel.
fn send(v: T) { self.chan().send(move v) }
/// Receives a value from this port; forwards to `recv`.
fn recv() -> T { recv(self) }
/// Reports whether a message is waiting on this port; forwards to `peek`.
fn peek() -> bool { peek(self) }
}
// Method-call conveniences on `Chan`. `recv` and `peek` go through the
// channel-to-port lookup (`recv_chan` / `peek_chan`) rather than a `Port`.
impl<T: Send> Chan<T> {
2012-08-15 14:10:46 -07:00
/// Returns this channel itself.
fn chan() -> Chan<T> { self }
/// Sends `v` over this channel; forwards to `send`.
fn send(v: T) { send(self, move v) }
/// Receives from the port this channel is bound to; forwards to `recv_chan`.
fn recv() -> T { recv_chan(self) }
/// Reports whether the bound port has a message; forwards to `peek_chan`.
fn peek() -> bool { peek_chan(self) }
}
/// Open a new receiving channel for the duration of a function
///
/// Creates a fresh port, hands a channel bound to it to `f`, and returns
/// whatever `f` returns. The port is a local of this call.
2012-09-19 10:20:30 -07:00
pub fn listen<T: Send, U>(f: fn(Chan<T>) -> U) -> U {
2012-08-27 14:22:25 -07:00
let po = Port();
2012-05-02 00:55:40 -07:00
f(po.chan())
}
// Owner of the raw runtime port behind a `Port`. Its destructor detaches
// the port, drains anything still queued, and finally deletes the port.
struct PortPtr<T:Send> {
2012-09-06 19:40:15 -07:00
// Raw pointer to the runtime's port object.
po: *rust_port,
drop unsafe {
// The whole teardown sequence runs inside `task::unkillable`.
do task::unkillable {
2012-05-15 11:51:24 -07:00
// Once the port is detached it's guaranteed not to receive further
// messages
2012-09-10 12:14:14 -07:00
// `yield` is handed to the runtime by address; a non-zero value
// afterwards means this task has to yield before finishing the detach.
let yield = 0;
let yieldp = ptr::addr_of(&yield);
rustrt::rust_port_begin_detach(self.po, yieldp);
2012-09-10 12:14:14 -07:00
if yield != 0 {
2012-05-15 11:51:24 -07:00
// Need to wait for the port to be detached
task::yield();
}
rustrt::rust_port_end_detach(self.po);
2012-05-15 11:51:24 -07:00
// Drain the port so that all the still-enqueued items get dropped
2012-09-10 12:14:14 -07:00
while rustrt::rust_port_size(self.po) > 0 as size_t {
// The received value is discarded immediately.
recv_::<T>(self.po);
2012-05-15 11:51:24 -07:00
}
rustrt::del_port(self.po);
}
}
}
// Constructor: wraps a raw runtime port pointer in a `PortPtr`.
fn PortPtr<T: Send>(po: *rust_port) -> PortPtr<T> {
2012-09-04 15:23:28 -07:00
PortPtr {
po: po
}
}
/**
* Internal function for converting from a channel to a port
*
* Looks up the port for `ch` with `rust_port_take`, validates it, and then
* calls `f` with the raw port pointer, returning `f`'s result.
*
* # Failure
*
* Fails if the port is detached or dead. Fails if the port
* is owned by a different task.
*/
fn as_raw_port<T: Send, U>(ch: comm::Chan<T>, f: fn(*rust_port) -> U) -> U {
2012-08-15 18:46:55 -07:00
// Guard whose destructor hands the pointer back via `rust_port_drop`,
// pairing with the `rust_port_take` below. Null pointers are skipped.
struct PortRef {
2012-09-06 19:40:15 -07:00
p: *rust_port,
drop {
if !ptr::is_null(self.p) {
rustrt::rust_port_drop(self.p);
}
}
}
2012-09-04 15:23:28 -07:00
// Constructor for the guard above.
fn PortRef(p: *rust_port) -> PortRef {
PortRef {
p: p
}
}
2012-08-15 14:10:46 -07:00
// `*ch` is the channel's port id.
let p = PortRef(rustrt::rust_port_take(*ch));
// A null pointer means the runtime could not find a port for that id.
if ptr::is_null(p.p) {
fail ~"unable to locate port for channel"
// Only the task recorded as the port's owner may use it.
} else if rustrt::get_task_id() != rustrt::rust_port_task(p.p) {
fail ~"unable to access unowned port"
}
f(p.p)
}
/**
* Constructs a channel. The channel is bound to the port used to
* construct it.
*
* Only the port's runtime id (from `get_port_id`) is stored in the
* returned channel.
*/
2012-10-03 14:38:01 -07:00
pub fn Chan<T: Send>(p: &Port<T>) -> Chan<T> {
2012-08-15 14:10:46 -07:00
Chan_(rustrt::get_port_id((**p).po))
}
/**
* Sends data over a channel. The sent data is moved into the channel,
* whereupon the caller loses access to it.
*
* If the runtime reports failure (a zero result), `data` is not forgotten
* and is dropped normally when this function returns. The task yields
* after every send, whether or not it succeeded.
*/
pub fn send<T: Send>(ch: Chan<T>, data: T) {
2012-08-15 14:10:46 -07:00
let Chan_(p) = ch;
// The runtime is given the address of `data` as an untyped pointer.
let data_ptr = ptr::addr_of(&data) as *();
let res = rustrt::rust_port_id_send(p, data_ptr);
2012-09-10 12:14:14 -07:00
if res != 0 unsafe {
// Data sent successfully
2012-09-18 17:34:08 -07:00
// Forget the local value so its destructor does not run here.
cast::forget(move data);
}
task::yield();
}
/**
* Receive from a port. If no data is available on the port then the
* task will block until data becomes available.
*
* Thin wrapper that passes the port's raw pointer to `recv_`.
*/
2012-09-19 10:20:30 -07:00
pub fn recv<T: Send>(p: Port<T>) -> T { recv_((**p).po) }
/// Returns true if there are messages available
///
/// Thin wrapper that passes the port's raw pointer to `peek_`.
2012-09-19 10:20:30 -07:00
pub fn peek<T: Send>(p: Port<T>) -> bool { peek_((**p).po) }
// Receives through a channel by first resolving it to its raw port with
// `as_raw_port`; fails under the same conditions as that function.
#[doc(hidden)]
2012-09-19 10:20:30 -07:00
pub fn recv_chan<T: Send>(ch: comm::Chan<T>) -> T {
2012-06-30 16:19:07 -07:00
as_raw_port(ch, |x|recv_(x))
}
// Checks for pending messages through a channel by first resolving it to
// its raw port with `as_raw_port`; fails under the same conditions.
fn peek_chan<T: Send>(ch: comm::Chan<T>) -> bool {
2012-06-30 16:19:07 -07:00
as_raw_port(ch, |x|peek_(x))
}
/// Receive on a raw port pointer
///
/// The runtime writes the received value into `res` through a raw pointer
/// and reports through `yield` whether the task has to wait first. The task
/// yields on both paths before `res` is returned.
fn recv_<T: Send>(p: *rust_port) -> T {
2012-09-10 12:14:14 -07:00
// `yield` is handed to the runtime by address; non-zero afterwards means
// no data was ready.
let yield = 0;
let yieldp = ptr::addr_of(&yield);
// Placeholder value produced by the `init` intrinsic, to be overwritten
// by the runtime.
let mut res;
res = rusti::init::<T>();
rustrt::port_recv(ptr::addr_of(&res) as *uint, p, yieldp);
2012-09-10 12:14:14 -07:00
if yield != 0 {
// Data isn't available yet, so res has not been initialized.
task::yield();
2012-03-02 00:31:14 -08:00
} else {
// In the absence of compiler-generated preemption points
2012-03-02 00:31:14 -08:00
// this is a good place to yield
task::yield();
}
2012-09-10 12:14:14 -07:00
move res
}
// Returns true when the raw port reports a non-zero size. Yields to the
// scheduler once before looking.
fn peek_(p: *rust_port) -> bool {
2012-07-17 23:13:11 -05:00
// Yield here before we check to see if someone sent us a message
2012-09-10 12:14:14 -07:00
// FIXME #524, if the compiler generates yields, we don't need this
2012-07-17 23:13:11 -05:00
task::yield();
2012-09-10 12:14:14 -07:00
rustrt::rust_port_size(p) != 0 as libc::size_t
}
/// Receive on one of two ports
///
/// Asks the runtime which of the two ports to read, then receives from it.
/// A value from `p_a` comes back as `Left`, a value from `p_b` as `Right`.
///
/// # Failure
///
/// Fails if the runtime reports a null port, or a port that is neither
/// `p_a` nor `p_b`.
2012-09-19 10:20:30 -07:00
pub fn select2<A: Send, B: Send>(p_a: Port<A>, p_b: Port<B>)
2012-08-14 16:54:13 -07:00
-> Either<A, B> {
// Raw pointers of both ports, in the order the runtime will examine.
let ports = ~[(**p_a).po, (**p_b).po];
let yield = 0, yieldp = ptr::addr_of(&yield);
// Out-parameter: the runtime stores the selected port here.
let mut resport: *rust_port;
resport = rusti::init::<*rust_port>();
do vec::as_imm_buf(ports) |ports, n_ports| {
rustrt::rust_port_select(ptr::addr_of(&resport), ports,
n_ports as size_t, yieldp);
}
2012-09-10 12:14:14 -07:00
if yield != 0 {
// Wait for data
task::yield();
2012-03-02 00:31:14 -08:00
} else {
// As in recv, this is a good place to yield anyway until
// the compiler generates yield calls
task::yield();
}
// Now we know the port we're supposed to receive from
assert resport != ptr::null();
if resport == (**p_a).po {
2012-08-14 16:54:13 -07:00
either::Left(recv(p_a))
} else if resport == (**p_b).po {
2012-08-14 16:54:13 -07:00
either::Right(recv(p_b))
} else {
fail ~"unexpected result from rust_port_select";
}
}
/* Implementation details */
2012-08-15 14:10:46 -07:00
// Opaque stand-in for the runtime's port object. It has no variants and is
// only ever used behind a raw pointer in this file.
#[allow(non_camel_case_types)] // runtime type
enum rust_port {}
2012-08-15 14:10:46 -07:00
// Identifier for a port, as returned by `get_port_id` and stored in `Chan_`.
#[allow(non_camel_case_types)] // runtime type
type port_id = int;
// Bindings to the runtime functions that implement ports.
// NOTE(review): the per-function notes below describe how this module uses
// each binding; the runtime's own contract is not visible from this file.
#[abi = "cdecl"]
extern mod rustrt {
// `send` treats a non-zero result as "data was sent".
fn rust_port_id_send(target_port: port_id, data: *()) -> libc::uintptr_t;
// Called with `size_of::<T>()` when a `Port<T>` is constructed.
fn new_port(unit_sz: libc::size_t) -> *rust_port;
// Last call made by `PortPtr`'s destructor.
fn del_port(po: *rust_port);
// First step of detaching; a non-zero `*yield` afterwards makes the
// destructor yield before calling `rust_port_end_detach`.
fn rust_port_begin_detach(po: *rust_port,
yield: *libc::uintptr_t);
fn rust_port_end_detach(po: *rust_port);
// Used to obtain the id stored in a `Chan`.
fn get_port_id(po: *rust_port) -> port_id;
// Number of queued items, as used by `peek_` and the destructor's drain loop.
fn rust_port_size(po: *rust_port) -> libc::size_t;
// Writes a received value through `dptr`; a non-zero `*yield` afterwards
// means no data was ready.
fn port_recv(dptr: *uint, po: *rust_port,
yield: *libc::uintptr_t);
// Stores the selected port through `dptr`; `ports` points at `n_ports`
// candidate port pointers.
fn rust_port_select(dptr: **rust_port, ports: **rust_port,
n_ports: libc::size_t,
yield: *libc::uintptr_t);
// Looks a port up by id; `as_raw_port` treats a null result as "not found"
// and pairs each non-null result with `rust_port_drop`.
fn rust_port_take(port_id: port_id) -> *rust_port;
fn rust_port_drop(p: *rust_port);
// Compared against `get_task_id()` to check port ownership.
fn rust_port_task(p: *rust_port) -> libc::uintptr_t;
fn get_task_id() -> libc::uintptr_t;
}
// Compiler intrinsics used by this module.
#[abi = "rust-intrinsic"]
extern mod rusti {
// Produces a placeholder `T` that `recv_` and `select2` hand to the
// runtime to be overwritten.
fn init<T>() -> T;
}
2012-01-17 17:28:21 -08:00
/* Tests */
2012-01-17 17:28:21 -08:00
// Smoke test: a port and a channel bound to it can be constructed.
#[test]
2012-10-03 14:38:01 -07:00
fn create_port_and_chan() { let p = Port::<int>(); Chan(&p); }
2012-01-17 17:28:21 -08:00
// Sending an int on a channel whose port is never read must not fail.
#[test]
fn send_int() {
2012-08-27 14:22:25 -07:00
let p = Port::<int>();
2012-10-03 14:38:01 -07:00
let c = Chan(&p);
2012-01-17 17:28:21 -08:00
send(c, 22);
}
// Round trip with explicit type parameters: the value sent is the value
// received.
#[test]
fn send_recv_fn() {
2012-08-27 14:22:25 -07:00
let p = Port::<int>();
2012-10-03 14:38:01 -07:00
let c = Chan::<int>(&p);
2012-01-17 17:28:21 -08:00
send(c, 42);
assert (recv(p) == 42);
}
// Same round trip as `send_recv_fn`, but with the element type left to
// inference.
#[test]
fn send_recv_fn_infer() {
2012-08-27 14:22:25 -07:00
let p = Port();
2012-10-03 14:38:01 -07:00
let c = Chan(&p);
2012-01-17 17:28:21 -08:00
send(c, 42);
assert (recv(p) == 42);
}
// A channel can itself be sent over another channel; the outer port's
// element type is inferred.
#[test]
fn chan_chan_infer() {
2012-08-27 14:22:25 -07:00
let p = Port(), p2 = Port::<int>();
2012-10-03 14:38:01 -07:00
let c = Chan(&p);
send(c, Chan(&p2));
2012-01-17 17:28:21 -08:00
recv(p);
}
// A channel can itself be sent over another channel; the outer port's
// element type is spelled out as `Chan<int>`.
#[test]
fn chan_chan() {
2012-08-27 14:22:25 -07:00
let p = Port::<Chan<int>>(), p2 = Port::<int>();
2012-10-03 14:38:01 -07:00
let c = Chan(&p);
send(c, Chan(&p2));
2012-01-17 17:28:21 -08:00
recv(p);
}
// `peek` is false on an empty port, true after a send, and false again
// once the message has been received.
#[test]
fn test_peek() {
2012-08-27 14:22:25 -07:00
let po = Port();
2012-10-03 14:38:01 -07:00
let ch = Chan(&po);
assert !peek(po);
send(ch, ());
assert peek(po);
recv(po);
assert !peek(po);
}
// `select2` with data already queued: a message on the first port comes
// back as `Left`, a message on the second as `Right`.
#[test]
fn test_select2_available() {
2012-08-27 14:22:25 -07:00
let po_a = Port();
let po_b = Port();
2012-10-03 14:38:01 -07:00
let ch_a = Chan(&po_a);
let ch_b = Chan(&po_b);
send(ch_a, ~"a");
2012-08-14 16:54:13 -07:00
assert select2(po_a, po_b) == either::Left(~"a");
send(ch_b, ~"b");
2012-08-14 16:54:13 -07:00
assert select2(po_a, po_b) == either::Right(~"b");
}
// `select2` when the sender is a separate task that yields ten times
// before sending, repeated ten times and alternating between the ports.
#[test]
fn test_select2_rendezvous() {
2012-08-27 14:22:25 -07:00
let po_a = Port();
let po_b = Port();
2012-10-03 14:38:01 -07:00
let ch_a = Chan(&po_a);
let ch_b = Chan(&po_b);
2012-09-10 12:14:14 -07:00
for iter::repeat(10) {
do task::spawn {
2012-09-10 12:14:14 -07:00
// Delay the send by yielding a few times first.
for iter::repeat(10) { task::yield() }
send(ch_a, ~"a");
};
2012-08-14 16:54:13 -07:00
assert select2(po_a, po_b) == either::Left(~"a");
do task::spawn {
2012-09-10 12:14:14 -07:00
for iter::repeat(10) { task::yield() }
send(ch_b, ~"b");
};
2012-08-14 16:54:13 -07:00
assert select2(po_a, po_b) == either::Right(~"b");
}
}
#[test]
fn test_select2_stress() {
2012-08-27 14:22:25 -07:00
let po_a = Port();
let po_b = Port();
2012-10-03 14:38:01 -07:00
let ch_a = Chan(&po_a);
let ch_b = Chan(&po_b);
2012-09-10 12:14:14 -07:00
let msgs = 100;
let times = 4u;
for iter::repeat(times) {
do task::spawn {
for iter::repeat(msgs) {
send(ch_a, ~"a")
}
};
do task::spawn {
for iter::repeat(msgs) {
send(ch_b, ~"b")
}
};
}
let mut as_ = 0;
let mut bs = 0;
for iter::repeat(msgs * times * 2u) {
2012-08-15 11:55:17 -07:00
match select2(po_a, po_b) {
either::Left(~"a") => as_ += 1,
2012-08-14 16:54:13 -07:00
either::Right(~"b") => bs += 1,
2012-08-15 11:55:17 -07:00
_ => fail ~"test_select_2_stress failed"
}
}
assert as_ == 400;
assert bs == 400;
}
// `recv_chan` delivers a message when called from the task that created
// the port.
#[test]
fn test_recv_chan() {
2012-08-27 14:22:25 -07:00
let po = Port();
2012-10-03 14:38:01 -07:00
let ch = Chan(&po);
send(ch, ~"flower");
assert recv_chan(ch) == ~"flower";
}
// The channel is built from a temporary port that is not kept in a
// variable; the test is expected to fail (`should_fail`).
// NOTE(review): presumably the failure comes from `recv_chan` not finding
// the port — confirm which call actually fails.
#[test]
#[should_fail]
#[ignore(cfg(windows))]
fn test_recv_chan_dead() {
2012-10-03 14:38:01 -07:00
let ch = Chan(&Port());
send(ch, ~"flower");
recv_chan(ch);
}
// Calling `recv_chan` inside `task::try` must yield an error result, since
// the port was created by the test task itself.
#[test]
#[ignore(cfg(windows))]
fn test_recv_chan_wrong_task() {
2012-08-27 14:22:25 -07:00
let po = Port();
2012-10-03 14:38:01 -07:00
let ch = Chan(&po);
send(ch, ~"flower");
2012-09-25 16:23:04 -07:00
assert result::is_err(&task::try(||
recv_chan(ch)
2012-06-30 16:19:07 -07:00
))
}
// Exercises the `Port` method forms of `send` and `recv`.
#[test]
fn test_port_send() {
2012-08-27 14:22:25 -07:00
let po = Port();
po.send(());
po.recv();
}
// Exercises `Chan::peek`: after a send, peeking through the channel
// reports a pending message.
#[test]
fn test_chan_peek() {
2012-08-27 14:22:25 -07:00
let po = Port();
let ch = po.chan();
ch.send(());
assert ch.peek();
2012-05-02 00:55:40 -07:00
}
// Exercises `listen`: a spawned task sends on the channel handed to the
// closure, and the closure receives that message through the same channel.
#[test]
fn test_listen() {
2012-06-30 16:19:07 -07:00
do listen |parent| {
do task::spawn {
parent.send(~"oatmeal-salad");
2012-05-02 00:55:40 -07:00
}
assert parent.recv() == ~"oatmeal-salad";
2012-05-02 00:55:40 -07:00
}
2012-05-15 11:51:24 -07:00
}
// Runs 100 unlinked tasks; each creates a port and channel, then spawns one
// task that fails and another that sends on the channel.
// NOTE(review): presumably this exercises port teardown racing with a send
// while the owning task is being taken down by the failing child — confirm.
#[test]
#[ignore(cfg(windows))]
2012-05-15 11:51:24 -07:00
fn test_port_detach_fail() {
2012-09-10 12:14:14 -07:00
for iter::repeat(100) {
do task::spawn_unlinked {
2012-08-27 14:22:25 -07:00
let po = Port();
2012-05-15 11:51:24 -07:00
let ch = po.chan();
do task::spawn {
2012-05-15 11:51:24 -07:00
fail;
}
do task::spawn {
2012-05-15 11:51:24 -07:00
ch.send(());
}
}
}
}