From 8f6894e177cecf3cd35833e2063256a69841415a Mon Sep 17 00:00:00 2001 From: Michael Woerister Date: Mon, 24 Jul 2017 15:50:42 +0200 Subject: [PATCH] async-llvm(6): Make the LLVM work coordinator get its work package through a channel instead of upfront. --- src/librustc_trans/back/write.rs | 77 +++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/src/librustc_trans/back/write.rs b/src/librustc_trans/back/write.rs index 9e4c1b87aac..ee3c9ace7dc 100644 --- a/src/librustc_trans/back/write.rs +++ b/src/librustc_trans/back/write.rs @@ -780,19 +780,31 @@ pub fn run_passes(sess: &Session, let (shared_emitter, shared_emitter_main) = SharedEmitter::new(); let (trans_worker_send, trans_worker_receive) = channel(); + let (coordinator_send, coordinator_receive) = channel(); let coordinator_thread = start_executing_work(sess, - work_items, + work_items.len(), shared_emitter, trans_worker_send, + coordinator_send.clone(), + coordinator_receive, client, trans.exported_symbols.clone()); + for work_item in work_items { + coordinator_send.send(Message::WorkItem(work_item)).unwrap(); + } + loop { shared_emitter_main.check(sess); match trans_worker_receive.recv() { - Ok(Message::AllWorkDone) | - Err(_) => break, + Err(_) => { + // An `Err` here means that all senders for this channel have + // been closed. This could happen because all work has + // completed successfully or there has been some error. + // At this point we don't care which it is. + break + } Ok(Message::CheckErrorMessages) => continue, Ok(msg) => { @@ -801,9 +813,15 @@ pub fn run_passes(sess: &Session, } } - coordinator_thread.join().unwrap(); + match coordinator_thread.join() { + Ok(()) => {}, + Err(err) => { + panic!("error: {:?}", err); + } + } // Just in case, check this on the way out. + shared_emitter_main.check(sess); sess.diagnostic().abort_if_errors(); // If in incr. comp. mode, preserve the `.o` files for potential re-use @@ -1080,7 +1098,6 @@ pub enum Message { Done { success: bool }, WorkItem(WorkItem), CheckErrorMessages, - AllWorkDone, } @@ -1091,15 +1108,14 @@ pub struct Diagnostic { } fn start_executing_work(sess: &Session, - mut work_items: Vec, + total_work_item_count: usize, shared_emitter: SharedEmitter, trans_worker_send: Sender, + coordinator_send: Sender, + coordinator_receive: Receiver, jobserver: Client, exported_symbols: Arc) - -> thread::JoinHandle<()> { - let (tx, rx) = channel(); - let tx2 = tx.clone(); - + -> thread::JoinHandle<()> { // First up, convert our jobserver into a helper thread so we can use normal // mpsc channels to manage our messages and such. Once we've got the helper // thread then request `n-1` tokens because all of our work items are ready @@ -1110,10 +1126,11 @@ fn start_executing_work(sess: &Session, // // After we've requested all these tokens then we'll, when we can, get // tokens on `rx` above which will get managed in the main loop below. + let coordinator_send2 = coordinator_send.clone(); let helper = jobserver.into_helper_thread(move |token| { - drop(tx2.send(Message::Token(token))); + drop(coordinator_send2.send(Message::Token(token))); }).expect("failed to spawn helper thread"); - for _ in 0..work_items.len() - 1 { + for _ in 0..total_work_item_count - 1 { helper.request_token(); } @@ -1137,7 +1154,7 @@ fn start_executing_work(sess: &Session, remark: sess.opts.cg.remark.clone(), worker: 0, incr_comp_session_dir: sess.incr_comp_session_dir_opt().map(|r| r.clone()), - coordinator_send: tx.clone(), + coordinator_send: coordinator_send, diag_emitter: shared_emitter.clone(), }; @@ -1198,29 +1215,35 @@ fn start_executing_work(sess: &Session, // the jobserver. thread::spawn(move || { + let mut work_items_left = total_work_item_count; + let mut work_items = Vec::with_capacity(total_work_item_count); let mut tokens = Vec::new(); let mut running = 0; - while work_items.len() > 0 || running > 0 { + while work_items_left > 0 || running > 0 { // Spin up what work we can, only doing this while we've got available // parallelism slots and work left to spawn. - while work_items.len() > 0 && running < tokens.len() + 1 { - let item = work_items.pop().unwrap(); - let worker_index = work_items.len(); + while work_items_left > 0 && running < tokens.len() + 1 { + if let Some(item) = work_items.pop() { + work_items_left -= 1; + let worker_index = work_items_left; - let cgcx = CodegenContext { - worker: worker_index, - .. cgcx.clone() - }; + let cgcx = CodegenContext { + worker: worker_index, + .. cgcx.clone() + }; - spawn_work(cgcx, item); - running += 1; + spawn_work(cgcx, item); + running += 1; + } else { + break + } } // Relinquish accidentally acquired extra tokens tokens.truncate(running.saturating_sub(1)); - match rx.recv().unwrap() { + match coordinator_receive.recv().unwrap() { // Save the token locally and the next turn of the loop will use // this to spawn a new unit of work, or it may get dropped // immediately if we have no more work to spawn. @@ -1228,6 +1251,10 @@ fn start_executing_work(sess: &Session, tokens.push(token.expect("failed to acquire jobserver token")); } + Message::WorkItem(work_item) => { + work_items.push(work_item); + } + // If a thread exits successfully then we drop a token associated // with that worker and update our `running` count. We may later // re-acquire a token to continue running more work. We may also not @@ -1245,8 +1272,6 @@ fn start_executing_work(sess: &Session, shared_emitter.fatal("aborting due to worker thread panic".to_string()); trans_worker_send.send(Message::CheckErrorMessages).unwrap(); } - msg @ Message::WorkItem(_) | - msg @ Message::AllWorkDone | msg @ Message::CheckErrorMessages => { bug!("unexpected message: {:?}", msg); }