Refactor try_execute_query
This commit is contained in:
parent
822c10feb7
commit
ced33ad289
@ -124,8 +124,6 @@ impl<D: DepKind> QueryJob<D> {
|
||||
}
|
||||
|
||||
impl QueryJobId {
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
#[cfg(not(parallel_compiler))]
|
||||
pub(super) fn find_cycle_in_stack<D: DepKind>(
|
||||
&self,
|
||||
|
@ -6,18 +6,18 @@ use crate::dep_graph::{DepContext, DepKind, DepNode, DepNodeIndex, DepNodeParams
|
||||
use crate::dep_graph::{DepGraphData, HasDepContext};
|
||||
use crate::ich::StableHashingContext;
|
||||
use crate::query::caches::QueryCache;
|
||||
#[cfg(parallel_compiler)]
|
||||
use crate::query::job::QueryLatch;
|
||||
use crate::query::job::{report_cycle, QueryInfo, QueryJob, QueryJobId, QueryJobInfo};
|
||||
use crate::query::{QueryContext, QueryMap, QuerySideEffects, QueryStackFrame};
|
||||
use crate::values::Value;
|
||||
use crate::HandleCycleError;
|
||||
use rustc_data_structures::fingerprint::Fingerprint;
|
||||
use rustc_data_structures::fx::FxHashMap;
|
||||
#[cfg(parallel_compiler)]
|
||||
use rustc_data_structures::profiling::TimingGuard;
|
||||
#[cfg(parallel_compiler)]
|
||||
use rustc_data_structures::sharded::Sharded;
|
||||
use rustc_data_structures::stack::ensure_sufficient_stack;
|
||||
use rustc_data_structures::sync::{Lock, LockGuard};
|
||||
use rustc_data_structures::sync::Lock;
|
||||
#[cfg(parallel_compiler)]
|
||||
use rustc_data_structures::{cold_path, sharded::Sharded};
|
||||
use rustc_errors::{DiagnosticBuilder, ErrorGuaranteed, FatalError};
|
||||
use rustc_session::Session;
|
||||
use rustc_span::{Span, DUMMY_SP};
|
||||
@ -116,7 +116,6 @@ where
|
||||
{
|
||||
state: &'tcx QueryState<K, D>,
|
||||
key: K,
|
||||
id: QueryJobId,
|
||||
}
|
||||
|
||||
#[cold]
|
||||
@ -166,81 +165,6 @@ impl<'tcx, K, D: DepKind> JobOwner<'tcx, K, D>
|
||||
where
|
||||
K: Eq + Hash + Copy,
|
||||
{
|
||||
/// Either gets a `JobOwner` corresponding the query, allowing us to
|
||||
/// start executing the query, or returns with the result of the query.
|
||||
/// This function assumes that `try_get_cached` is already called and returned `lookup`.
|
||||
/// If the query is executing elsewhere, this will wait for it and return the result.
|
||||
/// If the query panicked, this will silently panic.
|
||||
///
|
||||
/// This function is inlined because that results in a noticeable speed-up
|
||||
/// for some compile-time benchmarks.
|
||||
#[inline(always)]
|
||||
fn try_start<'b, Qcx>(
|
||||
qcx: &'b Qcx,
|
||||
state: &'b QueryState<K, Qcx::DepKind>,
|
||||
mut state_lock: LockGuard<'b, FxHashMap<K, QueryResult<Qcx::DepKind>>>,
|
||||
span: Span,
|
||||
key: K,
|
||||
) -> TryGetJob<'b, K, D>
|
||||
where
|
||||
Qcx: QueryContext + HasDepContext<DepKind = D>,
|
||||
{
|
||||
let lock = &mut *state_lock;
|
||||
let current_job_id = qcx.current_query_job();
|
||||
|
||||
match lock.entry(key) {
|
||||
Entry::Vacant(entry) => {
|
||||
let id = qcx.next_job_id();
|
||||
let job = QueryJob::new(id, span, current_job_id);
|
||||
|
||||
let key = *entry.key();
|
||||
entry.insert(QueryResult::Started(job));
|
||||
|
||||
let owner = JobOwner { state, id, key };
|
||||
return TryGetJob::NotYetStarted(owner);
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
match entry.get_mut() {
|
||||
#[cfg(not(parallel_compiler))]
|
||||
QueryResult::Started(job) => {
|
||||
let id = job.id;
|
||||
drop(state_lock);
|
||||
|
||||
// If we are single-threaded we know that we have cycle error,
|
||||
// so we just return the error.
|
||||
return TryGetJob::Cycle(id.find_cycle_in_stack(
|
||||
qcx.try_collect_active_jobs().unwrap(),
|
||||
¤t_job_id,
|
||||
span,
|
||||
));
|
||||
}
|
||||
#[cfg(parallel_compiler)]
|
||||
QueryResult::Started(job) => {
|
||||
// For parallel queries, we'll block and wait until the query running
|
||||
// in another thread has completed. Record how long we wait in the
|
||||
// self-profiler.
|
||||
let query_blocked_prof_timer = qcx.dep_context().profiler().query_blocked();
|
||||
|
||||
// Get the latch out
|
||||
let latch = job.latch();
|
||||
|
||||
drop(state_lock);
|
||||
|
||||
// With parallel queries we might just have to wait on some other
|
||||
// thread.
|
||||
let result = latch.wait_on(current_job_id, span);
|
||||
|
||||
match result {
|
||||
Ok(()) => TryGetJob::JobCompleted(query_blocked_prof_timer),
|
||||
Err(cycle) => TryGetJob::Cycle(cycle),
|
||||
}
|
||||
}
|
||||
QueryResult::Poisoned => FatalError.raise(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Completes the query by updating the query cache with the `result`,
|
||||
/// signals the waiter and forgets the JobOwner, so it won't poison the query
|
||||
fn complete<C>(self, cache: &C, result: C::Value, dep_node_index: DepNodeIndex)
|
||||
@ -307,25 +231,6 @@ pub(crate) struct CycleError<D: DepKind> {
|
||||
pub cycle: Vec<QueryInfo<D>>,
|
||||
}
|
||||
|
||||
/// The result of `try_start`.
|
||||
enum TryGetJob<'tcx, K, D>
|
||||
where
|
||||
K: Eq + Hash + Copy,
|
||||
D: DepKind,
|
||||
{
|
||||
/// The query is not yet started. Contains a guard to the cache eventually used to start it.
|
||||
NotYetStarted(JobOwner<'tcx, K, D>),
|
||||
|
||||
/// The query was already completed.
|
||||
/// Returns the result of the query and its dep-node index
|
||||
/// if it succeeded or a cycle error if it failed.
|
||||
#[cfg(parallel_compiler)]
|
||||
JobCompleted(TimingGuard<'tcx>),
|
||||
|
||||
/// Trying to execute the query resulted in a cycle.
|
||||
Cycle(CycleError<D>),
|
||||
}
|
||||
|
||||
/// Checks if the query is already computed and in the cache.
|
||||
/// It returns the shard index and a lock guard to the shard,
|
||||
/// which will be used if the query is not in the cache and we need
|
||||
@ -346,6 +251,65 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
#[cfg(not(parallel_compiler))]
|
||||
fn cycle_error<Q, Qcx>(
|
||||
query: Q,
|
||||
qcx: Qcx,
|
||||
try_execute: QueryJobId,
|
||||
span: Span,
|
||||
) -> (Q::Value, Option<DepNodeIndex>)
|
||||
where
|
||||
Q: QueryConfig<Qcx>,
|
||||
Qcx: QueryContext,
|
||||
{
|
||||
let error = try_execute.find_cycle_in_stack(
|
||||
qcx.try_collect_active_jobs().unwrap(),
|
||||
&qcx.current_query_job(),
|
||||
span,
|
||||
);
|
||||
(mk_cycle(qcx, error, query.handle_cycle_error()), None)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[cfg(parallel_compiler)]
|
||||
fn wait_for_query<Q, Qcx>(
|
||||
query: Q,
|
||||
qcx: Qcx,
|
||||
span: Span,
|
||||
key: Q::Key,
|
||||
latch: QueryLatch<Qcx::DepKind>,
|
||||
current: Option<QueryJobId>,
|
||||
) -> (Q::Value, Option<DepNodeIndex>)
|
||||
where
|
||||
Q: QueryConfig<Qcx>,
|
||||
Qcx: QueryContext,
|
||||
{
|
||||
// For parallel queries, we'll block and wait until the query running
|
||||
// in another thread has completed. Record how long we wait in the
|
||||
// self-profiler.
|
||||
let query_blocked_prof_timer = qcx.dep_context().profiler().query_blocked();
|
||||
|
||||
// With parallel queries we might just have to wait on some other
|
||||
// thread.
|
||||
let result = latch.wait_on(current, span);
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
let Some((v, index)) = query.query_cache(qcx).lookup(&key) else {
|
||||
cold_path(|| panic!("value must be in cache after waiting"))
|
||||
};
|
||||
|
||||
qcx.dep_context().profiler().query_cache_hit(index.into());
|
||||
query_blocked_prof_timer.finish_with_query_invocation_id(index.into());
|
||||
|
||||
(v, Some(index))
|
||||
}
|
||||
Err(cycle) => (mk_cycle(qcx, cycle, query.handle_cycle_error()), None),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
fn try_execute_query<Q, Qcx>(
|
||||
query: Q,
|
||||
@ -360,9 +324,9 @@ where
|
||||
{
|
||||
let state = query.query_state(qcx);
|
||||
#[cfg(parallel_compiler)]
|
||||
let state_lock = state.active.get_shard_by_value(&key).lock();
|
||||
let mut state_lock = state.active.get_shard_by_value(&key).lock();
|
||||
#[cfg(not(parallel_compiler))]
|
||||
let state_lock = state.active.lock();
|
||||
let mut state_lock = state.active.lock();
|
||||
|
||||
// For the parallel compiler we need to check both the query cache and query state structures
|
||||
// while holding the state lock to ensure that 1) the query has not yet completed and 2) the
|
||||
@ -377,46 +341,84 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
match JobOwner::<'_, Q::Key, Qcx::DepKind>::try_start(&qcx, state, state_lock, span, key) {
|
||||
TryGetJob::NotYetStarted(job) => {
|
||||
let (result, dep_node_index) = match qcx.dep_context().dep_graph().data() {
|
||||
None => execute_job_non_incr(query, qcx, key, job.id),
|
||||
Some(data) => execute_job_incr(query, qcx, data, key, dep_node, job.id),
|
||||
};
|
||||
let current_job_id = qcx.current_query_job();
|
||||
|
||||
let cache = query.query_cache(qcx);
|
||||
if query.feedable() {
|
||||
// We should not compute queries that also got a value via feeding.
|
||||
// This can't happen, as query feeding adds the very dependencies to the fed query
|
||||
// as its feeding query had. So if the fed query is red, so is its feeder, which will
|
||||
// get evaluated first, and re-feed the query.
|
||||
if let Some((cached_result, _)) = cache.lookup(&key) {
|
||||
panic!(
|
||||
"fed query later has its value computed. The already cached value: {cached_result:?}"
|
||||
);
|
||||
match state_lock.entry(key) {
|
||||
Entry::Vacant(entry) => {
|
||||
// Nothing has computed or is computing the query, so we start a new job and insert it in the
|
||||
// state map.
|
||||
let id = qcx.next_job_id();
|
||||
let job = QueryJob::new(id, span, current_job_id);
|
||||
entry.insert(QueryResult::Started(job));
|
||||
|
||||
// Drop the lock before we start executing the query
|
||||
drop(state_lock);
|
||||
|
||||
execute_job(query, qcx, state, key, id, dep_node)
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
match entry.get_mut() {
|
||||
#[cfg(not(parallel_compiler))]
|
||||
QueryResult::Started(job) => {
|
||||
let id = job.id;
|
||||
drop(state_lock);
|
||||
|
||||
// If we are single-threaded we know that we have cycle error,
|
||||
// so we just return the error.
|
||||
cycle_error(query, qcx, id, span)
|
||||
}
|
||||
#[cfg(parallel_compiler)]
|
||||
QueryResult::Started(job) => {
|
||||
// Get the latch out
|
||||
let latch = job.latch();
|
||||
drop(state_lock);
|
||||
|
||||
wait_for_query(query, qcx, span, key, latch, current_job_id)
|
||||
}
|
||||
QueryResult::Poisoned => FatalError.raise(),
|
||||
}
|
||||
job.complete(cache, result, dep_node_index);
|
||||
(result, Some(dep_node_index))
|
||||
}
|
||||
TryGetJob::Cycle(error) => {
|
||||
let result = mk_cycle(qcx, error, query.handle_cycle_error());
|
||||
(result, None)
|
||||
}
|
||||
#[cfg(parallel_compiler)]
|
||||
TryGetJob::JobCompleted(query_blocked_prof_timer) => {
|
||||
let Some((v, index)) = query.query_cache(qcx).lookup(&key) else {
|
||||
panic!("value must be in cache after waiting")
|
||||
};
|
||||
|
||||
qcx.dep_context().profiler().query_cache_hit(index.into());
|
||||
query_blocked_prof_timer.finish_with_query_invocation_id(index.into());
|
||||
|
||||
(v, Some(index))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn execute_job<Q, Qcx>(
|
||||
query: Q,
|
||||
qcx: Qcx,
|
||||
state: &QueryState<Q::Key, Qcx::DepKind>,
|
||||
key: Q::Key,
|
||||
id: QueryJobId,
|
||||
dep_node: Option<DepNode<Qcx::DepKind>>,
|
||||
) -> (Q::Value, Option<DepNodeIndex>)
|
||||
where
|
||||
Q: QueryConfig<Qcx>,
|
||||
Qcx: QueryContext,
|
||||
{
|
||||
// Use `JobOwner` so the query will be poisoned if executing it panics.
|
||||
let job_owner = JobOwner { state, key };
|
||||
|
||||
let (result, dep_node_index) = match qcx.dep_context().dep_graph().data() {
|
||||
None => execute_job_non_incr(query, qcx, key, id),
|
||||
Some(data) => execute_job_incr(query, qcx, data, key, dep_node, id),
|
||||
};
|
||||
|
||||
let cache = query.query_cache(qcx);
|
||||
if query.feedable() {
|
||||
// We should not compute queries that also got a value via feeding.
|
||||
// This can't happen, as query feeding adds the very dependencies to the fed query
|
||||
// as its feeding query had. So if the fed query is red, so is its feeder, which will
|
||||
// get evaluated first, and re-feed the query.
|
||||
if let Some((cached_result, _)) = cache.lookup(&key) {
|
||||
panic!(
|
||||
"fed query later has its value computed. The already cached value: {cached_result:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
job_owner.complete(cache, result, dep_node_index);
|
||||
|
||||
(result, Some(dep_node_index))
|
||||
}
|
||||
|
||||
// Fast path for when incr. comp. is off.
|
||||
#[inline(always)]
|
||||
fn execute_job_non_incr<Q, Qcx>(
|
||||
|
Loading…
x
Reference in New Issue
Block a user