// rust/src/libcore/rt/sched.rs

// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use sys;
use cast::transmute;
use super::work_queue::WorkQueue;
use super::stack::{StackPool, StackSegment};
use super::rtio::{EventLoop, EventLoopObject};
use super::context::Context;
use super::local_services::LocalServices;
use cell::Cell;
// A more convenient name for external callers, e.g. `local_sched::take()`
pub mod local_sched;
2013-05-12 15:26:19 -07:00
/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
/// thread local storage and the running task is owned by the
/// scheduler.
pub struct Scheduler {
2013-05-12 15:26:19 -07:00
priv work_queue: WorkQueue<~Coroutine>,
stack_pool: StackPool,
/// The event loop used to drive the scheduler and perform I/O
event_loop: ~EventLoopObject,
/// The scheduler's saved context.
/// Always valid when a task is executing, otherwise not
priv saved_context: Context,
/// The currently executing task
2013-05-12 15:26:19 -07:00
current_task: Option<~Coroutine>,
/// An action performed after a context switch on behalf of the
/// code running before the context switch
priv cleanup_job: Option<CleanupJob>
}
// XXX: Some hacks to put a &fn in Scheduler without borrowck
// complaining. The closure is stored in its raw `sys::Closure` form
// inside `CleanupJob::GiveTask` and is turned back into a callable
// `&fn(~Coroutine)` through `ClosureConverter` when the job is run.
type UnsafeTaskReceiver = sys::Closure;
2013-04-15 18:56:39 -07:00
trait ClosureConverter {
2013-05-12 15:26:19 -07:00
fn from_fn(&fn(~Coroutine)) -> Self;
fn to_fn(self) -> &fn(~Coroutine);
}
2013-04-15 18:56:39 -07:00
impl ClosureConverter for UnsafeTaskReceiver {
2013-05-12 15:26:19 -07:00
fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
}
enum CleanupJob {
DoNothing,
2013-05-12 15:26:19 -07:00
GiveTask(~Coroutine, UnsafeTaskReceiver)
}
pub impl Scheduler {

    /// Whether a task currently occupies the `current_task` slot.
    fn in_task_context(&self) -> bool { self.current_task.is_some() }

    /// Create a scheduler driven by `event_loop`, starting with an empty
    /// work queue, a fresh stack pool, no current task and no pending
    /// cleanup job.
    fn new(event_loop: ~EventLoopObject) -> Scheduler {

        // Lazily initialize the global state, currently the scheduler TLS key
        unsafe { rust_initialize_global_state(); }
        extern {
            fn rust_initialize_global_state();
        }

        Scheduler {
            event_loop: event_loop,
            work_queue: WorkQueue::new(),
            stack_pool: StackPool::new(),
            saved_context: Context::empty(),
            current_task: None,
            cleanup_job: None
        }
    }

    // XXX: This may eventually need to be refactored so that
    // the scheduler itself doesn't have to call event_loop.run.
    // That will be important for embedding the runtime into external
    // event loops.
    /// Run the event loop until it returns, then hand the scheduler back
    /// to the caller.
    ///
    /// Must be called outside of task context. While the loop runs the
    /// scheduler lives in thread-local storage; afterwards it is taken
    /// back out and the work queue is asserted to be empty.
    fn run(~self) -> ~Scheduler {
        assert!(!self.in_task_context());

        let mut self_sched = self;

        unsafe {
            // Grab a raw pointer to the event loop first, so the loop can
            // still be reached after `self_sched` has been moved away.
            // NOTE(review): this relies on the field staying at the same
            // address once the box is given to `local_sched::put` --
            // confirm against local_sched.
            let event_loop: *mut ~EventLoopObject = {
                let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
                event_loop
            };

            // Give ownership of the scheduler (self) to the thread
            local_sched::put(self_sched);

            (*event_loop).run();
        }

        let sched = local_sched::take();
        assert!(sched.work_queue.is_empty());
        return sched;
    }

    /// Schedule a task to be executed later.
    ///
    /// Pushes the task onto the work stealing queue and tells the event loop
    /// to run it later. Always use this instead of pushing to the work queue
    /// directly.
    fn enqueue_task(&mut self, task: ~Coroutine) {
        self.work_queue.push_front(task);
        self.event_loop.callback(resume_task_from_queue);

        // Event loop callback: take the scheduler out of thread-local
        // storage and let it resume whatever is at the front of the queue.
        fn resume_task_from_queue() {
            let scheduler = local_sched::take();
            scheduler.resume_task_from_queue();
        }
    }

    // * Scheduler-context operations

