diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index c6bb09b0417..70381760003 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -15,7 +15,7 @@ use super::waker::SyncWaker; use crate::cell::UnsafeCell; -use crate::mem::MaybeUninit; +use crate::mem::{self, MaybeUninit}; use crate::ptr; use crate::sync::atomic::{self, AtomicUsize, Ordering}; use crate::time::Instant; @@ -25,7 +25,8 @@ struct Slot { /// The current stamp. stamp: AtomicUsize, - /// The message in this slot. + /// The message in this slot. Either read out in `read` or dropped through + /// `discard_all_messages`. msg: UnsafeCell>, } @@ -439,14 +440,13 @@ pub(crate) fn capacity(&self) -> Option { Some(self.cap) } - /// Disconnects the channel and wakes up all blocked senders and receivers. + /// Disconnects senders and wakes up all blocked receivers. /// /// Returns `true` if this call disconnected the channel. - pub(crate) fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { - self.senders.disconnect(); self.receivers.disconnect(); true } else { @@ -454,6 +454,108 @@ pub(crate) fn disconnect(&self) -> bool { } } + /// Disconnects receivers and wakes up all blocked senders. + /// + /// Returns `true` if this call disconnected the channel. + /// + /// # Safety + /// May only be called once upon dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + pub(crate) unsafe fn disconnect_receivers(&self) -> bool { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + self.discard_all_messages(tail); + + if tail & self.mark_bit == 0 { + self.senders.disconnect(); + true + } else { + false + } + } + + /// Discards all messages. + /// + /// `tail` should be the current (and therefore last) value of `tail`. + /// + /// # Safety + /// This method must only be called when dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + unsafe fn discard_all_messages(&self, tail: usize) { + debug_assert!(self.is_disconnected()); + + /// Use a helper struct with a custom `Drop` to ensure all messages are + /// dropped, even if a destructor panicks. + struct DiscardState<'a, T> { + channel: &'a Channel, + head: usize, + tail: usize, + backoff: Backoff, + } + + impl<'a, T> DiscardState<'a, T> { + fn discard(&mut self) { + loop { + // Deconstruct the head. + let index = self.head & (self.channel.mark_bit - 1); + let lap = self.head & !(self.channel.one_lap - 1); + + // Inspect the corresponding slot. + debug_assert!(index < self.channel.buffer.len()); + let slot = unsafe { self.channel.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may drop the message. + if self.head + 1 == stamp { + self.head = if index + 1 < self.channel.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + self.head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.channel.one_lap) + }; + + // We updated the head, so even if this descrutor panics, + // we will not attempt to destroy the slot again. + unsafe { + (*slot.msg.get()).assume_init_drop(); + } + // If the tail equals the head, that means the channel is empty. + } else if self.tail == self.head { + return; + // Otherwise, a sender is about to write into the slot, so we need + // to wait for it to update the stamp. + } else { + self.backoff.spin_heavy(); + } + } + } + } + + impl<'a, T> Drop for DiscardState<'a, T> { + fn drop(&mut self) { + self.discard(); + } + } + + let mut state = DiscardState { + channel: self, + // Only receivers modify `head`, so since we are the last one, + // this value will not change and will not be observed (since + // no new messages can be sent after disconnection). + head: self.head.load(Ordering::Relaxed), + tail: tail & !self.mark_bit, + backoff: Backoff::new(), + }; + state.discard(); + // This point is only reached if no destructor panics, so all messages + // have already been dropped. + mem::forget(state); + } + /// Returns `true` if the channel is disconnected. pub(crate) fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 @@ -483,23 +585,3 @@ pub(crate) fn is_full(&self) -> bool { head.wrapping_add(self.one_lap) == tail & !self.mark_bit } } - -impl Drop for Channel { - fn drop(&mut self) { - // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); - - // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { - // Compute the index of the next slot holding a message. - let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap }; - - unsafe { - debug_assert!(index < self.buffer.len()); - let slot = self.buffer.get_unchecked_mut(index); - let msg = &mut *slot.msg.get(); - msg.as_mut_ptr().drop_in_place(); - } - } - } -} diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs index 7a602cecd3b..2068dda393a 100644 --- a/library/std/src/sync/mpmc/mod.rs +++ b/library/std/src/sync/mpmc/mod.rs @@ -227,7 +227,7 @@ impl Drop for Sender { fn drop(&mut self) { unsafe { match &self.flavor { - SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } @@ -403,7 +403,7 @@ impl Drop for Receiver { fn drop(&mut self) { unsafe { match &self.flavor { - ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), }