auto merge of #8116 : toddaaro/rust/tls-tk-pr-pre, r=brson

Merged with task killing code this time around.
This commit is contained in:
bors 2013-08-01 18:01:42 -07:00
commit 5890fcf872
17 changed files with 1161 additions and 1160 deletions

View File

@ -23,9 +23,14 @@ macro_rules! rtdebug_ (
} )
)
// An alternate version with no output, for turning off logging
// An alternate version with no output, for turning off logging. An
// earlier attempt that did not call the fmt! macro was insufficient,
// as a case of the "let bind each variable" approach eventually
// failed without an error message describing the invocation site.
macro_rules! rtdebug (
($( $arg:expr),+) => ( $(let _ = $arg)*; )
($( $arg:expr),+) => ( {
let _x = fmt!( $($arg),+ );
})
)
macro_rules! rtassert (

View File

@ -24,6 +24,7 @@ use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
use rt::{context, SchedulerContext};
/// A combined refcount / BlockedTask-as-uint pointer.
///
@ -90,6 +91,9 @@ impl<T> ChanOne<T> {
}
pub fn try_send(self, val: T) -> bool {
rtassert!(context() != SchedulerContext);
let mut this = self;
let mut recvr_active = true;
let packet = this.packet();
@ -127,10 +131,7 @@ impl<T> ChanOne<T> {
// Port is blocked. Wake it up.
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(woken_task);
Scheduler::run_task(woken_task);
};
}
}
@ -346,8 +347,7 @@ impl<T> Drop for ChanOne<T> {
assert!((*this.packet()).payload.is_none());
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let sched = Local::take::<Scheduler>();
sched.schedule_task(woken_task);
Scheduler::run_task(woken_task);
};
}
}
@ -743,7 +743,7 @@ mod test {
do run_in_newsched_task {
let (port, chan) = oneshot::<~int>();
let port_cell = Cell::new(port);
do spawntask_immediately {
do spawntask {
assert!(port_cell.take().recv() == ~10);
}
@ -1019,5 +1019,4 @@ mod test {
}
}
}
}

View File

@ -49,12 +49,11 @@ impl Context {
let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) };
let sp: *uint = stack.end();
let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) };
// Save and then immediately load the current context,
// which we will then modify to call the given function when restored
let mut regs = new_regs();
unsafe {
swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs))
swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs));
};
initialize_call_frame(&mut *regs, fp, argp, sp);
@ -72,13 +71,14 @@ impl Context {
then loading the registers from a previously saved Context.
*/
pub fn swap(out_context: &mut Context, in_context: &Context) {
rtdebug!("swapping contexts");
let out_regs: &mut Registers = match out_context {
&Context { regs: ~ref mut r, _ } => r
};
let in_regs: &Registers = match in_context {
&Context { regs: ~ref r, _ } => r
};
rtdebug!("doing raw swap");
unsafe { swap_registers(out_regs, in_regs) };
}
}

View File