    /// Resume the task at the front of the work queue, if there is one.
    ///
    /// Must be called outside of task context. When the queue is empty the
    /// scheduler is simply put back into thread-local storage.
    fn resume_task_from_queue(~self) {
        assert!(!self.in_task_context());

        rtdebug!("looking in work queue for task to schedule");

        let mut this = self;
        match this.work_queue.pop_front() {
            Some(task) => {
                rtdebug!("resuming task from work queue");
                this.resume_task_immediately(task);
            }
            None => {
                rtdebug!("no tasks in queue");
                local_sched::put(this);
            }
        }
    }

    // * Task-context operations

    /// Called by a running task to end execution, after which it will
    /// be recycled by the scheduler for reuse in a new task.
    fn terminate_current_task(~self) {
        assert!(self.in_task_context());

        rtdebug!("ending running task");

        do self.deschedule_running_task_and_then |dead_task| {
            // The Cell lets the owned task be moved out from inside the
            // nested closure below.
            let dead_task = Cell(dead_task);
            do local_sched::borrow |sched| {
                dead_task.take().recycle(&mut sched.stack_pool);
            }
        }

        // A terminated task must never be switched back to
        abort!("control reached end of task");
    }

    /// Switch directly to `task` and, once the switch has happened, put
    /// the previously running task back on the work queue.
    ///
    /// Must be called from task context.
    fn schedule_new_task(~self, task: ~Coroutine) {
        assert!(self.in_task_context());

        do self.switch_running_tasks_and_then(task) |last_task| {
            let last_task = Cell(last_task);
            do local_sched::borrow |sched| {
                sched.enqueue_task(last_task.take());
            }
        }
    }

    /// Same operation as `schedule_new_task`: switch to `task` and
    /// re-enqueue the task that was running.
    ///
    /// Delegates instead of repeating the body so the two entry points
    /// cannot drift apart. Must be called from task context.
    fn schedule_task(~self, task: ~Coroutine) {
        self.schedule_new_task(task);
    }

    // Core scheduling ops

    /// Switch from scheduler context into `task`.
    ///
    /// Must be called outside of task context. Returns once something
    /// switches back to the scheduler context, after running whatever
    /// cleanup job was left behind.
    fn resume_task_immediately(~self, task: ~Coroutine) {
        let mut this = self;
        assert!(!this.in_task_context());

        rtdebug!("scheduling a task");

        // Store the task in the scheduler so it can be grabbed later
        this.current_task = Some(task);
        this.enqueue_cleanup_job(DoNothing);

        local_sched::put(this);

        // Take pointers to both the task and scheduler's saved registers.
        unsafe {
            let sched = local_sched::unsafe_borrow();
            let (sched_context, _, next_task_context) = (*sched).get_contexts();
            let next_task_context = next_task_context.unwrap();
            // Context switch to the task, restoring its registers
            // and saving the scheduler's
            Context::swap(sched_context, next_task_context);

            // Borrow the scheduler again rather than reusing the pointer
            // obtained before the switch.
            let sched = local_sched::unsafe_borrow();
            // The running task should have passed ownership elsewhere
            assert!((*sched).current_task.is_none());

            // Running tasks may have asked us to do some cleanup
            (*sched).run_cleanup_job();
        }
    }

    /// Block a running task, context switch to the scheduler, then pass the
    /// blocked task to a closure.
    ///
    /// # Safety note
    ///
    /// The closure here is a *stack* closure that lives in the
    /// running task. It gets transmuted to the scheduler's lifetime
    /// and called while the task is blocked.
    fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
        let mut this = self;
        assert!(this.in_task_context());

        rtdebug!("blocking task");

        unsafe {
            let blocked_task = this.current_task.swap_unwrap();
            // Erase the closure's region so it can be stored in the
            // scheduler as the cleanup job.
            let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
            let f_opaque = ClosureConverter::from_fn(f_fake_region);
            this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
        }

        local_sched::put(this);

