diff --git a/src/Cargo.lock b/src/Cargo.lock
index 18d97972cd3..31742023d46 100644
--- a/src/Cargo.lock
+++ b/src/Cargo.lock
@@ -1517,11 +1517,11 @@ dependencies = [
 name = "rustc_trans"
 version = "0.0.0"
 dependencies = [
- "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "flate2 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
  "gcc 0.3.51 (registry+https://github.com/rust-lang/crates.io-index)",
  "jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num_cpus 1.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "rustc 0.0.0",
  "rustc-demangle 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/src/librustc_trans/Cargo.toml b/src/librustc_trans/Cargo.toml
index c7db2a9a8ae..ed9321cc3f3 100644
--- a/src/librustc_trans/Cargo.toml
+++ b/src/librustc_trans/Cargo.toml
@@ -10,7 +10,7 @@ crate-type = ["dylib"]
 test = false
 
 [dependencies]
-crossbeam = "0.2"
+num_cpus = "1.0"
 flate2 = "0.2"
 jobserver = "0.1.5"
 log = "0.3"
diff --git a/src/librustc_trans/back/write.rs b/src/librustc_trans/back/write.rs
index 649b16f17a9..4e68fa8ce40 100644
--- a/src/librustc_trans/back/write.rs
+++ b/src/librustc_trans/back/write.rs
@@ -1077,7 +1077,8 @@ enum Message {
     },
     TranslationDone {
         llvm_work_item: WorkItem,
-        is_last: bool
+        cost: u64,
+        is_last: bool,
     },
     TranslateItem,
 }
@@ -1089,7 +1090,7 @@ struct Diagnostic {
 }
 
 #[derive(PartialEq, Clone, Copy, Debug)]
-enum TransWorkerState {
+enum MainThreadWorkerState {
     Idle,
     Translating,
     LLVMing,
@@ -1148,16 +1149,110 @@ fn start_executing_work(sess: &Session,
     // It's here that we manage parallelism, schedule work, and work with
     // messages coming from clients.
     //
-    // Our channel `rx` created above is a channel of messages coming from our
-    // various worker threads. This includes the jobserver helper thread above
-    // as well as the work we'll spawn off here. Each turn of this loop starts
-    // off by trying to spawn as much work as possible. After we've done that we
-    // then wait for an event and dispatch accordingly once the event is
-    // received. We're only done once all our work items have been drained and
-    // nothing is running, at which point we return back up the stack.
+    // There are a few environmental pre-conditions that shape how the system
+    // is set up:
     //
-    // ## Parallelism management
+    // - Error reporting can only happen on the main thread because that's the
+    //   only place where we have access to the compiler `Session`.
+    // - LLVM work can be done on any thread.
+    // - Translation can only happen on the main thread.
+    // - Each thread doing substantial work must be in possession of a `Token`
+    //   from the `Jobserver`.
+    // - The compiler process always holds one `Token`. Any additional `Tokens`
+    //   have to be requested from the `Jobserver`.
     //
+    // Error Reporting
+    // ===============
+    // The error reporting restriction is handled separately from the rest: We
+    // set up a `SharedEmitter` that holds an open channel to the main thread.
+    // When an error occurs on any thread, the shared emitter will send the
+    // error message to the receiver on the main thread (`SharedEmitterMain`).
+    // The main thread will periodically query this error message queue and
+    // emit any error messages it has received. It might even abort compilation
+    // if it has received a fatal error. In this case we rely on all other
+    // threads being torn down automatically with the main thread.
+    // Since the main thread will often be busy doing translation work, error
+    // reporting will be somewhat delayed, since the message queue can only be
+    // checked in between two work packages.
+    //
+    // Work Processing Infrastructure
+    // ==============================
+    // The work processing infrastructure involves three major actors:
+    //
+    // - the coordinator thread,
+    // - the main thread, and
+    // - LLVM worker threads
+    //
+    // The coordinator thread is running a message loop. It instructs the main
+    // thread about what work to do when, and it will spawn off LLVM worker
+    // threads as open LLVM WorkItems become available.
+    //
+    // The job of the main thread is to translate CGUs into LLVM work packages
+    // (since the main thread is the only thread that can do this). The main
+    // thread will block until it receives a message from the coordinator, upon
+    // which it will translate one CGU, send it to the coordinator and block
+    // again. This way the coordinator can control what the main thread is
+    // doing.
+    //
+    // The coordinator keeps a queue of LLVM WorkItems, and when a `Token` is
+    // available, it will spawn off a new LLVM worker thread and let it process
+    // that WorkItem. When an LLVM worker thread is done with its WorkItem,
+    // it will just shut down, which also frees all resources associated with
+    // the given LLVM module, and sends a message to the coordinator that the
+    // WorkItem has been completed.
+    //
+    // Work Scheduling
+    // ===============
+    // The scheduler's goal is to minimize the time it takes to complete all
+    // work there is. However, we also want to keep memory consumption low
+    // if possible. These two goals are at odds with each other: If memory
+    // consumption were not an issue, we could just let the main thread produce
+    // LLVM WorkItems at full speed, ensuring maximal utilization of
+    // Tokens/LLVM worker threads. However, since translation is usually faster
+    // than LLVM processing, the queue of LLVM WorkItems would fill up and each
+    // WorkItem potentially holds on to a substantial amount of memory.
+    //
+    // So the actual goal is to always produce just enough LLVM WorkItems so as
+    // not to starve our LLVM worker threads. That means, once we have enough
+    // WorkItems in our queue, we can block the main thread, so it does not
+    // produce more until we need them.
+    //
+    // Doing LLVM Work on the Main Thread
+    // ----------------------------------
+    // Since the main thread owns the compiler process's implicit `Token`, it
+    // is wasteful to keep it blocked without doing any work. Therefore, what
+    // we do in this case is: We spawn off an additional LLVM worker thread
+    // that helps reduce the queue. The work it is doing corresponds to the
+    // implicit `Token`. The coordinator will mark the main thread as being
+    // busy with LLVM work. (The actual work happens on another OS thread but
+    // we just care about `Tokens`, not actual threads).
+    //
+    // When any LLVM worker thread finishes while the main thread is marked as
+    // "busy with LLVM work", we can do a little switcheroo: We give the Token
+    // of the just finished thread to the LLVM worker thread that is working on
+    // behalf of the main thread's implicit Token, thus freeing up the main
+    // thread again. The coordinator can then again decide what the main thread
+    // should do. This allows the coordinator to make decisions at more points
+    // in time.
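
The "Error Reporting" section above describes a channel-based emitter. For illustration only, here is a minimal standalone sketch of that pattern under assumed names (`SharedEmitterSketch`, `SharedEmitterMainSketch`); the real `SharedEmitter`/`SharedEmitterMain` in this patch carry structured diagnostics rather than plain strings:

```rust
// Sketch of the error-reporting pattern described in the comment above:
// worker threads push messages into a channel, and only the main thread
// (which owns the `Session` in the real compiler) drains and reports them.
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

#[derive(Clone)]
struct SharedEmitterSketch {
    sender: Sender<String>,
}

impl SharedEmitterSketch {
    fn error(&self, msg: &str) {
        // Ignore send errors: if the main thread is gone, we are shutting down.
        drop(self.sender.send(msg.to_string()));
    }
}

struct SharedEmitterMainSketch {
    receiver: Receiver<String>,
}

impl SharedEmitterMainSketch {
    // Called periodically from the main thread, in between work packages.
    fn check(&self) {
        while let Ok(msg) = self.receiver.try_recv() {
            eprintln!("error: {}", msg);
        }
    }
}

fn main() {
    let (sender, receiver) = channel();
    let emitter = SharedEmitterSketch { sender };
    let main_emitter = SharedEmitterMainSketch { receiver };

    let worker = {
        let emitter = emitter.clone();
        thread::spawn(move || emitter.error("something went wrong in a worker"))
    };

    worker.join().unwrap();
    // The main thread drains the queue whenever it gets a chance.
    main_emitter.check();
}
```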
+    //
+    // Striking a Balance between Throughput and Memory Consumption
+    // ------------------------------------------------------------
+    // Since our two goals, (1) use as many Tokens as possible and (2) keep
+    // memory consumption as low as possible, are in conflict with each other,
+    // we have to find a trade-off between them. Right now, the goal is to keep
+    // all workers busy, which means that no worker should find the queue empty
+    // when it is ready to start.
+    // How do we achieve this? Good question :) We actually never know how
+    // many `Tokens` are potentially available so it's hard to say how much to
+    // fill up the queue before switching the main thread to LLVM work. Also we
+    // currently don't have a means to estimate how long a running LLVM worker
+    // will still be busy with its current WorkItem. However, we know the
+    // maximal count of available Tokens that makes sense (=the number of CPU
+    // cores), so we can take a conservative guess. The heuristic we use here
+    // is implemented in the `queue_full_enough()` function.
+    //
+    // Some Background on Jobservers
+    // -----------------------------
     // It's worth also touching on the management of parallelism here. We don't
     // want to just spawn a thread per work item because while that's optimal
     // parallelism it may overload a system with too many threads or violate our
@@ -1170,36 +1265,8 @@ fn start_executing_work(sess: &Session,
     // and whenever we're done with that work we release the semaphore. In this
     // manner we can ensure that the maximum number of parallel workers is
     // capped at any one point in time.
-    //
-    // The jobserver protocol is a little unique, however. We, as a running
-    // process, already have an ephemeral token assigned to us. We're not going
-    // to be doing any productive work in this thread though so we're going to
-    // give this token to a worker thread (there's no actual token to give, this
-    // is just conceptually). As a result you'll see a few `+1` and `-1`
-    // instances below, and it's about working with this ephemeral token.
-    //
-    // To acquire tokens we have our `helper` thread above which is just in a
-    // loop acquiring tokens and sending them to us. We then store all tokens
-    // locally in a `tokens` vector once they're acquired. Currently we don't
-    // literally send a token to a worker thread to assist with management of
-    // our "ephemeral token".
-    //
-    // As a result, our "spawn as much work as possible" basically means that we
-    // fill up the `running` counter up to the limit of the `tokens` list.
-    // Whenever we get a new token this'll mean a new unit of work is spawned,
-    // and then whenever a unit of work finishes we relinquish a token, if we
-    // had one, to maybe get re-acquired later.
-    //
-    // Note that there's a race which may mean that we acquire more tokens than
-    // we originally anticipated. For example let's say we have 2 units of work.
-    // First we request one token from the helper thread and then we
-    // immediately spawn one unit of work with our ephemeral token after. We may
-    // then finish the first piece of work before the token is acquired, but we
-    // can continue to spawn the second piece of work with our ephemeral token.
-    // Before that work finishes, however, we may acquire a token. In that case
-    // we actually wastefully acquired the token, so we relinquish it back to
-    // the jobserver.
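
The `queue_full_enough()` heuristic referenced above is added near the end of this patch. The following standalone sketch reproduces it and shows how the threshold behaves for an assumed 8-core machine; the example numbers are illustrative, not taken from the patch:

```rust
// The scheduling heuristic: the queue counts as "full enough" once it holds
// at least `max_workers` items, minus a credit for workers already running.
fn queue_full_enough(items_in_queue: usize,
                     workers_running: usize,
                     max_workers: usize) -> bool {
    items_in_queue >= max_workers.saturating_sub(workers_running / 2)
}

fn main() {
    // Assume an 8-core machine (in the patch, `max_workers` comes from
    // `num_cpus::get()`).
    let max_workers = 8;

    // With nothing running, 8 queued items are needed before the main thread
    // switches to LLVM work; with 6 workers already busy, 5 items suffice.
    assert!(!queue_full_enough(7, 0, max_workers));
    assert!(queue_full_enough(8, 0, max_workers));
    assert!(queue_full_enough(5, 6, max_workers));
    assert!(!queue_full_enough(4, 6, max_workers));

    println!("heuristic behaves as expected");
}
```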
-    thread::spawn(move || {
+    return thread::spawn(move || {
+        let max_workers = ::num_cpus::get();
         let mut worker_id_counter = 0;
         let mut free_worker_ids = Vec::new();
         let mut get_worker_id = |free_worker_ids: &mut Vec<usize>| {
@@ -1212,74 +1279,75 @@ fn start_executing_work(sess: &Session,
             }
         };
 
+        // This is where we collect codegen units that have gone all the way
+        // through translation and LLVM.
         let mut compiled_modules = vec![];
         let mut compiled_metadata_module = None;
         let mut compiled_allocator_module = None;
 
+        // This flag tracks whether all items have gone through translation
         let mut translation_done = false;
+
+        // This is the queue of LLVM work items that still need processing.
         let mut work_items = Vec::new();
+
+        // These are the Jobserver Tokens we currently hold. Does not include
+        // the implicit Token the compiler process owns no matter what.
         let mut tokens = Vec::new();
-        let mut trans_worker_state = TransWorkerState::Idle;
+        let mut main_thread_worker_state = MainThreadWorkerState::Idle;
         let mut running = 0;
 
+        // Run the message loop while there's still anything that needs message
+        // processing:
         while !translation_done ||
               work_items.len() > 0 ||
              running > 0 ||
-              trans_worker_state != TransWorkerState::Idle {
+              main_thread_worker_state != MainThreadWorkerState::Idle {
 
+            // While there are still CGUs to be translated, the coordinator has
+            // to decide how to utilize the compiler process's implicit Token:
+            // for translating more CGUs or for running them through LLVM.
             if !translation_done {
-                if trans_worker_state == TransWorkerState::Idle {
-                    // Translation is not done yet, so there are two things the
-                    // translation worker could do:
-                    //
-                    // (1) Translate another CGU
-                    // (2) Run an already translated CGU through LLVM
-                    //
-                    // Option (2) makes sense if there's already enough work for
-                    // all the other workers. In that case it's better to run
-                    // a CGU through LLVM, so its resources can be freed.
-                    //
-                    // However, it's not trivial to determines what "enough work
-                    // for all the other workers" means because:
-                    //
-                    // (1) We don't know how long the currently working workers
-                    //     will need to finish their work package, and
-                    // (2) we don't know how many idle workers would be available
-                    //     because that is dynamically decided by the jobserver.
-                    //
-                    // TODO: Come up with a useful heuristic.
-                    if work_items.len() <= 4 {
+                if main_thread_worker_state == MainThreadWorkerState::Idle {
+                    if !queue_full_enough(work_items.len(), running, max_workers) {
+                        // The queue is not full enough, translate more items:
                         trans_worker_send.send(Message::TranslateItem).unwrap();
-                        trans_worker_state = TransWorkerState::Translating;
+                        main_thread_worker_state = MainThreadWorkerState::Translating;
                     } else {
-                        let item = work_items.pop().unwrap();
+                        // The queue is full enough to not let the worker
+                        // threads starve. Use the implicit Token to do some
+                        // LLVM work too.
+                        let (item, _) = work_items.pop().unwrap();
                         let cgcx = CodegenContext {
-                            worker: TRANS_WORKER_ID,
+                            worker: get_worker_id(&mut free_worker_ids),
                             .. cgcx.clone()
                         };
-                        trans_worker_state = TransWorkerState::LLVMing;
+                        main_thread_worker_state = MainThreadWorkerState::LLVMing;
                         spawn_work(cgcx, item);
                     }
                 }
             } else {
-                match trans_worker_state {
-                    TransWorkerState::Idle => {
-                        if let Some(item) = work_items.pop() {
+                // In this branch, we know that everything has been translated,
+                // so it's just a matter of determining whether the implicit
+                // Token is free to use for LLVM work.
+                match main_thread_worker_state {
+                    MainThreadWorkerState::Idle => {
+                        if let Some((item, _)) = work_items.pop() {
                             let cgcx = CodegenContext {
-                                worker: TRANS_WORKER_ID,
+                                worker: get_worker_id(&mut free_worker_ids),
                                 .. cgcx.clone()
                             };
-                            trans_worker_state = TransWorkerState::LLVMing;
+                            main_thread_worker_state = MainThreadWorkerState::LLVMing;
                             spawn_work(cgcx, item);
                         }
                     }
-                    TransWorkerState::Translating => {
+                    MainThreadWorkerState::Translating => {
                         bug!("trans worker should not be translating after \
                               translation was already completed")
                     }
-                    TransWorkerState::LLVMing => {
+                    MainThreadWorkerState::LLVMing => {
                         // Already making good use of that token
                     }
                 }
@@ -1288,11 +1356,10 @@ fn start_executing_work(sess: &Session,
             // Spin up what work we can, only doing this while we've got available
             // parallelism slots and work left to spawn.
             while work_items.len() > 0 && running < tokens.len() {
-                let item = work_items.pop().unwrap();
-                let worker_id = get_worker_id(&mut free_worker_ids);
+                let (item, _) = work_items.pop().unwrap();
 
                 let cgcx = CodegenContext {
-                    worker: worker_id,
+                    worker: get_worker_id(&mut free_worker_ids),
                     .. cgcx.clone()
                 };
@@ -1310,6 +1377,15 @@ fn start_executing_work(sess: &Session,
                 Message::Token(token) => {
                     if let Ok(token) = token {
                         tokens.push(token);
+
+                        if main_thread_worker_state == MainThreadWorkerState::LLVMing {
+                            // If the main thread token is used for LLVM work
+                            // at the moment, we turn that thread into a regular
+                            // LLVM worker thread, so the main thread is free
+                            // to react to translation demand.
+                            main_thread_worker_state = MainThreadWorkerState::Idle;
+                            running += 1;
+                        }
                     } else {
                         shared_emitter.fatal("failed to acquire jobserver token");
                         // Exit the coordinator thread
@@ -1317,8 +1393,21 @@ fn start_executing_work(sess: &Session,
                     }
                 }
 
-                Message::TranslationDone { llvm_work_item, is_last } => {
-                    work_items.insert(0, llvm_work_item);
+                Message::TranslationDone { llvm_work_item, cost, is_last } => {
+                    // We keep the queue sorted by estimated processing cost,
+                    // so that more expensive items are processed earlier. This
+                    // is good for throughput as it gives the main thread more
+                    // time to fill up the queue and it avoids scheduling
+                    // expensive items to the end.
+                    // Note, however, that this is not ideal for memory
+                    // consumption, as LLVM module sizes are not evenly
+                    // distributed.
+                    let insertion_index =
+                        work_items.binary_search_by_key(&cost, |&(_, cost)| cost);
+                    let insertion_index = match insertion_index {
+                        Ok(idx) | Err(idx) => idx
+                    };
+                    work_items.insert(insertion_index, (llvm_work_item, cost));
 
                     if is_last {
                         // If this is the last, don't request a token because
@@ -1329,8 +1418,9 @@ fn start_executing_work(sess: &Session,
                         helper.request_token();
                     }
 
-                    assert_eq!(trans_worker_state, TransWorkerState::Translating);
-                    trans_worker_state = TransWorkerState::Idle;
+                    assert_eq!(main_thread_worker_state,
+                               MainThreadWorkerState::Translating);
+                    main_thread_worker_state = MainThreadWorkerState::Idle;
                 }
 
                 // If a thread exits successfully then we drop a token associated
@@ -1342,15 +1432,14 @@ fn start_executing_work(sess: &Session,
                 // Note that if the thread failed that means it panicked, so we
                 // abort immediately.
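
As a standalone illustration of the cost-sorted queue used in the `TranslationDone` handler above, the sketch below keeps a `Vec` sorted by ascending cost with `binary_search_by_key` and pops the most expensive item first; the string labels stand in for real `WorkItem`s and the costs are made up:

```rust
// Items are kept sorted by ascending cost, so `pop()` always yields the most
// expensive remaining item, mirroring the scheduling logic in the patch.
fn insert_by_cost(queue: &mut Vec<(&'static str, u64)>, item: &'static str, cost: u64) {
    let index = match queue.binary_search_by_key(&cost, |&(_, c)| c) {
        Ok(idx) | Err(idx) => idx,
    };
    queue.insert(index, (item, cost));
}

fn main() {
    let mut work_items = Vec::new();

    insert_by_cost(&mut work_items, "cgu-a", 300);
    insert_by_cost(&mut work_items, "cgu-b", 50);
    insert_by_cost(&mut work_items, "cgu-c", 800);

    // The most expensive item is scheduled first.
    assert_eq!(work_items.pop(), Some(("cgu-c", 800)));
    assert_eq!(work_items.pop(), Some(("cgu-a", 300)));
    assert_eq!(work_items.pop(), Some(("cgu-b", 50)));
}
```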
                 Message::Done { result: Ok(compiled_module), worker_id } => {
-                    if worker_id == TRANS_WORKER_ID {
-                        assert_eq!(trans_worker_state, TransWorkerState::LLVMing);
-                        trans_worker_state = TransWorkerState::Idle;
+                    if main_thread_worker_state == MainThreadWorkerState::LLVMing {
+                        main_thread_worker_state = MainThreadWorkerState::Idle;
                     } else {
-                        drop(tokens.pop());
                         running -= 1;
-                        free_worker_ids.push(worker_id);
                     }
 
+                    free_worker_ids.push(worker_id);
+
                     match compiled_module.kind {
                         ModuleKind::Regular => {
                             compiled_modules.push(compiled_module);
@@ -1381,7 +1470,16 @@ fn start_executing_work(sess: &Session,
             metadata_module: compiled_metadata_module.unwrap(),
             allocator_module: compiled_allocator_module,
         }
-    })
+    });
+
+    // A heuristic that determines if we have enough LLVM WorkItems in the
+    // queue so that the main thread can do LLVM work instead of translation.
+    fn queue_full_enough(items_in_queue: usize,
+                         workers_running: usize,
+                         max_workers: usize) -> bool {
+        // Tune me, plz.
+        items_in_queue >= max_workers.saturating_sub(workers_running / 2)
+    }
 }
 
 pub const TRANS_WORKER_ID: usize = ::std::usize::MAX;
@@ -1729,6 +1827,7 @@ impl OngoingCrateTranslation {
     pub fn submit_translated_module_to_llvm(&self,
                                             sess: &Session,
                                             mtrans: ModuleTranslation,
+                                            cost: u64,
                                             is_last: bool) {
         let module_config = match mtrans.kind {
             ModuleKind::Regular => self.regular_module_config.clone(sess),
@@ -1742,6 +1841,7 @@ impl OngoingCrateTranslation {
 
         drop(self.coordinator_send.send(Message::TranslationDone {
             llvm_work_item,
+            cost,
             is_last
         }));
     }
@@ -1752,7 +1852,10 @@ impl OngoingCrateTranslation {
                                                 is_last: bool) {
         self.wait_for_signal_to_translate_item();
         self.check_for_errors(sess);
-        self.submit_translated_module_to_llvm(sess, mtrans, is_last);
+
+        // These are generally cheap and won't throw off scheduling.
+        let cost = 0;
+        self.submit_translated_module_to_llvm(sess, mtrans, cost, is_last);
     }
 
     pub fn check_for_errors(&self, sess: &Session) {
diff --git a/src/librustc_trans/base.rs b/src/librustc_trans/base.rs
index 2d1f43aff57..e4a76345528 100644
--- a/src/librustc_trans/base.rs
+++ b/src/librustc_trans/base.rs
@@ -80,6 +80,7 @@ use libc::c_uint;
 use std::ffi::{CStr, CString};
 use std::str;
 use std::sync::Arc;
+use std::time::Instant;
 use std::i32;
 use syntax_pos::Span;
 use syntax::attr;
@@ -1082,10 +1083,22 @@ pub fn trans_crate<'a, 'tcx>(tcx: TyCtxt<'a, 'tcx, 'tcx>,
     let mut all_stats = Stats::default();
     let mut module_dispositions = tcx.sess.opts.incremental.as_ref().map(|_| Vec::new());
 
+    // We sort the codegen units by size. This way we can schedule work for LLVM
+    // a bit more efficiently. Note that "size" is defined rather crudely at the
+    // moment as it is just the number of TransItems in the CGU, not taking into
+    // account the size of each TransItem.
+    let codegen_units = {
+        let mut codegen_units = codegen_units;
+        codegen_units.sort_by_key(|cgu| -(cgu.items().len() as isize));
+        codegen_units
+    };
+
     for (cgu_index, cgu) in codegen_units.into_iter().enumerate() {
         ongoing_translation.wait_for_signal_to_translate_item();
         ongoing_translation.check_for_errors(tcx.sess);
 
+        let start_time = Instant::now();
+
         let module = {
             let _timing_guard = time_graph
                 .as_ref()
@@ -1108,10 +1121,18 @@ pub fn trans_crate<'a, 'tcx>(tcx: TyCtxt<'a, 'tcx, 'tcx>,
             module
         };
 
+        let time_to_translate = Instant::now().duration_since(start_time);
+
+        // We assume that the cost to run LLVM on a CGU is proportional to
+        // the time we needed for translating it.
+        let cost = time_to_translate.as_secs() * 1_000_000_000 +
+                   time_to_translate.subsec_nanos() as u64;
+
         let is_last_cgu = (cgu_index + 1) == codegen_unit_count;
 
         ongoing_translation.submit_translated_module_to_llvm(tcx.sess,
                                                              module,
+                                                             cost,
                                                              is_last_cgu);
 
         ongoing_translation.check_for_errors(tcx.sess);
     }
diff --git a/src/librustc_trans/lib.rs b/src/librustc_trans/lib.rs
index 83835cb794a..5a4a5b95cf9 100644
--- a/src/librustc_trans/lib.rs
+++ b/src/librustc_trans/lib.rs
@@ -39,7 +39,6 @@ use syntax_pos::symbol::Symbol;
 use std::sync::Arc;
 
 extern crate flate2;
-extern crate crossbeam;
 extern crate libc;
 extern crate owning_ref;
 #[macro_use] extern crate rustc;
@@ -55,6 +54,7 @@ extern crate rustc_const_math;
 extern crate rustc_bitflags;
 extern crate rustc_demangle;
 extern crate jobserver;
+extern crate num_cpus;
 #[macro_use] extern crate log;
 #[macro_use] extern crate syntax;
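
Finally, a standalone sketch of the cost bookkeeping added to `trans_crate` above: CGUs are sorted by descending item count, and the wall-clock translation time, flattened to nanoseconds, serves as the LLVM cost estimate. `FakeCgu` and the numbers are illustrative only:

```rust
use std::time::{Duration, Instant};

struct FakeCgu {
    name: &'static str,
    items: usize,
}

fn duration_to_nanos(d: Duration) -> u64 {
    // Same expression as in the patch; on current Rust one could also use
    // `d.as_nanos()` (which returns a u128).
    d.as_secs() * 1_000_000_000 + d.subsec_nanos() as u64
}

fn main() {
    let mut codegen_units = vec![
        FakeCgu { name: "small", items: 3 },
        FakeCgu { name: "large", items: 120 },
        FakeCgu { name: "medium", items: 40 },
    ];

    // Largest CGUs first, mirroring the `sort_by_key` call in trans_crate.
    codegen_units.sort_by_key(|cgu| -(cgu.items as isize));
    assert_eq!(codegen_units[0].name, "large");

    for cgu in &codegen_units {
        let start_time = Instant::now();
        // ... translation of `cgu` would happen here ...
        let cost = duration_to_nanos(Instant::now().duration_since(start_time));
        println!("{}: {} items, estimated LLVM cost {} ns", cgu.name, cgu.items, cost);
    }
}
```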