diff --git a/library/std/src/sys/sgx/mod.rs b/library/std/src/sys/sgx/mod.rs index 63e070207cd..9865a945bad 100644 --- a/library/std/src/sys/sgx/mod.rs +++ b/library/std/src/sys/sgx/mod.rs @@ -34,7 +34,7 @@ pub mod process; pub mod stdio; pub mod thread; pub mod thread_local_key; -pub mod thread_parker; +pub mod thread_parking; pub mod time; mod condvar; diff --git a/library/std/src/sys/sgx/thread.rs b/library/std/src/sys/sgx/thread.rs index 579f758c6cc..8c3d8c37a19 100644 --- a/library/std/src/sys/sgx/thread.rs +++ b/library/std/src/sys/sgx/thread.rs @@ -65,9 +65,10 @@ mod task_queue { /// execution. The signal is sent once all TLS destructors have finished at /// which point no new thread locals should be created. pub mod wait_notify { - use super::super::thread_parker::Parker; + use crate::mem::MaybeUninit; use crate::pin::Pin; use crate::sync::Arc; + use crate::sys_common::thread_parking::Parker; pub struct Notifier(Arc); @@ -94,7 +95,18 @@ pub mod wait_notify { } pub fn new() -> (Notifier, Waiter) { - let inner = Arc::new(Parker::new_internal()); + // Safety: + // Some other platforms (looking at you, UNIX!) require that the thread + // parker is constructed in-place. This is just a noisy way of writing: + // ```rust + // let parker = Parker::new(); + // ``` + let parker = unsafe { + let mut place = MaybeUninit::uninit(); + Parker::new(place.as_mut_ptr()); + place.assume_init() + }; + let inner = Arc::new(parker); (Notifier(inner.clone()), Waiter(inner)) } } diff --git a/library/std/src/sys/sgx/thread_parker.rs b/library/std/src/sys/sgx/thread_parker.rs deleted file mode 100644 index 1c55bcffb1e..00000000000 --- a/library/std/src/sys/sgx/thread_parker.rs +++ /dev/null @@ -1,107 +0,0 @@ -//! Thread parking based on SGX events. - -use super::abi::{thread, usercalls}; -use crate::io::ErrorKind; -use crate::pin::Pin; -use crate::ptr::{self, NonNull}; -use crate::sync::atomic::AtomicPtr; -use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use crate::time::Duration; -use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE}; - -// The TCS structure must be page-aligned (this is checked by EENTER), so these cannot -// be valid pointers -const EMPTY: *mut u8 = ptr::invalid_mut(1); -const NOTIFIED: *mut u8 = ptr::invalid_mut(2); - -pub struct Parker { - /// The park state. One of EMPTY, NOTIFIED or a TCS address. - /// A state change to NOTIFIED must be done with release ordering - /// and be observed with acquire ordering so that operations after - /// `thread::park` returns will not occur before the unpark message - /// was sent. - state: AtomicPtr, -} - -impl Parker { - /// Construct the thread parker. The UNIX parker implementation - /// requires this to happen in-place. - pub unsafe fn new(parker: *mut Parker) { - unsafe { parker.write(Parker::new_internal()) } - } - - pub(super) fn new_internal() -> Parker { - Parker { state: AtomicPtr::new(EMPTY) } - } - - // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. - pub unsafe fn park(self: Pin<&Self>) { - if self.state.load(Acquire) != NOTIFIED { - let mut prev = EMPTY; - loop { - // Guard against changing TCS addresses by always setting the state to - // the current value. - let tcs = thread::current().as_ptr(); - if self.state.compare_exchange(prev, tcs, Relaxed, Acquire).is_ok() { - let event = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap(); - assert!(event & EV_UNPARK == EV_UNPARK); - prev = tcs; - } else { - // The state was definitely changed by another thread at this point. - // The only time this occurs is when the state is changed to NOTIFIED. - // We observed this change with acquire ordering, so we can simply - // change the state to EMPTY with a relaxed store. - break; - } - } - } - - // At this point, the token was definately read with acquire ordering, - // so this can be a relaxed store. - self.state.store(EMPTY, Relaxed); - } - - // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. - pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { - let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64; - let tcs = thread::current().as_ptr(); - - if self.state.load(Acquire) != NOTIFIED { - if self.state.compare_exchange(EMPTY, tcs, Relaxed, Acquire).is_ok() { - match usercalls::wait(EV_UNPARK, timeout) { - Ok(event) => assert!(event & EV_UNPARK == EV_UNPARK), - Err(e) => { - assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock)) - } - } - - // Swap to provide acquire ordering even if the timeout occurred - // before the token was set. This situation can result in spurious - // wakeups on the next call to `park_timeout`, but it is better to let - // those be handled by the user than do some perhaps unnecessary, but - // always expensive guarding. - self.state.swap(EMPTY, Acquire); - return; - } - } - - // The token was already read with `acquire` ordering, this can be a store. - self.state.store(EMPTY, Relaxed); - } - - // This implementation doesn't require `Pin`, but other implementations do. - pub fn unpark(self: Pin<&Self>) { - let state = self.state.swap(NOTIFIED, Release); - - if !matches!(state, EMPTY | NOTIFIED) { - // There is a thread waiting, wake it up. - let tcs = NonNull::new(state).unwrap(); - // This will fail if the thread has already terminated or its TCS is destroyed - // by the time the signal is sent, but that is fine. If another thread receives - // the same TCS, it will receive this notification as a spurious wakeup, but - // all users of `wait` should and (internally) do guard against those where - // necessary. - let _ = usercalls::send(EV_UNPARK, Some(tcs)); - } - } -} diff --git a/library/std/src/sys/sgx/thread_parking.rs b/library/std/src/sys/sgx/thread_parking.rs new file mode 100644 index 00000000000..a1795c358d6 --- /dev/null +++ b/library/std/src/sys/sgx/thread_parking.rs @@ -0,0 +1,23 @@ +use super::abi::usercalls; +use crate::io::ErrorKind; +use crate::time::Duration; +use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE}; + +pub type ThreadId = fortanix_sgx_abi::Tcs; + +pub use super::abi::thread::current; + +pub fn park() { + usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap(); +} + +pub fn park_timeout(dur: Duration) { + let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64; + if let Err(e) = usercalls::wait(EV_UNPARK, timeout) { + assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock)) + } +} + +pub fn unpark(tid: ThreadId) { + let _ = usercalls::send(EV_UNPARK, Some(tid)); +} diff --git a/library/std/src/sys/unix/mod.rs b/library/std/src/sys/unix/mod.rs index 233e4a26bdc..7b931375612 100644 --- a/library/std/src/sys/unix/mod.rs +++ b/library/std/src/sys/unix/mod.rs @@ -40,7 +40,7 @@ pub mod stdio; pub mod thread; pub mod thread_local_dtor; pub mod thread_local_key; -pub mod thread_parker; +pub mod thread_parking; pub mod time; #[cfg(target_os = "espidf")] diff --git a/library/std/src/sys/unix/thread_parker/netbsd.rs b/library/std/src/sys/unix/thread_parker/netbsd.rs deleted file mode 100644 index 7657605b52f..00000000000 --- a/library/std/src/sys/unix/thread_parker/netbsd.rs +++ /dev/null @@ -1,113 +0,0 @@ -use crate::ffi::{c_int, c_void}; -use crate::pin::Pin; -use crate::ptr::{null, null_mut}; -use crate::sync::atomic::{ - AtomicU64, - Ordering::{Acquire, Relaxed, Release}, -}; -use crate::time::Duration; -use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC}; - -extern "C" { - fn ___lwp_park60( - clock_id: clockid_t, - flags: c_int, - ts: *mut timespec, - unpark: lwpid_t, - hint: *const c_void, - unparkhint: *const c_void, - ) -> c_int; - fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int; -} - -/// The thread is not parked and the token is not available. -/// -/// Zero cannot be a valid LWP id, since it is used as empty value for the unpark -/// argument in _lwp_park. -const EMPTY: u64 = 0; -/// The token is available. Do not park anymore. -const NOTIFIED: u64 = u64::MAX; - -pub struct Parker { - /// The parker state. Contains either one of the two state values above or the LWP - /// id of the parked thread. - state: AtomicU64, -} - -impl Parker { - pub unsafe fn new(parker: *mut Parker) { - parker.write(Parker { state: AtomicU64::new(EMPTY) }) - } - - // Does not actually need `unsafe` or `Pin`, but the pthread implementation does. - pub unsafe fn park(self: Pin<&Self>) { - // If the token has already been made available, we can skip - // a bit of work, so check for it here. - if self.state.load(Acquire) != NOTIFIED { - let parked = _lwp_self() as u64; - let hint = self.state.as_mut_ptr().cast(); - if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() { - // Loop to guard against spurious wakeups. - loop { - ___lwp_park60(0, 0, null_mut(), 0, hint, null()); - if self.state.load(Acquire) == NOTIFIED { - break; - } - } - } - } - - // At this point, the change to NOTIFIED has always been observed with acquire - // ordering, so we can just use a relaxed store here (instead of a swap). - self.state.store(EMPTY, Relaxed); - } - - // Does not actually need `unsafe` or `Pin`, but the pthread implementation does. - pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { - if self.state.load(Acquire) != NOTIFIED { - let parked = _lwp_self() as u64; - let hint = self.state.as_mut_ptr().cast(); - let mut timeout = timespec { - // Saturate so that the operation will definitely time out - // (even if it is after the heat death of the universe). - tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX), - tv_nsec: dur.subsec_nanos().into(), - }; - - if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() { - // Timeout needs to be mutable since it is modified on NetBSD 9.0 and - // above. - ___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, hint, null()); - // Use a swap to get acquire ordering even if the token was set after - // the timeout occurred. - self.state.swap(EMPTY, Acquire); - return; - } - } - - self.state.store(EMPTY, Relaxed); - } - - // Does not actually need `Pin`, but the pthread implementation does. - pub fn unpark(self: Pin<&Self>) { - let state = self.state.swap(NOTIFIED, Release); - if !matches!(state, EMPTY | NOTIFIED) { - let lwp = state as lwpid_t; - let hint = self.state.as_mut_ptr().cast(); - - // If the parking thread terminated and did not actually park, this will - // probably return an error, which is OK. In the worst case, another - // thread has received the same LWP id. It will then receive a spurious - // wakeup, but those are allowable per the API contract. The same reasoning - // applies if a timeout occurred before this call, but the state was not - // yet reset. - - // SAFETY: - // The syscall has no invariants to hold. Only unsafe because it is an - // extern function. - unsafe { - _lwp_unpark(lwp, hint); - } - } - } -} diff --git a/library/std/src/sys/unix/thread_parker/darwin.rs b/library/std/src/sys/unix/thread_parking/darwin.rs similarity index 100% rename from library/std/src/sys/unix/thread_parker/darwin.rs rename to library/std/src/sys/unix/thread_parking/darwin.rs diff --git a/library/std/src/sys/unix/thread_parker/mod.rs b/library/std/src/sys/unix/thread_parking/mod.rs similarity index 90% rename from library/std/src/sys/unix/thread_parker/mod.rs rename to library/std/src/sys/unix/thread_parking/mod.rs index 35f1e68a87e..185333c072f 100644 --- a/library/std/src/sys/unix/thread_parker/mod.rs +++ b/library/std/src/sys/unix/thread_parking/mod.rs @@ -24,7 +24,7 @@ cfg_if::cfg_if! { pub use darwin::Parker; } else if #[cfg(target_os = "netbsd")] { mod netbsd; - pub use netbsd::Parker; + pub use netbsd::{current, park, park_timeout, unpark, ThreadId}; } else { mod pthread; pub use pthread::Parker; diff --git a/library/std/src/sys/unix/thread_parking/netbsd.rs b/library/std/src/sys/unix/thread_parking/netbsd.rs new file mode 100644 index 00000000000..a441a05da0f --- /dev/null +++ b/library/std/src/sys/unix/thread_parking/netbsd.rs @@ -0,0 +1,54 @@ +#![cfg(target_os = "netbsd")] + +use crate::ffi::{c_int, c_void}; +use crate::ptr::{null, null_mut}; +use crate::time::Duration; +use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC}; + +extern "C" { + fn ___lwp_park60( + clock_id: clockid_t, + flags: c_int, + ts: *mut timespec, + unpark: lwpid_t, + hint: *const c_void, + unparkhint: *const c_void, + ) -> c_int; + fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int; +} + +pub type ThreadId = lwpid_t; + +#[inline] +pub fn current() -> ThreadId { + unsafe { _lwp_self() } +} + +#[inline] +pub fn park() { + unsafe { + ___lwp_park60(0, 0, null_mut(), 0, null(), null()); + } +} + +pub fn park_timeout(dur: Duration) { + let mut timeout = timespec { + // Saturate so that the operation will definitely time out + // (even if it is after the heat death of the universe). + tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX), + tv_nsec: dur.subsec_nanos().into(), + }; + + // Timeout needs to be mutable since it is modified on NetBSD 9.0 and + // above. + unsafe { + ___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, null(), null()); + } +} + +#[inline] +pub fn unpark(tid: ThreadId) { + unsafe { + _lwp_unpark(tid, null()); + } +} diff --git a/library/std/src/sys/unix/thread_parker/pthread.rs b/library/std/src/sys/unix/thread_parking/pthread.rs similarity index 100% rename from library/std/src/sys/unix/thread_parker/pthread.rs rename to library/std/src/sys/unix/thread_parking/pthread.rs diff --git a/library/std/src/sys/windows/mod.rs b/library/std/src/sys/windows/mod.rs index e67411e1686..77359abe429 100644 --- a/library/std/src/sys/windows/mod.rs +++ b/library/std/src/sys/windows/mod.rs @@ -33,7 +33,7 @@ pub mod stdio; pub mod thread; pub mod thread_local_dtor; pub mod thread_local_key; -pub mod thread_parker; +pub mod thread_parking; pub mod time; cfg_if::cfg_if! { if #[cfg(not(target_vendor = "uwp"))] { diff --git a/library/std/src/sys/windows/thread_parker.rs b/library/std/src/sys/windows/thread_parking.rs similarity index 100% rename from library/std/src/sys/windows/thread_parker.rs rename to library/std/src/sys/windows/thread_parking.rs diff --git a/library/std/src/sys_common/mod.rs b/library/std/src/sys_common/mod.rs index 73da1ce066c..6b24b0e9aa8 100644 --- a/library/std/src/sys_common/mod.rs +++ b/library/std/src/sys_common/mod.rs @@ -30,7 +30,7 @@ pub mod process; pub mod thread; pub mod thread_info; pub mod thread_local_dtor; -pub mod thread_parker; +pub mod thread_parking; pub mod wstr; pub mod wtf8; diff --git a/library/std/src/sys_common/thread_parker/futex.rs b/library/std/src/sys_common/thread_parking/futex.rs similarity index 100% rename from library/std/src/sys_common/thread_parker/futex.rs rename to library/std/src/sys_common/thread_parking/futex.rs diff --git a/library/std/src/sys_common/thread_parker/generic.rs b/library/std/src/sys_common/thread_parking/generic.rs similarity index 100% rename from library/std/src/sys_common/thread_parker/generic.rs rename to library/std/src/sys_common/thread_parking/generic.rs diff --git a/library/std/src/sys_common/thread_parking/id.rs b/library/std/src/sys_common/thread_parking/id.rs new file mode 100644 index 00000000000..9525340b75f --- /dev/null +++ b/library/std/src/sys_common/thread_parking/id.rs @@ -0,0 +1,104 @@ +//! Thread parking using thread ids. +//! +//! Some platforms (notably NetBSD) have thread parking primitives whose semantics +//! match those offered by `thread::park`, with the difference that the thread to +//! be unparked is referenced by a platform-specific thread id. Since the thread +//! parker is constructed before that id is known, an atomic state variable is used +//! to manage the park state and propagate the thread id. This also avoids platform +//! calls in the case where `unpark` is called before `park`. + +use crate::cell::UnsafeCell; +use crate::pin::Pin; +use crate::sync::atomic::{ + fence, AtomicI8, + Ordering::{Acquire, Relaxed, Release}, +}; +use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId}; +use crate::time::Duration; + +pub struct Parker { + state: AtomicI8, + tid: UnsafeCell>, +} + +const PARKED: i8 = -1; +const EMPTY: i8 = 0; +const NOTIFIED: i8 = 1; + +impl Parker { + /// Create a new thread parker. UNIX requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) }) + } + + /// # Safety + /// * must always be called from the same thread + /// * must be called before the state is set to PARKED + unsafe fn init_tid(&self) { + // The field is only ever written to from this thread, so we don't need + // synchronization to read it here. + if self.tid.get().read().is_none() { + // Because this point is only reached once, before the state is set + // to PARKED for the first time, the non-atomic write here can not + // conflict with reads by other threads. + self.tid.get().write(Some(current())); + // Ensure that the write can be observed by all threads reading the + // state. Synchronizes with the acquire barrier in `unpark`. + fence(Release); + } + } + + pub unsafe fn park(self: Pin<&Self>) { + self.init_tid(); + + // Changes NOTIFIED to EMPTY and EMPTY to PARKED. + let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1); + if state == PARKED { + // Loop to guard against spurious wakeups. + while state == PARKED { + park(); + state = self.state.load(Acquire); + } + + // Since the state change has already been observed with acquire + // ordering, the state can be reset with a relaxed store instead + // of a swap. + self.state.store(EMPTY, Relaxed); + } + } + + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + self.init_tid(); + + let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1); + if state == PARKED { + park_timeout(dur); + // Swap to ensure that we observe all state changes with acquire + // ordering, even if the state has been changed after the timeout + // occured. + self.state.swap(EMPTY, Acquire); + } + } + + pub fn unpark(self: Pin<&Self>) { + let state = self.state.swap(NOTIFIED, Release); + if state == PARKED { + // Synchronize with the release fence in `init_tid` to observe the + // write to `tid`. + fence(Acquire); + // # Safety + // The thread id is initialized before the state is set to `PARKED` + // for the first time and is not written to from that point on + // (negating the need for an atomic read). + let tid = unsafe { self.tid.get().read().unwrap_unchecked() }; + // It is possible that the waiting thread woke up because of a timeout + // and terminated before this call is made. This call then returns an + // error or wakes up an unrelated thread. The platform API and + // environment does allow this, however. + unpark(tid); + } + } +} + +unsafe impl Send for Parker {} +unsafe impl Sync for Parker {} diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parking/mod.rs similarity index 73% rename from library/std/src/sys_common/thread_parker/mod.rs rename to library/std/src/sys_common/thread_parking/mod.rs index 08a2bdd8229..0ead6633c35 100644 --- a/library/std/src/sys_common/thread_parker/mod.rs +++ b/library/std/src/sys_common/thread_parking/mod.rs @@ -11,13 +11,17 @@ cfg_if::cfg_if! { ))] { mod futex; pub use futex::Parker; + } else if #[cfg(any( + target_os = "netbsd", + all(target_vendor = "fortanix", target_env = "sgx"), + ))] { + mod id; + pub use id::Parker; } else if #[cfg(target_os = "solid_asp3")] { mod wait_flag; pub use wait_flag::Parker; } else if #[cfg(any(windows, target_family = "unix"))] { - pub use crate::sys::thread_parker::Parker; - } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] { - pub use crate::sys::thread_parker::Parker; + pub use crate::sys::thread_parking::Parker; } else { mod generic; pub use generic::Parker; diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parking/wait_flag.rs similarity index 100% rename from library/std/src/sys_common/thread_parker/wait_flag.rs rename to library/std/src/sys_common/thread_parking/wait_flag.rs diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 34bdb8bd461..4add4b85ee6 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -173,7 +173,7 @@ use crate::sync::Arc; use crate::sys::thread as imp; use crate::sys_common::thread; use crate::sys_common::thread_info; -use crate::sys_common::thread_parker::Parker; +use crate::sys_common::thread_parking::Parker; use crate::sys_common::{AsInner, IntoInner}; use crate::time::Duration;