Auto merge of #131841 - paulmenage:futex-abstraction, r=joboet

Abstract the state type for futexes

In the same way that we expose `SmallAtomic` and `SmallPrimitive` to allow Windows to use a value other than an `AtomicU32` for its futex state, switch the primary futex state type from `AtomicU32` to `futex::Futex`.  The `futex::Futex` type should be usable as an atomic value with underlying primitive type equal to `futex::Primitive`. (`SmallAtomic` is also renamed to `SmallFutex`).

This allows supporting the futex API on systems where the underlying kernel futex implementation requires more user state than simply an `AtomicU32`.

All in-tree futex implementations simply define {`Futex`,`Primitive`} directly as {`AtomicU32`,`u32`}.
This commit is contained in:
bors 2024-10-18 13:43:57 +00:00
commit b0c2d2e5b0
10 changed files with 83 additions and 66 deletions

View File

@ -3,9 +3,14 @@
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;
/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;
/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {

View File

@ -11,9 +11,14 @@
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;
/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;
/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;
/// Waits for a `futex_wake` operation to wake us.

View File

@ -6,9 +6,14 @@
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;
/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;
/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;
/// Wait for a futex_wake operation to wake us.

View File

@ -9,22 +9,27 @@
use super::api::{self, WinError};
use crate::sys::{c, dur2timeout};
/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;
/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU8;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU8;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u8;
pub unsafe trait Futex {}
pub unsafe trait Futexable {}
pub unsafe trait Waitable {
type Atomic;
type Futex;
}
macro_rules! unsafe_waitable_int {
($(($int:ty, $atomic:ty)),*$(,)?) => {
$(
unsafe impl Waitable for $int {
type Atomic = $atomic;
type Futex = $atomic;
}
unsafe impl Futex for $atomic {}
unsafe impl Futexable for $atomic {}
)*
};
}
@ -42,15 +47,15 @@ unsafe impl Futex for $atomic {}
(usize, AtomicUsize),
}
unsafe impl<T> Waitable for *const T {
type Atomic = AtomicPtr<T>;
type Futex = AtomicPtr<T>;
}
unsafe impl<T> Waitable for *mut T {
type Atomic = AtomicPtr<T>;
type Futex = AtomicPtr<T>;
}
unsafe impl<T> Futex for AtomicPtr<T> {}
unsafe impl<T> Futexable for AtomicPtr<T> {}
pub fn wait_on_address<W: Waitable>(
address: &W::Atomic,
address: &W::Futex,
compare: W,
timeout: Option<Duration>,
) -> bool {
@ -63,30 +68,30 @@ pub fn wait_on_address<W: Waitable>(
}
}
pub fn wake_by_address_single<T: Futex>(address: &T) {
pub fn wake_by_address_single<T: Futexable>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressSingle(addr);
}
}
pub fn wake_by_address_all<T: Futex>(address: &T) {
pub fn wake_by_address_all<T: Futexable>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressAll(addr);
}
}
pub fn futex_wait<W: Waitable>(futex: &W::Atomic, expected: W, timeout: Option<Duration>) -> bool {
pub fn futex_wait<W: Waitable>(futex: &W::Futex, expected: W, timeout: Option<Duration>) -> bool {
// return false only on timeout
wait_on_address(futex, expected, timeout) || api::get_last_error() != WinError::TIMEOUT
}
pub fn futex_wake<T: Futex>(futex: &T) -> bool {
pub fn futex_wake<T: Futexable>(futex: &T) -> bool {
wake_by_address_single(futex);
false
}
pub fn futex_wake_all<T: Futex>(futex: &T) {
pub fn futex_wake_all<T: Futexable>(futex: &T) {
wake_by_address_all(futex)
}

View File

@ -1,6 +1,5 @@
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::Relaxed;
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
use crate::sys::futex::{Futex, futex_wait, futex_wake, futex_wake_all};
use crate::sys::sync::Mutex;
use crate::time::Duration;
@ -8,13 +7,13 @@ pub struct Condvar {
// The value of this atomic is simply incremented on every notification.
// This is used by `.wait()` to not miss any notifications after
// unlocking the mutex and before waiting for notifications.
futex: AtomicU32,
futex: Futex,
}
impl Condvar {
#[inline]
pub const fn new() -> Self {
Self { futex: AtomicU32::new(0) }
Self { futex: Futex::new(0) }
}
// All the memory orderings here are `Relaxed`,

View File

@ -1,11 +1,11 @@
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::futex::{self, futex_wait, futex_wake};
type Atomic = futex::SmallAtomic;
type Futex = futex::SmallFutex;
type State = futex::SmallPrimitive;
pub struct Mutex {
futex: Atomic,
futex: Futex,
}
const UNLOCKED: State = 0;
@ -15,7 +15,7 @@ pub struct Mutex {
impl Mutex {
#[inline]
pub const fn new() -> Self {
Self { futex: Atomic::new(UNLOCKED) }
Self { futex: Futex::new(UNLOCKED) }
}
#[inline]

View File

@ -1,39 +1,38 @@
use crate::cell::Cell;
use crate::sync as public;
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};
use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all};
// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 4 states:
/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: u32 = 0;
const INCOMPLETE: Primitive = 0;
/// Some thread has previously attempted to initialize the Once, but it panicked,
/// so the Once is now poisoned. There are no other threads currently accessing
/// this Once.
const POISONED: u32 = 1;
const POISONED: Primitive = 1;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: u32 = 2;
const RUNNING: Primitive = 2;
/// Initialization has completed and all future calls should finish immediately.
const COMPLETE: u32 = 3;
const COMPLETE: Primitive = 3;
// An additional bit indicates whether there are waiting threads:
/// May only be set if the state is not COMPLETE.
const QUEUED: u32 = 4;
const QUEUED: Primitive = 4;
// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.
const STATE_MASK: u32 = 0b11;
const STATE_MASK: Primitive = 0b11;
pub struct OnceState {
poisoned: bool,
set_state_to: Cell<u32>,
set_state_to: Cell<Primitive>,
}
impl OnceState {
@ -49,8 +48,8 @@ pub fn poison(&self) {
}
struct CompletionGuard<'a> {
state_and_queued: &'a AtomicU32,
set_state_on_drop_to: u32,
state_and_queued: &'a Futex,
set_state_on_drop_to: Primitive,
}
impl<'a> Drop for CompletionGuard<'a> {
@ -65,13 +64,13 @@ fn drop(&mut self) {
}
pub struct Once {
state_and_queued: AtomicU32,
state_and_queued: Futex,
}
impl Once {
#[inline]
pub const fn new() -> Once {
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
Once { state_and_queued: Futex::new(INCOMPLETE) }
}
#[inline]

View File

@ -23,7 +23,7 @@
// You'll find a few more details in the implementation, but that's the gist of
// it!
//
// Atomic orderings:
// Futex orderings:
// When running `Once` we deal with multiple atomics:
// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the

View File

@ -1,6 +1,5 @@
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake, futex_wake_all};
pub struct RwLock {
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
@ -10,41 +9,41 @@ pub struct RwLock {
// 0x3FFF_FFFF: Write locked
// Bit 30: Readers are waiting on this futex.
// Bit 31: Writers are waiting on the writer_notify futex.
state: AtomicU32,
state: Futex,
// The 'condition variable' to notify writers through.
// Incremented on every signal.
writer_notify: AtomicU32,
writer_notify: Futex,
}
const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;
const READ_LOCKED: Primitive = 1;
const MASK: Primitive = (1 << 30) - 1;
const WRITE_LOCKED: Primitive = MASK;
const MAX_READERS: Primitive = MASK - 1;
const READERS_WAITING: Primitive = 1 << 30;
const WRITERS_WAITING: Primitive = 1 << 31;
#[inline]
fn is_unlocked(state: u32) -> bool {
fn is_unlocked(state: Primitive) -> bool {
state & MASK == 0
}
#[inline]
fn is_write_locked(state: u32) -> bool {
fn is_write_locked(state: Primitive) -> bool {
state & MASK == WRITE_LOCKED
}
#[inline]
fn has_readers_waiting(state: u32) -> bool {
fn has_readers_waiting(state: Primitive) -> bool {
state & READERS_WAITING != 0
}
#[inline]
fn has_writers_waiting(state: u32) -> bool {
fn has_writers_waiting(state: Primitive) -> bool {
state & WRITERS_WAITING != 0
}
#[inline]
fn is_read_lockable(state: u32) -> bool {
fn is_read_lockable(state: Primitive) -> bool {
// This also returns false if the counter could overflow if we tried to read lock it.
//
// We don't allow read-locking if there's readers waiting, even if the lock is unlocked
@ -55,14 +54,14 @@ fn is_read_lockable(state: u32) -> bool {
}
#[inline]
fn has_reached_max_readers(state: u32) -> bool {
fn has_reached_max_readers(state: Primitive) -> bool {
state & MASK == MAX_READERS
}
impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
Self { state: Futex::new(0), writer_notify: Futex::new(0) }
}
#[inline]
@ -225,7 +224,7 @@ fn write_contended(&self) {
/// If both are waiting, this will wake up only one writer, but will fall
/// back to waking up readers if there was no writer to wake up.
#[cold]
fn wake_writer_or_readers(&self, mut state: u32) {
fn wake_writer_or_readers(&self, mut state: Primitive) {
assert!(is_unlocked(state));
// The readers waiting bit might be turned on at any point now,
@ -290,7 +289,7 @@ fn wake_writer(&self) -> bool {
/// Spin for a while, but stop directly at the given condition.
#[inline]
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
let mut spin = 100; // Chosen by fair dice roll.
loop {
let state = self.state.load(Relaxed);
@ -303,13 +302,13 @@ fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
}
#[inline]
fn spin_write(&self) -> u32 {
fn spin_write(&self) -> Primitive {
// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}
#[inline]
fn spin_read(&self) -> u32 {
fn spin_read(&self) -> Primitive {
// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)

View File

@ -4,7 +4,7 @@
use crate::sys::futex::{self, futex_wait, futex_wake};
use crate::time::Duration;
type Atomic = futex::SmallAtomic;
type Futex = futex::SmallFutex;
type State = futex::SmallPrimitive;
const PARKED: State = State::MAX;
@ -12,7 +12,7 @@
const NOTIFIED: State = 1;
pub struct Parker {
state: Atomic,
state: Futex,
}
// Notes about memory ordering:
@ -39,7 +39,7 @@ impl Parker {
/// Constructs the futex parker. The UNIX parker implementation
/// requires this to happen in-place.
pub unsafe fn new_in_place(parker: *mut Parker) {
unsafe { parker.write(Self { state: Atomic::new(EMPTY) }) };
unsafe { parker.write(Self { state: Futex::new(EMPTY) }) };
}
// Assumes this is only called by the thread that owns the Parker,