diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs
index a5a2f0093ce..c9bc2240c21 100644
--- a/compiler/rustc_query_system/src/query/job.rs
+++ b/compiler/rustc_query_system/src/query/job.rs
@@ -124,8 +124,6 @@ impl<D: DepKind> QueryJob<D> {
 }
 
 impl QueryJobId {
-    #[cold]
-    #[inline(never)]
     #[cfg(not(parallel_compiler))]
     pub(super) fn find_cycle_in_stack<D: DepKind>(
         &self,
diff --git a/compiler/rustc_query_system/src/query/plumbing.rs b/compiler/rustc_query_system/src/query/plumbing.rs
index 186417e862a..0326869826b 100644
--- a/compiler/rustc_query_system/src/query/plumbing.rs
+++ b/compiler/rustc_query_system/src/query/plumbing.rs
@@ -6,6 +6,8 @@ use crate::dep_graph::{DepContext, DepKind, DepNode, DepNodeIndex, DepNodeParams
 use crate::dep_graph::{DepGraphData, HasDepContext};
 use crate::ich::StableHashingContext;
 use crate::query::caches::QueryCache;
+#[cfg(parallel_compiler)]
+use crate::query::job::QueryLatch;
 use crate::query::job::{report_cycle, QueryInfo, QueryJob, QueryJobId, QueryJobInfo};
 use crate::query::SerializedDepNodeIndex;
 use crate::query::{QueryContext, QueryMap, QuerySideEffects, QueryStackFrame};
@@ -13,12 +15,10 @@ use crate::values::Value;
 use crate::HandleCycleError;
 use rustc_data_structures::fingerprint::Fingerprint;
 use rustc_data_structures::fx::FxHashMap;
-#[cfg(parallel_compiler)]
-use rustc_data_structures::profiling::TimingGuard;
-#[cfg(parallel_compiler)]
-use rustc_data_structures::sharded::Sharded;
 use rustc_data_structures::stack::ensure_sufficient_stack;
-use rustc_data_structures::sync::{Lock, LockGuard};
+use rustc_data_structures::sync::Lock;
+#[cfg(parallel_compiler)]
+use rustc_data_structures::{cold_path, sharded::Sharded};
 use rustc_errors::{DiagnosticBuilder, ErrorGuaranteed, FatalError};
 use rustc_span::{Span, DUMMY_SP};
 use std::cell::Cell;
@@ -116,7 +116,6 @@ where
 {
     state: &'tcx QueryState<K, D>,
     key: K,
-    id: QueryJobId,
 }
 
 #[cold]
@@ -166,81 +165,6 @@ impl<'tcx, K, D: DepKind> JobOwner<'tcx, K, D>
 where
     K: Eq + Hash + Copy,
 {
-    /// Either gets a `JobOwner` corresponding the query, allowing us to
-    /// start executing the query, or returns with the result of the query.
-    /// This function assumes that `try_get_cached` is already called and returned `lookup`.
-    /// If the query is executing elsewhere, this will wait for it and return the result.
-    /// If the query panicked, this will silently panic.
-    ///
-    /// This function is inlined because that results in a noticeable speed-up
-    /// for some compile-time benchmarks.
-    #[inline(always)]
-    fn try_start<'b, Qcx>(
-        qcx: &'b Qcx,
-        state: &'b QueryState<K, D>,
-        mut state_lock: LockGuard<'b, FxHashMap<K, QueryResult<D>>>,
-        span: Span,
-        key: K,
-    ) -> TryGetJob<'b, K, D>
-    where
-        Qcx: QueryContext + HasDepContext<DepKind = D>,
-    {
-        let lock = &mut *state_lock;
-        let current_job_id = qcx.current_query_job();
-
-        match lock.entry(key) {
-            Entry::Vacant(entry) => {
-                let id = qcx.next_job_id();
-                let job = QueryJob::new(id, span, current_job_id);
-
-                let key = *entry.key();
-                entry.insert(QueryResult::Started(job));
-
-                let owner = JobOwner { state, id, key };
-                return TryGetJob::NotYetStarted(owner);
-            }
-            Entry::Occupied(mut entry) => {
-                match entry.get_mut() {
-                    #[cfg(not(parallel_compiler))]
-                    QueryResult::Started(job) => {
-                        let id = job.id;
-                        drop(state_lock);
-
-                        // If we are single-threaded we know that we have cycle error,
-                        // so we just return the error.
-                        return TryGetJob::Cycle(id.find_cycle_in_stack(
-                            qcx.try_collect_active_jobs().unwrap(),
-                            &current_job_id,
-                            span,
-                        ));
-                    }
-                    #[cfg(parallel_compiler)]
-                    QueryResult::Started(job) => {
-                        // For parallel queries, we'll block and wait until the query running
-                        // in another thread has completed. Record how long we wait in the
-                        // self-profiler.
-                        let query_blocked_prof_timer = qcx.dep_context().profiler().query_blocked();
-
-                        // Get the latch out
-                        let latch = job.latch();
-
-                        drop(state_lock);
-
-                        // With parallel queries we might just have to wait on some other
-                        // thread.
-                        let result = latch.wait_on(current_job_id, span);
-
-                        match result {
-                            Ok(()) => TryGetJob::JobCompleted(query_blocked_prof_timer),
-                            Err(cycle) => TryGetJob::Cycle(cycle),
-                        }
-                    }
-                    QueryResult::Poisoned => FatalError.raise(),
-                }
-            }
-        }
-    }
-
     /// Completes the query by updating the query cache with the `result`,
     /// signals the waiter and forgets the JobOwner, so it won't poison the query
     fn complete<C>(self, cache: &C, result: C::Value, dep_node_index: DepNodeIndex)
@@ -307,25 +231,6 @@ pub(crate) struct CycleError<D: DepKind> {
     pub cycle: Vec<QueryInfo<D>>,
 }
 
-/// The result of `try_start`.
-enum TryGetJob<'tcx, K, D>
-where
-    K: Eq + Hash + Copy,
-    D: DepKind,
-{
-    /// The query is not yet started. Contains a guard to the cache eventually used to start it.
-    NotYetStarted(JobOwner<'tcx, K, D>),
-
-    /// The query was already completed.
-    /// Returns the result of the query and its dep-node index
-    /// if it succeeded or a cycle error if it failed.
-    #[cfg(parallel_compiler)]
-    JobCompleted(TimingGuard<'tcx>),
-
-    /// Trying to execute the query resulted in a cycle.
-    Cycle(CycleError<D>),
-}
-
 /// Checks if the query is already computed and in the cache.
 /// It returns the shard index and a lock guard to the shard,
 /// which will be used if the query is not in the cache and we need
@@ -346,6 +251,65 @@ where
     }
 }
 
+#[cold]
+#[inline(never)]
+#[cfg(not(parallel_compiler))]
+fn cycle_error<Q, Qcx>(
+    query: Q,
+    qcx: Qcx,
+    try_execute: QueryJobId,
+    span: Span,
+) -> (Q::Value, Option<DepNodeIndex>)
+where
+    Q: QueryConfig<Qcx>,
+    Qcx: QueryContext,
+{
+    let error = try_execute.find_cycle_in_stack(
+        qcx.try_collect_active_jobs().unwrap(),
+        &qcx.current_query_job(),
+        span,
+    );
+    (mk_cycle(qcx, error, query.handle_cycle_error()), None)
+}
+
+#[inline(always)]
+#[cfg(parallel_compiler)]
+fn wait_for_query<Q, Qcx>(
+    query: Q,
+    qcx: Qcx,
+    span: Span,
+    key: Q::Key,
+    latch: QueryLatch<Qcx::DepKind>,
+    current: Option<QueryJobId>,
+) -> (Q::Value, Option<DepNodeIndex>)
+where
+    Q: QueryConfig<Qcx>,
+    Qcx: QueryContext,
+{
+    // For parallel queries, we'll block and wait until the query running
+    // in another thread has completed. Record how long we wait in the
+    // self-profiler.
+    let query_blocked_prof_timer = qcx.dep_context().profiler().query_blocked();
+
+    // With parallel queries we might just have to wait on some other
+    // thread.
+    let result = latch.wait_on(current, span);
+
+    match result {
+        Ok(()) => {
+            let Some((v, index)) = query.query_cache(qcx).lookup(&key) else {
+                cold_path(|| panic!("value must be in cache after waiting"))
+            };
+
+            qcx.dep_context().profiler().query_cache_hit(index.into());
+            query_blocked_prof_timer.finish_with_query_invocation_id(index.into());
+
+            (v, Some(index))
+        }
+        Err(cycle) => (mk_cycle(qcx, cycle, query.handle_cycle_error()), None),
+    }
+}
+
 #[inline(never)]
 fn try_execute_query<Q, Qcx>(
     query: Q,
@@ -360,9 +324,9 @@ where
 {
     let state = query.query_state(qcx);
     #[cfg(parallel_compiler)]
-    let state_lock = state.active.get_shard_by_value(&key).lock();
+    let mut state_lock = state.active.get_shard_by_value(&key).lock();
    #[cfg(not(parallel_compiler))]
-    let state_lock = state.active.lock();
+    let mut state_lock = state.active.lock();
 
     // For the parallel compiler we need to check both the query cache and query state structures
     // while holding the state lock to ensure that 1) the query has not yet completed and 2) the
@@ -377,46 +341,84 @@ where
         }
     }
 
-    match JobOwner::<'_, Q::Key, Qcx::DepKind>::try_start(&qcx, state, state_lock, span, key) {
-        TryGetJob::NotYetStarted(job) => {
-            let (result, dep_node_index) = match qcx.dep_context().dep_graph().data() {
-                None => execute_job_non_incr(query, qcx, key, job.id),
-                Some(data) => execute_job_incr(query, qcx, data, key, dep_node, job.id),
-            };
+    let current_job_id = qcx.current_query_job();
 
-            let cache = query.query_cache(qcx);
-            if query.feedable() {
-                // We should not compute queries that also got a value via feeding.
-                // This can't happen, as query feeding adds the very dependencies to the fed query
-                // as its feeding query had. So if the fed query is red, so is its feeder, which will
-                // get evaluated first, and re-feed the query.
-                if let Some((cached_result, _)) = cache.lookup(&key) {
-                    panic!(
-                        "fed query later has its value computed. The already cached value: {cached_result:?}"
-                    );
+    match state_lock.entry(key) {
+        Entry::Vacant(entry) => {
+            // Nothing has computed or is computing the query, so we start a new job and insert it in the
+            // state map.
+            let id = qcx.next_job_id();
+            let job = QueryJob::new(id, span, current_job_id);
+            entry.insert(QueryResult::Started(job));
+
+            // Drop the lock before we start executing the query
+            drop(state_lock);
+
+            execute_job(query, qcx, state, key, id, dep_node)
+        }
+        Entry::Occupied(mut entry) => {
+            match entry.get_mut() {
+                #[cfg(not(parallel_compiler))]
+                QueryResult::Started(job) => {
+                    let id = job.id;
+                    drop(state_lock);
+
+                    // If we are single-threaded we know that we have cycle error,
+                    // so we just return the error.
+                    cycle_error(query, qcx, id, span)
                 }
+                #[cfg(parallel_compiler)]
+                QueryResult::Started(job) => {
+                    // Get the latch out
+                    let latch = job.latch();
+                    drop(state_lock);
+
+                    wait_for_query(query, qcx, span, key, latch, current_job_id)
+                }
+                QueryResult::Poisoned => FatalError.raise(),
            }
-            job.complete(cache, result, dep_node_index);
-            (result, Some(dep_node_index))
-        }
-        TryGetJob::Cycle(error) => {
-            let result = mk_cycle(qcx, error, query.handle_cycle_error());
-            (result, None)
-        }
-        #[cfg(parallel_compiler)]
-        TryGetJob::JobCompleted(query_blocked_prof_timer) => {
-            let Some((v, index)) = query.query_cache(qcx).lookup(&key) else {
-                panic!("value must be in cache after waiting")
-            };
-
-            qcx.dep_context().profiler().query_cache_hit(index.into());
-            query_blocked_prof_timer.finish_with_query_invocation_id(index.into());
-
-            (v, Some(index))
-        }
     }
 }
 
+#[inline(always)]
+fn execute_job<Q, Qcx>(
+    query: Q,
+    qcx: Qcx,
+    state: &QueryState<Q::Key, Qcx::DepKind>,
+    key: Q::Key,
+    id: QueryJobId,
+    dep_node: Option<DepNode<Qcx::DepKind>>,
+) -> (Q::Value, Option<DepNodeIndex>)
+where
+    Q: QueryConfig<Qcx>,
+    Qcx: QueryContext,
+{
+    // Use `JobOwner` so the query will be poisoned if executing it panics.
+    let job_owner = JobOwner { state, key };
+
+    let (result, dep_node_index) = match qcx.dep_context().dep_graph().data() {
+        None => execute_job_non_incr(query, qcx, key, id),
+        Some(data) => execute_job_incr(query, qcx, data, key, dep_node, id),
+    };
+
+    let cache = query.query_cache(qcx);
+    if query.feedable() {
+        // We should not compute queries that also got a value via feeding.
+        // This can't happen, as query feeding adds the very dependencies to the fed query
+        // as its feeding query had. So if the fed query is red, so is its feeder, which will
+        // get evaluated first, and re-feed the query.
+        if let Some((cached_result, _)) = cache.lookup(&key) {
+            panic!(
+                "fed query later has its value computed. The already cached value: {cached_result:?}"
+            );
+        }
+    }
+    job_owner.complete(cache, result, dep_node_index);
+
+    (result, Some(dep_node_index))
+}
+
 // Fast path for when incr. comp. is off.
 #[inline(always)]
 fn execute_job_non_incr<Q, Qcx>(
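A minimal, self-contained sketch of the poison-on-panic pattern that the new `execute_job` relies on ("Use `JobOwner` so the query will be poisoned if executing it panics"). The names `ActiveMap`, `Slot`, and `Guard` below are invented for illustration; rustc's real `JobOwner` additionally signals waiters through the job's latch and lives in the sharded query state, so treat this as a sketch of the idea rather than the actual implementation.

use std::collections::HashMap;
use std::mem;
use std::sync::Mutex;

// State of an in-flight query, mirroring `QueryResult::{Started, Poisoned}`.
enum Slot {
    Started,
    Poisoned,
}

struct ActiveMap {
    active: Mutex<HashMap<u32, Slot>>,
}

// RAII guard playing the role of `JobOwner`: if the query panics before
// `complete` runs, `Drop` poisons the entry so that other threads hit the
// `QueryResult::Poisoned => FatalError.raise()` arm instead of waiting forever.
struct Guard<'a> {
    map: &'a ActiveMap,
    key: u32,
}

impl Guard<'_> {
    // Success path: remove the entry and defuse the drop bomb with
    // `mem::forget`, as `JobOwner::complete` does after caching the result.
    fn complete(self) {
        self.map.active.lock().unwrap().remove(&self.key);
        mem::forget(self);
    }
}

impl Drop for Guard<'_> {
    // Reached only when the query panicked before `complete` ran: mark the
    // entry poisoned rather than leaving a stale `Started` slot behind.
    fn drop(&mut self) {
        self.map.active.lock().unwrap().insert(self.key, Slot::Poisoned);
    }
}

fn main() {
    let map = ActiveMap { active: Mutex::new(HashMap::new()) };
    map.active.lock().unwrap().insert(1, Slot::Started);

    let guard = Guard { map: &map, key: 1 };
    // ... the query body would execute here; a panic would poison key 1 ...
    guard.complete();

    assert!(!map.active.lock().unwrap().contains_key(&1));
}

The same shape explains why `execute_job` constructs `JobOwner { state, key }` before running the query, and why `complete` must forget the owner on the success path: dropping it there would re-poison the entry that was just completed.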