Auto merge of #2054 - m-ou-se:futex-wait-bitset, r=RalfJung
Add support for FUTEX_{WAIT,WAKE}_BITSET FUTEX_WAIT_BITSET and FUTEX_WAKE_BITSET are extensions of FUTEX_WAIT and FUTEX_WAKE that allow tagging each waiting thread with up to 32 'labels', and then only wake up threads that match certain labels. The non-bitset operations behave like their bitset was fully set (u32::MAX), meaning that they'll wait for anything, and wake up anything. The only other difference is that FUTEX_WAIT_BITSET uses an absolute timeout instead of an relative timeout like FUTEX_WAIT. Often, FUTEX_WAIT_BITSET is used not for its bitset functionality, but only for its absolute timeout functionality. It is then used with a bitset of u32::MAX. ~~This adds support for only that use case to Miri, as that's all `std` currently needs. Any other bitset is still unsupported.~~ Update: This adds full support for both these syscalls.
This commit is contained in:
commit
0e2def5c12
@ -1 +1 @@
|
||||
f262ca12aac76152c4b46cefcf8300f0249a5eb2
|
||||
306ba8357fb36212b7d30efb9eb9e41659ac1445
|
||||
|
@ -36,7 +36,9 @@ pub fn futex<'tcx>(
|
||||
|
||||
let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG")?;
|
||||
let futex_wait = this.eval_libc_i32("FUTEX_WAIT")?;
|
||||
let futex_wait_bitset = this.eval_libc_i32("FUTEX_WAIT_BITSET")?;
|
||||
let futex_wake = this.eval_libc_i32("FUTEX_WAKE")?;
|
||||
let futex_wake_bitset = this.eval_libc_i32("FUTEX_WAKE_BITSET")?;
|
||||
let futex_realtime = this.eval_libc_i32("FUTEX_CLOCK_REALTIME")?;
|
||||
|
||||
// FUTEX_PRIVATE enables an optimization that stops it from working across processes.
|
||||
@ -45,12 +47,37 @@ pub fn futex<'tcx>(
|
||||
// FUTEX_WAIT: (int *addr, int op = FUTEX_WAIT, int val, const timespec *timeout)
|
||||
// Blocks the thread if *addr still equals val. Wakes up when FUTEX_WAKE is called on the same address,
|
||||
// or *timeout expires. `timeout == null` for an infinite timeout.
|
||||
op if op & !futex_realtime == futex_wait => {
|
||||
if args.len() < 5 {
|
||||
throw_ub_format!(
|
||||
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAIT`: got {}, expected at least 5",
|
||||
args.len()
|
||||
);
|
||||
//
|
||||
// FUTEX_WAIT_BITSET: (int *addr, int op = FUTEX_WAIT_BITSET, int val, const timespec *timeout, int *_ignored, unsigned int bitset)
|
||||
// This is identical to FUTEX_WAIT, except:
|
||||
// - The timeout is absolute rather than relative.
|
||||
// - You can specify the bitset to selecting what WAKE operations to respond to.
|
||||
op if op & !futex_realtime == futex_wait || op & !futex_realtime == futex_wait_bitset => {
|
||||
let wait_bitset = op & !futex_realtime == futex_wait_bitset;
|
||||
|
||||
let bitset = if wait_bitset {
|
||||
if args.len() != 7 {
|
||||
throw_ub_format!(
|
||||
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAIT_BITSET`: got {}, expected 7",
|
||||
args.len()
|
||||
);
|
||||
}
|
||||
this.read_scalar(&args[6])?.to_u32()?
|
||||
} else {
|
||||
if args.len() < 5 {
|
||||
throw_ub_format!(
|
||||
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAIT`: got {}, expected at least 5",
|
||||
args.len()
|
||||
);
|
||||
}
|
||||
u32::MAX
|
||||
};
|
||||
|
||||
if bitset == 0 {
|
||||
let einval = this.eval_libc("EINVAL")?;
|
||||
this.set_last_error(einval)?;
|
||||
this.write_scalar(Scalar::from_machine_isize(-1, this), dest)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// `deref_operand` but not actually dereferencing the ptr yet (it might be NULL!).
|
||||
@ -70,10 +97,20 @@ pub fn futex<'tcx>(
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
Some(if op & futex_realtime != 0 {
|
||||
Time::RealTime(SystemTime::now().checked_add(duration).unwrap())
|
||||
Some(if wait_bitset {
|
||||
// FUTEX_WAIT_BITSET uses an absolute timestamp.
|
||||
if op & futex_realtime != 0 {
|
||||
Time::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
|
||||
} else {
|
||||
Time::Monotonic(this.machine.time_anchor.checked_add(duration).unwrap())
|
||||
}
|
||||
} else {
|
||||
Time::Monotonic(Instant::now().checked_add(duration).unwrap())
|
||||
// FUTEX_WAIT uses a relative timestamp.
|
||||
if op & futex_realtime != 0 {
|
||||
Time::RealTime(SystemTime::now().checked_add(duration).unwrap())
|
||||
} else {
|
||||
Time::Monotonic(Instant::now().checked_add(duration).unwrap())
|
||||
}
|
||||
})
|
||||
};
|
||||
// Check the pointer for alignment and validity.
|
||||
@ -108,7 +145,7 @@ pub fn futex<'tcx>(
|
||||
if val == futex_val {
|
||||
// The value still matches, so we block the trait make it wait for FUTEX_WAKE.
|
||||
this.block_thread(thread);
|
||||
this.futex_wait(addr_scalar.to_machine_usize(this)?, thread);
|
||||
this.futex_wait(addr_scalar.to_machine_usize(this)?, thread, bitset);
|
||||
// Succesfully waking up from FUTEX_WAIT always returns zero.
|
||||
this.write_scalar(Scalar::from_machine_isize(0, this), dest)?;
|
||||
// Register a timeout callback if a timeout was specified.
|
||||
@ -140,10 +177,29 @@ pub fn futex<'tcx>(
|
||||
// Wakes at most `val` threads waiting on the futex at `addr`.
|
||||
// Returns the amount of threads woken up.
|
||||
// Does not access the futex value at *addr.
|
||||
op if op == futex_wake => {
|
||||
// FUTEX_WAKE_BITSET: (int *addr, int op = FUTEX_WAKE, int val, const timespect *_unused, int *_unused, unsigned int bitset)
|
||||
// Same as FUTEX_WAKE, but allows you to specify a bitset to select which threads to wake up.
|
||||
op if op == futex_wake || op == futex_wake_bitset => {
|
||||
let bitset = if op == futex_wake_bitset {
|
||||
if args.len() != 7 {
|
||||
throw_ub_format!(
|
||||
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAKE_BITSET`: got {}, expected 7",
|
||||
args.len()
|
||||
);
|
||||
}
|
||||
this.read_scalar(&args[6])?.to_u32()?
|
||||
} else {
|
||||
u32::MAX
|
||||
};
|
||||
if bitset == 0 {
|
||||
let einval = this.eval_libc("EINVAL")?;
|
||||
this.set_last_error(einval)?;
|
||||
this.write_scalar(Scalar::from_machine_isize(-1, this), dest)?;
|
||||
return Ok(());
|
||||
}
|
||||
let mut n = 0;
|
||||
for _ in 0..val {
|
||||
if let Some(thread) = this.futex_wake(addr_scalar.to_machine_usize(this)?) {
|
||||
if let Some(thread) = this.futex_wake(addr_scalar.to_machine_usize(this)?, bitset) {
|
||||
this.unblock_thread(thread);
|
||||
this.unregister_timeout_callback_if_exists(thread);
|
||||
n += 1;
|
||||
|
16
src/sync.rs
16
src/sync.rs
@ -144,6 +144,8 @@ struct Futex {
|
||||
struct FutexWaiter {
|
||||
/// The thread that is waiting on this futex.
|
||||
thread: ThreadId,
|
||||
/// The bitset used by FUTEX_*_BITSET, or u32::MAX for other operations.
|
||||
bitset: u32,
|
||||
}
|
||||
|
||||
/// The state of all synchronization variables.
|
||||
@ -486,15 +488,15 @@ fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) {
|
||||
this.machine.threads.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread);
|
||||
}
|
||||
|
||||
fn futex_wait(&mut self, addr: u64, thread: ThreadId) {
|
||||
fn futex_wait(&mut self, addr: u64, thread: ThreadId, bitset: u32) {
|
||||
let this = self.eval_context_mut();
|
||||
let futex = &mut this.machine.threads.sync.futexes.entry(addr).or_default();
|
||||
let waiters = &mut futex.waiters;
|
||||
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
|
||||
waiters.push_back(FutexWaiter { thread });
|
||||
waiters.push_back(FutexWaiter { thread, bitset });
|
||||
}
|
||||
|
||||
fn futex_wake(&mut self, addr: u64) -> Option<ThreadId> {
|
||||
fn futex_wake(&mut self, addr: u64, bitset: u32) -> Option<ThreadId> {
|
||||
let this = self.eval_context_mut();
|
||||
let current_thread = this.get_active_thread();
|
||||
let futex = &mut this.machine.threads.sync.futexes.get_mut(&addr)?;
|
||||
@ -504,13 +506,15 @@ fn futex_wake(&mut self, addr: u64) -> Option<ThreadId> {
|
||||
if let Some(data_race) = data_race {
|
||||
data_race.validate_lock_release(&mut futex.data_race, current_thread);
|
||||
}
|
||||
let res = futex.waiters.pop_front().map(|waiter| {
|
||||
|
||||
// Wake up the first thread in the queue that matches any of the bits in the bitset.
|
||||
futex.waiters.iter().position(|w| w.bitset & bitset != 0).map(|i| {
|
||||
let waiter = futex.waiters.remove(i).unwrap();
|
||||
if let Some(data_race) = data_race {
|
||||
data_race.validate_lock_acquire(&futex.data_race, waiter.thread);
|
||||
}
|
||||
waiter.thread
|
||||
});
|
||||
res
|
||||
})
|
||||
}
|
||||
|
||||
fn futex_remove_waiter(&mut self, addr: u64, thread: ThreadId) {
|
||||
|
@ -7,6 +7,7 @@
|
||||
#![feature(rustc_private)]
|
||||
extern crate libc;
|
||||
|
||||
use std::mem::MaybeUninit;
|
||||
use std::ptr;
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
@ -93,6 +94,42 @@ fn wait_timeout() {
|
||||
assert!((200..1000).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn wait_absolute_timeout() {
|
||||
let start = Instant::now();
|
||||
|
||||
// Get the current monotonic timestamp as timespec.
|
||||
let mut timeout = unsafe {
|
||||
let mut now: MaybeUninit<libc::timespec> = MaybeUninit::uninit();
|
||||
assert_eq!(libc::clock_gettime(libc::CLOCK_MONOTONIC, now.as_mut_ptr()), 0);
|
||||
now.assume_init()
|
||||
};
|
||||
|
||||
// Add 200ms.
|
||||
timeout.tv_nsec += 200_000_000;
|
||||
if timeout.tv_nsec > 1_000_000_000 {
|
||||
timeout.tv_nsec -= 1_000_000_000;
|
||||
timeout.tv_sec += 1;
|
||||
}
|
||||
|
||||
let futex: i32 = 123;
|
||||
|
||||
// Wait for 200ms from now, with nobody waking us up early.
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&futex as *const i32,
|
||||
libc::FUTEX_WAIT_BITSET,
|
||||
123,
|
||||
&timeout,
|
||||
0,
|
||||
u32::MAX,
|
||||
), -1);
|
||||
assert_eq!(*libc::__errno_location(), libc::ETIMEDOUT);
|
||||
}
|
||||
|
||||
assert!((200..1000).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn wait_wake() {
|
||||
let start = Instant::now();
|
||||
|
||||
@ -123,10 +160,59 @@ fn wait_wake() {
|
||||
assert!((200..1000).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn wait_wake_bitset() {
|
||||
let start = Instant::now();
|
||||
|
||||
static FUTEX: i32 = 0;
|
||||
|
||||
thread::spawn(move || {
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&FUTEX as *const i32,
|
||||
libc::FUTEX_WAKE_BITSET,
|
||||
10, // Wake up at most 10 threads.
|
||||
0,
|
||||
0,
|
||||
0b1001, // bitset
|
||||
), 0); // Didn't match any thread.
|
||||
}
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&FUTEX as *const i32,
|
||||
libc::FUTEX_WAKE_BITSET,
|
||||
10, // Wake up at most 10 threads.
|
||||
0,
|
||||
0,
|
||||
0b0110, // bitset
|
||||
), 1); // Woken up one thread.
|
||||
}
|
||||
});
|
||||
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&FUTEX as *const i32,
|
||||
libc::FUTEX_WAIT_BITSET,
|
||||
0,
|
||||
ptr::null::<libc::timespec>(),
|
||||
0,
|
||||
0b0100, // bitset
|
||||
), 0);
|
||||
}
|
||||
|
||||
assert!((400..1000).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn main() {
|
||||
wake_nobody();
|
||||
wake_dangling();
|
||||
wait_wrong_val();
|
||||
wait_timeout();
|
||||
wait_absolute_timeout();
|
||||
wait_wake();
|
||||
wait_wake_bitset();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user