Move the WorkerLocal type from the rustc-rayon fork into rustc_data_structures
This commit is contained in:
parent
c6fb7b9815
commit
64474a40b0
@ -1,14 +1,10 @@
|
|||||||
use crate::fx::{FxHashMap, FxHasher};
|
use crate::fx::{FxHashMap, FxHasher};
|
||||||
use crate::sync::{Lock, LockGuard};
|
use crate::sync::{CacheAligned, Lock, LockGuard};
|
||||||
use std::borrow::Borrow;
|
use std::borrow::Borrow;
|
||||||
use std::collections::hash_map::RawEntryMut;
|
use std::collections::hash_map::RawEntryMut;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::mem;
|
use std::mem;
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
#[cfg_attr(parallel_compiler, repr(align(64)))]
|
|
||||||
struct CacheAligned<T>(T);
|
|
||||||
|
|
||||||
#[cfg(parallel_compiler)]
|
#[cfg(parallel_compiler)]
|
||||||
// 32 shards is sufficient to reduce contention on an 8-core Ryzen 7 1700,
|
// 32 shards is sufficient to reduce contention on an 8-core Ryzen 7 1700,
|
||||||
// but this should be tested on higher core count CPUs. How the `Sharded` type gets used
|
// but this should be tested on higher core count CPUs. How the `Sharded` type gets used
|
||||||
|
@ -45,6 +45,9 @@
|
|||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
|
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
|
||||||
|
|
||||||
|
mod worker_local;
|
||||||
|
pub use worker_local::{Registry, WorkerLocal};
|
||||||
|
|
||||||
pub use std::sync::atomic::Ordering;
|
pub use std::sync::atomic::Ordering;
|
||||||
pub use std::sync::atomic::Ordering::SeqCst;
|
pub use std::sync::atomic::Ordering::SeqCst;
|
||||||
|
|
||||||
@ -205,33 +208,6 @@ pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item)
|
|||||||
|
|
||||||
use std::cell::Cell;
|
use std::cell::Cell;
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct WorkerLocal<T>(OneThread<T>);
|
|
||||||
|
|
||||||
impl<T> WorkerLocal<T> {
|
|
||||||
/// Creates a new worker local where the `initial` closure computes the
|
|
||||||
/// value this worker local should take for each thread in the thread pool.
|
|
||||||
#[inline]
|
|
||||||
pub fn new<F: FnMut(usize) -> T>(mut f: F) -> WorkerLocal<T> {
|
|
||||||
WorkerLocal(OneThread::new(f(0)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the worker-local value for each thread
|
|
||||||
#[inline]
|
|
||||||
pub fn into_inner(self) -> Vec<T> {
|
|
||||||
vec![OneThread::into_inner(self.0)]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Deref for WorkerLocal<T> {
|
|
||||||
type Target = T;
|
|
||||||
|
|
||||||
#[inline(always)]
|
|
||||||
fn deref(&self) -> &T {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type MTLockRef<'a, T> = &'a mut MTLock<T>;
|
pub type MTLockRef<'a, T> = &'a mut MTLock<T>;
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
@ -351,8 +327,6 @@ macro_rules! parallel {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub use rayon_core::WorkerLocal;
|
|
||||||
|
|
||||||
pub use rayon::iter::ParallelIterator;
|
pub use rayon::iter::ParallelIterator;
|
||||||
use rayon::iter::IntoParallelIterator;
|
use rayon::iter::IntoParallelIterator;
|
||||||
|
|
||||||
@ -383,6 +357,10 @@ pub fn assert_send<T: ?Sized + Send>() {}
|
|||||||
pub fn assert_send_val<T: ?Sized + Send>(_t: &T) {}
|
pub fn assert_send_val<T: ?Sized + Send>(_t: &T) {}
|
||||||
pub fn assert_send_sync_val<T: ?Sized + Sync + Send>(_t: &T) {}
|
pub fn assert_send_sync_val<T: ?Sized + Sync + Send>(_t: &T) {}
|
||||||
|
|
||||||
|
/// Transparent wrapper around a `T` whose inner value is publicly accessible as `.0`.
///
/// When the crate is built with `parallel_compiler` the wrapper is aligned to 64 bytes;
/// otherwise no alignment attribute is applied.
// NOTE(review): 64 is presumably the cache-line size, used to keep neighbouring values on
// separate cache lines -- confirm.
#[derive(Default)]
|
||||||
|
#[cfg_attr(parallel_compiler, repr(align(64)))]
|
||||||
|
pub struct CacheAligned<T>(pub T);
|
||||||
|
|
||||||
pub trait HashMapExt<K, V> {
|
pub trait HashMapExt<K, V> {
|
||||||
/// Same as HashMap::insert, but it may panic if there's already an
|
/// Same as HashMap::insert, but it may panic if there's already an
|
||||||
/// entry for `key` with a value not equal to `value`
|
/// entry for `key` with a value not equal to `value`
|
||||||
|
180
compiler/rustc_data_structures/src/sync/worker_local.rs
Normal file
180
compiler/rustc_data_structures/src/sync/worker_local.rs
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
use crate::sync::Lock;
|
||||||
|
use std::cell::Cell;
|
||||||
|
use std::cell::OnceCell;
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::ptr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[cfg(parallel_compiler)]
|
||||||
|
use {crate::cold_path, crate::sync::CacheAligned};
|
||||||
|
|
||||||
|
/// A pointer to the `RegistryData` which uniquely identifies a registry.
|
||||||
|
/// This identifier can be reused if the registry gets freed.
|
||||||
|
#[derive(Clone, Copy, PartialEq)]
|
||||||
|
struct RegistryId(*const RegistryData);
|
||||||
|
|
||||||
|
impl RegistryId {
|
||||||
|
#[inline(always)]
|
||||||
|
/// Verifies that the current thread is associated with the registry and returns its unique
|
||||||
|
/// index within the registry. This panics if the current thread is not associated with this
|
||||||
|
/// registry.
|
||||||
|
///
|
||||||
|
/// Note that there's a race possible where the identifer in `THREAD_DATA` could be reused
|
||||||
|
/// so this can succeed from a different registry.
|
||||||
|
#[cfg(parallel_compiler)]
|
||||||
|
fn verify(self) -> usize {
|
||||||
|
let (id, index) = THREAD_DATA.with(|data| (data.registry_id.get(), data.index.get()));
|
||||||
|
|
||||||
|
if id == self {
|
||||||
|
index
|
||||||
|
} else {
|
||||||
|
cold_path(|| panic!("Unable to verify registry association"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State shared by every clone of a `Registry`.
struct RegistryData {
|
||||||
|
/// Maximum number of threads that may register; `Registry::register` panics once
/// `threads` has reached this value.
thread_limit: usize,
|
||||||
|
/// Number of threads registered so far. `Registry::register` uses the current value as
/// the new thread's index and then increments it.
threads: Lock<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents a list of threads which can access worker locals.
/// The data is held in an `Arc`, so cloning yields another handle to the same `RegistryData`.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Registry(Arc<RegistryData>);
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
/// The registry associated with the thread.
|
||||||
|
/// This allows the `WorkerLocal` type to clone the registry in its constructor.
/// The cell starts out empty and is filled in by `Registry::register`.
|
||||||
|
static REGISTRY: OnceCell<Registry> = OnceCell::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Per-thread bookkeeping stored in `THREAD_DATA` and read by `RegistryId::verify`.
struct ThreadData {
|
||||||
|
/// Identifier of the registry this thread registered with (written by `Registry::register`).
registry_id: Cell<RegistryId>,
|
||||||
|
/// Index assigned to this thread by `Registry::register`.
index: Cell<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
/// A thread local which contains the identifier of `REGISTRY` but allows for faster access.
|
||||||
|
/// It also holds the index of the current thread.
/// Until `Registry::register` runs on the thread, the identifier is a null pointer and the
/// index is 0.
|
||||||
|
static THREAD_DATA: ThreadData = const { ThreadData {
|
||||||
|
registry_id: Cell::new(RegistryId(ptr::null())),
|
||||||
|
index: Cell::new(0),
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Registry {
|
||||||
|
/// Creates a registry which can hold up to `thread_limit` threads.
|
||||||
|
pub fn new(thread_limit: usize) -> Self {
|
||||||
|
Registry(Arc::new(RegistryData { thread_limit, threads: Lock::new(0) }))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the registry associated with the current thread. Panics if there's no such registry.
|
||||||
|
pub fn current() -> Self {
|
||||||
|
REGISTRY.with(|registry| registry.get().cloned().expect("No assocated registry"))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers the current thread with the registry so worker locals can be used on it.
|
||||||
|
/// Panics if the thread limit is hit or if the thread already has an associated registry.
|
||||||
|
pub fn register(&self) {
|
||||||
|
let mut threads = self.0.threads.lock();
|
||||||
|
if *threads < self.0.thread_limit {
|
||||||
|
REGISTRY.with(|registry| {
|
||||||
|
if registry.get().is_some() {
|
||||||
|
drop(threads);
|
||||||
|
panic!("Thread already has a registry");
|
||||||
|
}
|
||||||
|
registry.set(self.clone()).ok();
|
||||||
|
THREAD_DATA.with(|data| {
|
||||||
|
data.registry_id.set(self.id());
|
||||||
|
data.index.set(*threads);
|
||||||
|
});
|
||||||
|
*threads += 1;
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
drop(threads);
|
||||||
|
panic!("Thread limit reached");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the identifer of this registry.
|
||||||
|
fn id(&self) -> RegistryId {
|
||||||
|
RegistryId(&*self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Holds worker local values for each possible thread in a registry. You can only access the
|
||||||
|
/// worker local value through the `Deref` impl on the registry associated with the thread it was
|
||||||
|
/// created on. It will panic otherwise.
|
||||||
|
pub struct WorkerLocal<T> {
|
||||||
|
// Without `parallel_compiler` there is exactly one value and no registry bookkeeping.
#[cfg(not(parallel_compiler))]
|
||||||
|
local: T,
|
||||||
|
// One slot per possible thread; `new` creates `thread_limit` of them and `deref` indexes
// this with the calling thread's index.
#[cfg(parallel_compiler)]
|
||||||
|
locals: Box<[CacheAligned<T>]>,
|
||||||
|
// The registry that was current when this value was created; `deref` verifies the calling
// thread against it.
#[cfg(parallel_compiler)]
|
||||||
|
registry: Registry,
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is safe because the `deref` call will return a reference to a `T` unique to each thread
|
||||||
|
// or it will panic for threads without an associated local. So there isn't a need for `T` to do
|
||||||
|
// its own synchronization. The `verify` method on `RegistryId` has an issue where the id
|
||||||
|
// can be reused, but `WorkerLocal` has a reference to `Registry` which will prevent any reuse.
|
||||||
|
#[cfg(parallel_compiler)]
|
||||||
|
unsafe impl<T: Send> Sync for WorkerLocal<T> {}
|
||||||
|
|
||||||
|
impl<T> WorkerLocal<T> {
|
||||||
|
/// Creates a new worker local where the `initial` closure computes the
|
||||||
|
/// value this worker local should take for each thread in the registry.
|
||||||
|
#[inline]
|
||||||
|
pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> {
|
||||||
|
#[cfg(parallel_compiler)]
|
||||||
|
{
|
||||||
|
let registry = Registry::current();
|
||||||
|
WorkerLocal {
|
||||||
|
locals: (0..registry.0.thread_limit).map(|i| CacheAligned(initial(i))).collect(),
|
||||||
|
registry,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(not(parallel_compiler))]
|
||||||
|
{
|
||||||
|
WorkerLocal { local: initial(0) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the worker-local values for each thread
|
||||||
|
#[inline]
|
||||||
|
pub fn into_inner(self) -> impl Iterator<Item = T> {
|
||||||
|
#[cfg(parallel_compiler)]
|
||||||
|
{
|
||||||
|
self.locals.into_vec().into_iter().map(|local| local.0)
|
||||||
|
}
|
||||||
|
#[cfg(not(parallel_compiler))]
|
||||||
|
{
|
||||||
|
std::iter::once(self.local)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> WorkerLocal<Vec<T>> {
|
||||||
|
/// Joins the elements of all the worker locals into one Vec
|
||||||
|
pub fn join(self) -> Vec<T> {
|
||||||
|
self.into_inner().into_iter().flat_map(|v| v).collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for WorkerLocal<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
#[cfg(not(parallel_compiler))]
|
||||||
|
fn deref(&self) -> &T {
|
||||||
|
&self.local
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
#[cfg(parallel_compiler)]
|
||||||
|
fn deref(&self) -> &T {
|
||||||
|
// This is safe because `verify` will only return values less than
|
||||||
|
// `self.registry.thread_limit` which is the size of the `self.locals` array.
|
||||||
|
unsafe { &self.locals.get_unchecked(self.registry.id().verify()).0 }
|
||||||
|
}
|
||||||
|
}
|
@ -19,6 +19,7 @@
|
|||||||
use rustc_session::{early_error, CompilerIO};
|
use rustc_session::{early_error, CompilerIO};
|
||||||
use rustc_span::source_map::{FileLoader, FileName};
|
use rustc_span::source_map::{FileLoader, FileName};
|
||||||
use rustc_span::symbol::sym;
|
use rustc_span::symbol::sym;
|
||||||
|
use std::cell::OnceCell;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::result;
|
use std::result;
|
||||||
|
|
||||||
@ -58,9 +59,25 @@ pub fn build_output_filenames(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn registry_setup() {
|
||||||
|
thread_local! {
|
||||||
|
static ONCE: OnceCell<()> = OnceCell::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a dummy registry to allow `WorkerLocal` construction.
|
||||||
|
// We use `OnceCell` so we only register one dummy registry per thread.
|
||||||
|
ONCE.with(|once| {
|
||||||
|
once.get_or_init(|| {
|
||||||
|
rustc_data_structures::sync::Registry::new(1).register();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Converts strings provided as `--cfg [cfgspec]` into a `crate_cfg`.
|
/// Converts strings provided as `--cfg [cfgspec]` into a `crate_cfg`.
|
||||||
pub fn parse_cfgspecs(cfgspecs: Vec<String>) -> FxHashSet<(String, Option<String>)> {
|
pub fn parse_cfgspecs(cfgspecs: Vec<String>) -> FxHashSet<(String, Option<String>)> {
|
||||||
rustc_span::create_default_session_if_not_set_then(move |_| {
|
rustc_span::create_default_session_if_not_set_then(move |_| {
|
||||||
|
registry_setup();
|
||||||
|
|
||||||
let cfg = cfgspecs
|
let cfg = cfgspecs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
@ -120,6 +137,8 @@ macro_rules! error {
|
|||||||
/// Converts strings provided as `--check-cfg [specs]` into a `CheckCfg`.
|
/// Converts strings provided as `--check-cfg [specs]` into a `CheckCfg`.
|
||||||
pub fn parse_check_cfg(specs: Vec<String>) -> CheckCfg {
|
pub fn parse_check_cfg(specs: Vec<String>) -> CheckCfg {
|
||||||
rustc_span::create_default_session_if_not_set_then(move |_| {
|
rustc_span::create_default_session_if_not_set_then(move |_| {
|
||||||
|
registry_setup();
|
||||||
|
|
||||||
let mut cfg = CheckCfg::default();
|
let mut cfg = CheckCfg::default();
|
||||||
|
|
||||||
'specs: for s in specs {
|
'specs: for s in specs {
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
use rustc_ast as ast;
|
use rustc_ast as ast;
|
||||||
use rustc_codegen_ssa::traits::CodegenBackend;
|
use rustc_codegen_ssa::traits::CodegenBackend;
|
||||||
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
|
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
|
||||||
|
#[cfg(parallel_compiler)]
|
||||||
|
use rustc_data_structures::sync;
|
||||||
use rustc_errors::registry::Registry;
|
use rustc_errors::registry::Registry;
|
||||||
use rustc_parse::validate_attr;
|
use rustc_parse::validate_attr;
|
||||||
use rustc_session as session;
|
use rustc_session as session;
|
||||||
@ -170,6 +172,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce() -> R + Send, R: Send>(
|
|||||||
use rustc_middle::ty::tls;
|
use rustc_middle::ty::tls;
|
||||||
use rustc_query_impl::{deadlock, QueryContext, QueryCtxt};
|
use rustc_query_impl::{deadlock, QueryContext, QueryCtxt};
|
||||||
|
|
||||||
|
let registry = sync::Registry::new(threads);
|
||||||
let mut builder = rayon::ThreadPoolBuilder::new()
|
let mut builder = rayon::ThreadPoolBuilder::new()
|
||||||
.thread_name(|_| "rustc".to_string())
|
.thread_name(|_| "rustc".to_string())
|
||||||
.acquire_thread_handler(jobserver::acquire_thread)
|
.acquire_thread_handler(jobserver::acquire_thread)
|
||||||
@ -200,6 +203,9 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce() -> R + Send, R: Send>(
|
|||||||
.build_scoped(
|
.build_scoped(
|
||||||
// Initialize each new worker thread when created.
|
// Initialize each new worker thread when created.
|
||||||
move |thread: rayon::ThreadBuilder| {
|
move |thread: rayon::ThreadBuilder| {
|
||||||
|
// Register the thread for use with the `WorkerLocal` type.
|
||||||
|
registry.register();
|
||||||
|
|
||||||
rustc_span::set_session_globals_then(session_globals, || thread.run())
|
rustc_span::set_session_globals_then(session_globals, || thread.run())
|
||||||
},
|
},
|
||||||
// Run `f` on the first thread in the thread pool.
|
// Run `f` on the first thread in the thread pool.
|
||||||
|
Loading…
Reference in New Issue
Block a user