Percolate the (Scheduler, GreenTask) pair upwards
This is in preparation for running do_work in a loop while there are no active I/O handles. This changes the do_work and interpret_message_queue methods to return a triple where the last element is a boolean flag as to whether work was done or not. This commit preserves the same behavior as before, it simply re-structures the code in preparation for future work.
This commit is contained in:
parent
cc34dbb840
commit
4256d24a16
@ -269,26 +269,26 @@ impl Scheduler {
|
||||
|
||||
// First we check for scheduler messages, these are higher
|
||||
// priority than regular tasks.
|
||||
let (sched, stask) =
|
||||
match self.interpret_message_queue(stask, DontTryTooHard) {
|
||||
Some(pair) => pair,
|
||||
None => return
|
||||
};
|
||||
let (sched, stask, did_work) =
|
||||
self.interpret_message_queue(stask, DontTryTooHard);
|
||||
if did_work {
|
||||
return stask.put_with_sched(sched);
|
||||
}
|
||||
|
||||
// This helper will use a randomized work-stealing algorithm
|
||||
// to find work.
|
||||
let (sched, stask) = match sched.do_work(stask) {
|
||||
Some(pair) => pair,
|
||||
None => return
|
||||
};
|
||||
let (sched, stask, did_work) = sched.do_work(stask);
|
||||
if did_work {
|
||||
return stask.put_with_sched(sched);
|
||||
}
|
||||
|
||||
// Now, before sleeping we need to find out if there really
|
||||
// were any messages. Give it your best!
|
||||
let (mut sched, stask) =
|
||||
match sched.interpret_message_queue(stask, GiveItYourBest) {
|
||||
Some(pair) => pair,
|
||||
None => return
|
||||
};
|
||||
let (mut sched, stask, did_work) =
|
||||
sched.interpret_message_queue(stask, GiveItYourBest);
|
||||
if did_work {
|
||||
return stask.put_with_sched(sched);
|
||||
}
|
||||
|
||||
// If we got here then there was no work to do.
|
||||
// Generate a SchedHandle and push it to the sleeper list so
|
||||
@ -318,7 +318,7 @@ impl Scheduler {
|
||||
// return None.
|
||||
fn interpret_message_queue(mut ~self, stask: ~GreenTask,
|
||||
effort: EffortLevel)
|
||||
-> Option<(~Scheduler, ~GreenTask)>
|
||||
-> (~Scheduler, ~GreenTask, bool)
|
||||
{
|
||||
|
||||
let msg = if effort == DontTryTooHard {
|
||||
@ -349,25 +349,25 @@ impl Scheduler {
|
||||
Some(PinnedTask(task)) => {
|
||||
let mut task = task;
|
||||
task.give_home(HomeSched(self.make_handle()));
|
||||
self.resume_task_immediately(stask, task).put();
|
||||
return None;
|
||||
let (sched, task) = self.resume_task_immediately(stask, task);
|
||||
(sched, task, true)
|
||||
}
|
||||
Some(TaskFromFriend(task)) => {
|
||||
rtdebug!("got a task from a friend. lovely!");
|
||||
self.process_task(stask, task,
|
||||
Scheduler::resume_task_immediately_cl);
|
||||
return None;
|
||||
let (sched, task) =
|
||||
self.process_task(stask, task,
|
||||
Scheduler::resume_task_immediately_cl);
|
||||
(sched, task, true)
|
||||
}
|
||||
Some(RunOnce(task)) => {
|
||||
// bypass the process_task logic to force running this task once
|
||||
// on this home scheduler. This is often used for I/O (homing).
|
||||
self.resume_task_immediately(stask, task).put();
|
||||
return None;
|
||||
let (sched, task) = self.resume_task_immediately(stask, task);
|
||||
(sched, task, true)
|
||||
}
|
||||
Some(Wake) => {
|
||||
self.sleepy = false;
|
||||
stask.put_with_sched(self);
|
||||
return None;
|
||||
(self, stask, true)
|
||||
}
|
||||
Some(Shutdown) => {
|
||||
rtdebug!("shutting down");
|
||||
@ -389,31 +389,30 @@ impl Scheduler {
|
||||
// event loop references we will shut down.
|
||||
self.no_sleep = true;
|
||||
self.sleepy = false;
|
||||
stask.put_with_sched(self);
|
||||
return None;
|
||||
(self, stask, true)
|
||||
}
|
||||
Some(NewNeighbor(neighbor)) => {
|
||||
self.work_queues.push(neighbor);
|
||||
return Some((self, stask));
|
||||
}
|
||||
None => {
|
||||
return Some((self, stask));
|
||||
(self, stask, false)
|
||||
}
|
||||
None => (self, stask, false)
|
||||
}
|
||||
}
|
||||
|
||||
fn do_work(mut ~self, stask: ~GreenTask) -> Option<(~Scheduler, ~GreenTask)> {
|
||||
fn do_work(mut ~self,
|
||||
stask: ~GreenTask) -> (~Scheduler, ~GreenTask, bool) {
|
||||
rtdebug!("scheduler calling do work");
|
||||
match self.find_work() {
|
||||
Some(task) => {
|
||||
rtdebug!("found some work! running the task");
|
||||
self.process_task(stask, task,
|
||||
Scheduler::resume_task_immediately_cl);
|
||||
return None;
|
||||
let (sched, task) =
|
||||
self.process_task(stask, task,
|
||||
Scheduler::resume_task_immediately_cl);
|
||||
(sched, task, true)
|
||||
}
|
||||
None => {
|
||||
rtdebug!("no work was found, returning the scheduler struct");
|
||||
return Some((self, stask));
|
||||
(self, stask, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -486,7 +485,8 @@ impl Scheduler {
|
||||
// place.
|
||||
|
||||
fn process_task(mut ~self, cur: ~GreenTask,
|
||||
mut next: ~GreenTask, schedule_fn: SchedulingFn) {
|
||||
mut next: ~GreenTask,
|
||||
schedule_fn: SchedulingFn) -> (~Scheduler, ~GreenTask) {
|
||||
rtdebug!("processing a task");
|
||||
|
||||
match next.take_unwrap_home() {
|
||||
@ -495,23 +495,23 @@ impl Scheduler {
|
||||
rtdebug!("sending task home");
|
||||
next.give_home(HomeSched(home_handle));
|
||||
Scheduler::send_task_home(next);
|
||||
cur.put_with_sched(self);
|
||||
(self, cur)
|
||||
} else {
|
||||
rtdebug!("running task here");
|
||||
next.give_home(HomeSched(home_handle));
|
||||
schedule_fn(self, cur, next);
|
||||
schedule_fn(self, cur, next)
|
||||
}
|
||||
}
|
||||
AnySched if self.run_anything => {
|
||||
rtdebug!("running anysched task here");
|
||||
next.give_home(AnySched);
|
||||
schedule_fn(self, cur, next);
|
||||
schedule_fn(self, cur, next)
|
||||
}
|
||||
AnySched => {
|
||||
rtdebug!("sending task to friend");
|
||||
next.give_home(AnySched);
|
||||
self.send_to_friend(next);
|
||||
cur.put_with_sched(self);
|
||||
(self, cur)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -664,18 +664,19 @@ impl Scheduler {
|
||||
// * Context Swapping Helpers - Here be ugliness!
|
||||
|
||||
pub fn resume_task_immediately(~self, cur: ~GreenTask,
|
||||
next: ~GreenTask) -> ~GreenTask {
|
||||
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
|
||||
assert!(cur.is_sched());
|
||||
self.change_task_context(cur, next, |sched, stask| {
|
||||
let mut cur = self.change_task_context(cur, next, |sched, stask| {
|
||||
assert!(sched.sched_task.is_none());
|
||||
sched.sched_task = Some(stask);
|
||||
})
|
||||
});
|
||||
(cur.sched.take_unwrap(), cur)
|
||||
}
|
||||
|
||||
fn resume_task_immediately_cl(sched: ~Scheduler,
|
||||
cur: ~GreenTask,
|
||||
next: ~GreenTask) {
|
||||
sched.resume_task_immediately(cur, next).put()
|
||||
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
|
||||
sched.resume_task_immediately(cur, next)
|
||||
}
|
||||
|
||||
/// Block a running task, context switch to the scheduler, then pass the
|
||||
@ -741,15 +742,17 @@ impl Scheduler {
|
||||
cur.put();
|
||||
}
|
||||
|
||||
fn switch_task(sched: ~Scheduler, cur: ~GreenTask, next: ~GreenTask) {
|
||||
sched.change_task_context(cur, next, |sched, last_task| {
|
||||
fn switch_task(sched: ~Scheduler, cur: ~GreenTask,
|
||||
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
|
||||
let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
|
||||
if last_task.is_sched() {
|
||||
assert!(sched.sched_task.is_none());
|
||||
sched.sched_task = Some(last_task);
|
||||
} else {
|
||||
sched.enqueue_task(last_task);
|
||||
}
|
||||
}).put()
|
||||
});
|
||||
(cur.sched.take_unwrap(), cur)
|
||||
}
|
||||
|
||||
// * Task Context Helpers
|
||||
@ -769,7 +772,9 @@ impl Scheduler {
|
||||
}
|
||||
|
||||
pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) {
|
||||
self.process_task(cur, next, Scheduler::switch_task);
|
||||
let (sched, task) =
|
||||
self.process_task(cur, next, Scheduler::switch_task);
|
||||
task.put_with_sched(sched);
|
||||
}
|
||||
|
||||
pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) {
|
||||
@ -836,7 +841,8 @@ impl Scheduler {
|
||||
|
||||
// Supporting types
|
||||
|
||||
type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask);
|
||||
type SchedulingFn = fn (~Scheduler, ~GreenTask, ~GreenTask)
|
||||
-> (~Scheduler, ~GreenTask);
|
||||
|
||||
pub enum SchedMessage {
|
||||
Wake,
|
||||
|
Loading…
x
Reference in New Issue
Block a user