A round of code cleaning for the primary scheduler code. Comments have been updated, a minor amount of support type restructing has happened, methods have been reordered, and some duplicate code has been purged.

This commit is contained in:
toddaaro 2013-08-15 19:46:23 -07:00
parent f83835b0e7
commit 20213fcca4
3 changed files with 206 additions and 280 deletions

View File

@ -31,10 +31,11 @@ use rand::{XorShiftRng, RngUtil};
use iterator::{range};
use vec::{OwnedVector};
/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
/// thread local storage and the running task is owned by the
/// scheduler.
/// A scheduler is responsible for coordinating the execution of Tasks
/// on a single thread. The scheduler runs inside a slightly modified
/// Rust Task. When not running this task is stored in the scheduler
/// struct. The scheduler struct acts like a baton, all scheduling
/// actions are transfers of the baton.
///
/// XXX: This creates too many callbacks to run_sched_once, resulting
/// in too much allocation and too many events.
@ -64,11 +65,12 @@ pub struct Scheduler {
stack_pool: StackPool,
/// The event loop used to drive the scheduler and perform I/O
event_loop: ~EventLoopObject,
/// The scheduler runs on a special task.
/// The scheduler runs on a special task. When it is not running
/// it is stored here instead of the work queue.
sched_task: Option<~Task>,
/// An action performed after a context switch on behalf of the
/// code running before the context switch
priv cleanup_job: Option<CleanupJob>,
cleanup_job: Option<CleanupJob>,
metrics: SchedMetrics,
/// Should this scheduler run any task, or only pinned tasks?
run_anything: bool,
@ -81,27 +83,9 @@ pub struct Scheduler {
idle_callback: ~PausibleIdleCallback
}
enum CleanupJob {
DoNothing,
GiveTask(~Task, UnsafeTaskReceiver)
}
pub struct SchedHandle {
priv remote: ~RemoteCallbackObject,
priv queue: MessageQueue<SchedMessage>,
sched_id: uint
}
pub enum SchedMessage {
Wake,
Shutdown,
PinnedTask(~Task),
TaskFromFriend(~Task)
}
impl Scheduler {
pub fn sched_id(&self) -> uint { to_uint(self) }
// * Initialization Functions
pub fn new(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
@ -115,8 +99,6 @@ impl Scheduler {
}
// When you create a scheduler it isn't yet "in" a task, so the
// task field is None.
pub fn new_special(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
@ -185,7 +167,6 @@ impl Scheduler {
let sched = Local::take::<Scheduler>();
rtdebug!("starting scheduler %u", sched.sched_id());
sched.run();
// Now that we are done with the scheduler, clean up the
@ -231,11 +212,11 @@ impl Scheduler {
}
}
// One iteration of the scheduler loop, always run at least once.
// * Execution Functions - Core Loop Logic
// The model for this function is that you continue through it
// until you either use the scheduler while performing a schedule
// action, in which case you give it away and do not return, or
// action, in which case you give it away and return early, or
// you reach the end and sleep. In the case that a scheduler
// action is performed the loop is evented such that this function
// is called again.
@ -251,39 +232,18 @@ impl Scheduler {
// end of this function without performing an action.
sched.idle_callback.resume();
// Our first task is to read mail to see if we have important
// messages.
// 1) A wake message is easy, mutate sched struct and return
// it.
// 2) A shutdown is also easy, shutdown.
// 3) A pinned task - we resume immediately and do not return
// here.
// 4) A message from another scheduler with a non-homed task
// to run here.
let result = sched.interpret_message_queue();
let sched = match result {
Some(sched) => {
// We did not resume a task, so we returned.
sched
}
None => {
return;
}
// First we check for scheduler messages, these are higher
// priority than regular tasks.
let sched = match sched.interpret_message_queue() {
Some(sched) => sched,
None => return
};
// Second activity is to try resuming a task from the queue.
let result = sched.do_work();
let mut sched = match result {
Some(sched) => {
// Failed to dequeue a task, so we return.
sched
}
None => {
return;
}
// This helper will use a randomized work-stealing algorithm
// to find work.
let mut sched = match sched.do_work() {
Some(sched) => sched,
None => return
};
// If we got here then there was no work to do.
@ -310,67 +270,10 @@ impl Scheduler {
Local::put(sched);
}
pub fn make_handle(&mut self) -> SchedHandle {
let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
return SchedHandle {
remote: remote,
queue: self.message_queue.clone(),
sched_id: self.sched_id()
};
}
/// Schedule a task to be executed later.
///
/// Pushes the task onto the work stealing queue and tells the
/// event loop to run it later. Always use this instead of pushing
/// to the work queue directly.
pub fn enqueue_task(&mut self, task: ~Task) {
let this = self;
rtdebug!("enqueuing task");
// We push the task onto our local queue clone.
this.work_queue.push(task);
// There is definitely work to be done later. Make sure we wake up for it.
this.idle_callback.resume();
// We've made work available. Notify a
// sleeping scheduler.
// XXX: perf. Check for a sleeper without
// synchronizing memory. It's not critical
// that we always find it.
// XXX: perf. If there's a sleeper then we
// might as well just send it the task
// directly instead of pushing it to the
// queue. That is essentially the intent here
// and it is less work.
match this.sleeper_list.pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake)
}
None => { (/* pass */) }
};
}
/// As enqueue_task, but with the possibility for the blocked task to
/// already have been killed.
pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
do blocked_task.wake().map_move |task| {
self.enqueue_task(task);
};
}
// * Scheduler-context operations
// This function returns None if the scheduler is "used", or it
// returns the still-available scheduler. Note: currently
// considers *any* message receive a use and returns None.
// returns the still-available scheduler. At this point all
// message-handling will count as a turn of work, and as a result
// return None.
fn interpret_message_queue(~self) -> Option<~Scheduler> {
let mut this = self;
@ -383,7 +286,8 @@ impl Scheduler {
}
Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!");
this.sched_schedule_task(task).map_move(Local::put);
this.process_task(task,
Scheduler::resume_task_immediately_cl).map_move(Local::put);
return None;
}
Some(Wake) => {
@ -411,7 +315,6 @@ impl Scheduler {
// event loop references we will shut down.
this.no_sleep = true;
this.sleepy = false;
Local::put(this);
return None;
}
@ -421,30 +324,19 @@ impl Scheduler {
}
}
/// Given an input Coroutine sends it back to its home scheduler.
fn send_task_home(task: ~Task) {
let mut task = task;
let mut home = task.take_unwrap_home();
match home {
Sched(ref mut home_handle) => {
home_handle.send(PinnedTask(task));
}
AnySched => {
rtabort!("error: cannot send anysched task home");
}
}
}
fn do_work(~self) -> Option<~Scheduler> {
let mut this = self;
/// Take a non-homed task we aren't allowed to run here and send
/// it to the designated friend scheduler to execute.
fn send_to_friend(&mut self, task: ~Task) {
rtdebug!("sending a task to friend");
match self.friend_handle {
Some(ref mut handle) => {
handle.send(TaskFromFriend(task));
rtdebug!("scheduler calling do work");
match this.find_work() {
Some(task) => {
rtdebug!("found some work! processing the task");
return this.process_task(task,
Scheduler::resume_task_immediately_cl);
}
None => {
rtabort!("tried to send task to a friend but scheduler has no friends");
rtdebug!("no work was found, returning the scheduler struct");
return Some(this);
}
}
}
@ -468,8 +360,8 @@ impl Scheduler {
None => {
// Our naive stealing, try kinda hard.
rtdebug!("scheduler trying to steal");
let _len = self.work_queues.len();
return self.try_steals(2);
let len = self.work_queues.len();
return self.try_steals(len/2);
}
}
}
@ -483,7 +375,8 @@ impl Scheduler {
let work_queues = &mut self.work_queues;
match work_queues[index].steal() {
Some(task) => {
rtdebug!("found task by stealing"); return Some(task)
rtdebug!("found task by stealing");
return Some(task)
}
None => ()
}
@ -492,8 +385,11 @@ impl Scheduler {
return None;
}
// Given a task, execute it correctly.
fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
// * Task Routing Functions - Make sure tasks send up in the right
// place.
fn process_task(~self, task: ~Task,
schedule_fn: SchedulingFn) -> Option<~Scheduler> {
let mut this = self;
let mut task = task;
@ -510,15 +406,13 @@ impl Scheduler {
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
this.resume_task_immediately(task);
return None;
return schedule_fn(this, task);
}
}
AnySched if this.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
this.resume_task_immediately(task);
return None;
return schedule_fn(this, task);
}
AnySched => {
rtdebug!("sending task to friend");
@ -529,98 +423,71 @@ impl Scheduler {
}
}
// Bundle the helpers together.
fn do_work(~self) -> Option<~Scheduler> {
let mut this = self;
fn send_task_home(task: ~Task) {
let mut task = task;
let mut home = task.take_unwrap_home();
match home {
Sched(ref mut home_handle) => {
home_handle.send(PinnedTask(task));
}
AnySched => {
rtabort!("error: cannot send anysched task home");
}
}
}
rtdebug!("scheduler calling do work");
match this.find_work() {
Some(task) => {
rtdebug!("found some work! processing the task");
return this.process_task(task);
/// Take a non-homed task we aren't allowed to run here and send
/// it to the designated friend scheduler to execute.
fn send_to_friend(&mut self, task: ~Task) {
rtdebug!("sending a task to friend");
match self.friend_handle {
Some(ref mut handle) => {
handle.send(TaskFromFriend(task));
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some(this);
rtabort!("tried to send task to a friend but scheduler has no friends");
}
}
}
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
pub fn terminate_current_task(~self) {
// Similar to deschedule running task and then, but cannot go through
// the task-blocking path. The task is already dying.
let mut this = self;
let stask = this.sched_task.take_unwrap();
do this.change_task_context(stask) |sched, mut dead_task| {
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
}
}
/// Schedule a task to be executed later.
///
/// Pushes the task onto the work stealing queue and tells the
/// event loop to run it later. Always use this instead of pushing
/// to the work queue directly.
pub fn enqueue_task(&mut self, task: ~Task) {
// Scheduling a task requires a few checks to make sure the task
// ends up in the appropriate location. The run_anything flag on
// the scheduler and the home on the task need to be checked. This
// helper performs that check. It takes a function that specifies
// how to queue the the provided task if that is the correct
// action. This is a "core" function that requires handling the
// returned Option correctly.
let this = self;
pub fn schedule_task(~self, task: ~Task,
schedule_fn: ~fn(sched: ~Scheduler, task: ~Task))
-> Option<~Scheduler> {
// We push the task onto our local queue clone.
this.work_queue.push(task);
this.idle_callback.resume();
// is the task home?
let is_home = task.is_home_no_tls(&self);
// We've made work available. Notify a
// sleeping scheduler.
// does the task have a home?
let homed = task.homed();
let mut this = self;
if is_home || (!homed && this.run_anything) {
// here we know we are home, execute now OR we know we
// aren't homed, and that this sched doesn't care
rtdebug!("task: %u is on ok sched, executing", to_uint(task));
schedule_fn(this, task);
return None;
} else if !homed && !this.run_anything {
// the task isn't homed, but it can't be run here
this.send_to_friend(task);
return Some(this);
} else {
// task isn't home, so don't run it here, send it home
Scheduler::send_task_home(task);
return Some(this);
}
}
// There are two contexts in which schedule_task can be called:
// inside the scheduler, and inside a task. These contexts handle
// executing the task slightly differently. In the scheduler
// context case we want to receive the scheduler as an input, and
// manually deal with the option. In the task context case we want
// to use TLS to find the scheduler, and deal with the option
// inside the helper.
pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
do self.schedule_task(task) |sched, next_task| {
sched.resume_task_immediately(next_task);
}
}
// Task context case - use TLS.
pub fn run_task(task: ~Task) {
let sched = Local::take::<Scheduler>();
let opt = do sched.schedule_task(task) |sched, next_task| {
do sched.switch_running_tasks_and_then(next_task) |sched, last_task| {
sched.enqueue_blocked_task(last_task);
// XXX: perf. Check for a sleeper without
// synchronizing memory. It's not critical
// that we always find it.
match this.sleeper_list.pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake)
}
None => { (/* pass */) }
};
opt.map_move(Local::put);
}
/// As enqueue_task, but with the possibility for the blocked task to
/// already have been killed.
pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
do blocked_task.wake().map_move |task| {
self.enqueue_task(task);
};
}
// * Core Context Switching Functions
// The primary function for changing contexts. In the current
// design the scheduler is just a slightly modified GreenTask, so
// all context swaps are from Task to Task. The only difference
@ -650,7 +517,7 @@ impl Scheduler {
// The current task is placed inside an enum with the cleanup
// function. This enum is then placed inside the scheduler.
this.enqueue_cleanup_job(GiveTask(current_task, f_opaque));
this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
// The scheduler is then placed inside the next task.
let mut next_task = next_task;
@ -666,12 +533,9 @@ impl Scheduler {
transmute_mut_region(*next_task.sched.get_mut_ref());
let current_task: &mut Task = match sched.cleanup_job {
Some(GiveTask(ref task, _)) => {
Some(CleanupJob { task: ref task, _ }) => {
transmute_mut_region(*transmute_mut_unsafe(task))
}
Some(DoNothing) => {
rtabort!("no next task");
}
None => {
rtabort!("no cleanup job");
}
@ -705,19 +569,42 @@ impl Scheduler {
}
}
// Old API for task manipulation implemented over the new core
// function.
pub fn resume_task_immediately(~self, task: ~Task) {
do self.change_task_context(task) |sched, stask| {
sched.sched_task = Some(stask);
// Returns a mutable reference to both contexts involved in this
// swap. This is unsafe - we are getting mutable internal
// references to keep even when we don't own the tasks. It looks
// kinda safe because we are doing transmutes before passing in
// the arguments.
pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
(&'a mut Context, &'a mut Context) {
let current_task_context =
&mut current_task.coroutine.get_mut_ref().saved_context;
let next_task_context =
&mut next_task.coroutine.get_mut_ref().saved_context;
unsafe {
(transmute_mut_region(current_task_context),
transmute_mut_region(next_task_context))
}
}
// * Context Swapping Helpers - Here be ugliness!
pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
do self.change_task_context(task) |sched, stask| {
sched.sched_task = Some(stask);
}
return None;
}
fn resume_task_immediately_cl(sched: ~Scheduler,
task: ~Task) -> Option<~Scheduler> {
sched.resume_task_immediately(task)
}
pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
match blocked_task.wake() {
Some(task) => self.resume_task_immediately(task),
None => Local::put(self),
Some(task) => { self.resume_task_immediately(task); }
None => Local::put(self)
};
}
@ -756,54 +643,75 @@ impl Scheduler {
}
}
// A helper that looks up the scheduler and runs a task later by
// enqueuing it.
fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
do sched.switch_running_tasks_and_then(task) |sched, last_task| {
sched.enqueue_blocked_task(last_task);
};
return None;
}
// * Task Context Helpers
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
pub fn terminate_current_task(~self) {
// Similar to deschedule running task and then, but cannot go through
// the task-blocking path. The task is already dying.
let mut this = self;
let stask = this.sched_task.take_unwrap();
do this.change_task_context(stask) |sched, mut dead_task| {
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
}
}
pub fn run_task(task: ~Task) {
let sched = Local::take::<Scheduler>();
sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
}
pub fn run_task_later(next_task: ~Task) {
// We aren't performing a scheduler operation, so we want to
// put the Scheduler back when we finish.
let next_task = Cell::new(next_task);
do Local::borrow::<Scheduler,()> |sched| {
sched.enqueue_task(next_task.take());
};
}
// Returns a mutable reference to both contexts involved in this
// swap. This is unsafe - we are getting mutable internal
// references to keep even when we don't own the tasks. It looks
// kinda safe because we are doing transmutes before passing in
// the arguments.
pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
(&'a mut Context, &'a mut Context) {
let current_task_context =
&mut current_task.coroutine.get_mut_ref().saved_context;
let next_task_context =
&mut next_task.coroutine.get_mut_ref().saved_context;
unsafe {
(transmute_mut_region(current_task_context),
transmute_mut_region(next_task_context))
}
}
// * Utility Functions
pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
self.cleanup_job = Some(job);
}
pub fn sched_id(&self) -> uint { to_uint(self) }
pub fn run_cleanup_job(&mut self) {
rtdebug!("running cleanup job");
let cleanup_job = self.cleanup_job.take_unwrap();
match cleanup_job {
DoNothing => { }
GiveTask(task, f) => f.to_fn()(self, task)
}
cleanup_job.run(self);
}
pub fn make_handle(&mut self) -> SchedHandle {
let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
return SchedHandle {
remote: remote,
queue: self.message_queue.clone(),
sched_id: self.sched_id()
};
}
}
// The cases for the below function.
enum ResumeAction {
SendHome,
Requeue,
ResumeNow,
Homeless
// Supporting types
type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
pub enum SchedMessage {
Wake,
Shutdown,
PinnedTask(~Task),
TaskFromFriend(~Task)
}
pub struct SchedHandle {
priv remote: ~RemoteCallbackObject,
priv queue: MessageQueue<SchedMessage>,
sched_id: uint
}
impl SchedHandle {
@ -813,6 +721,25 @@ impl SchedHandle {
}
}
struct CleanupJob {
task: ~Task,
f: UnsafeTaskReceiver
}
impl CleanupJob {
pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
CleanupJob {
task: task,
f: f
}
}
pub fn run(self, sched: &mut Scheduler) {
let CleanupJob { task: task, f: f } = self;
f.to_fn()(sched, task)
}
}
// XXX: Some hacks to put a &fn in Scheduler without borrowck
// complaining
type UnsafeTaskReceiver = raw::Closure;

View File

@ -360,7 +360,7 @@ impl Coroutine {
// Again - might work while safe, or it might not.
do Local::borrow::<Scheduler,()> |sched| {
(sched).run_cleanup_job();
sched.run_cleanup_job();
}
// To call the run method on a task we need a direct

View File

@ -38,8 +38,7 @@ pub fn default_sched_threads() -> uint {
pub fn dumb_println(s: &str) {
use io::WriterUtil;
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
dbg.write_str(s + "\n");
}
pub fn abort(msg: &str) -> ! {