diff --git a/src/librustc/ty/maps/job.rs b/src/librustc/ty/maps/job.rs
index adc06a9e457..8b8f8420163 100644
--- a/src/librustc/ty/maps/job.rs
+++ b/src/librustc/ty/maps/job.rs
@@ -194,19 +194,25 @@ impl<'tcx> QueryLatch<'tcx> {
         }
     }
 
+    /// Awaits the caller on this latch by blocking the current thread.
     fn await(&self, waiter: &mut QueryWaiter<'tcx>) {
         let mut info = self.info.lock();
         if !info.complete {
+            // We push the waiter onto the `waiters` list. It can be accessed inside
+            // the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
+            // Both of these will remove it from the `waiters` list before resuming
+            // this thread.
             info.waiters.push(waiter);
-            let condvar = &waiter.condvar;
+
             // If this detects a deadlock and the deadlock handler wants to resume this thread
             // we have to be in the `wait` call. This is ensured by the deadlock handler
             // getting the self.info lock.
             rayon_core::mark_blocked();
-            condvar.wait(&mut info);
+            waiter.condvar.wait(&mut info);
         }
     }
 
+    /// Sets the latch and resumes all waiters on it.
     fn set(&self) {
         let mut info = self.info.lock();
         debug_assert!(!info.complete);
@@ -219,46 +225,56 @@ impl<'tcx> QueryLatch<'tcx> {
         }
     }
 
-    fn resume_waiter(
+    /// Removes a single waiter from the list of waiters.
+    /// This is used to break query cycles.
+    fn extract_waiter(
         &self,
         waiter: usize,
-        error: CycleError<'tcx>
     ) -> *mut QueryWaiter<'tcx> {
         let mut info = self.info.lock();
         debug_assert!(!info.complete);
 
         // Remove the waiter from the list of waiters
-        let waiter = info.waiters.remove(waiter);
-
-        // Set the cycle error it will be picked it up when resumed
-        unsafe {
-            (*waiter).cycle = Some(error);
-        }
-
-        waiter
+        info.waiters.remove(waiter)
     }
 }
 
+/// A pointer to an active query job. This is used to give query jobs an identity.
 #[cfg(parallel_queries)]
 type Ref<'tcx> = *const QueryJob<'tcx>;
 
+/// A resumable waiter of a query. The `usize` is the index into `waiters` in the query's latch.
 #[cfg(parallel_queries)]
 type Waiter<'tcx> = (Ref<'tcx>, usize);
 
+/// Visits all the non-resumable and resumable waiters of a query.
+/// Only waiters in a query are visited.
+/// `visit` is called for every waiter and is passed a query waiting on `query_ref`
+/// and a span indicating the reason the query waited on `query_ref`.
+/// If `visit` returns Some, this function returns.
+/// For visits of non-resumable waiters, it returns the return value of `visit`.
+/// For visits of resumable waiters, it returns Some(Some(Waiter)) which has the
+/// required information to resume the waiter.
+/// If all `visit` calls return None, this function also returns None.
 #[cfg(parallel_queries)]
 fn visit_waiters<'tcx, F>(query_ref: Ref<'tcx>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
 where
     F: FnMut(Span, Ref<'tcx>) -> Option<Option<Waiter<'tcx>>>
 {
     let query = unsafe { &*query_ref };
+
+    // Visit the parent query which is a non-resumable waiter since it's on the same stack
     if let Some(ref parent) = query.parent {
         if let Some(cycle) = visit(query.info.span, &**parent as Ref) {
             return Some(cycle);
         }
     }
+
+    // Visit the explicit waiters which use condvars and are resumable
     for (i, &waiter) in query.latch.info.lock().waiters.iter().enumerate() {
         unsafe {
             if let Some(ref waiter_query) = *(*waiter).query {
                 if visit((*waiter).span, &**waiter_query as Ref).is_some() {
+                    // Return a value which indicates that this waiter can be resumed
                     return Some(Some((query_ref, i)));
                 }
             }
@@ -267,6 +283,10 @@ where
     None
 }
 
+/// Looks for query cycles by doing a depth first search starting at `query`.
+/// `span` is the reason for the `query` to execute. This is initially DUMMY_SP.
+/// If a cycle is detected, this initial value is replaced with the span causing
+/// the cycle.
 #[cfg(parallel_queries)]
 fn cycle_check<'tcx>(query: Ref<'tcx>,
                      span: Span,
@@ -274,6 +294,8 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
                      visited: &mut HashSet<Ref<'tcx>>) -> Option<Option<Waiter<'tcx>>> {
     if visited.contains(&query) {
         return if let Some(p) = stack.iter().position(|q| q.1 == query) {
+            // We detected a query cycle, fix up the initial span and return Some
+
             // Remove previous stack entries
             stack.splice(0..p, iter::empty());
             // Replace the span for the first query with the cycle cause
@@ -284,13 +306,16 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
         }
     }
 
+    // Mark this query as visited and add it to the stack
     visited.insert(query);
     stack.push((span, query));
 
+    // Visit all the waiters
     let r = visit_waiters(query, |span, successor| {
         cycle_check(successor, span, stack, visited)
     });
 
+    // Remove the entry in our stack if we didn't find a cycle
     if r.is_none() {
         stack.pop();
     }
@@ -298,12 +323,17 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
     r
 }
 
+/// Finds out if there's a path to the compiler root (i.e. code which isn't in a query)
+/// from `query` without going through any of the queries in `visited`.
+/// This is achieved with a depth first search.
 #[cfg(parallel_queries)]
 fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -> bool {
+    // We already visited this or we're deliberately ignoring it
     if visited.contains(&query) {
         return false;
     }
 
+    // This query is connected to the root (it has no query parent), return true
     if unsafe { (*query).parent.is_none() } {
         return true;
     }
@@ -321,19 +351,20 @@ fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>)
     }).is_some()
 }
 
-#[cfg(parallel_queries)]
-fn query_entry<'tcx>(r: Ref<'tcx>) -> QueryInfo<'tcx> {
-    unsafe { (*r).info.clone() }
-}
-
+/// Looks for query cycles starting from the last query in `jobs`.
+/// If a cycle is found, all queries in the cycle are removed from `jobs` and
+/// the function returns true.
+/// If a cycle was not found, the starting query is removed from `jobs` and
+/// the function returns false.
 #[cfg(parallel_queries)]
 fn remove_cycle<'tcx>(
     jobs: &mut Vec<Ref<'tcx>>,
     wakelist: &mut Vec<*mut QueryWaiter<'tcx>>,
     tcx: TyCtxt<'_, 'tcx, '_>
-) {
+) -> bool {
     let mut visited = HashSet::new();
     let mut stack = Vec::new();
+    // Look for a cycle starting with the last query in `jobs`
     if let Some(waiter) = cycle_check(jobs.pop().unwrap(),
                                       DUMMY_SP,
                                       &mut stack,
@@ -341,13 +372,15 @@ fn remove_cycle<'tcx>(
         // Reverse the stack so earlier entries require later entries
         stack.reverse();
 
+        // Extract the spans and queries into separate arrays
         let mut spans: Vec<_> = stack.iter().map(|e| e.0).collect();
         let queries = stack.iter().map(|e| e.1);
 
-        // Shift the spans so that a query is matched the span for its waitee
+        // Shift the spans so that queries are matched with the span for their waitee
        let last = spans.pop().unwrap();
         spans.insert(0, last);
 
+        // Zip them back together
         let mut stack: Vec<_> = spans.into_iter().zip(queries).collect();
 
         // Remove the queries in our cycle from the list of jobs to look at
@@ -355,9 +388,6 @@ fn remove_cycle<'tcx>(
             jobs.remove_item(&r.1);
         }
 
-        let (waitee_query, waiter_idx) = waiter.unwrap();
-        let waitee_query = unsafe { &*waitee_query };
-
         // Find the queries in the cycle which are
         // connected to queries outside the cycle
         let entry_points: Vec<Ref<'tcx>> = stack.iter().filter_map(|query| {
@@ -392,6 +422,7 @@ fn remove_cycle<'tcx>(
             stack.insert(0, last);
         }
 
+        // Create the cycle error
         let mut error = CycleError {
             usage: None,
             cycle: stack.iter().map(|&(s, q)| QueryInfo {
@@ -400,10 +431,30 @@ fn remove_cycle<'tcx>(
                 }
             ).collect(),
         };
 
-        wakelist.push(waitee_query.latch.resume_waiter(waiter_idx, error));
+        // We unwrap `waiter` here since there must always be one
+        // edge which is resumable / waited using a query latch
+        let (waitee_query, waiter_idx) = waiter.unwrap();
+        let waitee_query = unsafe { &*waitee_query };
+
+        // Extract the waiter we want to resume
+        let waiter = waitee_query.latch.extract_waiter(waiter_idx);
+
+        // Set the cycle error so it will be picked up when resumed
+        unsafe {
+            (*waiter).cycle = Some(error);
+        }
+
+        // Put the waiter on the list of things to resume
+        wakelist.push(waiter);
+
+        true
+    } else {
+        false
     }
 }
 
+/// Creates a new thread and forwards information in thread locals to it.
+/// The new thread runs the deadlock handler.
 #[cfg(parallel_queries)]
 pub fn handle_deadlock() {
     use syntax;
@@ -440,6 +491,11 @@ pub fn handle_deadlock() {
     });
 }
 
+/// Detects query cycles by using depth first search over all active query jobs.
+/// If a query cycle is found it will break the cycle by finding an edge which
+/// uses a query latch and then resuming that waiter.
+/// There may be multiple cycles involved in a deadlock, so this searches
+/// all active queries for cycles before finally resuming all the waiters at once.
 #[cfg(parallel_queries)]
 fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
     let on_panic = OnDrop(|| {
@@ -450,13 +506,22 @@ fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
     let mut wakelist = Vec::new();
     let mut jobs: Vec<_> = tcx.maps.collect_active_jobs().iter().map(|j| &**j as Ref).collect();
 
+    let mut found_cycle = false;
+
     while jobs.len() > 0 {
-        remove_cycle(&mut jobs, &mut wakelist, tcx);
+        if remove_cycle(&mut jobs, &mut wakelist, tcx) {
+            found_cycle = true;
+        }
     }
 
-    // FIXME: Panic if no cycle is detected
-
-    // FIXME: Write down the conditions when a deadlock happens without a cycle
+    // Check that a cycle was found. It is possible for a deadlock to occur without
+    // a query cycle if a query which can be waited on uses Rayon to do multithreading
+    // internally. Such a query (X) may be executing on 2 threads (A and B) and A may
+    // wait using Rayon on B. Rayon may then switch to executing another query (Y)
+    // which in turn will wait on X causing a deadlock. We have a false dependency from
+    // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here
+    // only considers the true dependency and won't detect a cycle.
+    assert!(found_cycle);
 
     // FIXME: Ensure this won't cause a deadlock before we return
 
     for waiter in wakelist.into_iter() {
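
Two standalone sketches follow for readers tracing the patch; neither is rustc code. First, the latch protocol that `await` and `set` implement: rustc's `QueryLatch` uses a parking_lot-style lock, an intrusive list of `QueryWaiter` pointers, and rayon_core's blocked-thread bookkeeping, but the wait-until-complete / set-and-wake-all shape is the same as this plain std::sync condvar latch:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A `complete` flag guarded by a mutex; rustc also keeps the waiter
// list in the locked state so deadlock detection can inspect it.
struct Latch {
    complete: Mutex<bool>,
    condvar: Condvar,
}

impl Latch {
    fn new() -> Self {
        Latch { complete: Mutex::new(false), condvar: Condvar::new() }
    }

    // Corresponds to `await` above: block the thread until the latch is set.
    fn wait(&self) {
        let mut complete = self.complete.lock().unwrap();
        while !*complete {
            complete = self.condvar.wait(complete).unwrap();
        }
    }

    // Corresponds to `set` above: mark complete and resume all waiters.
    fn set(&self) {
        let mut complete = self.complete.lock().unwrap();
        *complete = true;
        self.condvar.notify_all();
    }
}

fn main() {
    let latch = Arc::new(Latch::new());
    let l = Arc::clone(&latch);
    let waiter = thread::spawn(move || l.wait());
    latch.set();
    waiter.join().unwrap();
}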
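Second, the depth first search that `cycle_check` performs over the waits-for graph, reduced to a toy model: queries are plain indices instead of `QueryJob` pointers, spans and the resumable/non-resumable distinction are dropped, and the cycle is returned as the tail of the DFS stack starting at the first repeated query:

use std::collections::HashSet;

type Query = usize;

// `waiters[q]` lists the queries currently waiting on `q`.
fn cycle_check(
    query: Query,
    waiters: &[Vec<Query>],
    stack: &mut Vec<Query>,
    visited: &mut HashSet<Query>,
) -> Option<Vec<Query>> {
    if visited.contains(&query) {
        // A repeated query is a cycle only if it is still on the current
        // DFS stack; the cycle is everything from its position onwards.
        return stack.iter().position(|&q| q == query).map(|p| stack[p..].to_vec());
    }

    // Mark this query as visited and add it to the stack
    visited.insert(query);
    stack.push(query);

    // Visit all the waiters
    for &waiter in &waiters[query] {
        if let Some(cycle) = cycle_check(waiter, waiters, stack, visited) {
            return Some(cycle);
        }
    }

    // Remove the entry in our stack if we didn't find a cycle
    stack.pop();
    None
}

fn main() {
    // Start at query 0: query 1 waits on 0, query 2 waits on 1, and
    // query 1 in turn waits on 2, so queries 1 and 2 form a cycle.
    let waiters = vec![vec![1], vec![2], vec![1]];
    let cycle = cycle_check(0, &waiters, &mut Vec::new(), &mut HashSet::new());
    assert_eq!(cycle, Some(vec![1, 2]));
}

As in the real code, a query that was visited but is no longer on the stack is not reported as a cycle. The extra work `remove_cycle` does on top of this search, rotating the stack so it starts at an entry point and extracting a latch-based waiter to resume, has no analogue in the sketch.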