diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 58e68cae253..b331442eb4b 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -16,6 +16,7 @@ use either::{Either, Left, Right}; use option::{Option, Some, None}; use prelude::*; use rt::task::Task; +use to_bytes::IterBytes; use unstable::atomics::{AtomicUint, Acquire, SeqCst}; use unstable::sync::{UnsafeAtomicRcBox, LittleLock}; use util; @@ -194,6 +195,17 @@ impl BlockedTask { } } +// So that KillHandle can be hashed in the taskgroup bookkeeping code. +impl IterBytes for KillHandle { + fn iter_bytes(&self, lsb0: bool, f: &fn(buf: &[u8]) -> bool) -> bool { + self.data.iter_bytes(lsb0, f) + } +} +impl Eq for KillHandle { + #[inline] fn eq(&self, other: &KillHandle) -> bool { self.data.eq(&other.data) } + #[inline] fn ne(&self, other: &KillHandle) -> bool { self.data.ne(&other.data) } +} + impl KillHandle { pub fn new() -> (KillHandle, KillFlagHandle) { let (flag, flag_clone) = diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index a1227dd180c..f6e2faf5bf1 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -27,6 +27,7 @@ use super::local_heap::LocalHeap; use rt::sched::{Scheduler, SchedHandle}; use rt::stack::{StackSegment, StackPool}; use rt::context::Context; +use task::spawn::TCB; use cell::Cell; pub struct Task { @@ -36,6 +37,7 @@ pub struct Task { logger: StdErrLogger, unwinder: Unwinder, home: Option, + taskgroup: Option, death: Death, destroyed: bool, coroutine: Option<~Coroutine> @@ -85,6 +87,7 @@ impl Task { logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, home: Some(home), + taskgroup: None, death: Death::new(), destroyed: false, coroutine: Some(~Coroutine::new(stack_pool, start)) @@ -102,6 +105,7 @@ impl Task { logger: StdErrLogger, home: Some(home), unwinder: Unwinder { unwinding: false }, + taskgroup: None, // FIXME(#7544) make watching optional death: self.death.new_child(), destroyed: false, @@ -121,6 +125,7 @@ impl Task { } self.unwinder.try(f); + { let _ = self.taskgroup.take(); } self.death.collect_failure(!self.unwinder.unwinding); self.destroy(); } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 518b52a19fb..303028b30a2 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -79,7 +79,7 @@ use cast; use cell::Cell; use container::MutableMap; use comm::{Chan, GenericChan}; -use hashmap::HashSet; +use hashmap::{HashSet, HashSetConsumeIterator}; use local_data; use task::local_data_priv::{local_get, local_set, OldHandle}; use task::rt::rust_task; @@ -88,32 +88,61 @@ use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded}; use task::{Success, TaskOpts, TaskResult, ThreadPerTask}; use task::{ExistingScheduler, SchedulerHandle}; use task::unkillable; +use to_bytes::IterBytes; use uint; use util; use unstable::sync::{Exclusive, exclusive}; +use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context}; use rt::local::Local; use rt::task::Task; +use rt::kill::KillHandle; +use rt::sched::Scheduler; use iterator::IteratorUtil; #[cfg(test)] use task::default_task_opts; #[cfg(test)] use comm; #[cfg(test)] use task; -type TaskSet = HashSet<*rust_task>; +// Transitionary. +#[deriving(Eq)] +enum TaskHandle { + OldTask(*rust_task), + NewTask(KillHandle), +} + +impl Clone for TaskHandle { + fn clone(&self) -> TaskHandle { + match *self { + OldTask(x) => OldTask(x), + NewTask(ref x) => NewTask(x.clone()), + } + } +} + +impl IterBytes for TaskHandle { + fn iter_bytes(&self, lsb0: bool, f: &fn(buf: &[u8]) -> bool) -> bool { + match *self { + OldTask(ref x) => x.iter_bytes(lsb0, f), + NewTask(ref x) => x.iter_bytes(lsb0, f), + } + } +} + +type TaskSet = HashSet; fn new_taskset() -> TaskSet { HashSet::new() } -fn taskset_insert(tasks: &mut TaskSet, task: *rust_task) { +fn taskset_insert(tasks: &mut TaskSet, task: TaskHandle) { let didnt_overwrite = tasks.insert(task); assert!(didnt_overwrite); } -fn taskset_remove(tasks: &mut TaskSet, task: *rust_task) { - let was_present = tasks.remove(&task); +fn taskset_remove(tasks: &mut TaskSet, task: &TaskHandle) { + let was_present = tasks.remove(task); assert!(was_present); } -pub fn taskset_each(tasks: &TaskSet, blk: &fn(v: *rust_task) -> bool) -> bool { - tasks.iter().advance(|k| blk(*k)) +fn taskset_consume(tasks: TaskSet) -> HashSetConsumeIterator { + tasks.consume() } // One of these per group of linked-failure tasks. @@ -179,25 +208,23 @@ fn access_ancestors(x: &Exclusive, // taskgroups that forward_blk already ran on successfully (Note: bail_blk // is NOT called on the block that forward_blk broke on!). // (3) As a bonus, coalesces away all 'dead' taskgroup nodes in the list. -// FIXME(#2190): Change Option<@fn(...)> to Option<&fn(...)>, to save on -// allocations. Once that bug is fixed, changing the sigil should suffice. fn each_ancestor(list: &mut AncestorList, - bail_opt: Option<@fn(TaskGroupInner)>, + bail_blk: &fn(TaskGroupInner), forward_blk: &fn(TaskGroupInner) -> bool) -> bool { // "Kickoff" call - there was no last generation. - return !coalesce(list, bail_opt, forward_blk, uint::max_value); + return !coalesce(list, bail_blk, forward_blk, uint::max_value); // Recursively iterates, and coalesces afterwards if needed. Returns // whether or not unwinding is needed (i.e., !successful iteration). fn coalesce(list: &mut AncestorList, - bail_opt: Option<@fn(TaskGroupInner)>, + bail_blk: &fn(TaskGroupInner), forward_blk: &fn(TaskGroupInner) -> bool, last_generation: uint) -> bool { // Need to swap the list out to use it, to appease borrowck. let tmp_list = util::replace(&mut *list, AncestorList(None)); let (coalesce_this, early_break) = - iterate(&tmp_list, bail_opt, forward_blk, last_generation); + iterate(&tmp_list, bail_blk, forward_blk, last_generation); // What should our next ancestor end up being? if coalesce_this.is_some() { // Needed coalesce. Our next ancestor becomes our old @@ -219,7 +246,7 @@ fn each_ancestor(list: &mut AncestorList, // True if the supplied block did 'break', here or in any recursive // calls. If so, must call the unwinder on all previous nodes. fn iterate(ancestors: &AncestorList, - bail_opt: Option<@fn(TaskGroupInner)>, + bail_blk: &fn(TaskGroupInner), forward_blk: &fn(TaskGroupInner) -> bool, last_generation: uint) -> (Option, bool) { @@ -257,7 +284,7 @@ fn each_ancestor(list: &mut AncestorList, None => nobe_is_dead }; // Call iterator block. (If the group is dead, it's - // safe to skip it. This will leave our *rust_task + // safe to skip it. This will leave our TaskHandle // hanging around in the group even after it's freed, // but that's ok because, by virtue of the group being // dead, nobody will ever kill-all (foreach) over it.) @@ -271,17 +298,15 @@ fn each_ancestor(list: &mut AncestorList, let mut need_unwind = false; if do_continue { // NB: Takes many locks! (ancestor nodes & parent groups) - need_unwind = coalesce(&mut nobe.ancestors, bail_opt, + need_unwind = coalesce(&mut nobe.ancestors, |tg| bail_blk(tg), forward_blk, nobe.generation); } /*##########################################################* * Step 3: Maybe unwind; compute return info for our caller. *##########################################################*/ if need_unwind && !nobe_is_dead { - for bail_opt.iter().advance |bail_blk| { - do with_parent_tg(&mut nobe.parent_group) |tg_opt| { - (*bail_blk)(tg_opt) - } + do with_parent_tg(&mut nobe.parent_group) |tg_opt| { + bail_blk(tg_opt) } } // Decide whether our caller should unwind. @@ -311,8 +336,7 @@ fn each_ancestor(list: &mut AncestorList, } // One of these per task. -struct TCB { - me: *rust_task, +pub struct TCB { // List of tasks with whose fates this one's is intertwined. tasks: TaskGroupArc, // 'none' means the group has failed. // Lists of tasks who will kill us if they fail, but whom we won't kill. @@ -329,33 +353,34 @@ impl Drop for TCB { let this: &mut TCB = transmute(self); // If we are failing, the whole taskgroup needs to die. - if rt::rust_task_is_unwinding(self.me) { - for this.notifier.mut_iter().advance |x| { - x.failed = true; - } - // Take everybody down with us. - do access_group(&self.tasks) |tg| { - kill_taskgroup(tg, self.me, self.is_main); - } - } else { - // Remove ourselves from the group(s). - do access_group(&self.tasks) |tg| { - leave_taskgroup(tg, self.me, true); + do RuntimeGlue::with_task_handle_and_failing |me, failing| { + if failing { + for this.notifier.mut_iter().advance |x| { + x.failed = true; + } + // Take everybody down with us. + do access_group(&self.tasks) |tg| { + kill_taskgroup(tg, &me, self.is_main); + } + } else { + // Remove ourselves from the group(s). + do access_group(&self.tasks) |tg| { + leave_taskgroup(tg, &me, true); + } } + // It doesn't matter whether this happens before or after dealing + // with our own taskgroup, so long as both happen before we die. + // We remove ourself from every ancestor we can, so no cleanup; no + // break. + for each_ancestor(&mut this.ancestors, |_| {}) |ancestor_group| { + leave_taskgroup(ancestor_group, &me, false); + }; } - // It doesn't matter whether this happens before or after dealing - // with our own taskgroup, so long as both happen before we die. - // We remove ourself from every ancestor we can, so no cleanup; no - // break. - for each_ancestor(&mut this.ancestors, None) |ancestor_group| { - leave_taskgroup(ancestor_group, self.me, false); - }; } } } -fn TCB(me: *rust_task, - tasks: TaskGroupArc, +pub fn TCB(tasks: TaskGroupArc, ancestors: AncestorList, is_main: bool, mut notifier: Option) -> TCB { @@ -364,7 +389,6 @@ fn TCB(me: *rust_task, } TCB { - me: me, tasks: tasks, ancestors: ancestors, is_main: is_main, @@ -391,7 +415,7 @@ fn AutoNotify(chan: Chan) -> AutoNotify { } } -fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task, +fn enlist_in_taskgroup(state: TaskGroupInner, me: TaskHandle, is_member: bool) -> bool { let newstate = util::replace(&mut *state, None); // If 'None', the group was failing. Can't enlist. @@ -410,7 +434,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task, } // NB: Runs in destructor/post-exit context. Can't 'fail'. -fn leave_taskgroup(state: TaskGroupInner, me: *rust_task, +fn leave_taskgroup(state: TaskGroupInner, me: &TaskHandle, is_member: bool) { let newstate = util::replace(&mut *state, None); // If 'None', already failing and we've already gotten a kill signal. @@ -426,7 +450,7 @@ fn leave_taskgroup(state: TaskGroupInner, me: *rust_task, } // NB: Runs in destructor/post-exit context. Can't 'fail'. -fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) { +fn kill_taskgroup(state: TaskGroupInner, me: &TaskHandle, is_main: bool) { unsafe { // NB: We could do the killing iteration outside of the group arc, by // having "let mut newstate" here, swapping inside, and iterating @@ -442,20 +466,21 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) { // That's ok; only one task needs to do the dirty work. (Might also // see 'None' if Somebody already failed and we got a kill signal.) if newstate.is_some() { - let group = newstate.unwrap(); - for taskset_each(&group.members) |sibling| { + let TaskGroupData { members: members, descendants: descendants } = + newstate.unwrap(); + for taskset_consume(members).advance() |sibling| { // Skip self - killing ourself won't do much good. - if sibling != me { - rt::rust_task_kill_other(sibling); + if &sibling != me { + RuntimeGlue::kill_task(sibling); } } - for taskset_each(&group.descendants) |child| { - assert!(child != me); - rt::rust_task_kill_other(child); + do taskset_consume(descendants).advance() |child| { + assert!(&child != me); + RuntimeGlue::kill_task(child); } // Only one task should ever do this. if is_main { - rt::rust_task_kill_all(me); + RuntimeGlue::kill_all_tasks(me); } // Do NOT restore state to Some(..)! It stays None to indicate // that the whole taskgroup is failing, to forbid new spawns. @@ -475,43 +500,103 @@ fn taskgroup_key() -> local_data::Key<@@mut TCB> { unsafe { cast::transmute((-2, 0)) } } -fn gen_child_taskgroup(linked: bool, supervised: bool) - -> (TaskGroupArc, AncestorList, bool) { - unsafe { - let spawner = rt::rust_get_task(); - /*##################################################################* - * Step 1. Get spawner's taskgroup info. - *##################################################################*/ - let spawner_group: @@mut TCB = - do local_get(OldHandle(spawner), taskgroup_key()) |group| { - match group { +// Transitionary. +struct RuntimeGlue; +impl RuntimeGlue { + unsafe fn kill_task(task: TaskHandle) { + match task { + OldTask(ptr) => rt::rust_task_kill_other(ptr), + NewTask(handle) => { + let mut handle = handle; + do handle.kill().map_consume |killed_task| { + let killed_task = Cell::new(killed_task); + do Local::borrow:: |sched| { + sched.enqueue_task(killed_task.take()); + } + }; + } + } + } + + unsafe fn kill_all_tasks(task: &TaskHandle) { + match *task { + OldTask(ptr) => rt::rust_task_kill_all(ptr), + NewTask(ref _handle) => rtabort!("unimplemented"), // FIXME(#7544) + } + } + + fn with_task_handle_and_failing(blk: &fn(TaskHandle, bool)) { + match context() { + OldTaskContext => unsafe { + let me = rt::rust_get_task(); + blk(OldTask(me), rt::rust_task_is_unwinding(me)) + }, + TaskContext => unsafe { + // Can't use safe borrow, because the taskgroup destructor needs to + // access the scheduler again to send kill signals to other tasks. + let me = Local::unsafe_borrow::(); + // FIXME(#7544): Get rid of this clone by passing by-ref. + // Will probably have to wait until the old rt is gone. + blk(NewTask((*me).death.kill_handle.get_ref().clone()), + (*me).unwinder.unwinding) + }, + SchedulerContext | GlobalContext => rtabort!("task dying in bad context"), + } + } + + fn with_my_taskgroup(blk: &fn(&mut TCB) -> U) -> U { + match context() { + OldTaskContext => unsafe { + let me = rt::rust_get_task(); + do local_get(OldHandle(me), taskgroup_key()) |g| { + match g { + None => { + // Main task, doing first spawn ever. Lazily initialise here. + let mut members = new_taskset(); + taskset_insert(&mut members, OldTask(me)); + let tasks = exclusive(Some(TaskGroupData { + members: members, + descendants: new_taskset(), + })); + // Main task/group has no ancestors, no notifier, etc. + let group = @@mut TCB(tasks, AncestorList(None), true, None); + local_set(OldHandle(me), taskgroup_key(), group); + blk(&mut **group) + } + Some(&group) => blk(&mut **group) + } + } + }, + TaskContext => unsafe { + // Can't use safe borrow, because creating new hashmaps for the + // tasksets requires an rng, which needs to borrow the sched. + let me = Local::unsafe_borrow::(); + blk(match (*me).taskgroup { None => { - // Main task, doing first spawn ever. Lazily initialise - // here. + // Main task, doing first spawn ever. Lazily initialize. let mut members = new_taskset(); - taskset_insert(&mut members, spawner); + let my_handle = (*me).death.kill_handle.get_ref().clone(); + taskset_insert(&mut members, NewTask(my_handle)); let tasks = exclusive(Some(TaskGroupData { members: members, descendants: new_taskset(), })); - // Main task/group has no ancestors, no notifier, etc. - let group = @@mut TCB(spawner, - tasks, - AncestorList(None), - true, - None); - local_set(OldHandle(spawner), taskgroup_key(), group); - group + let group = TCB(tasks, AncestorList(None), true, None); + (*me).taskgroup = Some(group); + (*me).taskgroup.get_mut_ref() } - Some(&group) => group - } - }; - let spawner_group: &mut TCB = *spawner_group; + Some(ref mut group) => group, + }) + }, + SchedulerContext | GlobalContext => rtabort!("spawning in bad context"), + } + } +} - /*##################################################################* - * Step 2. Process spawn options for child. - *##################################################################*/ - return if linked { +fn gen_child_taskgroup(linked: bool, supervised: bool) + -> (TaskGroupArc, AncestorList, bool) { + return do RuntimeGlue::with_my_taskgroup |spawner_group| { + if linked { // Child is in the same group as spawner. let g = spawner_group.tasks.clone(); // Child's ancestors are spawner's ancestors. @@ -550,8 +635,8 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) AncestorList(None) }; (g, a, false) - }; - } + } + }; fn share_ancestors(ancestors: &mut AncestorList) -> AncestorList { // Appease the borrow-checker. Really this wants to be written as: @@ -562,6 +647,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) if tmp.is_some() { let ancestor_arc = tmp.unwrap(); let result = ancestor_arc.clone(); + error!("cloned ancestors"); **ancestors = Some(ancestor_arc); AncestorList(Some(result)) } else { @@ -570,9 +656,35 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) } } -pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { - use rt::*; +// Set up membership in taskgroup and descendantship in all ancestor +// groups. If any enlistment fails, Some task was already failing, so +// don't let the child task run, and undo every successful enlistment. +fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc, + ancestors: &mut AncestorList) -> bool { + // Join this taskgroup. + let mut result = do access_group(child_arc) |child_tg| { + enlist_in_taskgroup(child_tg, child.clone(), true) // member + }; + if result { + // Unwinding function in case any ancestral enlisting fails + let bail: &fn(TaskGroupInner) = |tg| { leave_taskgroup(tg, &child, false) }; + // Attempt to join every ancestor group. + result = do each_ancestor(ancestors, bail) |ancestor_tg| { + // Enlist as a descendant, not as an actual member. + // Descendants don't kill ancestor groups on failure. + enlist_in_taskgroup(ancestor_tg, child.clone(), false) + }; + // If any ancestor group fails, need to exit this group too. + if !result { + do access_group(child_arc) |child_tg| { + leave_taskgroup(child_tg, &child, true); // member + } + } + } + result +} +pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { match context() { OldTaskContext => { spawn_raw_oldsched(opts, f) @@ -590,8 +702,6 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { } fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { - use rt::sched::*; - let f = Cell::new(f); let mut task = unsafe { @@ -686,12 +796,8 @@ fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { let notifier = notify_chan.map_consume(|c| AutoNotify(c)); - if enlist_many(child, &child_arc, &mut ancestors) { - let group = @@mut TCB(child, - child_arc, - ancestors, - is_main, - notifier); + if enlist_many(OldTask(child), &child_arc, &mut ancestors) { + let group = @@mut TCB(child_arc, ancestors, is_main, notifier); unsafe { local_set(OldHandle(child), taskgroup_key(), group); } @@ -707,38 +813,6 @@ fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { // unsafe { cleanup::annihilate(); } }; return result; - - // Set up membership in taskgroup and descendantship in all ancestor - // groups. If any enlistment fails, Some task was already failing, so - // don't let the child task run, and undo every successful enlistment. - fn enlist_many(child: *rust_task, child_arc: &TaskGroupArc, - ancestors: &mut AncestorList) -> bool { - // Join this taskgroup. - let mut result = - do access_group(child_arc) |child_tg| { - enlist_in_taskgroup(child_tg, child, true) // member - }; - if result { - // Unwinding function in case any ancestral enlisting fails - let bail: @fn(TaskGroupInner) = |tg| { - leave_taskgroup(tg, child, false) - }; - // Attempt to join every ancestor group. - result = - each_ancestor(ancestors, Some(bail), |ancestor_tg| { - // Enlist as a descendant, not as an actual member. - // Descendants don't kill ancestor groups on failure. - enlist_in_taskgroup(ancestor_tg, child, false) - }); - // If any ancestor group fails, need to exit this group too. - if !result { - do access_group(child_arc) |child_tg| { - leave_taskgroup(child_tg, child, true); // member - } - } - } - result - } } fn new_task_in_sched(opts: SchedOpts) -> *rust_task {