From bc0c5bb447ac8eba0a80599305c23db6c8a1ed6a Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 22 Jan 2013 12:38:08 -0800 Subject: [PATCH] core: Stop using oldcomm --- src/libcore/private.rs | 1 - src/libcore/run.rs | 13 +++-- src/libcore/task/mod.rs | 119 +++++++++++++++++--------------------- src/libcore/task/spawn.rs | 10 ++-- 4 files changed, 64 insertions(+), 79 deletions(-) diff --git a/src/libcore/private.rs b/src/libcore/private.rs index 03207330f31..23268b1b778 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -18,7 +18,6 @@ use cast; use iter; use libc; -use oldcomm; use option; use pipes; use prelude::*; diff --git a/src/libcore/run.rs b/src/libcore/run.rs index 54bce77d308..07071e94892 100644 --- a/src/libcore/run.rs +++ b/src/libcore/run.rs @@ -17,7 +17,7 @@ use io::ReaderUtil; use libc; use libc::{pid_t, c_void, c_int}; -use oldcomm; +use pipes::{stream, SharedChan}; use option::{Some, None}; use os; use prelude::*; @@ -333,22 +333,23 @@ pub fn program_output(prog: &str, args: &[~str]) -> // in parallel so we don't deadlock while blocking on one // or the other. FIXME (#2625): Surely there's a much more // clever way to do this. - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); + let ch_clone = ch.clone(); do task::spawn_sched(task::SingleThreaded) { let errput = readclose(pipe_err.in); - oldcomm::send(ch, (2, move errput)); + ch.send((2, move errput)); }; do task::spawn_sched(task::SingleThreaded) { let output = readclose(pipe_out.in); - oldcomm::send(ch, (1, move output)); + ch_clone.send((1, move output)); }; let status = run::waitpid(pid); let mut errs = ~""; let mut outs = ~""; let mut count = 2; while count > 0 { - let stream = oldcomm::recv(p); + let stream = p.recv(); match stream { (1, copy s) => { outs = move s; diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index 86d38a18c50..315a2843af6 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -43,10 +43,9 @@ use cmp::Eq; use iter; use libc; -use oldcomm; use option; use result::Result; -use pipes::{stream, Chan, Port}; +use pipes::{stream, Chan, Port, SharedChan}; use pipes; use prelude::*; use ptr; @@ -427,18 +426,17 @@ fn spawn_with(arg: A, f: fn~(v: A)) { * Fails if a future_result was already set for this task. */ fn try(f: fn~() -> T) -> Result { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::(); let mut result = None; let fr_task_builder = self.future_result(|+r| { result = Some(move r); }); - do fr_task_builder.spawn |move f| { - oldcomm::send(ch, f()); + do fr_task_builder.spawn |move f, move ch| { + ch.send(f()); } match option::unwrap(move result).recv() { - Success => result::Ok(oldcomm::recv(po)), + Success => result::Ok(po.recv()), Failure => result::Err(()) } } @@ -665,17 +663,18 @@ fn test_cant_dup_task_builder() { #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); + let ch = SharedChan(ch); do spawn_unlinked { + let ch = ch.clone(); do spawn_unlinked { // Give middle task a chance to fail-but-not-kill-us. for iter::repeat(16) { task::yield(); } - oldcomm::send(ch, ()); // If killed first, grandparent hangs. + ch.send(()); // If killed first, grandparent hangs. } fail; // Shouldn't kill either (grand)parent or (grand)child. } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails @@ -695,8 +694,7 @@ fn test_spawn_unlinked_sup_fail_down() { #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Unidirectional "parenting" shouldn't override bidirectional linked. // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). @@ -714,7 +712,7 @@ fn test_spawn_unlinked_sup_fail_down() { .. b0 }; do b1.spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails @@ -738,11 +736,10 @@ fn test_spawn_unlinked_sup_fail_down() { } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Default options are to spawn linked & unsupervised. do spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails @@ -810,27 +807,25 @@ fn test_spawn_linked_sup_propagate_sibling() { #[test] fn test_run_basic() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); do task().spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] fn test_add_wrapper() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); let b0 = task(); let b1 = do b0.add_wrapper |body| { fn~(move body) { body(); - oldcomm::send(ch, ()); + ch.send(()); } }; do b1.spawn { } - oldcomm::recv(po); + po.recv(); } #[test] @@ -883,10 +878,10 @@ fn test_spawn_sched_no_threads() { #[test] fn test_spawn_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); + let ch = SharedChan(ch); - fn f(i: int, ch: oldcomm::Chan<()>) { + fn f(i: int, ch: SharedChan<()>) { let parent_sched_id = rt::rust_get_sched_id(); do spawn_sched(SingleThreaded) { @@ -894,21 +889,20 @@ fn f(i: int, ch: oldcomm::Chan<()>) { assert parent_sched_id != child_sched_id; if (i == 0) { - oldcomm::send(ch, ()); + ch.send(()); } else { - f(i - 1, ch); + f(i - 1, ch.clone()); } }; } f(10, ch); - oldcomm::recv(po); + po.recv(); } #[test] fn test_spawn_sched_childs_on_default_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); // Assuming tests run on the default scheduler let default_id = rt::rust_get_sched_id(); @@ -919,11 +913,11 @@ fn test_spawn_sched_childs_on_default_sched() { let child_sched_id = rt::rust_get_sched_id(); assert parent_sched_id != child_sched_id; assert child_sched_id == default_id; - oldcomm::send(ch, ()); + ch.send(()); }; }; - oldcomm::recv(po); + po.recv(); } #[nolink] @@ -945,10 +939,8 @@ fn test_spawn_sched_blocking() { // without affecting other schedulers for iter::repeat(20u) { - let start_po = oldcomm::Port(); - let start_ch = oldcomm::Chan(&start_po); - let fin_po = oldcomm::Port(); - let fin_ch = oldcomm::Chan(&fin_po); + let (start_po, start_ch) = stream(); + let (fin_po, fin_ch) = stream(); let lock = testrt::rust_dbg_lock_create(); @@ -956,44 +948,42 @@ fn test_spawn_sched_blocking() { unsafe { testrt::rust_dbg_lock_lock(lock); - oldcomm::send(start_ch, ()); + start_ch.send(()); // Block the scheduler thread testrt::rust_dbg_lock_wait(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::send(fin_ch, ()); + fin_ch.send(()); } }; // Wait until the other task has its lock - oldcomm::recv(start_po); + start_po.recv(); - fn pingpong(po: oldcomm::Port, ch: oldcomm::Chan) { + fn pingpong(po: &Port, ch: &Chan) { let mut val = 20; while val > 0 { - val = oldcomm::recv(po); - oldcomm::send(ch, val - 1); + val = po.recv(); + ch.send(val - 1); } } - let setup_po = oldcomm::Port(); - let setup_ch = oldcomm::Chan(&setup_po); - let parent_po = oldcomm::Port(); - let parent_ch = oldcomm::Chan(&parent_po); + let (setup_po, setup_ch) = stream(); + let (parent_po, parent_ch) = stream(); do spawn { - let child_po = oldcomm::Port(); - oldcomm::send(setup_ch, oldcomm::Chan(&child_po)); - pingpong(child_po, parent_ch); + let (child_po, child_ch) = stream(); + setup_ch.send(child_ch); + pingpong(&child_po, &parent_ch); }; - let child_ch = oldcomm::recv(setup_po); - oldcomm::send(child_ch, 20); - pingpong(parent_po, child_ch); + let child_ch = setup_po.recv(); + child_ch.send(20); + pingpong(&parent_po, &child_ch); testrt::rust_dbg_lock_lock(lock); testrt::rust_dbg_lock_signal(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::recv(fin_po); + fin_po.recv(); testrt::rust_dbg_lock_destroy(lock); } } @@ -1001,18 +991,17 @@ fn pingpong(po: oldcomm::Port, ch: oldcomm::Chan) { #[cfg(test)] fn avoid_copying_the_body(spawnfn: fn(v: fn~())) { - let p = oldcomm::Port::(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream::(); let x = ~1; let x_in_parent = ptr::addr_of(&(*x)) as uint; do spawnfn |move x| { let x_in_child = ptr::addr_of(&(*x)) as uint; - oldcomm::send(ch, x_in_child); + ch.send(x_in_child); } - let x_in_child = oldcomm::recv(p); + let x_in_child = p.recv(); assert x_in_parent == x_in_child; } @@ -1050,20 +1039,18 @@ fn test_avoid_copying_the_body_unlinked() { #[test] fn test_platform_thread() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); do task().sched_mode(PlatformThread).spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] #[should_fail] fn test_unkillable() { - let po = oldcomm::Port(); - let ch = po.chan(); + let (po, ch) = stream(); // We want to do this after failing do spawn_unlinked { diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs index e3afa7c4535..a844542c214 100644 --- a/src/libcore/task/spawn.rs +++ b/src/libcore/task/spawn.rs @@ -74,9 +74,8 @@ #[warn(deprecated_mode)]; use cast; -use oldcomm; use option; -use pipes::{Chan, Port}; +use pipes::{stream, Chan, Port}; use pipes; use prelude::*; use private; @@ -667,12 +666,11 @@ fn new_task_in_sched(opts: SchedOpts) -> *rust_task { #[test] fn test_spawn_raw_simple() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); do spawn_raw(default_task_opts()) { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test]