diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index e020d5c3e9f..3bb2467f3df 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -7,6 +7,12 @@ import arc::methods; // Things used by code generated by the pipe compiler. export entangle, get_buffer, drop_buffer; +export send_packet_buffered, recv_packet_buffered; +export mk_packet; + +// export these so we can find them in the buffer_resource +// destructor. This is probably another metadata bug. +export atomic_add_acq, atomic_sub_rel; // User-level things export send_packet, recv_packet, send, recv, try_recv, peek; @@ -71,7 +77,7 @@ higher level buffer structure. Packets can maintain a pointer to their buffer, and this is the part that gets freed. It might be helpful to have some idea of a semi-unique pointer (like -being partially pregnant, also like an ARC). +being partially pregnant, also like an ARC). */ @@ -87,7 +93,7 @@ class buffer_header { // get away with restricting it to 0 or 1, if we're careful. let mut ref_count: int; - new() { self.ref_count = 1; } + new() { self.ref_count = 0; } // We may want a drop, and to be careful about stringing this // thing along. @@ -134,6 +140,10 @@ class packet_header { assert self.buffer.is_not_null(); reinterpret_cast(self.buffer) } + + fn set_buffer(b: ~buffer) unsafe { + self.buffer = reinterpret_cast(b); + } } type packet = { @@ -141,6 +151,13 @@ type packet = { mut payload: option, }; +fn mk_packet() -> packet { + { + header: packet_header(), + mut payload: none + } +} + fn unibuffer() -> ~buffer> { let b = ~{ header: buffer_header(), @@ -170,12 +187,25 @@ extern mod rusti { fn atomic_xchng(&dst: int, src: int) -> int; fn atomic_xchng_acq(&dst: int, src: int) -> int; fn atomic_xchng_rel(&dst: int, src: int) -> int; + + fn atomic_add_acq(&dst: int, src: int) -> int; + fn atomic_sub_rel(&dst: int, src: int) -> int; } +// If I call the rusti versions directly from a polymorphic function, +// I get link errors. This is a bug that needs investigated more. fn atomic_xchng_rel(&dst: int, src: int) -> int { rusti::atomic_xchng_rel(dst, src) } +fn atomic_add_acq(&dst: int, src: int) -> int { + rusti::atomic_add_acq(dst, src) +} + +fn atomic_sub_rel(&dst: int, src: int) -> int { + rusti::atomic_sub_rel(dst, src) +} + type rust_task = libc::c_void; extern mod rustrt { @@ -222,13 +252,21 @@ unsafe fn get_buffer(p: *packet_header) -> ~buffer { class buffer_resource { let buffer: ~buffer; new(+b: ~buffer) { + let p = ptr::addr_of(*b); + #error("take %?", p); + atomic_add_acq(b.header.ref_count, 1); self.buffer = b; } drop unsafe { let b = move!{self.buffer}; - let old_count = atomic_xchng_rel(b.header.ref_count, 0); - if old_count == 0 { + let p = ptr::addr_of(*b); + #error("drop %?", p); + let old_count = atomic_sub_rel(b.header.ref_count, 1); + //let old_count = atomic_xchng_rel(b.header.ref_count, 0); + if old_count == 1 { + // The new count is 0. + // go go gadget drop glue } else { @@ -237,7 +275,8 @@ class buffer_resource { } } -fn send(-p: send_packet, -payload: T) { +fn send(-p: send_packet_buffered, + -payload: T) { let header = p.header(); let p_ = p.unwrap(); let p = unsafe { &*p_ }; @@ -273,11 +312,13 @@ fn send(-p: send_packet, -payload: T) { } } -fn recv(-p: recv_packet) -> T { +fn recv(-p: recv_packet_buffered) -> T { option::unwrap(try_recv(p)) } -fn try_recv(-p: recv_packet) -> option { +fn try_recv(-p: recv_packet_buffered) + -> option +{ let p_ = p.unwrap(); let p = unsafe { &*p_ }; let this = rustrt::rust_get_task(); @@ -498,6 +539,10 @@ class send_packet_buffered { p <-> self.p; sender_terminate(option::unwrap(p)) } + unsafe { #error("send_drop: %?", + if self.buffer == none { + "none" + } else { "some" }); } } fn unwrap() -> *packet { let mut p = none; @@ -518,6 +563,13 @@ class send_packet_buffered { none { fail ~"packet already consumed" } } } + + fn reuse_buffer() -> buffer_resource { + #error("send reuse_buffer"); + let mut tmp = none; + tmp <-> self.buffer; + option::unwrap(tmp) + } } type recv_packet = recv_packet_buffered>; @@ -547,6 +599,10 @@ class recv_packet_buffered : selectable { p <-> self.p; receiver_terminate(option::unwrap(p)) } + unsafe { #error("recv_drop: %?", + if self.buffer == none { + "none" + } else { "some" }); } } fn unwrap() -> *packet { let mut p = none; @@ -567,6 +623,13 @@ class recv_packet_buffered : selectable { none { fail ~"packet already consumed" } } } + + fn reuse_buffer() -> buffer_resource { + #error("recv reuse_buffer"); + let mut tmp = none; + tmp <-> self.buffer; + option::unwrap(tmp) + } } fn entangle() -> (send_packet, recv_packet) { diff --git a/src/test/run-pass/pipe-pingpong-bounded.rs b/src/test/run-pass/pipe-pingpong-bounded.rs new file mode 100644 index 00000000000..bad605b8929 --- /dev/null +++ b/src/test/run-pass/pipe-pingpong-bounded.rs @@ -0,0 +1,106 @@ +// Ping-pong is a bounded protocol. This is place where I can +// experiment with what code the compiler should generate for bounded +// protocols. + +// xfail-pretty + +// This was generated initially by the pipe compiler, but it's been +// modified in hopefully straightforward ways. +mod pingpong { + import pipes::*; + + type packets = { + ping: packet, + pong: packet, + }; + + fn init() -> (client::ping, server::ping) { + let buffer = ~{ + header: buffer_header(), + data: { + ping: mk_packet::(), + pong: mk_packet::() + } + }; + unsafe { + buffer.data.ping.header.set_buffer(buffer); + buffer.data.pong.header.set_buffer(buffer); + } + let client = send_packet_buffered(ptr::addr_of(buffer.data.ping)); + let server = recv_packet_buffered(ptr::addr_of(buffer.data.ping)); + (client, server) + } + enum ping = server::pong; + enum pong = client::ping; + mod client { + fn ping(+pipe: ping) -> pong { + { + let b = pipe.reuse_buffer(); + let s = send_packet_buffered(ptr::addr_of(b.buffer.data.pong)); + let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.pong)); + let message = pingpong::ping(s); + pipes::send(pipe, message); + c + } + } + type ping = pipes::send_packet_buffered; + type pong = pipes::recv_packet_buffered; + } + mod server { + type ping = pipes::recv_packet_buffered; + fn pong(+pipe: pong) -> ping { + { + let b = pipe.reuse_buffer(); + let s = send_packet_buffered(ptr::addr_of(b.buffer.data.ping)); + let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.ping)); + let message = pingpong::pong(s); + pipes::send(pipe, message); + c + } + } + type pong = pipes::send_packet_buffered; + } +} + +mod test { + import pipes::recv; + import pingpong::{ping, pong}; + + fn client(-chan: pingpong::client::ping) { + import pingpong::client; + + let chan = client::ping(chan); ret; + log(error, "Sent ping"); + let pong(_chan) = recv(chan); + log(error, "Received pong"); + } + + fn server(-chan: pingpong::server::ping) { + import pingpong::server; + + let ping(chan) = recv(chan); ret; + log(error, "Received ping"); + let _chan = server::pong(chan); + log(error, "Sent pong"); + } +} + +fn main() { + let (client_, server_) = pingpong::init(); + let client_ = ~mut some(client_); + let server_ = ~mut some(server_); + do task::spawn |move client_| { + let mut client__ = none; + *client_ <-> client__; + test::client(option::unwrap(client__)); + }; + do task::spawn |move server_| { + let mut server_ˊ = none; + *server_ <-> server_ˊ; + test::server(option::unwrap(server_ˊ)); + }; +}