From 746331edf3a6d055859ed0ed70dde1aa883c517d Mon Sep 17 00:00:00 2001 From: joboet Date: Fri, 17 Feb 2023 15:47:58 +0100 Subject: [PATCH 1/4] std: drop all messages in bounded channel when destroying the last receiver --- library/std/src/sync/mpmc/array.rs | 132 +++++++++++++++++++++++------ library/std/src/sync/mpmc/mod.rs | 4 +- 2 files changed, 109 insertions(+), 27 deletions(-) diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index c6bb09b0417..70381760003 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -15,7 +15,7 @@ use super::utils::{Backoff, CachePadded}; use super::waker::SyncWaker; use crate::cell::UnsafeCell; -use crate::mem::MaybeUninit; +use crate::mem::{self, MaybeUninit}; use crate::ptr; use crate::sync::atomic::{self, AtomicUsize, Ordering}; use crate::time::Instant; @@ -25,7 +25,8 @@ struct Slot { /// The current stamp. stamp: AtomicUsize, - /// The message in this slot. + /// The message in this slot. Either read out in `read` or dropped through + /// `discard_all_messages`. msg: UnsafeCell>, } @@ -439,14 +440,13 @@ impl Channel { Some(self.cap) } - /// Disconnects the channel and wakes up all blocked senders and receivers. + /// Disconnects senders and wakes up all blocked receivers. /// /// Returns `true` if this call disconnected the channel. - pub(crate) fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { - self.senders.disconnect(); self.receivers.disconnect(); true } else { @@ -454,6 +454,108 @@ impl Channel { } } + /// Disconnects receivers and wakes up all blocked senders. + /// + /// Returns `true` if this call disconnected the channel. + /// + /// # Safety + /// May only be called once upon dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + pub(crate) unsafe fn disconnect_receivers(&self) -> bool { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + self.discard_all_messages(tail); + + if tail & self.mark_bit == 0 { + self.senders.disconnect(); + true + } else { + false + } + } + + /// Discards all messages. + /// + /// `tail` should be the current (and therefore last) value of `tail`. + /// + /// # Safety + /// This method must only be called when dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + unsafe fn discard_all_messages(&self, tail: usize) { + debug_assert!(self.is_disconnected()); + + /// Use a helper struct with a custom `Drop` to ensure all messages are + /// dropped, even if a destructor panicks. + struct DiscardState<'a, T> { + channel: &'a Channel, + head: usize, + tail: usize, + backoff: Backoff, + } + + impl<'a, T> DiscardState<'a, T> { + fn discard(&mut self) { + loop { + // Deconstruct the head. + let index = self.head & (self.channel.mark_bit - 1); + let lap = self.head & !(self.channel.one_lap - 1); + + // Inspect the corresponding slot. + debug_assert!(index < self.channel.buffer.len()); + let slot = unsafe { self.channel.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may drop the message. + if self.head + 1 == stamp { + self.head = if index + 1 < self.channel.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + self.head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.channel.one_lap) + }; + + // We updated the head, so even if this descrutor panics, + // we will not attempt to destroy the slot again. + unsafe { + (*slot.msg.get()).assume_init_drop(); + } + // If the tail equals the head, that means the channel is empty. + } else if self.tail == self.head { + return; + // Otherwise, a sender is about to write into the slot, so we need + // to wait for it to update the stamp. + } else { + self.backoff.spin_heavy(); + } + } + } + } + + impl<'a, T> Drop for DiscardState<'a, T> { + fn drop(&mut self) { + self.discard(); + } + } + + let mut state = DiscardState { + channel: self, + // Only receivers modify `head`, so since we are the last one, + // this value will not change and will not be observed (since + // no new messages can be sent after disconnection). + head: self.head.load(Ordering::Relaxed), + tail: tail & !self.mark_bit, + backoff: Backoff::new(), + }; + state.discard(); + // This point is only reached if no destructor panics, so all messages + // have already been dropped. + mem::forget(state); + } + /// Returns `true` if the channel is disconnected. pub(crate) fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 @@ -483,23 +585,3 @@ impl Channel { head.wrapping_add(self.one_lap) == tail & !self.mark_bit } } - -impl Drop for Channel { - fn drop(&mut self) { - // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); - - // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { - // Compute the index of the next slot holding a message. - let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap }; - - unsafe { - debug_assert!(index < self.buffer.len()); - let slot = self.buffer.get_unchecked_mut(index); - let msg = &mut *slot.msg.get(); - msg.as_mut_ptr().drop_in_place(); - } - } - } -} diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs index 7a602cecd3b..2068dda393a 100644 --- a/library/std/src/sync/mpmc/mod.rs +++ b/library/std/src/sync/mpmc/mod.rs @@ -227,7 +227,7 @@ impl Drop for Sender { fn drop(&mut self) { unsafe { match &self.flavor { - SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } @@ -403,7 +403,7 @@ impl Drop for Receiver { fn drop(&mut self) { unsafe { match &self.flavor { - ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } From 642a3247462a582ee97abea1c06c3fabac3bcb3f Mon Sep 17 00:00:00 2001 From: joboet Date: Fri, 17 Feb 2023 15:58:15 +0100 Subject: [PATCH 2/4] std: add regression test for #107466 Tests that messages are immediately dropped once the last receiver is destroyed. --- library/std/src/sync/mpsc/sync_tests.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/library/std/src/sync/mpsc/sync_tests.rs b/library/std/src/sync/mpsc/sync_tests.rs index 9d2f92ffc9b..632709fd98d 100644 --- a/library/std/src/sync/mpsc/sync_tests.rs +++ b/library/std/src/sync/mpsc/sync_tests.rs @@ -1,5 +1,6 @@ use super::*; use crate::env; +use crate::rc::Rc; use crate::sync::mpmc::SendTimeoutError; use crate::thread; use crate::time::Duration; @@ -656,3 +657,15 @@ fn issue_15761() { repro() } } + +#[test] +fn drop_unreceived() { + let (tx, rx) = sync_channel::>(1); + let msg = Rc::new(()); + let weak = Rc::downgrade(&msg); + assert!(tx.send(msg).is_ok()); + drop(rx); + // Messages should be dropped immediately when the last receiver is destroyed. + assert!(weak.upgrade().is_none()); + drop(tx); +} From 4e9e465bd4cbdfe3946ea6f0ff4786f2f495a020 Mon Sep 17 00:00:00 2001 From: joboet Date: Sun, 26 Feb 2023 11:57:27 +0100 Subject: [PATCH 3/4] std: disconnect senders before discarding messages --- library/std/src/sync/mpmc/array.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index 70381760003..fb893695a9a 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -464,14 +464,15 @@ impl Channel { /// ordering or stronger. pub(crate) unsafe fn disconnect_receivers(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); - self.discard_all_messages(tail); - - if tail & self.mark_bit == 0 { + let disconnected = if tail & self.mark_bit == 0 { self.senders.disconnect(); true } else { false - } + }; + + self.discard_all_messages(tail); + disconnected } /// Discards all messages. From 34aa87292c5cd45c88a72235dad6e973a9f2b62f Mon Sep 17 00:00:00 2001 From: joboet Date: Tue, 14 Mar 2023 16:42:34 +0100 Subject: [PATCH 4/4] std: leak remaining messages in bounded channel if message destructor panics --- library/std/src/sync/mpmc/array.rs | 102 +++++++++++------------------ 1 file changed, 39 insertions(+), 63 deletions(-) diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index fb893695a9a..492e21d9bdb 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -15,7 +15,7 @@ use super::utils::{Backoff, CachePadded}; use super::waker::SyncWaker; use crate::cell::UnsafeCell; -use crate::mem::{self, MaybeUninit}; +use crate::mem::MaybeUninit; use crate::ptr; use crate::sync::atomic::{self, AtomicUsize, Ordering}; use crate::time::Instant; @@ -479,6 +479,10 @@ impl Channel { /// /// `tail` should be the current (and therefore last) value of `tail`. /// + /// # Panicking + /// If a destructor panics, the remaining messages are leaked, matching the + /// behaviour of the unbounded channel. + /// /// # Safety /// This method must only be called when dropping the last receiver. The /// destruction of all other receivers must have been observed with acquire @@ -486,75 +490,47 @@ impl Channel { unsafe fn discard_all_messages(&self, tail: usize) { debug_assert!(self.is_disconnected()); - /// Use a helper struct with a custom `Drop` to ensure all messages are - /// dropped, even if a destructor panicks. - struct DiscardState<'a, T> { - channel: &'a Channel, - head: usize, - tail: usize, - backoff: Backoff, - } + // Only receivers modify `head`, so since we are the last one, + // this value will not change and will not be observed (since + // no new messages can be sent after disconnection). + let mut head = self.head.load(Ordering::Relaxed); + let tail = tail & !self.mark_bit; - impl<'a, T> DiscardState<'a, T> { - fn discard(&mut self) { - loop { - // Deconstruct the head. - let index = self.head & (self.channel.mark_bit - 1); - let lap = self.head & !(self.channel.one_lap - 1); + let backoff = Backoff::new(); + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); - // Inspect the corresponding slot. - debug_assert!(index < self.channel.buffer.len()); - let slot = unsafe { self.channel.buffer.get_unchecked(index) }; - let stamp = slot.stamp.load(Ordering::Acquire); + // Inspect the corresponding slot. + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); - // If the stamp is ahead of the head by 1, we may drop the message. - if self.head + 1 == stamp { - self.head = if index + 1 < self.channel.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, mark: 0, index: index + 1 }`. - self.head + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. - lap.wrapping_add(self.channel.one_lap) - }; + // If the stamp is ahead of the head by 1, we may drop the message. + if head + 1 == stamp { + head = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; - // We updated the head, so even if this descrutor panics, - // we will not attempt to destroy the slot again. - unsafe { - (*slot.msg.get()).assume_init_drop(); - } - // If the tail equals the head, that means the channel is empty. - } else if self.tail == self.head { - return; - // Otherwise, a sender is about to write into the slot, so we need - // to wait for it to update the stamp. - } else { - self.backoff.spin_heavy(); - } + unsafe { + (*slot.msg.get()).assume_init_drop(); } + // If the tail equals the head, that means the channel is empty. + } else if tail == head { + return; + // Otherwise, a sender is about to write into the slot, so we need + // to wait for it to update the stamp. + } else { + backoff.spin_heavy(); } } - - impl<'a, T> Drop for DiscardState<'a, T> { - fn drop(&mut self) { - self.discard(); - } - } - - let mut state = DiscardState { - channel: self, - // Only receivers modify `head`, so since we are the last one, - // this value will not change and will not be observed (since - // no new messages can be sent after disconnection). - head: self.head.load(Ordering::Relaxed), - tail: tail & !self.mark_bit, - backoff: Backoff::new(), - }; - state.discard(); - // This point is only reached if no destructor panics, so all messages - // have already been dropped. - mem::forget(state); } /// Returns `true` if the channel is disconnected.