Auto merge of #84806 - Mark-Simulacrum:try-start-entry, r=cjgillot
Streamline try_start code This shifts some branches around and avoids interleaving parallel and non-parallel versions of the function too much.
This commit is contained in:
commit
777bb2f612
@ -14,7 +14,7 @@ pub trait CacheSelector<K, V> {
|
||||
type Cache;
|
||||
}
|
||||
|
||||
pub trait QueryStorage: Default {
|
||||
pub trait QueryStorage {
|
||||
type Value: Debug;
|
||||
type Stored: Clone;
|
||||
|
||||
@ -23,7 +23,7 @@ pub trait QueryStorage: Default {
|
||||
fn store_nocache(&self, value: Self::Value) -> Self::Stored;
|
||||
}
|
||||
|
||||
pub trait QueryCache: QueryStorage {
|
||||
pub trait QueryCache: QueryStorage + Sized {
|
||||
type Key: Hash + Eq + Clone + Debug;
|
||||
type Sharded: Default;
|
||||
|
||||
|
@ -52,10 +52,6 @@ impl<CTX: QueryContext, K, V> QueryVtable<CTX, K, V> {
|
||||
(self.hash_result)(hcx, value)
|
||||
}
|
||||
|
||||
pub(crate) fn handle_cycle_error(&self, tcx: CTX, diag: DiagnosticBuilder<'_>) -> V {
|
||||
(self.handle_cycle_error)(tcx, diag)
|
||||
}
|
||||
|
||||
pub(crate) fn cache_on_disk(&self, tcx: CTX, key: &K, value: Option<&V>) -> bool {
|
||||
(self.cache_on_disk)(tcx, key, value)
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ use rustc_span::Span;
|
||||
|
||||
use std::convert::TryFrom;
|
||||
use std::hash::Hash;
|
||||
use std::marker::PhantomData;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
#[cfg(parallel_compiler)]
|
||||
@ -100,8 +99,6 @@ pub struct QueryJob<D> {
|
||||
/// The latch that is used to wait on this job.
|
||||
#[cfg(parallel_compiler)]
|
||||
latch: Option<QueryLatch<D>>,
|
||||
|
||||
dummy: PhantomData<QueryLatch<D>>,
|
||||
}
|
||||
|
||||
impl<D> QueryJob<D>
|
||||
@ -116,23 +113,17 @@ where
|
||||
parent,
|
||||
#[cfg(parallel_compiler)]
|
||||
latch: None,
|
||||
dummy: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(parallel_compiler)]
|
||||
pub(super) fn latch(&mut self, _id: QueryJobId<D>) -> QueryLatch<D> {
|
||||
pub(super) fn latch(&mut self) -> QueryLatch<D> {
|
||||
if self.latch.is_none() {
|
||||
self.latch = Some(QueryLatch::new());
|
||||
}
|
||||
self.latch.as_ref().unwrap().clone()
|
||||
}
|
||||
|
||||
#[cfg(not(parallel_compiler))]
|
||||
pub(super) fn latch(&mut self, id: QueryJobId<D>) -> QueryLatch<D> {
|
||||
QueryLatch { id }
|
||||
}
|
||||
|
||||
/// Signals to waiters that the query is complete.
|
||||
///
|
||||
/// This does nothing for single threaded rustc,
|
||||
@ -148,13 +139,7 @@ where
|
||||
}
|
||||
|
||||
#[cfg(not(parallel_compiler))]
|
||||
#[derive(Clone)]
|
||||
pub(super) struct QueryLatch<D> {
|
||||
id: QueryJobId<D>,
|
||||
}
|
||||
|
||||
#[cfg(not(parallel_compiler))]
|
||||
impl<D> QueryLatch<D>
|
||||
impl<D> QueryJobId<D>
|
||||
where
|
||||
D: Copy + Clone + Eq + Hash,
|
||||
{
|
||||
@ -172,7 +157,7 @@ where
|
||||
let info = query_map.get(&job).unwrap();
|
||||
cycle.push(info.info.clone());
|
||||
|
||||
if job == self.id {
|
||||
if job == *self {
|
||||
cycle.reverse();
|
||||
|
||||
// This is the end of the cycle
|
||||
|
@ -11,13 +11,13 @@ use crate::query::job::{
|
||||
};
|
||||
use crate::query::{QueryContext, QueryMap, QueryStackFrame};
|
||||
|
||||
#[cfg(not(parallel_compiler))]
|
||||
use rustc_data_structures::cold_path;
|
||||
use rustc_data_structures::fingerprint::Fingerprint;
|
||||
use rustc_data_structures::fx::{FxHashMap, FxHasher};
|
||||
use rustc_data_structures::sharded::{get_shard_index_by_hash, Sharded};
|
||||
use rustc_data_structures::sync::{Lock, LockGuard};
|
||||
use rustc_data_structures::thin_vec::ThinVec;
|
||||
#[cfg(not(parallel_compiler))]
|
||||
use rustc_errors::DiagnosticBuilder;
|
||||
use rustc_errors::{Diagnostic, FatalError};
|
||||
use rustc_span::Span;
|
||||
use std::collections::hash_map::Entry;
|
||||
@ -36,7 +36,7 @@ pub struct QueryCacheStore<C: QueryCache> {
|
||||
pub cache_hits: AtomicUsize,
|
||||
}
|
||||
|
||||
impl<C: QueryCache> Default for QueryCacheStore<C> {
|
||||
impl<C: QueryCache + Default> Default for QueryCacheStore<C> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cache: C::default(),
|
||||
@ -158,6 +158,31 @@ where
|
||||
id: QueryJobId<D>,
|
||||
}
|
||||
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
#[cfg(not(parallel_compiler))]
|
||||
fn mk_cycle<CTX, V, R>(
|
||||
tcx: CTX,
|
||||
root: QueryJobId<CTX::DepKind>,
|
||||
span: Span,
|
||||
handle_cycle_error: fn(CTX, DiagnosticBuilder<'_>) -> V,
|
||||
cache: &dyn crate::query::QueryStorage<Value = V, Stored = R>,
|
||||
) -> R
|
||||
where
|
||||
CTX: QueryContext,
|
||||
V: std::fmt::Debug,
|
||||
R: Clone,
|
||||
{
|
||||
let error: CycleError = root.find_cycle_in_stack(
|
||||
tcx.try_collect_active_jobs().unwrap(),
|
||||
&tcx.current_query_job(),
|
||||
span,
|
||||
);
|
||||
let error = report_cycle(tcx.dep_context().sess(), error);
|
||||
let value = handle_cycle_error(tcx, error);
|
||||
cache.store_nocache(value)
|
||||
}
|
||||
|
||||
impl<'tcx, D, C> JobOwner<'tcx, D, C>
|
||||
where
|
||||
D: Copy + Clone + Eq + Hash,
|
||||
@ -177,7 +202,7 @@ where
|
||||
state: &'b QueryState<CTX::DepKind, C::Key>,
|
||||
cache: &'b QueryCacheStore<C>,
|
||||
span: Span,
|
||||
key: &C::Key,
|
||||
key: C::Key,
|
||||
lookup: QueryLookup,
|
||||
query: &QueryVtable<CTX, C::Key, C::Value>,
|
||||
) -> TryGetJob<'b, CTX::DepKind, C>
|
||||
@ -188,94 +213,86 @@ where
|
||||
let mut state_lock = state.shards.get_shard_by_index(shard).lock();
|
||||
let lock = &mut *state_lock;
|
||||
|
||||
let (latch, mut _query_blocked_prof_timer) = match lock.active.entry((*key).clone()) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
match entry.get_mut() {
|
||||
QueryResult::Started(job) => {
|
||||
// For parallel queries, we'll block and wait until the query running
|
||||
// in another thread has completed. Record how long we wait in the
|
||||
// self-profiler.
|
||||
let _query_blocked_prof_timer = if cfg!(parallel_compiler) {
|
||||
Some(tcx.dep_context().profiler().query_blocked())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Create the id of the job we're waiting for
|
||||
let id = QueryJobId::new(job.id, shard, query.dep_kind);
|
||||
|
||||
(job.latch(id), _query_blocked_prof_timer)
|
||||
}
|
||||
QueryResult::Poisoned => FatalError.raise(),
|
||||
}
|
||||
}
|
||||
match lock.active.entry(key) {
|
||||
Entry::Vacant(entry) => {
|
||||
// No job entry for this query. Return a new one to be started later.
|
||||
|
||||
// Generate an id unique within this shard.
|
||||
let id = lock.jobs.checked_add(1).unwrap();
|
||||
lock.jobs = id;
|
||||
let id = QueryShardJobId(NonZeroU32::new(id).unwrap());
|
||||
|
||||
let global_id = QueryJobId::new(id, shard, query.dep_kind);
|
||||
|
||||
let job = tcx.current_query_job();
|
||||
let job = QueryJob::new(id, span, job);
|
||||
|
||||
let key = entry.key().clone();
|
||||
entry.insert(QueryResult::Started(job));
|
||||
|
||||
let owner = JobOwner { state, cache, id: global_id, key: (*key).clone() };
|
||||
let global_id = QueryJobId::new(id, shard, query.dep_kind);
|
||||
let owner = JobOwner { state, cache, id: global_id, key };
|
||||
return TryGetJob::NotYetStarted(owner);
|
||||
}
|
||||
};
|
||||
mem::drop(state_lock);
|
||||
Entry::Occupied(mut entry) => {
|
||||
match entry.get_mut() {
|
||||
#[cfg(not(parallel_compiler))]
|
||||
QueryResult::Started(job) => {
|
||||
let id = QueryJobId::new(job.id, shard, query.dep_kind);
|
||||
|
||||
// If we are single-threaded we know that we have cycle error,
|
||||
// so we just return the error.
|
||||
#[cfg(not(parallel_compiler))]
|
||||
return TryGetJob::Cycle(cold_path(|| {
|
||||
let error: CycleError = latch.find_cycle_in_stack(
|
||||
tcx.try_collect_active_jobs().unwrap(),
|
||||
&tcx.current_query_job(),
|
||||
span,
|
||||
);
|
||||
let error = report_cycle(tcx.dep_context().sess(), error);
|
||||
let value = query.handle_cycle_error(tcx, error);
|
||||
cache.cache.store_nocache(value)
|
||||
}));
|
||||
drop(state_lock);
|
||||
|
||||
// With parallel queries we might just have to wait on some other
|
||||
// thread.
|
||||
#[cfg(parallel_compiler)]
|
||||
{
|
||||
let result = latch.wait_on(tcx.current_query_job(), span);
|
||||
|
||||
if let Err(cycle) = result {
|
||||
let cycle = report_cycle(tcx.dep_context().sess(), cycle);
|
||||
let value = query.handle_cycle_error(tcx, cycle);
|
||||
let value = cache.cache.store_nocache(value);
|
||||
return TryGetJob::Cycle(value);
|
||||
}
|
||||
|
||||
let cached = cache
|
||||
.cache
|
||||
.lookup(cache, &key, |value, index| {
|
||||
if unlikely!(tcx.dep_context().profiler().enabled()) {
|
||||
tcx.dep_context().profiler().query_cache_hit(index.into());
|
||||
// If we are single-threaded we know that we have cycle error,
|
||||
// so we just return the error.
|
||||
return TryGetJob::Cycle(mk_cycle(
|
||||
tcx,
|
||||
id,
|
||||
span,
|
||||
query.handle_cycle_error,
|
||||
&cache.cache,
|
||||
));
|
||||
}
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
cache.cache_hits.fetch_add(1, Ordering::Relaxed);
|
||||
#[cfg(parallel_compiler)]
|
||||
QueryResult::Started(job) => {
|
||||
// For parallel queries, we'll block and wait until the query running
|
||||
// in another thread has completed. Record how long we wait in the
|
||||
// self-profiler.
|
||||
let query_blocked_prof_timer = tcx.dep_context().profiler().query_blocked();
|
||||
|
||||
// Get the latch out
|
||||
let latch = job.latch();
|
||||
let key = entry.key().clone();
|
||||
|
||||
drop(state_lock);
|
||||
|
||||
// With parallel queries we might just have to wait on some other
|
||||
// thread.
|
||||
let result = latch.wait_on(tcx.current_query_job(), span);
|
||||
|
||||
if let Err(cycle) = result {
|
||||
let cycle = report_cycle(tcx.dep_context().sess(), cycle);
|
||||
let value = (query.handle_cycle_error)(tcx, cycle);
|
||||
let value = cache.cache.store_nocache(value);
|
||||
return TryGetJob::Cycle(value);
|
||||
}
|
||||
|
||||
let cached = cache
|
||||
.cache
|
||||
.lookup(cache, &key, |value, index| {
|
||||
if unlikely!(tcx.dep_context().profiler().enabled()) {
|
||||
tcx.dep_context().profiler().query_cache_hit(index.into());
|
||||
}
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
cache.cache_hits.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(value.clone(), index)
|
||||
})
|
||||
.unwrap_or_else(|_| panic!("value must be in cache after waiting"));
|
||||
|
||||
query_blocked_prof_timer.finish_with_query_invocation_id(cached.1.into());
|
||||
|
||||
return TryGetJob::JobCompleted(cached);
|
||||
}
|
||||
(value.clone(), index)
|
||||
})
|
||||
.unwrap_or_else(|_| panic!("value must be in cache after waiting"));
|
||||
|
||||
if let Some(prof_timer) = _query_blocked_prof_timer.take() {
|
||||
prof_timer.finish_with_query_invocation_id(cached.1.into());
|
||||
QueryResult::Poisoned => FatalError.raise(),
|
||||
}
|
||||
}
|
||||
|
||||
return TryGetJob::JobCompleted(cached);
|
||||
}
|
||||
}
|
||||
|
||||
@ -418,7 +435,13 @@ where
|
||||
CTX: QueryContext,
|
||||
{
|
||||
let job = match JobOwner::<'_, CTX::DepKind, C>::try_start(
|
||||
tcx, state, cache, span, &key, lookup, query,
|
||||
tcx,
|
||||
state,
|
||||
cache,
|
||||
span,
|
||||
key.clone(),
|
||||
lookup,
|
||||
query,
|
||||
) {
|
||||
TryGetJob::NotYetStarted(job) => job,
|
||||
TryGetJob::Cycle(result) => return result,
|
||||
@ -741,7 +764,13 @@ fn force_query_impl<CTX, C>(
|
||||
};
|
||||
|
||||
let job = match JobOwner::<'_, CTX::DepKind, C>::try_start(
|
||||
tcx, state, cache, span, &key, lookup, query,
|
||||
tcx,
|
||||
state,
|
||||
cache,
|
||||
span,
|
||||
key.clone(),
|
||||
lookup,
|
||||
query,
|
||||
) {
|
||||
TryGetJob::NotYetStarted(job) => job,
|
||||
TryGetJob::Cycle(_) => return,
|
||||
|
Loading…
x
Reference in New Issue
Block a user