        unsafe {
            let sched = local_sched::unsafe_borrow();
            let (sched_context, last_task_context, _) = (*sched).get_contexts();
            let last_task_context = last_task_context.unwrap();
            Context::swap(last_task_context, sched_context);

            // We could be executing in a different thread now
            let sched = local_sched::unsafe_borrow();
            (*sched).run_cleanup_job();
        }
    }

    /// Switch directly to another task, without going through the scheduler.
    /// You would want to think hard about doing this, e.g. if there are
    /// pending I/O events it would be a bad idea.
    ///
    /// After the switch, `f` is called with the task that was running
    /// before it.
    fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) {
        let mut this = self;
        assert!(this.in_task_context());

        rtdebug!("switching tasks");

        let old_running_task = this.current_task.swap_unwrap();
        // Erase the closure's region so it can be stored in the scheduler
        // as the cleanup job.
        let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
        let f_opaque = ClosureConverter::from_fn(f_fake_region);
        this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
        this.current_task = Some(next_task);

        local_sched::put(this);

        unsafe {
            let sched = local_sched::unsafe_borrow();
            let (_, last_task_context, next_task_context) = (*sched).get_contexts();
            let last_task_context = last_task_context.unwrap();
            let next_task_context = next_task_context.unwrap();
            Context::swap(last_task_context, next_task_context);

            // We could be executing in a different thread now
            let sched = local_sched::unsafe_borrow();
            (*sched).run_cleanup_job();
        }
    }

    // * Other stuff

    /// Record the job to run after the next context switch. Only one job
    /// may be pending at a time.
    fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
        assert!(self.cleanup_job.is_none());
        self.cleanup_job = Some(job);
    }

    /// Take the pending cleanup job and execute it. A job must have been
    /// enqueued beforehand.
    fn run_cleanup_job(&mut self) {
        rtdebug!("running cleanup job");

        assert!(self.cleanup_job.is_some());

        let cleanup_job = self.cleanup_job.swap_unwrap();
        match cleanup_job {
            DoNothing => { }
            GiveTask(task, f) => (f.to_fn())(task)
        }
    }

    /// Get mutable references to all the contexts that may be involved in a
    /// context switch.
    ///
    /// Returns (the scheduler context, the optional context of the
    /// task in the cleanup list, the optional context of the task in
    /// the current task slot).  When context switching to a task,
    /// callers should first arrange for that task to be located in the
    /// Scheduler's current_task slot and set up the
    /// post-context-switch cleanup job.
    fn get_contexts<'a>(&'a mut self) -> (&'a mut Context,
                                          Option<&'a mut Context>,
                                          Option<&'a mut Context>) {
        let last_task = match self.cleanup_job {
            Some(GiveTask(~ref task, _)) => {
                Some(task)
            }
            Some(DoNothing) => {
                None
            }
            None => fail!("all context switches should have a cleanup job")
        };
        // XXX: Pattern matching mutable pointers above doesn't work
        // because borrowck thinks the three patterns are conflicting
        // borrows
        unsafe {
            // The task was matched immutably above; turn the reference
            // into the mutable one the caller needs.
            let last_task = transmute::<Option<&Coroutine>, Option<&mut Coroutine>>(last_task);
            let last_task_context = match last_task {
                Some(t) => Some(&mut t.saved_context), None => None
            };
            let next_task_context = match self.current_task {
                Some(ref mut t) => Some(&mut t.saved_context), None => None
            };
            // XXX: These transmutes can be removed after snapshot
            return (transmute(&mut self.saved_context),
                    last_task_context,
                    transmute(next_task_context));
        }
    }
}
2013-05-12 15:26:19 -07:00
static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
2013-05-12 15:26:19 -07:00
pub struct Coroutine {
/// The segment of stack on which the task is currently running or,
/// if the task is blocked, on which the task will resume execution
priv current_stack_segment: StackSegment,
/// These are always valid when the task is not running, unless
/// the task is dead
priv saved_context: Context,
/// The heap, GC, unwinding, local storage, logging
local_services: LocalServices
}
2013-05-12 15:26:19 -07:00
pub impl Coroutine {
fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
Coroutine::with_local(stack_pool, LocalServices::new(), start)
}
fn with_local(stack_pool: &mut StackPool,
local_services: LocalServices,
2013-05-12 15:26:19 -07:00
start: ~fn()) -> Coroutine {
let start = Coroutine::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
// NB: Context holds a pointer to that ~fn
2013-03-12 00:48:41 -07:00
let initial_context = Context::new(start, &mut stack);
2013-05-12 15:26:19 -07:00
return Coroutine {
current_stack_segment: stack,
saved_context: initial_context,
local_services: local_services
};
}
priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
// XXX: The old code didn't have this extra allocation
let wrapper: ~fn() = || {
// This is the first code to execute after the initial
// context switch to the task. The previous context may
// have asked us to do some cleanup.
unsafe {
let sched = local_sched::unsafe_borrow();
(*sched).run_cleanup_job();
let sched = local_sched::unsafe_borrow();
let task = (*sched).current_task.get_mut_ref();
// FIXME #6141: shouldn't neet to put `start()` in another closure
task.local_services.run(||start());
}
let sched = local_sched::take();
sched.terminate_current_task();
};
return wrapper;
}
/// Destroy the task and try to reuse its components
fn recycle(~self, stack_pool: &mut StackPool) {
match self {
2013-05-12 15:26:19 -07:00
~Coroutine {current_stack_segment, _} => {
stack_pool.give_segment(current_stack_segment);
}
}
}
}
#[cfg(test)]
mod test {
    use int;
    use cell::Cell;
    use rt::uv::uvio::UvEventLoop;
    use unstable::run_in_bare_thread;
    use task::spawn;
    use rt::test::*;
    use super::*;

