diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index da11e77339a..7362614a769 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -3,6 +3,19 @@ import unsafe::{forget, reinterpret_cast, transmute}; import either::{either, left, right}; import option::unwrap; +import arc::methods; + +/* Use this after the snapshot +macro_rules! move { + { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } +} +*/ + +fn macros() { + #macro[ + [#move(x), { unsafe { let y <- *ptr::addr_of(x); y } }] + ]; +} enum state { empty, @@ -455,11 +468,6 @@ enum port { fn stream() -> (chan, port) { let (c, s) = streamp::init(); - #macro[ - [#move[x], - unsafe { let y <- *ptr::addr_of(x); y }] - ]; - (chan_({ mut endp: some(c) }), port_({ mut endp: some(s) })) } @@ -506,3 +514,69 @@ impl port for port { peek } } + +// Treat a whole bunch of ports as one. +class port_set { + let mut ports: ~[pipes::port]; + + new() { self.ports = ~[]; } + + fn add(+port: pipes::port) { + vec::push(self.ports, port) + } + + fn try_recv() -> option { + let mut result = none; + while result == none && self.ports.len() > 0 { + let i = pipes::wait_many(self.ports.map(|p| p.header())); + // dereferencing an unsafe pointer nonsense to appease the + // borrowchecker. + alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} { + some(m) { + result = some(#move(m)); + } + none { + // Remove this port. + let mut ports = ~[]; + self.ports <-> ports; + vec::consume(ports, + |j, x| if i != j { vec::push(self.ports, x) }); + } + } + } + result + } + + fn recv() -> T { + option::unwrap(self.try_recv()) + } +} + +impl private_methods/& for pipes::port { + pure fn header() -> *pipes::packet_header unchecked { + alt self.endp { + some(endp) { + endp.header() + } + none { fail "peeking empty stream" } + } + } +} + + +type shared_chan = arc::exclusive>; + +impl chan for shared_chan { + fn send(+x: T) { + let mut xx = some(x); + do self.with |_c, chan| { + let mut x = none; + x <-> xx; + chan.send(option::unwrap(x)) + } + } +} + +fn shared_chan(+c: pipes::chan) -> shared_chan { + arc::exclusive(c) +} diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index ea483460953..b5fd5a4526e 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -13,7 +13,7 @@ import io::writer; import io::writer_util; import arc::methods; -import pipes::{port, chan}; +import pipes::{port, chan, shared_chan}; macro_rules! move { { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } @@ -96,113 +96,3 @@ fn main(args: ~[str]) { #debug("%?", args); run(args); } - -// Treat a whole bunch of ports as one. -class box { - let mut contents: option; - new(+x: T) { self.contents = some(x); } - - fn swap(f: fn(+T) -> T) { - let mut tmp = none; - self.contents <-> tmp; - self.contents = some(f(option::unwrap(tmp))); - } - - fn unwrap() -> T { - let mut tmp = none; - self.contents <-> tmp; - option::unwrap(tmp) - } -} - -class port_set { - let mut ports: ~[pipes::port]; - - new() { self.ports = ~[]; } - - fn add(+port: pipes::port) { - vec::push(self.ports, port) - } - - fn try_recv() -> option { - let mut result = none; - while result == none && self.ports.len() > 0 { - let i = pipes::wait_many(self.ports.map(|p| p.header())); - // dereferencing an unsafe pointer nonsense to appease the - // borrowchecker. - alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} { - some(m) { - result = some(move!{m}); - } - none { - // Remove this port. - let mut ports = ~[]; - self.ports <-> ports; - vec::consume(ports, - |j, x| if i != j { vec::push(self.ports, x) }); - } - } - } -/* - while !done { - do self.ports.swap |ports| { - if ports.len() > 0 { - let old_len = ports.len(); - let (_, m, ports) = pipes::select(ports); - alt m { - some(pipes::streamp::data(x, next)) { - result = some(move!{x}); - done = true; - assert ports.len() == old_len - 1; - vec::append_one(ports, move!{next}) - } - none { - //#error("pipe closed"); - assert ports.len() == old_len - 1; - ports - } - } - } - else { - //#error("no more pipes"); - done = true; - ~[] - } - } - } -*/ - result - } - - fn recv() -> T { - option::unwrap(self.try_recv()) - } -} - -impl private_methods/& for pipes::port { - pure fn header() -> *pipes::packet_header unchecked { - alt self.endp { - some(endp) { - endp.header() - } - none { fail "peeking empty stream" } - } - } -} - -type shared_chan = arc::exclusive>; - -impl chan for shared_chan { - fn send(+x: T) { - let mut xx = some(x); - do self.with |_c, chan| { - let mut x = none; - x <-> xx; - chan.send(option::unwrap(x)) - } - } -} - -fn shared_chan(+c: pipes::chan) -> shared_chan { - arc::exclusive(c) -} \ No newline at end of file diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 085bba6ddb4..3b9ea45f221 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -8,7 +8,7 @@ use std; import io::writer; import io::writer_util; -import pipes::{port, chan}; +import pipes::{port, port_set, chan}; macro_rules! move { { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } @@ -92,96 +92,3 @@ fn main(args: ~[str]) { #debug("%?", args); run(args); } - -// Treat a whole bunch of ports as one. -class box { - let mut contents: option; - new(+x: T) { self.contents = some(x); } - - fn swap(f: fn(+T) -> T) { - let mut tmp = none; - self.contents <-> tmp; - self.contents = some(f(option::unwrap(tmp))); - } - - fn unwrap() -> T { - let mut tmp = none; - self.contents <-> tmp; - option::unwrap(tmp) - } -} - -class port_set { - let mut ports: ~[pipes::port]; - - new() { self.ports = ~[]; } - - fn add(+port: pipes::port) { - vec::push(self.ports, port) - } - - fn try_recv() -> option { - let mut result = none; - while result == none && self.ports.len() > 0 { - let i = pipes::wait_many(self.ports.map(|p| p.header())); - // dereferencing an unsafe pointer nonsense to appease the - // borrowchecker. - alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} { - some(m) { - result = some(move!{m}); - } - none { - // Remove this port. - let mut ports = ~[]; - self.ports <-> ports; - vec::consume(ports, - |j, x| if i != j { vec::push(self.ports, x) }); - } - } - } -/* - while !done { - do self.ports.swap |ports| { - if ports.len() > 0 { - let old_len = ports.len(); - let (_, m, ports) = pipes::select(ports); - alt m { - some(pipes::streamp::data(x, next)) { - result = some(move!{x}); - done = true; - assert ports.len() == old_len - 1; - vec::append_one(ports, move!{next}) - } - none { - //#error("pipe closed"); - assert ports.len() == old_len - 1; - ports - } - } - } - else { - //#error("no more pipes"); - done = true; - ~[] - } - } - } -*/ - result - } - - fn recv() -> T { - option::unwrap(self.try_recv()) - } -} - -impl private_methods/& for pipes::port { - pure fn header() -> *pipes::packet_header unchecked { - alt self.endp { - some(endp) { - endp.header() - } - none { fail "peeking empty stream" } - } - } -} \ No newline at end of file