
async-llvm(6): Make the LLVM work coordinator get its work package through a channel instead of upfront.

Author: Michael Woerister, 2017-07-24 15:50:42 +02:00
parent b18a61a15b
commit 8f6894e177
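For orientation, here is a minimal sketch of the pattern this commit adopts: the caller spawns the coordinator first, then streams work items to it over an mpsc channel rather than handing it a `Vec<WorkItem>` up front. The `WorkItem` and `Message` types below are simplified stand-ins, not the rustc definitions.

use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Simplified stand-ins for the types used in the diff below.
#[derive(Debug)]
struct WorkItem(usize);

enum Message {
    WorkItem(WorkItem),
}

// The coordinator is told only how many items to expect; the items themselves
// arrive through the channel while the coordinator is already running.
fn start_executing_work(total_work_item_count: usize,
                        coordinator_receive: Receiver<Message>)
                        -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut received = 0;
        while received < total_work_item_count {
            match coordinator_receive.recv().unwrap() {
                Message::WorkItem(item) => {
                    println!("processing {:?}", item);
                    received += 1;
                }
            }
        }
    })
}

fn main() {
    let work_items = vec![WorkItem(0), WorkItem(1), WorkItem(2)];
    let (coordinator_send, coordinator_receive) = channel();
    let coordinator = start_executing_work(work_items.len(), coordinator_receive);

    // Work can now be fed to the coordinator incrementally instead of up front.
    for item in work_items {
        coordinator_send.send(Message::WorkItem(item)).unwrap();
    }
    coordinator.join().unwrap();
}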

@@ -780,19 +780,31 @@ pub fn run_passes(sess: &Session,
     let (shared_emitter, shared_emitter_main) = SharedEmitter::new();
     let (trans_worker_send, trans_worker_receive) = channel();
+    let (coordinator_send, coordinator_receive) = channel();
 
     let coordinator_thread = start_executing_work(sess,
-                                                  work_items,
+                                                  work_items.len(),
                                                   shared_emitter,
                                                   trans_worker_send,
+                                                  coordinator_send.clone(),
+                                                  coordinator_receive,
                                                   client,
                                                   trans.exported_symbols.clone());
+
+    for work_item in work_items {
+        coordinator_send.send(Message::WorkItem(work_item)).unwrap();
+    }
+
     loop {
         shared_emitter_main.check(sess);
 
         match trans_worker_receive.recv() {
-            Ok(Message::AllWorkDone) |
-            Err(_) => break,
+            Err(_) => {
+                // An `Err` here means that all senders for this channel have
+                // been closed. This could happen because all work has
+                // completed successfully or there has been some error.
+                // At this point we don't care which it is.
+                break
+            }
             Ok(Message::CheckErrorMessages) => continue,
             Ok(msg) => {
@@ -801,9 +813,15 @@ pub fn run_passes(sess: &Session,
         }
     }
 
-    coordinator_thread.join().unwrap();
+    match coordinator_thread.join() {
+        Ok(()) => {},
+        Err(err) => {
+            panic!("error: {:?}", err);
+        }
+    }
 
     // Just in case, check this on the way out.
+    shared_emitter_main.check(sess);
     sess.diagnostic().abort_if_errors();
 
     // If in incr. comp. mode, preserve the `.o` files for potential re-use
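For reference, `thread::JoinHandle::join` returns a `thread::Result` whose `Err` variant carries the panic payload of the joined thread, which is what the new `match` surfaces. A small standalone illustration (the thread and message here are made up, not the rustc code):

use std::thread;

fn main() {
    let coordinator_thread = thread::spawn(|| {
        panic!("coordinator failed");
    });

    // `join` yields Err(Box<dyn Any + Send>) if the thread panicked; matching lets
    // the caller report the failure instead of silently unwrapping.
    match coordinator_thread.join() {
        Ok(()) => println!("coordinator finished cleanly"),
        Err(err) => {
            if let Some(msg) = err.downcast_ref::<&str>() {
                eprintln!("coordinator panicked: {}", msg);
            } else {
                eprintln!("coordinator panicked: {:?}", err);
            }
        }
    }
}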
@@ -1080,7 +1098,6 @@ pub enum Message {
     Done { success: bool },
     WorkItem(WorkItem),
     CheckErrorMessages,
-    AllWorkDone,
 }
@@ -1091,15 +1108,14 @@ pub struct Diagnostic {
 }
 
 fn start_executing_work(sess: &Session,
-                        mut work_items: Vec<WorkItem>,
+                        total_work_item_count: usize,
                         shared_emitter: SharedEmitter,
                         trans_worker_send: Sender<Message>,
+                        coordinator_send: Sender<Message>,
+                        coordinator_receive: Receiver<Message>,
                         jobserver: Client,
                         exported_symbols: Arc<ExportedSymbols>)
                         -> thread::JoinHandle<()> {
-    let (tx, rx) = channel();
-    let tx2 = tx.clone();
-
     // First up, convert our jobserver into a helper thread so we can use normal
     // mpsc channels to manage our messages and such. Once we've got the helper
     // thread then request `n-1` tokens because all of our work items are ready
@@ -1110,10 +1126,11 @@ fn start_executing_work(sess: &Session,
     //
     // After we've requested all these tokens then we'll, when we can, get
     // tokens on `rx` above which will get managed in the main loop below.
+    let coordinator_send2 = coordinator_send.clone();
     let helper = jobserver.into_helper_thread(move |token| {
-        drop(tx2.send(Message::Token(token)));
+        drop(coordinator_send2.send(Message::Token(token)));
     }).expect("failed to spawn helper thread");
 
-    for _ in 0..work_items.len() - 1 {
+    for _ in 0..total_work_item_count - 1 {
         helper.request_token();
     }
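The cloned `coordinator_send2` exists so the jobserver helper thread can feed `Token` messages into the same channel the work items travel on. A stripped-down sketch of that multiple-producer pattern, with a plain thread standing in for the jobserver helper and a hypothetical `Message` type:

use std::sync::mpsc::channel;
use std::thread;

// Hypothetical stand-in for the coordinator's message type.
enum Message {
    Token(usize),
    WorkItem(String),
}

fn main() {
    let (coordinator_send, coordinator_receive) = channel();

    // Clone the sender so a background thread (the jobserver helper in the real
    // code) can push tokens while other senders keep pushing work items.
    let coordinator_send2 = coordinator_send.clone();
    let helper = thread::spawn(move || {
        for token in 0..2usize {
            coordinator_send2.send(Message::Token(token)).unwrap();
        }
    });

    coordinator_send.send(Message::WorkItem("codegen unit".to_string())).unwrap();
    helper.join().unwrap();

    // Drop the last sender so the receiver's iterator terminates.
    drop(coordinator_send);
    for msg in coordinator_receive {
        match msg {
            Message::Token(t) => println!("received token {}", t),
            Message::WorkItem(name) => println!("received work item {}", name),
        }
    }
}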
@@ -1137,7 +1154,7 @@ fn start_executing_work(sess: &Session,
         remark: sess.opts.cg.remark.clone(),
         worker: 0,
         incr_comp_session_dir: sess.incr_comp_session_dir_opt().map(|r| r.clone()),
-        coordinator_send: tx.clone(),
+        coordinator_send: coordinator_send,
         diag_emitter: shared_emitter.clone(),
     };
@@ -1198,15 +1215,18 @@ fn start_executing_work(sess: &Session,
     // the jobserver.
     thread::spawn(move || {
+        let mut work_items_left = total_work_item_count;
+        let mut work_items = Vec::with_capacity(total_work_item_count);
         let mut tokens = Vec::new();
        let mut running = 0;
-        while work_items.len() > 0 || running > 0 {
+        while work_items_left > 0 || running > 0 {
 
             // Spin up what work we can, only doing this while we've got available
             // parallelism slots and work left to spawn.
-            while work_items.len() > 0 && running < tokens.len() + 1 {
-                let item = work_items.pop().unwrap();
-                let worker_index = work_items.len();
+            while work_items_left > 0 && running < tokens.len() + 1 {
+                if let Some(item) = work_items.pop() {
+                    work_items_left -= 1;
+                    let worker_index = work_items_left;
 
                 let cgcx = CodegenContext {
                     worker: worker_index,
@@ -1215,12 +1235,15 @@ fn start_executing_work(sess: &Session,
                 spawn_work(cgcx, item);
                 running += 1;
+                } else {
+                    break
+                }
             }
 
             // Relinquish accidentally acquired extra tokens
             tokens.truncate(running.saturating_sub(1));
 
-            match rx.recv().unwrap() {
+            match coordinator_receive.recv().unwrap() {
                 // Save the token locally and the next turn of the loop will use
                 // this to spawn a new unit of work, or it may get dropped
                 // immediately if we have no more work to spawn.
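The bound in this loop relies on the process always holding one implicit jobserver token, so up to `tokens.len() + 1` workers may run at once, and `truncate(running.saturating_sub(1))` returns any explicitly acquired tokens the running workers no longer need. A small worked example of that arithmetic (the token values are made up):

fn main() {
    // Two explicitly acquired tokens plus the process's one implicit slot.
    let mut tokens = vec!["token-a", "token-b"];
    let mut running: usize = 0;
    let mut work_items_left = 5;

    // Spawn while there is both queued work and a free parallelism slot.
    while work_items_left > 0 && running < tokens.len() + 1 {
        running += 1;
        work_items_left -= 1;
    }
    assert_eq!(running, 3); // 2 acquired tokens + 1 implicit slot

    // After two workers finish, the one remaining worker can run on the implicit
    // slot alone, so every explicitly held token can be released.
    running -= 2;
    tokens.truncate(running.saturating_sub(1));
    assert!(tokens.is_empty());
}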
@@ -1228,6 +1251,10 @@ fn start_executing_work(sess: &Session,
                     tokens.push(token.expect("failed to acquire jobserver token"));
                 }
 
+                Message::WorkItem(work_item) => {
+                    work_items.push(work_item);
+                }
+
                 // If a thread exits successfully then we drop a token associated
                 // with that worker and update our `running` count. We may later
                 // re-acquire a token to continue running more work. We may also not
@@ -1245,8 +1272,6 @@ fn start_executing_work(sess: &Session,
                     shared_emitter.fatal("aborting due to worker thread panic".to_string());
                     trans_worker_send.send(Message::CheckErrorMessages).unwrap();
                 }
-                msg @ Message::WorkItem(_) |
-                msg @ Message::AllWorkDone |
                 msg @ Message::CheckErrorMessages => {
                     bug!("unexpected message: {:?}", msg);
                 }