    // A single enqueued task gets run by the scheduler.
    #[test]
    fn test_simple_scheduling() {
        do run_in_bare_thread {
            let mut task_ran = false;
            let task_ran_ptr: *mut bool = &mut task_ran;

            let mut sched = ~UvEventLoop::new_scheduler();
            let task = ~do Coroutine::new(&mut sched.stack_pool) {
                unsafe { *task_ran_ptr = true; }
            };
            sched.enqueue_task(task);
            sched.run();
            assert!(task_ran);
        }
    }

    // Every one of several enqueued tasks gets run exactly once.
    #[test]
    fn test_several_tasks() {
        do run_in_bare_thread {
            let total = 10;
            let mut task_count = 0;
            let task_count_ptr: *mut int = &mut task_count;

            let mut sched = ~UvEventLoop::new_scheduler();
            for int::range(0, total) |_| {
                let task = ~do Coroutine::new(&mut sched.stack_pool) {
                    unsafe { *task_count_ptr = *task_count_ptr + 1; }
                };
                sched.enqueue_task(task);
            }
            sched.run();
            assert!(task_count == total);
        }
    }

    // A task can switch directly to a second task and is later resumed
    // after being re-enqueued: the counter is bumped before the switch,
    // inside the second task, and after the first task resumes.
    #[test]
    fn test_swap_tasks_then() {
        do run_in_bare_thread {
            let mut count = 0;
            let count_ptr: *mut int = &mut count;

            let mut sched = ~UvEventLoop::new_scheduler();
            let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
                unsafe { *count_ptr = *count_ptr + 1; }
                let mut sched = local_sched::take();
                let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
                    unsafe { *count_ptr = *count_ptr + 1; }
                };
                // Context switch directly to the new task
                do sched.switch_running_tasks_and_then(task2) |task1| {
                    let task1 = Cell(task1);
                    do local_sched::borrow |sched| {
                        sched.enqueue_task(task1.take());
                    }
                }
                unsafe { *count_ptr = *count_ptr + 1; }
            };
            sched.enqueue_task(task1);
            sched.run();
            assert!(count == 3);
        }
    }

    // Each task enqueues its successor until MAX tasks have run.
    #[bench] #[test] #[ignore(reason = "long test")]
    fn test_run_a_lot_of_tasks_queued() {
        do run_in_bare_thread {
            static MAX: int = 1000000;
            let mut count = 0;
            let count_ptr: *mut int = &mut count;

            let mut sched = ~UvEventLoop::new_scheduler();

            let start_task = ~do Coroutine::new(&mut sched.stack_pool) {
                run_task(count_ptr);
            };
            sched.enqueue_task(start_task);
            sched.run();

            assert!(count == MAX);

            fn run_task(count_ptr: *mut int) {
                do local_sched::borrow |sched| {
                    let task = ~do Coroutine::new(&mut sched.stack_pool) {
                        unsafe {
                            *count_ptr = *count_ptr + 1;
                            if *count_ptr != MAX {
                                run_task(count_ptr);
                            }
                        }
                    };
                    sched.enqueue_task(task);
                }
            };
        }
    }

    // A task can deschedule itself; the closure runs outside of task
    // context and re-enqueues the blocked task.
    #[test]
    fn test_block_task() {
        do run_in_bare_thread {
            let mut sched = ~UvEventLoop::new_scheduler();
            let task = ~do Coroutine::new(&mut sched.stack_pool) {
                let sched = local_sched::take();
                assert!(sched.in_task_context());
                do sched.deschedule_running_task_and_then() |task| {
                    let task = Cell(task);
                    do local_sched::borrow |sched| {
                        assert!(!sched.in_task_context());
                        sched.enqueue_task(task.take());
                    }
                }
            };
            sched.enqueue_task(task);
            sched.run();
        }
    }

    #[test]
    fn test_io_callback() {
        // This is a regression test that when there are no schedulable tasks
        // in the work queue, but we are performing I/O, that once we do put
        // something in the work queue again the scheduler picks it up and doesn't
        // exit before emptying the work queue
        do run_in_newsched_task {
            do spawn {
                let sched = local_sched::take();
                do sched.deschedule_running_task_and_then |task| {
                    let mut sched = local_sched::take();
                    let task = Cell(task);
                    // Re-enqueue the blocked task from a timer callback,
                    // i.e. while the work queue is otherwise empty.
                    do sched.event_loop.callback_ms(10) {
                        rtdebug!("in callback");
                        let mut sched = local_sched::take();
                        sched.enqueue_task(task.take());
                        local_sched::put(sched);
                    }
                    local_sched::put(sched);
                }
            }
        }
    }
}