green: Fixing all tests from previous refactorings

Alex Crichton 2013-12-13 19:33:28 -08:00
parent 1a6d920e3d
commit aad9fbf6b6
6 changed files with 351 additions and 282 deletions

View File

@@ -223,3 +223,61 @@ impl Drop for BasicPausable {
}
}
}
#[cfg(test)]
mod test {
use std::task::TaskOpts;
use basic;
use PoolConfig;
use SchedPool;
fn pool() -> SchedPool {
SchedPool::new(PoolConfig {
threads: 1,
event_loop_factory: Some(basic::event_loop),
})
}
fn run(f: proc()) {
let mut pool = pool();
pool.spawn(TaskOpts::new(), f);
pool.shutdown();
}
#[test]
fn smoke() {
do run {}
}
#[test]
fn some_channels() {
do run {
let (p, c) = Chan::new();
do spawn {
c.send(());
}
p.recv();
}
}
#[test]
fn multi_thread() {
let mut pool = SchedPool::new(PoolConfig {
threads: 2,
event_loop_factory: Some(basic::event_loop),
});
for _ in range(0, 20) {
do pool.spawn(TaskOpts::new()) {
let (p, c) = Chan::new();
do spawn {
c.send(());
}
p.recv();
}
}
pool.shutdown();
}
}
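All of the tests above funnel through the same pool()/run() harness: build a single-threaded SchedPool over the basic event loop, spawn one proc, then shut the pool down. A minimal sketch of how a further test would plug into that harness, reusing only APIs visible in this diff (the test name and payload are illustrative, not part of the commit):

#[test]
fn pipeline() {
    do run {
        let (p, c) = Chan::new();
        do spawn {
            // runs as a green task inside the same single-threaded pool
            c.send(1 + 2);
        }
        assert_eq!(p.recv(), 3);
    }
}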

View File

@@ -58,8 +58,6 @@ pub mod sleeper_list;
pub mod stack;
pub mod task;
#[cfg(test)] mod tests;
#[cfg(stage0)]
#[lang = "start"]
pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {

View File

@@ -951,21 +951,47 @@ fn new_sched_rng() -> XorShiftRng {
#[cfg(test)]
mod test {
use borrow::to_uint;
use rt::deque::BufferPool;
use rt::basic;
use rt::sched::{Scheduler};
use rt::task::{GreenTask, Sched};
use rt::thread::Thread;
use rt::util;
use task::TaskResult;
use unstable::run_in_bare_thread;
use std::task::TaskOpts;
use std::rt::Runtime;
use std::rt::task::Task;
use std::rt::local::Local;
use basic;
use sched::TaskFromFriend;
use task::{GreenTask, HomeSched};
use PoolConfig;
use SchedPool;
fn pool() -> SchedPool {
SchedPool::new(PoolConfig {
threads: 1,
event_loop_factory: Some(basic::event_loop),
})
}
fn run(f: proc()) {
let mut pool = pool();
pool.spawn(TaskOpts::new(), f);
pool.shutdown();
}
fn sched_id() -> uint {
let mut task = Local::borrow(None::<Task>);
match task.get().maybe_take_runtime::<GreenTask>() {
Some(green) => {
let ret = green.sched.get_ref().sched_id();
task.get().put_runtime(green as ~Runtime);
return ret;
}
None => fail!()
}
}
#[test]
fn trivial_run_in_newsched_task_test() {
let mut task_ran = false;
let task_ran_ptr: *mut bool = &mut task_ran;
do run_in_newsched_task || {
do run {
unsafe { *task_ran_ptr = true };
rtdebug!("executed from the new scheduler")
}
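The sched_id() helper above has to put the GreenTask runtime back into the task before returning, otherwise the task left in TLS would lose its runtime. A small sketch of the invariant this gives tests in a single-threaded pool (illustrative, not from the commit):

do run {
    let first = sched_id();
    let second = sched_id();   // the runtime was put back, so this works
    assert_eq!(first, second); // one scheduler thread, so the id is stable
}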
@@ -977,9 +1003,11 @@ mod test {
let total = 10;
let mut task_run_count = 0;
let task_run_count_ptr: *mut uint = &mut task_run_count;
do run_in_newsched_task || {
// with only one thread this is safe to run without worries of
// contention.
do run {
for _ in range(0u, total) {
do spawntask || {
do spawn || {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
}
}
@@ -991,12 +1019,12 @@ mod test {
fn multiple_task_nested_test() {
let mut task_run_count = 0;
let task_run_count_ptr: *mut uint = &mut task_run_count;
do run_in_newsched_task || {
do spawntask || {
do run {
do spawn {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
do spawntask || {
do spawn {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
do spawntask || {
do spawn {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
}
}
@@ -1005,52 +1033,33 @@ mod test {
assert!(task_run_count == 3);
}
// Confirm that a sched_id actually is the uint form of the
// pointer to the scheduler struct.
#[test]
fn simple_sched_id_test() {
do run_in_bare_thread {
let sched = ~new_test_uv_sched();
assert!(to_uint(sched) == sched.sched_id());
}
}
// Compare two scheduler ids that are different; this should never
// fail but may catch a mistake someday.
#[test]
fn compare_sched_id_test() {
do run_in_bare_thread {
let sched_one = ~new_test_uv_sched();
let sched_two = ~new_test_uv_sched();
assert!(sched_one.sched_id() != sched_two.sched_id());
}
}
// A very simple test that confirms that a task executing on the
// home scheduler notices that it is home.
#[test]
fn test_home_sched() {
do run_in_bare_thread {
let mut task_ran = false;
let task_ran_ptr: *mut bool = &mut task_ran;
let mut pool = pool();
let mut sched = ~new_test_uv_sched();
let sched_handle = sched.make_handle();
let (dport, dchan) = Chan::new();
{
let (port, chan) = Chan::new();
let mut handle1 = pool.spawn_sched();
let mut handle2 = pool.spawn_sched();
let mut task = ~do GreenTask::new_root_homed(&mut sched.stack_pool, None,
Sched(sched_handle)) {
unsafe { *task_ran_ptr = true };
assert!(GreenTask::on_appropriate_sched());
handle1.send(TaskFromFriend(do pool.task(TaskOpts::new()) {
chan.send(sched_id());
}));
let sched1_id = port.recv();
let mut task = do pool.task(TaskOpts::new()) {
assert_eq!(sched_id(), sched1_id);
dchan.send(());
};
let on_exit: proc(TaskResult) = proc(exit_status) {
rtassert!(exit_status.is_ok())
};
task.death.on_exit = Some(on_exit);
sched.bootstrap(task);
task.give_home(HomeSched(handle1));
handle2.send(TaskFromFriend(task));
}
dport.recv();
pool.shutdown();
}
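Condensed, the homing dance in test_home_sched is a three-step protocol: obtain a handle to the home scheduler, attach it with give_home, then deliver the task through any other scheduler and let it migrate. A sketch under the same API (a hypothetical test, not part of the commit):

let mut pool = pool();
let home = pool.spawn_sched();       // the scheduler the task must run on
let mut other = pool.spawn_sched();  // the scheduler that first receives it
let (port, chan) = Chan::new();
let mut task = do pool.task(TaskOpts::new()) {
    chan.send(());                   // signal that the homed task ran
};
task.give_home(HomeSched(home));     // pin the task to its home scheduler
other.send(TaskFromFriend(task));    // hand it to the "wrong" scheduler
port.recv();                         // it still runs, back at home
pool.shutdown();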
// An advanced test that checks all four possible states that a
@@ -1058,12 +1067,13 @@ mod test {
#[test]
fn test_schedule_home_states() {
use rt::sleeper_list::SleeperList;
use rt::sched::Shutdown;
use borrow;
use sleeper_list::SleeperList;
use super::{Shutdown, Scheduler, SchedHandle};
use std::unstable::run_in_bare_thread;
use std::rt::thread::Thread;
use std::sync::deque::BufferPool;
do run_in_bare_thread {
let sleepers = SleeperList::new();
let mut pool = BufferPool::new();
let (normal_worker, normal_stealer) = pool.deque();
@@ -1072,15 +1082,18 @@ mod test {
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
1,
basic::event_loop(),
normal_worker,
queues.clone(),
sleepers.clone());
let normal_handle = normal_sched.make_handle();
let friend_handle = normal_sched.make_handle();
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
1,
basic::event_loop(),
special_worker,
queues.clone(),
@@ -1099,35 +1112,61 @@ mod test {
// 3) task not homed, sched requeues
// 4) task not home, send home
let task1 = ~do GreenTask::new_root_homed(&mut special_sched.stack_pool, None,
Sched(t1_handle)) || {
rtassert!(GreenTask::on_appropriate_sched());
};
rtdebug!("task1 id: **{}**", borrow::to_uint(task1));
// Grab both the scheduler and the task from TLS and check if the
// task is executing on an appropriate scheduler.
fn on_appropriate_sched() -> bool {
use task::{TypeGreen, TypeSched, HomeSched};
let task = GreenTask::convert(Local::take());
let sched_id = task.sched.get_ref().sched_id();
let run_any = task.sched.get_ref().run_anything;
let ret = match task.task_type {
TypeGreen(Some(AnySched)) => {
run_any
}
TypeGreen(Some(HomeSched(SchedHandle {
sched_id: ref id,
..
}))) => {
*id == sched_id
}
TypeGreen(None) => { fail!("task without home"); }
TypeSched => { fail!("expected green task"); }
};
task.put();
ret
}
let task2 = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
rtassert!(GreenTask::on_appropriate_sched());
let task1 = do GreenTask::new_homed(&mut special_sched.stack_pool,
None, HomeSched(t1_handle)) {
rtassert!(on_appropriate_sched());
};
let task3 = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
rtassert!(GreenTask::on_appropriate_sched());
let task2 = do GreenTask::new(&mut normal_sched.stack_pool, None) {
rtassert!(on_appropriate_sched());
};
let task4 = ~do GreenTask::new_root_homed(&mut special_sched.stack_pool, None,
Sched(t4_handle)) {
rtassert!(GreenTask::on_appropriate_sched());
let task3 = do GreenTask::new(&mut normal_sched.stack_pool, None) {
rtassert!(on_appropriate_sched());
};
let task4 = do GreenTask::new_homed(&mut special_sched.stack_pool,
None, HomeSched(t4_handle)) {
rtassert!(on_appropriate_sched());
};
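// Summarizing the four cases task1..task4 drive (paraphrased from the
// setup above, as an aid for reading the interleaved old/new lines):
// 1) task1: homed on special_sched, run by special_sched -> already home, runs
// 2) task2: unhomed, run by normal_sched  -> run-anything scheduler, runs
// 3) task3: unhomed, run by special_sched -> cannot stay, gets requeued
// 4) task4: homed on special_sched, run by normal_sched -> not at home, sent home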
rtdebug!("task4 id: **{}**", borrow::to_uint(task4));
// Signal from the special task that we are done.
let (port, chan) = Chan::<()>::new();
let normal_task = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
rtdebug!("*about to submit task2*");
Scheduler::run_task(task2);
rtdebug!("*about to submit task4*");
Scheduler::run_task(task4);
rtdebug!("*normal_task done*");
fn run(next: ~GreenTask) {
let mut task = GreenTask::convert(Local::take());
let sched = task.sched.take_unwrap();
sched.run_task(task, next)
}
let normal_task = do GreenTask::new(&mut normal_sched.stack_pool,
None) {
run(task2);
run(task4);
port.recv();
let mut nh = normal_handle;
nh.send(Shutdown);
@@ -1135,29 +1174,23 @@ mod test {
sh.send(Shutdown);
};
rtdebug!("normal task: {}", borrow::to_uint(normal_task));
let special_task = ~do GreenTask::new_root(&mut special_sched.stack_pool, None) {
rtdebug!("*about to submit task1*");
Scheduler::run_task(task1);
rtdebug!("*about to submit task3*");
Scheduler::run_task(task3);
rtdebug!("*done with special_task*");
let special_task = do GreenTask::new(&mut special_sched.stack_pool,
None) {
run(task1);
run(task3);
chan.send(());
};
rtdebug!("special task: {}", borrow::to_uint(special_task));
let normal_sched = normal_sched;
let normal_thread = do Thread::start {
normal_sched.bootstrap(normal_task);
rtdebug!("finished with normal_thread");
};
let special_sched = special_sched;
let special_thread = do Thread::start {
special_sched.bootstrap(special_task);
rtdebug!("finished with special_sched");
};
normal_thread.join();
@@ -1165,109 +1198,82 @@ mod test {
}
}
#[test]
fn test_stress_schedule_task_states() {
if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
let n = stress_factor() * 120;
for _ in range(0, n as int) {
test_schedule_home_states();
}
}
//#[test]
//fn test_stress_schedule_task_states() {
// if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
// let n = stress_factor() * 120;
// for _ in range(0, n as int) {
// test_schedule_home_states();
// }
//}
#[test]
fn test_io_callback() {
use io::timer;
use std::io::timer;
// This is a regression test that when there are no schedulable tasks
// in the work queue, but we are performing I/O, that once we do put
// something in the work queue again the scheduler picks it up and doesn't
// exit before emptying the work queue
do run_in_uv_task {
do spawntask {
let mut pool = SchedPool::new(PoolConfig {
threads: 2,
event_loop_factory: None,
});
// This is a regression test: when there are no schedulable tasks in
// the work queue but we are performing I/O, the scheduler must still
// pick up anything put back in the work queue and must not exit
// before emptying the work queue.
do pool.spawn(TaskOpts::new()) {
do spawn {
timer::sleep(10);
}
}
pool.shutdown();
}
#[test]
fn handle() {
do run_in_bare_thread {
let (port, chan) = Chan::new();
fn wakeup_across_scheds() {
let (port1, chan1) = Chan::new();
let (port2, chan2) = Chan::new();
let thread_one = do Thread::start {
let chan = chan;
do run_in_newsched_task_core {
chan.send(());
}
};
let mut pool1 = pool();
let mut pool2 = pool();
let thread_two = do Thread::start {
let port = port;
do run_in_newsched_task_core {
port.recv();
}
};
thread_two.join();
thread_one.join();
do pool1.spawn(TaskOpts::new()) {
let id = sched_id();
chan1.send(());
port2.recv();
assert_eq!(id, sched_id());
}
do pool2.spawn(TaskOpts::new()) {
let id = sched_id();
port1.recv();
assert_eq!(id, sched_id());
chan2.send(());
}
pool1.shutdown();
pool2.shutdown();
}
// A regression test that the final message is always handled.
// Used to deadlock because Shutdown was never recvd.
#[test]
fn no_missed_messages() {
use rt::sleeper_list::SleeperList;
use rt::stack::StackPool;
use rt::sched::{Shutdown, TaskFromFriend};
let mut pool = pool();
do run_in_bare_thread {
stress_factor().times(|| {
let sleepers = SleeperList::new();
let mut pool = BufferPool::new();
let (worker, stealer) = pool.deque();
let task = pool.task(TaskOpts::new(), proc()());
pool.spawn_sched().send(TaskFromFriend(task));
let mut sched = ~Scheduler::new(
basic::event_loop(),
worker,
~[stealer],
sleepers.clone());
let mut handle = sched.make_handle();
let sched = sched;
let thread = do Thread::start {
let mut sched = sched;
let bootstrap_task =
~GreenTask::new_root(&mut sched.stack_pool,
None,
proc()());
sched.bootstrap(bootstrap_task);
};
let mut stack_pool = StackPool::new();
let task = ~GreenTask::new_root(&mut stack_pool, None, proc()());
handle.send(TaskFromFriend(task));
handle.send(Shutdown);
drop(handle);
thread.join();
})
}
pool.shutdown();
}
#[test]
fn multithreading() {
use num::Times;
use vec::OwnedVector;
use container::Container;
do run_in_mt_newsched_task {
do run {
let mut ports = ~[];
10.times(|| {
let (port, chan) = Chan::new();
do spawntask_later {
do spawn {
chan.send(());
}
ports.push(port);
@@ -1281,7 +1287,7 @@ mod test {
#[test]
fn thread_ring() {
do run_in_mt_newsched_task {
do run {
let (end_port, end_chan) = Chan::new();
let n_tasks = 10;
@@ -1294,14 +1300,14 @@ mod test {
let (next_p, ch) = Chan::new();
let imm_i = i;
let imm_p = p;
do spawntask_random {
do spawn {
roundtrip(imm_i, n_tasks, &imm_p, &ch);
};
p = next_p;
i += 1;
}
let p = p;
do spawntask_random {
do spawn {
roundtrip(1, n_tasks, &p, &ch1);
}
@@ -1332,11 +1338,9 @@ mod test {
#[test]
fn start_closure_dtor() {
use ops::Drop;
// Regression test that the `start` task entrypoint can
// contain dtors that use task resources
do run_in_newsched_task {
do run {
struct S { field: () }
impl Drop for S {
@@ -1347,7 +1351,7 @@ mod test {
let s = S { field: () };
do spawntask {
do spawn {
let _ss = &s;
}
}
@@ -1357,52 +1361,49 @@ mod test {
#[ignore]
#[test]
fn dont_starve_1() {
stress_factor().times(|| {
do run_in_mt_newsched_task {
let (port, chan) = Chan::new();
let mut pool = SchedPool::new(PoolConfig {
threads: 2, // this must be > 1
event_loop_factory: Some(basic::event_loop),
});
do pool.spawn(TaskOpts::new()) {
let (port, chan) = Chan::new();
// This task should not be able to starve the sender;
// The sender should get stolen to another thread.
do spawntask {
while port.try_recv().is_none() { }
}
chan.send(());
// This task should not be able to starve the sender;
// the sender should get stolen to another thread.
do spawn {
while port.try_recv().is_none() { }
}
})
chan.send(());
}
pool.shutdown();
}
#[test]
fn dont_starve_2() {
stress_factor().times(|| {
do run_in_newsched_task {
let (port, chan) = Chan::new();
let (_port2, chan2) = Chan::new();
do run {
let (port, chan) = Chan::new();
let (_port2, chan2) = Chan::new();
// This task should not be able to starve the other task.
// The sends should eventually yield.
do spawntask {
while port.try_recv().is_none() {
chan2.send(());
}
// This task should not be able to starve the other task.
// The sends should eventually yield.
do spawn {
while port.try_recv().is_none() {
chan2.send(());
}
chan.send(());
}
})
chan.send(());
}
}
// Regression test for a logic bug that would cause single-threaded schedulers
// to sleep forever after yielding and stealing another task.
// Regression test for a logic bug that would cause single-threaded
// schedulers to sleep forever after yielding and stealing another task.
#[test]
fn single_threaded_yield() {
use task::{spawn, spawn_sched, SingleThreaded, deschedule};
use num::Times;
do spawn_sched(SingleThreaded) {
5.times(|| { deschedule(); })
use std::task::deschedule;
do run {
5.times(deschedule);
}
do spawn { }
do spawn { }
}
}
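single_threaded_yield now exercises rescheduling purely inside the pool: each deschedule call parks the running green task and immediately re-enqueues it on the same scheduler. The same loop spelled out without the times combinator (illustrative):

do run {
    let mut i = 0;
    while i < 5 {
        deschedule(); // park this green task and re-enqueue it
        i += 1;
    }
}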

View File

@@ -346,7 +346,7 @@ impl Runtime for GreenTask {
}
}
fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) {
fn reawaken(mut ~self, to_wake: ~Task) {
self.put_task(to_wake);
assert!(self.sched.is_none());
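Read together with the hunk below: dropping can_resched collapses reawaken to a single rule. If the woken task belongs to the same pool, the current green task context-switches straight into it via sched.run_task; if it belongs to another pool, the wakeup goes through reawaken_remotely and the scheduler is restored with put_with_sched.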
@@ -371,21 +371,16 @@ impl Runtime for GreenTask {
let mut running_task: ~Task = Local::take();
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
let mut sched = running_green_task.sched.take_unwrap();
running_green_task.put_task(running_task);
let sched = running_green_task.sched.take_unwrap();
if sched.pool_id == self.pool_id {
running_green_task.put_task(running_task);
if can_resched {
sched.run_task(running_green_task, self);
} else {
sched.enqueue_task(self);
running_green_task.put_with_sched(sched);
}
sched.run_task(running_green_task, self);
} else {
self.reawaken_remotely();
// put that thing back where it came from!
running_task.put_runtime(running_green_task as ~Runtime);
Local::put(running_task);
running_green_task.put_with_sched(sched);
}
}
None => {
@@ -427,94 +422,110 @@ impl Drop for GreenTask {
}
#[cfg(test)]
mod test {
mod tests {
use std::rt::Runtime;
use std::rt::local::Local;
use std::rt::task::Task;
use std::task;
use std::task::TaskOpts;
#[test]
fn local_heap() {
do run_in_newsched_task() {
let a = @5;
let b = a;
assert!(*a == 5);
assert!(*b == 5);
}
use super::super::{PoolConfig, SchedPool};
use super::GreenTask;
fn spawn_opts(opts: TaskOpts, f: proc()) {
let mut pool = SchedPool::new(PoolConfig {
threads: 1,
event_loop_factory: None,
});
pool.spawn(opts, f);
pool.shutdown();
}
#[test]
fn tls() {
use std::local_data;
do run_in_newsched_task() {
local_data_key!(key: @~str)
local_data::set(key, @~"data");
assert!(*local_data::get(key, |k| k.map(|k| *k)).unwrap() == ~"data");
local_data_key!(key2: @~str)
local_data::set(key2, @~"data");
assert!(*local_data::get(key2, |k| k.map(|k| *k)).unwrap() == ~"data");
fn smoke() {
let (p, c) = Chan::new();
do spawn_opts(TaskOpts::new()) {
c.send(());
}
p.recv();
}
#[test]
fn unwind() {
do run_in_newsched_task() {
let result = spawntask_try(proc()());
rtdebug!("trying first assert");
assert!(result.is_ok());
let result = spawntask_try(proc() fail!());
rtdebug!("trying second assert");
assert!(result.is_err());
fn smoke_fail() {
let (p, c) = Chan::<()>::new();
do spawn_opts(TaskOpts::new()) {
let _c = c;
fail!()
}
assert_eq!(p.recv_opt(), None);
}
#[test]
fn rng() {
do run_in_uv_task() {
use std::rand::{rng, Rng};
let mut r = rng();
let _ = r.next_u32();
}
fn smoke_opts() {
let mut opts = TaskOpts::new();
opts.name = Some(SendStrStatic("test"));
opts.stack_size = Some(20 * 4096);
let (p, c) = Chan::new();
opts.notify_chan = Some(c);
spawn_opts(opts, proc() {});
assert!(p.recv().is_ok());
}
#[test]
fn logging() {
do run_in_uv_task() {
info!("here i am. logging in a newsched task");
}
fn smoke_opts_fail() {
let mut opts = TaskOpts::new();
let (p, c) = Chan::new();
opts.notify_chan = Some(c);
spawn_opts(opts, proc() { fail!() });
assert!(p.recv().is_err());
}
#[test]
fn comm_stream() {
do run_in_newsched_task() {
let (port, chan) = Chan::new();
chan.send(10);
assert!(port.recv() == 10);
fn yield_test() {
let (p, c) = Chan::new();
do spawn_opts(TaskOpts::new()) {
10.times(task::deschedule);
c.send(());
}
p.recv();
}
#[test]
fn comm_shared_chan() {
do run_in_newsched_task() {
let (port, chan) = SharedChan::new();
chan.send(10);
assert!(port.recv() == 10);
fn spawn_children() {
let (p, c) = Chan::new();
do spawn_opts(TaskOpts::new()) {
let (p, c2) = Chan::new();
do spawn {
let (p, c3) = Chan::new();
do spawn {
c3.send(());
}
p.recv();
c2.send(());
}
p.recv();
c.send(());
}
p.recv();
}
//#[test]
//fn heap_cycles() {
// use std::option::{Option, Some, None};
// do run_in_newsched_task {
// struct List {
// next: Option<@mut List>,
// }
// let a = @mut List { next: None };
// let b = @mut List { next: Some(a) };
// a.next = Some(b);
// }
//}
#[test]
#[should_fail]
fn test_begin_unwind() { begin_unwind("cause", file!(), line!()) }
fn spawn_inherits() {
let (p, c) = Chan::new();
do spawn_opts(TaskOpts::new()) {
let c = c;
do spawn {
let mut task: ~Task = Local::take();
match task.maybe_take_runtime::<GreenTask>() {
Some(ops) => {
task.put_runtime(ops as ~Runtime);
}
None => fail!(),
}
Local::put(task);
c.send(());
}
}
p.recv();
}
}
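The smoke_opts tests above poke at TaskOpts one field at a time; here is a compact sketch wiring the same fields together, assuming exactly the 2013-era fields shown in this diff (name, stack_size, notify_chan) and the spawn_opts harness above:

let mut opts = TaskOpts::new();
opts.name = Some(SendStrStatic("configured"));
opts.stack_size = Some(8 * 4096);
let (p, c) = Chan::new();
opts.notify_chan = Some(c);   // delivers the task's exit status
spawn_opts(opts, proc() {});
assert!(p.recv().is_ok());    // the task exited cleanly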

View File

@@ -227,6 +227,7 @@ mod tests {
use std::rt::local::Local;
use std::rt::task::Task;
use std::task;
use std::task::TaskOpts;
use super::{spawn, spawn_opts, Ops};
#[test]

View File

@@ -207,9 +207,9 @@ mod test {
let port = timer.period(1);
port.recv();
port.recv();
let port = timer.period(1);
port.recv();
port.recv();
let port2 = timer.period(1);
port2.recv();
port2.recv();
}
#[test]
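This last hunk renames the second period port to port2 rather than rebinding port. In isolation, the periodic-timer pattern the test exercises looks like this (Timer::new is assumed from std::io::timer, as in the era of this commit):

let mut timer = Timer::new().unwrap();
let port = timer.period(1);   // a tick roughly every millisecond
port.recv();                  // first tick
port.recv();                  // second tick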