core: Add comm::select2
Receives on two ports simultaneously
This commit is contained in:
parent
b2cfb7ef82
commit
601f7144d8
@ -27,6 +27,7 @@
|
||||
export send;
|
||||
export recv;
|
||||
export peek;
|
||||
export select2;
|
||||
export chan::{};
|
||||
export port::{};
|
||||
|
||||
@ -46,10 +47,14 @@ fn chan_id_send<T: send>(t: *sys::type_desc,
|
||||
fn port_recv(dptr: *uint, po: *rust_port,
|
||||
yield: *ctypes::uintptr_t,
|
||||
killed: *ctypes::uintptr_t);
|
||||
fn rust_port_select(dptr: **rust_port, ports: **rust_port,
|
||||
n_ports: ctypes::size_t,
|
||||
yield: *ctypes::uintptr_t);
|
||||
}
|
||||
|
||||
#[abi = "rust-intrinsic"]
|
||||
native mod rusti {
|
||||
// FIXME: This should probably not take a boxed closure
|
||||
fn call_with_retptr<T: send>(&&f: fn@(*uint)) -> T;
|
||||
}
|
||||
|
||||
@ -154,6 +159,45 @@ fn recv(dptr: *uint, port: *rust_port,
|
||||
ret res;
|
||||
}
|
||||
|
||||
#[doc = "Receive on one of two ports"]
|
||||
fn select2<A: send, B: send>(
|
||||
p_a: port<A>, p_b: port<B>
|
||||
) -> either::t<A, B> unsafe {
|
||||
|
||||
fn select(dptr: **rust_port, ports: **rust_port,
|
||||
n_ports: ctypes::size_t, yield: *ctypes::uintptr_t) {
|
||||
rustrt::rust_port_select(dptr, ports, n_ports, yield)
|
||||
}
|
||||
|
||||
let ports = [];
|
||||
ports += [***p_a, ***p_b];
|
||||
let n_ports = 2 as ctypes::size_t;
|
||||
let yield = 0u;
|
||||
let yieldp = ptr::addr_of(yield);
|
||||
|
||||
let resport: *rust_port = vec::as_buf(ports) {|ports|
|
||||
rusti::call_with_retptr {|retptr|
|
||||
select(unsafe::reinterpret_cast(retptr), ports, n_ports, yieldp)
|
||||
}
|
||||
};
|
||||
|
||||
if yield != 0u {
|
||||
// Wait for data
|
||||
task::yield();
|
||||
}
|
||||
|
||||
// Now we know the port we're supposed to receive from
|
||||
assert resport != ptr::null();
|
||||
|
||||
if resport == ***p_a {
|
||||
either::left(recv(p_a))
|
||||
} else if resport == ***p_b {
|
||||
either::right(recv(p_b))
|
||||
} else {
|
||||
fail "unexpected result from rust_port_select";
|
||||
}
|
||||
}
|
||||
|
||||
#[doc = "Returns true if there are messages available"]
|
||||
fn peek<T: send>(p: port<T>) -> bool {
|
||||
rustrt::rust_port_size(***p) != 0u as ctypes::size_t
|
||||
@ -218,4 +262,80 @@ fn test_peek() {
|
||||
assert peek(po);
|
||||
recv(po);
|
||||
assert !peek(po);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_select2_available() {
|
||||
let po_a = port();
|
||||
let po_b = port();
|
||||
let ch_a = chan(po_a);
|
||||
let ch_b = chan(po_b);
|
||||
|
||||
send(ch_a, "a");
|
||||
|
||||
assert select2(po_a, po_b) == either::left("a");
|
||||
|
||||
send(ch_b, "b");
|
||||
|
||||
assert select2(po_a, po_b) == either::right("b");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_select2_rendezvous() {
|
||||
let po_a = port();
|
||||
let po_b = port();
|
||||
let ch_a = chan(po_a);
|
||||
let ch_b = chan(po_b);
|
||||
|
||||
iter::repeat(10u) {||
|
||||
task::spawn {||
|
||||
iter::repeat(10u) {|| task::yield() }
|
||||
send(ch_a, "a");
|
||||
};
|
||||
|
||||
assert select2(po_a, po_b) == either::left("a");
|
||||
|
||||
task::spawn {||
|
||||
iter::repeat(10u) {|| task::yield() }
|
||||
send(ch_b, "b");
|
||||
};
|
||||
|
||||
assert select2(po_a, po_b) == either::right("b");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_select2_stress() {
|
||||
let po_a = port();
|
||||
let po_b = port();
|
||||
let ch_a = chan(po_a);
|
||||
let ch_b = chan(po_b);
|
||||
|
||||
let msgs = 100u;
|
||||
let times = 4u;
|
||||
|
||||
iter::repeat(times) {||
|
||||
task::spawn {||
|
||||
iter::repeat(msgs) {||
|
||||
send(ch_a, "a")
|
||||
}
|
||||
};
|
||||
task::spawn {||
|
||||
iter::repeat(msgs) {||
|
||||
send(ch_b, "b")
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let as = 0;
|
||||
let bs = 0;
|
||||
iter::repeat(msgs * times * 2u) {||
|
||||
alt check select2(po_a, po_b) {
|
||||
either::left("a") { as += 1 }
|
||||
either::right("b") { bs += 1 }
|
||||
}
|
||||
}
|
||||
|
||||
assert as == 400;
|
||||
assert bs == 400;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user