Move streams into core.

This commit is contained in:
Eric Holk 2012-07-10 11:58:43 -07:00
parent 594d9a0554
commit 22e955a76a
2 changed files with 71 additions and 43 deletions

View File

@ -2,6 +2,7 @@
import unsafe::{forget, reinterpret_cast, transmute};
import either::{either, left, right};
import option::unwrap;
enum state {
empty,
@ -428,3 +429,69 @@ fn spawn_service_recv<T: send>(
client
}
// Streams - Make pipes a little easier in general.
proto! streamp {
open:send<T: send> {
data(T) -> open<T>
}
}
type chan<T:send> = { mut endp: option<streamp::client::open<T>> };
type port<T:send> = { mut endp: option<streamp::server::open<T>> };
fn stream<T:send>() -> (chan<T>, port<T>) {
let (c, s) = streamp::init();
#macro[
[#move[x],
unsafe { let y <- *ptr::addr_of(x); y }]
];
({ mut endp: some(c) }, { mut endp: some(s) })
}
impl chan<T: send> for chan<T> {
fn send(+x: T) {
let mut endp = none;
endp <-> self.endp;
self.endp = some(
streamp::client::data(unwrap(endp), x))
}
}
impl port<T: send> for port<T> {
fn recv() -> T {
let mut endp = none;
endp <-> self.endp;
let streamp::data(x, endp) = pipes::recv(unwrap(endp));
self.endp = some(endp);
x
}
fn try_recv() -> option<T> {
let mut endp = none;
endp <-> self.endp;
alt pipes::try_recv(unwrap(endp)) {
some(streamp::data(x, endp)) {
self.endp = some(#move(endp));
some(#move(x))
}
none { none }
}
}
pure fn peek() -> bool unchecked {
let mut endp = none;
endp <-> self.endp;
let peek = alt endp {
some(endp) {
pipes::peek(endp)
}
none { fail "peeking empty stream" }
};
self.endp <-> endp;
peek
}
}

View File

@ -9,46 +9,7 @@ import std::map;
import std::map::hashmap;
import std::sort;
import stream::{stream, chan, port};
// After a snapshot, this should move into core, or std.
mod stream {
import option::unwrap;
proto! streamp {
open:send<T: send> {
data(T) -> open<T>
}
}
type chan<T:send> = { mut endp: option<streamp::client::open<T>> };
type port<T:send> = { mut endp: option<streamp::server::open<T>> };
fn stream<T:send>() -> (chan<T>, port<T>) {
let (c, s) = streamp::init();
({ mut endp: some(c) }, { mut endp: some(s) })
}
impl chan<T: send> for chan<T> {
fn send(+x: T) {
let mut endp = none;
endp <-> self.endp;
self.endp = some(
streamp::client::data(unwrap(endp), x))
}
}
impl port<T: send> for port<T> {
fn recv() -> T {
let mut endp = none;
endp <-> self.endp;
let streamp::data(x, endp) = unwrap(
pipes::try_recv(unwrap(endp)));
self.endp = some(endp);
x
}
}
}
import pipes::{stream, port, chan};
// given a map, print a sorted version of it
fn sort_and_fmt(mm: hashmap<~[u8], uint>, total: uint) -> str {
@ -127,8 +88,8 @@ fn windows_with_carry(bb: ~[const u8], nn: uint,
ret vec::slice(bb, len - (nn - 1u), len);
}
fn make_sequence_processor(sz: uint, from_parent: stream::port<~[u8]>,
to_parent: stream::chan<str>) {
fn make_sequence_processor(sz: uint, from_parent: pipes::port<~[u8]>,
to_parent: pipes::chan<str>) {
let freqs: hashmap<~[u8], uint> = map::bytes_hash();
let mut carry: ~[u8] = ~[];
@ -190,7 +151,7 @@ fn main(args: ~[str]) {
vec::push(from_child, from_child_);
let (to_child, from_parent) = stream::stream();
let (to_child, from_parent) = pipes::stream();
do task::spawn_with(from_parent) |from_parent| {
make_sequence_processor(sz, from_parent, to_parent_);