rust/compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

221 lines
6.9 KiB
Rust
Raw Normal View History

use std::sync::{Arc, Condvar, Mutex};
use rustc_session::Session;
use jobserver::HelperThread;
2022-08-24 14:28:40 +00:00
// FIXME don't panic when a worker thread panics
/// Limits how many jobs run concurrently, using tokens obtained from the session's jobserver.
///
/// `finished()` has to be called before the limiter goes out of scope; otherwise the `Drop` impl
/// panics (unless the thread is already panicking).
pub(super) struct ConcurrencyLimiter {
/// Helper thread used to request more tokens; `finished()` takes it out of the `Option`.
helper_thread: Option<HelperThread>,
/// Job and token bookkeeping, shared with the helper thread callback and with every
/// `ConcurrencyLimiterToken` handed out by `acquire()`.
state: Arc<Mutex<state::ConcurrencyLimiterState>>,
/// Notified when a new token arrives, when a job finishes, or when the state gets poisoned.
available_token_condvar: Arc<Condvar>,
/// Set to `true` by `finished()`; checked by the `Drop` impl.
finished: bool,
}
impl ConcurrencyLimiter {
pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
let available_token_condvar = Arc::new(Condvar::new());
let state_helper = state.clone();
let available_token_condvar_helper = available_token_condvar.clone();
let helper_thread = sess
.jobserver
.clone()
.into_helper_thread(move |token| {
let mut state = state_helper.lock().unwrap();
match token {
Ok(token) => {
state.add_new_token(token);
available_token_condvar_helper.notify_one();
}
Err(err) => {
state.poison(format!("failed to acquire jobserver token: {}", err));
// Notify all threads waiting for a token to give them a chance to
// gracefully exit.
available_token_condvar_helper.notify_all();
}
}
})
.unwrap();
ConcurrencyLimiter {
helper_thread: Some(helper_thread),
state,
Fix signaling available_token_condvar when a new token is received This slightly improves performance on systems with many cores and barely affects systems with few cores. My laptop (2 core + HT): Before: Benchmark 2: RUSTC=rustc /home/bjorn/Projects/cg_clif2/./dist/cargo-clif build --manifest-path /home/bjorn/Projects/cg_clif2/./download/simple-raytracer/Cargo.toml --target-dir /home/bjorn/Projects/cg_clif2/./build/simple_raytracer Time (mean ± σ): 12.042 s ± 0.313 s [User: 29.434 s, System: 4.720 s] Range (min … max): 11.670 s … 12.795 s 10 runs After: Benchmark 2: RUSTC=rustc /home/bjorn/Projects/cg_clif2/./dist/cargo-clif build --manifest-path /home/bjorn/Projects/cg_clif2/./download/simple-raytracer/Cargo.toml --target-dir /home/bjorn/Projects/cg_clif2/./build/simple_raytracer Time (mean ± σ): 12.037 s ± 0.384 s [User: 29.960 s, System: 4.722 s] Range (min … max): 11.673 s … 12.769 s 10 runs Dev desktop (32 cores) Before: Benchmark 2: RUSTC=rustc /home/gh-bjorn3/cg_clif/./dist/cargo-clif build --manifest-path /home/gh-bjorn3/cg_clif/./download/simple-raytracer/Cargo.toml --target-dir /home/gh-bjorn3/cg_clif/./build/simple_raytracer Time (mean ± σ): 10.425 s ± 0.104 s [User: 25.877 s, System: 5.513 s] Range (min … max): 10.267 s … 10.640 s 10 runs After: Benchmark 2: RUSTC=rustc /home/gh-bjorn3/cg_clif/./dist/cargo-clif build --manifest-path /home/gh-bjorn3/cg_clif/./download/simple-raytracer/Cargo.toml --target-dir /home/gh-bjorn3/cg_clif/./build/simple_raytracer Time (mean ± σ): 10.212 s ± 0.100 s [User: 25.918 s, System: 5.555 s] Range (min … max): 10.079 s … 10.362 s 10 runs
2023-02-10 18:08:39 +00:00
available_token_condvar,
finished: false,
}
}
pub(super) fn acquire(&mut self, handler: &rustc_errors::Handler) -> ConcurrencyLimiterToken {
let mut state = self.state.lock().unwrap();
loop {
state.assert_invariants();
match state.try_start_job() {
Ok(true) => {
return ConcurrencyLimiterToken {
state: self.state.clone(),
available_token_condvar: self.available_token_condvar.clone(),
};
}
Ok(false) => {}
Err(err) => {
// An error happened when acquiring the token. Raise it as fatal error.
// Make sure to drop the mutex guard first to prevent poisoning the mutex.
drop(state);
if let Some(err) = err {
handler.fatal(err).raise();
} else {
// The error was already emitted, but compilation continued. Raise a silent
// fatal error.
rustc_errors::FatalError.raise();
}
}
}
self.helper_thread.as_mut().unwrap().request_token();
state = self.available_token_condvar.wait(state).unwrap();
}
}
pub(super) fn job_already_done(&mut self) {
let mut state = self.state.lock().unwrap();
state.job_already_done();
}
pub(crate) fn finished(mut self) {
self.helper_thread.take();
// Assert that all jobs have finished
let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
state.assert_done();
self.finished = true;
}
}
impl Drop for ConcurrencyLimiter {
    /// Panics when the limiter is dropped without `finished()` having been called, unless the
    /// current thread is already unwinding from another panic.
    fn drop(&mut self) {
        // Nothing to complain about once finished() has run, and never panic while panicking.
        if self.finished || std::thread::panicking() {
            return;
        }
        panic!("Forgot to call finished() on ConcurrencyLimiter");
    }
}
/// Handle for one running job, created by `ConcurrencyLimiter::acquire`.
///
/// Dropping it records the job as finished in the shared state and notifies
/// `available_token_condvar` once.
#[derive(Debug)]
pub(super) struct ConcurrencyLimiterToken {
/// Clone of the `state` handle of the limiter that created this token.
state: Arc<Mutex<state::ConcurrencyLimiterState>>,
/// Clone of the limiter's condition variable; notified when this token is dropped.
available_token_condvar: Arc<Condvar>,
}
impl Drop for ConcurrencyLimiterToken {
    /// Marks the job behind this token as finished and wakes one thread waiting for a token.
    ///
    /// Panics if the state mutex is poisoned.
    fn drop(&mut self) {
        // Keep the guard alive until after the notification, so the wake-up is sent while the
        // lock is still held.
        let mut guard = self.state.lock().unwrap();
        guard.job_finished();
        self.available_token_condvar.notify_one();
    }
}
mod state {
use jobserver::Acquired;
#[derive(Debug)]
pub(super) struct ConcurrencyLimiterState {
pending_jobs: usize,
active_jobs: usize,
poisoned: bool,
stored_error: Option<String>,
// None is used to represent the implicit token, Some to represent explicit tokens
tokens: Vec<Option<Acquired>>,
}
impl ConcurrencyLimiterState {
pub(super) fn new(pending_jobs: usize) -> Self {
ConcurrencyLimiterState {
pending_jobs,
active_jobs: 0,
poisoned: false,
stored_error: None,
tokens: vec![None],
}
}
pub(super) fn assert_invariants(&self) {
// There must be no excess active jobs
assert!(self.active_jobs <= self.pending_jobs);
// There may not be more active jobs than there are tokens
assert!(self.active_jobs <= self.tokens.len());
}
pub(super) fn assert_done(&self) {
assert_eq!(self.pending_jobs, 0);
assert_eq!(self.active_jobs, 0);
}
pub(super) fn add_new_token(&mut self, token: Acquired) {
self.tokens.push(Some(token));
self.drop_excess_capacity();
}
pub(super) fn try_start_job(&mut self) -> Result<bool, Option<String>> {
if self.poisoned {
return Err(self.stored_error.take());
}
if self.active_jobs < self.tokens.len() {
// Using existing token
self.job_started();
return Ok(true);
}
Ok(false)
}
pub(super) fn job_started(&mut self) {
self.assert_invariants();
self.active_jobs += 1;
self.drop_excess_capacity();
self.assert_invariants();
}
pub(super) fn job_finished(&mut self) {
self.assert_invariants();
self.pending_jobs -= 1;
self.active_jobs -= 1;
self.assert_invariants();
self.drop_excess_capacity();
self.assert_invariants();
}
pub(super) fn job_already_done(&mut self) {
self.assert_invariants();
self.pending_jobs -= 1;
self.assert_invariants();
self.drop_excess_capacity();
self.assert_invariants();
}
pub(super) fn poison(&mut self, error: String) {
self.poisoned = true;
self.stored_error = Some(error);
}
fn drop_excess_capacity(&mut self) {
self.assert_invariants();
2022-08-24 12:22:01 +02:00
// Drop all tokens that can never be used anymore
self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));
// Keep some excess tokens to satisfy requests faster
const MAX_EXTRA_CAPACITY: usize = 2;
self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
self.assert_invariants();
}
}
}