Rollup merge of #106701 - ibraheemdev:sync-sender-spin, r=Amanieu
Fix `mpsc::SyncSender` spinning behavior Resolves https://github.com/rust-lang/rust/issues/106668.
This commit is contained in:
commit
720137b5da
@ -168,7 +168,7 @@ impl<T> Channel<T> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
backoff.spin();
|
backoff.spin_light();
|
||||||
tail = self.tail.load(Ordering::Relaxed);
|
tail = self.tail.load(Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -182,11 +182,11 @@ impl<T> Channel<T> {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
backoff.spin();
|
backoff.spin_light();
|
||||||
tail = self.tail.load(Ordering::Relaxed);
|
tail = self.tail.load(Ordering::Relaxed);
|
||||||
} else {
|
} else {
|
||||||
// Snooze because we need to wait for the stamp to get updated.
|
// Snooze because we need to wait for the stamp to get updated.
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
tail = self.tail.load(Ordering::Relaxed);
|
tail = self.tail.load(Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -251,7 +251,7 @@ impl<T> Channel<T> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
backoff.spin();
|
backoff.spin_light();
|
||||||
head = self.head.load(Ordering::Relaxed);
|
head = self.head.load(Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -273,11 +273,11 @@ impl<T> Channel<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
backoff.spin();
|
backoff.spin_light();
|
||||||
head = self.head.load(Ordering::Relaxed);
|
head = self.head.load(Ordering::Relaxed);
|
||||||
} else {
|
} else {
|
||||||
// Snooze because we need to wait for the stamp to get updated.
|
// Snooze because we need to wait for the stamp to get updated.
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
head = self.head.load(Ordering::Relaxed);
|
head = self.head.load(Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -330,7 +330,7 @@ impl<T> Channel<T> {
|
|||||||
if backoff.is_completed() {
|
if backoff.is_completed() {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
backoff.spin();
|
backoff.spin_light();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ impl<T> Slot<T> {
|
|||||||
fn wait_write(&self) {
|
fn wait_write(&self) {
|
||||||
let backoff = Backoff::new();
|
let backoff = Backoff::new();
|
||||||
while self.state.load(Ordering::Acquire) & WRITE == 0 {
|
while self.state.load(Ordering::Acquire) & WRITE == 0 {
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -82,7 +82,7 @@ impl<T> Block<T> {
|
|||||||
if !next.is_null() {
|
if !next.is_null() {
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,7 +191,7 @@ impl<T> Channel<T> {
|
|||||||
|
|
||||||
// If we reached the end of the block, wait until the next one is installed.
|
// If we reached the end of the block, wait until the next one is installed.
|
||||||
if offset == BLOCK_CAP {
|
if offset == BLOCK_CAP {
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
tail = self.tail.index.load(Ordering::Acquire);
|
tail = self.tail.index.load(Ordering::Acquire);
|
||||||
block = self.tail.block.load(Ordering::Acquire);
|
block = self.tail.block.load(Ordering::Acquire);
|
||||||
continue;
|
continue;
|
||||||
@ -247,7 +247,7 @@ impl<T> Channel<T> {
|
|||||||
return true;
|
return true;
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
backoff.spin();
|
backoff.spin_light();
|
||||||
tail = self.tail.index.load(Ordering::Acquire);
|
tail = self.tail.index.load(Ordering::Acquire);
|
||||||
block = self.tail.block.load(Ordering::Acquire);
|
block = self.tail.block.load(Ordering::Acquire);
|
||||||
}
|
}
|
||||||
@ -286,7 +286,7 @@ impl<T> Channel<T> {
|
|||||||
|
|
||||||
// If we reached the end of the block, wait until the next one is installed.
|
// If we reached the end of the block, wait until the next one is installed.
|
||||||
if offset == BLOCK_CAP {
|
if offset == BLOCK_CAP {
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
head = self.head.index.load(Ordering::Acquire);
|
head = self.head.index.load(Ordering::Acquire);
|
||||||
block = self.head.block.load(Ordering::Acquire);
|
block = self.head.block.load(Ordering::Acquire);
|
||||||
continue;
|
continue;
|
||||||
@ -320,7 +320,7 @@ impl<T> Channel<T> {
|
|||||||
// The block can be null here only if the first message is being sent into the channel.
|
// The block can be null here only if the first message is being sent into the channel.
|
||||||
// In that case, just wait until it gets initialized.
|
// In that case, just wait until it gets initialized.
|
||||||
if block.is_null() {
|
if block.is_null() {
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
head = self.head.index.load(Ordering::Acquire);
|
head = self.head.index.load(Ordering::Acquire);
|
||||||
block = self.head.block.load(Ordering::Acquire);
|
block = self.head.block.load(Ordering::Acquire);
|
||||||
continue;
|
continue;
|
||||||
@ -351,7 +351,7 @@ impl<T> Channel<T> {
|
|||||||
return true;
|
return true;
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
backoff.spin();
|
backoff.spin_light();
|
||||||
head = self.head.index.load(Ordering::Acquire);
|
head = self.head.index.load(Ordering::Acquire);
|
||||||
block = self.head.block.load(Ordering::Acquire);
|
block = self.head.block.load(Ordering::Acquire);
|
||||||
}
|
}
|
||||||
@ -542,7 +542,7 @@ impl<T> Channel<T> {
|
|||||||
// New updates to tail will be rejected by MARK_BIT and aborted unless it's
|
// New updates to tail will be rejected by MARK_BIT and aborted unless it's
|
||||||
// at boundary. We need to wait for the updates take affect otherwise there
|
// at boundary. We need to wait for the updates take affect otherwise there
|
||||||
// can be memory leaks.
|
// can be memory leaks.
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
tail = self.tail.index.load(Ordering::Acquire);
|
tail = self.tail.index.load(Ordering::Acquire);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ mod zero;
|
|||||||
use crate::fmt;
|
use crate::fmt;
|
||||||
use crate::panic::{RefUnwindSafe, UnwindSafe};
|
use crate::panic::{RefUnwindSafe, UnwindSafe};
|
||||||
use crate::time::{Duration, Instant};
|
use crate::time::{Duration, Instant};
|
||||||
use error::*;
|
pub use error::*;
|
||||||
|
|
||||||
/// Creates a channel of unbounded capacity.
|
/// Creates a channel of unbounded capacity.
|
||||||
///
|
///
|
||||||
|
@ -91,9 +91,8 @@ impl<T> DerefMut for CachePadded<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const SPIN_LIMIT: u32 = 6;
|
const SPIN_LIMIT: u32 = 6;
|
||||||
const YIELD_LIMIT: u32 = 10;
|
|
||||||
|
|
||||||
/// Performs exponential backoff in spin loops.
|
/// Performs quadratic backoff in spin loops.
|
||||||
pub struct Backoff {
|
pub struct Backoff {
|
||||||
step: Cell<u32>,
|
step: Cell<u32>,
|
||||||
}
|
}
|
||||||
@ -104,25 +103,27 @@ impl Backoff {
|
|||||||
Backoff { step: Cell::new(0) }
|
Backoff { step: Cell::new(0) }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Backs off in a lock-free loop.
|
/// Backs off using lightweight spinning.
|
||||||
///
|
///
|
||||||
/// This method should be used when we need to retry an operation because another thread made
|
/// This method should be used for:
|
||||||
/// progress.
|
/// - Retrying an operation because another thread made progress. i.e. on CAS failure.
|
||||||
|
/// - Waiting for an operation to complete by spinning optimistically for a few iterations
|
||||||
|
/// before falling back to parking the thread (see `Backoff::is_completed`).
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn spin(&self) {
|
pub fn spin_light(&self) {
|
||||||
let step = self.step.get().min(SPIN_LIMIT);
|
let step = self.step.get().min(SPIN_LIMIT);
|
||||||
for _ in 0..step.pow(2) {
|
for _ in 0..step.pow(2) {
|
||||||
crate::hint::spin_loop();
|
crate::hint::spin_loop();
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.step.get() <= SPIN_LIMIT {
|
self.step.set(self.step.get() + 1);
|
||||||
self.step.set(self.step.get() + 1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Backs off in a blocking loop.
|
/// Backs off using heavyweight spinning.
|
||||||
|
///
|
||||||
|
/// This method should be used in blocking loops where parking the thread is not an option.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn snooze(&self) {
|
pub fn spin_heavy(&self) {
|
||||||
if self.step.get() <= SPIN_LIMIT {
|
if self.step.get() <= SPIN_LIMIT {
|
||||||
for _ in 0..self.step.get().pow(2) {
|
for _ in 0..self.step.get().pow(2) {
|
||||||
crate::hint::spin_loop()
|
crate::hint::spin_loop()
|
||||||
@ -131,14 +132,12 @@ impl Backoff {
|
|||||||
crate::thread::yield_now();
|
crate::thread::yield_now();
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.step.get() <= YIELD_LIMIT {
|
self.step.set(self.step.get() + 1);
|
||||||
self.step.set(self.step.get() + 1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `true` if quadratic backoff has completed and blocking the thread is advised.
|
/// Returns `true` if quadratic backoff has completed and parking the thread is advised.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn is_completed(&self) -> bool {
|
pub fn is_completed(&self) -> bool {
|
||||||
self.step.get() > YIELD_LIMIT
|
self.step.get() > SPIN_LIMIT
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ impl<T> Packet<T> {
|
|||||||
fn wait_ready(&self) {
|
fn wait_ready(&self) {
|
||||||
let backoff = Backoff::new();
|
let backoff = Backoff::new();
|
||||||
while !self.ready.load(Ordering::Acquire) {
|
while !self.ready.load(Ordering::Acquire) {
|
||||||
backoff.snooze();
|
backoff.spin_heavy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -738,6 +738,15 @@ impl<T> SyncSender<T> {
|
|||||||
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
|
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
|
||||||
self.inner.try_send(t)
|
self.inner.try_send(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Attempts to send for a value on this receiver, returning an error if the
|
||||||
|
// corresponding channel has hung up, or if it waits more than `timeout`.
|
||||||
|
//
|
||||||
|
// This method is currently private and only used for tests.
|
||||||
|
#[allow(unused)]
|
||||||
|
fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError<T>> {
|
||||||
|
self.inner.send_timeout(t, timeout)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[stable(feature = "rust1", since = "1.0.0")]
|
#[stable(feature = "rust1", since = "1.0.0")]
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::env;
|
use crate::env;
|
||||||
|
use crate::sync::mpmc::SendTimeoutError;
|
||||||
use crate::thread;
|
use crate::thread;
|
||||||
use crate::time::Duration;
|
use crate::time::Duration;
|
||||||
|
|
||||||
@ -41,6 +42,13 @@ fn recv_timeout() {
|
|||||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
|
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn send_timeout() {
|
||||||
|
let (tx, _rx) = sync_channel::<i32>(1);
|
||||||
|
assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(()));
|
||||||
|
assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Err(SendTimeoutError::Timeout(1)));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn smoke_threads() {
|
fn smoke_threads() {
|
||||||
let (tx, rx) = sync_channel::<i32>(0);
|
let (tx, rx) = sync_channel::<i32>(0);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user