store futexes in per-allocation data rather than globally

This commit is contained in:
Ralf Jung 2024-10-12 16:08:06 +02:00
parent a839fbf0a1
commit d1a4812164
5 changed files with 122 additions and 55 deletions

View File

@ -1,6 +1,8 @@
use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::ops::Not; use std::ops::Not;
use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
use rustc_abi::Size; use rustc_abi::Size;
@ -121,6 +123,15 @@ struct Futex {
clock: VClock, clock: VClock,
} }
#[derive(Default, Clone)]
pub struct FutexRef(Rc<RefCell<Futex>>);
impl VisitProvenance for FutexRef {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// No provenance in `Futex`.
}
}
/// A thread waiting on a futex. /// A thread waiting on a futex.
#[derive(Debug)] #[derive(Debug)]
struct FutexWaiter { struct FutexWaiter {
@ -137,9 +148,6 @@ pub struct SynchronizationObjects {
rwlocks: IndexVec<RwLockId, RwLock>, rwlocks: IndexVec<RwLockId, RwLock>,
condvars: IndexVec<CondvarId, Condvar>, condvars: IndexVec<CondvarId, Condvar>,
pub(super) init_onces: IndexVec<InitOnceId, InitOnce>, pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,
/// Futex info for the futex at the given address.
futexes: FxHashMap<u64, Futex>,
} }
// Private extension trait for local helper methods // Private extension trait for local helper methods
@ -184,7 +192,7 @@ pub fn init_once_create(&mut self) -> InitOnceId {
} }
impl<'tcx> AllocExtra<'tcx> { impl<'tcx> AllocExtra<'tcx> {
pub fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> { fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
self.sync.get(&offset).and_then(|s| s.downcast_ref::<T>()) self.sync.get(&offset).and_then(|s| s.downcast_ref::<T>())
} }
} }
@ -273,27 +281,32 @@ fn lazy_sync_get_data<T: 'static + Copy>(
/// Get the synchronization primitive associated with the given pointer, /// Get the synchronization primitive associated with the given pointer,
/// or initialize a new one. /// or initialize a new one.
///
/// Return `None` if this pointer does not point to at least 1 byte of mutable memory.
fn get_sync_or_init<'a, T: 'static>( fn get_sync_or_init<'a, T: 'static>(
&'a mut self, &'a mut self,
ptr: Pointer, ptr: Pointer,
new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> InterpResult<'tcx, T>, new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> T,
) -> InterpResult<'tcx, &'a T> ) -> Option<&'a T>
where where
'tcx: 'a, 'tcx: 'a,
{ {
let this = self.eval_context_mut(); let this = self.eval_context_mut();
// Ensure there is memory behind this pointer, so that this allocation if !this.ptr_try_get_alloc_id(ptr, 0).ok().is_some_and(|(alloc_id, offset, ..)| {
// is truly the only place where the data could be stored. let info = this.get_alloc_info(alloc_id);
this.check_ptr_access(ptr, Size::from_bytes(1), CheckInAllocMsg::InboundsTest)?; info.kind == AllocKind::LiveData && info.mutbl.is_mut() && offset < info.size
}) {
let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0)?; return None;
let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc)?; }
// This cannot fail now.
let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0).unwrap();
let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc).unwrap();
// Due to borrow checker reasons, we have to do the lookup twice. // Due to borrow checker reasons, we have to do the lookup twice.
if alloc_extra.get_sync::<T>(offset).is_none() { if alloc_extra.get_sync::<T>(offset).is_none() {
let new = new(machine)?; let new = new(machine);
alloc_extra.sync.insert(offset, Box::new(new)); alloc_extra.sync.insert(offset, Box::new(new));
} }
interp_ok(alloc_extra.get_sync::<T>(offset).unwrap()) Some(alloc_extra.get_sync::<T>(offset).unwrap())
} }
#[inline] #[inline]
@ -690,7 +703,7 @@ fn condvar_signal(&mut self, id: CondvarId) -> InterpResult<'tcx, bool> {
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error. /// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
fn futex_wait( fn futex_wait(
&mut self, &mut self,
addr: u64, futex_ref: FutexRef,
bitset: u32, bitset: u32,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>, timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
retval_succ: Scalar, retval_succ: Scalar,
@ -700,23 +713,25 @@ fn futex_wait(
) { ) {
let this = self.eval_context_mut(); let this = self.eval_context_mut();
let thread = this.active_thread(); let thread = this.active_thread();
let futex = &mut this.machine.sync.futexes.entry(addr).or_default(); let mut futex = futex_ref.0.borrow_mut();
let waiters = &mut futex.waiters; let waiters = &mut futex.waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(FutexWaiter { thread, bitset }); waiters.push_back(FutexWaiter { thread, bitset });
drop(futex);
this.block_thread( this.block_thread(
BlockReason::Futex { addr }, BlockReason::Futex,
timeout, timeout,
callback!( callback!(
@capture<'tcx> { @capture<'tcx> {
addr: u64, futex_ref: FutexRef,
retval_succ: Scalar, retval_succ: Scalar,
retval_timeout: Scalar, retval_timeout: Scalar,
dest: MPlaceTy<'tcx>, dest: MPlaceTy<'tcx>,
errno_timeout: IoError, errno_timeout: IoError,
} }
@unblock = |this| { @unblock = |this| {
let futex = this.machine.sync.futexes.get(&addr).unwrap(); let futex = futex_ref.0.borrow();
// Acquire the clock of the futex. // Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race { if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads); data_race.acquire_clock(&futex.clock, &this.machine.threads);
@ -728,7 +743,7 @@ fn futex_wait(
@timeout = |this| { @timeout = |this| {
// Remove the waiter from the futex. // Remove the waiter from the futex.
let thread = this.active_thread(); let thread = this.active_thread();
let futex = this.machine.sync.futexes.get_mut(&addr).unwrap(); let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread); futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value. // Set errno and write return value.
this.set_last_error(errno_timeout)?; this.set_last_error(errno_timeout)?;
@ -739,12 +754,11 @@ fn futex_wait(
); );
} }
/// Wake up the first thread in the queue that matches any of the bits in the bitset.
/// Returns whether anything was woken. /// Returns whether anything was woken.
fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> { fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
let this = self.eval_context_mut(); let this = self.eval_context_mut();
let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else { let mut futex = futex_ref.0.borrow_mut();
return interp_ok(false);
};
let data_race = &this.machine.data_race; let data_race = &this.machine.data_race;
// Each futex-wake happens-before the end of the futex wait // Each futex-wake happens-before the end of the futex wait
@ -757,7 +771,8 @@ fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> {
return interp_ok(false); return interp_ok(false);
}; };
let waiter = futex.waiters.remove(i).unwrap(); let waiter = futex.waiters.remove(i).unwrap();
this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?; drop(futex);
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
interp_ok(true) interp_ok(true)
} }
} }

View File

@ -147,7 +147,7 @@ pub enum BlockReason {
/// Blocked on a reader-writer lock. /// Blocked on a reader-writer lock.
RwLock(RwLockId), RwLock(RwLockId),
/// Blocked on a Futex variable. /// Blocked on a Futex variable.
Futex { addr: u64 }, Futex,
/// Blocked on an InitOnce. /// Blocked on an InitOnce.
InitOnce(InitOnceId), InitOnce(InitOnceId),
/// Blocked on epoll. /// Blocked on epoll.

View File

@ -1,6 +1,11 @@
use crate::concurrency::sync::FutexRef;
use crate::helpers::check_min_arg_count; use crate::helpers::check_min_arg_count;
use crate::*; use crate::*;
struct LinuxFutex {
futex: FutexRef,
}
/// Implementation of the SYS_futex syscall. /// Implementation of the SYS_futex syscall.
/// `args` is the arguments *including* the syscall number. /// `args` is the arguments *including* the syscall number.
pub fn futex<'tcx>( pub fn futex<'tcx>(
@ -27,7 +32,6 @@ pub fn futex<'tcx>(
// This is a vararg function so we have to bring our own type for this pointer. // This is a vararg function so we have to bring our own type for this pointer.
let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32); let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32);
let addr_usize = addr.ptr().addr().bytes();
let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG"); let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG");
let futex_wait = this.eval_libc_i32("FUTEX_WAIT"); let futex_wait = this.eval_libc_i32("FUTEX_WAIT");
@ -63,8 +67,7 @@ pub fn futex<'tcx>(
}; };
if bitset == 0 { if bitset == 0 {
this.set_last_error_and_return(LibcError("EINVAL"), dest)?; return this.set_last_error_and_return(LibcError("EINVAL"), dest);
return interp_ok(());
} }
let timeout = this.deref_pointer_as(timeout, this.libc_ty_layout("timespec"))?; let timeout = this.deref_pointer_as(timeout, this.libc_ty_layout("timespec"))?;
@ -99,19 +102,18 @@ pub fn futex<'tcx>(
// effects of this and the other thread are correctly observed, // effects of this and the other thread are correctly observed,
// otherwise we will deadlock. // otherwise we will deadlock.
// //
// There are two scenarios to consider: // There are two scenarios to consider, depending on whether WAIT or WAKE goes first:
// 1. If we (FUTEX_WAIT) execute first, we'll push ourselves into // 1. If we (FUTEX_WAIT) execute first, we'll push ourselves into the waiters queue and
// the waiters queue and go to sleep. They (addr write & FUTEX_WAKE) // go to sleep. They (FUTEX_WAKE) will see us in the queue and wake us up. It doesn't
// will see us in the queue and wake us up. // matter how the addr write is ordered.
// 2. If they (addr write & FUTEX_WAKE) execute first, we must observe // 2. If they (FUTEX_WAKE) execute first, that means the addr write is also before us
// addr's new value. If we see an outdated value that happens to equal // (FUTEX_WAIT). It is crucial that we observe addr's new value. If we see an
// the expected val, then we'll put ourselves to sleep with no one to wake us // outdated value that happens to equal the expected val, then we'll put ourselves to
// up, so we end up with a deadlock. This is prevented by having a SeqCst // sleep with no one to wake us up, so we end up with a deadlock. This is prevented
// fence inside FUTEX_WAKE syscall, and another SeqCst fence // by having a SeqCst fence inside FUTEX_WAKE syscall, and another SeqCst fence here
// below, the atomic read on addr after the SeqCst fence is guaranteed // in FUTEX_WAIT. The atomic read on addr after the SeqCst fence is guaranteed not to
// not to see any value older than the addr write immediately before // see any value older than the addr write immediately before calling FUTEX_WAKE.
// calling FUTEX_WAKE. We'll see futex_val != val and return without // We'll see futex_val != val and return without sleeping.
// sleeping.
// //
// Note that the fences do not create any happens-before relationship. // Note that the fences do not create any happens-before relationship.
// The read sees the write immediately before the fence not because // The read sees the write immediately before the fence not because
@ -140,11 +142,22 @@ pub fn futex<'tcx>(
this.atomic_fence(AtomicFenceOrd::SeqCst)?; this.atomic_fence(AtomicFenceOrd::SeqCst)?;
// Read an `i32` through the pointer, regardless of any wrapper types. // Read an `i32` through the pointer, regardless of any wrapper types.
// It's not uncommon for `addr` to be passed as another type than `*mut i32`, such as `*const AtomicI32`. // It's not uncommon for `addr` to be passed as another type than `*mut i32`, such as `*const AtomicI32`.
let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Relaxed)?.to_i32()?; // We do an acquire read -- it only seems reasonable that if we observe a value here, we
// actually establish an ordering with that value.
let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Acquire)?.to_i32()?;
if val == futex_val { if val == futex_val {
// The value still matches, so we block the thread and make it wait for FUTEX_WAKE. // The value still matches, so we block the thread and make it wait for FUTEX_WAKE.
// This cannot fail since we already did an atomic acquire read on that pointer.
// Acquire reads are only allowed on mutable memory.
let futex_ref = this
.get_sync_or_init(addr.ptr(), |_| LinuxFutex { futex: Default::default() })
.unwrap()
.futex
.clone();
this.futex_wait( this.futex_wait(
addr_usize, futex_ref,
bitset, bitset,
timeout, timeout,
Scalar::from_target_isize(0, this), // retval_succ Scalar::from_target_isize(0, this), // retval_succ
@ -165,6 +178,17 @@ pub fn futex<'tcx>(
// FUTEX_WAKE_BITSET: (int *addr, int op = FUTEX_WAKE, int val, const timespect *_unused, int *_unused, unsigned int bitset) // FUTEX_WAKE_BITSET: (int *addr, int op = FUTEX_WAKE, int val, const timespect *_unused, int *_unused, unsigned int bitset)
// Same as FUTEX_WAKE, but allows you to specify a bitset to select which threads to wake up. // Same as FUTEX_WAKE, but allows you to specify a bitset to select which threads to wake up.
op if op == futex_wake || op == futex_wake_bitset => { op if op == futex_wake || op == futex_wake_bitset => {
let Some(futex_ref) =
this.get_sync_or_init(addr.ptr(), |_| LinuxFutex { futex: Default::default() })
else {
// No AllocId, or no live allocation at that AllocId.
// Return an error code. (That seems nicer than silently doing something non-intuitive.)
// This means that if an address gets reused by a new allocation,
// we'll use an independent futex queue for this... that seems acceptable.
return this.set_last_error_and_return(LibcError("EFAULT"), dest);
};
let futex_ref = futex_ref.futex.clone();
let bitset = if op == futex_wake_bitset { let bitset = if op == futex_wake_bitset {
let [_, _, _, _, timeout, uaddr2, bitset] = let [_, _, _, _, timeout, uaddr2, bitset] =
check_min_arg_count("`syscall(SYS_futex, FUTEX_WAKE_BITSET, ...)`", args)?; check_min_arg_count("`syscall(SYS_futex, FUTEX_WAKE_BITSET, ...)`", args)?;
@ -184,7 +208,7 @@ pub fn futex<'tcx>(
let mut n = 0; let mut n = 0;
#[expect(clippy::arithmetic_side_effects)] #[expect(clippy::arithmetic_side_effects)]
for _ in 0..val { for _ in 0..val {
if this.futex_wake(addr_usize, bitset)? { if this.futex_wake(&futex_ref, bitset)? {
n += 1; n += 1;
} else { } else {
break; break;

View File

@ -3,6 +3,7 @@
use rustc_abi::Size; use rustc_abi::Size;
use crate::concurrency::init_once::InitOnceStatus; use crate::concurrency::init_once::InitOnceStatus;
use crate::concurrency::sync::FutexRef;
use crate::*; use crate::*;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -10,6 +11,10 @@ struct WindowsInitOnce {
id: InitOnceId, id: InitOnceId,
} }
struct WindowsFutex {
futex: FutexRef,
}
impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {} impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {}
trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> { trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
// Windows sync primitives are pointer sized. // Windows sync primitives are pointer sized.
@ -168,8 +173,6 @@ fn WaitOnAddress(
let size = this.read_target_usize(size_op)?; let size = this.read_target_usize(size_op)?;
let timeout_ms = this.read_scalar(timeout_op)?.to_u32()?; let timeout_ms = this.read_scalar(timeout_op)?.to_u32()?;
let addr = ptr.addr().bytes();
if size > 8 || !size.is_power_of_two() { if size > 8 || !size.is_power_of_two() {
let invalid_param = this.eval_windows("c", "ERROR_INVALID_PARAMETER"); let invalid_param = this.eval_windows("c", "ERROR_INVALID_PARAMETER");
this.set_last_error(invalid_param)?; this.set_last_error(invalid_param)?;
@ -190,13 +193,21 @@ fn WaitOnAddress(
let layout = this.machine.layouts.uint(size).unwrap(); let layout = this.machine.layouts.uint(size).unwrap();
let futex_val = let futex_val =
this.read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Relaxed)?; this.read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Acquire)?;
let compare_val = this.read_scalar(&this.ptr_to_mplace(compare, layout))?; let compare_val = this.read_scalar(&this.ptr_to_mplace(compare, layout))?;
if futex_val == compare_val { if futex_val == compare_val {
// If the values are the same, we have to block. // If the values are the same, we have to block.
// This cannot fail since we already did an atomic acquire read on that pointer.
let futex_ref = this
.get_sync_or_init(ptr, |_| WindowsFutex { futex: Default::default() })
.unwrap()
.futex
.clone();
this.futex_wait( this.futex_wait(
addr, futex_ref,
u32::MAX, // bitset u32::MAX, // bitset
timeout, timeout,
Scalar::from_i32(1), // retval_succ Scalar::from_i32(1), // retval_succ
@ -219,8 +230,15 @@ fn WakeByAddressSingle(&mut self, ptr_op: &OpTy<'tcx>) -> InterpResult<'tcx> {
// See the Linux futex implementation for why this fence exists. // See the Linux futex implementation for why this fence exists.
this.atomic_fence(AtomicFenceOrd::SeqCst)?; this.atomic_fence(AtomicFenceOrd::SeqCst)?;
let addr = ptr.addr().bytes(); let Some(futex_ref) =
this.futex_wake(addr, u32::MAX)?; this.get_sync_or_init(ptr, |_| WindowsFutex { futex: Default::default() })
else {
// Seems like this cannot return an error, so we just wake nobody.
return interp_ok(());
};
let futex_ref = futex_ref.futex.clone();
this.futex_wake(&futex_ref, u32::MAX)?;
interp_ok(()) interp_ok(())
} }
@ -232,8 +250,15 @@ fn WakeByAddressAll(&mut self, ptr_op: &OpTy<'tcx>) -> InterpResult<'tcx> {
// See the Linux futex implementation for why this fence exists. // See the Linux futex implementation for why this fence exists.
this.atomic_fence(AtomicFenceOrd::SeqCst)?; this.atomic_fence(AtomicFenceOrd::SeqCst)?;
let addr = ptr.addr().bytes(); let Some(futex_ref) =
while this.futex_wake(addr, u32::MAX)? {} this.get_sync_or_init(ptr, |_| WindowsFutex { futex: Default::default() })
else {
// Seems like this cannot return an error, so we just wake nobody.
return interp_ok(());
};
let futex_ref = futex_ref.futex.clone();
while this.futex_wake(&futex_ref, u32::MAX)? {}
interp_ok(()) interp_ok(())
} }

View File

@ -41,9 +41,12 @@ fn wake_dangling() {
let ptr: *const i32 = &*futex; let ptr: *const i32 = &*futex;
drop(futex); drop(futex);
// Wake 1 waiter. Expect zero waiters woken up, as nobody is waiting. // Expect error since this is now "unmapped" memory.
// parking_lot relies on this:
// <https://github.com/Amanieu/parking_lot/blob/ca920b31312839013b4455aba1d53a4aede21b2f/core/src/thread_parker/linux.rs#L138-L145>
unsafe { unsafe {
assert_eq!(libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, 1), 0); assert_eq!(libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, 1), -1);
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EFAULT);
} }
} }