Hand-written bounded pingpong implementation.

This commit is contained in:
Eric Holk 2012-07-23 11:28:38 -07:00
parent d74fb9875b
commit 4f29814f2a
2 changed files with 176 additions and 7 deletions

View File

@ -7,6 +7,12 @@
// Things used by code generated by the pipe compiler.
export entangle, get_buffer, drop_buffer;
export send_packet_buffered, recv_packet_buffered;
export mk_packet;
// export these so we can find them in the buffer_resource
// destructor. This is probably another metadata bug.
export atomic_add_acq, atomic_sub_rel;
// User-level things
export send_packet, recv_packet, send, recv, try_recv, peek;
@ -71,7 +77,7 @@ fn move<T>(-x: T) -> T { x }
buffer, and this is the part that gets freed.
It might be helpful to have some idea of a semi-unique pointer (like
being partially pregnant, also like an ARC).
being partially pregnant, also like an ARC).
*/
@ -87,7 +93,7 @@ enum state {
// get away with restricting it to 0 or 1, if we're careful.
let mut ref_count: int;
new() { self.ref_count = 1; }
new() { self.ref_count = 0; }
// We may want a drop, and to be careful about stringing this
// thing along.
@ -134,6 +140,10 @@ unsafe fn buf_header() -> ~buffer_header {
assert self.buffer.is_not_null();
reinterpret_cast(self.buffer)
}
fn set_buffer<T: send>(b: ~buffer<T>) unsafe {
self.buffer = reinterpret_cast(b);
}
}
type packet<T: send> = {
@ -141,6 +151,13 @@ unsafe fn buf_header() -> ~buffer_header {
mut payload: option<T>,
};
fn mk_packet<T: send>() -> packet<T> {
{
header: packet_header(),
mut payload: none
}
}
fn unibuffer<T: send>() -> ~buffer<packet<T>> {
let b = ~{
header: buffer_header(),
@ -170,12 +187,25 @@ fn packet<T: send>() -> *packet<T> {
fn atomic_xchng(&dst: int, src: int) -> int;
fn atomic_xchng_acq(&dst: int, src: int) -> int;
fn atomic_xchng_rel(&dst: int, src: int) -> int;
fn atomic_add_acq(&dst: int, src: int) -> int;
fn atomic_sub_rel(&dst: int, src: int) -> int;
}
// If I call the rusti versions directly from a polymorphic function,
// I get link errors. This is a bug that needs investigated more.
fn atomic_xchng_rel(&dst: int, src: int) -> int {
rusti::atomic_xchng_rel(dst, src)
}
fn atomic_add_acq(&dst: int, src: int) -> int {
rusti::atomic_add_acq(dst, src)
}
fn atomic_sub_rel(&dst: int, src: int) -> int {
rusti::atomic_sub_rel(dst, src)
}
type rust_task = libc::c_void;
extern mod rustrt {
@ -222,13 +252,21 @@ unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
class buffer_resource<T: send> {
let buffer: ~buffer<T>;
new(+b: ~buffer<T>) {
let p = ptr::addr_of(*b);
#error("take %?", p);
atomic_add_acq(b.header.ref_count, 1);
self.buffer = b;
}
drop unsafe {
let b = move!{self.buffer};
let old_count = atomic_xchng_rel(b.header.ref_count, 0);
if old_count == 0 {
let p = ptr::addr_of(*b);
#error("drop %?", p);
let old_count = atomic_sub_rel(b.header.ref_count, 1);
//let old_count = atomic_xchng_rel(b.header.ref_count, 0);
if old_count == 1 {
// The new count is 0.
// go go gadget drop glue
}
else {
@ -237,7 +275,8 @@ unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
}
}
fn send<T: send>(-p: send_packet<T>, -payload: T) {
fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
-payload: T) {
let header = p.header();
let p_ = p.unwrap();
let p = unsafe { &*p_ };
@ -273,11 +312,13 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
}
}
fn recv<T: send>(-p: recv_packet<T>) -> T {
fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
option::unwrap(try_recv(p))
}
fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
-> option<T>
{
let p_ = p.unwrap();
let p = unsafe { &*p_ };
let this = rustrt::rust_get_task();
@ -498,6 +539,10 @@ fn send_packet<T: send>(p: *packet<T>) -> send_packet<T> {
p <-> self.p;
sender_terminate(option::unwrap(p))
}
unsafe { #error("send_drop: %?",
if self.buffer == none {
"none"
} else { "some" }); }
}
fn unwrap() -> *packet<T> {
let mut p = none;
@ -518,6 +563,13 @@ fn unwrap() -> *packet<T> {
none { fail ~"packet already consumed" }
}
}
fn reuse_buffer() -> buffer_resource<Tbuffer> {
#error("send reuse_buffer");
let mut tmp = none;
tmp <-> self.buffer;
option::unwrap(tmp)
}
}
type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
@ -547,6 +599,10 @@ fn recv_packet<T: send>(p: *packet<T>) -> recv_packet<T> {
p <-> self.p;
receiver_terminate(option::unwrap(p))
}
unsafe { #error("recv_drop: %?",
if self.buffer == none {
"none"
} else { "some" }); }
}
fn unwrap() -> *packet<T> {
let mut p = none;
@ -567,6 +623,13 @@ fn unwrap() -> *packet<T> {
none { fail ~"packet already consumed" }
}
}
fn reuse_buffer() -> buffer_resource<Tbuffer> {
#error("recv reuse_buffer");
let mut tmp = none;
tmp <-> self.buffer;
option::unwrap(tmp)
}
}
fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {

View File

@ -0,0 +1,106 @@
// Ping-pong is a bounded protocol. This is place where I can
// experiment with what code the compiler should generate for bounded
// protocols.
// xfail-pretty
// This was generated initially by the pipe compiler, but it's been
// modified in hopefully straightforward ways.
mod pingpong {
import pipes::*;
type packets = {
ping: packet<ping>,
pong: packet<pong>,
};
fn init() -> (client::ping, server::ping) {
let buffer = ~{
header: buffer_header(),
data: {
ping: mk_packet::<ping>(),
pong: mk_packet::<pong>()
}
};
unsafe {
buffer.data.ping.header.set_buffer(buffer);
buffer.data.pong.header.set_buffer(buffer);
}
let client = send_packet_buffered(ptr::addr_of(buffer.data.ping));
let server = recv_packet_buffered(ptr::addr_of(buffer.data.ping));
(client, server)
}
enum ping = server::pong;
enum pong = client::ping;
mod client {
fn ping(+pipe: ping) -> pong {
{
let b = pipe.reuse_buffer();
let s = send_packet_buffered(ptr::addr_of(b.buffer.data.pong));
let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.pong));
let message = pingpong::ping(s);
pipes::send(pipe, message);
c
}
}
type ping = pipes::send_packet_buffered<pingpong::ping,
pingpong::packets>;
type pong = pipes::recv_packet_buffered<pingpong::pong,
pingpong::packets>;
}
mod server {
type ping = pipes::recv_packet_buffered<pingpong::ping,
pingpong::packets>;
fn pong(+pipe: pong) -> ping {
{
let b = pipe.reuse_buffer();
let s = send_packet_buffered(ptr::addr_of(b.buffer.data.ping));
let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.ping));
let message = pingpong::pong(s);
pipes::send(pipe, message);
c
}
}
type pong = pipes::send_packet_buffered<pingpong::pong,
pingpong::packets>;
}
}
mod test {
import pipes::recv;
import pingpong::{ping, pong};
fn client(-chan: pingpong::client::ping) {
import pingpong::client;
let chan = client::ping(chan); ret;
log(error, "Sent ping");
let pong(_chan) = recv(chan);
log(error, "Received pong");
}
fn server(-chan: pingpong::server::ping) {
import pingpong::server;
let ping(chan) = recv(chan); ret;
log(error, "Received ping");
let _chan = server::pong(chan);
log(error, "Sent pong");
}
}
fn main() {
let (client_, server_) = pingpong::init();
let client_ = ~mut some(client_);
let server_ = ~mut some(server_);
do task::spawn |move client_| {
let mut client__ = none;
*client_ <-> client__;
test::client(option::unwrap(client__));
};
do task::spawn |move server_| {
let mut server_ˊ = none;
*server_ <-> server_ˊ;
test::server(option::unwrap(server_ˊ));
};
}