core: Stop using oldcomm

This commit is contained in:
Brian Anderson 2013-01-22 12:38:08 -08:00
parent cc9ab2c033
commit bc0c5bb447
4 changed files with 64 additions and 79 deletions

View File

@ -18,7 +18,6 @@
use cast;
use iter;
use libc;
use oldcomm;
use option;
use pipes;
use prelude::*;

View File

@ -17,7 +17,7 @@
use io::ReaderUtil;
use libc;
use libc::{pid_t, c_void, c_int};
use oldcomm;
use pipes::{stream, SharedChan};
use option::{Some, None};
use os;
use prelude::*;
@ -333,22 +333,23 @@ pub fn program_output(prog: &str, args: &[~str]) ->
// in parallel so we don't deadlock while blocking on one
// or the other. FIXME (#2625): Surely there's a much more
// clever way to do this.
let p = oldcomm::Port();
let ch = oldcomm::Chan(&p);
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch_clone = ch.clone();
do task::spawn_sched(task::SingleThreaded) {
let errput = readclose(pipe_err.in);
oldcomm::send(ch, (2, move errput));
ch.send((2, move errput));
};
do task::spawn_sched(task::SingleThreaded) {
let output = readclose(pipe_out.in);
oldcomm::send(ch, (1, move output));
ch_clone.send((1, move output));
};
let status = run::waitpid(pid);
let mut errs = ~"";
let mut outs = ~"";
let mut count = 2;
while count > 0 {
let stream = oldcomm::recv(p);
let stream = p.recv();
match stream {
(1, copy s) => {
outs = move s;

View File

@ -43,10 +43,9 @@
use cmp::Eq;
use iter;
use libc;
use oldcomm;
use option;
use result::Result;
use pipes::{stream, Chan, Port};
use pipes::{stream, Chan, Port, SharedChan};
use pipes;
use prelude::*;
use ptr;
@ -427,18 +426,17 @@ fn spawn_with<A: Owned>(arg: A, f: fn~(v: A)) {
* Fails if a future_result was already set for this task.
*/
fn try<T: Owned>(f: fn~() -> T) -> Result<T,()> {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<T>();
let mut result = None;
let fr_task_builder = self.future_result(|+r| {
result = Some(move r);
});
do fr_task_builder.spawn |move f| {
oldcomm::send(ch, f());
do fr_task_builder.spawn |move f, move ch| {
ch.send(f());
}
match option::unwrap(move result).recv() {
Success => result::Ok(oldcomm::recv(po)),
Success => result::Ok(po.recv()),
Failure => result::Err(())
}
}
@ -665,17 +663,18 @@ fn test_cant_dup_task_builder() {
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream();
let ch = SharedChan(ch);
do spawn_unlinked {
let ch = ch.clone();
do spawn_unlinked {
// Give middle task a chance to fail-but-not-kill-us.
for iter::repeat(16) { task::yield(); }
oldcomm::send(ch, ()); // If killed first, grandparent hangs.
ch.send(()); // If killed first, grandparent hangs.
}
fail; // Shouldn't kill either (grand)parent or (grand)child.
}
oldcomm::recv(po);
po.recv();
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
@ -695,8 +694,7 @@ fn test_spawn_unlinked_sup_fail_down() {
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
let po = oldcomm::Port::<()>();
let _ch = oldcomm::Chan(&po);
let (po, _ch) = stream::<()>();
// Unidirectional "parenting" shouldn't override bidirectional linked.
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
@ -714,7 +712,7 @@ fn test_spawn_unlinked_sup_fail_down() {
.. b0
};
do b1.spawn { fail; }
oldcomm::recv(po); // We should get punted awake
po.recv(); // We should get punted awake
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
@ -738,11 +736,10 @@ fn test_spawn_unlinked_sup_fail_down() {
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
let po = oldcomm::Port::<()>();
let _ch = oldcomm::Chan(&po);
let (po, _ch) = stream::<()>();
// Default options are to spawn linked & unsupervised.
do spawn { fail; }
oldcomm::recv(po); // We should get punted awake
po.recv(); // We should get punted awake
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
@ -810,27 +807,25 @@ fn test_spawn_linked_sup_propagate_sibling() {
#[test]
fn test_run_basic() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<()>();
do task().spawn {
oldcomm::send(ch, ());
ch.send(());
}
oldcomm::recv(po);
po.recv();
}
#[test]
fn test_add_wrapper() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<()>();
let b0 = task();
let b1 = do b0.add_wrapper |body| {
fn~(move body) {
body();
oldcomm::send(ch, ());
ch.send(());
}
};
do b1.spawn { }
oldcomm::recv(po);
po.recv();
}
#[test]
@ -883,10 +878,10 @@ fn test_spawn_sched_no_threads() {
#[test]
fn test_spawn_sched() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream::<()>();
let ch = SharedChan(ch);
fn f(i: int, ch: oldcomm::Chan<()>) {
fn f(i: int, ch: SharedChan<()>) {
let parent_sched_id = rt::rust_get_sched_id();
do spawn_sched(SingleThreaded) {
@ -894,21 +889,20 @@ fn f(i: int, ch: oldcomm::Chan<()>) {
assert parent_sched_id != child_sched_id;
if (i == 0) {
oldcomm::send(ch, ());
ch.send(());
} else {
f(i - 1, ch);
f(i - 1, ch.clone());
}
};
}
f(10, ch);
oldcomm::recv(po);
po.recv();
}
#[test]
fn test_spawn_sched_childs_on_default_sched() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream();
// Assuming tests run on the default scheduler
let default_id = rt::rust_get_sched_id();
@ -919,11 +913,11 @@ fn test_spawn_sched_childs_on_default_sched() {
let child_sched_id = rt::rust_get_sched_id();
assert parent_sched_id != child_sched_id;
assert child_sched_id == default_id;
oldcomm::send(ch, ());
ch.send(());
};
};
oldcomm::recv(po);
po.recv();
}
#[nolink]
@ -945,10 +939,8 @@ fn test_spawn_sched_blocking() {
// without affecting other schedulers
for iter::repeat(20u) {
let start_po = oldcomm::Port();
let start_ch = oldcomm::Chan(&start_po);
let fin_po = oldcomm::Port();
let fin_ch = oldcomm::Chan(&fin_po);
let (start_po, start_ch) = stream();
let (fin_po, fin_ch) = stream();
let lock = testrt::rust_dbg_lock_create();
@ -956,44 +948,42 @@ fn test_spawn_sched_blocking() {
unsafe {
testrt::rust_dbg_lock_lock(lock);
oldcomm::send(start_ch, ());
start_ch.send(());
// Block the scheduler thread
testrt::rust_dbg_lock_wait(lock);
testrt::rust_dbg_lock_unlock(lock);
oldcomm::send(fin_ch, ());
fin_ch.send(());
}
};
// Wait until the other task has its lock
oldcomm::recv(start_po);
start_po.recv();
fn pingpong(po: oldcomm::Port<int>, ch: oldcomm::Chan<int>) {
fn pingpong(po: &Port<int>, ch: &Chan<int>) {
let mut val = 20;
while val > 0 {
val = oldcomm::recv(po);
oldcomm::send(ch, val - 1);
val = po.recv();
ch.send(val - 1);
}
}
let setup_po = oldcomm::Port();
let setup_ch = oldcomm::Chan(&setup_po);
let parent_po = oldcomm::Port();
let parent_ch = oldcomm::Chan(&parent_po);
let (setup_po, setup_ch) = stream();
let (parent_po, parent_ch) = stream();
do spawn {
let child_po = oldcomm::Port();
oldcomm::send(setup_ch, oldcomm::Chan(&child_po));
pingpong(child_po, parent_ch);
let (child_po, child_ch) = stream();
setup_ch.send(child_ch);
pingpong(&child_po, &parent_ch);
};
let child_ch = oldcomm::recv(setup_po);
oldcomm::send(child_ch, 20);
pingpong(parent_po, child_ch);
let child_ch = setup_po.recv();
child_ch.send(20);
pingpong(&parent_po, &child_ch);
testrt::rust_dbg_lock_lock(lock);
testrt::rust_dbg_lock_signal(lock);
testrt::rust_dbg_lock_unlock(lock);
oldcomm::recv(fin_po);
fin_po.recv();
testrt::rust_dbg_lock_destroy(lock);
}
}
@ -1001,18 +991,17 @@ fn pingpong(po: oldcomm::Port<int>, ch: oldcomm::Chan<int>) {
#[cfg(test)]
fn avoid_copying_the_body(spawnfn: fn(v: fn~())) {
let p = oldcomm::Port::<uint>();
let ch = oldcomm::Chan(&p);
let (p, ch) = stream::<uint>();
let x = ~1;
let x_in_parent = ptr::addr_of(&(*x)) as uint;
do spawnfn |move x| {
let x_in_child = ptr::addr_of(&(*x)) as uint;
oldcomm::send(ch, x_in_child);
ch.send(x_in_child);
}
let x_in_child = oldcomm::recv(p);
let x_in_child = p.recv();
assert x_in_parent == x_in_child;
}
@ -1050,20 +1039,18 @@ fn test_avoid_copying_the_body_unlinked() {
#[test]
fn test_platform_thread() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream();
do task().sched_mode(PlatformThread).spawn {
oldcomm::send(ch, ());
ch.send(());
}
oldcomm::recv(po);
po.recv();
}
#[test]
#[ignore(cfg(windows))]
#[should_fail]
fn test_unkillable() {
let po = oldcomm::Port();
let ch = po.chan();
let (po, ch) = stream();
// We want to do this after failing
do spawn_unlinked {

View File

@ -74,9 +74,8 @@
#[warn(deprecated_mode)];
use cast;
use oldcomm;
use option;
use pipes::{Chan, Port};
use pipes::{stream, Chan, Port};
use pipes;
use prelude::*;
use private;
@ -667,12 +666,11 @@ fn new_task_in_sched(opts: SchedOpts) -> *rust_task {
#[test]
fn test_spawn_raw_simple() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let (po, ch) = stream();
do spawn_raw(default_task_opts()) {
oldcomm::send(ch, ());
ch.send(());
}
oldcomm::recv(po);
po.recv();
}
#[test]