Rollup merge of #96206 - m-ou-se:wasm-futex-locks, r=alexcrichton
Use sys::unix::locks::futex* on wasm+atomics. This removes the wasm-specific lock implementations and instead re-uses the implementations from sys::unix. Tracking issue: https://github.com/rust-lang/rust/issues/93740 cc ``@alexcrichton``
This commit is contained in:
commit
41235ef98a
@ -1,102 +0,0 @@
|
||||
use crate::arch::wasm32;
|
||||
use crate::cmp;
|
||||
use crate::mem;
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering::SeqCst};
|
||||
use crate::sys::locks::Mutex;
|
||||
use crate::time::Duration;
|
||||
|
||||
pub struct Condvar {
|
||||
cnt: AtomicUsize,
|
||||
}
|
||||
|
||||
pub type MovableCondvar = Condvar;
|
||||
|
||||
// Condition variables are implemented with a simple counter internally that is
|
||||
// likely to cause spurious wakeups. Blocking on a condition variable will first
|
||||
// read the value of the internal counter, unlock the given mutex, and then
|
||||
// block if and only if the counter's value is still the same. Notifying a
|
||||
// condition variable will modify the counter (add one for now) and then wake up
|
||||
// a thread waiting on the address of the counter.
|
||||
//
|
||||
// A thread waiting on the condition variable will as a result avoid going to
|
||||
// sleep if it's notified after the lock is unlocked but before it fully goes to
|
||||
// sleep. A sleeping thread is guaranteed to be woken up at some point as it can
|
||||
// only be woken up with a call to `wake`.
|
||||
//
|
||||
// Note that it's possible for 2 or more threads to be woken up by a call to
|
||||
// `notify_one` with this implementation. That can happen where the modification
|
||||
// of `cnt` causes any threads in the middle of `wait` to avoid going to sleep,
|
||||
// and the subsequent `wake` may wake up a thread that's actually blocking. We
|
||||
// consider this a spurious wakeup, though, which all users of condition
|
||||
// variables must already be prepared to handle. As a result, this source of
|
||||
// spurious wakeups is currently though to be ok, although it may be problematic
|
||||
// later on if it causes too many spurious wakeups.
|
||||
|
||||
impl Condvar {
|
||||
pub const fn new() -> Condvar {
|
||||
Condvar { cnt: AtomicUsize::new(0) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn init(&mut self) {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
pub unsafe fn notify_one(&self) {
|
||||
self.cnt.fetch_add(1, SeqCst);
|
||||
// SAFETY: ptr() is always valid
|
||||
unsafe {
|
||||
wasm32::memory_atomic_notify(self.ptr(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn notify_all(&self) {
|
||||
self.cnt.fetch_add(1, SeqCst);
|
||||
// SAFETY: ptr() is always valid
|
||||
unsafe {
|
||||
wasm32::memory_atomic_notify(self.ptr(), u32::MAX); // -1 == "wake everyone"
|
||||
}
|
||||
}
|
||||
|
||||
pub unsafe fn wait(&self, mutex: &Mutex) {
|
||||
// "atomically block and unlock" implemented by loading our current
|
||||
// counter's value, unlocking the mutex, and blocking if the counter
|
||||
// still has the same value.
|
||||
//
|
||||
// Notifications happen by incrementing the counter and then waking a
|
||||
// thread. Incrementing the counter after we unlock the mutex will
|
||||
// prevent us from sleeping and otherwise the call to `wake` will
|
||||
// wake us up once we're asleep.
|
||||
let ticket = self.cnt.load(SeqCst) as i32;
|
||||
mutex.unlock();
|
||||
let val = wasm32::memory_atomic_wait32(self.ptr(), ticket, -1);
|
||||
// 0 == woken, 1 == not equal to `ticket`, 2 == timeout (shouldn't happen)
|
||||
debug_assert!(val == 0 || val == 1);
|
||||
mutex.lock();
|
||||
}
|
||||
|
||||
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
|
||||
let ticket = self.cnt.load(SeqCst) as i32;
|
||||
mutex.unlock();
|
||||
let nanos = dur.as_nanos();
|
||||
let nanos = cmp::min(i64::MAX as u128, nanos);
|
||||
|
||||
// If the return value is 2 then a timeout happened, so we return
|
||||
// `false` as we weren't actually notified.
|
||||
let ret = wasm32::memory_atomic_wait32(self.ptr(), ticket, nanos as i64) != 2;
|
||||
mutex.lock();
|
||||
return ret;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn destroy(&self) {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn ptr(&self) -> *mut i32 {
|
||||
assert_eq!(mem::size_of::<usize>(), mem::size_of::<i32>());
|
||||
self.cnt.as_mut_ptr() as *mut i32
|
||||
}
|
||||
}
|
@ -3,19 +3,33 @@ use crate::convert::TryInto;
|
||||
use crate::sync::atomic::AtomicU32;
|
||||
use crate::time::Duration;
|
||||
|
||||
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) {
|
||||
/// Wait for a futex_wake operation to wake us.
|
||||
///
|
||||
/// Returns directly if the futex doesn't hold the expected value.
|
||||
///
|
||||
/// Returns false on timeout, and true in all other cases.
|
||||
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
|
||||
let timeout = timeout.and_then(|t| t.as_nanos().try_into().ok()).unwrap_or(-1);
|
||||
unsafe {
|
||||
wasm32::memory_atomic_wait32(
|
||||
futex as *const AtomicU32 as *mut i32,
|
||||
expected as i32,
|
||||
timeout,
|
||||
);
|
||||
) < 2
|
||||
}
|
||||
}
|
||||
|
||||
pub fn futex_wake(futex: &AtomicU32) {
|
||||
/// Wake up one thread that's blocked on futex_wait on this futex.
|
||||
///
|
||||
/// Returns true if this actually woke up such a thread,
|
||||
/// or false if no thread was waiting on this futex.
|
||||
pub fn futex_wake(futex: &AtomicU32) -> bool {
|
||||
unsafe { wasm32::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, 1) > 0 }
|
||||
}
|
||||
|
||||
/// Wake up all threads that are waiting on futex_wait on this futex.
|
||||
pub fn futex_wake_all(futex: &AtomicU32) {
|
||||
unsafe {
|
||||
wasm32::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, 1);
|
||||
wasm32::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, i32::MAX as u32);
|
||||
}
|
||||
}
|
||||
|
@ -1,64 +0,0 @@
|
||||
use crate::arch::wasm32;
|
||||
use crate::mem;
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering::SeqCst};
|
||||
|
||||
pub struct Mutex {
|
||||
locked: AtomicUsize,
|
||||
}
|
||||
|
||||
pub type MovableMutex = Mutex;
|
||||
|
||||
// Mutexes have a pretty simple implementation where they contain an `i32`
|
||||
// internally that is 0 when unlocked and 1 when the mutex is locked.
|
||||
// Acquisition has a fast path where it attempts to cmpxchg the 0 to a 1, and
|
||||
// if it fails it then waits for a notification. Releasing a lock is then done
|
||||
// by swapping in 0 and then notifying any waiters, if present.
|
||||
|
||||
impl Mutex {
|
||||
pub const fn new() -> Mutex {
|
||||
Mutex { locked: AtomicUsize::new(0) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn init(&mut self) {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
pub unsafe fn lock(&self) {
|
||||
while !self.try_lock() {
|
||||
// SAFETY: the caller must uphold the safety contract for `memory_atomic_wait32`.
|
||||
let val = unsafe {
|
||||
wasm32::memory_atomic_wait32(
|
||||
self.ptr(),
|
||||
1, // we expect our mutex is locked
|
||||
-1, // wait infinitely
|
||||
)
|
||||
};
|
||||
// we should have either woke up (0) or got a not-equal due to a
|
||||
// race (1). We should never time out (2)
|
||||
debug_assert!(val == 0 || val == 1);
|
||||
}
|
||||
}
|
||||
|
||||
pub unsafe fn unlock(&self) {
|
||||
let prev = self.locked.swap(0, SeqCst);
|
||||
debug_assert_eq!(prev, 1);
|
||||
wasm32::memory_atomic_notify(self.ptr(), 1); // wake up one waiter, if any
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn try_lock(&self) -> bool {
|
||||
self.locked.compare_exchange(0, 1, SeqCst, SeqCst).is_ok()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn destroy(&self) {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn ptr(&self) -> *mut i32 {
|
||||
assert_eq!(mem::size_of::<usize>(), mem::size_of::<i32>());
|
||||
self.locked.as_mut_ptr() as *mut i32
|
||||
}
|
||||
}
|
@ -1,145 +0,0 @@
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::sys::locks::{Condvar, Mutex};
|
||||
|
||||
pub struct RwLock {
|
||||
lock: Mutex,
|
||||
cond: Condvar,
|
||||
state: UnsafeCell<State>,
|
||||
}
|
||||
|
||||
pub type MovableRwLock = RwLock;
|
||||
|
||||
enum State {
|
||||
Unlocked,
|
||||
Reading(usize),
|
||||
Writing,
|
||||
}
|
||||
|
||||
unsafe impl Send for RwLock {}
|
||||
unsafe impl Sync for RwLock {}
|
||||
|
||||
// This rwlock implementation is a relatively simple implementation which has a
|
||||
// condition variable for readers/writers as well as a mutex protecting the
|
||||
// internal state of the lock. A current downside of the implementation is that
|
||||
// unlocking the lock will notify *all* waiters rather than just readers or just
|
||||
// writers. This can cause lots of "thundering stampede" problems. While
|
||||
// hopefully correct this implementation is very likely to want to be changed in
|
||||
// the future.
|
||||
|
||||
impl RwLock {
|
||||
pub const fn new() -> RwLock {
|
||||
RwLock { lock: Mutex::new(), cond: Condvar::new(), state: UnsafeCell::new(State::Unlocked) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn read(&self) {
|
||||
self.lock.lock();
|
||||
while !(*self.state.get()).inc_readers() {
|
||||
self.cond.wait(&self.lock);
|
||||
}
|
||||
self.lock.unlock();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn try_read(&self) -> bool {
|
||||
self.lock.lock();
|
||||
let ok = (*self.state.get()).inc_readers();
|
||||
self.lock.unlock();
|
||||
return ok;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn write(&self) {
|
||||
self.lock.lock();
|
||||
while !(*self.state.get()).inc_writers() {
|
||||
self.cond.wait(&self.lock);
|
||||
}
|
||||
self.lock.unlock();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn try_write(&self) -> bool {
|
||||
self.lock.lock();
|
||||
let ok = (*self.state.get()).inc_writers();
|
||||
self.lock.unlock();
|
||||
return ok;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn read_unlock(&self) {
|
||||
self.lock.lock();
|
||||
let notify = (*self.state.get()).dec_readers();
|
||||
self.lock.unlock();
|
||||
if notify {
|
||||
// FIXME: should only wake up one of these some of the time
|
||||
self.cond.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn write_unlock(&self) {
|
||||
self.lock.lock();
|
||||
(*self.state.get()).dec_writers();
|
||||
self.lock.unlock();
|
||||
// FIXME: should only wake up one of these some of the time
|
||||
self.cond.notify_all();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn destroy(&self) {
|
||||
self.lock.destroy();
|
||||
self.cond.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn inc_readers(&mut self) -> bool {
|
||||
match *self {
|
||||
State::Unlocked => {
|
||||
*self = State::Reading(1);
|
||||
true
|
||||
}
|
||||
State::Reading(ref mut cnt) => {
|
||||
*cnt += 1;
|
||||
true
|
||||
}
|
||||
State::Writing => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn inc_writers(&mut self) -> bool {
|
||||
match *self {
|
||||
State::Unlocked => {
|
||||
*self = State::Writing;
|
||||
true
|
||||
}
|
||||
State::Reading(_) | State::Writing => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn dec_readers(&mut self) -> bool {
|
||||
let zero = match *self {
|
||||
State::Reading(ref mut cnt) => {
|
||||
*cnt -= 1;
|
||||
*cnt == 0
|
||||
}
|
||||
State::Unlocked | State::Writing => invalid(),
|
||||
};
|
||||
if zero {
|
||||
*self = State::Unlocked;
|
||||
}
|
||||
zero
|
||||
}
|
||||
|
||||
fn dec_writers(&mut self) {
|
||||
match *self {
|
||||
State::Writing => {}
|
||||
State::Unlocked | State::Reading(_) => invalid(),
|
||||
}
|
||||
*self = State::Unlocked;
|
||||
}
|
||||
}
|
||||
|
||||
fn invalid() -> ! {
|
||||
panic!("inconsistent rwlock");
|
||||
}
|
@ -53,37 +53,3 @@ pub mod guard {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
// We currently just use our own thread-local to store our
|
||||
// current thread's ID, and then we lazily initialize it to something allocated
|
||||
// from a global counter.
|
||||
pub fn my_id() -> u32 {
|
||||
use crate::sync::atomic::{AtomicU32, Ordering::SeqCst};
|
||||
|
||||
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
#[thread_local]
|
||||
static mut MY_ID: u32 = 0;
|
||||
|
||||
unsafe {
|
||||
// If our thread ID isn't set yet then we need to allocate one. Do so
|
||||
// with with a simple "atomically add to a global counter" strategy.
|
||||
// This strategy doesn't handled what happens when the counter
|
||||
// overflows, however, so just abort everything once the counter
|
||||
// overflows and eventually we could have some sort of recycling scheme
|
||||
// (or maybe this is all totally irrelevant by that point!). In any case
|
||||
// though we're using a CAS loop instead of a `fetch_add` to ensure that
|
||||
// the global counter never overflows.
|
||||
if MY_ID == 0 {
|
||||
let mut cur = NEXT_ID.load(SeqCst);
|
||||
MY_ID = loop {
|
||||
let next = cur.checked_add(1).unwrap_or_else(|| crate::process::abort());
|
||||
match NEXT_ID.compare_exchange(cur, next, SeqCst, SeqCst) {
|
||||
Ok(_) => break next,
|
||||
Err(i) => cur = i,
|
||||
}
|
||||
};
|
||||
}
|
||||
MY_ID
|
||||
}
|
||||
}
|
||||
|
@ -49,16 +49,13 @@ pub mod time;
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(target_feature = "atomics")] {
|
||||
#[path = "atomics/condvar.rs"]
|
||||
mod condvar;
|
||||
#[path = "atomics/mutex.rs"]
|
||||
mod mutex;
|
||||
#[path = "atomics/rwlock.rs"]
|
||||
mod rwlock;
|
||||
#[path = "../unix/locks"]
|
||||
pub mod locks {
|
||||
pub use super::condvar::*;
|
||||
pub use super::mutex::*;
|
||||
pub use super::rwlock::*;
|
||||
#![allow(unsafe_op_in_unsafe_fn)]
|
||||
mod futex;
|
||||
mod futex_rwlock;
|
||||
pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
|
||||
pub use futex_rwlock::{RwLock, MovableRwLock};
|
||||
}
|
||||
#[path = "atomics/futex.rs"]
|
||||
pub mod futex;
|
||||
|
Loading…
x
Reference in New Issue
Block a user