Replace *rust_task ptrs in taskgroup code with TaskHandle, for transitioning to newsched killing.

This commit is contained in:
Ben Blum 2013-07-12 22:45:19 -04:00
parent e2a42416dd
commit 9bbec651df
3 changed files with 219 additions and 128 deletions
src/libstd

@ -16,6 +16,7 @@ use either::{Either, Left, Right};
use option::{Option, Some, None};
use prelude::*;
use rt::task::Task;
use to_bytes::IterBytes;
use unstable::atomics::{AtomicUint, Acquire, SeqCst};
use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
use util;
@ -194,6 +195,17 @@ impl BlockedTask {
}
}
// So that KillHandle can be hashed in the taskgroup bookkeeping code.
impl IterBytes for KillHandle {
    // Hash by delegating to the shared inner allocation, so every clone of
    // the same handle hashes identically (required for TaskSet membership).
    fn iter_bytes(&self, lsb0: bool, f: &fn(buf: &[u8]) -> bool) -> bool {
        self.data.iter_bytes(lsb0, f)
    }
}
// Two handles are equal iff they share the same inner data — consistent
// with the IterBytes delegation, so hash-set lookups behave coherently.
impl Eq for KillHandle {
    #[inline] fn eq(&self, other: &KillHandle) -> bool { self.data.eq(&other.data) }
    #[inline] fn ne(&self, other: &KillHandle) -> bool { self.data.ne(&other.data) }
}
impl KillHandle {
pub fn new() -> (KillHandle, KillFlagHandle) {
let (flag, flag_clone) =

@ -27,6 +27,7 @@ use super::local_heap::LocalHeap;
use rt::sched::{Scheduler, SchedHandle};
use rt::stack::{StackSegment, StackPool};
use rt::context::Context;
use task::spawn::TCB;
use cell::Cell;
pub struct Task {
@ -36,6 +37,7 @@ pub struct Task {
logger: StdErrLogger,
unwinder: Unwinder,
home: Option<SchedHome>,
taskgroup: Option<TCB>,
death: Death,
destroyed: bool,
coroutine: Option<~Coroutine>
@ -85,6 +87,7 @@ impl Task {
logger: StdErrLogger,
unwinder: Unwinder { unwinding: false },
home: Some(home),
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start))
@ -102,6 +105,7 @@ impl Task {
logger: StdErrLogger,
home: Some(home),
unwinder: Unwinder { unwinding: false },
taskgroup: None,
// FIXME(#7544) make watching optional
death: self.death.new_child(),
destroyed: false,
@ -121,6 +125,7 @@ impl Task {
}
self.unwinder.try(f);
{ let _ = self.taskgroup.take(); }
self.death.collect_failure(!self.unwinder.unwinding);
self.destroy();
}

@ -79,7 +79,7 @@ use cast;
use cell::Cell;
use container::MutableMap;
use comm::{Chan, GenericChan};
use hashmap::HashSet;
use hashmap::{HashSet, HashSetConsumeIterator};
use local_data;
use task::local_data_priv::{local_get, local_set, OldHandle};
use task::rt::rust_task;
@ -88,32 +88,61 @@ use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded};
use task::{Success, TaskOpts, TaskResult, ThreadPerTask};
use task::{ExistingScheduler, SchedulerHandle};
use task::unkillable;
use to_bytes::IterBytes;
use uint;
use util;
use unstable::sync::{Exclusive, exclusive};
use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context};
use rt::local::Local;
use rt::task::Task;
use rt::kill::KillHandle;
use rt::sched::Scheduler;
use iterator::IteratorUtil;
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
#[cfg(test)] use task;
type TaskSet = HashSet<*rust_task>;
// Transitionary.
// Identifies a task on either runtime while taskgroups migrate from raw
// *rust_task pointers to newsched kill handles.
#[deriving(Eq)]
enum TaskHandle {
    OldTask(*rust_task),  // task on the old runtime, named by raw pointer
    NewTask(KillHandle),  // task on the new scheduler, named by its kill handle
}
// Manual Clone: a raw *rust_task is simply copied, while a new-runtime
// handle must clone its reference-counted KillHandle.
impl Clone for TaskHandle {
    fn clone(&self) -> TaskHandle {
        match *self {
            OldTask(task) => OldTask(task),
            NewTask(ref handle) => NewTask(handle.clone()),
        }
    }
}
// Hash a TaskHandle by hashing whichever identity it wraps, so handles can
// live in the TaskSet hash sets below.
impl IterBytes for TaskHandle {
    fn iter_bytes(&self, lsb0: bool, f: &fn(buf: &[u8]) -> bool) -> bool {
        match *self {
            OldTask(ref x) => x.iter_bytes(lsb0, f),
            NewTask(ref x) => x.iter_bytes(lsb0, f),
        }
    }
}
// The set of tasks (members or descendants) tracked by a taskgroup.
type TaskSet = HashSet<TaskHandle>;

// Create an empty task set.
fn new_taskset() -> TaskSet {
    HashSet::new()
}
fn taskset_insert(tasks: &mut TaskSet, task: *rust_task) {
// Register a task in the set; enlisting the same task twice is a
// bookkeeping bug, so insertion must never overwrite an existing entry.
fn taskset_insert(tasks: &mut TaskSet, task: TaskHandle) {
    let was_new = tasks.insert(task);
    assert!(was_new);
}
fn taskset_remove(tasks: &mut TaskSet, task: *rust_task) {
let was_present = tasks.remove(&task);
// Unregister a task from the set; the task must have been enlisted
// earlier, so a failed removal indicates a bookkeeping bug.
fn taskset_remove(tasks: &mut TaskSet, task: &TaskHandle) {
    let found = tasks.remove(task);
    assert!(found);
}
pub fn taskset_each(tasks: &TaskSet, blk: &fn(v: *rust_task) -> bool) -> bool {
tasks.iter().advance(|k| blk(*k))
// Turn a set into an owning iterator over its handles; used by
// kill_taskgroup below, which consumes each handle as it signals it.
fn taskset_consume(tasks: TaskSet) -> HashSetConsumeIterator<TaskHandle> {
    tasks.consume()
}
// One of these per group of linked-failure tasks.
@ -179,25 +208,23 @@ fn access_ancestors<U>(x: &Exclusive<AncestorNode>,
// taskgroups that forward_blk already ran on successfully (Note: bail_blk
// is NOT called on the block that forward_blk broke on!).
// (3) As a bonus, coalesces away all 'dead' taskgroup nodes in the list.
// FIXME(#2190): Change Option<@fn(...)> to Option<&fn(...)>, to save on
// allocations. Once that bug is fixed, changing the sigil should suffice.
fn each_ancestor(list: &mut AncestorList,
bail_opt: Option<@fn(TaskGroupInner)>,
bail_blk: &fn(TaskGroupInner),
forward_blk: &fn(TaskGroupInner) -> bool)
-> bool {
// "Kickoff" call - there was no last generation.
return !coalesce(list, bail_opt, forward_blk, uint::max_value);
return !coalesce(list, bail_blk, forward_blk, uint::max_value);
// Recursively iterates, and coalesces afterwards if needed. Returns
// whether or not unwinding is needed (i.e., !successful iteration).
fn coalesce(list: &mut AncestorList,
bail_opt: Option<@fn(TaskGroupInner)>,
bail_blk: &fn(TaskGroupInner),
forward_blk: &fn(TaskGroupInner) -> bool,
last_generation: uint) -> bool {
// Need to swap the list out to use it, to appease borrowck.
let tmp_list = util::replace(&mut *list, AncestorList(None));
let (coalesce_this, early_break) =
iterate(&tmp_list, bail_opt, forward_blk, last_generation);
iterate(&tmp_list, bail_blk, forward_blk, last_generation);
// What should our next ancestor end up being?
if coalesce_this.is_some() {
// Needed coalesce. Our next ancestor becomes our old
@ -219,7 +246,7 @@ fn each_ancestor(list: &mut AncestorList,
// True if the supplied block did 'break', here or in any recursive
// calls. If so, must call the unwinder on all previous nodes.
fn iterate(ancestors: &AncestorList,
bail_opt: Option<@fn(TaskGroupInner)>,
bail_blk: &fn(TaskGroupInner),
forward_blk: &fn(TaskGroupInner) -> bool,
last_generation: uint)
-> (Option<AncestorList>, bool) {
@ -257,7 +284,7 @@ fn each_ancestor(list: &mut AncestorList,
None => nobe_is_dead
};
// Call iterator block. (If the group is dead, it's
// safe to skip it. This will leave our *rust_task
// safe to skip it. This will leave our TaskHandle
// hanging around in the group even after it's freed,
// but that's ok because, by virtue of the group being
// dead, nobody will ever kill-all (foreach) over it.)
@ -271,17 +298,15 @@ fn each_ancestor(list: &mut AncestorList,
let mut need_unwind = false;
if do_continue {
// NB: Takes many locks! (ancestor nodes & parent groups)
need_unwind = coalesce(&mut nobe.ancestors, bail_opt,
need_unwind = coalesce(&mut nobe.ancestors, |tg| bail_blk(tg),
forward_blk, nobe.generation);
}
/*##########################################################*
* Step 3: Maybe unwind; compute return info for our caller.
*##########################################################*/
if need_unwind && !nobe_is_dead {
for bail_opt.iter().advance |bail_blk| {
do with_parent_tg(&mut nobe.parent_group) |tg_opt| {
(*bail_blk)(tg_opt)
}
do with_parent_tg(&mut nobe.parent_group) |tg_opt| {
bail_blk(tg_opt)
}
}
// Decide whether our caller should unwind.
@ -311,8 +336,7 @@ fn each_ancestor(list: &mut AncestorList,
}
// One of these per task.
struct TCB {
me: *rust_task,
pub struct TCB {
// List of tasks with whose fates this one's is intertwined.
tasks: TaskGroupArc, // 'none' means the group has failed.
// Lists of tasks who will kill us if they fail, but whom we won't kill.
@ -329,33 +353,34 @@ impl Drop for TCB {
let this: &mut TCB = transmute(self);
// If we are failing, the whole taskgroup needs to die.
if rt::rust_task_is_unwinding(self.me) {
for this.notifier.mut_iter().advance |x| {
x.failed = true;
}
// Take everybody down with us.
do access_group(&self.tasks) |tg| {
kill_taskgroup(tg, self.me, self.is_main);
}
} else {
// Remove ourselves from the group(s).
do access_group(&self.tasks) |tg| {
leave_taskgroup(tg, self.me, true);
do RuntimeGlue::with_task_handle_and_failing |me, failing| {
if failing {
for this.notifier.mut_iter().advance |x| {
x.failed = true;
}
// Take everybody down with us.
do access_group(&self.tasks) |tg| {
kill_taskgroup(tg, &me, self.is_main);
}
} else {
// Remove ourselves from the group(s).
do access_group(&self.tasks) |tg| {
leave_taskgroup(tg, &me, true);
}
}
// It doesn't matter whether this happens before or after dealing
// with our own taskgroup, so long as both happen before we die.
// We remove ourself from every ancestor we can, so no cleanup; no
// break.
for each_ancestor(&mut this.ancestors, |_| {}) |ancestor_group| {
leave_taskgroup(ancestor_group, &me, false);
};
}
// It doesn't matter whether this happens before or after dealing
// with our own taskgroup, so long as both happen before we die.
// We remove ourself from every ancestor we can, so no cleanup; no
// break.
for each_ancestor(&mut this.ancestors, None) |ancestor_group| {
leave_taskgroup(ancestor_group, self.me, false);
};
}
}
}
fn TCB(me: *rust_task,
tasks: TaskGroupArc,
pub fn TCB(tasks: TaskGroupArc,
ancestors: AncestorList,
is_main: bool,
mut notifier: Option<AutoNotify>) -> TCB {
@ -364,7 +389,6 @@ fn TCB(me: *rust_task,
}
TCB {
me: me,
tasks: tasks,
ancestors: ancestors,
is_main: is_main,
@ -391,7 +415,7 @@ fn AutoNotify(chan: Chan<TaskResult>) -> AutoNotify {
}
}
fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task,
fn enlist_in_taskgroup(state: TaskGroupInner, me: TaskHandle,
is_member: bool) -> bool {
let newstate = util::replace(&mut *state, None);
// If 'None', the group was failing. Can't enlist.
@ -410,7 +434,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task,
}
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn leave_taskgroup(state: TaskGroupInner, me: *rust_task,
fn leave_taskgroup(state: TaskGroupInner, me: &TaskHandle,
is_member: bool) {
let newstate = util::replace(&mut *state, None);
// If 'None', already failing and we've already gotten a kill signal.
@ -426,7 +450,7 @@ fn leave_taskgroup(state: TaskGroupInner, me: *rust_task,
}
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) {
fn kill_taskgroup(state: TaskGroupInner, me: &TaskHandle, is_main: bool) {
unsafe {
// NB: We could do the killing iteration outside of the group arc, by
// having "let mut newstate" here, swapping inside, and iterating
@ -442,20 +466,21 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) {
// That's ok; only one task needs to do the dirty work. (Might also
// see 'None' if Somebody already failed and we got a kill signal.)
if newstate.is_some() {
let group = newstate.unwrap();
for taskset_each(&group.members) |sibling| {
let TaskGroupData { members: members, descendants: descendants } =
newstate.unwrap();
for taskset_consume(members).advance() |sibling| {
// Skip self - killing ourself won't do much good.
if sibling != me {
rt::rust_task_kill_other(sibling);
if &sibling != me {
RuntimeGlue::kill_task(sibling);
}
}
for taskset_each(&group.descendants) |child| {
assert!(child != me);
rt::rust_task_kill_other(child);
do taskset_consume(descendants).advance() |child| {
assert!(&child != me);
RuntimeGlue::kill_task(child);
}
// Only one task should ever do this.
if is_main {
rt::rust_task_kill_all(me);
RuntimeGlue::kill_all_tasks(me);
}
// Do NOT restore state to Some(..)! It stays None to indicate
// that the whole taskgroup is failing, to forbid new spawns.
@ -475,43 +500,103 @@ fn taskgroup_key() -> local_data::Key<@@mut TCB> {
unsafe { cast::transmute((-2, 0)) }
}
fn gen_child_taskgroup(linked: bool, supervised: bool)
-> (TaskGroupArc, AncestorList, bool) {
unsafe {
let spawner = rt::rust_get_task();
/*##################################################################*
* Step 1. Get spawner's taskgroup info.
*##################################################################*/
let spawner_group: @@mut TCB =
do local_get(OldHandle(spawner), taskgroup_key()) |group| {
match group {
// Transitionary.
struct RuntimeGlue;
impl RuntimeGlue {
    // Kill a single task, dispatching on which runtime it lives on.
    // Consumes the handle.
    unsafe fn kill_task(task: TaskHandle) {
        match task {
            // Old runtime: the C++ rt delivers the kill signal.
            OldTask(ptr) => rt::rust_task_kill_other(ptr),
            NewTask(handle) => {
                let mut handle = handle;
                // kill() may hand back the killed task; if so, re-enqueue it
                // on the local scheduler — presumably so it wakes and
                // observes the kill signal (confirm in rt::kill).
                do handle.kill().map_consume |killed_task| {
                    let killed_task = Cell::new(killed_task);
                    do Local::borrow::<Scheduler, ()> |sched| {
                        sched.enqueue_task(killed_task.take());
                    }
                };
            }
        }
    }
    // Kill every task in the runtime; invoked by kill_taskgroup when the
    // main taskgroup fails. Only the old runtime supports this so far.
    unsafe fn kill_all_tasks(task: &TaskHandle) {
        match *task {
            OldTask(ptr) => rt::rust_task_kill_all(ptr),
            NewTask(ref _handle) => rtabort!("unimplemented"), // FIXME(#7544)
        }
    }
    // Run blk with a handle to the current task and a flag saying whether it
    // is unwinding, abstracting over which runtime the task lives on.
    // Aborts if called outside task context.
    fn with_task_handle_and_failing(blk: &fn(TaskHandle, bool)) {
        match context() {
            OldTaskContext => unsafe {
                let me = rt::rust_get_task();
                blk(OldTask(me), rt::rust_task_is_unwinding(me))
            },
            TaskContext => unsafe {
                // Can't use safe borrow, because the taskgroup destructor needs to
                // access the scheduler again to send kill signals to other tasks.
                let me = Local::unsafe_borrow::<Task>();
                // FIXME(#7544): Get rid of this clone by passing by-ref.
                // Will probably have to wait until the old rt is gone.
                blk(NewTask((*me).death.kill_handle.get_ref().clone()),
                    (*me).unwinder.unwinding)
            },
            SchedulerContext | GlobalContext => rtabort!("task dying in bad context"),
        }
    }
fn with_my_taskgroup<U>(blk: &fn(&mut TCB) -> U) -> U {
match context() {
OldTaskContext => unsafe {
let me = rt::rust_get_task();
do local_get(OldHandle(me), taskgroup_key()) |g| {
match g {
None => {
// Main task, doing first spawn ever. Lazily initialise here.
let mut members = new_taskset();
taskset_insert(&mut members, OldTask(me));
let tasks = exclusive(Some(TaskGroupData {
members: members,
descendants: new_taskset(),
}));
// Main task/group has no ancestors, no notifier, etc.
let group = @@mut TCB(tasks, AncestorList(None), true, None);
local_set(OldHandle(me), taskgroup_key(), group);
blk(&mut **group)
}
Some(&group) => blk(&mut **group)
}
}
},
TaskContext => unsafe {
// Can't use safe borrow, because creating new hashmaps for the
// tasksets requires an rng, which needs to borrow the sched.
let me = Local::unsafe_borrow::<Task>();
blk(match (*me).taskgroup {
None => {
// Main task, doing first spawn ever. Lazily initialise
// here.
// Main task, doing first spawn ever. Lazily initialize.
let mut members = new_taskset();
taskset_insert(&mut members, spawner);
let my_handle = (*me).death.kill_handle.get_ref().clone();
taskset_insert(&mut members, NewTask(my_handle));
let tasks = exclusive(Some(TaskGroupData {
members: members,
descendants: new_taskset(),
}));
// Main task/group has no ancestors, no notifier, etc.
let group = @@mut TCB(spawner,
tasks,
AncestorList(None),
true,
None);
local_set(OldHandle(spawner), taskgroup_key(), group);
group
let group = TCB(tasks, AncestorList(None), true, None);
(*me).taskgroup = Some(group);
(*me).taskgroup.get_mut_ref()
}
Some(&group) => group
}
};
let spawner_group: &mut TCB = *spawner_group;
Some(ref mut group) => group,
})
},
SchedulerContext | GlobalContext => rtabort!("spawning in bad context"),
}
}
}
/*##################################################################*
* Step 2. Process spawn options for child.
*##################################################################*/
return if linked {
fn gen_child_taskgroup(linked: bool, supervised: bool)
-> (TaskGroupArc, AncestorList, bool) {
return do RuntimeGlue::with_my_taskgroup |spawner_group| {
if linked {
// Child is in the same group as spawner.
let g = spawner_group.tasks.clone();
// Child's ancestors are spawner's ancestors.
@ -550,8 +635,8 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
AncestorList(None)
};
(g, a, false)
};
}
}
};
fn share_ancestors(ancestors: &mut AncestorList) -> AncestorList {
// Appease the borrow-checker. Really this wants to be written as:
@ -562,6 +647,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
if tmp.is_some() {
let ancestor_arc = tmp.unwrap();
let result = ancestor_arc.clone();
error!("cloned ancestors");
**ancestors = Some(ancestor_arc);
AncestorList(Some(result))
} else {
@ -570,9 +656,35 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
}
}
pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
use rt::*;
// Set up membership in taskgroup and descendantship in all ancestor
// groups. If any enlistment fails, Some task was already failing, so
// don't let the child task run, and undo every successful enlistment.
// Returns true iff the child joined its own group and every ancestor group.
fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc,
               ancestors: &mut AncestorList) -> bool {
    // Join this taskgroup.
    let mut result = do access_group(child_arc) |child_tg| {
        enlist_in_taskgroup(child_tg, child.clone(), true) // member
    };
    if result {
        // Unwinding function in case any ancestral enlisting fails
        let bail: &fn(TaskGroupInner) = |tg| { leave_taskgroup(tg, &child, false) };
        // Attempt to join every ancestor group.
        result = do each_ancestor(ancestors, bail) |ancestor_tg| {
            // Enlist as a descendant, not as an actual member.
            // Descendants don't kill ancestor groups on failure.
            enlist_in_taskgroup(ancestor_tg, child.clone(), false)
        };
        // If any ancestor group fails, need to exit this group too.
        if !result {
            do access_group(child_arc) |child_tg| {
                leave_taskgroup(child_tg, &child, true); // member
            }
        }
    }
    result
}
pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
match context() {
OldTaskContext => {
spawn_raw_oldsched(opts, f)
@ -590,8 +702,6 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
}
fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
use rt::sched::*;
let f = Cell::new(f);
let mut task = unsafe {
@ -686,12 +796,8 @@ fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) {
let notifier = notify_chan.map_consume(|c| AutoNotify(c));
if enlist_many(child, &child_arc, &mut ancestors) {
let group = @@mut TCB(child,
child_arc,
ancestors,
is_main,
notifier);
if enlist_many(OldTask(child), &child_arc, &mut ancestors) {
let group = @@mut TCB(child_arc, ancestors, is_main, notifier);
unsafe {
local_set(OldHandle(child), taskgroup_key(), group);
}
@ -707,38 +813,6 @@ fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) {
// unsafe { cleanup::annihilate(); }
};
return result;
// Set up membership in taskgroup and descendantship in all ancestor
// groups. If any enlistment fails, Some task was already failing, so
// don't let the child task run, and undo every successful enlistment.
fn enlist_many(child: *rust_task, child_arc: &TaskGroupArc,
ancestors: &mut AncestorList) -> bool {
// Join this taskgroup.
let mut result =
do access_group(child_arc) |child_tg| {
enlist_in_taskgroup(child_tg, child, true) // member
};
if result {
// Unwinding function in case any ancestral enlisting fails
let bail: @fn(TaskGroupInner) = |tg| {
leave_taskgroup(tg, child, false)
};
// Attempt to join every ancestor group.
result =
each_ancestor(ancestors, Some(bail), |ancestor_tg| {
// Enlist as a descendant, not as an actual member.
// Descendants don't kill ancestor groups on failure.
enlist_in_taskgroup(ancestor_tg, child, false)
});
// If any ancestor group fails, need to exit this group too.
if !result {
do access_group(child_arc) |child_tg| {
leave_taskgroup(child_tg, child, true); // member
}
}
}
result
}
}
fn new_task_in_sched(opts: SchedOpts) -> *rust_task {