Split query execution into hot and cold paths
parent 7710ae0e26
commit 545e290a93
3 changed files with 163 additions and 111 deletions
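The structural change: every query call now starts with a small, always-inlined cache probe (the hot path) and only falls through to the out-of-line job machinery (the cold path) on a miss. Below is a minimal standalone sketch of that pattern; the `Cache`/`get_cold` names are illustrative, not the compiler's actual types.

use std::collections::HashMap;

struct Cache {
    map: HashMap<u64, String>,
}

impl Cache {
    // Hot path: a cheap cache probe, small enough to inline at every call site.
    #[inline(always)]
    fn get(&mut self, key: u64) -> String {
        if let Some(v) = self.map.get(&key) {
            return v.clone();
        }
        self.get_cold(key)
    }

    // Cold path: the expensive miss handler, kept out of the caller's
    // instruction stream so the hot path stays small.
    #[inline(never)]
    fn get_cold(&mut self, key: u64) -> String {
        let v = format!("computed-{}", key); // stands in for executing the query
        self.map.insert(key, v.clone());
        v
    }
}

fn main() {
    let mut cache = Cache { map: HashMap::new() };
    assert_eq!(cache.get(1), "computed-1"); // miss: takes the cold path
    assert_eq!(cache.get(1), "computed-1"); // hit: stays on the hot path
}

Keeping the miss handler out of line is what shrinks the per-call fast path, which is the effect this commit is after.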
@@ -1122,6 +1122,7 @@ impl CurrentDepGraph {
 }
 
 impl DepGraphData {
+    #[inline]
     fn read_index(&self, source: DepNodeIndex) {
         ty::tls::with_context_opt(|icx| {
             let icx = if let Some(icx) = icx { icx } else { return };
@@ -1688,6 +1688,7 @@ pub mod tls {
 
     /// Gets the pointer to the current `ImplicitCtxt`.
     #[cfg(not(parallel_compiler))]
+    #[inline]
    fn get_tlv() -> usize {
        TLV.with(|tlv| tlv.get())
    }
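Both hunks above only add `#[inline]`. Without that attribute (and without LTO), rustc compiles a non-generic function into its own crate only, so hot leaf functions such as `read_index` and `get_tlv` would cost a full call from every other crate. A hedged sketch of the attribute's effect:

// In some library crate: without `#[inline]`, downstream crates must emit
// a real call to this non-generic function; with it, the body is exported
// so callers can inline the single add instruction.
#[inline]
pub fn bump(counter: &mut u64) {
    *counter += 1;
}

fn main() {
    let mut c = 0;
    bump(&mut c);
    assert_eq!(c, 1);
}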
@@ -12,10 +12,8 @@ use crate::ty::{self, TyCtxt};
 #[cfg(not(parallel_compiler))]
 use rustc_data_structures::cold_path;
 use rustc_data_structures::fx::{FxHashMap, FxHasher};
-#[cfg(parallel_compiler)]
-use rustc_data_structures::profiling::TimingGuard;
 use rustc_data_structures::sharded::Sharded;
-use rustc_data_structures::sync::Lock;
+use rustc_data_structures::sync::{Lock, LockGuard};
 use rustc_data_structures::thin_vec::ThinVec;
 use rustc_errors::{struct_span_err, Diagnostic, DiagnosticBuilder, FatalError, Handler, Level};
 use rustc_span::source_map::DUMMY_SP;
@@ -70,6 +68,12 @@ impl<'tcx, M: QueryConfig<'tcx>> Default for QueryCache<'tcx, M> {
     }
 }
 
+/// Values used when checking a query cache which can be reused on a cache-miss to execute the query.
+pub(super) struct QueryLookup<'tcx, Q: QueryDescription<'tcx>> {
+    shard: usize,
+    lock: LockGuard<'tcx, QueryCache<'tcx, Q>>,
+}
+
 /// A type representing the responsibility to execute the job in the `job` field.
 /// This will poison the relevant query if dropped.
 pub(super) struct JobOwner<'a, 'tcx, Q: QueryDescription<'tcx>> {
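The point of `QueryLookup` is that a failed cache probe has already hashed the key, picked a shard, and taken the shard lock; handing the still-held guard to the cold path lets it insert the new job without redoing any of that, and without a window in which another thread could start the same query. A rough sketch of the same hand-off using plain `std` types (all names here are illustrative):

use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

struct Shard {
    results: HashMap<u64, String>,
}

// What a cache miss hands to the slow path: the shard index plus the
// still-held lock guard, so no other thread can sneak in between.
struct Lookup<'a> {
    shard: usize,
    lock: MutexGuard<'a, Shard>,
}

// Hot path: probe the cache; on a miss, return the work already done.
fn try_get_cached(shards: &[Mutex<Shard>], key: u64) -> Result<String, Lookup<'_>> {
    let shard = (key as usize) % shards.len();
    let lock = shards[shard].lock().unwrap();
    if let Some(v) = lock.results.get(&key) {
        return Ok(v.clone());
    }
    Err(Lookup { shard, lock })
}

// Cold path: reuse the guard from the failed probe to insert the result.
fn get(shards: &[Mutex<Shard>], key: u64) -> String {
    match try_get_cached(shards, key) {
        Ok(v) => v,
        Err(mut lookup) => {
            let v = format!("computed-{}", key); // stands in for running the query
            lookup.lock.results.insert(key, v.clone());
            v
        }
    }
}

fn main() {
    let shards: Vec<Mutex<Shard>> =
        (0..4).map(|_| Mutex::new(Shard { results: HashMap::new() })).collect();
    assert_eq!(get(&shards, 7), "computed-7"); // miss, computed under the held lock
    assert_eq!(get(&shards, 7), "computed-7"); // hit on the hot path
}

Returning the guard inside the `Err` case is what keeps the lookup-to-insert sequence atomic.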
@@ -81,119 +85,87 @@ pub(super) struct JobOwner<'a, 'tcx, Q: QueryDescription<'tcx>> {
 impl<'a, 'tcx, Q: QueryDescription<'tcx>> JobOwner<'a, 'tcx, Q> {
     /// Either gets a `JobOwner` corresponding the query, allowing us to
     /// start executing the query, or returns with the result of the query.
-    /// If the query is executing elsewhere, this will wait for it.
+    /// This function assumes that `try_get_cached` is already called and returned `lookup`.
+    /// If the query is executing elsewhere, this will wait for it and return the result.
     /// If the query panicked, this will silently panic.
     ///
     /// This function is inlined because that results in a noticeable speed-up
     /// for some compile-time benchmarks.
     #[inline(always)]
-    pub(super) fn try_get(tcx: TyCtxt<'tcx>, span: Span, key: &Q::Key) -> TryGetJob<'a, 'tcx, Q> {
-        // Handling the `query_blocked_prof_timer` is a bit weird because of the
-        // control flow in this function: Blocking is implemented by
-        // awaiting a running job and, once that is done, entering the loop below
-        // again from the top. In that second iteration we will hit the
-        // cache which provides us with the information we need for
-        // finishing the "query-blocked" event.
-        //
-        // We thus allocate `query_blocked_prof_timer` outside the loop,
-        // initialize it during the first iteration and finish it during the
-        // second iteration.
-        #[cfg(parallel_compiler)]
-        let mut query_blocked_prof_timer: Option<TimingGuard<'_>> = None;
-
-        let cache = Q::query_cache(tcx);
-        loop {
-            // We compute the key's hash once and then use it for both the
-            // shard lookup and the hashmap lookup. This relies on the fact
-            // that both of them use `FxHasher`.
-            let mut state = FxHasher::default();
-            key.hash(&mut state);
-            let key_hash = state.finish();
-
-            let shard = cache.get_shard_index_by_hash(key_hash);
-            let mut lock_guard = cache.get_shard_by_index(shard).lock();
-            let lock = &mut *lock_guard;
-
-            if let Some((_, value)) =
-                lock.results.raw_entry().from_key_hashed_nocheck(key_hash, key)
-            {
-                if unlikely!(tcx.prof.enabled()) {
-                    tcx.prof.query_cache_hit(value.index.into());
-
-                    #[cfg(parallel_compiler)]
-                    {
-                        if let Some(prof_timer) = query_blocked_prof_timer.take() {
-                            prof_timer.finish_with_query_invocation_id(value.index.into());
-                        }
-                    }
-                }
-
-                let result = (value.value.clone(), value.index);
-                #[cfg(debug_assertions)]
-                {
-                    lock.cache_hits += 1;
-                }
-                return TryGetJob::JobCompleted(result);
-            }
-
-            let latch = match lock.active.entry((*key).clone()) {
-                Entry::Occupied(mut entry) => {
-                    match entry.get_mut() {
-                        QueryResult::Started(job) => {
-                            // For parallel queries, we'll block and wait until the query running
-                            // in another thread has completed. Record how long we wait in the
-                            // self-profiler.
-                            #[cfg(parallel_compiler)]
-                            {
-                                query_blocked_prof_timer = Some(tcx.prof.query_blocked());
-                            }
-
-                            // Create the id of the job we're waiting for
-                            let id = QueryJobId::new(job.id, shard, Q::dep_kind());
-
-                            job.latch(id)
-                        }
-                        QueryResult::Poisoned => FatalError.raise(),
-                    }
-                }
-                Entry::Vacant(entry) => {
-                    // No job entry for this query. Return a new one to be started later.
-
-                    // Generate an id unique within this shard.
-                    let id = lock.jobs.checked_add(1).unwrap();
-                    lock.jobs = id;
-                    let id = QueryShardJobId(NonZeroU32::new(id).unwrap());
-
-                    let global_id = QueryJobId::new(id, shard, Q::dep_kind());
-
-                    let job =
-                        tls::with_related_context(tcx, |icx| QueryJob::new(id, span, icx.query));
-
-                    entry.insert(QueryResult::Started(job));
-
-                    let owner = JobOwner { cache, id: global_id, key: (*key).clone() };
-                    return TryGetJob::NotYetStarted(owner);
-                }
-            };
-            mem::drop(lock_guard);
-
-            // If we are single-threaded we know that we have cycle error,
-            // so we just return the error.
-            #[cfg(not(parallel_compiler))]
-            return TryGetJob::Cycle(cold_path(|| {
-                Q::handle_cycle_error(tcx, latch.find_cycle_in_stack(tcx, span))
-            }));
-
-            // With parallel queries we might just have to wait on some other
-            // thread.
-            #[cfg(parallel_compiler)]
-            {
-                let result = latch.wait_on(tcx, span);
-
-                if let Err(cycle) = result {
-                    return TryGetJob::Cycle(Q::handle_cycle_error(tcx, cycle));
-                }
-            }
-        }
-    }
+    pub(super) fn try_start(
+        tcx: TyCtxt<'tcx>,
+        span: Span,
+        key: &Q::Key,
+        mut lookup: QueryLookup<'tcx, Q>,
+    ) -> TryGetJob<'a, 'tcx, Q> {
+        let lock = &mut *lookup.lock;
+
+        let (latch, mut _query_blocked_prof_timer) = match lock.active.entry((*key).clone()) {
+            Entry::Occupied(mut entry) => {
+                match entry.get_mut() {
+                    QueryResult::Started(job) => {
+                        // For parallel queries, we'll block and wait until the query running
+                        // in another thread has completed. Record how long we wait in the
+                        // self-profiler.
+                        let _query_blocked_prof_timer = if cfg!(parallel_compiler) {
+                            Some(tcx.prof.query_blocked())
+                        } else {
+                            None
+                        };
+
+                        // Create the id of the job we're waiting for
+                        let id = QueryJobId::new(job.id, lookup.shard, Q::dep_kind());
+
+                        (job.latch(id), _query_blocked_prof_timer)
+                    }
+                    QueryResult::Poisoned => FatalError.raise(),
+                }
+            }
+            Entry::Vacant(entry) => {
+                // No job entry for this query. Return a new one to be started later.
+
+                // Generate an id unique within this shard.
+                let id = lock.jobs.checked_add(1).unwrap();
+                lock.jobs = id;
+                let id = QueryShardJobId(NonZeroU32::new(id).unwrap());
+
+                let global_id = QueryJobId::new(id, lookup.shard, Q::dep_kind());
+
+                let job = tls::with_related_context(tcx, |icx| QueryJob::new(id, span, icx.query));
+
+                entry.insert(QueryResult::Started(job));
+
+                let owner =
+                    JobOwner { cache: Q::query_cache(tcx), id: global_id, key: (*key).clone() };
+                return TryGetJob::NotYetStarted(owner);
+            }
+        };
+        mem::drop(lookup.lock);
+
+        // If we are single-threaded we know that we have cycle error,
+        // so we just return the error.
+        #[cfg(not(parallel_compiler))]
+        return TryGetJob::Cycle(cold_path(|| {
+            Q::handle_cycle_error(tcx, latch.find_cycle_in_stack(tcx, span))
+        }));
+
+        // With parallel queries we might just have to wait on some other
+        // thread.
+        #[cfg(parallel_compiler)]
+        {
+            let result = latch.wait_on(tcx, span);
+
+            if let Err(cycle) = result {
+                return TryGetJob::Cycle(Q::handle_cycle_error(tcx, cycle));
+            }
+
+            let cached = tcx.try_get_cached::<Q>(key).0.unwrap();
+
+            if let Some(prof_timer) = _query_blocked_prof_timer.take() {
+                prof_timer.finish_with_query_invocation_id(cached.1.into());
+            }
+
+            return TryGetJob::JobCompleted(cached);
+        }
+    }
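One detail in the new `try_start`: the profiler timer moves from a `#[cfg(parallel_compiler)]`-gated mutable local into ordinary data flow, via `if cfg!(parallel_compiler) { Some(..) } else { None }` threaded through the match result. `cfg!` expands to a constant boolean, so both branches must typecheck but the dead one is compiled away. A standalone illustration, using a made-up `"profiling"` feature flag:

// `cfg!(...)` is a compile-time boolean: both arms typecheck, and the
// unused one is optimized out. Contrast with `#[cfg(...)]`, which removes
// the code entirely before type checking.
fn maybe_timer() -> Option<std::time::Instant> {
    // `feature = "profiling"` is a stand-in flag for illustration.
    if cfg!(feature = "profiling") { Some(std::time::Instant::now()) } else { None }
}

fn main() {
    let timer = maybe_timer();
    // ... do work ...
    if let Some(t) = timer {
        println!("took {:?}", t.elapsed());
    }
}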
@@ -269,6 +241,7 @@ pub(super) enum TryGetJob<'a, 'tcx, D: QueryDescription<'tcx>> {
     /// The query was already completed.
     /// Returns the result of the query and its dep-node index
     /// if it succeeded or a cycle error if it failed.
+    #[cfg(parallel_compiler)]
     JobCompleted((D::Value, DepNodeIndex)),
 
     /// Trying to execute the query resulted in a cycle.
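Since `get_query` now probes the cache before ever constructing a job, a single-threaded build can no longer observe `JobCompleted`; only a parallel build, after blocking on another thread, can find the job already finished. Rust accepts `#[cfg(...)]` on enum variants and on match arms alike, which is what makes gating a single variant practical, as in this hedged sketch:

enum TryGet {
    NotYetStarted,
    Cycle,
    // Only constructed when another thread may finish the job for us.
    #[cfg(parallel_compiler)]
    JobCompleted(u32),
}

fn describe(t: TryGet) -> &'static str {
    match t {
        TryGet::NotYetStarted => "start the query ourselves",
        TryGet::Cycle => "report a cycle error",
        // This arm disappears together with the variant in serial builds.
        #[cfg(parallel_compiler)]
        TryGet::JobCompleted(_) => "reuse the finished result",
    }
}

fn main() {
    println!("{}", describe(TryGet::NotYetStarted));
}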
@@ -396,13 +369,76 @@ impl<'tcx> TyCtxt<'tcx> {
         eprintln!("end of query stack");
     }
 
+    /// Checks if the query is already computed and in the cache.
+    /// It returns the shard index and a lock guard to the shard,
+    /// which will be used if the query is not in the cache and we need
+    /// to compute it.
+    #[inline(always)]
+    fn try_get_cached<Q: QueryDescription<'tcx>>(
+        self,
+        key: &Q::Key,
+    ) -> (Option<(Q::Value, DepNodeIndex)>, QueryLookup<'tcx, Q>) {
+        let cache = Q::query_cache(self);
+
+        // We compute the key's hash once and then use it for both the
+        // shard lookup and the hashmap lookup. This relies on the fact
+        // that both of them use `FxHasher`.
+        let mut state = FxHasher::default();
+        key.hash(&mut state);
+        let key_hash = state.finish();
+
+        let shard = cache.get_shard_index_by_hash(key_hash);
+        let mut lock_guard = cache.get_shard_by_index(shard).lock();
+        let lock = &mut *lock_guard;
+
+        let result =
+            lock.results.raw_entry().from_key_hashed_nocheck(key_hash, key).map(|(_, value)| {
+                if unlikely!(self.prof.enabled()) {
+                    self.prof.query_cache_hit(value.index.into());
+                }
+
+                (value.value.clone(), value.index)
+            });
+
+        #[cfg(debug_assertions)]
+        {
+            if result.is_some() {
+                lock.cache_hits += 1;
+            }
+        }
+
+        (result, QueryLookup { lock: lock_guard, shard })
+    }
+
     #[inline(never)]
-    pub(super) fn get_query<Q: QueryDescription<'tcx>>(self, span: Span, key: Q::Key) -> Q::Value {
+    pub(super) fn get_query<Q: QueryDescription<'tcx> + 'tcx>(
+        self,
+        span: Span,
+        key: Q::Key,
+    ) -> Q::Value {
         debug!("ty::query::get_query<{}>(key={:?}, span={:?})", Q::NAME, key, span);
 
-        let job = match JobOwner::try_get(self, span, &key) {
+        let (cached, lookup) = self.try_get_cached::<Q>(&key);
+
+        if let Some((v, index)) = cached {
+            self.dep_graph.read_index(index);
+            return v;
+        }
+
+        self.try_execute_query(span, key, lookup)
+    }
+
+    #[inline(always)]
+    pub(super) fn try_execute_query<Q: QueryDescription<'tcx>>(
+        self,
+        span: Span,
+        key: Q::Key,
+        lookup: QueryLookup<'tcx, Q>,
+    ) -> Q::Value {
+        let job = match JobOwner::try_start(self, span, &key, lookup) {
             TryGetJob::NotYetStarted(job) => job,
             TryGetJob::Cycle(result) => return result,
+            #[cfg(parallel_compiler)]
             TryGetJob::JobCompleted((v, index)) => {
                 self.dep_graph.read_index(index);
                 return v;
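`try_get_cached` hashes the key once with `FxHasher` and reuses the result twice: to pick the shard, and (via the raw-entry API's `from_key_hashed_nocheck`) to probe the shard's map without hashing again. A small sketch of the hash-once idea, with `std`'s `DefaultHasher` standing in for `FxHasher`:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const SHARDS: usize = 32; // power of two, so we can mask instead of mod

fn shard_and_hash<K: Hash>(key: &K) -> (usize, u64) {
    // Hash the key once...
    let mut state = DefaultHasher::new();
    key.hash(&mut state);
    let hash = state.finish();
    // ...then derive the shard index from the same hash. The real code
    // additionally reuses `hash` for the map lookup itself through the
    // raw-entry API, skipping a second hashing pass.
    ((hash as usize) & (SHARDS - 1), hash)
}

fn main() {
    let (shard, hash) = shard_and_hash(&"typeck_tables_of");
    assert!(shard < SHARDS);
    println!("shard={} hash={:#x}", shard, hash);
}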
@@ -615,7 +651,7 @@ impl<'tcx> TyCtxt<'tcx> {
     /// side-effects -- e.g., in order to report errors for erroneous programs.
     ///
     /// Note: The optimization is only available during incr. comp.
-    pub(super) fn ensure_query<Q: QueryDescription<'tcx>>(self, key: Q::Key) -> () {
+    pub(super) fn ensure_query<Q: QueryDescription<'tcx> + 'tcx>(self, key: Q::Key) -> () {
         if Q::EVAL_ALWAYS {
             let _ = self.get_query::<Q>(DUMMY_SP, key);
             return;
@@ -643,12 +679,26 @@ impl<'tcx> TyCtxt<'tcx> {
     }
 
     #[allow(dead_code)]
-    fn force_query<Q: QueryDescription<'tcx>>(self, key: Q::Key, span: Span, dep_node: DepNode) {
+    fn force_query<Q: QueryDescription<'tcx> + 'tcx>(
+        self,
+        key: Q::Key,
+        span: Span,
+        dep_node: DepNode,
+    ) {
         // We may be concurrently trying both execute and force a query.
         // Ensure that only one of them runs the query.
-        let job = match JobOwner::try_get(self, span, &key) {
+
+        let (cached, lookup) = self.try_get_cached::<Q>(&key);
+
+        if cached.is_some() {
+            return;
+        }
+
+        let job = match JobOwner::try_start(self, span, &key, lookup) {
             TryGetJob::NotYetStarted(job) => job,
-            TryGetJob::Cycle(_) | TryGetJob::JobCompleted(_) => return,
+            TryGetJob::Cycle(_) => return,
+            #[cfg(parallel_compiler)]
+            TryGetJob::JobCompleted(_) => return,
         };
         self.force_query_with_job::<Q>(key, job, dep_node);
     }