@ -186,7 +186,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
@ -194,7 +194,7 @@ mod test {
assert!(buf[0] == 99);
}
do spawntask_immediately {
do spawntask {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
@ -206,7 +206,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
@ -214,7 +214,7 @@ mod test {
assert!(buf[0] == 99);
}
do spawntask_immediately {
do spawntask {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
@ -226,7 +226,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
@ -234,7 +234,7 @@ mod test {
assert!(nread.is_none());
}
do spawntask_immediately {
do spawntask {
let _stream = TcpStream::connect(addr);
// Close
}
@ -246,7 +246,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
@ -254,7 +254,7 @@ mod test {
assert!(nread.is_none());
}
do spawntask_immediately {
do spawntask {
let _stream = TcpStream::connect(addr);
// Close
}
@ -266,7 +266,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
@ -276,7 +276,7 @@ mod test {
assert!(nread.is_none());
}
do spawntask_immediately {
do spawntask {
let _stream = TcpStream::connect(addr);
// Close
}
@ -288,7 +288,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
@ -298,7 +298,7 @@ mod test {
assert!(nread.is_none());
}
do spawntask_immediately {
do spawntask {
let _stream = TcpStream::connect(addr);
// Close
}
@ -310,7 +310,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let buf = [0];
@ -327,7 +327,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
let _stream = TcpStream::connect(addr);
// Close
}
@ -339,7 +339,7 @@ mod test {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let buf = [0];
@ -356,7 +356,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
let _stream = TcpStream::connect(addr);
// Close
}
@ -369,7 +369,7 @@ mod test {
let addr = next_test_ip4();
let max = 10;
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
do max.times {
let mut stream = listener.accept();
@ -379,7 +379,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
do max.times {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
@ -394,7 +394,7 @@ mod test {
let addr = next_test_ip6();
let max = 10;
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
do max.times {
let mut stream = listener.accept();
@ -404,7 +404,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
do max.times {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
@ -419,13 +419,13 @@ mod test {
let addr = next_test_ip4();
static MAX: int = 10;
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |i| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_immediately {
do spawntask {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
@ -440,7 +440,7 @@ mod test {
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_immediately {
do spawntask {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
@ -458,13 +458,13 @@ mod test {
let addr = next_test_ip6();
static MAX: int = 10;
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |i| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_immediately {
do spawntask {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
@ -479,7 +479,7 @@ mod test {
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_immediately {
do spawntask {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
@ -497,7 +497,7 @@ mod test {
let addr = next_test_ip4();
static MAX: int = 10;
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |_| {
let stream = Cell::new(listener.accept());
@ -535,7 +535,7 @@ mod test {
let addr = next_test_ip6();
static MAX: int = 10;
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |_| {
let stream = Cell::new(listener.accept());
@ -571,7 +571,7 @@ mod test {
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let listener = TcpListener::bind(addr);
assert!(listener.is_some());
@ -590,13 +590,13 @@ mod test {
#[cfg(test)]
fn peer_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
listener.accept();
}
do spawntask_immediately {
do spawntask {
let stream = TcpStream::connect(addr);
assert!(stream.is_some());

View File

@ -132,7 +132,7 @@ mod test {
let server_ip = next_test_ip4();
let client_ip = next_test_ip4();
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(server_ip) {
Some(ref mut server) => {
let mut buf = [0];
@ -149,7 +149,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(client_ip) {
Some(ref mut client) => client.sendto([99], server_ip),
None => fail!()
@ -164,7 +164,7 @@ mod test {
let server_ip = next_test_ip6();
let client_ip = next_test_ip6();
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(server_ip) {
Some(ref mut server) => {
let mut buf = [0];
@ -181,7 +181,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(client_ip) {
Some(ref mut client) => client.sendto([99], server_ip),
None => fail!()
@ -196,7 +196,7 @@ mod test {
let server_ip = next_test_ip4();
let client_ip = next_test_ip4();
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(server_ip) {
Some(server) => {
let server = ~server;
@ -214,7 +214,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(client_ip) {
Some(client) => {
let client = ~client;
@ -233,7 +233,7 @@ mod test {
let server_ip = next_test_ip6();
let client_ip = next_test_ip6();
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(server_ip) {
Some(server) => {
let server = ~server;
@ -251,7 +251,7 @@ mod test {
}
}
do spawntask_immediately {
do spawntask {
match UdpSocket::bind(client_ip) {
Some(client) => {
let client = ~client;
@ -267,7 +267,7 @@ mod test {
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let server = UdpSocket::bind(addr);
assert!(server.is_some());

View File

@ -14,6 +14,7 @@ use rt::task::Task;
use rt::local_ptr;
use rt::rtio::{EventLoop, IoFactoryObject};
//use borrow::to_uint;
use cell::Cell;
pub trait Local {
fn put(value: ~Self);
@ -24,40 +25,62 @@ pub trait Local {
unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
}
impl Local for Scheduler {
fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
impl Local for Task {
fn put(value: ~Task) { unsafe { local_ptr::put(value) } }
fn take() -> ~Task { unsafe { local_ptr::take() } }
fn exists() -> bool { local_ptr::exists() }
fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T {
fn borrow<T>(f: &fn(&mut Task) -> T) -> T {
let mut res: Option<T> = None;
let res_ptr: *mut Option<T> = &mut res;
unsafe {
do local_ptr::borrow |sched| {
// rtdebug!("successfully unsafe borrowed sched pointer");
let result = f(sched);
do local_ptr::borrow |task| {
let result = f(task);
*res_ptr = Some(result);
}
}
match res {
Some(r) => { r }
None => rtabort!("function failed!")
None => { rtabort!("function failed in local_borrow") }
}
}
unsafe fn unsafe_borrow() -> *mut Task { local_ptr::unsafe_borrow() }
unsafe fn try_unsafe_borrow() -> Option<*mut Task> {
if Local::exists::<Task>() {
Some(Local::unsafe_borrow())
} else {
None
}
}
unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") }
}
impl Local for Task {
fn put(_value: ~Task) { rtabort!("unimpl") }
fn take() -> ~Task { rtabort!("unimpl") }
fn exists() -> bool { rtabort!("unimpl") }
fn borrow<T>(f: &fn(&mut Task) -> T) -> T {
do Local::borrow::<Scheduler, T> |sched| {
// rtdebug!("sched about to grab current_task");
match sched.current_task {
impl Local for Scheduler {
fn put(value: ~Scheduler) {
let value = Cell::new(value);
do Local::borrow::<Task,()> |task| {
let task = task;
task.sched = Some(value.take());
};
}
fn take() -> ~Scheduler {
do Local::borrow::<Task,~Scheduler> |task| {
let sched = task.sched.take_unwrap();
let task = task;
task.sched = None;
sched
}
}
fn exists() -> bool {
do Local::borrow::<Task,bool> |task| {
match task.sched {
Some(ref _task) => true,
None => false
}
}
}
fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T {
do Local::borrow::<Task, T> |task| {
match task.sched {
Some(~ref mut task) => {
// rtdebug!("current task pointer: %x", to_uint(task));
// rtdebug!("current task heap pointer: %x", to_uint(&task.heap));
f(task)
}
None => {
@ -66,19 +89,18 @@ impl Local for Task {
}
}
}
unsafe fn unsafe_borrow() -> *mut Task {
match (*Local::unsafe_borrow::<Scheduler>()).current_task {
Some(~ref mut task) => {
let s: *mut Task = &mut *task;
unsafe fn unsafe_borrow() -> *mut Scheduler {
match (*Local::unsafe_borrow::<Task>()).sched {
Some(~ref mut sched) => {
let s: *mut Scheduler = &mut *sched;
return s;
}
None => {
// Don't fail. Infinite recursion
rtabort!("no scheduler")
}
}
}
unsafe fn try_unsafe_borrow() -> Option<*mut Task> {
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> {
if Local::exists::<Scheduler>() {
Some(Local::unsafe_borrow())
} else {
@ -101,57 +123,67 @@ impl Local for IoFactoryObject {
unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") }
}
#[cfg(test)]
mod test {
use unstable::run_in_bare_thread;
use rt::test::*;
use rt::sched::Scheduler;
use super::*;
use rt::task::Task;
use rt::local_ptr;
#[test]
fn thread_local_scheduler_smoke_test() {
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
fn thread_local_task_smoke_test() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
}
#[test]
fn thread_local_scheduler_two_instances() {
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
fn thread_local_task_two_instances() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
}
#[test]
fn borrow_smoke_test() {
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
}
let _scheduler: ~Scheduler = Local::take();
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
unsafe {
let _task: *mut Task = Local::unsafe_borrow();
}
let task: ~Task = Local::take();
cleanup_task(task);
}
#[test]
fn borrow_with_return() {
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let res = do Local::borrow::<Scheduler,bool> |_sched| {
true
};
assert!(res);
let _scheduler: ~Scheduler = Local::take();
}
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let res = do Local::borrow::<Task,bool> |_task| {
true
};
assert!(res)
let task: ~Task = Local::take();
cleanup_task(task);
}
}

View File

@ -67,9 +67,10 @@ use iter::Times;
use iterator::{Iterator, IteratorUtil};
use option::{Some, None};
use ptr::RawPtr;
use rt::local::Local;
use rt::sched::{Scheduler, Shutdown};
use rt::sleeper_list::SleeperList;
use rt::task::{Task, Sched};
use rt::task::{Task, SchedTask, GreenTask, Sched};
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use rt::uv::uvio::UvEventLoop;
@ -243,6 +244,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let nscheds = util::default_sched_threads();
let main = Cell::new(main);
// The shared list of sleeping schedulers. Schedulers wake each other
// occassionally to do new work.
let sleepers = SleeperList::new();
@ -256,6 +259,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let mut handles = ~[];
do nscheds.times {
rtdebug!("inserting a regular scheduler");
// Every scheduler is driven by an I/O event loop.
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
@ -267,12 +272,19 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
// If we need a main-thread task then create a main thread scheduler
// that will reject any task that isn't pinned to it
let mut main_sched = if use_main_sched {
let main_sched = if use_main_sched {
// Create a friend handle.
let mut friend_sched = scheds.pop();
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);
let main_loop = ~UvEventLoop::new();
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue.clone(),
sleepers.clone(),
false);
false,
Some(friend_handle));
let main_handle = main_sched.make_handle();
handles.push(main_handle);
Some(main_sched)
@ -309,44 +321,63 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
}
};
// Build the main task and queue it up
match main_sched {
None => {
// The default case where we don't need a scheduler on the main thread.
// Just put an unpinned task onto one of the default schedulers.
let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, main);
main_task.death.on_exit = Some(on_exit);
main_task.name = Some(~"main");
scheds[0].enqueue_task(main_task);
}
Some(ref mut main_sched) => {
let home = Sched(main_sched.make_handle());
let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, home, main);
main_task.death.on_exit = Some(on_exit);
main_task.name = Some(~"main");
main_sched.enqueue_task(main_task);
}
};
// Run each scheduler in a thread.
let mut threads = ~[];
while !scheds.is_empty() {
let on_exit = Cell::new(on_exit);
if !use_main_sched {
// In the case where we do not use a main_thread scheduler we
// run the main task in one of our threads.
let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
main.take());
main_task.death.on_exit = Some(on_exit.take());
let main_task_cell = Cell::new(main_task);
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let sched = sched_cell.take();
sched.run();
sched.bootstrap(main_task_cell.take());
};
threads.push(thread);
}
// Run the main-thread scheduler
match main_sched {
Some(sched) => { let _ = sched.run(); },
None => ()
// Run each remaining scheduler in a thread.
while !scheds.is_empty() {
rtdebug!("creating regular schedulers");
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let mut sched = sched_cell.take();
let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {
rtdebug!("boostraping a non-primary scheduler");
};
sched.bootstrap(bootstrap_task);
};
threads.push(thread);
}
// If we do have a main thread scheduler, run it now.
if use_main_sched {
rtdebug!("about to create the main scheduler task");
let mut main_sched = main_sched.get();
let home = Sched(main_sched.make_handle());
let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool,
home, main.take());
main_task.death.on_exit = Some(on_exit.take());
rtdebug!("boostrapping main_task");
main_sched.bootstrap(main_task);
}
rtdebug!("waiting for threads");
// Wait for schedulers
foreach thread in threads.consume_iter() {
thread.join();
@ -378,27 +409,22 @@ pub enum RuntimeContext {
pub fn context() -> RuntimeContext {
use task::rt::rust_task;
use self::local::Local;
use self::sched::Scheduler;
// XXX: Hitting TLS twice to check if the scheduler exists
// then to check for the task is not good for perf
if unsafe { rust_try_get_task().is_not_null() } {
return OldTaskContext;
} else {
if Local::exists::<Scheduler>() {
let context = Cell::new_empty();
do Local::borrow::<Scheduler, ()> |sched| {
if sched.in_task_context() {
context.put_back(TaskContext);
} else {
context.put_back(SchedulerContext);
}
} else if Local::exists::<Task>() {
// In this case we know it is a new runtime context, but we
// need to check which one. Going to try borrowing task to
// check. Task should always be in TLS, so hopefully this
// doesn't conflict with other ops that borrow.
return do Local::borrow::<Task,RuntimeContext> |task| {
match task.task_type {
SchedTask => SchedulerContext,
GreenTask(_) => TaskContext
}
return context.take();
} else {
return GlobalContext;
}
};
} else {
return GlobalContext;
}
extern {
@ -410,23 +436,9 @@ pub fn context() -> RuntimeContext {
#[test]
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{Scheduler};
use rt::local::Local;
use rt::test::new_test_uv_sched;
assert_eq!(context(), OldTaskContext);
do run_in_bare_thread {
assert_eq!(context(), GlobalContext);
let mut sched = ~new_test_uv_sched();
let task = ~do Task::new_root(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |sched, task| {
assert_eq!(context(), SchedulerContext);
sched.enqueue_blocked_task(task);
}
};
sched.enqueue_task(task);
sched.run();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -30,21 +30,34 @@ use rt::context::Context;
use task::spawn::Taskgroup;
use cell::Cell;
// The Task struct represents all state associated with a rust
// task. There are at this point two primary "subtypes" of task,
// however instead of using a subtype we just have a "task_type" field
// in the struct. This contains a pointer to another struct that holds
// the type-specific state.
pub struct Task {
heap: LocalHeap,
gc: GarbageCollector,
storage: LocalStorage,
logger: StdErrLogger,
unwinder: Unwinder,
home: Option<SchedHome>,
taskgroup: Option<Taskgroup>,
death: Death,
destroyed: bool,
coroutine: Option<~Coroutine>,
// FIXME(#6874/#7599) use StringRef to save on allocations
name: Option<~str>,
coroutine: Option<Coroutine>,
sched: Option<~Scheduler>,
task_type: TaskType
}
pub enum TaskType {
GreenTask(Option<~SchedHome>),
SchedTask
}
/// A coroutine is nothing more than a (register context, stack) pair.
pub struct Coroutine {
/// The segment of stack on which the task is currently running or
/// if the task is blocked, on which the task will resume
@ -54,6 +67,7 @@ pub struct Coroutine {
saved_context: Context
}
/// Some tasks have a deciated home scheduler that they must run on.
pub enum SchedHome {
AnySched,
Sched(SchedHandle)
@ -68,6 +82,59 @@ pub struct Unwinder {
impl Task {
// A helper to build a new task using the dynamically found
// scheduler and task. Only works in GreenTask context.
pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task {
let f = Cell::new(f);
let home = Cell::new(home);
do Local::borrow::<Task, ~Task> |running_task| {
let mut sched = running_task.sched.take_unwrap();
let new_task = ~running_task.new_child_homed(&mut sched.stack_pool,
home.take(),
f.take());
running_task.sched = Some(sched);
new_task
}
}
pub fn build_child(f: ~fn()) -> ~Task {
Task::build_homed_child(f, AnySched)
}
pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task {
let f = Cell::new(f);
let home = Cell::new(home);
do Local::borrow::<Task, ~Task> |running_task| {
let mut sched = running_task.sched.take_unwrap();
let new_task = ~Task::new_root_homed(&mut sched.stack_pool,
home.take(),
f.take());
running_task.sched = Some(sched);
new_task
}
}
pub fn build_root(f: ~fn()) -> ~Task {
Task::build_homed_root(f, AnySched)
}
pub fn new_sched_task() -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: Unwinder { unwinding: false },
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(Coroutine::empty()),
name: None,
sched: None,
task_type: SchedTask
}
}
pub fn new_root(stack_pool: &mut StackPool,
start: ~fn()) -> Task {
Task::new_root_homed(stack_pool, AnySched, start)
@ -88,12 +155,13 @@ impl Task {
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: Unwinder { unwinding: false },
home: Some(home),
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start)),
name: None,
coroutine: Some(Coroutine::new(stack_pool, start)),
sched: None,
task_type: GreenTask(Some(~home))
}
}
@ -106,28 +174,43 @@ impl Task {
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
home: Some(home),
unwinder: Unwinder { unwinding: false },
taskgroup: None,
// FIXME(#7544) make watching optional
death: self.death.new_child(),
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start)),
name: None,
coroutine: Some(Coroutine::new(stack_pool, start)),
sched: None,
task_type: GreenTask(Some(~home))
}
}
pub fn give_home(&mut self, new_home: SchedHome) {
self.home = Some(new_home);
match self.task_type {
GreenTask(ref mut home) => {
*home = Some(~new_home);
}
SchedTask => {
rtabort!("type error: used SchedTask as GreenTask");
}
}
}
pub fn take_unwrap_home(&mut self) -> SchedHome {
match self.task_type {
GreenTask(ref mut home) => {
let out = home.take_unwrap();
return *out;
}
SchedTask => {
rtabort!("type error: used SchedTask as GreenTask");
}
}
}
pub fn run(&mut self, f: &fn()) {
// This is just an assertion that `run` was called unsafely
// and this instance of Task is still accessible.
do Local::borrow::<Task, ()> |task| {
assert!(borrow::ref_eq(task, self));
}
rtdebug!("run called on task: %u", borrow::to_uint(self));
self.unwinder.try(f);
{ let _ = self.taskgroup.take(); }
self.death.collect_failure(!self.unwinder.unwinding);
@ -141,6 +224,8 @@ impl Task {
/// thread-local-storage.
fn destroy(&mut self) {
rtdebug!("DESTROYING TASK: %u", borrow::to_uint(self));
do Local::borrow::<Task, ()> |task| {
assert!(borrow::ref_eq(task, self));
}
@ -158,63 +243,68 @@ impl Task {
self.destroyed = true;
}
/// Check if *task* is currently home.
pub fn is_home(&self) -> bool {
do Local::borrow::<Scheduler,bool> |sched| {
match self.home {
Some(AnySched) => { false }
Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
*id == sched.sched_id()
}
None => { rtabort!("task home of None") }
}
}
}
// New utility functions for homes.
pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool {
match self.home {
Some(AnySched) => { false }
Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
match self.task_type {
GreenTask(Some(~AnySched)) => { false }
GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _}))) => {
*id == sched.sched_id()
}
None => {rtabort!("task home of None") }
GreenTask(None) => {
rtabort!("task without home");
}
SchedTask => {
// Awe yea
rtabort!("type error: expected: GreenTask, found: SchedTask");
}
}
}
pub fn is_home_using_id(sched_id: uint) -> bool {
pub fn homed(&self) -> bool {
match self.task_type {
GreenTask(Some(~AnySched)) => { false }
GreenTask(Some(~Sched(SchedHandle { _ }))) => { true }
GreenTask(None) => {
rtabort!("task without home");
}
SchedTask => {
rtabort!("type error: expected: GreenTask, found: SchedTask");
}
}
}
// Grab both the scheduler and the task from TLS and check if the
// task is executing on an appropriate scheduler.
pub fn on_appropriate_sched() -> bool {
do Local::borrow::<Task,bool> |task| {
match task.home {
Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
let sched_id = task.sched.get_ref().sched_id();
let sched_run_anything = task.sched.get_ref().run_anything;
match task.task_type {
GreenTask(Some(~AnySched)) => {
rtdebug!("anysched task in sched check ****");
sched_run_anything
}
GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _ }))) => {
rtdebug!("homed task in sched check ****");
*id == sched_id
}
Some(AnySched) => { false }
None => { rtabort!("task home of None") }
GreenTask(None) => {
rtabort!("task without home");
}
SchedTask => {
rtabort!("type error: expected: GreenTask, found: SchedTask");
}
}
}
}
/// Check if this *task* has a home.
pub fn homed(&self) -> bool {
match self.home {
Some(AnySched) => { false }
Some(Sched(_)) => { true }
None => {
rtabort!("task home of None")
}
}
}
/// On a special scheduler?
pub fn on_special() -> bool {
do Local::borrow::<Scheduler,bool> |sched| {
!sched.run_anything
}
}
}
impl Drop for Task {
fn drop(&self) { assert!(self.destroyed) }
fn drop(&self) {
rtdebug!("called drop for a task: %u", borrow::to_uint(self));
assert!(self.destroyed)
}
}
// Coroutines represent nothing more than a context and a stack
@ -234,19 +324,33 @@ impl Coroutine {
}
}
pub fn empty() -> Coroutine {
Coroutine {
current_stack_segment: StackSegment::new(0),
saved_context: Context::empty()
}
}
fn build_start_wrapper(start: ~fn()) -> ~fn() {
let start_cell = Cell::new(start);
let wrapper: ~fn() = || {
// First code after swap to this new context. Run our
// cleanup job.
unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job();
let sched = Local::unsafe_borrow::<Scheduler>();
let task = (*sched).current_task.get_mut_ref();
// Again - might work while safe, or it might not.
do Local::borrow::<Scheduler,()> |sched| {
(sched).run_cleanup_job();
}
do task.run {
// To call the run method on a task we need a direct
// reference to it. The task is in TLS, so we can
// simply unsafe_borrow it to get this reference. We
// need to still have the task in TLS though, so we
// need to unsafe_borrow.
let task = Local::unsafe_borrow::<Task>();
do (*task).run {
// N.B. Removing `start` from the start wrapper
// closure by emptying a cell is critical for
// correctness. The ~Task pointer, and in turn the
@ -262,16 +366,19 @@ impl Coroutine {
};
}
// We remove the sched from the Task in TLS right now.
let sched = Local::take::<Scheduler>();
sched.terminate_current_task();
// ... allowing us to give it away when performing a
// scheduling operation.
sched.terminate_current_task()
};
return wrapper;
}
/// Destroy coroutine and try to reuse stack segment.
pub fn recycle(~self, stack_pool: &mut StackPool) {
pub fn recycle(self, stack_pool: &mut StackPool) {
match self {
~Coroutine { current_stack_segment, _ } => {
Coroutine { current_stack_segment, _ } => {
stack_pool.give_segment(current_stack_segment);
}
}
@ -465,3 +572,4 @@ mod test {
}
}
}

View File

@ -18,14 +18,12 @@ use iterator::Iterator;
use vec::{OwnedVector, MutableVector};
use super::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::sched::Scheduler;
use rt::local::Local;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
use rt::task::Task;
use rt::uv::uvio::UvEventLoop;
use rt::work_queue::WorkQueue;
use rt::sleeper_list::SleeperList;
use rt::task::{Sched};
use rt::comm::oneshot;
use result::{Result, Ok, Err};
@ -34,29 +32,37 @@ pub fn new_test_uv_sched() -> Scheduler {
let mut sched = Scheduler::new(~UvEventLoop::new(),
WorkQueue::new(),
SleeperList::new());
// Don't wait for the Shutdown message
sched.no_sleep = true;
return sched;
}
/// Creates a new scheduler in a new thread and runs a task in it,
/// then waits for the scheduler to exit. Failure of the task
/// will abort the process.
pub fn run_in_newsched_task(f: ~fn()) {
let f = Cell::new(f);
do run_in_bare_thread {
let mut sched = ~new_test_uv_sched();
let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
let mut task = ~Task::new_root(&mut sched.stack_pool,
f.take());
rtdebug!("newsched_task: %x", ::borrow::to_uint(task));
task.death.on_exit = Some(on_exit);
sched.enqueue_task(task);
sched.run();
run_in_newsched_task_core(f.take());
}
}
pub fn run_in_newsched_task_core(f: ~fn()) {
use rt::sched::Shutdown;
let mut sched = ~new_test_uv_sched();
let exit_handle = Cell::new(sched.make_handle());
let on_exit: ~fn(bool) = |exit_status| {
exit_handle.take().send(Shutdown);
rtassert!(exit_status);
};
let mut task = ~Task::new_root(&mut sched.stack_pool, f);
task.death.on_exit = Some(on_exit);
sched.bootstrap(task);
}
/// Create more than one scheduler and run a function in a task
/// in one of the schedulers. The schedulers will stay alive
/// until the function `f` returns.
@ -65,7 +71,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
use from_str::FromStr;
use rt::sched::Shutdown;
let f_cell = Cell::new(f);
let f = Cell::new(f);
do run_in_bare_thread {
let nthreads = match os::getenv("RUST_RT_TEST_THREADS") {
@ -95,7 +101,6 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
scheds.push(sched);
}
let f_cell = Cell::new(f_cell.take());
let handles = Cell::new(handles);
let on_exit: ~fn(bool) = |exit_status| {
let mut handles = handles.take();
@ -107,18 +112,32 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
rtassert!(exit_status);
};
let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
f_cell.take());
f.take());
main_task.death.on_exit = Some(on_exit);
scheds[0].enqueue_task(main_task);
let mut threads = ~[];
let main_task = Cell::new(main_task);
let main_thread = {
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
do Thread::start {
let sched = sched_cell.take();
sched.bootstrap(main_task.take());
}
};
threads.push(main_thread);
while !scheds.is_empty() {
let sched = scheds.pop();
let mut sched = scheds.pop();
let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {
rtdebug!("bootstrapping non-primary scheduler");
};
let bootstrap_task_cell = Cell::new(bootstrap_task);
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let sched = sched_cell.take();
sched.run();
sched.bootstrap(bootstrap_task_cell.take());
};
threads.push(thread);
@ -134,187 +153,52 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
/// Test tasks will abort on failure instead of unwinding
pub fn spawntask(f: ~fn()) {
use super::sched::*;
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
rtdebug!("spawntask taking the scheduler from TLS");
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool, f.take())
}
};
rtdebug!("new task pointer: %x", ::borrow::to_uint(task));
let sched = Local::take::<Scheduler>();
rtdebug!("spawntask scheduling the new task");
sched.schedule_task(task);
}
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_immediately(f: ~fn()) {
use super::sched::*;
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool,
f.take())
}
};
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_blocked_task(task);
}
Scheduler::run_task(Task::build_child(f));
}
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_later(f: ~fn()) {
use super::sched::*;
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool, f.take())
}
};
let mut sched = Local::take::<Scheduler>();
sched.enqueue_task(task);
Local::put(sched);
Scheduler::run_task_later(Task::build_child(f));
}
/// Spawn a task and either run it immediately or run it later
pub fn spawntask_random(f: ~fn()) {
use super::sched::*;
use rand::{Rand, rng};
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool,
f.take())
}
};
let mut sched = Local::take::<Scheduler>();
let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng);
if run_now {
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_blocked_task(task);
}
spawntask(f)
} else {
sched.enqueue_task(task);
Local::put(sched);
spawntask_later(f)
}
}
/// Spawn a task, with the current scheduler as home, and queue it to
/// run later.
pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) {
use super::sched::*;
use rand::{rng, RngUtil};
let mut rng = rng();
let task = {
let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
let handle = sched.make_handle();
let home_id = handle.sched_id;
// now that we know where this is going, build a new function
// that can assert it is in the right place
let af: ~fn() = || {
do Local::borrow::<Scheduler,()>() |sched| {
rtdebug!("home_id: %u, runtime loc: %u",
home_id,
sched.sched_id());
assert!(home_id == sched.sched_id());
};
f()
};
~Task::new_root_homed(&mut sched.stack_pool,
Sched(handle),
af)
};
let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
// enqueue it for future execution
dest_sched.enqueue_task(task);
}
/// Spawn a task and wait for it to finish, returning whether it
/// completed successfully or failed
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
use cell::Cell;
use super::sched::*;
let f = Cell::new(f);
pub fn spawntask_try(f: ~fn()) -> Result<(),()> {
let (port, chan) = oneshot();
let chan = Cell::new(chan);
let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
let mut new_task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task> |_running_task| {
// I don't understand why using a child task here fails. I
// think the fail status is propogating back up the task
// tree and triggering a fail for the parent, which we
// aren't correctly expecting.
// ~running_task.new_child(&mut (*sched).stack_pool,
~Task::new_root(&mut (*sched).stack_pool,
f.take())
}
};
let mut new_task = Task::build_root(f);
new_task.death.on_exit = Some(on_exit);
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
sched.enqueue_blocked_task(old_task);
}
rtdebug!("enqueued the new task, now waiting on exit_status");
Scheduler::run_task(new_task);
let exit_status = port.recv();
if exit_status { Ok(()) } else { Err(()) }
}
/// Spawn a new task in a new scheduler and return a thread handle.
pub fn spawntask_thread(f: ~fn()) -> Thread {
use rt::sched::*;
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool,
f.take())
}
};
let task = Cell::new(task);
let thread = do Thread::start {
let mut sched = ~new_test_uv_sched();
sched.enqueue_task(task.take());
sched.run();
run_in_newsched_task_core(f.take());
};
return thread;
}
@ -323,11 +207,14 @@ pub fn with_test_task(blk: ~fn(~Task) -> ~Task) {
do run_in_bare_thread {
let mut sched = ~new_test_uv_sched();
let task = blk(~Task::new_root(&mut sched.stack_pool, ||{}));
sched.enqueue_task(task);
sched.run();
cleanup_task(task);
}
}
/// Use to cleanup tasks created for testing but not "run".
pub fn cleanup_task(mut task: ~Task) {
task.destroyed = true;
}
/// Get a port number, starting at 9600, for use in tests
pub fn next_test_port() -> u16 {

View File

@ -17,7 +17,6 @@ use option::*;
use clone::Clone;
use super::rc::RC;
use rt::sched::Scheduler;
use rt::{context, TaskContext, SchedulerContext};
use rt::kill::BlockedTask;
use rt::local::Local;
use vec::OwnedVector;
@ -44,8 +43,6 @@ impl<T> Tube<T> {
pub fn send(&mut self, val: T) {
rtdebug!("tube send");
assert!(context() == SchedulerContext);
unsafe {
let state = self.p.unsafe_borrow_mut();
(*state).buf.push(val);
@ -61,8 +58,6 @@ impl<T> Tube<T> {
}
pub fn recv(&mut self) -> T {
assert!(context() == TaskContext);
unsafe {
let state = self.p.unsafe_borrow_mut();
if !(*state).buf.is_empty() {

View File

@ -51,7 +51,7 @@ use rt::io::net::ip::IpAddr;
use rt::io::IoError;
#[cfg(test)] use unstable::run_in_bare_thread;
//#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::FsRequest;
pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
@ -333,7 +333,7 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
return None;
}
}
/*
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
@ -360,3 +360,4 @@ fn loop_smoke_test() {
loop_.close();
}
}
*/

View File

@ -33,7 +33,7 @@ use unstable::sync::Exclusive;
#[cfg(test)] use container::Container;
#[cfg(test)] use uint;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use rt::test::{spawntask_immediately,
#[cfg(test)] use rt::test::{spawntask,
next_test_ip4,
run_in_newsched_task};
@ -251,13 +251,11 @@ impl IoFactory for UvIoFactory {
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |sched, task| {
do scheduler.deschedule_running_task_and_then |_, task| {
rtdebug!("connect: entered scheduler context");
assert!(!sched.in_task_context());
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
let task_cell = Cell::new(task);
@ -458,11 +456,9 @@ impl RtioTcpStream for UvTcpStream {
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |sched, task| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rtdebug!("read: entered scheduler context");
assert!(!sched.in_task_context());
let task_cell = Cell::new(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
@ -500,7 +496,6 @@ impl RtioTcpStream for UvTcpStream {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -602,11 +597,9 @@ impl RtioUdpSocket for UvUdpSocket {
let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |sched, task| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rtdebug!("recvfrom: entered scheduler context");
assert!(!sched.in_task_context());
let task_cell = Cell::new(task);
let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
@ -637,7 +630,6 @@ impl RtioUdpSocket for UvUdpSocket {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -798,10 +790,8 @@ impl Drop for UvTimer {
impl RtioTimer for UvTimer {
fn sleep(&self, msecs: u64) {
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |sched, task| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rtdebug!("sleep: entered scheduler context");
assert!(!sched.in_task_context());
let task_cell = Cell::new(task);
let mut watcher = **self;
do watcher.start(msecs, 0) |_, status| {
@ -845,7 +835,7 @@ fn test_simple_tcp_server_and_client() {
let addr = next_test_ip4();
// Start the server first so it's listening when we connect
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
@ -860,7 +850,7 @@ fn test_simple_tcp_server_and_client() {
}
}
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
@ -876,7 +866,7 @@ fn test_simple_udp_server_and_client() {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut server_socket = (*io).udp_bind(server_addr).unwrap();
@ -891,7 +881,7 @@ fn test_simple_udp_server_and_client() {
}
}
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut client_socket = (*io).udp_bind(client_addr).unwrap();
@ -906,7 +896,7 @@ fn test_read_and_block() {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
do spawntask {
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut stream = listener.accept().unwrap();
@ -939,7 +929,7 @@ fn test_read_and_block() {
assert!(reads > 1);
}
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
@ -959,7 +949,7 @@ fn test_read_read_read() {
let addr = next_test_ip4();
static MAX: uint = 500000;
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
@ -973,7 +963,7 @@ fn test_read_read_read() {
}
}
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
@ -999,7 +989,7 @@ fn test_udp_twice() {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut client = (*io).udp_bind(client_addr).unwrap();
@ -1008,7 +998,7 @@ fn test_udp_twice() {
}
}
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut server = (*io).udp_bind(server_addr).unwrap();
@ -1036,7 +1026,7 @@ fn test_udp_many_read() {
let client_in_addr = next_test_ip4();
static MAX: uint = 500_000;
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
@ -1059,7 +1049,7 @@ fn test_udp_many_read() {
}
}
do spawntask_immediately {
do spawntask {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut client_out = (*io).udp_bind(client_out_addr).unwrap();

View File

@ -677,121 +677,190 @@ fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); }
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
let (po, ch) = stream();
let ch = SharedChan::new(ch);
do spawn_unlinked {
let ch = ch.clone();
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let (po, ch) = stream();
let ch = SharedChan::new(ch);
do spawn_unlinked {
// Give middle task a chance to fail-but-not-kill-us.
do 16.times { task::yield(); }
ch.send(()); // If killed first, grandparent hangs.
let ch = ch.clone();
do spawn_unlinked {
// Give middle task a chance to fail-but-not-kill-us.
do 16.times { task::yield(); }
ch.send(()); // If killed first, grandparent hangs.
}
fail!(); // Shouldn't kill either (grand)parent or (grand)child.
}
fail!(); // Shouldn't kill either (grand)parent or (grand)child.
po.recv();
}
po.recv();
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
do spawn_unlinked { fail!(); }
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
do spawn_unlinked { fail!(); }
}
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails
do spawn_supervised { fail!(); }
// Give child a chance to fail-but-not-kill-us.
do 16.times { task::yield(); }
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_unlinked_sup_fail_down() {
do spawn_supervised { block_forever(); }
fail!(); // Shouldn't leave a child hanging around.
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
// Unidirectional "parenting" shouldn't override bidirectional linked.
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let mut b0 = task();
b0.opts.linked = true;
b0.opts.supervised = true;
do b0.spawn {
fail!();
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
do spawn_supervised { fail!(); }
// Give child a chance to fail-but-not-kill-us.
do 16.times { task::yield(); }
}
block_forever(); // We should get punted awake
}
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_sup_fail_down() {
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
do spawn_supervised { block_forever(); }
fail!(); // Shouldn't leave a child hanging around.
};
assert!(result.is_err());
}
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// Unidirectional "parenting" shouldn't override bidirectional linked.
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let mut b0 = task();
b0.opts.linked = true;
b0.opts.supervised = true;
do b0.spawn {
fail!();
}
block_forever(); // We should get punted awake
};
assert!(result.is_err());
}
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let mut b0 = task();
b0.opts.linked = true;
b0.opts.supervised = true;
do b0.spawn { block_forever(); }
fail!(); // *both* mechanisms would be wrong if this didn't kill the child
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let mut b0 = task();
b0.opts.linked = true;
b0.opts.supervised = true;
do b0.spawn { block_forever(); }
fail!(); // *both* mechanisms would be wrong if this didn't kill the child
};
assert!(result.is_err());
}
}
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
// Default options are to spawn linked & unsupervised.
do spawn { fail!(); }
block_forever(); // We should get punted awake
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// Default options are to spawn linked & unsupervised.
do spawn { fail!(); }
block_forever(); // We should get punted awake
};
assert!(result.is_err());
}
}
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
// Default options are to spawn linked & unsupervised.
do spawn { block_forever(); }
fail!();
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// Default options are to spawn linked & unsupervised.
do spawn { block_forever(); }
fail!();
};
assert!(result.is_err());
}
}
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails
// Make sure the above test is the same as this one.
let mut builder = task();
builder.linked();
do builder.spawn { block_forever(); }
fail!();
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// Make sure the above test is the same as this one.
let mut builder = task();
builder.linked();
do builder.spawn { block_forever(); }
fail!();
};
assert!(result.is_err());
}
}
// A couple bonus linked failure tests - testing for failure propagation even
// when the middle task exits successfully early before kill signals are sent.
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_failure_propagate_grandchild() {
// Middle task exits; does grandparent's failure propagate across the gap?
do spawn_supervised {
do spawn_supervised { block_forever(); }
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// Middle task exits; does grandparent's failure propagate across the gap?
do spawn_supervised {
do spawn_supervised { block_forever(); }
}
do 16.times { task::yield(); }
fail!();
};
assert!(result.is_err());
}
do 16.times { task::yield(); }
fail!();
}
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_failure_propagate_secondborn() {
// First-born child exits; does parent's failure propagate to sibling?
do spawn_supervised {
do spawn { block_forever(); } // linked
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// First-born child exits; does parent's failure propagate to sibling?
do spawn_supervised {
do spawn { block_forever(); } // linked
}
do 16.times { task::yield(); }
fail!();
};
assert!(result.is_err());
}
do 16.times { task::yield(); }
fail!();
}
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_failure_propagate_nephew_or_niece() {
// Our sibling exits; does our failure propagate to sibling's child?
do spawn { // linked
do spawn_supervised { block_forever(); }
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// Our sibling exits; does our failure propagate to sibling's child?
do spawn { // linked
do spawn_supervised { block_forever(); }
}
do 16.times { task::yield(); }
fail!();
};
assert!(result.is_err());
}
do 16.times { task::yield(); }
fail!();
}
#[test] #[should_fail] #[ignore(cfg(windows))]
#[test] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_propagate_sibling() {
// Middle sibling exits - does eldest's failure propagate to youngest?
do spawn { // linked
do spawn { block_forever(); } // linked
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result: Result<(),()> = do try {
// Middle sibling exits - does eldest's failure propagate to youngest?
do spawn { // linked
do spawn { block_forever(); } // linked
}
do 16.times { task::yield(); }
fail!();
};
assert!(result.is_err());
}
do 16.times { task::yield(); }
fail!();
}
#[test]
@ -1149,11 +1218,15 @@ fn test_child_doesnt_ref_parent() {
fn child_no(x: uint) -> ~fn() {
return || {
if x < generations {
task::spawn(child_no(x+1));
let mut t = task();
t.unwatched();
t.spawn(child_no(x+1));
}
}
}
task::spawn(child_no(0));
let mut t = task();
t.unwatched();
t.spawn(child_no(0));
}
#[test]
@ -1167,9 +1240,9 @@ fn test_simple_newsched_spawn() {
#[test] #[ignore(cfg(windows))]
fn test_spawn_watched() {
use rt::test::{run_in_newsched_task, spawntask_try};
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result = do spawntask_try {
let result = do try {
let mut t = task();
t.unlinked();
t.watched();
@ -1189,9 +1262,9 @@ fn test_spawn_watched() {
#[test] #[ignore(cfg(windows))]
fn test_indestructible() {
use rt::test::{run_in_newsched_task, spawntask_try};
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
let result = do spawntask_try {
let result = do try {
let mut t = task();
t.watched();
t.supervised();

View File

@ -653,22 +653,16 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc,
pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
match context() {
OldTaskContext => {
spawn_raw_oldsched(opts, f)
}
TaskContext => {
spawn_raw_newsched(opts, f)
}
SchedulerContext => {
fail!("can't spawn from scheduler context")
}
GlobalContext => {
fail!("can't spawn from global context")
}
OldTaskContext => spawn_raw_oldsched(opts, f),
TaskContext => spawn_raw_newsched(opts, f),
SchedulerContext => fail!("can't spawn from scheduler context"),
GlobalContext => fail!("can't spawn from global context"),
}
}
fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
use rt::sched::*;
let child_data = Cell::new(gen_child_taskgroup(opts.linked, opts.supervised));
let indestructible = opts.indestructible;
@ -700,19 +694,11 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
}
};
let mut task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
rtdebug!("unsafe borrowed sched");
if opts.watched {
let child_wrapper = Cell::new(child_wrapper);
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool, child_wrapper.take())
}
} else {
// An unwatched task is a new root in the exit-code propagation tree
~Task::new_root(&mut (*sched).stack_pool, child_wrapper)
}
let mut task = if opts.watched {
Task::build_child(child_wrapper)
} else {
// An unwatched task is a new root in the exit-code propagation tree
Task::build_root(child_wrapper)
};
if opts.notify_chan.is_some() {
@ -727,12 +713,9 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
}
task.name = opts.name.take();
rtdebug!("spawn calling run_task");
Scheduler::run_task(task);
rtdebug!("spawn about to take scheduler");
let sched = Local::take::<Scheduler>();
rtdebug!("took sched in spawn");
sched.schedule_task(task);
}
fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) {

View File

@ -70,9 +70,6 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
_ => {
let mut alloc = ::ptr::null();
do Local::borrow::<Task,()> |task| {
rtdebug!("task pointer: %x, heap pointer: %x",
::borrow::to_uint(task),
::borrow::to_uint(&task.heap));
alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char;
}
return alloc;

View File

@ -8,7 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// xfail-win32
// xfail-test
extern mod extra;
use std::comm::*;