559 lines
18 KiB
Rust
559 lines
18 KiB
Rust
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
|
|
// file at the top-level directory of this distribution and at
|
|
// http://rust-lang.org/COPYRIGHT.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
// option. This file may not be copied, modified, or distributed
|
|
// except according to those terms.
|
|
|
|
use option::*;
|
|
use sys;
|
|
use cast::transmute;
|
|
use libc::c_void;
|
|
use ptr::mut_null;
|
|
|
|
use super::work_queue::WorkQueue;
|
|
use super::stack::{StackPool, StackSegment};
|
|
use super::io::{EventLoop, EventLoopObject};
|
|
use super::context::Context;
|
|
use tls = super::thread_local_storage;
|
|
|
|
#[cfg(test)] use super::uvio::UvEventLoop;
|
|
#[cfg(test)] use unstable::run_in_bare_thread;
|
|
#[cfg(test)] use int;
|
|
|
|
/// The Scheduler is responsible for coordinating execution of Tasks
|
|
/// on a single thread. When the scheduler is running it is owned by
|
|
/// thread local storage and the running task is owned by the
|
|
/// scheduler.
|
|
pub struct Scheduler {
|
|
task_queue: WorkQueue<~Task>,
|
|
stack_pool: StackPool,
|
|
/// The event loop used to drive the scheduler and perform I/O
|
|
event_loop: ~EventLoopObject,
|
|
/// The scheduler's saved context.
|
|
/// Always valid when a task is executing, otherwise not
|
|
priv saved_context: Context,
|
|
/// The currently executing task
|
|
priv current_task: Option<~Task>,
|
|
/// A queue of jobs to perform immediately upon return from task
|
|
/// context to scheduler context.
|
|
/// XXX: This probably should be a single cleanup action and it
|
|
/// should run after a context switch, not on return from the
|
|
/// scheduler
|
|
priv cleanup_jobs: ~[CleanupJob]
|
|
}
|
|
|
|
// XXX: Some hacks to put a &fn in Scheduler without borrowck
|
|
// complaining
|
|
type UnsafeTaskReceiver = sys::Closure;
|
|
trait HackAroundBorrowCk {
|
|
static fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
|
|
fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
|
|
}
|
|
impl HackAroundBorrowCk for UnsafeTaskReceiver {
|
|
static fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
|
|
unsafe { transmute(f) }
|
|
}
|
|
fn to_fn(self) -> &fn(&mut Scheduler, ~Task) {
|
|
unsafe { transmute(self) }
|
|
}
|
|
}
|
|
|
|
enum CleanupJob {
|
|
RescheduleTask(~Task),
|
|
RecycleTask(~Task),
|
|
GiveTask(~Task, UnsafeTaskReceiver)
|
|
}
|
|
|
|
pub impl Scheduler {
|
|
|
|
static fn new(event_loop: ~EventLoopObject) -> Scheduler {
|
|
Scheduler {
|
|
event_loop: event_loop,
|
|
task_queue: WorkQueue::new(),
|
|
stack_pool: StackPool::new(),
|
|
saved_context: Context::empty(),
|
|
current_task: None,
|
|
cleanup_jobs: ~[]
|
|
}
|
|
}
|
|
|
|
// XXX: This may eventually need to be refactored so that
|
|
// the scheduler itself doesn't have to call event_loop.run.
|
|
// That will be important for embedding the runtime into external
|
|
// event loops.
|
|
fn run(~self) -> ~Scheduler {
|
|
fail_unless!(!self.in_task_context());
|
|
|
|
// Give ownership of the scheduler (self) to the thread
|
|
do self.install |scheduler| {
|
|
fn run_scheduler_once() {
|
|
do Scheduler::local |scheduler| {
|
|
if scheduler.resume_task_from_queue() {
|
|
// Ok, a task ran. Nice! We'll do it again later
|
|
scheduler.event_loop.callback(run_scheduler_once);
|
|
}
|
|
}
|
|
}
|
|
|
|
scheduler.event_loop.callback(run_scheduler_once);
|
|
scheduler.event_loop.run();
|
|
}
|
|
}
|
|
|
|
fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler {
|
|
let mut tlsched = ThreadLocalScheduler::new();
|
|
tlsched.put_scheduler(self);
|
|
{
|
|
let sched = tlsched.get_scheduler();
|
|
f(sched);
|
|
}
|
|
return tlsched.take_scheduler();
|
|
}
|
|
|
|
static fn local(f: &fn(&mut Scheduler)) {
|
|
let mut tlsched = ThreadLocalScheduler::new();
|
|
f(tlsched.get_scheduler());
|
|
}
|
|
|
|
// * Scheduler-context operations
|
|
|
|
fn resume_task_from_queue(&mut self) -> bool {
|
|
fail_unless!(!self.in_task_context());
|
|
|
|
let mut self = self;
|
|
match self.task_queue.pop_front() {
|
|
Some(task) => {
|
|
self.resume_task_immediately(task);
|
|
return true;
|
|
}
|
|
None => {
|
|
rtdebug!("no tasks in queue");
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
fn resume_task_immediately(&mut self, task: ~Task) {
|
|
fail_unless!(!self.in_task_context());
|
|
|
|
rtdebug!("scheduling a task");
|
|
|
|
// Store the task in the scheduler so it can be grabbed later
|
|
self.current_task = Some(task);
|
|
self.swap_in_task();
|
|
// The running task should have passed ownership elsewhere
|
|
fail_unless!(self.current_task.is_none());
|
|
|
|
// Running tasks may have asked us to do some cleanup
|
|
self.run_cleanup_jobs();
|
|
}
|
|
|
|
|
|
// * Task-context operations
|
|
|
|
/// Called by a running task to end execution, after which it will
|
|
/// be recycled by the scheduler for reuse in a new task.
|
|
fn terminate_current_task(&mut self) {
|
|
fail_unless!(self.in_task_context());
|
|
|
|
rtdebug!("ending running task");
|
|
|
|
let dead_task = self.current_task.swap_unwrap();
|
|
self.enqueue_cleanup_job(RecycleTask(dead_task));
|
|
let dead_task = self.task_from_last_cleanup_job();
|
|
self.swap_out_task(dead_task);
|
|
}
|
|
|
|
/// Block a running task, context switch to the scheduler, then pass the
|
|
/// blocked task to a closure.
|
|
///
|
|
/// # Safety note
|
|
///
|
|
/// The closure here is a *stack* closure that lives in the
|
|
/// running task. It gets transmuted to the scheduler's lifetime
|
|
/// and called while the task is blocked.
|
|
fn block_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) {
|
|
fail_unless!(self.in_task_context());
|
|
|
|
rtdebug!("blocking task");
|
|
|
|
let blocked_task = self.current_task.swap_unwrap();
|
|
let f_fake_region = unsafe {
|
|
transmute::<&fn(&mut Scheduler, ~Task),
|
|
&fn(&mut Scheduler, ~Task)>(f)
|
|
};
|
|
let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
|
|
self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
|
|
let blocked_task = self.task_from_last_cleanup_job();
|
|
|
|
self.swap_out_task(blocked_task);
|
|
}
|
|
|
|
/// Switch directly to another task, without going through the scheduler.
|
|
/// You would want to think hard about doing this, e.g. if there are
|
|
/// pending I/O events it would be a bad idea.
|
|
fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) {
|
|
fail_unless!(self.in_task_context());
|
|
|
|
rtdebug!("switching tasks");
|
|
|
|
let old_running_task = self.current_task.swap_unwrap();
|
|
self.enqueue_cleanup_job(RescheduleTask(old_running_task));
|
|
let old_running_task = self.task_from_last_cleanup_job();
|
|
|
|
self.current_task = Some(next_task);
|
|
self.swap_in_task_from_running_task(old_running_task);
|
|
}
|
|
|
|
|
|
// * Context switching
|
|
|
|
// NB: When switching to a task callers are expected to first set
|
|
// self.running_task. When switching away from a task likewise move
|
|
// out of the self.running_task
|
|
|
|
priv fn swap_in_task(&mut self) {
|
|
// Take pointers to both the task and scheduler's saved registers.
|
|
let running_task: &~Task = self.current_task.get_ref();
|
|
let task_context = &running_task.saved_context;
|
|
let scheduler_context = &mut self.saved_context;
|
|
|
|
// Context switch to the task, restoring it's registers
|
|
// and saving the scheduler's
|
|
Context::swap(scheduler_context, task_context);
|
|
}
|
|
|
|
priv fn swap_out_task(&mut self, running_task: &mut Task) {
|
|
let task_context = &mut running_task.saved_context;
|
|
let scheduler_context = &self.saved_context;
|
|
Context::swap(task_context, scheduler_context);
|
|
}
|
|
|
|
priv fn swap_in_task_from_running_task(&mut self,
|
|
running_task: &mut Task) {
|
|
let running_task_context = &mut running_task.saved_context;
|
|
let next_context = &self.current_task.get_ref().saved_context;
|
|
Context::swap(running_task_context, next_context);
|
|
}
|
|
|
|
|
|
// * Other stuff
|
|
|
|
fn in_task_context(&self) -> bool { self.current_task.is_some() }
|
|
|
|
fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
|
|
self.cleanup_jobs.unshift(job);
|
|
}
|
|
|
|
fn run_cleanup_jobs(&mut self) {
|
|
fail_unless!(!self.in_task_context());
|
|
rtdebug!("running cleanup jobs");
|
|
|
|
while !self.cleanup_jobs.is_empty() {
|
|
match self.cleanup_jobs.pop() {
|
|
RescheduleTask(task) => {
|
|
// NB: Pushing to the *front* of the queue
|
|
self.task_queue.push_front(task);
|
|
}
|
|
RecycleTask(task) => task.recycle(&mut self.stack_pool),
|
|
GiveTask(task, f) => (f.to_fn())(self, task)
|
|
}
|
|
}
|
|
}
|
|
|
|
// XXX: Hack. This should return &self/mut but I don't know how to
|
|
// make the borrowcheck happy
|
|
fn task_from_last_cleanup_job(&mut self) -> &mut Task {
|
|
fail_unless!(!self.cleanup_jobs.is_empty());
|
|
let last_job: &self/mut CleanupJob = &mut self.cleanup_jobs[0];
|
|
let last_task: &self/Task = match last_job {
|
|
&RescheduleTask(~ref task) => task,
|
|
&RecycleTask(~ref task) => task,
|
|
&GiveTask(~ref task, _) => task,
|
|
};
|
|
// XXX: Pattern matching mutable pointers above doesn't work
|
|
// because borrowck thinks the three patterns are conflicting
|
|
// borrows
|
|
return unsafe { transmute::<&Task, &mut Task>(last_task) };
|
|
}
|
|
}
|
|
|
|
const TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
|
|
|
|
pub struct Task {
|
|
/// The segment of stack on which the task is currently running or,
|
|
/// if the task is blocked, on which the task will resume execution
|
|
priv current_stack_segment: StackSegment,
|
|
/// These are always valid when the task is not running, unless
|
|
/// the task is dead
|
|
priv saved_context: Context,
|
|
}
|
|
|
|
impl Task {
|
|
static fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
|
|
let start = Task::build_start_wrapper(start);
|
|
let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
|
|
// NB: Context holds a pointer to that ~fn
|
|
let initial_context = Context::new(start, &mut stack);
|
|
return Task {
|
|
current_stack_segment: stack,
|
|
saved_context: initial_context,
|
|
};
|
|
}
|
|
|
|
static priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
|
|
// XXX: The old code didn't have this extra allocation
|
|
let wrapper: ~fn() = || {
|
|
start();
|
|
|
|
let mut sched = ThreadLocalScheduler::new();
|
|
let sched = sched.get_scheduler();
|
|
sched.terminate_current_task();
|
|
};
|
|
return wrapper;
|
|
}
|
|
|
|
/// Destroy the task and try to reuse its components
|
|
fn recycle(~self, stack_pool: &mut StackPool) {
|
|
match self {
|
|
~Task {current_stack_segment, _} => {
|
|
stack_pool.give_segment(current_stack_segment);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// NB: This is a type so we can use make use of the &self region.
|
|
struct ThreadLocalScheduler(tls::Key);
|
|
|
|
impl ThreadLocalScheduler {
|
|
static fn new() -> ThreadLocalScheduler {
|
|
unsafe {
|
|
// NB: This assumes that the TLS key has been created prior.
|
|
// Currently done in rust_start.
|
|
let key: *mut c_void = rust_get_sched_tls_key();
|
|
let key: &mut tls::Key = transmute(key);
|
|
ThreadLocalScheduler(*key)
|
|
}
|
|
}
|
|
|
|
fn put_scheduler(&mut self, scheduler: ~Scheduler) {
|
|
unsafe {
|
|
let key = match self { &ThreadLocalScheduler(key) => key };
|
|
let value: *mut c_void =
|
|
transmute::<~Scheduler, *mut c_void>(scheduler);
|
|
tls::set(key, value);
|
|
}
|
|
}
|
|
|
|
fn get_scheduler(&mut self) -> &self/mut Scheduler {
|
|
unsafe {
|
|
let key = match self { &ThreadLocalScheduler(key) => key };
|
|
let mut value: *mut c_void = tls::get(key);
|
|
fail_unless!(value.is_not_null());
|
|
{
|
|
let value_ptr = &mut value;
|
|
let sched: &mut ~Scheduler =
|
|
transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr);
|
|
let sched: &mut Scheduler = &mut **sched;
|
|
return sched;
|
|
}
|
|
}
|
|
}
|
|
|
|
fn take_scheduler(&mut self) -> ~Scheduler {
|
|
unsafe {
|
|
let key = match self { &ThreadLocalScheduler(key) => key };
|
|
let value: *mut c_void = tls::get(key);
|
|
fail_unless!(value.is_not_null());
|
|
let sched = transmute(value);
|
|
tls::set(key, mut_null());
|
|
return sched;
|
|
}
|
|
}
|
|
}
|
|
|
|
extern {
|
|
fn rust_get_sched_tls_key() -> *mut c_void;
|
|
}
|
|
|
|
#[test]
|
|
fn thread_local_scheduler_smoke_test() {
|
|
let scheduler = ~UvEventLoop::new_scheduler();
|
|
let mut tls_scheduler = ThreadLocalScheduler::new();
|
|
tls_scheduler.put_scheduler(scheduler);
|
|
{
|
|
let _scheduler = tls_scheduler.get_scheduler();
|
|
}
|
|
let _scheduler = tls_scheduler.take_scheduler();
|
|
}
|
|
|
|
#[test]
|
|
fn thread_local_scheduler_two_instances() {
|
|
let scheduler = ~UvEventLoop::new_scheduler();
|
|
let mut tls_scheduler = ThreadLocalScheduler::new();
|
|
tls_scheduler.put_scheduler(scheduler);
|
|
{
|
|
|
|
let _scheduler = tls_scheduler.get_scheduler();
|
|
}
|
|
{
|
|
let scheduler = tls_scheduler.take_scheduler();
|
|
tls_scheduler.put_scheduler(scheduler);
|
|
}
|
|
|
|
let mut tls_scheduler = ThreadLocalScheduler::new();
|
|
{
|
|
let _scheduler = tls_scheduler.get_scheduler();
|
|
}
|
|
let _scheduler = tls_scheduler.take_scheduler();
|
|
}
|
|
|
|
#[test]
|
|
fn test_simple_scheduling() {
|
|
do run_in_bare_thread {
|
|
let mut task_ran = false;
|
|
let task_ran_ptr: *mut bool = &mut task_ran;
|
|
|
|
let mut sched = ~UvEventLoop::new_scheduler();
|
|
let task = ~do Task::new(&mut sched.stack_pool) {
|
|
unsafe { *task_ran_ptr = true; }
|
|
};
|
|
sched.task_queue.push_back(task);
|
|
sched.run();
|
|
fail_unless!(task_ran);
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_several_tasks() {
|
|
do run_in_bare_thread {
|
|
let total = 10;
|
|
let mut task_count = 0;
|
|
let task_count_ptr: *mut int = &mut task_count;
|
|
|
|
let mut sched = ~UvEventLoop::new_scheduler();
|
|
for int::range(0, total) |_| {
|
|
let task = ~do Task::new(&mut sched.stack_pool) {
|
|
unsafe { *task_count_ptr = *task_count_ptr + 1; }
|
|
};
|
|
sched.task_queue.push_back(task);
|
|
}
|
|
sched.run();
|
|
fail_unless!(task_count == total);
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_swap_tasks() {
|
|
do run_in_bare_thread {
|
|
let mut count = 0;
|
|
let count_ptr: *mut int = &mut count;
|
|
|
|
let mut sched = ~UvEventLoop::new_scheduler();
|
|
let task1 = ~do Task::new(&mut sched.stack_pool) {
|
|
unsafe { *count_ptr = *count_ptr + 1; }
|
|
do Scheduler::local |sched| {
|
|
let task2 = ~do Task::new(&mut sched.stack_pool) {
|
|
unsafe { *count_ptr = *count_ptr + 1; }
|
|
};
|
|
// Context switch directly to the new task
|
|
sched.resume_task_from_running_task_direct(task2);
|
|
}
|
|
unsafe { *count_ptr = *count_ptr + 1; }
|
|
};
|
|
sched.task_queue.push_back(task1);
|
|
sched.run();
|
|
fail_unless!(count == 3);
|
|
}
|
|
}
|
|
|
|
#[bench] #[test] #[ignore(reason = "long test")]
|
|
fn test_run_a_lot_of_tasks_queued() {
|
|
do run_in_bare_thread {
|
|
const MAX: int = 1000000;
|
|
let mut count = 0;
|
|
let count_ptr: *mut int = &mut count;
|
|
|
|
let mut sched = ~UvEventLoop::new_scheduler();
|
|
|
|
let start_task = ~do Task::new(&mut sched.stack_pool) {
|
|
run_task(count_ptr);
|
|
};
|
|
sched.task_queue.push_back(start_task);
|
|
sched.run();
|
|
|
|
fail_unless!(count == MAX);
|
|
|
|
fn run_task(count_ptr: *mut int) {
|
|
do Scheduler::local |sched| {
|
|
let task = ~do Task::new(&mut sched.stack_pool) {
|
|
unsafe {
|
|
*count_ptr = *count_ptr + 1;
|
|
if *count_ptr != MAX {
|
|
run_task(count_ptr);
|
|
}
|
|
}
|
|
};
|
|
sched.task_queue.push_back(task);
|
|
}
|
|
};
|
|
}
|
|
}
|
|
|
|
#[bench] #[test] #[ignore(reason = "too much stack allocation")]
|
|
fn test_run_a_lot_of_tasks_direct() {
|
|
do run_in_bare_thread {
|
|
const MAX: int = 100000;
|
|
let mut count = 0;
|
|
let count_ptr: *mut int = &mut count;
|
|
|
|
let mut sched = ~UvEventLoop::new_scheduler();
|
|
|
|
let start_task = ~do Task::new(&mut sched.stack_pool) {
|
|
run_task(count_ptr);
|
|
};
|
|
sched.task_queue.push_back(start_task);
|
|
sched.run();
|
|
|
|
fail_unless!(count == MAX);
|
|
|
|
fn run_task(count_ptr: *mut int) {
|
|
do Scheduler::local |sched| {
|
|
let task = ~do Task::new(&mut sched.stack_pool) {
|
|
unsafe {
|
|
*count_ptr = *count_ptr + 1;
|
|
if *count_ptr != MAX {
|
|
run_task(count_ptr);
|
|
}
|
|
}
|
|
};
|
|
// Context switch directly to the new task
|
|
sched.resume_task_from_running_task_direct(task);
|
|
}
|
|
};
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_block_task() {
|
|
do run_in_bare_thread {
|
|
let mut sched = ~UvEventLoop::new_scheduler();
|
|
let task = ~do Task::new(&mut sched.stack_pool) {
|
|
do Scheduler::local |sched| {
|
|
fail_unless!(sched.in_task_context());
|
|
do sched.block_running_task_and_then() |sched, task| {
|
|
fail_unless!(!sched.in_task_context());
|
|
sched.task_queue.push_back(task);
|
|
}
|
|
}
|
|
};
|
|
sched.task_queue.push_back(task);
|
|
sched.run();
|
|
}
|
|
}
|