commit
c316189d15
@ -157,7 +157,7 @@ concurrently:
|
||||
|
||||
~~~~
|
||||
use task::spawn;
|
||||
use pipes::{stream, Port, Chan};
|
||||
use comm::{stream, Port, Chan};
|
||||
|
||||
let (port, chan): (Port<int>, Chan<int>) = stream();
|
||||
|
||||
@ -178,7 +178,7 @@ stream for sending and receiving integers (the left-hand side of the `let`,
|
||||
a tuple into its component parts).
|
||||
|
||||
~~~~
|
||||
# use pipes::{stream, Chan, Port};
|
||||
# use comm::{stream, Chan, Port};
|
||||
let (port, chan): (Port<int>, Chan<int>) = stream();
|
||||
~~~~
|
||||
|
||||
@ -189,7 +189,7 @@ spawns the child task.
|
||||
~~~~
|
||||
# use task::{spawn};
|
||||
# use task::spawn;
|
||||
# use pipes::{stream, Port, Chan};
|
||||
# use comm::{stream, Port, Chan};
|
||||
# fn some_expensive_computation() -> int { 42 }
|
||||
# let (port, chan) = stream();
|
||||
do spawn || {
|
||||
@ -209,7 +209,7 @@ computation, then waits for the child's result to arrive on the
|
||||
port:
|
||||
|
||||
~~~~
|
||||
# use pipes::{stream, Port, Chan};
|
||||
# use comm::{stream, Port, Chan};
|
||||
# fn some_other_expensive_computation() {}
|
||||
# let (port, chan) = stream::<int>();
|
||||
# chan.send(0);
|
||||
@ -225,7 +225,7 @@ following program is ill-typed:
|
||||
|
||||
~~~ {.xfail-test}
|
||||
# use task::{spawn};
|
||||
# use pipes::{stream, Port, Chan};
|
||||
# use comm::{stream, Port, Chan};
|
||||
# fn some_expensive_computation() -> int { 42 }
|
||||
let (port, chan) = stream();
|
||||
|
||||
@ -245,7 +245,7 @@ Instead we can use a `SharedChan`, a type that allows a single
|
||||
|
||||
~~~
|
||||
# use task::spawn;
|
||||
use pipes::{stream, SharedChan};
|
||||
use comm::{stream, SharedChan};
|
||||
|
||||
let (port, chan) = stream();
|
||||
let chan = SharedChan(chan);
|
||||
@ -278,7 +278,7 @@ might look like the example below.
|
||||
|
||||
~~~
|
||||
# use task::spawn;
|
||||
# use pipes::{stream, Port, Chan};
|
||||
# use comm::{stream, Port, Chan};
|
||||
|
||||
// Create a vector of ports, one for each child task
|
||||
let ports = do vec::from_fn(3) |init_val| {
|
||||
@ -393,7 +393,7 @@ internally, with additional logic to wait for the child task to finish
|
||||
before returning. Hence:
|
||||
|
||||
~~~
|
||||
# use pipes::{stream, Chan, Port};
|
||||
# use comm::{stream, Chan, Port};
|
||||
# use task::{spawn, try};
|
||||
# fn sleep_forever() { loop { task::yield() } }
|
||||
# do task::try {
|
||||
@ -468,7 +468,7 @@ Here is the function that implements the child task:
|
||||
|
||||
~~~~
|
||||
# use std::comm::DuplexStream;
|
||||
# use pipes::{Port, Chan};
|
||||
# use comm::{Port, Chan};
|
||||
fn stringifier(channel: &DuplexStream<~str, uint>) {
|
||||
let mut value: uint;
|
||||
loop {
|
||||
@ -491,7 +491,7 @@ Here is the code for the parent task:
|
||||
|
||||
~~~~
|
||||
# use std::comm::DuplexStream;
|
||||
# use pipes::{Port, Chan};
|
||||
# use comm::{Port, Chan};
|
||||
# use task::spawn;
|
||||
# fn stringifier(channel: &DuplexStream<~str, uint>) {
|
||||
# let mut value: uint;
|
||||
|
@ -76,7 +76,7 @@ pub fn run(lib_path: ~str,
|
||||
|
||||
|
||||
writeclose(pipe_in.out, input);
|
||||
let p = pipes::PortSet();
|
||||
let p = comm::PortSet();
|
||||
let ch = p.chan();
|
||||
do task::spawn_sched(task::SingleThreaded) || {
|
||||
let errput = readclose(pipe_err.in);
|
||||
|
410
src/libcore/comm.rs
Normal file
410
src/libcore/comm.rs
Normal file
@ -0,0 +1,410 @@
|
||||
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
// Transitional -- needs snapshot
|
||||
#[allow(structural_records)];
|
||||
|
||||
use either::{Either, Left, Right};
|
||||
use kinds::Owned;
|
||||
use option;
|
||||
use option::{Option, Some, None, unwrap};
|
||||
use private;
|
||||
use vec;
|
||||
|
||||
use pipes::{recv, try_recv, wait_many, peek, PacketHeader};
|
||||
|
||||
// NOTE Making this public exposes some plumbing from pipes. Needs
|
||||
// some refactoring
|
||||
pub use pipes::Selectable;
|
||||
|
||||
/// A trait for things that can send multiple messages.
|
||||
pub trait GenericChan<T> {
|
||||
/// Sends a message.
|
||||
fn send(x: T);
|
||||
}
|
||||
|
||||
/// Things that can send multiple messages and can detect when the receiver
|
||||
/// is closed
|
||||
pub trait GenericSmartChan<T> {
|
||||
/// Sends a message, or report if the receiver has closed the connection.
|
||||
fn try_send(x: T) -> bool;
|
||||
}
|
||||
|
||||
/// A trait for things that can receive multiple messages.
|
||||
pub trait GenericPort<T> {
|
||||
/// Receives a message, or fails if the connection closes.
|
||||
fn recv() -> T;
|
||||
|
||||
/** Receives a message, or returns `none` if
|
||||
the connection is closed or closes.
|
||||
*/
|
||||
fn try_recv() -> Option<T>;
|
||||
}
|
||||
|
||||
/// Ports that can `peek`
|
||||
pub trait Peekable<T> {
|
||||
/// Returns true if a message is available
|
||||
pure fn peek() -> bool;
|
||||
}
|
||||
|
||||
/// Returns the index of an endpoint that is ready to receive.
|
||||
pub fn selecti<T: Selectable>(endpoints: &[T]) -> uint {
|
||||
wait_many(endpoints)
|
||||
}
|
||||
|
||||
/// Returns 0 or 1 depending on which endpoint is ready to receive
|
||||
pub fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) ->
|
||||
Either<(), ()> {
|
||||
match wait_many([a.header(), b.header()]) {
|
||||
0 => Left(()),
|
||||
1 => Right(()),
|
||||
_ => fail!(~"wait returned unexpected index")
|
||||
}
|
||||
}
|
||||
|
||||
// Streams - Make pipes a little easier in general.
|
||||
|
||||
proto! streamp (
|
||||
Open:send<T: Owned> {
|
||||
data(T) -> Open<T>
|
||||
}
|
||||
)
|
||||
|
||||
#[doc(hidden)]
|
||||
struct Chan_<T> {
|
||||
mut endp: Option<streamp::client::Open<T>>
|
||||
}
|
||||
|
||||
/// An endpoint that can send many messages.
|
||||
pub enum Chan<T> {
|
||||
Chan_(Chan_<T>)
|
||||
}
|
||||
|
||||
struct Port_<T> {
|
||||
mut endp: Option<streamp::server::Open<T>>,
|
||||
}
|
||||
|
||||
/// An endpoint that can receive many messages.
|
||||
pub enum Port<T> {
|
||||
Port_(Port_<T>)
|
||||
}
|
||||
|
||||
/** Creates a `(chan, port)` pair.
|
||||
|
||||
These allow sending or receiving an unlimited number of messages.
|
||||
|
||||
*/
|
||||
pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
|
||||
let (c, s) = streamp::init();
|
||||
|
||||
(Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) }))
|
||||
}
|
||||
|
||||
impl<T: Owned> GenericChan<T> for Chan<T> {
|
||||
fn send(x: T) {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
self.endp = Some(
|
||||
streamp::client::data(unwrap(endp), x))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Owned> GenericSmartChan<T> for Chan<T> {
|
||||
|
||||
fn try_send(x: T) -> bool {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
match streamp::client::try_data(unwrap(endp), x) {
|
||||
Some(next) => {
|
||||
self.endp = Some(next);
|
||||
true
|
||||
}
|
||||
None => false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Owned> GenericPort<T> for Port<T> {
|
||||
fn recv() -> T {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
let streamp::data(x, endp) = recv(unwrap(endp));
|
||||
self.endp = Some(endp);
|
||||
x
|
||||
}
|
||||
|
||||
fn try_recv() -> Option<T> {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
match try_recv(unwrap(endp)) {
|
||||
Some(streamp::data(x, endp)) => {
|
||||
self.endp = Some(endp);
|
||||
Some(x)
|
||||
}
|
||||
None => None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Owned> Peekable<T> for Port<T> {
|
||||
pure fn peek() -> bool {
|
||||
unsafe {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
let peek = match &endp {
|
||||
&Some(ref endp) => peek(endp),
|
||||
&None => fail!(~"peeking empty stream")
|
||||
};
|
||||
self.endp <-> endp;
|
||||
peek
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Owned> Selectable for Port<T> {
|
||||
pure fn header() -> *PacketHeader {
|
||||
unsafe {
|
||||
match self.endp {
|
||||
Some(ref endp) => endp.header(),
|
||||
None => fail!(~"peeking empty stream")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Treat many ports as one.
|
||||
pub struct PortSet<T> {
|
||||
mut ports: ~[Port<T>],
|
||||
}
|
||||
|
||||
pub fn PortSet<T: Owned>() -> PortSet<T>{
|
||||
PortSet {
|
||||
ports: ~[]
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Owned> PortSet<T> {
|
||||
|
||||
fn add(port: Port<T>) {
|
||||
self.ports.push(port)
|
||||
}
|
||||
|
||||
fn chan() -> Chan<T> {
|
||||
let (po, ch) = stream();
|
||||
self.add(po);
|
||||
ch
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Owned> GenericPort<T> for PortSet<T> {
|
||||
|
||||
fn try_recv() -> Option<T> {
|
||||
let mut result = None;
|
||||
// we have to swap the ports array so we aren't borrowing
|
||||
// aliasable mutable memory.
|
||||
let mut ports = ~[];
|
||||
ports <-> self.ports;
|
||||
while result.is_none() && ports.len() > 0 {
|
||||
let i = wait_many(ports);
|
||||
match ports[i].try_recv() {
|
||||
Some(m) => {
|
||||
result = Some(m);
|
||||
}
|
||||
None => {
|
||||
// Remove this port.
|
||||
let _ = ports.swap_remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
ports <-> self.ports;
|
||||
result
|
||||
}
|
||||
|
||||
fn recv() -> T {
|
||||
self.try_recv().expect("port_set: endpoints closed")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<T: Owned> Peekable<T> for PortSet<T> {
|
||||
pure fn peek() -> bool {
|
||||
// It'd be nice to use self.port.each, but that version isn't
|
||||
// pure.
|
||||
for vec::each(self.ports) |p| {
|
||||
if p.peek() { return true }
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// A channel that can be shared between many senders.
|
||||
pub type SharedChan<T> = private::Exclusive<Chan<T>>;
|
||||
|
||||
impl<T: Owned> GenericChan<T> for SharedChan<T> {
|
||||
fn send(x: T) {
|
||||
let mut xx = Some(x);
|
||||
do self.with_imm |chan| {
|
||||
let mut x = None;
|
||||
x <-> xx;
|
||||
chan.send(option::unwrap(x))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
|
||||
fn try_send(x: T) -> bool {
|
||||
let mut xx = Some(x);
|
||||
do self.with_imm |chan| {
|
||||
let mut x = None;
|
||||
x <-> xx;
|
||||
chan.try_send(option::unwrap(x))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a `chan` into a `shared_chan`.
|
||||
pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> {
|
||||
private::exclusive(c)
|
||||
}
|
||||
|
||||
/// Receive a message from one of two endpoints.
|
||||
pub trait Select2<T: Owned, U: Owned> {
|
||||
/// Receive a message or return `None` if a connection closes.
|
||||
fn try_select() -> Either<Option<T>, Option<U>>;
|
||||
/// Receive a message or fail if a connection closes.
|
||||
fn select() -> Either<T, U>;
|
||||
}
|
||||
|
||||
impl<T: Owned, U: Owned,
|
||||
Left: Selectable + GenericPort<T>,
|
||||
Right: Selectable + GenericPort<U>>
|
||||
Select2<T, U> for (Left, Right) {
|
||||
|
||||
fn select() -> Either<T, U> {
|
||||
match self {
|
||||
(ref lp, ref rp) => match select2i(lp, rp) {
|
||||
Left(()) => Left (lp.recv()),
|
||||
Right(()) => Right(rp.recv())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_select() -> Either<Option<T>, Option<U>> {
|
||||
match self {
|
||||
(ref lp, ref rp) => match select2i(lp, rp) {
|
||||
Left(()) => Left (lp.try_recv()),
|
||||
Right(()) => Right(rp.try_recv())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proto! oneshot (
|
||||
Oneshot:send<T:Owned> {
|
||||
send(T) -> !
|
||||
}
|
||||
)
|
||||
|
||||
/// The send end of a oneshot pipe.
|
||||
pub type ChanOne<T> = oneshot::client::Oneshot<T>;
|
||||
/// The receive end of a oneshot pipe.
|
||||
pub type PortOne<T> = oneshot::server::Oneshot<T>;
|
||||
|
||||
/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
|
||||
pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
|
||||
let (chan, port) = oneshot::init();
|
||||
(port, chan)
|
||||
}
|
||||
|
||||
impl<T: Owned> PortOne<T> {
|
||||
fn recv(self) -> T { recv_one(self) }
|
||||
fn try_recv(self) -> Option<T> { try_recv_one(self) }
|
||||
}
|
||||
|
||||
impl<T: Owned> ChanOne<T> {
|
||||
fn send(self, data: T) { send_one(self, data) }
|
||||
fn try_send(self, data: T) -> bool { try_send_one(self, data) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive a message from a oneshot pipe, failing if the connection was
|
||||
* closed.
|
||||
*/
|
||||
pub fn recv_one<T: Owned>(port: PortOne<T>) -> T {
|
||||
let oneshot::send(message) = recv(port);
|
||||
message
|
||||
}
|
||||
|
||||
/// Receive a message from a oneshot pipe unless the connection was closed.
|
||||
pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> {
|
||||
let message = try_recv(port);
|
||||
|
||||
if message.is_none() { None }
|
||||
else {
|
||||
let oneshot::send(message) = option::unwrap(message);
|
||||
Some(message)
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a message on a oneshot pipe, failing if the connection was closed.
|
||||
pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) {
|
||||
oneshot::client::send(chan, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message on a oneshot pipe, or return false if the connection was
|
||||
* closed.
|
||||
*/
|
||||
pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T)
|
||||
-> bool {
|
||||
oneshot::client::try_send(chan, data).is_some()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use either::{Either, Left, Right};
|
||||
use super::{Chan, Port, oneshot, recv_one, stream};
|
||||
|
||||
#[test]
|
||||
pub fn test_select2() {
|
||||
let (p1, c1) = stream();
|
||||
let (p2, c2) = stream();
|
||||
|
||||
c1.send(~"abc");
|
||||
|
||||
match (p1, p2).select() {
|
||||
Right(_) => fail!(),
|
||||
_ => ()
|
||||
}
|
||||
|
||||
c2.send(123);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_oneshot() {
|
||||
let (c, p) = oneshot::init();
|
||||
|
||||
oneshot::client::send(c, ());
|
||||
|
||||
recv_one(p)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_peek_terminated() {
|
||||
let (port, chan): (Port<int>, Chan<int>) = stream();
|
||||
|
||||
{
|
||||
// Destroy the channel
|
||||
let _chan = chan;
|
||||
}
|
||||
|
||||
assert !port.peek();
|
||||
}
|
||||
}
|
@ -148,6 +148,7 @@ pub mod hashmap;
|
||||
|
||||
#[path = "task/mod.rs"]
|
||||
pub mod task;
|
||||
pub mod comm;
|
||||
pub mod pipes;
|
||||
|
||||
|
||||
@ -255,6 +256,7 @@ pub mod core {
|
||||
pub use option;
|
||||
pub use kinds;
|
||||
pub use sys;
|
||||
pub use pipes;
|
||||
}
|
||||
|
||||
|
||||
|
@ -142,7 +142,7 @@ pub struct Buffer<T> {
|
||||
data: T,
|
||||
}
|
||||
|
||||
struct PacketHeader {
|
||||
pub struct PacketHeader {
|
||||
mut state: State,
|
||||
mut blocked_task: *rust_task,
|
||||
|
||||
@ -151,7 +151,7 @@ struct PacketHeader {
|
||||
mut buffer: *libc::c_void,
|
||||
}
|
||||
|
||||
fn PacketHeader() -> PacketHeader {
|
||||
pub fn PacketHeader() -> PacketHeader {
|
||||
PacketHeader {
|
||||
state: Empty,
|
||||
blocked_task: ptr::null(),
|
||||
@ -159,7 +159,7 @@ fn PacketHeader() -> PacketHeader {
|
||||
}
|
||||
}
|
||||
|
||||
impl PacketHeader {
|
||||
pub impl PacketHeader {
|
||||
// Returns the old state.
|
||||
unsafe fn mark_blocked(this: *rust_task) -> State {
|
||||
rustrt::rust_task_ref(this);
|
||||
@ -551,12 +551,6 @@ pub pure fn peek<T:Owned,Tb:Owned>(p: &RecvPacketBuffered<T, Tb>) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned,Tb:Owned> Peekable<T> for RecvPacketBuffered<T, Tb> {
|
||||
pure fn peek() -> bool {
|
||||
peek(&self)
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn sender_terminate<T:Owned>(p: *Packet<T>) {
|
||||
let p = unsafe { &*p };
|
||||
@ -622,7 +616,7 @@ that vector. The index points to an endpoint that has either been
|
||||
closed by the sender or has a message waiting to be received.
|
||||
|
||||
*/
|
||||
fn wait_many<T:Selectable>(pkts: &[T]) -> uint {
|
||||
pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
|
||||
let this = unsafe { rustrt::rust_get_task() };
|
||||
|
||||
unsafe {
|
||||
@ -720,7 +714,7 @@ pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
trait Selectable {
|
||||
pub trait Selectable {
|
||||
pure fn header() -> *PacketHeader;
|
||||
}
|
||||
|
||||
@ -957,335 +951,6 @@ pub fn spawn_service_recv<T:Owned,Tb:Owned>(
|
||||
client
|
||||
}
|
||||
|
||||
// Streams - Make pipes a little easier in general.
|
||||
|
||||
proto! streamp (
|
||||
Open:send<T:Owned> {
|
||||
data(T) -> Open<T>
|
||||
}
|
||||
)
|
||||
|
||||
/// A trait for things that can send multiple messages.
|
||||
pub trait GenericChan<T> {
|
||||
/// Sends a message.
|
||||
fn send(x: T);
|
||||
}
|
||||
|
||||
/// Things that can send multiple messages and can detect when the receiver
|
||||
/// is closed
|
||||
pub trait GenericSmartChan<T> {
|
||||
/// Sends a message, or report if the receiver has closed the connection.
|
||||
fn try_send(x: T) -> bool;
|
||||
}
|
||||
|
||||
/// A trait for things that can receive multiple messages.
|
||||
pub trait GenericPort<T> {
|
||||
/// Receives a message, or fails if the connection closes.
|
||||
fn recv() -> T;
|
||||
|
||||
/** Receives a message, or returns `none` if
|
||||
the connection is closed or closes.
|
||||
*/
|
||||
fn try_recv() -> Option<T>;
|
||||
}
|
||||
|
||||
/// Ports that can `peek`
|
||||
pub trait Peekable<T> {
|
||||
/// Returns true if a message is available
|
||||
pure fn peek() -> bool;
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
struct Chan_<T> {
|
||||
mut endp: Option<streamp::client::Open<T>>
|
||||
}
|
||||
|
||||
/// An endpoint that can send many messages.
|
||||
pub enum Chan<T> {
|
||||
Chan_(Chan_<T>)
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
struct Port_<T> {
|
||||
mut endp: Option<streamp::server::Open<T>>,
|
||||
}
|
||||
|
||||
/// An endpoint that can receive many messages.
|
||||
pub enum Port<T> {
|
||||
Port_(Port_<T>)
|
||||
}
|
||||
|
||||
/** Creates a `(chan, port)` pair.
|
||||
|
||||
These allow sending or receiving an unlimited number of messages.
|
||||
|
||||
*/
|
||||
pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
|
||||
let (c, s) = streamp::init();
|
||||
|
||||
(Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) }))
|
||||
}
|
||||
|
||||
impl<T:Owned> GenericChan<T> for Chan<T> {
|
||||
fn send(x: T) {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
self.endp = Some(
|
||||
streamp::client::data(unwrap(endp), x))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned> GenericSmartChan<T> for Chan<T> {
|
||||
|
||||
fn try_send(x: T) -> bool {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
match streamp::client::try_data(unwrap(endp), x) {
|
||||
Some(next) => {
|
||||
self.endp = Some(next);
|
||||
true
|
||||
}
|
||||
None => false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned> GenericPort<T> for Port<T> {
|
||||
fn recv() -> T {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
let streamp::data(x, endp) = pipes::recv(unwrap(endp));
|
||||
self.endp = Some(endp);
|
||||
x
|
||||
}
|
||||
|
||||
fn try_recv() -> Option<T> {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
match pipes::try_recv(unwrap(endp)) {
|
||||
Some(streamp::data(x, endp)) => {
|
||||
self.endp = Some(endp);
|
||||
Some(x)
|
||||
}
|
||||
None => None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned> Peekable<T> for Port<T> {
|
||||
pure fn peek() -> bool {
|
||||
unsafe {
|
||||
let mut endp = None;
|
||||
endp <-> self.endp;
|
||||
let peek = match &endp {
|
||||
&Some(ref endp) => pipes::peek(endp),
|
||||
&None => fail!(~"peeking empty stream")
|
||||
};
|
||||
self.endp <-> endp;
|
||||
peek
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned> Selectable for Port<T> {
|
||||
pure fn header() -> *PacketHeader {
|
||||
unsafe {
|
||||
match self.endp {
|
||||
Some(ref endp) => endp.header(),
|
||||
None => fail!(~"peeking empty stream")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Treat many ports as one.
|
||||
pub struct PortSet<T> {
|
||||
mut ports: ~[pipes::Port<T>],
|
||||
}
|
||||
|
||||
pub fn PortSet<T:Owned>() -> PortSet<T>{
|
||||
PortSet {
|
||||
ports: ~[]
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned> PortSet<T> {
|
||||
|
||||
fn add(port: pipes::Port<T>) {
|
||||
self.ports.push(port)
|
||||
}
|
||||
|
||||
fn chan() -> Chan<T> {
|
||||
let (po, ch) = stream();
|
||||
self.add(po);
|
||||
ch
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned> GenericPort<T> for PortSet<T> {
|
||||
|
||||
fn try_recv() -> Option<T> {
|
||||
let mut result = None;
|
||||
// we have to swap the ports array so we aren't borrowing
|
||||
// aliasable mutable memory.
|
||||
let mut ports = ~[];
|
||||
ports <-> self.ports;
|
||||
while result.is_none() && ports.len() > 0 {
|
||||
let i = wait_many(ports);
|
||||
match ports[i].try_recv() {
|
||||
Some(m) => {
|
||||
result = Some(m);
|
||||
}
|
||||
None => {
|
||||
// Remove this port.
|
||||
let _ = ports.swap_remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
ports <-> self.ports;
|
||||
result
|
||||
}
|
||||
|
||||
fn recv() -> T {
|
||||
self.try_recv().expect("port_set: endpoints closed")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<T:Owned> Peekable<T> for PortSet<T> {
|
||||
pure fn peek() -> bool {
|
||||
// It'd be nice to use self.port.each, but that version isn't
|
||||
// pure.
|
||||
for vec::each(self.ports) |p| {
|
||||
if p.peek() { return true }
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// A channel that can be shared between many senders.
|
||||
pub type SharedChan<T> = private::Exclusive<Chan<T>>;
|
||||
|
||||
impl<T:Owned> GenericChan<T> for SharedChan<T> {
|
||||
fn send(x: T) {
|
||||
let mut xx = Some(x);
|
||||
do self.with_imm |chan| {
|
||||
let mut x = None;
|
||||
x <-> xx;
|
||||
chan.send(option::unwrap(x))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T:Owned> GenericSmartChan<T> for SharedChan<T> {
|
||||
fn try_send(x: T) -> bool {
|
||||
let mut xx = Some(x);
|
||||
do self.with_imm |chan| {
|
||||
let mut x = None;
|
||||
x <-> xx;
|
||||
chan.try_send(option::unwrap(x))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a `chan` into a `shared_chan`.
|
||||
pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> {
|
||||
private::exclusive(c)
|
||||
}
|
||||
|
||||
/// Receive a message from one of two endpoints.
|
||||
pub trait Select2<T:Owned,U:Owned> {
|
||||
/// Receive a message or return `None` if a connection closes.
|
||||
fn try_select() -> Either<Option<T>, Option<U>>;
|
||||
/// Receive a message or fail if a connection closes.
|
||||
fn select() -> Either<T, U>;
|
||||
}
|
||||
|
||||
impl<T: Owned,
|
||||
U: Owned,
|
||||
Left: Selectable + GenericPort<T>,
|
||||
Right: Selectable + GenericPort<U>>
|
||||
Select2<T,U> for (Left, Right) {
|
||||
fn select() -> Either<T, U> {
|
||||
match self {
|
||||
(ref lp, ref rp) => match select2i(lp, rp) {
|
||||
Left(()) => Left (lp.recv()),
|
||||
Right(()) => Right(rp.recv())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_select() -> Either<Option<T>, Option<U>> {
|
||||
match self {
|
||||
(ref lp, ref rp) => match select2i(lp, rp) {
|
||||
Left(()) => Left (lp.try_recv()),
|
||||
Right(()) => Right(rp.try_recv())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proto! oneshot (
|
||||
Oneshot:send<T:Owned> {
|
||||
send(T) -> !
|
||||
}
|
||||
)
|
||||
|
||||
/// The send end of a oneshot pipe.
|
||||
pub type ChanOne<T> = oneshot::client::Oneshot<T>;
|
||||
/// The receive end of a oneshot pipe.
|
||||
pub type PortOne<T> = oneshot::server::Oneshot<T>;
|
||||
|
||||
/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
|
||||
pub fn oneshot<T:Owned>() -> (PortOne<T>, ChanOne<T>) {
|
||||
let (chan, port) = oneshot::init();
|
||||
(port, chan)
|
||||
}
|
||||
|
||||
impl<T:Owned> PortOne<T> {
|
||||
fn recv(self) -> T { recv_one(self) }
|
||||
fn try_recv(self) -> Option<T> { try_recv_one(self) }
|
||||
}
|
||||
|
||||
impl<T:Owned> ChanOne<T> {
|
||||
fn send(self, data: T) { send_one(self, data) }
|
||||
fn try_send(self, data: T) -> bool { try_send_one(self, data) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive a message from a oneshot pipe, failing if the connection was
|
||||
* closed.
|
||||
*/
|
||||
pub fn recv_one<T:Owned>(port: PortOne<T>) -> T {
|
||||
let oneshot::send(message) = recv(port);
|
||||
message
|
||||
}
|
||||
|
||||
/// Receive a message from a oneshot pipe unless the connection was closed.
|
||||
pub fn try_recv_one<T:Owned> (port: PortOne<T>) -> Option<T> {
|
||||
let message = try_recv(port);
|
||||
|
||||
if message.is_none() { None }
|
||||
else {
|
||||
let oneshot::send(message) = option::unwrap(message);
|
||||
Some(message)
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a message on a oneshot pipe, failing if the connection was closed.
|
||||
pub fn send_one<T:Owned>(chan: ChanOne<T>, data: T) {
|
||||
oneshot::client::send(chan, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message on a oneshot pipe, or return false if the connection was
|
||||
* closed.
|
||||
*/
|
||||
pub fn try_send_one<T:Owned>(chan: ChanOne<T>, data: T)
|
||||
-> bool {
|
||||
oneshot::client::try_send(chan, data).is_some()
|
||||
}
|
||||
|
||||
pub mod rt {
|
||||
use option::{None, Option, Some};
|
||||
|
||||
@ -1298,13 +963,13 @@ pub mod rt {
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use either::{Either, Left, Right};
|
||||
use pipes::{Chan, Port, oneshot, recv_one, stream};
|
||||
use pipes;
|
||||
use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
|
||||
GenericPort, GenericChan, Peekable};
|
||||
|
||||
#[test]
|
||||
pub fn test_select2() {
|
||||
let (p1, c1) = pipes::stream();
|
||||
let (p2, c2) = pipes::stream();
|
||||
let (p1, c1) = stream();
|
||||
let (p2, c2) = stream();
|
||||
|
||||
c1.send(~"abc");
|
||||
|
||||
|
@ -68,7 +68,7 @@ pub use ops;
|
||||
pub use option;
|
||||
pub use os;
|
||||
pub use path;
|
||||
pub use pipes;
|
||||
pub use comm;
|
||||
pub use private;
|
||||
pub use ptr;
|
||||
pub use rand;
|
||||
|
@ -14,7 +14,7 @@ use cast;
|
||||
use iter;
|
||||
use libc;
|
||||
use option;
|
||||
use pipes::{GenericChan, GenericPort};
|
||||
use comm::{GenericChan, GenericPort};
|
||||
use prelude::*;
|
||||
use ptr;
|
||||
use result;
|
||||
@ -59,7 +59,7 @@ The executing thread has no access to a task pointer and will be using
|
||||
a normal large stack.
|
||||
*/
|
||||
pub unsafe fn run_in_bare_thread(f: ~fn()) {
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
// FIXME #4525: Unfortunate that this creates an extra scheduler but it's
|
||||
// necessary since rust_raw_thread_join_delete is blocking
|
||||
do task::spawn_sched(task::SingleThreaded) {
|
||||
@ -110,7 +110,7 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
|
||||
// An unwrapper uses this protocol to communicate with the "other" task that
|
||||
// drops the last refcount on an arc. Unfortunately this can't be a proper
|
||||
// pipe protocol because the unwrapper has to access both stages at once.
|
||||
type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>;
|
||||
type UnwrapProto = ~mut Option<(comm::ChanOne<()>, comm::PortOne<bool>)>;
|
||||
|
||||
struct ArcData<T> {
|
||||
mut count: libc::intptr_t,
|
||||
@ -143,9 +143,9 @@ struct ArcDestruct<T> {
|
||||
cast::reinterpret_cast(&data.unwrapper);
|
||||
let (message, response) = option::swap_unwrap(p);
|
||||
// Send 'ready' and wait for a response.
|
||||
pipes::send_one(message, ());
|
||||
comm::send_one(message, ());
|
||||
// Unkillable wait. Message guaranteed to come.
|
||||
if pipes::recv_one(response) {
|
||||
if comm::recv_one(response) {
|
||||
// Other task got the data.
|
||||
cast::forget(data);
|
||||
} else {
|
||||
@ -172,7 +172,7 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>)
|
||||
-> T {
|
||||
struct DeathThroes<T> {
|
||||
mut ptr: Option<~ArcData<T>>,
|
||||
mut response: Option<pipes::ChanOne<bool>>,
|
||||
mut response: Option<comm::ChanOne<bool>>,
|
||||
drop {
|
||||
unsafe {
|
||||
let response = option::swap_unwrap(&mut self.response);
|
||||
@ -180,13 +180,13 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>)
|
||||
// tried to wake us whether they should hand-off the data to
|
||||
// us.
|
||||
if task::failing() {
|
||||
pipes::send_one(response, false);
|
||||
comm::send_one(response, false);
|
||||
// Either this swap_unwrap or the one below (at "Got
|
||||
// here") ought to run.
|
||||
cast::forget(option::swap_unwrap(&mut self.ptr));
|
||||
} else {
|
||||
assert self.ptr.is_none();
|
||||
pipes::send_one(response, true);
|
||||
comm::send_one(response, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -194,8 +194,8 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>)
|
||||
|
||||
do task::unkillable {
|
||||
let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
|
||||
let (p1,c1) = pipes::oneshot(); // ()
|
||||
let (p2,c2) = pipes::oneshot(); // bool
|
||||
let (p1,c1) = comm::oneshot(); // ()
|
||||
let (p2,c2) = comm::oneshot(); // bool
|
||||
let server: UnwrapProto = ~mut Some((c1,p2));
|
||||
let serverp: int = cast::transmute(server);
|
||||
// Try to put our server end in the unwrapper slot.
|
||||
@ -218,7 +218,7 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>)
|
||||
response: Some(c2) };
|
||||
let mut p1 = Some(p1); // argh
|
||||
do task::rekillable {
|
||||
pipes::recv_one(option::swap_unwrap(&mut p1));
|
||||
comm::recv_one(option::swap_unwrap(&mut p1));
|
||||
}
|
||||
// Got here. Back in the 'unkillable' without getting killed.
|
||||
// Recover ownership of ptr, then take the data out.
|
||||
@ -410,7 +410,7 @@ pub mod tests {
|
||||
use core::option::{None, Some};
|
||||
|
||||
use option;
|
||||
use pipes;
|
||||
use comm;
|
||||
use private::{exclusive, unwrap_exclusive};
|
||||
use result;
|
||||
use task;
|
||||
@ -427,7 +427,7 @@ pub mod tests {
|
||||
|
||||
for uint::range(0, num_tasks) |_i| {
|
||||
let total = total.clone();
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
futures.push(port);
|
||||
|
||||
do task::spawn || {
|
||||
|
@ -22,8 +22,8 @@ use option::{Some, None, swap_unwrap};
|
||||
use private::at_exit::at_exit;
|
||||
use private::global::global_data_clone_create;
|
||||
use private::finally::Finally;
|
||||
use pipes::{Port, Chan, SharedChan, GenericChan, GenericPort,
|
||||
GenericSmartChan, stream};
|
||||
use comm::{Port, Chan, SharedChan, GenericChan,
|
||||
GenericPort, GenericSmartChan, stream};
|
||||
use task::{Task, task, spawn};
|
||||
use task::rt::{task_id, get_task_id};
|
||||
use hashmap::linear::LinearMap;
|
||||
@ -186,7 +186,7 @@ fn test_wait_for_signal_many() {
|
||||
|
||||
#[test]
|
||||
fn test_select_stream_and_oneshot() {
|
||||
use pipes::select2i;
|
||||
use comm::select2i;
|
||||
use either::{Left, Right};
|
||||
|
||||
let (port, chan) = stream();
|
||||
|
@ -14,7 +14,7 @@ use io;
|
||||
use io::ReaderUtil;
|
||||
use libc;
|
||||
use libc::{pid_t, c_void, c_int};
|
||||
use pipes::{stream, SharedChan, GenericChan, GenericPort};
|
||||
use comm::{stream, SharedChan, GenericChan, GenericPort};
|
||||
use option::{Some, None};
|
||||
use os;
|
||||
use prelude::*;
|
||||
|
@ -40,7 +40,7 @@ use iter;
|
||||
use libc;
|
||||
use option;
|
||||
use result::Result;
|
||||
use pipes::{stream, Chan, GenericChan, GenericPort, Port, SharedChan};
|
||||
use comm::{stream, Chan, GenericChan, GenericPort, Port, SharedChan};
|
||||
use pipes;
|
||||
use prelude::*;
|
||||
use ptr;
|
||||
@ -1109,7 +1109,7 @@ fn test_unkillable() {
|
||||
#[ignore(cfg(windows))]
|
||||
#[should_fail]
|
||||
fn test_unkillable_nested() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
|
||||
// We want to do this after failing
|
||||
do spawn_unlinked || {
|
||||
@ -1175,7 +1175,7 @@ fn test_child_doesnt_ref_parent() {
|
||||
|
||||
#[test]
|
||||
fn test_sched_thread_per_core() {
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
|
||||
do spawn_sched(ThreadPerCore) || {
|
||||
unsafe {
|
||||
@ -1191,7 +1191,7 @@ fn test_sched_thread_per_core() {
|
||||
|
||||
#[test]
|
||||
fn test_spawn_thread_on_demand() {
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
|
||||
do spawn_sched(ManualThreads(2)) || {
|
||||
unsafe {
|
||||
@ -1200,7 +1200,7 @@ fn test_spawn_thread_on_demand() {
|
||||
let running_threads = rt::rust_sched_current_nonlazy_threads();
|
||||
assert(running_threads as int == 1);
|
||||
|
||||
let (port2, chan2) = pipes::stream();
|
||||
let (port2, chan2) = comm::stream();
|
||||
|
||||
do spawn_sched(CurrentScheduler) || {
|
||||
chan2.send(());
|
||||
|
@ -75,7 +75,7 @@
|
||||
use cast;
|
||||
use container::Map;
|
||||
use option;
|
||||
use pipes::{Chan, GenericChan, GenericPort, Port, stream};
|
||||
use comm::{Chan, GenericChan, GenericPort, Port, stream};
|
||||
use pipes;
|
||||
use prelude::*;
|
||||
use private;
|
||||
@ -702,7 +702,7 @@ fn test_spawn_raw_unsupervise() {
|
||||
#[test]
|
||||
#[ignore(cfg(windows))]
|
||||
fn test_spawn_raw_notify_success() {
|
||||
let (notify_po, notify_ch) = pipes::stream();
|
||||
let (notify_po, notify_ch) = comm::stream();
|
||||
|
||||
let opts = task::TaskOpts {
|
||||
notify_chan: Some(notify_ch),
|
||||
@ -717,7 +717,7 @@ fn test_spawn_raw_notify_success() {
|
||||
#[ignore(cfg(windows))]
|
||||
fn test_spawn_raw_notify_failure() {
|
||||
// New bindings for these
|
||||
let (notify_po, notify_ch) = pipes::stream();
|
||||
let (notify_po, notify_ch) = comm::stream();
|
||||
|
||||
let opts = task::TaskOpts {
|
||||
linked: false,
|
||||
|
@ -314,7 +314,7 @@ fails without recording a fatal error then we've encountered a compiler
|
||||
bug and need to present an error.
|
||||
*/
|
||||
pub fn monitor(+f: fn~(diagnostic::Emitter)) {
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
use std::cell::Cell;
|
||||
let (p, ch) = stream();
|
||||
let ch = SharedChan(ch);
|
||||
|
@ -23,7 +23,7 @@ use parse;
|
||||
use util;
|
||||
use std::cell::Cell;
|
||||
|
||||
use core::pipes::{stream, Chan, SharedChan, Port};
|
||||
use core::comm::{stream, Chan, SharedChan, Port};
|
||||
use core::vec;
|
||||
use core::ops::Drop;
|
||||
use rustc::back::link;
|
||||
|
@ -20,12 +20,12 @@ use core::io::ReaderUtil;
|
||||
use core::io;
|
||||
use core::libc;
|
||||
use core::os;
|
||||
use core::pipes;
|
||||
use core::comm;
|
||||
use core::result;
|
||||
use core::run;
|
||||
use core::str;
|
||||
use core::task;
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
use std::future;
|
||||
use syntax;
|
||||
|
||||
@ -128,12 +128,12 @@ fn pandoc_writer(
|
||||
os::close(pipe_err.out);
|
||||
os::close(pipe_in.out);
|
||||
|
||||
let (stdout_po, stdout_ch) = pipes::stream();
|
||||
let (stdout_po, stdout_ch) = comm::stream();
|
||||
do task::spawn_sched(task::SingleThreaded) || {
|
||||
stdout_ch.send(readclose(pipe_out.in));
|
||||
}
|
||||
|
||||
let (stderr_po, stderr_ch) = pipes::stream();
|
||||
let (stderr_po, stderr_ch) = comm::stream();
|
||||
do task::spawn_sched(task::SingleThreaded) || {
|
||||
stderr_ch.send(readclose(pipe_err.in));
|
||||
}
|
||||
@ -296,7 +296,7 @@ pub fn future_writer_factory(
|
||||
let (markdown_po, markdown_ch) = stream();
|
||||
let markdown_ch = SharedChan(markdown_ch);
|
||||
let writer_factory = fn~(page: doc::Page) -> Writer {
|
||||
let (writer_po, writer_ch) = pipes::stream();
|
||||
let (writer_po, writer_ch) = comm::stream();
|
||||
let markdown_ch = markdown_ch.clone();
|
||||
do task::spawn || {
|
||||
let (writer, future) = future_writer();
|
||||
@ -311,7 +311,7 @@ pub fn future_writer_factory(
|
||||
}
|
||||
|
||||
fn future_writer() -> (Writer, future::Future<~str>) {
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
let writer = fn~(instr: WriteInstr) {
|
||||
chan.send(copy instr);
|
||||
};
|
||||
|
@ -30,7 +30,7 @@ use util;
|
||||
|
||||
use core::option;
|
||||
use core::vec;
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
use syntax::ast;
|
||||
|
||||
pub fn mk_pass(output_style: config::OutputStyle) -> Pass {
|
||||
|
@ -507,10 +507,10 @@ mod tests {
|
||||
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
|
||||
let arc_v = arc::ARC(v);
|
||||
|
||||
let (p, c) = pipes::stream();
|
||||
let (p, c) = comm::stream();
|
||||
|
||||
do task::spawn() || {
|
||||
let p = pipes::PortSet();
|
||||
let p = comm::PortSet();
|
||||
c.send(p.chan());
|
||||
|
||||
let arc_v = p.recv();
|
||||
@ -531,18 +531,18 @@ mod tests {
|
||||
pub fn test_mutex_arc_condvar() {
|
||||
let arc = ~MutexARC(false);
|
||||
let arc2 = ~arc.clone();
|
||||
let (p,c) = pipes::oneshot();
|
||||
let (p,c) = comm::oneshot();
|
||||
let (c,p) = (~mut Some(c), ~mut Some(p));
|
||||
do task::spawn || {
|
||||
// wait until parent gets in
|
||||
pipes::recv_one(option::swap_unwrap(p));
|
||||
comm::recv_one(option::swap_unwrap(p));
|
||||
do arc2.access_cond |state, cond| {
|
||||
*state = true;
|
||||
cond.signal();
|
||||
}
|
||||
}
|
||||
do arc.access_cond |state, cond| {
|
||||
pipes::send_one(option::swap_unwrap(c), ());
|
||||
comm::send_one(option::swap_unwrap(c), ());
|
||||
assert !*state;
|
||||
while !*state {
|
||||
cond.wait();
|
||||
@ -553,7 +553,7 @@ mod tests {
|
||||
pub fn test_arc_condvar_poison() {
|
||||
let arc = ~MutexARC(1);
|
||||
let arc2 = ~arc.clone();
|
||||
let (p, c) = pipes::stream();
|
||||
let (p, c) = comm::stream();
|
||||
|
||||
do task::spawn_unlinked || {
|
||||
let _ = p.recv();
|
||||
@ -587,7 +587,7 @@ mod tests {
|
||||
pub fn test_mutex_arc_unwrap_poison() {
|
||||
let arc = MutexARC(1);
|
||||
let arc2 = ~(&arc).clone();
|
||||
let (p, c) = pipes::stream();
|
||||
let (p, c) = comm::stream();
|
||||
do task::spawn || {
|
||||
do arc2.access |one| {
|
||||
c.send(());
|
||||
@ -685,7 +685,7 @@ mod tests {
|
||||
pub fn test_rw_arc() {
|
||||
let arc = ~RWARC(0);
|
||||
let arc2 = ~arc.clone();
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
|
||||
do task::spawn || {
|
||||
do arc2.write |num| {
|
||||
@ -731,7 +731,7 @@ mod tests {
|
||||
// Reader tasks
|
||||
let mut reader_convos = ~[];
|
||||
for 10.times {
|
||||
let ((rp1,rc1),(rp2,rc2)) = (pipes::stream(),pipes::stream());
|
||||
let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream());
|
||||
reader_convos.push((rc1, rp2));
|
||||
let arcn = ~arc.clone();
|
||||
do task::spawn || {
|
||||
@ -745,7 +745,7 @@ mod tests {
|
||||
|
||||
// Writer task
|
||||
let arc2 = ~arc.clone();
|
||||
let ((wp1,wc1),(wp2,wc2)) = (pipes::stream(),pipes::stream());
|
||||
let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream());
|
||||
do task::spawn || {
|
||||
wp1.recv();
|
||||
do arc2.write_cond |state, cond| {
|
||||
|
@ -14,8 +14,8 @@ Higher level communication abstractions.
|
||||
|
||||
*/
|
||||
|
||||
use core::pipes::{GenericChan, GenericSmartChan, GenericPort};
|
||||
use core::pipes::{Chan, Port, Selectable, Peekable};
|
||||
use core::comm::{GenericChan, GenericSmartChan, GenericPort};
|
||||
use core::comm::{Chan, Port, Selectable, Peekable};
|
||||
use core::pipes;
|
||||
use core::prelude::*;
|
||||
|
||||
@ -63,8 +63,8 @@ impl<T:Owned,U:Owned> Selectable for DuplexStream<T, U> {
|
||||
pub fn DuplexStream<T:Owned,U:Owned>()
|
||||
-> (DuplexStream<T, U>, DuplexStream<U, T>)
|
||||
{
|
||||
let (p1, c2) = pipes::stream();
|
||||
let (p2, c1) = pipes::stream();
|
||||
let (p1, c2) = comm::stream();
|
||||
let (p2, c1) = comm::stream();
|
||||
(DuplexStream {
|
||||
chan: c1,
|
||||
port: p1
|
||||
|
@ -49,8 +49,8 @@ block the scheduler thread, so will their pipes.
|
||||
|
||||
// The basic send/recv interface FlatChan and PortChan will implement
|
||||
use core::io;
|
||||
use core::pipes::GenericChan;
|
||||
use core::pipes::GenericPort;
|
||||
use core::comm::GenericChan;
|
||||
use core::comm::GenericPort;
|
||||
use core::pipes;
|
||||
use core::prelude::*;
|
||||
use core::sys::size_of;
|
||||
@ -95,8 +95,8 @@ pub mod serial {
|
||||
use flatpipes::{FlatPort, FlatChan};
|
||||
|
||||
use core::io::{Reader, Writer};
|
||||
use core::pipes::{Port, Chan};
|
||||
use core::pipes;
|
||||
use core::comm::{Port, Chan};
|
||||
use core::comm;
|
||||
|
||||
pub type ReaderPort<T, R> = FlatPort<
|
||||
T, DeserializingUnflattener<DefaultDecoder, T>,
|
||||
@ -154,7 +154,7 @@ pub mod serial {
|
||||
pub fn pipe_stream<T: Encodable<DefaultEncoder> +
|
||||
Decodable<DefaultDecoder>>(
|
||||
) -> (PipePort<T>, PipeChan<T>) {
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
return (pipe_port(port), pipe_chan(chan));
|
||||
}
|
||||
}
|
||||
@ -177,8 +177,8 @@ pub mod pod {
|
||||
use flatpipes::{FlatPort, FlatChan};
|
||||
|
||||
use core::io::{Reader, Writer};
|
||||
use core::pipes::{Port, Chan};
|
||||
use core::pipes;
|
||||
use core::comm::{Port, Chan};
|
||||
use core::comm;
|
||||
use core::prelude::*;
|
||||
|
||||
pub type ReaderPort<T, R> =
|
||||
@ -222,7 +222,7 @@ pub mod pod {
|
||||
|
||||
/// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
|
||||
pub fn pipe_stream<T:Copy + Owned>() -> (PipePort<T>, PipeChan<T>) {
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
return (pipe_port(port), pipe_chan(chan));
|
||||
}
|
||||
|
||||
@ -507,7 +507,7 @@ pub mod bytepipes {
|
||||
use flatpipes::{ByteChan, BytePort};
|
||||
|
||||
use core::io::{Writer, Reader, ReaderUtil};
|
||||
use core::pipes::{Port, Chan};
|
||||
use core::comm::{Port, Chan};
|
||||
use core::pipes;
|
||||
use core::prelude::*;
|
||||
|
||||
@ -564,12 +564,12 @@ pub mod bytepipes {
|
||||
}
|
||||
|
||||
pub struct PipeBytePort {
|
||||
port: pipes::Port<~[u8]>,
|
||||
port: comm::Port<~[u8]>,
|
||||
mut buf: ~[u8]
|
||||
}
|
||||
|
||||
pub struct PipeByteChan {
|
||||
chan: pipes::Chan<~[u8]>
|
||||
chan: comm::Chan<~[u8]>
|
||||
}
|
||||
|
||||
pub impl BytePort for PipeBytePort {
|
||||
@ -777,12 +777,12 @@ mod test {
|
||||
use uv;
|
||||
|
||||
// Indicate to the client task that the server is listening
|
||||
let (begin_connect_port, begin_connect_chan) = pipes::stream();
|
||||
let (begin_connect_port, begin_connect_chan) = comm::stream();
|
||||
// The connection is sent from the server task to the receiver task
|
||||
// to handle the connection
|
||||
let (accept_port, accept_chan) = pipes::stream();
|
||||
let (accept_port, accept_chan) = comm::stream();
|
||||
// The main task will wait until the test is over to proceed
|
||||
let (finish_port, finish_chan) = pipes::stream();
|
||||
let (finish_port, finish_chan) = comm::stream();
|
||||
|
||||
let addr0 = ip::v4::parse_addr("127.0.0.1");
|
||||
|
||||
@ -803,7 +803,7 @@ mod test {
|
||||
}) |new_conn, kill_ch| {
|
||||
|
||||
// Incoming connection. Send it to the receiver task to accept
|
||||
let (res_port, res_chan) = pipes::stream();
|
||||
let (res_port, res_chan) = comm::stream();
|
||||
accept_chan.send((new_conn, res_chan));
|
||||
// Wait until the connection is accepted
|
||||
res_port.recv();
|
||||
@ -894,7 +894,7 @@ mod test {
|
||||
|
||||
fn pipe_port_loader(bytes: ~[u8]
|
||||
) -> pod::PipePort<int> {
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
if !bytes.is_empty() {
|
||||
chan.send(bytes);
|
||||
}
|
||||
|
@ -25,7 +25,8 @@ use core::cast::copy_lifetime;
|
||||
use core::cast;
|
||||
use core::either::Either;
|
||||
use core::option;
|
||||
use core::pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one};
|
||||
use core::comm::{oneshot, ChanOne, PortOne, send_one, recv_one};
|
||||
use core::pipes::recv;
|
||||
use core::prelude::*;
|
||||
use core::task;
|
||||
|
||||
@ -150,7 +151,7 @@ pub mod test {
|
||||
|
||||
use future::*;
|
||||
|
||||
use core::pipes::oneshot;
|
||||
use core::comm::oneshot;
|
||||
use core::task;
|
||||
|
||||
#[test]
|
||||
|
@ -12,7 +12,7 @@
|
||||
|
||||
use core::libc;
|
||||
use core::prelude::*;
|
||||
use core::pipes::{stream, SharedChan};
|
||||
use core::comm::{stream, SharedChan};
|
||||
use core::ptr;
|
||||
use core::result;
|
||||
use core::str;
|
||||
|
@ -24,7 +24,7 @@ use core::io::{Reader, ReaderUtil, Writer};
|
||||
use core::io;
|
||||
use core::libc::size_t;
|
||||
use core::libc;
|
||||
use core::pipes::{stream, Chan, Port, SharedChan};
|
||||
use core::comm::{stream, Chan, Port, SharedChan};
|
||||
use core::prelude::*;
|
||||
use core::ptr;
|
||||
use core::result::{Result};
|
||||
@ -1441,7 +1441,7 @@ pub mod test {
|
||||
use uv;
|
||||
|
||||
use core::io;
|
||||
use core::pipes::{stream, Chan, Port, SharedChan};
|
||||
use core::comm::{stream, Chan, Port, SharedChan};
|
||||
use core::prelude::*;
|
||||
use core::result;
|
||||
use core::str;
|
||||
|
@ -30,16 +30,16 @@ use core::vec;
|
||||
|
||||
// Each waiting task receives on one of these.
|
||||
#[doc(hidden)]
|
||||
type WaitEnd = pipes::PortOne<()>;
|
||||
type WaitEnd = comm::PortOne<()>;
|
||||
#[doc(hidden)]
|
||||
type SignalEnd = pipes::ChanOne<()>;
|
||||
type SignalEnd = comm::ChanOne<()>;
|
||||
// A doubly-ended queue of waiting tasks.
|
||||
#[doc(hidden)]
|
||||
struct Waitqueue { head: pipes::Port<SignalEnd>,
|
||||
tail: pipes::Chan<SignalEnd> }
|
||||
struct Waitqueue { head: comm::Port<SignalEnd>,
|
||||
tail: comm::Chan<SignalEnd> }
|
||||
|
||||
fn new_waitqueue() -> Waitqueue {
|
||||
let (block_head, block_tail) = pipes::stream();
|
||||
let (block_head, block_tail) = comm::stream();
|
||||
Waitqueue { head: block_head, tail: block_tail }
|
||||
}
|
||||
|
||||
@ -50,7 +50,7 @@ fn signal_waitqueue(q: &Waitqueue) -> bool {
|
||||
if q.head.peek() {
|
||||
// Pop and send a wakeup signal. If the waiter was killed, its port
|
||||
// will have closed. Keep trying until we get a live task.
|
||||
if pipes::try_send_one(q.head.recv(), ()) {
|
||||
if comm::try_send_one(q.head.recv(), ()) {
|
||||
true
|
||||
} else {
|
||||
signal_waitqueue(q)
|
||||
@ -64,7 +64,7 @@ fn signal_waitqueue(q: &Waitqueue) -> bool {
|
||||
fn broadcast_waitqueue(q: &Waitqueue) -> uint {
|
||||
let mut count = 0;
|
||||
while q.head.peek() {
|
||||
if pipes::try_send_one(q.head.recv(), ()) {
|
||||
if comm::try_send_one(q.head.recv(), ()) {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
@ -107,7 +107,7 @@ impl<Q:Owned> &Sem<Q> {
|
||||
state.count -= 1;
|
||||
if state.count < 0 {
|
||||
// Create waiter nobe.
|
||||
let (WaitEnd, SignalEnd) = pipes::oneshot();
|
||||
let (WaitEnd, SignalEnd) = comm::oneshot();
|
||||
// Tell outer scope we need to block.
|
||||
waiter_nobe = Some(WaitEnd);
|
||||
// Enqueue ourself.
|
||||
@ -119,7 +119,7 @@ impl<Q:Owned> &Sem<Q> {
|
||||
/* for 1000.times { task::yield(); } */
|
||||
// Need to wait outside the exclusive.
|
||||
if waiter_nobe.is_some() {
|
||||
let _ = pipes::recv_one(option::unwrap(waiter_nobe));
|
||||
let _ = comm::recv_one(option::unwrap(waiter_nobe));
|
||||
}
|
||||
}
|
||||
fn release() {
|
||||
@ -214,7 +214,7 @@ impl &Condvar {
|
||||
*/
|
||||
fn wait_on(condvar_id: uint) {
|
||||
// Create waiter nobe.
|
||||
let (WaitEnd, SignalEnd) = pipes::oneshot();
|
||||
let (WaitEnd, SignalEnd) = comm::oneshot();
|
||||
let mut WaitEnd = Some(WaitEnd);
|
||||
let mut SignalEnd = Some(SignalEnd);
|
||||
let mut reacquire = None;
|
||||
@ -250,7 +250,7 @@ impl &Condvar {
|
||||
// Unconditionally "block". (Might not actually block if a
|
||||
// signaller already sent -- I mean 'unconditionally' in contrast
|
||||
// with acquire().)
|
||||
let _ = pipes::recv_one(option::swap_unwrap(&mut WaitEnd));
|
||||
let _ = comm::recv_one(option::swap_unwrap(&mut WaitEnd));
|
||||
}
|
||||
|
||||
// This is needed for a failing condition variable to reacquire the
|
||||
@ -749,7 +749,7 @@ mod tests {
|
||||
#[test]
|
||||
pub fn test_sem_as_cvar() {
|
||||
/* Child waits and parent signals */
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
let s = ~semaphore(0);
|
||||
let s2 = ~s.clone();
|
||||
do task::spawn || {
|
||||
@ -761,7 +761,7 @@ mod tests {
|
||||
let _ = p.recv();
|
||||
|
||||
/* Parent waits and child signals */
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
let s = ~semaphore(0);
|
||||
let s2 = ~s.clone();
|
||||
do task::spawn || {
|
||||
@ -778,8 +778,8 @@ mod tests {
|
||||
// time, and shake hands.
|
||||
let s = ~semaphore(2);
|
||||
let s2 = ~s.clone();
|
||||
let (p1,c1) = pipes::stream();
|
||||
let (p2,c2) = pipes::stream();
|
||||
let (p1,c1) = comm::stream();
|
||||
let (p2,c2) = comm::stream();
|
||||
do task::spawn || {
|
||||
do s2.access {
|
||||
let _ = p2.recv();
|
||||
@ -798,7 +798,7 @@ mod tests {
|
||||
do task::spawn_sched(task::ManualThreads(1)) {
|
||||
let s = ~semaphore(1);
|
||||
let s2 = ~s.clone();
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
let child_data = ~mut Some((s2, c));
|
||||
do s.access {
|
||||
let (s2,c) = option::swap_unwrap(child_data);
|
||||
@ -820,7 +820,7 @@ mod tests {
|
||||
pub fn test_mutex_lock() {
|
||||
// Unsafely achieve shared state, and do the textbook
|
||||
// "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
let m = ~Mutex();
|
||||
let m2 = ~m.clone();
|
||||
let mut sharedstate = ~0;
|
||||
@ -863,7 +863,7 @@ mod tests {
|
||||
cond.wait();
|
||||
}
|
||||
// Parent wakes up child
|
||||
let (port,chan) = pipes::stream();
|
||||
let (port,chan) = comm::stream();
|
||||
let m3 = ~m.clone();
|
||||
do task::spawn || {
|
||||
do m3.lock_cond |cond| {
|
||||
@ -886,7 +886,7 @@ mod tests {
|
||||
|
||||
for num_waiters.times {
|
||||
let mi = ~m.clone();
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
ports.push(port);
|
||||
do task::spawn || {
|
||||
do mi.lock_cond |cond| {
|
||||
@ -948,7 +948,7 @@ mod tests {
|
||||
let m2 = ~m.clone();
|
||||
|
||||
let result: result::Result<(),()> = do task::try || {
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
do task::spawn || { // linked
|
||||
let _ = p.recv(); // wait for sibling to get in the mutex
|
||||
task::yield();
|
||||
@ -970,12 +970,12 @@ mod tests {
|
||||
pub fn test_mutex_killed_broadcast() {
|
||||
let m = ~Mutex();
|
||||
let m2 = ~m.clone();
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
|
||||
let result: result::Result<(),()> = do task::try || {
|
||||
let mut sibling_convos = ~[];
|
||||
for 2.times {
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
let c = ~mut Some(c);
|
||||
sibling_convos.push(p);
|
||||
let mi = ~m2.clone();
|
||||
@ -1004,7 +1004,7 @@ mod tests {
|
||||
assert woken == 0;
|
||||
}
|
||||
struct SendOnFailure {
|
||||
c: pipes::Chan<()>,
|
||||
c: comm::Chan<()>,
|
||||
}
|
||||
|
||||
impl Drop for SendOnFailure {
|
||||
@ -1013,7 +1013,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn SendOnFailure(c: pipes::Chan<()>) -> SendOnFailure {
|
||||
fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure {
|
||||
SendOnFailure {
|
||||
c: c
|
||||
}
|
||||
@ -1038,7 +1038,7 @@ mod tests {
|
||||
let result = do task::try {
|
||||
let m = ~mutex_with_condvars(2);
|
||||
let m2 = ~m.clone();
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
do task::spawn || {
|
||||
do m2.lock_cond |cond| {
|
||||
c.send(());
|
||||
@ -1099,7 +1099,7 @@ mod tests {
|
||||
mode2: RWlockMode) {
|
||||
// Test mutual exclusion between readers and writers. Just like the
|
||||
// mutex mutual exclusion test, a ways above.
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
let x2 = ~x.clone();
|
||||
let mut sharedstate = ~0;
|
||||
let ptr = ptr::addr_of(&(*sharedstate));
|
||||
@ -1146,8 +1146,8 @@ mod tests {
|
||||
make_mode2_go_first: bool) {
|
||||
// Much like sem_multi_resource.
|
||||
let x2 = ~x.clone();
|
||||
let (p1,c1) = pipes::stream();
|
||||
let (p2,c2) = pipes::stream();
|
||||
let (p1,c1) = comm::stream();
|
||||
let (p2,c2) = comm::stream();
|
||||
do task::spawn || {
|
||||
if !make_mode2_go_first {
|
||||
let _ = p2.recv(); // parent sends to us once it locks, or ...
|
||||
@ -1212,7 +1212,7 @@ mod tests {
|
||||
cond.wait();
|
||||
}
|
||||
// Parent wakes up child
|
||||
let (port,chan) = pipes::stream();
|
||||
let (port,chan) = comm::stream();
|
||||
let x3 = ~x.clone();
|
||||
do task::spawn || {
|
||||
do x3.write_cond |cond| {
|
||||
@ -1249,7 +1249,7 @@ mod tests {
|
||||
|
||||
for num_waiters.times {
|
||||
let xi = ~x.clone();
|
||||
let (port, chan) = pipes::stream();
|
||||
let (port, chan) = comm::stream();
|
||||
ports.push(port);
|
||||
do task::spawn || {
|
||||
do lock_cond(xi, dg1) |cond| {
|
||||
|
@ -12,7 +12,7 @@
|
||||
/// parallelism.
|
||||
|
||||
use core::io;
|
||||
use core::pipes::{Chan, Port};
|
||||
use core::comm::{Chan, Port};
|
||||
use core::pipes;
|
||||
use core::prelude::*;
|
||||
use core::task::{SchedMode, SingleThreaded};
|
||||
@ -47,7 +47,7 @@ pub impl<T> TaskPool<T> {
|
||||
assert n_tasks >= 1;
|
||||
|
||||
let channels = do vec::from_fn(n_tasks) |i| {
|
||||
let (port, chan) = pipes::stream::<Msg<T>>();
|
||||
let (port, chan) = comm::stream::<Msg<T>>();
|
||||
let init_fn = init_fn_factory();
|
||||
|
||||
let task_body: ~fn() = || {
|
||||
|
@ -27,7 +27,7 @@ use core::either;
|
||||
use core::io::WriterUtil;
|
||||
use core::io;
|
||||
use core::libc::size_t;
|
||||
use core::pipes::{stream, Chan, Port, SharedChan};
|
||||
use core::comm::{stream, Chan, Port, SharedChan};
|
||||
use core::option;
|
||||
use core::prelude::*;
|
||||
use core::result;
|
||||
@ -794,7 +794,7 @@ mod tests {
|
||||
use test::{TestOpts, run_test};
|
||||
|
||||
use core::either;
|
||||
use core::pipes::{stream, SharedChan};
|
||||
use core::comm::{stream, SharedChan};
|
||||
use core::option;
|
||||
use core::vec;
|
||||
|
||||
|
@ -18,7 +18,7 @@ use core::either;
|
||||
use core::libc;
|
||||
use core::libc::c_void;
|
||||
use core::cast::transmute;
|
||||
use core::pipes::{stream, Chan, SharedChan, Port, select2i};
|
||||
use core::comm::{stream, Chan, SharedChan, Port, select2i};
|
||||
use core::prelude::*;
|
||||
use core::ptr;
|
||||
use core;
|
||||
|
@ -17,7 +17,7 @@ use uv_iotask::{IoTask, spawn_iotask};
|
||||
|
||||
use core::either::{Left, Right};
|
||||
use core::libc;
|
||||
use core::pipes::{Port, Chan, SharedChan, select2i};
|
||||
use core::comm::{Port, Chan, SharedChan, select2i};
|
||||
use core::private::global::{global_data_clone_create,
|
||||
global_data_clone};
|
||||
use core::private::weak_task::weaken_task;
|
||||
@ -133,7 +133,7 @@ mod test {
|
||||
use core::task;
|
||||
use core::cast::transmute;
|
||||
use core::libc::c_void;
|
||||
use core::pipes::{stream, SharedChan, Chan};
|
||||
use core::comm::{stream, SharedChan, Chan};
|
||||
|
||||
extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
|
||||
unsafe {
|
||||
|
@ -19,7 +19,7 @@ use ll = uv_ll;
|
||||
|
||||
use core::libc::c_void;
|
||||
use core::libc;
|
||||
use core::pipes::{stream, Port, Chan, SharedChan};
|
||||
use core::comm::{stream, Port, Chan, SharedChan};
|
||||
use core::prelude::*;
|
||||
use core::ptr::addr_of;
|
||||
use core::task::TaskBuilder;
|
||||
|
@ -39,7 +39,7 @@ use core::ptr::to_unsafe_ptr;
|
||||
use core::ptr;
|
||||
use core::str;
|
||||
use core::vec;
|
||||
use core::pipes::{stream, Chan, SharedChan, Port};
|
||||
use core::comm::{stream, Chan, SharedChan, Port};
|
||||
|
||||
// libuv struct mappings
|
||||
pub struct uv_ip4_addr {
|
||||
|
@ -19,7 +19,8 @@ use core::cmp;
|
||||
use core::either::{Either, Left, Right};
|
||||
use core::io;
|
||||
use core::option;
|
||||
use core::pipes::{recv, oneshot, PortOne, send_one};
|
||||
use core::comm::{oneshot, PortOne, send_one};
|
||||
use core::pipes::recv;
|
||||
use core::prelude::*;
|
||||
use core::result;
|
||||
use core::run;
|
||||
|
@ -78,10 +78,10 @@ pub impl gen_send for message {
|
||||
};
|
||||
|
||||
body += ~"let b = pipe.reuse_buffer();\n";
|
||||
body += fmt!("let %s = ::pipes::SendPacketBuffered(\
|
||||
body += fmt!("let %s = ::core::pipes::SendPacketBuffered(\
|
||||
::ptr::addr_of(&(b.buffer.data.%s)));\n",
|
||||
sp, next.name);
|
||||
body += fmt!("let %s = ::pipes::RecvPacketBuffered(\
|
||||
body += fmt!("let %s = ::core::pipes::RecvPacketBuffered(\
|
||||
::ptr::addr_of(&(b.buffer.data.%s)));\n",
|
||||
rp, next.name);
|
||||
}
|
||||
@ -93,7 +93,7 @@ pub impl gen_send for message {
|
||||
(recv, recv) => "(c, s)"
|
||||
};
|
||||
|
||||
body += fmt!("let %s = ::pipes::entangle();\n", pat);
|
||||
body += fmt!("let %s = ::core::pipes::entangle();\n", pat);
|
||||
}
|
||||
body += fmt!("let message = %s(%s);\n",
|
||||
self.name(),
|
||||
@ -102,14 +102,14 @@ pub impl gen_send for message {
|
||||
~"s"), ~", "));
|
||||
|
||||
if !try {
|
||||
body += fmt!("::pipes::send(pipe, message);\n");
|
||||
body += fmt!("::core::pipes::send(pipe, message);\n");
|
||||
// return the new channel
|
||||
body += ~"c }";
|
||||
}
|
||||
else {
|
||||
body += fmt!("if ::pipes::send(pipe, message) {\n \
|
||||
::pipes::rt::make_some(c) \
|
||||
} else { ::pipes::rt::make_none() } }");
|
||||
body += fmt!("if ::core::pipes::send(pipe, message) {\n \
|
||||
::core::pipes::rt::make_some(c) \
|
||||
} else { ::core::pipes::rt::make_none() } }");
|
||||
}
|
||||
|
||||
let body = cx.parse_expr(body);
|
||||
@ -162,14 +162,14 @@ pub impl gen_send for message {
|
||||
message_args);
|
||||
|
||||
if !try {
|
||||
body += fmt!("::pipes::send(pipe, message);\n");
|
||||
body += fmt!("::core::pipes::send(pipe, message);\n");
|
||||
body += ~" }";
|
||||
} else {
|
||||
body += fmt!("if ::pipes::send(pipe, message) \
|
||||
body += fmt!("if ::core::pipes::send(pipe, message) \
|
||||
{ \
|
||||
::pipes::rt::make_some(()) \
|
||||
::core::pipes::rt::make_some(()) \
|
||||
} else { \
|
||||
::pipes::rt::make_none() \
|
||||
::core::pipes::rt::make_none() \
|
||||
} }");
|
||||
}
|
||||
|
||||
@ -272,7 +272,8 @@ pub impl to_type_decls for state {
|
||||
self.data_name(),
|
||||
self.span,
|
||||
cx.ty_path_ast_builder(
|
||||
path_global(~[cx.ident_of(~"pipes"),
|
||||
path_global(~[cx.ident_of(~"core"),
|
||||
cx.ident_of(~"pipes"),
|
||||
cx.ident_of(dir.to_str() + ~"Packet")],
|
||||
dummy_sp())
|
||||
.add_ty(cx.ty_path_ast_builder(
|
||||
@ -288,7 +289,8 @@ pub impl to_type_decls for state {
|
||||
self.data_name(),
|
||||
self.span,
|
||||
cx.ty_path_ast_builder(
|
||||
path_global(~[cx.ident_of(~"pipes"),
|
||||
path_global(~[cx.ident_of(~"core"),
|
||||
cx.ident_of(~"pipes"),
|
||||
cx.ident_of(dir.to_str()
|
||||
+ ~"PacketBuffered")],
|
||||
dummy_sp())
|
||||
@ -313,10 +315,10 @@ pub impl gen_init for protocol {
|
||||
|
||||
let body = if !self.is_bounded() {
|
||||
match start_state.dir {
|
||||
send => quote_expr!( ::pipes::entangle() ),
|
||||
send => quote_expr!( ::core::pipes::entangle() ),
|
||||
recv => {
|
||||
quote_expr!({
|
||||
let (s, c) = ::pipes::entangle();
|
||||
let (s, c) = ::core::pipes::entangle();
|
||||
(c, s)
|
||||
})
|
||||
}
|
||||
@ -336,7 +338,7 @@ pub impl gen_init for protocol {
|
||||
};
|
||||
|
||||
cx.parse_item(fmt!("pub fn init%s() -> (client::%s, server::%s)\
|
||||
{ use pipes::HasBuffer; %s }",
|
||||
{ use core::pipes::HasBuffer; %s }",
|
||||
start_state.ty_params.to_source(cx),
|
||||
start_state.to_ty(cx).to_source(cx),
|
||||
start_state.to_ty(cx).to_source(cx),
|
||||
@ -350,7 +352,7 @@ pub impl gen_init for protocol {
|
||||
let fty = s.to_ty(ext_cx);
|
||||
ext_cx.field_imm(ext_cx.ident_of(s.name),
|
||||
quote_expr!(
|
||||
::pipes::mk_packet::<$fty>()
|
||||
::core::pipes::mk_packet::<$fty>()
|
||||
))
|
||||
}))
|
||||
}
|
||||
@ -358,8 +360,8 @@ pub impl gen_init for protocol {
|
||||
fn gen_init_bounded(&self, ext_cx: ext_ctxt) -> @ast::expr {
|
||||
debug!("gen_init_bounded");
|
||||
let buffer_fields = self.gen_buffer_init(ext_cx);
|
||||
let buffer = quote_expr!(~::pipes::Buffer {
|
||||
header: ::pipes::BufferHeader(),
|
||||
let buffer = quote_expr!(~::core::pipes::Buffer {
|
||||
header: ::core::pipes::BufferHeader(),
|
||||
data: $buffer_fields,
|
||||
});
|
||||
|
||||
@ -375,7 +377,7 @@ pub impl gen_init for protocol {
|
||||
|
||||
quote_expr!({
|
||||
let buffer = $buffer;
|
||||
do ::pipes::entangle_buffer(buffer) |buffer, data| {
|
||||
do ::core::pipes::entangle_buffer(buffer) |buffer, data| {
|
||||
$entangle_body
|
||||
}
|
||||
})
|
||||
@ -408,7 +410,7 @@ pub impl gen_init for protocol {
|
||||
}
|
||||
}
|
||||
let ty = s.to_ty(cx);
|
||||
let fty = quote_ty!( ::pipes::Packet<$ty> );
|
||||
let fty = quote_ty!( ::core::pipes::Packet<$ty> );
|
||||
|
||||
@spanned {
|
||||
node: ast::struct_field_ {
|
||||
|
@ -8,7 +8,7 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
pub fn foo<T:Owned + Copy>(x: T) -> Port<T> {
|
||||
let (p, c) = stream();
|
||||
|
@ -24,7 +24,7 @@ extern mod std;
|
||||
use io::Writer;
|
||||
use io::WriterUtil;
|
||||
|
||||
use pipes::{Port, Chan, SharedChan};
|
||||
use comm::{Port, Chan, SharedChan};
|
||||
|
||||
macro_rules! move_out (
|
||||
{ $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } }
|
||||
@ -36,7 +36,7 @@ enum request {
|
||||
stop
|
||||
}
|
||||
|
||||
fn server(requests: Port<request>, responses: pipes::Chan<uint>) {
|
||||
fn server(requests: Port<request>, responses: comm::Chan<uint>) {
|
||||
let mut count = 0u;
|
||||
let mut done = false;
|
||||
while !done {
|
||||
@ -55,8 +55,8 @@ fn server(requests: Port<request>, responses: pipes::Chan<uint>) {
|
||||
}
|
||||
|
||||
fn run(args: &[~str]) {
|
||||
let (from_child, to_parent) = pipes::stream();
|
||||
let (from_parent, to_child) = pipes::stream();
|
||||
let (from_child, to_parent) = comm::stream();
|
||||
let (from_parent, to_child) = comm::stream();
|
||||
|
||||
let to_child = SharedChan(to_child);
|
||||
|
||||
|
@ -20,7 +20,7 @@ extern mod std;
|
||||
use io::Writer;
|
||||
use io::WriterUtil;
|
||||
|
||||
use pipes::{Port, PortSet, Chan};
|
||||
use comm::{Port, PortSet, Chan, stream};
|
||||
|
||||
macro_rules! move_out (
|
||||
{ $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } }
|
||||
@ -32,7 +32,7 @@ enum request {
|
||||
stop
|
||||
}
|
||||
|
||||
fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) {
|
||||
fn server(requests: PortSet<request>, responses: Chan<uint>) {
|
||||
let mut count = 0;
|
||||
let mut done = false;
|
||||
while !done {
|
||||
@ -51,8 +51,8 @@ fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) {
|
||||
}
|
||||
|
||||
fn run(args: &[~str]) {
|
||||
let (from_child, to_parent) = pipes::stream();
|
||||
let (from_parent_, to_child) = pipes::stream();
|
||||
let (from_child, to_parent) = stream();
|
||||
let (from_parent_, to_child) = stream();
|
||||
let from_parent = PortSet();
|
||||
from_parent.add(from_parent_);
|
||||
|
||||
@ -62,7 +62,7 @@ fn run(args: &[~str]) {
|
||||
let start = std::time::precise_time_s();
|
||||
let mut worker_results = ~[];
|
||||
for uint::range(0, workers) |_i| {
|
||||
let (from_parent_, to_child) = pipes::stream();
|
||||
let (from_parent_, to_child) = stream();
|
||||
from_parent.add(from_parent_);
|
||||
do task::task().future_result(|+r| {
|
||||
worker_results.push(r);
|
||||
|
@ -20,7 +20,7 @@ extern mod std;
|
||||
use std::time;
|
||||
use std::future;
|
||||
|
||||
use pipes::recv;
|
||||
use core::pipes::recv;
|
||||
|
||||
proto! ring (
|
||||
num:send {
|
||||
|
@ -14,7 +14,7 @@
|
||||
|
||||
extern mod std;
|
||||
|
||||
use pipes::{spawn_service, recv};
|
||||
use core::pipes::{spawn_service, recv};
|
||||
use std::time::precise_time_s;
|
||||
|
||||
proto! pingpong (
|
||||
@ -70,9 +70,9 @@ macro_rules! follow (
|
||||
)
|
||||
)
|
||||
|
||||
fn switch<T:Owned,Tb:Owned,U>(+endp: pipes::RecvPacketBuffered<T, Tb>,
|
||||
fn switch<T:Owned,Tb:Owned,U>(+endp: core::pipes::RecvPacketBuffered<T, Tb>,
|
||||
f: fn(+v: Option<T>) -> U) -> U {
|
||||
f(pipes::try_recv(endp))
|
||||
f(core::pipes::try_recv(endp))
|
||||
}
|
||||
|
||||
// Here's the benchmark
|
||||
|
@ -15,7 +15,7 @@ use std::oldmap;
|
||||
use std::oldmap::HashMap;
|
||||
use std::sort;
|
||||
use std::cell::Cell;
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
fn print_complements() {
|
||||
let all = ~[Blue, Red, Yellow];
|
||||
|
@ -18,7 +18,7 @@ use std::oldmap;
|
||||
use std::oldmap::HashMap;
|
||||
use std::sort;
|
||||
use io::ReaderUtil;
|
||||
use pipes::{stream, Port, Chan};
|
||||
use comm::{stream, Port, Chan};
|
||||
use cmp::Ord;
|
||||
|
||||
// given a map, print a sorted version of it
|
||||
@ -97,8 +97,8 @@ fn windows_with_carry(bb: &[u8], nn: uint,
|
||||
return vec::slice(bb, len - (nn - 1u), len).to_vec();
|
||||
}
|
||||
|
||||
fn make_sequence_processor(sz: uint, from_parent: pipes::Port<~[u8]>,
|
||||
to_parent: pipes::Chan<~str>) {
|
||||
fn make_sequence_processor(sz: uint, from_parent: comm::Port<~[u8]>,
|
||||
to_parent: comm::Chan<~str>) {
|
||||
|
||||
let freqs: HashMap<~[u8], uint> = oldmap::HashMap();
|
||||
let mut carry: ~[u8] = ~[];
|
||||
@ -159,7 +159,7 @@ fn main() {
|
||||
|
||||
from_child.push(from_child_);
|
||||
|
||||
let (from_parent, to_child) = pipes::stream();
|
||||
let (from_parent, to_child) = comm::stream();
|
||||
|
||||
do task::spawn_with(from_parent) |from_parent| {
|
||||
make_sequence_processor(sz, from_parent, to_parent_);
|
||||
|
@ -108,7 +108,7 @@ impl io::Writer for Devnull {
|
||||
fn get_type(&self) -> io::WriterType { io::File }
|
||||
}
|
||||
|
||||
fn writer(path: ~str, pport: pipes::Port<Line>, size: uint)
|
||||
fn writer(path: ~str, pport: comm::Port<Line>, size: uint)
|
||||
{
|
||||
let cout: io::Writer = match path {
|
||||
~"" => {
|
||||
@ -172,8 +172,8 @@ fn main() {
|
||||
let size = if vec::len(args) < 2_u { 80_u }
|
||||
else { uint::from_str(args[1]).get() };
|
||||
|
||||
let (pport, pchan) = pipes::stream();
|
||||
let pchan = pipes::SharedChan(pchan);
|
||||
let (pport, pchan) = comm::stream();
|
||||
let pchan = comm::SharedChan(pchan);
|
||||
for uint::range(0_u, size) |j| {
|
||||
let cchan = pchan.clone();
|
||||
do task::spawn { cchan.send(chanmb(j, size, depth)) };
|
||||
|
@ -24,12 +24,9 @@
|
||||
extern mod std;
|
||||
|
||||
use std::{time, getopts};
|
||||
use io::WriterUtil;
|
||||
use int::range;
|
||||
use pipes::Port;
|
||||
use pipes::Chan;
|
||||
use pipes::send;
|
||||
use pipes::recv;
|
||||
use core::int::range;
|
||||
use core::comm::*;
|
||||
use core::io::WriterUtil;
|
||||
|
||||
use core::result;
|
||||
use result::{Ok, Err};
|
||||
@ -41,7 +38,7 @@ fn fib(n: int) -> int {
|
||||
} else if n <= 2 {
|
||||
c.send(1);
|
||||
} else {
|
||||
let p = pipes::PortSet();
|
||||
let p = PortSet();
|
||||
let ch = p.chan();
|
||||
task::spawn(|| pfib(ch, n - 1) );
|
||||
let ch = p.chan();
|
||||
@ -50,7 +47,7 @@ fn fib(n: int) -> int {
|
||||
}
|
||||
}
|
||||
|
||||
let (p, ch) = pipes::stream();
|
||||
let (p, ch) = stream();
|
||||
let _t = task::spawn(|| pfib(ch, n) );
|
||||
p.recv()
|
||||
}
|
||||
|
@ -15,7 +15,7 @@
|
||||
//
|
||||
// The filename is a song reference; google it in quotes.
|
||||
|
||||
fn child_generation(gens_left: uint, -c: pipes::Chan<()>) {
|
||||
fn child_generation(gens_left: uint, -c: comm::Chan<()>) {
|
||||
// This used to be O(n^2) in the number of generations that ever existed.
|
||||
// With this code, only as many generations are alive at a time as tasks
|
||||
// alive at a time,
|
||||
@ -43,7 +43,7 @@ fn main() {
|
||||
copy args
|
||||
};
|
||||
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
child_generation(uint::from_str(args[1]).get(), c);
|
||||
if p.try_recv().is_none() {
|
||||
fail!(~"it happened when we slumbered");
|
||||
|
@ -20,7 +20,7 @@
|
||||
// Creates in the background 'num_tasks' tasks, all blocked forever.
|
||||
// Doesn't return until all such tasks are ready, but doesn't block forever itself.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
fn grandchild_group(num_tasks: uint) {
|
||||
let (po, ch) = stream();
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
// Test for concurrent tasks
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
fn calc(children: uint, parent_wait_chan: &Chan<Chan<Chan<int>>>) {
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
// except according to those terms.
|
||||
|
||||
fn main() {
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
let x = Some(p);
|
||||
c.send(false);
|
||||
match x {
|
||||
|
@ -25,6 +25,6 @@ fn foo(i:int, j: @~str) -> foo {
|
||||
|
||||
fn main() {
|
||||
let cat = ~"kitty";
|
||||
let (_, ch) = pipes::stream(); //~ ERROR does not fulfill `Owned`
|
||||
let (_, ch) = comm::stream(); //~ ERROR does not fulfill `Owned`
|
||||
ch.send(foo(42, @(cat))); //~ ERROR does not fulfill `Owned`
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ extern mod std;
|
||||
fn child() { assert (1 == 2); }
|
||||
|
||||
fn main() {
|
||||
let (p, _c) = pipes::stream::<int>();
|
||||
let (p, _c) = comm::stream::<int>();
|
||||
task::spawn(|| child() );
|
||||
let x = p.recv();
|
||||
}
|
||||
|
@ -15,7 +15,7 @@
|
||||
fn child() { fail!(); }
|
||||
|
||||
fn main() {
|
||||
let (p, _c) = pipes::stream::<()>();
|
||||
let (p, _c) = comm::stream::<()>();
|
||||
task::spawn(|| child() );
|
||||
task::yield();
|
||||
}
|
||||
|
@ -15,13 +15,13 @@
|
||||
fn grandchild() { fail!(~"grandchild dies"); }
|
||||
|
||||
fn child() {
|
||||
let (p, _c) = pipes::stream::<int>();
|
||||
let (p, _c) = comm::stream::<int>();
|
||||
task::spawn(|| grandchild() );
|
||||
let x = p.recv();
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (p, _c) = pipes::stream::<int>();
|
||||
let (p, _c) = comm::stream::<int>();
|
||||
task::spawn(|| child() );
|
||||
let x = p.recv();
|
||||
}
|
||||
|
@ -14,7 +14,7 @@
|
||||
fn child() { assert (1 == 2); }
|
||||
|
||||
fn parent() {
|
||||
let (p, _c) = pipes::stream::<int>();
|
||||
let (p, _c) = comm::stream::<int>();
|
||||
task::spawn(|| child() );
|
||||
let x = p.recv();
|
||||
}
|
||||
@ -22,7 +22,7 @@ fn parent() {
|
||||
// This task is not linked to the failure chain, but since the other
|
||||
// tasks are going to fail the kernel, this one will fail too
|
||||
fn sleeper() {
|
||||
let (p, _c) = pipes::stream::<int>();
|
||||
let (p, _c) = comm::stream::<int>();
|
||||
let x = p.recv();
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,7 @@ fn goodfail() {
|
||||
|
||||
fn main() {
|
||||
task::spawn(|| goodfail() );
|
||||
let (po, _c) = pipes::stream();
|
||||
let (po, _c) = comm::stream();
|
||||
// We shouldn't be able to get past this recv since there's no
|
||||
// message available
|
||||
let i: int = po.recv();
|
||||
|
@ -24,7 +24,7 @@
|
||||
// course preferable, as the value itself is
|
||||
// irrelevant).
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
fn foo(&&x: ()) -> Port<()> {
|
||||
let (p, c) = stream::<()>();
|
||||
|
@ -9,7 +9,7 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
pub fn main() {
|
||||
let (p, ch) = stream();
|
||||
|
@ -20,14 +20,14 @@ extern mod std;
|
||||
|
||||
use std::oldmap;
|
||||
use std::oldmap::HashMap;
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
pub fn map(filename: ~str, emit: map_reduce::putter) { emit(filename, ~"1"); }
|
||||
|
||||
mod map_reduce {
|
||||
use std::oldmap;
|
||||
use std::oldmap::HashMap;
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
pub type putter = fn@(~str, ~str);
|
||||
|
||||
|
@ -11,15 +11,15 @@
|
||||
// xfail-fast
|
||||
|
||||
pub fn main() {
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
do task::try || {
|
||||
let (p2,c2) = pipes::stream();
|
||||
let (p2,c2) = comm::stream();
|
||||
do task::spawn || {
|
||||
p2.recv();
|
||||
error!("sibling fails");
|
||||
fail!();
|
||||
}
|
||||
let (p3,c3) = pipes::stream();
|
||||
let (p3,c3) = comm::stream();
|
||||
c.send(c3);
|
||||
c2.send(());
|
||||
error!("child blocks");
|
||||
|
@ -10,22 +10,22 @@
|
||||
|
||||
// xfail-fast
|
||||
|
||||
use pipes::{Select2, Selectable};
|
||||
use comm::{Select2, Selectable};
|
||||
|
||||
pub fn main() {
|
||||
let (p,c) = pipes::stream();
|
||||
let (p,c) = comm::stream();
|
||||
do task::try || {
|
||||
let (p2,c2) = pipes::stream();
|
||||
let (p2,c2) = comm::stream();
|
||||
do task::spawn || {
|
||||
p2.recv();
|
||||
error!("sibling fails");
|
||||
fail!();
|
||||
}
|
||||
let (p3,c3) = pipes::stream();
|
||||
let (p3,c3) = comm::stream();
|
||||
c.send(c3);
|
||||
c2.send(());
|
||||
error!("child blocks");
|
||||
let (p, c) = pipes::stream();
|
||||
let (p, c) = comm::stream();
|
||||
(p, p3).select();
|
||||
c.send(());
|
||||
};
|
||||
|
@ -1,6 +1,6 @@
|
||||
extern mod std;
|
||||
|
||||
use pipes::Chan;
|
||||
use comm::Chan;
|
||||
|
||||
type RingBuffer = ~[float];
|
||||
type SamplesFn = fn~ (samples: &RingBuffer);
|
||||
|
@ -1,4 +1,4 @@
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
fn producer(c: &Chan<~[u8]>) {
|
||||
c.send(
|
||||
|
@ -15,7 +15,8 @@
|
||||
//
|
||||
// http://theincredibleholk.wordpress.com/2012/07/06/rusty-pipes/
|
||||
|
||||
use pipes::try_recv;
|
||||
use core::pipes;
|
||||
use core::pipes::try_recv;
|
||||
|
||||
pub type username = ~str;
|
||||
pub type password = ~str;
|
||||
|
@ -18,7 +18,8 @@ extern mod std;
|
||||
use std::timer::sleep;
|
||||
use std::uv;
|
||||
|
||||
use pipes::{try_recv, recv};
|
||||
use core::pipes;
|
||||
use core::pipes::{try_recv, recv};
|
||||
|
||||
proto! oneshot (
|
||||
waiting:send {
|
||||
|
@ -13,6 +13,7 @@
|
||||
extern mod std;
|
||||
use std::timer::sleep;
|
||||
use std::uv;
|
||||
use core::pipes;
|
||||
|
||||
proto! oneshot (
|
||||
waiting:send {
|
||||
|
@ -19,6 +19,7 @@
|
||||
// modified in hopefully straightforward ways.
|
||||
|
||||
mod pingpong {
|
||||
use core::pipes;
|
||||
use core::pipes::*;
|
||||
use core::ptr;
|
||||
|
||||
@ -44,6 +45,7 @@ mod pingpong {
|
||||
pub enum ping = server::pong;
|
||||
pub enum pong = client::ping;
|
||||
pub mod client {
|
||||
use core::pipes;
|
||||
use core::pipes::*;
|
||||
use core::ptr;
|
||||
|
||||
@ -53,7 +55,7 @@ mod pingpong {
|
||||
let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.pong)));
|
||||
let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.pong)));
|
||||
let message = ::pingpong::ping(s);
|
||||
::pipes::send(pipe, message);
|
||||
send(pipe, message);
|
||||
c
|
||||
}
|
||||
}
|
||||
@ -63,6 +65,7 @@ mod pingpong {
|
||||
::pingpong::Packets>;
|
||||
}
|
||||
pub mod server {
|
||||
use core::pipes;
|
||||
use core::pipes::*;
|
||||
use core::ptr;
|
||||
|
||||
@ -74,7 +77,7 @@ mod pingpong {
|
||||
let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.ping)));
|
||||
let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.ping)));
|
||||
let message = ::pingpong::pong(s);
|
||||
::pipes::send(pipe, message);
|
||||
send(pipe, message);
|
||||
c
|
||||
}
|
||||
}
|
||||
@ -84,7 +87,7 @@ mod pingpong {
|
||||
}
|
||||
|
||||
mod test {
|
||||
use pipes::recv;
|
||||
use core::pipes::recv;
|
||||
use pingpong::{ping, pong};
|
||||
|
||||
pub fn client(-chan: ::pingpong::client::ping) {
|
||||
|
@ -32,7 +32,7 @@ macro_rules! select_if (
|
||||
], )*
|
||||
} => {
|
||||
if $index == $count {
|
||||
match pipes::try_recv($port) {
|
||||
match core::pipes::try_recv($port) {
|
||||
$(Some($message($($($x,)+)* next)) => {
|
||||
let $next = next;
|
||||
$e
|
||||
@ -66,7 +66,7 @@ macro_rules! select (
|
||||
-> $next:ident $e:expr),+
|
||||
} )+
|
||||
} => ({
|
||||
let index = pipes::selecti([$(($port).header()),+]);
|
||||
let index = core::comm::selecti([$(($port).header()),+]);
|
||||
select_if!(index, 0, $( $port => [
|
||||
$($message$(($($x),+))dont_type_this* -> $next $e),+
|
||||
], )+)
|
||||
|
@ -17,7 +17,8 @@ extern mod std;
|
||||
use std::timer::sleep;
|
||||
use std::uv;
|
||||
|
||||
use pipes::{recv, select};
|
||||
use core::pipes;
|
||||
use core::pipes::{recv, select};
|
||||
|
||||
proto! oneshot (
|
||||
waiting:send {
|
||||
|
@ -13,7 +13,8 @@
|
||||
extern mod std;
|
||||
use std::timer::sleep;
|
||||
use std::uv;
|
||||
use pipes::recv;
|
||||
use core::pipes;
|
||||
use core::pipes::recv;
|
||||
|
||||
proto! oneshot (
|
||||
waiting:send {
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
// Tests of the runtime's scheduler interface
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
type sched_id = int;
|
||||
type task_id = *libc::c_void;
|
||||
|
@ -17,7 +17,7 @@ fn die() {
|
||||
|
||||
fn iloop() {
|
||||
task::spawn(|| die() );
|
||||
let (p, c) = core::pipes::stream::<()>();
|
||||
let (p, c) = comm::stream::<()>();
|
||||
loop {
|
||||
// Sending and receiving here because these actions yield,
|
||||
// at which point our child can kill us.
|
||||
|
@ -8,7 +8,7 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
struct test {
|
||||
f: int,
|
||||
|
@ -8,7 +8,7 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
// tests that ctrl's type gets inferred properly
|
||||
type command<K, V> = {key: K, val: V};
|
||||
|
@ -23,6 +23,6 @@ fn foo(i:int, j: char) -> foo {
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
let (_po, ch) = pipes::stream();
|
||||
let (_po, ch) = comm::stream();
|
||||
ch.send(foo(42, 'c'));
|
||||
}
|
||||
|
@ -14,7 +14,7 @@
|
||||
Arnold.
|
||||
*/
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
type ctx = Chan<int>;
|
||||
|
||||
|
@ -13,8 +13,8 @@
|
||||
|
||||
extern mod std;
|
||||
|
||||
use pipes::Chan;
|
||||
use pipes::Port;
|
||||
use comm::Chan;
|
||||
use comm::Port;
|
||||
|
||||
pub fn main() { test05(); }
|
||||
|
||||
@ -28,7 +28,7 @@ fn test05_start(ch : Chan<int>) {
|
||||
}
|
||||
|
||||
fn test05() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
task::spawn(|| test05_start(ch) );
|
||||
let mut value = po.recv();
|
||||
log(error, value);
|
||||
|
@ -13,8 +13,8 @@
|
||||
|
||||
extern mod std;
|
||||
|
||||
fn start(c: pipes::Chan<pipes::Chan<~str>>) {
|
||||
let (p, ch) = pipes::stream();
|
||||
fn start(c: comm::Chan<comm::Chan<~str>>) {
|
||||
let (p, ch) = comm::stream();
|
||||
c.send(ch);
|
||||
|
||||
let mut a;
|
||||
@ -28,7 +28,7 @@ fn start(c: pipes::Chan<pipes::Chan<~str>>) {
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
let (p, ch) = pipes::stream();
|
||||
let (p, ch) = comm::stream();
|
||||
let child = task::spawn(|| start(ch) );
|
||||
|
||||
let c = p.recv();
|
||||
|
@ -13,13 +13,13 @@
|
||||
|
||||
extern mod std;
|
||||
|
||||
fn start(c: pipes::Chan<pipes::Chan<int>>) {
|
||||
let (p, ch) = pipes::stream();
|
||||
fn start(c: comm::Chan<comm::Chan<int>>) {
|
||||
let (p, ch) = comm::stream();
|
||||
c.send(ch);
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
let (p, ch) = pipes::stream();
|
||||
let (p, ch) = comm::stream();
|
||||
let child = task::spawn(|| start(ch) );
|
||||
let c = p.recv();
|
||||
}
|
||||
|
@ -12,16 +12,15 @@
|
||||
#[legacy_modes];
|
||||
|
||||
extern mod std;
|
||||
use pipes::send;
|
||||
|
||||
fn start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
|
||||
fn start(c: comm::Chan<int>, start: int, number_of_messages: int) {
|
||||
let mut i: int = 0;
|
||||
while i < number_of_messages { c.send(start + i); i += 1; }
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
debug!("Check that we don't deadlock.");
|
||||
let (p, ch) = pipes::stream();
|
||||
let (p, ch) = comm::stream();
|
||||
task::try(|| start(ch, 0, 10) );
|
||||
debug!("Joined task");
|
||||
}
|
||||
|
@ -12,13 +12,13 @@
|
||||
#[legacy_modes];
|
||||
|
||||
pub fn main() {
|
||||
let po = pipes::PortSet();
|
||||
let po = comm::PortSet();
|
||||
|
||||
// Spawn 10 tasks each sending us back one int.
|
||||
let mut i = 10;
|
||||
while (i > 0) {
|
||||
log(debug, i);
|
||||
let (p, ch) = pipes::stream();
|
||||
let (p, ch) = comm::stream();
|
||||
po.add(p);
|
||||
task::spawn({let i = i; || child(i, ch)});
|
||||
i = i - 1;
|
||||
@ -37,7 +37,7 @@ pub fn main() {
|
||||
debug!("main thread exiting");
|
||||
}
|
||||
|
||||
fn child(x: int, ch: pipes::Chan<int>) {
|
||||
fn child(x: int, ch: comm::Chan<int>) {
|
||||
log(debug, x);
|
||||
ch.send(x);
|
||||
}
|
||||
|
@ -14,7 +14,7 @@
|
||||
|
||||
extern mod std;
|
||||
|
||||
fn start(c: pipes::Chan<int>, i0: int) {
|
||||
fn start(c: comm::Chan<int>, i0: int) {
|
||||
let mut i = i0;
|
||||
while i > 0 {
|
||||
c.send(0);
|
||||
@ -27,7 +27,7 @@ pub fn main() {
|
||||
// is likely to terminate before the child completes, so from
|
||||
// the child's point of view the receiver may die. We should
|
||||
// drop messages on the floor in this case, and not crash!
|
||||
let (p, ch) = pipes::stream();
|
||||
let (p, ch) = comm::stream();
|
||||
task::spawn(|| start(ch, 10));
|
||||
p.recv();
|
||||
}
|
||||
|
@ -10,16 +10,11 @@
|
||||
// except according to those terms.
|
||||
|
||||
|
||||
use pipes::send;
|
||||
use pipes::Port;
|
||||
use pipes::recv;
|
||||
use pipes::Chan;
|
||||
|
||||
// Tests of ports and channels on various types
|
||||
fn test_rec() {
|
||||
struct R {val0: int, val1: u8, val2: char}
|
||||
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
let r0: R = R {val0: 0, val1: 1u8, val2: '2'};
|
||||
ch.send(r0);
|
||||
let mut r1: R;
|
||||
@ -30,7 +25,7 @@ fn test_rec() {
|
||||
}
|
||||
|
||||
fn test_vec() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
let v0: ~[int] = ~[0, 1, 2];
|
||||
ch.send(v0);
|
||||
let v1 = po.recv();
|
||||
@ -40,7 +35,7 @@ fn test_vec() {
|
||||
}
|
||||
|
||||
fn test_str() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
let s0 = ~"test";
|
||||
ch.send(s0);
|
||||
let s1 = po.recv();
|
||||
@ -84,7 +79,7 @@ impl cmp::Eq for t {
|
||||
}
|
||||
|
||||
fn test_tag() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
ch.send(tag1);
|
||||
ch.send(tag2(10));
|
||||
ch.send(tag3(10, 11u8, 'A'));
|
||||
@ -98,8 +93,8 @@ fn test_tag() {
|
||||
}
|
||||
|
||||
fn test_chan() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po0, ch0) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
let (po0, ch0) = comm::stream();
|
||||
ch.send(ch0);
|
||||
let ch1 = po.recv();
|
||||
// Does the transmitted channel still work?
|
||||
|
@ -12,9 +12,7 @@
|
||||
#[legacy_modes];
|
||||
|
||||
extern mod std;
|
||||
use pipes::Chan;
|
||||
use pipes::send;
|
||||
use pipes::recv;
|
||||
use core::comm::Chan;
|
||||
|
||||
pub fn main() { debug!("===== WITHOUT THREADS ====="); test00(); }
|
||||
|
||||
@ -35,7 +33,7 @@ fn test00() {
|
||||
|
||||
debug!("Creating tasks");
|
||||
|
||||
let po = pipes::PortSet();
|
||||
let po = comm::PortSet();
|
||||
|
||||
let mut i: int = 0;
|
||||
|
||||
|
@ -8,14 +8,12 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use pipes::send;
|
||||
|
||||
pub fn main() { test00(); }
|
||||
|
||||
fn test00() {
|
||||
let mut r: int = 0;
|
||||
let mut sum: int = 0;
|
||||
let (p, c) = pipes::stream();
|
||||
let (p, c) = comm::stream();
|
||||
c.send(1);
|
||||
c.send(2);
|
||||
c.send(3);
|
||||
|
@ -15,7 +15,7 @@ pub fn main() { test00(); }
|
||||
fn test00() {
|
||||
let r: int = 0;
|
||||
let mut sum: int = 0;
|
||||
let (p, c) = pipes::stream();
|
||||
let (p, c) = comm::stream();
|
||||
let number_of_messages: int = 1000;
|
||||
let mut i: int = 0;
|
||||
while i < number_of_messages { c.send(i + 0); i += 1; }
|
||||
|
@ -8,16 +8,14 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use pipes::send;
|
||||
use pipes::Chan;
|
||||
use pipes::recv;
|
||||
use core::comm::Chan;
|
||||
|
||||
pub fn main() { test00(); }
|
||||
|
||||
fn test00() {
|
||||
let mut r: int = 0;
|
||||
let mut sum: int = 0;
|
||||
let p = pipes::PortSet();
|
||||
let p = comm::PortSet();
|
||||
let c0 = p.chan();
|
||||
let c1 = p.chan();
|
||||
let c2 = p.chan();
|
||||
|
@ -15,7 +15,7 @@ extern mod std;
|
||||
|
||||
pub fn main() { test00(); }
|
||||
|
||||
fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
|
||||
fn test00_start(c: comm::Chan<int>, start: int, number_of_messages: int) {
|
||||
let mut i: int = 0;
|
||||
while i < number_of_messages { c.send(start + i); i += 1; }
|
||||
}
|
||||
@ -23,7 +23,7 @@ fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
|
||||
fn test00() {
|
||||
let mut r: int = 0;
|
||||
let mut sum: int = 0;
|
||||
let p = pipes::PortSet();
|
||||
let p = comm::PortSet();
|
||||
let number_of_messages: int = 10;
|
||||
|
||||
let c = p.chan();
|
||||
|
@ -15,7 +15,7 @@ extern mod std;
|
||||
|
||||
pub fn main() { test00(); }
|
||||
|
||||
fn test00_start(c: pipes::Chan<int>, number_of_messages: int) {
|
||||
fn test00_start(c: comm::Chan<int>, number_of_messages: int) {
|
||||
let mut i: int = 0;
|
||||
while i < number_of_messages { c.send(i + 0); i += 1; }
|
||||
}
|
||||
@ -23,7 +23,7 @@ fn test00_start(c: pipes::Chan<int>, number_of_messages: int) {
|
||||
fn test00() {
|
||||
let r: int = 0;
|
||||
let mut sum: int = 0;
|
||||
let p = pipes::PortSet();
|
||||
let p = comm::PortSet();
|
||||
let number_of_messages: int = 10;
|
||||
let ch = p.chan();
|
||||
|
||||
|
@ -16,7 +16,7 @@ extern mod std;
|
||||
// any size, but rustc currently can because they do have size. Whether
|
||||
// or not this is desirable I don't know, but here's a regression test.
|
||||
pub fn main() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
ch.send(());
|
||||
let n: () = po.recv();
|
||||
assert (n == ());
|
||||
|
@ -13,7 +13,7 @@
|
||||
// A port of task-killjoin to use a class with a dtor to manage
|
||||
// the join.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
struct notify {
|
||||
ch: Chan<bool>, v: @mut bool,
|
||||
|
@ -8,7 +8,7 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
pub fn main() {
|
||||
let (p, ch) = stream::<uint>();
|
||||
|
@ -8,14 +8,12 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use pipes::{Port, Chan};
|
||||
|
||||
/*
|
||||
This is about the simplest program that can successfully send a
|
||||
message.
|
||||
*/
|
||||
pub fn main() {
|
||||
let (po, ch) = pipes::stream();
|
||||
let (po, ch) = comm::stream();
|
||||
ch.send(42);
|
||||
let r = po.recv();
|
||||
log(error, r);
|
||||
|
@ -8,7 +8,7 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
fn child(c: &SharedChan<~uint>, i: uint) {
|
||||
c.send(~i);
|
||||
|
@ -8,7 +8,7 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
pub fn main() {
|
||||
let (p, c) = stream();
|
||||
|
@ -11,7 +11,7 @@
|
||||
// xfail-win32
|
||||
extern mod std;
|
||||
|
||||
use core::pipes::*;
|
||||
use core::comm::*;
|
||||
|
||||
struct complainer {
|
||||
c: SharedChan<bool>,
|
||||
|
Loading…
x
Reference in New Issue
Block a user