rollup merge of #24385: aturon/unstable-scoped

Conflicts:
	src/libstd/thread/mod.rs
	src/test/bench/shootout-mandelbrot.rs
	src/test/bench/shootout-reverse-complement.rs
	src/test/run-pass/capturing-logging.rs
	src/test/run-pass/issue-9396.rs
	src/test/run-pass/tcp-accept-stress.rs
	src/test/run-pass/tcp-connect-timeouts.rs
	src/test/run-pass/tempfile.rs
This commit is contained in:
Alex Crichton 2015-04-14 10:59:55 -07:00
commit b9d9a376ea
51 changed files with 544 additions and 138 deletions

View File

@ -389,6 +389,7 @@ safe concurrent programs.
Here's an example of a concurrent Rust program:
```{rust}
# #![feature(scoped)]
use std::thread;
fn main() {
@ -421,6 +422,7 @@ problem.
Let's see an example. This Rust code will not compile:
```{rust,ignore}
# #![feature(scoped)]
use std::thread;
fn main() {
@ -467,6 +469,7 @@ that our mutation doesn't cause a data race.
Here's what using a Mutex looks like:
```{rust}
# #![feature(scoped)]
use std::thread;
use std::sync::Mutex;
@ -527,6 +530,7 @@ As an example, Rust's ownership system is _entirely_ at compile time. The
safety check that makes this an error about moved values:
```{rust,ignore}
# #![feature(scoped)]
use std::thread;
fn main() {

View File

@ -56,67 +56,34 @@ place!
## Threads
Rust's standard library provides a library for 'threads', which allow you to
Rust's standard library provides a library for threads, which allow you to
run Rust code in parallel. Here's a basic example of using `std::thread`:
```
use std::thread;
fn main() {
thread::scoped(|| {
println!("Hello from a thread!");
});
}
```
The `thread::scoped()` method accepts a closure, which is executed in a new
thread. It's called `scoped` because this thread returns a join guard:
```
use std::thread;
fn main() {
let guard = thread::scoped(|| {
println!("Hello from a thread!");
});
// guard goes out of scope here
}
```
When `guard` goes out of scope, it will block execution until the thread is
finished. If we didn't want this behaviour, we could use `thread::spawn()`:
```
use std::thread;
fn main() {
thread::spawn(|| {
println!("Hello from a thread!");
});
thread::sleep_ms(50);
}
```
We need to `sleep` here because when `main()` ends, it kills all of the
running threads.
The `thread::spawn()` method accepts a closure, which is executed in a
new thread. It returns a handle to the thread, that can be used to
wait for the child thread to finish and extract its result:
[`scoped`](std/thread/struct.Builder.html#method.scoped) has an interesting
type signature:
```text
fn scoped<'a, T, F>(self, f: F) -> JoinGuard<'a, T>
where T: Send + 'a,
F: FnOnce() -> T,
F: Send + 'a
```
use std::thread;
Specifically, `F`, the closure that we pass to execute in the new thread. It
has two restrictions: It must be a `FnOnce` from `()` to `T`. Using `FnOnce`
allows the closure to take ownership of any data it mentions from the parent
thread. The other restriction is that `F` must be `Send`. We aren't allowed to
transfer this ownership unless the type thinks that's okay.
fn main() {
let handle = thread::spawn(|| {
"Hello from a thread!"
});
println!("{}", handle.join().unwrap());
}
```
Many languages have the ability to execute threads, but it's wildly unsafe.
There are entire books about how to prevent errors that occur from shared

View File

@ -130,10 +130,10 @@ struct Output {
pub fn main() {
const STACK_SIZE: usize = 32000000; // 32MB
let res = std::thread::Builder::new().stack_size(STACK_SIZE).scoped(move || {
let res = std::thread::Builder::new().stack_size(STACK_SIZE).spawn(move || {
let s = env::args().collect::<Vec<_>>();
main_args(&s)
}).unwrap().join();
}).unwrap().join().unwrap();
env::set_exit_status(res as i32);
}

View File

@ -67,15 +67,35 @@
//! thread. This means that it can outlive its parent (the thread that spawned
//! it), unless this parent is the main thread.
//!
//! ## Scoped threads
//!
//! Often a parent thread uses a child thread to perform some particular task,
//! and at some point must wait for the child to complete before continuing.
//! For this scenario, use the `thread::scoped` function:
//! The parent thread can also wait on the completion of the child
//! thread; a call to `spawn` produces a `JoinHandle`, which provides
//! a `join` method for waiting:
//!
//! ```rust
//! use std::thread;
//!
//! let child = thread::spawn(move || {
//! // some work here
//! });
//! // some work here
//! let res = child.join();
//! ```
//!
//! The `join` method returns a `Result` containing `Ok` of the final
//! value produced by the child thread, or `Err` of the value given to
//! a call to `panic!` if the child panicked.
//!
//! ## Scoped threads
//!
//! The `spawn` method does not allow the child and parent threads to
//! share any stack data, since that is not safe in general. However,
//! `scoped` makes it possible to share the parent's stack by forcing
//! a join before any relevant stack frames are popped:
//!
//! ```rust
//! # #![feature(scoped)]
//! use std::thread;
//!
//! let guard = thread::scoped(move || {
//! // some work here
//! });
@ -253,8 +273,8 @@ impl Builder {
/// `io::Result` to capture any failure to create the thread at
/// the OS level.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn spawn<F>(self, f: F) -> io::Result<JoinHandle> where
F: FnOnce(), F: Send + 'static
pub fn spawn<F, T>(self, f: F) -> io::Result<JoinHandle<T>> where
F: FnOnce() -> T, F: Send + 'static, T: Send + 'static
{
self.spawn_inner(Box::new(f)).map(|i| JoinHandle(i))
}
@ -274,7 +294,8 @@ impl Builder {
/// Unlike the `scoped` free function, this method yields an
/// `io::Result` to capture any failure to create the thread at
/// the OS level.
#[stable(feature = "rust1", since = "1.0.0")]
#[unstable(feature = "scoped",
reason = "memory unsafe if destructor is avoided, see #24292")]
pub fn scoped<'a, T, F>(self, f: F) -> io::Result<JoinGuard<'a, T>> where
T: Send + 'a, F: FnOnce() -> T, F: Send + 'a
{
@ -370,7 +391,9 @@ impl Builder {
/// Panics if the OS fails to create a thread; use `Builder::spawn`
/// to recover from such errors.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn spawn<F>(f: F) -> JoinHandle where F: FnOnce(), F: Send + 'static {
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T, F: Send + 'static, T: Send + 'static
{
Builder::new().spawn(f).unwrap()
}
@ -387,7 +410,8 @@ pub fn spawn<F>(f: F) -> JoinHandle where F: FnOnce(), F: Send + 'static {
///
/// Panics if the OS fails to create a thread; use `Builder::scoped`
/// to recover from such errors.
#[stable(feature = "rust1", since = "1.0.0")]
#[unstable(feature = "scoped",
reason = "memory unsafe if destructor is avoided, see #24292")]
pub fn scoped<'a, T, F>(f: F) -> JoinGuard<'a, T> where
T: Send + 'a, F: FnOnce() -> T, F: Send + 'a
{
@ -635,9 +659,9 @@ impl<T> JoinInner<T> {
/// handle: the ability to join a child thread is a uniquely-owned
/// permission.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct JoinHandle(JoinInner<()>);
pub struct JoinHandle<T>(JoinInner<T>);
impl JoinHandle {
impl<T> JoinHandle<T> {
/// Extracts a handle to the underlying thread
#[stable(feature = "rust1", since = "1.0.0")]
pub fn thread(&self) -> &Thread {
@ -649,13 +673,14 @@ impl JoinHandle {
/// If the child thread panics, `Err` is returned with the parameter given
/// to `panic`.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn join(mut self) -> Result<()> {
pub fn join(mut self) -> Result<T> {
self.0.join()
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl Drop for JoinHandle {
#[unsafe_destructor]
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if !self.0.joined {
unsafe { imp::detach(self.0.native) }
@ -674,7 +699,8 @@ impl Drop for JoinHandle {
/// handle: the ability to join a child thread is a uniquely-owned
/// permission.
#[must_use = "thread will be immediately joined if `JoinGuard` is not used"]
#[stable(feature = "rust1", since = "1.0.0")]
#[unstable(feature = "scoped",
reason = "memory unsafe if destructor is avoided, see #24292")]
pub struct JoinGuard<'a, T: Send + 'a> {
inner: JoinInner<T>,
_marker: PhantomData<&'a T>,
@ -706,7 +732,8 @@ impl<'a, T: Send + 'a> JoinGuard<'a, T> {
}
#[unsafe_destructor]
#[stable(feature = "rust1", since = "1.0.0")]
#[unstable(feature = "scoped",
reason = "memory unsafe if destructor is avoided, see #24292")]
impl<'a, T: Send + 'a> Drop for JoinGuard<'a, T> {
fn drop(&mut self) {
if !self.inner.joined {

View File

@ -111,11 +111,11 @@ fn main() {
let messages = (min_depth..max_depth + 1).step_by(2).map(|depth| {
use std::num::Int;
let iterations = 2.pow((max_depth - depth + min_depth) as u32);
thread::scoped(move || inner(depth, iterations))
thread::spawn(move || inner(depth, iterations))
}).collect::<Vec<_>>();
for message in messages {
println!("{}", message.join());
println!("{}", message.join().unwrap());
}
println!("long lived tree of depth {}\t check: {}",

View File

@ -166,7 +166,7 @@ fn fannkuch(n: i32) -> (i32, i32) {
for (_, j) in (0..N).zip((0..).step_by(k)) {
let max = cmp::min(j+k, perm.max());
futures.push(thread::scoped(move|| {
futures.push(thread::spawn(move|| {
work(perm, j as usize, max as usize)
}))
}
@ -174,7 +174,7 @@ fn fannkuch(n: i32) -> (i32, i32) {
let mut checksum = 0;
let mut maxflips = 0;
for fut in futures {
let (cs, mf) = fut.join();
let (cs, mf) = fut.join().unwrap();
checksum += cs;
maxflips = cmp::max(maxflips, mf);
}

View File

@ -307,17 +307,17 @@ fn main() {
let nb_freqs: Vec<_> = (1..3).map(|i| {
let input = input.clone();
(i, thread::scoped(move|| generate_frequencies(&input, i)))
(i, thread::spawn(move|| generate_frequencies(&input, i)))
}).collect();
let occ_freqs: Vec<_> = OCCURRENCES.iter().map(|&occ| {
let input = input.clone();
thread::scoped(move|| generate_frequencies(&input, occ.len()))
thread::spawn(move|| generate_frequencies(&input, occ.len()))
}).collect();
for (i, freq) in nb_freqs {
print_frequencies(&freq.join(), i);
print_frequencies(&freq.join().unwrap(), i);
}
for (&occ, freq) in OCCURRENCES.iter().zip(occ_freqs.into_iter()) {
print_occurrences(&mut freq.join(), occ);
print_occurrences(&mut freq.join().unwrap(), occ);
}
}

View File

@ -81,7 +81,7 @@ fn mandelbrot<W: Write>(w: usize, mut out: W) -> io::Result<()> {
let mut precalc_i = Vec::with_capacity(h);
let precalc_futures = (0..WORKERS).map(|i| {
thread::scoped(move|| {
thread::spawn(move|| {
let mut rs = Vec::with_capacity(w / WORKERS);
let mut is = Vec::with_capacity(w / WORKERS);
@ -107,7 +107,7 @@ fn mandelbrot<W: Write>(w: usize, mut out: W) -> io::Result<()> {
}).collect::<Vec<_>>();
for res in precalc_futures {
let (rs, is) = res.join();
let (rs, is) = res.join().unwrap();
precalc_r.extend(rs.into_iter());
precalc_i.extend(is.into_iter());
}
@ -122,7 +122,7 @@ fn mandelbrot<W: Write>(w: usize, mut out: W) -> io::Result<()> {
let vec_init_r = arc_init_r.clone();
let vec_init_i = arc_init_i.clone();
thread::scoped(move|| {
thread::spawn(move|| {
let mut res: Vec<u8> = Vec::with_capacity((chunk_size * w) / 8);
let init_r_slice = vec_init_r;
@ -143,7 +143,7 @@ fn mandelbrot<W: Write>(w: usize, mut out: W) -> io::Result<()> {
try!(writeln!(&mut out, "P4\n{} {}", w, h));
for res in data {
try!(out.write_all(&res.join()));
try!(out.write_all(&res.join().unwrap()));
}
out.flush()
}

View File

@ -40,7 +40,7 @@
// ignore-android see #10393 #13206
#![feature(libc)]
#![feature(libc, scoped)]
extern crate libc;

View File

@ -41,7 +41,7 @@
// no-pretty-expanded FIXME #15189
#![allow(non_snake_case)]
#![feature(unboxed_closures, core, os)]
#![feature(unboxed_closures, core, os, scoped)]
use std::iter::repeat;
use std::thread;

View File

@ -13,9 +13,9 @@
use std::thread::Builder;
fn main() {
let r: () = Builder::new().name("owned name".to_string()).scoped(move|| {
let r: () = Builder::new().name("owned name".to_string()).spawn(move|| {
panic!("test");
()
}).unwrap().join();
}).unwrap().join().unwrap();
panic!();
}

View File

@ -37,7 +37,7 @@ fn r(x:isize) -> r {
fn main() {
error!("whatever");
let _t = thread::scoped(move|| {
let _t = thread::spawn(move|| {
let _i = r(5);
});
panic!();

View File

@ -27,7 +27,7 @@ fn main(){
if env::args().count() == 2 {
let barrier = sync::Arc::new(sync::Barrier::new(2));
let tbarrier = barrier.clone();
let t = thread::scoped(||{
let t = thread::spawn(move || {
tbarrier.wait();
do_print(1);
});

View File

@ -22,8 +22,8 @@ struct Pair {
pub fn main() {
let z: Box<_> = box Pair { a : 10, b : 12};
let _t = thread::scoped(move|| {
thread::spawn(move|| {
assert_eq!(z.a, 10);
assert_eq!(z.b, 12);
});
}).join();
}

View File

@ -15,11 +15,12 @@ use std::sync::mpsc::{channel, Sender};
pub fn main() {
let (tx, rx) = channel();
let _t = thread::scoped(move|| { child(&tx) });
let t = thread::spawn(move|| { child(&tx) });
let y = rx.recv().unwrap();
println!("received");
println!("{}", y);
assert_eq!(y, 10);
t.join();
}
fn child(c: &Sender<isize>) {

View File

@ -42,7 +42,7 @@ fn count(n: libc::uintptr_t) -> libc::uintptr_t {
pub fn main() {
// Make sure we're on a task with small Rust stacks (main currently
// has a large stack)
thread::scoped(move|| {
thread::spawn(move|| {
let result = count(1000);
println!("result = {}", result);
assert_eq!(result, 1000);

View File

@ -46,9 +46,9 @@ fn count(n: libc::uintptr_t) -> libc::uintptr_t {
pub fn main() {
// Make sure we're on a task with small Rust stacks (main currently
// has a large stack)
let _t = thread::scoped(move|| {
thread::spawn(move|| {
let result = count(12);
println!("result = {}", result);
assert_eq!(result, 2048);
});
}).join();
}

View File

@ -34,14 +34,12 @@ fn main() {
fn parent() {
let file = File::open("Makefile").unwrap();
let _dir = fs::read_dir("/").unwrap();
let tcp1 = TcpListener::bind("127.0.0.1:0").unwrap();
assert_eq!(tcp1.as_raw_fd(), file.as_raw_fd() + 2);
let tcp2 = tcp1.try_clone().unwrap();
let addr = tcp1.local_addr().unwrap();
let t = thread::scoped(|| TcpStream::connect(addr).unwrap());
let t = thread::spawn(move || TcpStream::connect(addr).unwrap());
let tcp3 = tcp1.accept().unwrap().0;
let tcp4 = t.join();
let tcp4 = t.join().unwrap();
let tcp5 = tcp3.try_clone().unwrap();
let tcp6 = tcp4.try_clone().unwrap();
let udp1 = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -49,7 +47,6 @@ fn parent() {
let status = Command::new(env::args().next().unwrap())
.arg(file.as_raw_fd().to_string())
.arg((file.as_raw_fd() + 1).to_string())
.arg(tcp1.as_raw_fd().to_string())
.arg(tcp2.as_raw_fd().to_string())
.arg(tcp3.as_raw_fd().to_string())

View File

@ -26,7 +26,7 @@ const SIZE: usize = 1024 * 1024;
fn main() {
// do the test in a new thread to avoid (spurious?) stack overflows
let _ = thread::scoped(|| {
thread::spawn(|| {
let _memory: [u8; SIZE] = unsafe { init() };
}).join();
}

View File

@ -26,7 +26,7 @@ fn helper(rx: Receiver<Sender<()>>) {
fn main() {
let (tx, rx) = channel();
let _t = thread::scoped(move|| { helper(rx) });
let t = thread::spawn(move|| { helper(rx) });
let (snd, rcv) = channel::<isize>();
for _ in 1..100000 {
snd.send(1).unwrap();
@ -38,4 +38,5 @@ fn main() {
}
}
drop(tx);
t.join();
}

View File

@ -13,11 +13,11 @@
use std::thread;
fn _foo() {
let _t = thread::scoped(move || { // no need for -> ()
thread::spawn(move || { // no need for -> ()
loop {
println!("hello");
}
});
}).join();
}
fn main() {}

View File

@ -23,7 +23,7 @@ enum Msg
}
fn foo(name: String, samples_chan: Sender<Msg>) {
let _t = thread::scoped(move|| {
thread::spawn(move|| {
let mut samples_chan = samples_chan;
// FIXME (#22405): Replace `Box::new` with `box` here when/if possible.
@ -34,7 +34,7 @@ fn foo(name: String, samples_chan: Sender<Msg>) {
});
samples_chan.send(Msg::GetSamples(name.clone(), callback));
});
}).join();
}
pub fn main() {}

View File

@ -13,7 +13,7 @@ use std::thread;
pub fn main() {
let (tx, rx) = channel();
let _t = thread::scoped(move||{
let t = thread::spawn(move||{
thread::sleep_ms(10);
tx.send(()).unwrap();
});
@ -24,4 +24,5 @@ pub fn main() {
Err(TryRecvError::Disconnected) => unreachable!()
}
}
t.join();
}

View File

@ -23,9 +23,10 @@ fn producer(tx: &Sender<Vec<u8>>) {
pub fn main() {
let (tx, rx) = channel::<Vec<u8>>();
let _prod = thread::scoped(move|| {
let prod = thread::spawn(move|| {
producer(&tx)
});
let _data: Vec<u8> = rx.recv().unwrap();
prod.join();
}

View File

@ -18,12 +18,13 @@ fn foo() {
// Here, i is *copied* into the proc (heap closure).
// Requires allocation. The proc's copy is not mutable.
let mut i = 0;
let _t = thread::scoped(move|| {
let t = thread::spawn(move|| {
user(i);
println!("spawned {}", i)
});
i += 1;
println!("original {}", i)
println!("original {}", i);
t.join();
}
fn bar() {
@ -31,10 +32,11 @@ fn bar() {
// mutable outside of the proc.
let mut i = 0;
while i < 10 {
let _t = thread::scoped(move|| {
let t = thread::spawn(move|| {
user(i);
});
i += 1;
t.join();
}
}
@ -42,12 +44,13 @@ fn car() {
// Here, i must be shadowed in the proc to be mutable.
let mut i = 0;
while i < 10 {
let _t = thread::scoped(move|| {
let t = thread::spawn(move|| {
let mut i = i;
i += 1;
user(i);
});
i += 1;
t.join();
}
}

View File

@ -14,7 +14,7 @@ use std::thread;
pub fn main() {
let x = "Hello world!".to_string();
let _t = thread::scoped(move|| {
thread::spawn(move|| {
println!("{}", x);
});
}).join();
}

View File

@ -37,7 +37,7 @@ fn recurse() {
fn main() {
let args: Vec<String> = env::args().collect();
if args.len() > 1 && args[1] == "recurse" {
let _t = thread::scoped(recurse);
thread::spawn(recurse).join();
} else {
let recurse = Command::new(&args[0]).arg("recurse").output().unwrap();
assert!(!recurse.status.success());

View File

@ -40,7 +40,7 @@ impl log::Logger for ChannelLogger {
pub fn main() {
let (logger, rx) = ChannelLogger::new();
let _t = thread::scoped(move|| {
let t = thread::spawn(move|| {
log::set_logger(logger);
info!("foo");
@ -53,4 +53,6 @@ pub fn main() {
assert_eq!(rx.recv().unwrap(), "foo bar");
assert_eq!(rx.recv().unwrap(), "bar foo");
assert!(rx.recv().is_err());
t.join();
}

View File

@ -10,7 +10,7 @@
// pretty-expanded FIXME #23616
#![feature(core, std_misc)]
#![feature(core, std_misc, scoped)]
use std::thread;
use std::sync::Mutex;
@ -25,7 +25,6 @@ fn par_for<I, F>(iter: I, f: F)
f(elem)
})
}).collect();
}
fn sum(x: &[i32]) {

View File

@ -32,7 +32,7 @@ fn test(f: isize) -> test {
pub fn main() {
let (tx, rx) = channel();
let _t = thread::scoped(move|| {
let t = thread::spawn(move|| {
let (tx2, rx2) = channel();
tx.send(tx2).unwrap();
@ -40,4 +40,6 @@ pub fn main() {
});
rx.recv().unwrap().send(test(42)).unwrap();
t.join();
}

View File

@ -16,13 +16,16 @@ fn x(s: String, n: isize) {
}
pub fn main() {
let _t = thread::scoped(|| x("hello from first spawned fn".to_string(), 65) );
let _t = thread::scoped(|| x("hello from second spawned fn".to_string(), 66) );
let _t = thread::scoped(|| x("hello from third spawned fn".to_string(), 67) );
let t1 = thread::spawn(|| x("hello from first spawned fn".to_string(), 65) );
let t2 = thread::spawn(|| x("hello from second spawned fn".to_string(), 66) );
let t3 = thread::spawn(|| x("hello from third spawned fn".to_string(), 67) );
let mut i = 30;
while i > 0 {
i = i - 1;
println!("parent sleeping");
thread::yield_now();
}
t1.join();
t2.join();
t3.join();
}

View File

@ -26,7 +26,7 @@ fn test05_start(tx : &Sender<isize>) {
fn test05() {
let (tx, rx) = channel();
let _t = thread::scoped(move|| { test05_start(&tx) });
let t = thread::spawn(move|| { test05_start(&tx) });
let mut value: isize = rx.recv().unwrap();
println!("{}", value);
value = rx.recv().unwrap();
@ -34,4 +34,5 @@ fn test05() {
value = rx.recv().unwrap();
println!("{}", value);
assert_eq!(value, 30);
t.join();
}

View File

@ -17,6 +17,6 @@ pub fn main() { test00(); }
fn start() { println!("Started / Finished task."); }
fn test00() {
let _ = thread::scoped(move|| start() ).join();
thread::spawn(move|| start() ).join();
println!("Completing.");
}

View File

@ -29,10 +29,12 @@ fn start(tx: &Sender<Sender<String>>) {
pub fn main() {
let (tx, rx) = channel();
let _child = thread::scoped(move|| { start(&tx) });
let child = thread::spawn(move|| { start(&tx) });
let mut c = rx.recv().unwrap();
c.send("A".to_string()).unwrap();
c.send("B".to_string()).unwrap();
thread::yield_now();
child.join();
}

View File

@ -22,8 +22,9 @@ fn start(tx: &Sender<Sender<isize>>) {
pub fn main() {
let (tx, rx) = channel();
let _child = thread::scoped(move|| {
let child = thread::spawn(move|| {
start(&tx)
});
let _tx = rx.recv().unwrap();
child.join();
}

View File

@ -18,7 +18,7 @@ fn start(_task_number: isize) { println!("Started / Finished task."); }
fn test00() {
let i: isize = 0;
let mut result = thread::scoped(move|| {
let mut result = thread::spawn(move|| {
start(i)
});

View File

@ -21,6 +21,6 @@ fn start(tx: &Sender<isize>, start: isize, number_of_messages: isize) {
pub fn main() {
println!("Check that we don't deadlock.");
let (tx, rx) = channel();
let _t = thread::scoped(move|| { start(&tx, 0, 10) }).join();
let _ = thread::spawn(move|| { start(&tx, 0, 10) }).join();
println!("Joined task");
}

View File

@ -21,7 +21,7 @@ pub fn main() {
while (i > 0) {
println!("{}", i);
let tx = tx.clone();
thread::scoped({let i = i; move|| { child(i, &tx) }});
thread::spawn({let i = i; move|| { child(i, &tx) }});
i = i - 1;
}

View File

@ -29,8 +29,9 @@ pub fn main() {
// the child's point of view the receiver may die. We should
// drop messages on the floor in this case, and not crash!
let (tx, rx) = channel();
let _t = thread::scoped(move|| {
let t = thread::spawn(move|| {
start(&tx, 10)
});
rx.recv();
t.join();
}

View File

@ -22,5 +22,5 @@ fn f() {
}
pub fn main() {
let _t = thread::scoped(move|| f() ).join();
thread::spawn(move|| f() ).join();
}

View File

@ -42,7 +42,7 @@ fn test00() {
let mut results = Vec::new();
while i < number_of_tasks {
let tx = tx.clone();
results.push(thread::scoped({
results.push(thread::spawn({
let i = i;
move|| {
test00_start(&tx, i, number_of_messages)

View File

@ -30,19 +30,19 @@ fn test00() {
let number_of_messages: isize = 10;
let tx2 = tx.clone();
let _t = thread::scoped(move|| {
let t1 = thread::spawn(move|| {
test00_start(&tx2, number_of_messages * 0, number_of_messages);
});
let tx2 = tx.clone();
let _t = thread::scoped(move|| {
let t2 = thread::spawn(move|| {
test00_start(&tx2, number_of_messages * 1, number_of_messages);
});
let tx2 = tx.clone();
let _t = thread::scoped(move|| {
let t3 = thread::spawn(move|| {
test00_start(&tx2, number_of_messages * 2, number_of_messages);
});
let tx2 = tx.clone();
let _t = thread::scoped(move|| {
let t4 = thread::spawn(move|| {
test00_start(&tx2, number_of_messages * 3, number_of_messages);
});
@ -60,4 +60,9 @@ fn test00() {
}
assert_eq!(sum, number_of_messages * 4 * (number_of_messages * 4 - 1) / 2);
t1.join();
t2.join();
t3.join();
t4.join();
}

View File

@ -26,7 +26,7 @@ fn test00() {
let (tx, rx) = channel();
let number_of_messages: isize = 10;
let result = thread::scoped(move|| {
let result = thread::spawn(move|| {
test00_start(&tx, number_of_messages);
});

View File

@ -15,7 +15,7 @@
use std::thread;
pub fn main() {
let _t = thread::scoped(move|| child("Hello".to_string()) );
thread::spawn(move|| child("Hello".to_string()) ).join();
}
fn child(_s: String) {

View File

@ -21,11 +21,13 @@ pub fn main() {
let x: Box<isize> = box 1;
let x_in_parent = &(*x) as *const isize as usize;
let _t = thread::scoped(move || {
let t = thread::spawn(move || {
let x_in_child = &(*x) as *const isize as usize;
tx.send(x_in_child).unwrap();
});
let x_in_child = rx.recv().unwrap();
assert_eq!(x_in_parent, x_in_child);
t.join();
}

View File

@ -0,0 +1,91 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// ignore-macos osx really doesn't like cycling through large numbers of
// sockets as calls to connect() will start returning EADDRNOTAVAIL
// quite quickly and it takes a few seconds for the sockets to get
// recycled.
#![feature(old_io, io, std_misc)]
use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::thread;
static N: usize = 8;
static M: usize = 20;
fn main() {
test();
}
fn test() {
let mut l = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.socket_name().unwrap();
let mut a = l.listen().unwrap();
let cnt = Arc::new(AtomicUsize::new(0));
let (srv_tx, srv_rx) = channel();
let (cli_tx, cli_rx) = channel();
let ts1 = (0..N).map(|_| {
let a = a.clone();
let cnt = cnt.clone();
let srv_tx = srv_tx.clone();
thread::spawn(move|| {
let mut a = a;
loop {
match a.accept() {
Ok(..) => {
if cnt.fetch_add(1, Ordering::SeqCst) == N * M - 1 {
break
}
}
Err(ref e) if e.kind == EndOfFile => break,
Err(e) => panic!("{}", e),
}
}
srv_tx.send(());
})
}).collect::<Vec<_>>();
let ts2 = (0..N).map(|_| {
let cli_tx = cli_tx.clone();
thread::scoped(move|| {
for _ in 0..M {
let _s = TcpStream::connect(addr).unwrap();
}
cli_tx.send(());
})
}).collect::<Vec<_>>();
drop((cli_tx, srv_tx));
// wait for senders
if cli_rx.iter().take(N).count() != N {
a.close_accept().unwrap();
panic!("clients panicked");
}
// wait for one acceptor to die
let _ = srv_rx.recv();
// Notify other receivers should die
a.close_accept().unwrap();
// wait for receivers
assert_eq!(srv_rx.iter().take(N - 1).count(), N - 1);
// Everything should have been accepted.
assert_eq!(cnt.load(Ordering::SeqCst), N * M);
for t in ts1 { t.join() }
for t in ts2 { t.join() }
}

View File

@ -0,0 +1,77 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// ignore-pretty
// compile-flags:--test
// exec-env:RUST_TEST_THREADS=1
// Tests for the connect_timeout() function on a TcpStream. This runs with only
// one test task to ensure that errors are timeouts, not file descriptor
// exhaustion.
#![reexport_test_harness_main = "test_main"]
#![allow(unused_imports)]
#![feature(old_io, std_misc, io)]
use std::old_io::*;
use std::old_io::test::*;
use std::old_io;
use std::time::Duration;
use std::sync::mpsc::channel;
use std::thread;
#[cfg_attr(target_os = "freebsd", ignore)]
fn eventual_timeout() {
let addr = next_test_ip4();
let (tx1, rx1) = channel();
let (_tx2, rx2) = channel::<()>();
let t = thread::spawn(move|| {
let _l = TcpListener::bind(addr).unwrap().listen();
tx1.send(()).unwrap();
let _ = rx2.recv();
});
rx1.recv().unwrap();
let mut v = Vec::new();
for _ in 0_usize..10000 {
match TcpStream::connect_timeout(addr, Duration::milliseconds(100)) {
Ok(e) => v.push(e),
Err(ref e) if e.kind == old_io::TimedOut => return,
Err(e) => panic!("other error: {}", e),
}
}
panic!("never timed out!");
t.join();
}
fn timeout_success() {
let addr = next_test_ip4();
let _l = TcpListener::bind(addr).unwrap().listen();
assert!(TcpStream::connect_timeout(addr, Duration::milliseconds(1000)).is_ok());
}
fn timeout_error() {
let addr = next_test_ip4();
assert!(TcpStream::connect_timeout(addr, Duration::milliseconds(1000)).is_err());
}
fn connect_timeout_zero() {
let addr = next_test_ip4();
assert!(TcpStream::connect_timeout(addr, Duration::milliseconds(0)).is_err());
}
fn connect_timeout_negative() {
let addr = next_test_ip4();
assert!(TcpStream::connect_timeout(addr, Duration::milliseconds(-1)).is_err());
}

View File

@ -0,0 +1,213 @@
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// ignore-windows TempDir may cause IoError on windows: #10463
// These tests are here to exercise the functionality of the `tempfile` module.
// One might expect these tests to be located in that module, but sadly they
// cannot. The tests need to invoke `os::change_dir` which cannot be done in the
// normal test infrastructure. If the tests change the current working
// directory, then *all* tests which require relative paths suddenly break b/c
// they're in a different location than before. Hence, these tests are all run
// serially here.
#![feature(old_io, old_path, os, old_fs)]
use std::old_path::{Path, GenericPath};
use std::old_io::fs::PathExtensions;
use std::old_io::{fs, TempDir};
use std::old_io;
use std::env;
use std::sync::mpsc::channel;
use std::thread;
fn test_tempdir() {
let path = {
let p = TempDir::new_in(&Path::new("."), "foobar").unwrap();
let p = p.path();
assert!(p.as_str().unwrap().contains("foobar"));
p.clone()
};
assert!(!path.exists());
}
fn test_rm_tempdir() {
let (tx, rx) = channel();
let f = move|| -> () {
let tmp = TempDir::new("test_rm_tempdir").unwrap();
tx.send(tmp.path().clone()).unwrap();
panic!("panic to unwind past `tmp`");
};
thread::spawn(f).join();
let path = rx.recv().unwrap();
assert!(!path.exists());
let tmp = TempDir::new("test_rm_tempdir").unwrap();
let path = tmp.path().clone();
let f = move|| -> () {
let _tmp = tmp;
panic!("panic to unwind past `tmp`");
};
thread::spawn(f).join();
assert!(!path.exists());
let path;
{
let f = move || {
TempDir::new("test_rm_tempdir").unwrap()
};
// FIXME(#16640) `: TempDir` annotation shouldn't be necessary
let tmp: TempDir = thread::spawn(f).join().unwrap();
path = tmp.path().clone();
assert!(path.exists());
}
assert!(!path.exists());
let path;
{
let tmp = TempDir::new("test_rm_tempdir").unwrap();
path = tmp.into_inner();
}
assert!(path.exists());
fs::rmdir_recursive(&path);
assert!(!path.exists());
}
fn test_rm_tempdir_close() {
let (tx, rx) = channel();
let f = move|| -> () {
let tmp = TempDir::new("test_rm_tempdir").unwrap();
tx.send(tmp.path().clone()).unwrap();
tmp.close();
panic!("panic when unwinding past `tmp`");
};
thread::spawn(f).join();
let path = rx.recv().unwrap();
assert!(!path.exists());
let tmp = TempDir::new("test_rm_tempdir").unwrap();
let path = tmp.path().clone();
let f = move|| -> () {
let tmp = tmp;
tmp.close();
panic!("panic when unwinding past `tmp`");
};
thread::spawn(f).join();
assert!(!path.exists());
let path;
{
let f = move || {
TempDir::new("test_rm_tempdir").unwrap()
};
// FIXME(#16640) `: TempDir` annotation shouldn't be necessary
let tmp: TempDir = thread::spawn(f).join().unwrap();
path = tmp.path().clone();
assert!(path.exists());
tmp.close();
}
assert!(!path.exists());
let path;
{
let tmp = TempDir::new("test_rm_tempdir").unwrap();
path = tmp.into_inner();
}
assert!(path.exists());
fs::rmdir_recursive(&path);
assert!(!path.exists());
}
// Ideally these would be in std::os but then core would need
// to depend on std
fn recursive_mkdir_rel() {
let path = Path::new("frob");
let cwd = Path::new(env::current_dir().unwrap().to_str().unwrap());
println!("recursive_mkdir_rel: Making: {} in cwd {} [{}]", path.display(),
cwd.display(), path.exists());
fs::mkdir_recursive(&path, old_io::USER_RWX);
assert!(path.is_dir());
fs::mkdir_recursive(&path, old_io::USER_RWX);
assert!(path.is_dir());
}
fn recursive_mkdir_dot() {
let dot = Path::new(".");
fs::mkdir_recursive(&dot, old_io::USER_RWX);
let dotdot = Path::new("..");
fs::mkdir_recursive(&dotdot, old_io::USER_RWX);
}
fn recursive_mkdir_rel_2() {
let path = Path::new("./frob/baz");
let cwd = Path::new(env::current_dir().unwrap().to_str().unwrap());
println!("recursive_mkdir_rel_2: Making: {} in cwd {} [{}]", path.display(),
cwd.display(), path.exists());
fs::mkdir_recursive(&path, old_io::USER_RWX);
assert!(path.is_dir());
assert!(path.dir_path().is_dir());
let path2 = Path::new("quux/blat");
println!("recursive_mkdir_rel_2: Making: {} in cwd {}", path2.display(),
cwd.display());
fs::mkdir_recursive(&path2, old_io::USER_RWX);
assert!(path2.is_dir());
assert!(path2.dir_path().is_dir());
}
// Ideally this would be in core, but needs TempFile
pub fn test_rmdir_recursive_ok() {
let rwx = old_io::USER_RWX;
let tmpdir = TempDir::new("test").ok().expect("test_rmdir_recursive_ok: \
couldn't create temp dir");
let tmpdir = tmpdir.path();
let root = tmpdir.join("foo");
println!("making {}", root.display());
fs::mkdir(&root, rwx);
fs::mkdir(&root.join("foo"), rwx);
fs::mkdir(&root.join("foo").join("bar"), rwx);
fs::mkdir(&root.join("foo").join("bar").join("blat"), rwx);
fs::rmdir_recursive(&root);
assert!(!root.exists());
assert!(!root.join("bar").exists());
assert!(!root.join("bar").join("blat").exists());
}
pub fn dont_double_panic() {
let r: Result<(), _> = thread::spawn(move|| {
let tmpdir = TempDir::new("test").unwrap();
// Remove the temporary directory so that TempDir sees
// an error on drop
fs::rmdir(tmpdir.path());
// Panic. If TempDir panics *again* due to the rmdir
// error then the process will abort.
panic!();
}).join();
assert!(r.is_err());
}
fn in_tmpdir<F>(f: F) where F: FnOnce() {
let tmpdir = TempDir::new("test").ok().expect("can't make tmpdir");
assert!(env::set_current_dir(tmpdir.path().as_str().unwrap()).is_ok());
f();
}
pub fn main() {
in_tmpdir(test_tempdir);
in_tmpdir(test_rm_tempdir);
in_tmpdir(test_rm_tempdir_close);
in_tmpdir(recursive_mkdir_rel);
in_tmpdir(recursive_mkdir_dot);
in_tmpdir(recursive_mkdir_rel_2);
in_tmpdir(test_rmdir_recursive_ok);
in_tmpdir(dont_double_panic);
}

View File

@ -15,7 +15,7 @@ use std::thread;
pub fn main() {
let mut i = 10;
while i > 0 {
thread::scoped({let i = i; move|| child(i)});
thread::spawn({let i = i; move|| child(i)}).join();
i = i - 1;
}
println!("main thread exiting");

View File

@ -83,16 +83,19 @@ pub fn main() {
box dogge2 as Box<Pet+Sync+Send>));
let (tx1, rx1) = channel();
let arc1 = arc.clone();
let _t1 = thread::scoped(move|| { check_legs(arc1); tx1.send(()); });
let t1 = thread::spawn(move|| { check_legs(arc1); tx1.send(()); });
let (tx2, rx2) = channel();
let arc2 = arc.clone();
let _t2 = thread::scoped(move|| { check_names(arc2); tx2.send(()); });
let t2 = thread::spawn(move|| { check_names(arc2); tx2.send(()); });
let (tx3, rx3) = channel();
let arc3 = arc.clone();
let _t3 = thread::scoped(move|| { check_pedigree(arc3); tx3.send(()); });
let t3 = thread::spawn(move|| { check_pedigree(arc3); tx3.send(()); });
rx1.recv();
rx2.recv();
rx3.recv();
t1.join();
t2.join();
t3.join();
}
fn check_legs(arc: Arc<Vec<Box<Pet+Sync+Send>>>) {

View File

@ -23,10 +23,10 @@ pub fn main() {
let (tx, rx) = channel();
let n = 100;
let mut expected = 0;
let _t = (0..n).map(|i| {
let ts = (0..n).map(|i| {
expected += i;
let tx = tx.clone();
thread::scoped(move|| {
thread::spawn(move|| {
child(&tx, i)
})
}).collect::<Vec<_>>();
@ -38,4 +38,6 @@ pub fn main() {
}
assert_eq!(expected, actual);
for t in ts { t.join(); }
}