core: Add send/recv/peek methods for both ports and chans
Calling peek or recv on channels can fail when the associated port is dead or unowned.
This commit is contained in:
parent
18f898315a
commit
13a4b59cc8
@ -33,6 +33,7 @@
|
||||
export peek;
|
||||
export recv_chan;
|
||||
export select2;
|
||||
export methods;
|
||||
|
||||
|
||||
#[doc = "
|
||||
@ -67,6 +68,71 @@ fn port<T: send>() -> port<T> {
|
||||
port_t(@port_ptr(rustrt::new_port(sys::size_of::<T>())))
|
||||
}
|
||||
|
||||
impl methods<T: send> for port<T> {
|
||||
|
||||
fn chan() -> chan<T> { chan(self) }
|
||||
fn send(+v: T) { self.chan().send(v) }
|
||||
fn recv() -> T { recv(self) }
|
||||
fn peek() -> bool { peek(self) }
|
||||
|
||||
}
|
||||
|
||||
impl methods<T: send> for chan<T> {
|
||||
|
||||
fn chan() -> chan<T> { self }
|
||||
fn send(+v: T) { send(self, v) }
|
||||
fn recv() -> T { recv_chan(self) }
|
||||
fn peek() -> bool { peek_chan(self) }
|
||||
|
||||
}
|
||||
|
||||
resource port_ptr<T: send>(po: *rust_port) {
|
||||
// Once the port is detached it's guaranteed not to receive further
|
||||
// messages
|
||||
let yield = 0u;
|
||||
let yieldp = ptr::addr_of(yield);
|
||||
rustrt::rust_port_begin_detach(po, yieldp);
|
||||
if yield != 0u {
|
||||
// Need to wait for the port to be detached
|
||||
// FIXME: If this fails then we're going to leave our port
|
||||
// in a bogus state. (Issue #1988)
|
||||
task::yield();
|
||||
}
|
||||
rustrt::rust_port_end_detach(po);
|
||||
|
||||
// Drain the port so that all the still-enqueued items get dropped
|
||||
while rustrt::rust_port_size(po) > 0u {
|
||||
recv_::<T>(po);
|
||||
}
|
||||
rustrt::del_port(po);
|
||||
}
|
||||
|
||||
#[doc = "
|
||||
Internal function for converting from a channel to a port
|
||||
|
||||
# Failure
|
||||
|
||||
Fails if the port is detached or dead. Fails if the port
|
||||
is owned by a different task.
|
||||
"]
|
||||
fn as_raw_port<T: send, U>(ch: comm::chan<T>, f: fn(*rust_port) -> U) -> U {
|
||||
resource portref(p: *rust_port) {
|
||||
if !ptr::is_null(p) {
|
||||
rustrt::rust_port_drop(p);
|
||||
}
|
||||
}
|
||||
|
||||
let p = portref(rustrt::rust_port_take(*ch));
|
||||
|
||||
if ptr::is_null(*p) {
|
||||
fail "unable to locate port for channel"
|
||||
} else if rustrt::get_task_id() != rustrt::rust_port_task(*p) {
|
||||
fail "unable to access unowned port"
|
||||
}
|
||||
|
||||
f(*p)
|
||||
}
|
||||
|
||||
#[doc = "
|
||||
Constructs a channel. The channel is bound to the port used to
|
||||
construct it.
|
||||
@ -96,49 +162,15 @@ fn send<T: send>(ch: chan<T>, -data: T) {
|
||||
fn recv<T: send>(p: port<T>) -> T { recv_(***p) }
|
||||
|
||||
#[doc = "Returns true if there are messages available"]
|
||||
fn peek<T: send>(p: port<T>) -> bool {
|
||||
rustrt::rust_port_size(***p) != 0u as libc::size_t
|
||||
}
|
||||
|
||||
resource port_ptr<T: send>(po: *rust_port) {
|
||||
// Once the port is detached it's guaranteed not to receive further
|
||||
// messages
|
||||
let yield = 0u;
|
||||
let yieldp = ptr::addr_of(yield);
|
||||
rustrt::rust_port_begin_detach(po, yieldp);
|
||||
if yield != 0u {
|
||||
// Need to wait for the port to be detached
|
||||
// FIXME: If this fails then we're going to leave our port
|
||||
// in a bogus state. (Issue #1988)
|
||||
task::yield();
|
||||
}
|
||||
rustrt::rust_port_end_detach(po);
|
||||
|
||||
// Drain the port so that all the still-enqueued items get dropped
|
||||
while rustrt::rust_port_size(po) > 0u {
|
||||
recv_::<T>(po);
|
||||
}
|
||||
rustrt::del_port(po);
|
||||
}
|
||||
|
||||
fn peek<T: send>(p: port<T>) -> bool { peek_(***p) }
|
||||
|
||||
#[doc(hidden)]
|
||||
fn recv_chan<T: send>(ch: comm::chan<T>) -> T {
|
||||
resource portref(p: *rust_port) {
|
||||
if !ptr::is_null(p) {
|
||||
rustrt::rust_port_drop(p);
|
||||
}
|
||||
}
|
||||
as_raw_port(ch, recv_(_))
|
||||
}
|
||||
|
||||
let p = portref(rustrt::rust_port_take(*ch));
|
||||
|
||||
if ptr::is_null(*p) {
|
||||
fail "unable to locate port for channel"
|
||||
} else if rustrt::get_task_id() != rustrt::rust_port_task(*p) {
|
||||
fail "attempting to receive on unowned channel"
|
||||
}
|
||||
|
||||
recv_(*p)
|
||||
fn peek_chan<T: send>(ch: comm::chan<T>) -> bool {
|
||||
as_raw_port(ch, peek_(_))
|
||||
}
|
||||
|
||||
#[doc = "Receive on a raw port pointer"]
|
||||
@ -160,6 +192,10 @@ fn recv_<T: send>(p: *rust_port) -> T {
|
||||
ret res;
|
||||
}
|
||||
|
||||
fn peek_(p: *rust_port) -> bool unsafe {
|
||||
rustrt::rust_port_size(p) != 0u as libc::size_t
|
||||
}
|
||||
|
||||
#[doc = "Receive on one of two ports"]
|
||||
fn select2<A: send, B: send>(p_a: port<A>, p_b: port<B>)
|
||||
-> either<A, B> unsafe {
|
||||
@ -391,3 +427,18 @@ fn test_recv_chan_wrong_task() {
|
||||
recv_chan(ch)
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_port_send() {
|
||||
let po = port();
|
||||
po.send(());
|
||||
po.recv();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_chan_peek() {
|
||||
let po = port();
|
||||
let ch = po.chan();
|
||||
ch.send(());
|
||||
assert ch.peek();
|
||||
}
|
Loading…
Reference in New Issue
Block a user