1
Fork 0

Stream the dep-graph to a file.

This commit is contained in:
Camille GILLOT 2021-03-02 22:38:49 +01:00
parent 16156fb278
commit 6bfaf3a9cb
18 changed files with 710 additions and 918 deletions

View file

@ -1,9 +1,18 @@
//! The data that we will serialize and deserialize.
use super::{DepKind, DepNode};
use super::query::DepGraphQuery;
use super::{DepKind, DepNode, DepNodeIndex};
use rustc_data_structures::fingerprint::Fingerprint;
use rustc_index::vec::IndexVec;
use rustc_serialize::{Decodable, Decoder};
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::sync::{AtomicU32, Lock, Lrc, Ordering};
use rustc_index::vec::{Idx, IndexVec};
use rustc_serialize::opaque::{self, FileEncodeResult, FileEncoder, IntEncodedWithFixedSize};
use rustc_serialize::{Decodable, Encodable};
use smallvec::SmallVec;
use std::convert::TryInto;
#[cfg(parallel_compiler)]
use {rustc_data_structures::sync::WorkerLocal, std::sync::mpsc, std::thread};
// The maximum value of `SerializedDepNodeIndex` leaves the upper two bits
// unused so that we can store multiple index types in `CompressedHybridIndex`,
@ -50,78 +59,347 @@ impl<K: DepKind> SerializedDepGraph<K> {
}
}
impl<D: Decoder, K: DepKind + Decodable<D>> Decodable<D> for SerializedDepGraph<K> {
fn decode(d: &mut D) -> Result<SerializedDepGraph<K>, D::Error> {
// We used to serialize the dep graph by creating and serializing a `SerializedDepGraph`
// using data copied from the `DepGraph`. But copying created a large memory spike, so we
// now serialize directly from the `DepGraph` as if it's a `SerializedDepGraph`. Because we
// deserialize that data into a `SerializedDepGraph` in the next compilation session, we
// need `DepGraph`'s `Encodable` and `SerializedDepGraph`'s `Decodable` implementations to
// be in sync. If you update this decoding, be sure to update the encoding, and vice-versa.
//
// We mimic the sequence of `Encode` and `Encodable` method calls used by the `DepGraph`'s
// `Encodable` implementation with the corresponding sequence of `Decode` and `Decodable`
// method calls. E.g. `Decode::read_struct` pairs with `Encode::emit_struct`, `DepNode`'s
// `decode` pairs with `DepNode`'s `encode`, and so on. Any decoding methods not associated
// with corresponding encoding methods called in `DepGraph`'s `Encodable` implementation
// are off limits, because we'd be relying on their implementation details.
//
// For example, because we know it happens to do the right thing, it's tempting to just use
// `IndexVec`'s `Decodable` implementation to decode into some of the collections below,
// even though `DepGraph` doesn't use its `Encodable` implementation. But the `IndexVec`
// implementation could change, and we'd have a bug.
//
// Variables below are explicitly typed so that anyone who changes the `SerializedDepGraph`
// representation without updating this function will encounter a compilation error, and
// know to update this and possibly the `DepGraph` `Encodable` implementation accordingly
// (the latter should serialize data in a format compatible with our representation).
impl<'a, K: DepKind + Decodable<opaque::Decoder<'a>>> Decodable<opaque::Decoder<'a>>
for SerializedDepGraph<K>
{
#[instrument(skip(d))]
fn decode(d: &mut opaque::Decoder<'a>) -> Result<SerializedDepGraph<K>, String> {
let position = d.position();
d.read_struct("SerializedDepGraph", 4, |d| {
let nodes: IndexVec<SerializedDepNodeIndex, DepNode<K>> =
d.read_struct_field("nodes", 0, |d| {
d.read_seq(|d, len| {
let mut v = IndexVec::with_capacity(len);
for i in 0..len {
v.push(d.read_seq_elt(i, |d| Decodable::decode(d))?);
}
Ok(v)
})
})?;
// The last 16 bytes are the node count and edge count.
debug!("position: {:?}", d.position());
d.set_position(d.data.len() - 2 * IntEncodedWithFixedSize::ENCODED_SIZE);
debug!("position: {:?}", d.position());
let fingerprints: IndexVec<SerializedDepNodeIndex, Fingerprint> =
d.read_struct_field("fingerprints", 1, |d| {
d.read_seq(|d, len| {
let mut v = IndexVec::with_capacity(len);
for i in 0..len {
v.push(d.read_seq_elt(i, |d| Decodable::decode(d))?);
}
Ok(v)
})
})?;
let node_count = IntEncodedWithFixedSize::decode(d)?.0 as usize;
let edge_count = IntEncodedWithFixedSize::decode(d)?.0 as usize;
debug!(?node_count, ?edge_count);
let edge_list_indices: IndexVec<SerializedDepNodeIndex, (u32, u32)> = d
.read_struct_field("edge_list_indices", 2, |d| {
d.read_seq(|d, len| {
let mut v = IndexVec::with_capacity(len);
for i in 0..len {
v.push(d.read_seq_elt(i, |d| Decodable::decode(d))?);
}
Ok(v)
})
})?;
debug!("position: {:?}", d.position());
d.set_position(position);
debug!("position: {:?}", d.position());
let edge_list_data: Vec<SerializedDepNodeIndex> =
d.read_struct_field("edge_list_data", 3, |d| {
d.read_seq(|d, len| {
let mut v = Vec::with_capacity(len);
for i in 0..len {
v.push(d.read_seq_elt(i, |d| Decodable::decode(d))?);
}
Ok(v)
})
})?;
let mut nodes = IndexVec::with_capacity(node_count);
let mut fingerprints = IndexVec::with_capacity(node_count);
let mut edge_list_indices = IndexVec::with_capacity(node_count);
let mut edge_list_data = Vec::with_capacity(edge_count);
Ok(SerializedDepGraph { nodes, fingerprints, edge_list_indices, edge_list_data })
})
for _index in 0..node_count {
let node = NodeInfo::<K, SerializedDepNodeIndex>::decode(d)?;
debug!(?_index, ?node);
let _i: SerializedDepNodeIndex = nodes.push(node.node);
debug_assert_eq!(_i.index(), _index);
let _i: SerializedDepNodeIndex = fingerprints.push(node.fingerprint);
debug_assert_eq!(_i.index(), _index);
let start = edge_list_data.len().try_into().unwrap();
edge_list_data.extend(node.edges.into_iter());
let end = edge_list_data.len().try_into().unwrap();
let _i: SerializedDepNodeIndex = edge_list_indices.push((start, end));
debug_assert_eq!(_i.index(), _index);
}
Ok(SerializedDepGraph { nodes, fingerprints, edge_list_indices, edge_list_data })
}
}
/// One dep-graph node as it appears in the streamed file: the node itself,
/// its fingerprint, and the indices of the nodes it has edges to.
///
/// `I` is the index type used for the edges: the encoding side in this file
/// uses `DepNodeIndex`, the decoding side uses `SerializedDepNodeIndex`.
#[derive(Debug, Encodable, Decodable)]
pub struct NodeInfo<K: DepKind, I: Idx> {
// The dep-node being recorded.
node: DepNode<K>,
// Fingerprint stored alongside the node.
fingerprint: Fingerprint,
// Indices of the nodes this node has edges to.
edges: SmallVec<[I; 8]>,
}
/// Per-kind counters accumulated while encoding nodes, used for the
/// `[incremental]` statistics report.
struct Stat<K: DepKind> {
// The dep-kind these counters belong to.
kind: K,
// Number of encoded nodes of this kind.
node_counter: u64,
// Sum of the edge counts of all encoded nodes of this kind.
edge_counter: u64,
}
/// Aggregated encoding statistics: one `Stat` per dep-kind plus the totals
/// over every node that was encoded.
struct Stats<K: DepKind> {
// Per-kind counters, keyed by the kind they describe.
stats: FxHashMap<K, Stat<K>>,
// Number of nodes encoded so far, across all kinds.
total_node_count: usize,
// Number of edges encoded so far, across all kinds.
total_edge_count: usize,
}
/// Writes a single node to `encoder`, updating the optional debugging
/// side-tables first.
///
/// * When `_record_graph` is set (and only in builds with debug assertions),
///   the node and its edges are pushed into the in-memory `DepGraphQuery`.
/// * When `record_stats` is set, the per-kind and total node/edge counters
///   are bumped.
///
/// Returns whatever `node.encode(encoder)` returns.
#[instrument(skip(encoder, _record_graph, record_stats))]
fn encode_node<K: DepKind>(
    encoder: &mut FileEncoder,
    _index: DepNodeIndex,
    node: &NodeInfo<K, DepNodeIndex>,
    _record_graph: &Option<Lrc<Lock<DepGraphQuery<K>>>>,
    record_stats: &Option<Lrc<Lock<Stats<K>>>>,
) -> FileEncodeResult {
    // The query graph only exists for debugging purposes.
    #[cfg(debug_assertions)]
    if let Some(graph) = _record_graph.as_ref() {
        graph.lock().push(_index, node.node, &node.edges);
    }

    if let Some(shared) = record_stats.as_ref() {
        let mut totals = shared.lock();
        let num_edges = node.edges.len();
        let kind = node.node.kind;

        // Per-kind counters first; the borrow of the map ends before the
        // totals are touched.
        let per_kind =
            totals.stats.entry(kind).or_insert(Stat { kind, node_counter: 0, edge_counter: 0 });
        per_kind.node_counter += 1;
        per_kind.edge_counter += num_edges as u64;

        totals.total_node_count += 1;
        totals.total_edge_count += num_edges;
    }

    debug!(?_index, ?node);
    node.encode(encoder)
}
/// Appends the fixed-size node count and edge count to the stream and
/// flushes the encoder.
///
/// Panics if either count does not fit the integer type wrapped by
/// `IntEncodedWithFixedSize`.
fn encode_counts(
    mut encoder: FileEncoder,
    node_count: usize,
    edge_count: usize,
) -> FileEncodeResult {
    // Shadow the `usize` inputs with their fixed-width equivalents.
    let (node_count, edge_count) =
        (node_count.try_into().unwrap(), edge_count.try_into().unwrap());
    debug!(?node_count, ?edge_count);

    debug!("position: {:?}", encoder.position());
    // Node count first, then edge count; stop at the first write error.
    IntEncodedWithFixedSize(node_count)
        .encode(&mut encoder)
        .and_then(|()| IntEncodedWithFixedSize(edge_count).encode(&mut encoder))?;
    debug!("position: {:?}", encoder.position());

    // The encoder is owned here, so nothing can be written after the counts
    // once this function returns.
    encoder.flush()
}
/// Streams dep-graph nodes to a file (non-parallel compiler variant).
///
/// Nodes are encoded synchronously by `send`, under the `status` lock.
#[cfg(not(parallel_compiler))]
pub struct GraphEncoder<K: DepKind> {
// (file encoder, number of edges encoded so far, result of the encoding so far).
status: Lock<(FileEncoder, usize, FileEncodeResult)>,
// Source of the next `DepNodeIndex`; its final value is the node count.
counter: AtomicU32,
// In-memory copy of the graph, populated by `encode_node` when present.
record_graph: Option<Lrc<Lock<DepGraphQuery<K>>>>,
// Per-kind statistics, populated by `encode_node` when present.
record_stats: Option<Lrc<Lock<Stats<K>>>>,
}
/// Streams dep-graph nodes to a file (parallel compiler variant).
///
/// Nodes are sent over a channel to a dedicated thread which owns the
/// `FileEncoder` and does the actual encoding.
#[cfg(parallel_compiler)]
pub struct GraphEncoder<K: DepKind> {
// Per-worker clone of the channel sender feeding the encoding thread.
send: WorkerLocal<mpsc::Sender<(DepNodeIndex, NodeInfo<K, DepNodeIndex>)>>,
// Handle of the encoding thread; joined by `finish` to obtain the result.
thread: thread::JoinHandle<FileEncodeResult>,
// Source of the next `DepNodeIndex`.
counter: AtomicU32,
// In-memory copy of the graph, populated by `encode_node` when present.
record_graph: Option<Lrc<Lock<DepGraphQuery<K>>>>,
// Per-kind statistics, populated by `encode_node` when present.
record_stats: Option<Lrc<Lock<Stats<K>>>>,
}
impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
    /// Creates a graph encoder writing to `encoder`.
    ///
    /// * `prev_node_count` is forwarded to `DepGraphQuery::new` when the
    ///   graph is being recorded.
    /// * `record_graph` requests an in-memory copy of the graph; it is only
    ///   honoured in builds with debug assertions.
    /// * `record_stats` requests per-kind node/edge statistics.
    ///
    /// In the non-parallel configuration the encoder is kept behind a lock
    /// and nodes are written directly by `send`. In the parallel
    /// configuration a thread is spawned that owns the encoder and receives
    /// nodes through an `mpsc` channel.
    pub fn new(
        encoder: FileEncoder,
        prev_node_count: usize,
        record_graph: bool,
        record_stats: bool,
    ) -> Self {
        let record_graph = if cfg!(debug_assertions) && record_graph {
            Some(Lrc::new(Lock::new(DepGraphQuery::new(prev_node_count))))
        } else {
            None
        };
        let record_stats = if record_stats {
            Some(Lrc::new(Lock::new(Stats {
                stats: FxHashMap::default(),
                total_node_count: 0,
                total_edge_count: 0,
            })))
        } else {
            None
        };
        let counter = AtomicU32::new(0);

        #[cfg(not(parallel_compiler))]
        {
            // (encoder, edge count so far, result so far)
            let status = Lock::new((encoder, 0, Ok(())));
            GraphEncoder { status, counter, record_graph, record_stats }
        }
        #[cfg(parallel_compiler)]
        {
            let (send, recv) = mpsc::channel();
            let thread = {
                // The encoding thread gets its own handles to the side-tables.
                let record_graph = record_graph.clone();
                let record_stats = record_stats.clone();
                thread::spawn(move || {
                    encode_graph(encoder, recv, |encoder, index, node| {
                        encode_node(encoder, index, node, &record_graph, &record_stats)
                    })
                })
            };
            // Every worker gets its own clone of the sender.
            let send = WorkerLocal::new(move |_| send.clone());
            GraphEncoder { send, thread, counter, record_graph, record_stats }
        }
    }

    /// Calls `f` with the recorded query graph, if one is being recorded.
    /// Does nothing otherwise.
    pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery<K>)) {
        if let Some(record_graph) = &self.record_graph {
            f(&record_graph.lock())
        }
    }

    /// Prints the dep-graph statistics table to stderr, if statistics are
    /// being recorded. Does nothing otherwise.
    ///
    /// `total_read_count` and `total_duplicate_read_count` are only printed
    /// in builds with debug assertions.
    pub(crate) fn print_incremental_info(
        &self,
        total_read_count: u64,
        total_duplicate_read_count: u64,
    ) {
        if let Some(record_stats) = &self.record_stats {
            let record_stats = record_stats.lock();

            let mut stats: Vec<_> = record_stats.stats.values().collect();
            // Most frequent node kinds first. `Reverse` orders the unsigned
            // counters directly, avoiding a lossy cast to a signed type.
            stats.sort_by_key(|s| std::cmp::Reverse(s.node_counter));

            const SEPARATOR: &str = "[incremental] --------------------------------\
                                     ----------------------------------------------\
                                     ------------";

            eprintln!("[incremental]");
            eprintln!("[incremental] DepGraph Statistics");
            eprintln!("{}", SEPARATOR);
            eprintln!("[incremental]");
            eprintln!("[incremental] Total Node Count: {}", record_stats.total_node_count);
            eprintln!("[incremental] Total Edge Count: {}", record_stats.total_edge_count);

            if cfg!(debug_assertions) {
                eprintln!("[incremental] Total Edge Reads: {}", total_read_count);
                eprintln!(
                    "[incremental] Total Duplicate Edge Reads: {}",
                    total_duplicate_read_count
                );
            }

            eprintln!("[incremental]");
            eprintln!(
                "[incremental] {:<36}| {:<17}| {:<12}| {:<17}|",
                "Node Kind", "Node Frequency", "Node Count", "Avg. Edge Count"
            );
            eprintln!("{}", SEPARATOR);

            for stat in stats {
                // Share of all nodes that have this kind, in percent.
                let node_kind_ratio =
                    (100.0 * (stat.node_counter as f64)) / (record_stats.total_node_count as f64);
                let node_kind_avg_edges = (stat.edge_counter as f64) / (stat.node_counter as f64);

                eprintln!(
                    "[incremental] {:<36}|{:>16.1}% |{:>12} |{:>17.1} |",
                    format!("{:?}", stat.kind),
                    node_kind_ratio,
                    stat.node_counter,
                    node_kind_avg_edges,
                );
            }

            eprintln!("{}", SEPARATOR);
            eprintln!("[incremental]");
        }
    }
}
#[cfg(not(parallel_compiler))]
impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
    /// Allocates the next `DepNodeIndex` and encodes the node immediately.
    ///
    /// Once an encoding error has been recorded, later nodes are no longer
    /// written; the first error is kept and reported by `finish`. The edge
    /// count and the index counter keep advancing regardless.
    pub(crate) fn send(
        &self,
        node: DepNode<K>,
        fingerprint: Fingerprint,
        edges: SmallVec<[DepNodeIndex; 8]>,
    ) -> DepNodeIndex {
        let index = DepNodeIndex::from_u32(self.counter.fetch_add(1, Ordering::SeqCst));

        let mut guard = self.status.lock();
        let (encoder, edge_count, result) = &mut *guard;

        *edge_count += edges.len();
        // Only keep writing while everything so far has succeeded.
        if result.is_ok() {
            let info = NodeInfo { node, fingerprint, edges };
            *result = encode_node(encoder, index, &info, &self.record_graph, &self.record_stats);
        }

        index
    }

    /// Reports any error recorded while encoding nodes; otherwise writes the
    /// trailing node and edge counts and returns the result of that write.
    pub fn finish(self) -> FileEncodeResult {
        let (encoder, edge_count, result) = self.status.into_inner();
        result?;
        // Every `send` bumped the counter once, so it equals the node count.
        let node_count = self.counter.into_inner() as usize;
        encode_counts(encoder, node_count, edge_count)
    }
}
#[cfg(parallel_compiler)]
impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
    /// Allocates the next `DepNodeIndex` and hands the node to the encoding
    /// thread.
    ///
    /// Panics if the message cannot be sent.
    pub(crate) fn send(
        &self,
        node: DepNode<K>,
        fingerprint: Fingerprint,
        edges: SmallVec<[DepNodeIndex; 8]>,
    ) -> DepNodeIndex {
        let index = DepNodeIndex::from_u32(self.counter.fetch_add(1, Ordering::SeqCst));
        let message = (index, NodeInfo { node, fingerprint, edges });
        self.send.send(message).unwrap();
        index
    }

    /// Drops the senders, then waits for the encoding thread and returns its
    /// result.
    ///
    /// Panics if the encoding thread panicked.
    pub fn finish(self) -> FileEncodeResult {
        let GraphEncoder { send, thread, .. } = self;
        // Let go of the senders before joining the thread that receives from them.
        drop(send);
        thread.join().unwrap()
    }
}
/// Body of the encoding thread: feeds every received node to `process` in
/// index order, tallying edges along the way, then writes the trailing
/// node and edge counts.
///
/// Returns early with the first error reported by `process`.
#[cfg(parallel_compiler)]
#[instrument(skip(encoder, recv, process))]
fn encode_graph<K: DepKind + Encodable<FileEncoder>>(
    mut encoder: FileEncoder,
    recv: mpsc::Receiver<(DepNodeIndex, NodeInfo<K, DepNodeIndex>)>,
    process: impl Fn(&mut FileEncoder, DepNodeIndex, &NodeInfo<K, DepNodeIndex>) -> FileEncodeResult,
) -> FileEncodeResult {
    let mut total_edges = 0usize;

    // Tally the edges of each node before handing it to `process`.
    let on_node = |index: DepNodeIndex, node: &NodeInfo<K, DepNodeIndex>| {
        total_edges += node.edges.len();
        process(&mut encoder, index, node)
    };
    let total_nodes = ordered_recv(recv, on_node)?;

    encode_counts(encoder, total_nodes, total_edges)
}
/// Since there are multiple producers assigning the DepNodeIndex using an atomic,
/// the messages may not arrive in order. This function sorts them as they come.
///
/// Calls `f` on every received node in increasing index order, starting at
/// index 0. Messages that arrive ahead of the next expected index are held
/// in a buffer until the gap has been filled.
///
/// Returns the number of nodes passed to `f`, or the first error returned
/// by `f`. Receiving stops when `recv.recv()` returns an error.
///
/// Panics if a message arrives with an index lower than the one currently
/// expected.
#[cfg(parallel_compiler)]
fn ordered_recv<K: DepKind + Encodable<opaque::FileEncoder>>(
recv: mpsc::Receiver<(DepNodeIndex, NodeInfo<K, DepNodeIndex>)>,
mut f: impl FnMut(DepNodeIndex, &NodeInfo<K, DepNodeIndex>) -> FileEncodeResult,
) -> Result<usize, std::io::Error> {
// Messages that arrived early, waiting for their turn.
let mut pending = Vec::<(DepNodeIndex, _)>::new();
// The next index that must be handed to `f`.
let mut expected = DepNodeIndex::new(0);
// INVARIANT: No message can arrive with an index less than `expected`.
'outer: loop {
// Sort the buffer so that one pass can pull out a whole consecutive run
// starting at `expected`.
pending.sort_by_key(|n| n.0);
for (index, node) in pending.drain_filter(|(index, _)| {
// The filter advances `expected` itself, so each element removed makes
// its successor eligible within the same pass.
if *index == expected {
expected.increment_by(1);
true
} else {
false
}
}) {
f(index, &node)?;
}
while let Ok((index, node)) = recv.recv() {
if index > expected {
// Too early: keep it until the missing indices have shown up.
pending.push((index, node));
} else if index == expected {
f(index, &node)?;
expected.increment_by(1);
// The buffer may now hold the following indices; go drain it.
continue 'outer;
} else {
panic!("Unexpected index {:?} while waiting for {:?}", index, expected);
}
}
// `recv.recv()` failed: no further messages will be read.
// NOTE(review): anything still in `pending` at this point is never passed
// to `f`; this presumably relies on every index below the highest one
// having been sent before the channel closes — confirm.
break;
}
// `expected` was advanced once per processed node, so it is the node count.
Ok(expected.as_u32() as usize)
}