
async-llvm(24): Improve scheduling and documentation.

Michael Woerister 2017-07-28 14:28:08 +02:00
parent f5acc392e0
commit bd36df84a5
5 changed files with 215 additions and 91 deletions

src/Cargo.lock (generated)

@@ -1517,11 +1517,11 @@ dependencies = [
name = "rustc_trans"
version = "0.0.0"
dependencies = [
"crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"flate2 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
"gcc 0.3.51 (registry+https://github.com/rust-lang/crates.io-index)",
"jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc 0.0.0",
"rustc-demangle 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",


@@ -10,7 +10,7 @@ crate-type = ["dylib"]
test = false
[dependencies]
crossbeam = "0.2"
num_cpus = "1.0"
flate2 = "0.2"
jobserver = "0.1.5"
log = "0.3"


@@ -1077,7 +1077,8 @@ enum Message {
},
TranslationDone {
llvm_work_item: WorkItem,
is_last: bool
cost: u64,
is_last: bool,
},
TranslateItem,
}
@@ -1089,7 +1090,7 @@ struct Diagnostic {
}
#[derive(PartialEq, Clone, Copy, Debug)]
enum TransWorkerState {
enum MainThreadWorkerState {
Idle,
Translating,
LLVMing,
@@ -1148,16 +1149,110 @@ fn start_executing_work(sess: &Session,
// It's here that we manage parallelism, schedule work, and work with
// messages coming from clients.
//
// Our channel `rx` created above is a channel of messages coming from our
// various worker threads. This includes the jobserver helper thread above
// as well as the work we'll spawn off here. Each turn of this loop starts
// off by trying to spawn as much work as possible. After we've done that we
// then wait for an event and dispatch accordingly once the event is
// received. We're only done once all our work items have been drained and
// nothing is running, at which point we return back up the stack.
// There are a few environmental pre-conditions that shape how the system
// is set up:
//
// ## Parallelism management
// - Error reporting can only happen on the main thread because that's the
// only place where we have access to the compiler `Session`.
// - LLVM work can be done on any thread.
// - Translation can only happen on the main thread.
// - Each thread doing substantial work must be in possession of a `Token`
// from the `Jobserver`.
// - The compiler process always holds one `Token`. Any additional `Tokens`
// have to be requested from the `Jobserver`.
//
// Error Reporting
// ===============
// The error reporting restriction is handled separately from the rest: We
// set up a `SharedEmitter` that holds an open channel to the main thread.
// When an error occurs on any thread, the shared emitter will send the
// error message to the receiver on the main thread (`SharedEmitterMain`). The
// main thread will periodically query this error message queue and emit
// any error messages it has received. It might even abort compilation if
// it has received a fatal error. In this case we rely on all other threads
// being torn down automatically with the main thread.
// Since the main thread will often be busy doing translation work, error
// reporting will be somewhat delayed, because the message queue can only be
// checked in between two work packages.
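A rough, self-contained sketch of this pattern; `DiagSender` and `drain_diagnostics` are illustrative stand-ins, not the compiler's actual `SharedEmitter`/`SharedEmitterMain` types:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

#[derive(Clone)]
struct DiagSender(Sender<String>);

impl DiagSender {
    fn error(&self, msg: &str) {
        // If the main thread is gone we are shutting down anyway, so ignore send errors.
        let _ = self.0.send(msg.to_string());
    }
}

// Called periodically from the main thread, in between work packages.
fn drain_diagnostics(rx: &Receiver<String>) {
    while let Ok(msg) = rx.try_recv() {
        eprintln!("error: {}", msg);
    }
}

fn main() {
    let (tx, rx) = channel();
    let emitter = DiagSender(tx);
    thread::spawn(move || emitter.error("codegen failed")).join().unwrap();
    drain_diagnostics(&rx);
}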
//
// Work Processing Infrastructure
// ==============================
// The work processing infrastructure knows three major actors:
//
// - the coordinator thread,
// - the main thread, and
// - LLVM worker threads
//
// The coordinator thread is running a message loop. It instructs the main
// thread about what work to do when, and it will spawn off LLVM worker
// threads as open LLVM WorkItems become available.
//
// The job of the main thread is to translate CGUs into LLVM work packages
// (since the main thread is the only thread that can do this). The main
// thread will block until it receives a message from the coordinator, upon
// which it will translate one CGU, send it to the coordinator and block
// again. This way the coordinator can control what the main thread is
// doing.
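A condensed sketch of the main thread's half of that protocol, using made-up message and work-package types in place of the real `Message` variants:

use std::sync::mpsc::{Receiver, Sender};

enum FromCoordinator { TranslateItem, Shutdown }
struct LlvmWorkPackage { cgu_name: String }

// Block on the coordinator, translate exactly one CGU per request, report the
// resulting work package back, then block again.
fn main_thread_loop(coordinator_rx: Receiver<FromCoordinator>,
                    coordinator_tx: Sender<LlvmWorkPackage>,
                    mut remaining_cgus: Vec<String>) {
    while let Ok(msg) = coordinator_rx.recv() {
        match msg {
            FromCoordinator::TranslateItem => {
                if let Some(name) = remaining_cgus.pop() {
                    // "Translate" the CGU and hand the result to the coordinator.
                    let _ = coordinator_tx.send(LlvmWorkPackage { cgu_name: name });
                }
            }
            FromCoordinator::Shutdown => break,
        }
    }
}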
//
// The coordinator keeps a queue of LLVM WorkItems, and when a `Token` is
// available, it will spawn off a new LLVM worker thread and let it process
// that WorkItem. When an LLVM worker thread is done with its WorkItem,
// it will just shut down, which also frees all resources associated with
// the given LLVM module, and send a message to the coordinator that the
// WorkItem has been completed.
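The worker lifecycle, boiled down to a sketch with placeholder types (the real code hands each worker a full `CodegenContext` instead):

use std::sync::mpsc::Sender;
use std::thread;

struct WorkItem { llvm_module: Vec<u8> } // placeholder for the real module data
enum ToCoordinator { WorkItemDone { worker_id: usize } }

// One short-lived thread per WorkItem, corresponding to exactly one Token.
fn spawn_llvm_worker(item: WorkItem,
                     worker_id: usize,
                     coordinator: Sender<ToCoordinator>) {
    thread::spawn(move || {
        // ... run the LLVM passes over `item.llvm_module` here ...
        drop(item); // dropping the WorkItem frees the module's memory
        let _ = coordinator.send(ToCoordinator::WorkItemDone { worker_id });
        // The thread then simply returns, and the coordinator hands out its Token again.
    });
}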
//
// Work Scheduling
// ===============
// The scheduler's goal is to minimize the time it takes to complete all
// the work there is. However, we also want to keep memory consumption as
// low as possible. These two goals are at odds with each other: if memory
// consumption were not an issue, we could just let the main thread produce
// LLVM WorkItems at full speed, assuring maximal utilization of
// Tokens/LLVM worker threads. However, since translation is usually faster
// than LLVM processing, the queue of LLVM WorkItems would fill up and each
// WorkItem potentially holds on to a substantial amount of memory.
//
// So the actual goal is to always produce just enough LLVM WorkItems so as
// not to starve our LLVM worker threads. That means, once we have enough
// WorkItems in our queue, we can block the main thread, so it does not
// produce more until we need them.
//
// Doing LLVM Work on the Main Thread
// ----------------------------------
// Since the main thread owns the compiler process's implicit `Token`, it is
// wasteful to keep it blocked without doing any work. Therefore, what we do
// in this case is: We spawn off an additional LLVM worker thread that helps
// reduce the queue. The work it is doing corresponds to the implicit
// `Token`. The coordinator will mark the main thread as being busy with
// LLVM work. (The actual work happens on another OS thread but we just care
// about `Tokens`, not actual threads).
//
// When any LLVM worker thread finishes while the main thread is marked as
// "busy with LLVM work", we can do a little switcheroo: We give the Token
// of the just finished thread to the LLVM worker thread that is working on
// behalf of the main thread's implicit Token, thus freeing up the main
// thread again. The coordinator can then again decide what the main thread
// should do. This allows the coordinator to make decisions at more points
// in time.
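In terms of the coordinator's bookkeeping, the switcheroo amounts to roughly the following (simplified; the real handler also stores the newly acquired `Token`):

#[derive(PartialEq, Clone, Copy)]
enum MainThreadState { Idle, Translating, LLVMing }

// A freshly acquired Token "adopts" the LLVM job running on behalf of the main
// thread's implicit Token, so the main thread becomes schedulable again.
fn on_token_acquired(main_thread: &mut MainThreadState, running_workers: &mut usize) {
    if *main_thread == MainThreadState::LLVMing {
        *main_thread = MainThreadState::Idle; // free for the coordinator's next decision
        *running_workers += 1;                // the in-flight job now counts as a regular worker
    }
}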
//
// Striking a Balance between Throughput and Memory Consumption
// ------------------------------------------------------------
// Since our two goals, (1) use as many Tokens as possible and (2) keep
// memory consumption as low as possible, are in conflict with each other,
// we have to find a trade-off between them. Right now, the goal is to keep
// all workers busy, which means that no worker should find the queue empty
// when it is ready to start.
// How do we achieve this? Good question :) We actually never know how
// many `Tokens` are potentially available so it's hard to say how much to
// fill up the queue before switching the main thread to LLVM work. Also we
// currently don't have a means to estimate how long a running LLVM worker
// will still be busy with its current WorkItem. However, we know the
// maximal count of available Tokens that makes sense (=the number of CPU
// cores), so we can take a conservative guess. The heuristic we use here
// is implemented in the `queue_full_enough()` function.
//
// Some Background on Jobservers
// -----------------------------
// It's worth also touching on the management of parallelism here. We don't
// want to just spawn a thread per work item because while that's optimal
// parallelism it may overload a system with too many threads or violate our
@@ -1170,36 +1265,8 @@ fn start_executing_work(sess: &Session,
// and whenever we're done with that work we release the semaphore. In this
// manner we can ensure that the maximum number of parallel workers is
// capped at any one point in time.
//
// The jobserver protocol is a little unique, however. We, as a running
// process, already have an ephemeral token assigned to us. We're not going
// to be doing any productive work in this thread though so we're going to
// give this token to a worker thread (there's no actual token to give, this
// is just conceptually). As a result you'll see a few `+1` and `-1`
// instances below, and it's about working with this ephemeral token.
//
// To acquire tokens we have our `helper` thread above which is just in a
// loop acquiring tokens and sending them to us. We then store all tokens
// locally in a `tokens` vector once they're acquired. Currently we don't
// literally send a token to a worker thread to assist with management of
// our "ephemeral token".
//
// As a result, our "spawn as much work as possible" basically means that we
// fill up the `running` counter up to the limit of the `tokens` list.
// Whenever we get a new token this'll mean a new unit of work is spawned,
// and then whenever a unit of work finishes we relinquish a token, if we
// had one, to maybe get re-acquired later.
//
// Note that there's a race which may mean that we acquire more tokens than
// we originally anticipated. For example let's say we have 2 units of work.
// First we request one token from the helper thread and then we
// immediately spawn one unit of work with our ephemeral token after. We may
// then finish the first piece of work before the token is acquired, but we
// can continue to spawn the second piece of work with our ephemeral token.
// Before that work finishes, however, we may acquire a token. In that case
// we actually wastefully acquired the token, so we relinquish it back to
// the jobserver.
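For reference, a stand-alone sketch of how the `jobserver` crate's helper thread is typically driven; a `Client` is created locally here for illustration, whereas rustc inherits one from its environment:

extern crate jobserver;

use std::sync::mpsc::channel;
use jobserver::{Acquired, Client};

fn main() {
    // Four token slots, just for the example.
    let client = Client::new(4).expect("failed to create jobserver");
    let (tx, rx) = channel::<std::io::Result<Acquired>>();

    // The helper thread loops acquiring tokens and forwards each one to us;
    // `request_token()` asks it to go acquire one more.
    let helper = client.into_helper_thread(move |token| {
        let _ = tx.send(token);
    }).expect("failed to spawn helper thread");

    helper.request_token();
    let token = rx.recv().unwrap().expect("failed to acquire token");
    // ... hand `token` to a worker thread; dropping it releases the slot ...
    drop(token);
}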
thread::spawn(move || {
return thread::spawn(move || {
let max_workers = ::num_cpus::get();
let mut worker_id_counter = 0;
let mut free_worker_ids = Vec::new();
let mut get_worker_id = |free_worker_ids: &mut Vec<usize>| {
@@ -1212,74 +1279,75 @@ fn start_executing_work(sess: &Session,
}
};
// This is where we collect codegen units that have gone all the way
// through translation and LLVM.
let mut compiled_modules = vec![];
let mut compiled_metadata_module = None;
let mut compiled_allocator_module = None;
// This flag tracks whether all items have gone through translation.
let mut translation_done = false;
// This is the queue of LLVM work items that still need processing.
let mut work_items = Vec::new();
// These are the Jobserver Tokens we currently hold. Does not include
// the implicit Token the compiler process owns no matter what.
let mut tokens = Vec::new();
let mut trans_worker_state = TransWorkerState::Idle;
let mut main_thread_worker_state = MainThreadWorkerState::Idle;
let mut running = 0;
// Run the message loop while there's still anything that needs message
// processing:
while !translation_done ||
work_items.len() > 0 ||
running > 0 ||
trans_worker_state != TransWorkerState::Idle {
main_thread_worker_state != MainThreadWorkerState::Idle {
// While there are still CGUs to be translated, the coordinator has
// to decide how to utilize the compiler process's implicit Token:
// for translating more CGUs or for running them through LLVM.
if !translation_done {
if trans_worker_state == TransWorkerState::Idle {
// Translation is not done yet, so there are two things the
// translation worker could do:
//
// (1) Translate another CGU
// (2) Run an already translated CGU through LLVM
//
// Option (2) makes sense if there's already enough work for
// all the other workers. In that case it's better to run
// a CGU through LLVM, so its resources can be freed.
//
// However, it's not trivial to determine what "enough work
// for all the other workers" means because:
//
// (1) We don't know how long the currently working workers
// will need to finish their work package, and
// (2) we don't know how many idle workers would be available
// because that is dynamically decided by the jobserver.
//
// TODO: Come up with a useful heuristic.
if work_items.len() <= 4 {
if main_thread_worker_state == MainThreadWorkerState::Idle {
if !queue_full_enough(work_items.len(), running, max_workers) {
// The queue is not full enough, translate more items:
trans_worker_send.send(Message::TranslateItem).unwrap();
trans_worker_state = TransWorkerState::Translating;
main_thread_worker_state = MainThreadWorkerState::Translating;
} else {
let item = work_items.pop().unwrap();
// The queue is full enough to not let the worker
// threads starve. Use the implicit Token to do some
// LLVM work too.
let (item, _) = work_items.pop().unwrap();
let cgcx = CodegenContext {
worker: TRANS_WORKER_ID,
worker: get_worker_id(&mut free_worker_ids),
.. cgcx.clone()
};
trans_worker_state = TransWorkerState::LLVMing;
main_thread_worker_state = MainThreadWorkerState::LLVMing;
spawn_work(cgcx, item);
}
}
} else {
match trans_worker_state {
TransWorkerState::Idle => {
if let Some(item) = work_items.pop() {
// In this branch, we know that everything has been translated,
// so it's just a matter of determining whether the implicit
// Token is free to use for LLVM work.
match main_thread_worker_state {
MainThreadWorkerState::Idle => {
if let Some((item, _)) = work_items.pop() {
let cgcx = CodegenContext {
worker: TRANS_WORKER_ID,
worker: get_worker_id(&mut free_worker_ids),
.. cgcx.clone()
};
trans_worker_state = TransWorkerState::LLVMing;
main_thread_worker_state = MainThreadWorkerState::LLVMing;
spawn_work(cgcx, item);
}
}
TransWorkerState::Translating => {
MainThreadWorkerState::Translating => {
bug!("trans worker should not be translating after \
translation was already completed")
}
TransWorkerState::LLVMing => {
MainThreadWorkerState::LLVMing => {
// Already making good use of that token
}
}
@@ -1288,11 +1356,10 @@ fn start_executing_work(sess: &Session,
// Spin up what work we can, only doing this while we've got available
// parallelism slots and work left to spawn.
while work_items.len() > 0 && running < tokens.len() {
let item = work_items.pop().unwrap();
let worker_id = get_worker_id(&mut free_worker_ids);
let (item, _) = work_items.pop().unwrap();
let cgcx = CodegenContext {
worker: worker_id,
worker: get_worker_id(&mut free_worker_ids),
.. cgcx.clone()
};
@@ -1310,6 +1377,15 @@ fn start_executing_work(sess: &Session,
Message::Token(token) => {
if let Ok(token) = token {
tokens.push(token);
if main_thread_worker_state == MainThreadWorkerState::LLVMing {
// If the main thread token is used for LLVM work
// at the moment, we turn that thread into a regular
// LLVM worker thread, so the main thread is free
// to react to translation demand.
main_thread_worker_state = MainThreadWorkerState::Idle;
running += 1;
}
} else {
shared_emitter.fatal("failed to acquire jobserver token");
// Exit the coordinator thread
@@ -1317,8 +1393,21 @@ fn start_executing_work(sess: &Session,
}
}
Message::TranslationDone { llvm_work_item, is_last } => {
work_items.insert(0, llvm_work_item);
Message::TranslationDone { llvm_work_item, cost, is_last } => {
// We keep the queue sorted by estimated processing cost,
// so that more expensive items are processed earlier. This
// is good for throughput as it gives the main thread more
// time to fill up the queue and it avoids scheduling
// expensive items to the end.
// Note, however, that this is not ideal for memory
// consumption, as LLVM module sizes are not evenly
// distributed.
let insertion_index =
work_items.binary_search_by_key(&cost, |&(_, cost)| cost);
let insertion_index = match insertion_index {
Ok(idx) | Err(idx) => idx
};
work_items.insert(insertion_index, (llvm_work_item, cost));
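// For example, if the queue currently holds items with costs [1, 4, 9],
// inserting an item with cost 5 yields Err(2) from the binary search, so the
// queue becomes [1, 4, 5, 9] and `pop()` keeps handing out the most expensive
// remaining item first.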
if is_last {
// If this is the last, don't request a token because
@@ -1329,8 +1418,9 @@ fn start_executing_work(sess: &Session,
helper.request_token();
}
assert_eq!(trans_worker_state, TransWorkerState::Translating);
trans_worker_state = TransWorkerState::Idle;
assert_eq!(main_thread_worker_state,
MainThreadWorkerState::Translating);
main_thread_worker_state = MainThreadWorkerState::Idle;
}
// If a thread exits successfully then we drop a token associated
@@ -1342,15 +1432,14 @@ fn start_executing_work(sess: &Session,
// Note that if the thread failed that means it panicked, so we
// abort immediately.
Message::Done { result: Ok(compiled_module), worker_id } => {
if worker_id == TRANS_WORKER_ID {
assert_eq!(trans_worker_state, TransWorkerState::LLVMing);
trans_worker_state = TransWorkerState::Idle;
if main_thread_worker_state == MainThreadWorkerState::LLVMing {
main_thread_worker_state = MainThreadWorkerState::Idle;
} else {
drop(tokens.pop());
running -= 1;
free_worker_ids.push(worker_id);
}
free_worker_ids.push(worker_id);
match compiled_module.kind {
ModuleKind::Regular => {
compiled_modules.push(compiled_module);
@@ -1381,7 +1470,16 @@ fn start_executing_work(sess: &Session,
metadata_module: compiled_metadata_module.unwrap(),
allocator_module: compiled_allocator_module,
}
})
});
// A heuristic that determines if we have enough LLVM WorkItems in the
// queue so that the main thread can do LLVM work instead of translation
fn queue_full_enough(items_in_queue: usize,
workers_running: usize,
max_workers: usize) -> bool {
// Tune me, plz.
items_in_queue >= max_workers.saturating_sub(workers_running / 2)
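// For example, with max_workers == 8: while no workers are running, the
// queue only counts as full once it holds 8 items; with 6 workers already
// running, 5 queued items are considered enough.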
}
}
pub const TRANS_WORKER_ID: usize = ::std::usize::MAX;
@@ -1729,6 +1827,7 @@ impl OngoingCrateTranslation {
pub fn submit_translated_module_to_llvm(&self,
sess: &Session,
mtrans: ModuleTranslation,
cost: u64,
is_last: bool) {
let module_config = match mtrans.kind {
ModuleKind::Regular => self.regular_module_config.clone(sess),
@@ -1742,6 +1841,7 @@ impl OngoingCrateTranslation {
drop(self.coordinator_send.send(Message::TranslationDone {
llvm_work_item,
cost,
is_last
}));
}
@@ -1752,7 +1852,10 @@ impl OngoingCrateTranslation {
is_last: bool) {
self.wait_for_signal_to_translate_item();
self.check_for_errors(sess);
self.submit_translated_module_to_llvm(sess, mtrans, is_last);
// These are generally cheap and won't throw off scheduling.
let cost = 0;
self.submit_translated_module_to_llvm(sess, mtrans, cost, is_last);
}
pub fn check_for_errors(&self, sess: &Session) {


@@ -80,6 +80,7 @@ use libc::c_uint;
use std::ffi::{CStr, CString};
use std::str;
use std::sync::Arc;
use std::time::Instant;
use std::i32;
use syntax_pos::Span;
use syntax::attr;
@@ -1082,10 +1083,22 @@ pub fn trans_crate<'a, 'tcx>(tcx: TyCtxt<'a, 'tcx, 'tcx>,
let mut all_stats = Stats::default();
let mut module_dispositions = tcx.sess.opts.incremental.as_ref().map(|_| Vec::new());
// We sort the codegen units by size. This way we can schedule work for LLVM
// a bit more efficiently. Note that "size" is defined rather crudely at the
// moment as it is just the number of TransItems in the CGU, not taking into
// account the size of each TransItem.
let codegen_units = {
let mut codegen_units = codegen_units;
codegen_units.sort_by_key(|cgu| -(cgu.items().len() as isize));
codegen_units
};
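// For example, CGUs containing 3, 40, and 12 trans items are now translated
// in the order 40, 12, 3, so the (presumably) most expensive LLVM work can be
// queued up early.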
for (cgu_index, cgu) in codegen_units.into_iter().enumerate() {
ongoing_translation.wait_for_signal_to_translate_item();
ongoing_translation.check_for_errors(tcx.sess);
let start_time = Instant::now();
let module = {
let _timing_guard = time_graph
.as_ref()
@@ -1108,10 +1121,18 @@ pub fn trans_crate<'a, 'tcx>(tcx: TyCtxt<'a, 'tcx, 'tcx>,
module
};
let time_to_translate = Instant::now().duration_since(start_time);
// We assume that the cost to run LLVM on a CGU is proportional to
// the time we needed for translating it.
let cost = time_to_translate.as_secs() * 1_000_000_000 +
time_to_translate.subsec_nanos() as u64;
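// For example, a translation time of 1.25 seconds becomes a cost of
// 1 * 1_000_000_000 + 250_000_000 = 1_250_000_000, i.e. the time in nanoseconds.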
let is_last_cgu = (cgu_index + 1) == codegen_unit_count;
ongoing_translation.submit_translated_module_to_llvm(tcx.sess,
module,
cost,
is_last_cgu);
ongoing_translation.check_for_errors(tcx.sess);
}


@@ -39,7 +39,6 @@ use syntax_pos::symbol::Symbol;
use std::sync::Arc;
extern crate flate2;
extern crate crossbeam;
extern crate libc;
extern crate owning_ref;
#[macro_use] extern crate rustc;
@@ -55,6 +54,7 @@ extern crate rustc_const_math;
extern crate rustc_bitflags;
extern crate rustc_demangle;
extern crate jobserver;
extern crate num_cpus;
#[macro_use] extern crate log;
#[macro_use] extern crate syntax;