diff --git a/src/librustc/Cargo.toml b/src/librustc/Cargo.toml index e3557132a12..26a27ea88e2 100644 --- a/src/librustc/Cargo.toml +++ b/src/librustc/Cargo.toml @@ -20,8 +20,8 @@ num_cpus = "1.0" scoped-tls = "1.0" log = { version = "0.4", features = ["release_max_level_info", "std"] } polonius-engine = "0.6.2" -rustc-rayon = "0.1.1" -rustc-rayon-core = "0.1.1" +rustc-rayon = "0.1.2" +rustc-rayon-core = "0.1.2" rustc_apfloat = { path = "../librustc_apfloat" } rustc_target = { path = "../librustc_target" } rustc_data_structures = { path = "../librustc_data_structures" } diff --git a/src/librustc/session/mod.rs b/src/librustc/session/mod.rs index 5b9b70edc68..3cff5ec2309 100644 --- a/src/librustc/session/mod.rs +++ b/src/librustc/session/mod.rs @@ -34,7 +34,8 @@ use rustc_target::spec::{PanicStrategy, RelroLevel, Target, TargetTriple}; use rustc_data_structures::flock; -use jobserver::Client; +use rustc_data_structures::jobserver; +use ::jobserver::Client; use std; use std::cell::{self, Cell, RefCell}; @@ -1230,32 +1231,7 @@ pub fn build_session_( optimization_fuel, print_fuel_crate, print_fuel, - // Note that this is unsafe because it may misinterpret file descriptors - // on Unix as jobserver file descriptors. We hopefully execute this near - // the beginning of the process though to ensure we don't get false - // positives, or in other words we try to execute this before we open - // any file descriptors ourselves. - // - // Pick a "reasonable maximum" if we don't otherwise have - // a jobserver in our environment, capping out at 32 so we - // don't take everything down by hogging the process run queue. - // The fixed number is used to have deterministic compilation - // across machines. - // - // Also note that we stick this in a global because there could be - // multiple `Session` instances in this process, and the jobserver is - // per-process. - jobserver: unsafe { - static mut GLOBAL_JOBSERVER: *mut Client = 0 as *mut _; - static INIT: std::sync::Once = std::sync::ONCE_INIT; - INIT.call_once(|| { - let client = Client::from_env().unwrap_or_else(|| { - Client::new(32).expect("failed to create jobserver") - }); - GLOBAL_JOBSERVER = Box::into_raw(Box::new(client)); - }); - (*GLOBAL_JOBSERVER).clone() - }, + jobserver: jobserver::client(), has_global_allocator: Once::new(), has_panic_handler: Once::new(), driver_lint_caps, diff --git a/src/librustc/ty/query/job.rs b/src/librustc/ty/query/job.rs index 22211468412..8e68c9fa304 100644 --- a/src/librustc/ty/query/job.rs +++ b/src/librustc/ty/query/job.rs @@ -7,6 +7,7 @@ use rustc_data_structures::fx::FxHashSet; use rustc_data_structures::sync::{Lock, LockGuard, Lrc, Weak}; use rustc_data_structures::OnDrop; +use rustc_data_structures::jobserver; use syntax_pos::Span; use crate::ty::tls; @@ -198,7 +199,11 @@ fn r#await(&self, waiter: &Lrc>) { // we have to be in the `wait` call. This is ensured by the deadlock handler // getting the self.info lock. rayon_core::mark_blocked(); + jobserver::release_thread(); waiter.condvar.wait(&mut info); + // Release the lock before we potentially block in `acquire_thread` + mem::drop(info); + jobserver::acquire_thread(); } } diff --git a/src/librustc_data_structures/Cargo.toml b/src/librustc_data_structures/Cargo.toml index 6aa262715ec..6002bf69b70 100644 --- a/src/librustc_data_structures/Cargo.toml +++ b/src/librustc_data_structures/Cargo.toml @@ -12,13 +12,15 @@ crate-type = ["dylib"] [dependencies] ena = "0.11" log = "0.4" +jobserver_crate = { version = "0.1", package = "jobserver" } +lazy_static = "1" rustc_cratesio_shim = { path = "../librustc_cratesio_shim" } serialize = { path = "../libserialize" } graphviz = { path = "../libgraphviz" } cfg-if = "0.1.2" stable_deref_trait = "1.0.0" -rayon = { version = "0.1.1", package = "rustc-rayon" } -rayon-core = { version = "0.1.1", package = "rustc-rayon-core" } +rayon = { version = "0.1.2", package = "rustc-rayon" } +rayon-core = { version = "0.1.2", package = "rustc-rayon-core" } rustc-hash = "1.0.1" smallvec = { version = "0.6.7", features = ["union", "may_dangle"] } diff --git a/src/librustc_data_structures/jobserver.rs b/src/librustc_data_structures/jobserver.rs new file mode 100644 index 00000000000..c85cdfbdcd8 --- /dev/null +++ b/src/librustc_data_structures/jobserver.rs @@ -0,0 +1,153 @@ +use jobserver_crate::{Client, HelperThread, Acquired}; +use lazy_static::lazy_static; +use std::sync::{Condvar, Arc, Mutex}; +use std::mem; + +#[derive(Default)] +pub struct LockedProxyData { + /// The number of free thread tokens, this may include the implicit token given to the process + free: usize, + + /// The number of threads waiting for a token + waiters: usize, + + /// The number of tokens we requested from the server + requested: usize, + + /// Stored tokens which will be dropped when we no longer need them + tokens: Vec, +} + +impl LockedProxyData { + fn request_token(&mut self, thread: &Mutex) { + self.requested += 1; + thread.lock().unwrap().request_token(); + } + + fn release_token(&mut self, cond_var: &Condvar) { + if self.waiters > 0 { + self.free += 1; + cond_var.notify_one(); + } else { + if self.tokens.is_empty() { + // We are returning the implicit token + self.free += 1; + } else { + // Return a real token to the server + self.tokens.pop().unwrap(); + } + } + } + + fn take_token(&mut self, thread: &Mutex) -> bool { + if self.free > 0 { + self.free -= 1; + self.waiters -= 1; + + // We stole some token reqested by someone else + // Request another one + if self.requested + self.free < self.waiters { + self.request_token(thread); + } + + true + } else { + false + } + } + + fn new_requested_token(&mut self, token: Acquired, cond_var: &Condvar) { + self.requested -= 1; + + // Does anything need this token? + if self.waiters > 0 { + self.free += 1; + self.tokens.push(token); + cond_var.notify_one(); + } else { + // Otherwise we'll just drop it + mem::drop(token); + } + } +} + +#[derive(Default)] +pub struct ProxyData { + lock: Mutex, + cond_var: Condvar, +} + +pub struct Proxy { + thread: Mutex, + data: Arc, +} + +lazy_static! { + // We can only call `from_env` once per process + + // Note that this is unsafe because it may misinterpret file descriptors + // on Unix as jobserver file descriptors. We hopefully execute this near + // the beginning of the process though to ensure we don't get false + // positives, or in other words we try to execute this before we open + // any file descriptors ourselves. + // + // Pick a "reasonable maximum" if we don't otherwise have + // a jobserver in our environment, capping out at 32 so we + // don't take everything down by hogging the process run queue. + // The fixed number is used to have deterministic compilation + // across machines. + // + // Also note that we stick this in a global because there could be + // multiple rustc instances in this process, and the jobserver is + // per-process. + static ref GLOBAL_CLIENT: Client = unsafe { + Client::from_env().unwrap_or_else(|| { + Client::new(32).expect("failed to create jobserver") + }) + }; + + static ref GLOBAL_PROXY: Proxy = { + let data = Arc::new(ProxyData::default()); + + Proxy { + data: data.clone(), + thread: Mutex::new(client().into_helper_thread(move |token| { + data.lock.lock().unwrap().new_requested_token(token.unwrap(), &data.cond_var); + }).unwrap()), + } + }; +} + +pub fn client() -> Client { + GLOBAL_CLIENT.clone() +} + +pub fn acquire_thread() { + GLOBAL_PROXY.acquire_token(); +} + +pub fn release_thread() { + GLOBAL_PROXY.release_token(); +} + +impl Proxy { + pub fn release_token(&self) { + self.data.lock.lock().unwrap().release_token(&self.data.cond_var); + } + + pub fn acquire_token(&self) { + let mut data = self.data.lock.lock().unwrap(); + data.waiters += 1; + if data.take_token(&self.thread) { + return; + } + // Request a token for us + data.request_token(&self.thread); + loop { + data = self.data.cond_var.wait(data).unwrap(); + if data.take_token(&self.thread) { + return; + } + } + } +} diff --git a/src/librustc_data_structures/lib.rs b/src/librustc_data_structures/lib.rs index 2bfb1b24a81..09482340b1a 100644 --- a/src/librustc_data_structures/lib.rs +++ b/src/librustc_data_structures/lib.rs @@ -77,6 +77,7 @@ macro_rules! unlikely { pub mod graph; pub mod indexed_vec; pub mod interner; +pub mod jobserver; pub mod obligation_forest; pub mod owning_ref; pub mod ptr_key; diff --git a/src/librustc_driver/Cargo.toml b/src/librustc_driver/Cargo.toml index 0b379ef662d..a77e497af7b 100644 --- a/src/librustc_driver/Cargo.toml +++ b/src/librustc_driver/Cargo.toml @@ -13,7 +13,7 @@ arena = { path = "../libarena" } graphviz = { path = "../libgraphviz" } log = "0.4" env_logger = { version = "0.5", default-features = false } -rustc-rayon = "0.1.1" +rustc-rayon = "0.1.2" scoped-tls = "1.0" rustc = { path = "../librustc" } rustc_allocator = { path = "../librustc_allocator" } diff --git a/src/librustc_driver/driver.rs b/src/librustc_driver/driver.rs index f87a809e6c6..c4d8a66aa6d 100644 --- a/src/librustc_driver/driver.rs +++ b/src/librustc_driver/driver.rs @@ -17,6 +17,7 @@ use rustc_borrowck as borrowck; use rustc_codegen_utils::codegen_backend::CodegenBackend; use rustc_data_structures::sync::{self, Lock}; +use rustc_data_structures::jobserver; use rustc_incremental; use rustc_metadata::creader::CrateLoader; use rustc_metadata::cstore::{self, CStore}; @@ -72,6 +73,8 @@ pub fn spawn_thread_pool R + sync::Send, R: sync:: let gcx_ptr = &Lock::new(0); let config = ThreadPoolBuilder::new() + .acquire_thread_handler(jobserver::acquire_thread) + .release_thread_handler(jobserver::release_thread) .num_threads(Session::threads_from_count(opts.debugging_opts.threads)) .deadlock_handler(|| unsafe { ty::query::handle_deadlock() }) .stack_size(::STACK_SIZE);