Don't deadlock when failing to acquire a jobserver token
This commit is contained in:
parent
fd4e1d55ea
commit
9970b04646
@ -25,8 +25,18 @@ pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
|
||||
.clone()
|
||||
.into_helper_thread(move |token| {
|
||||
let mut state = state_helper.lock().unwrap();
|
||||
state.add_new_token(token.unwrap());
|
||||
available_token_condvar_helper.notify_one();
|
||||
match token {
|
||||
Ok(token) => {
|
||||
state.add_new_token(token);
|
||||
available_token_condvar_helper.notify_one();
|
||||
}
|
||||
Err(err) => {
|
||||
state.poison(format!("failed to acquire jobserver token: {}", err));
|
||||
// Notify all threads waiting for a token to give them a chance to
|
||||
// gracefully exit.
|
||||
available_token_condvar_helper.notify_all();
|
||||
}
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
ConcurrencyLimiter {
|
||||
@ -37,16 +47,31 @@ pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
|
||||
pub(super) fn acquire(&mut self, handler: &rustc_errors::Handler) -> ConcurrencyLimiterToken {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
loop {
|
||||
state.assert_invariants();
|
||||
|
||||
if state.try_start_job() {
|
||||
return ConcurrencyLimiterToken {
|
||||
state: self.state.clone(),
|
||||
available_token_condvar: self.available_token_condvar.clone(),
|
||||
};
|
||||
match state.try_start_job() {
|
||||
Ok(true) => {
|
||||
return ConcurrencyLimiterToken {
|
||||
state: self.state.clone(),
|
||||
available_token_condvar: self.available_token_condvar.clone(),
|
||||
};
|
||||
}
|
||||
Ok(false) => {}
|
||||
Err(err) => {
|
||||
// An error happened when acquiring the token. Raise it as fatal error.
|
||||
// Make sure to drop the mutex guard first to prevent poisoning the mutex.
|
||||
drop(state);
|
||||
if let Some(err) = err {
|
||||
handler.fatal(&err).raise();
|
||||
} else {
|
||||
// The error was already emitted, but compilation continued. Raise a silent
|
||||
// fatal error.
|
||||
rustc_errors::FatalError.raise();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.helper_thread.as_mut().unwrap().request_token();
|
||||
@ -100,13 +125,22 @@ pub(super) struct ConcurrencyLimiterState {
|
||||
pending_jobs: usize,
|
||||
active_jobs: usize,
|
||||
|
||||
poisoned: bool,
|
||||
stored_error: Option<String>,
|
||||
|
||||
// None is used to represent the implicit token, Some to represent explicit tokens
|
||||
tokens: Vec<Option<Acquired>>,
|
||||
}
|
||||
|
||||
impl ConcurrencyLimiterState {
|
||||
pub(super) fn new(pending_jobs: usize) -> Self {
|
||||
ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
|
||||
ConcurrencyLimiterState {
|
||||
pending_jobs,
|
||||
active_jobs: 0,
|
||||
poisoned: false,
|
||||
stored_error: None,
|
||||
tokens: vec![None],
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn assert_invariants(&self) {
|
||||
@ -127,14 +161,18 @@ pub(super) fn add_new_token(&mut self, token: Acquired) {
|
||||
self.drop_excess_capacity();
|
||||
}
|
||||
|
||||
pub(super) fn try_start_job(&mut self) -> bool {
|
||||
pub(super) fn try_start_job(&mut self) -> Result<bool, Option<String>> {
|
||||
if self.poisoned {
|
||||
return Err(self.stored_error.take());
|
||||
}
|
||||
|
||||
if self.active_jobs < self.tokens.len() {
|
||||
// Using existing token
|
||||
self.job_started();
|
||||
return true;
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
false
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub(super) fn job_started(&mut self) {
|
||||
@ -161,6 +199,11 @@ pub(super) fn job_already_done(&mut self) {
|
||||
self.assert_invariants();
|
||||
}
|
||||
|
||||
pub(super) fn poison(&mut self, error: String) {
|
||||
self.poisoned = true;
|
||||
self.stored_error = Some(error);
|
||||
}
|
||||
|
||||
fn drop_excess_capacity(&mut self) {
|
||||
self.assert_invariants();
|
||||
|
||||
|
@ -407,7 +407,7 @@ pub(crate) fn run_aot(
|
||||
backend_config.clone(),
|
||||
global_asm_config.clone(),
|
||||
cgu.name(),
|
||||
concurrency_limiter.acquire(),
|
||||
concurrency_limiter.acquire(tcx.sess.diagnostic()),
|
||||
),
|
||||
module_codegen,
|
||||
Some(rustc_middle::dep_graph::hash_result),
|
||||
|
Loading…
Reference in New Issue
Block a user