Rollup merge of #38421 - apasel422:issue-36934, r=alexcrichton

Replace invalid use of `&mut` with `UnsafeCell` in `std::sync::mpsc`

Closes #36934

r? @alexcrichton
This commit is contained in:
Seo Sanghyeon 2016-12-19 16:59:38 +09:00 committed by GitHub
commit 4b5cffc04b
4 changed files with 321 additions and 321 deletions

View File

@ -348,7 +348,7 @@ impl<T> !Sync for Sender<T> { }
/// owned by one thread, but it can be cloned to send to other threads.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
inner: Arc<UnsafeCell<sync::Packet<T>>>,
inner: Arc<sync::Packet<T>>,
}
#[stable(feature = "rust1", since = "1.0.0")]
@ -426,10 +426,10 @@ pub enum TrySendError<T> {
}
enum Flavor<T> {
Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
Stream(Arc<UnsafeCell<stream::Packet<T>>>),
Shared(Arc<UnsafeCell<shared::Packet<T>>>),
Sync(Arc<UnsafeCell<sync::Packet<T>>>),
Oneshot(Arc<oneshot::Packet<T>>),
Stream(Arc<stream::Packet<T>>),
Shared(Arc<shared::Packet<T>>),
Sync(Arc<sync::Packet<T>>),
}
#[doc(hidden)]
@ -487,7 +487,7 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
let a = Arc::new(oneshot::Packet::new());
(Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
}
@ -532,7 +532,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
let a = Arc::new(sync::Packet::new(bound));
(SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
}
@ -578,38 +578,30 @@ impl<T> Sender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
let (new_inner, ret) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
unsafe {
let p = p.get();
if !(*p).sent() {
return (*p).send(t).map_err(SendError);
} else {
let a =
Arc::new(UnsafeCell::new(stream::Packet::new()));
let rx = Receiver::new(Flavor::Stream(a.clone()));
match (*p).upgrade(rx) {
oneshot::UpSuccess => {
let ret = (*a.get()).send(t);
(a, ret)
}
oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(token) => {
// This send cannot panic because the thread is
// asleep (we're looking at it), so the receiver
// can't go away.
(*a.get()).send(t).ok().unwrap();
token.signal();
(a, Ok(()))
}
if !p.sent() {
return p.send(t).map_err(SendError);
} else {
let a = Arc::new(stream::Packet::new());
let rx = Receiver::new(Flavor::Stream(a.clone()));
match p.upgrade(rx) {
oneshot::UpSuccess => {
let ret = a.send(t);
(a, ret)
}
oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(token) => {
// This send cannot panic because the thread is
// asleep (we're looking at it), so the receiver
// can't go away.
a.send(t).ok().unwrap();
token.signal();
(a, Ok(()))
}
}
}
}
Flavor::Stream(ref p) => return unsafe {
(*p.get()).send(t).map_err(SendError)
},
Flavor::Shared(ref p) => return unsafe {
(*p.get()).send(t).map_err(SendError)
},
Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
Flavor::Sync(..) => unreachable!(),
};
@ -624,41 +616,43 @@ impl<T> Sender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper, guard) = match *unsafe { self.inner() } {
let packet = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
let guard = (*a.get()).postinit_lock();
let a = Arc::new(shared::Packet::new());
{
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
match (*p.get()).upgrade(rx) {
let sleeper = match p.upgrade(rx) {
oneshot::UpSuccess |
oneshot::UpDisconnected => (a, None, guard),
oneshot::UpWoke(task) => (a, Some(task), guard)
}
oneshot::UpDisconnected => None,
oneshot::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
}
a
}
Flavor::Stream(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
let guard = (*a.get()).postinit_lock();
let a = Arc::new(shared::Packet::new());
{
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
match (*p.get()).upgrade(rx) {
let sleeper = match p.upgrade(rx) {
stream::UpSuccess |
stream::UpDisconnected => (a, None, guard),
stream::UpWoke(task) => (a, Some(task), guard),
}
stream::UpDisconnected => None,
stream::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
}
a
}
Flavor::Shared(ref p) => {
unsafe { (*p.get()).clone_chan(); }
p.clone_chan();
return Sender::new(Flavor::Shared(p.clone()));
}
Flavor::Sync(..) => unreachable!(),
};
unsafe {
(*packet.get()).inherit_blocker(sleeper, guard);
let tmp = Sender::new(Flavor::Shared(packet.clone()));
mem::swap(self.inner_mut(), tmp.inner_mut());
}
@ -669,10 +663,10 @@ impl<T> Clone for Sender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
match *unsafe { self.inner_mut() } {
Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.drop_chan(),
Flavor::Stream(ref p) => p.drop_chan(),
Flavor::Shared(ref p) => p.drop_chan(),
Flavor::Sync(..) => unreachable!(),
}
}
@ -690,7 +684,7 @@ impl<T> fmt::Debug for Sender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T> SyncSender<T> {
fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
SyncSender { inner: inner }
}
@ -710,7 +704,7 @@ impl<T> SyncSender<T> {
/// information.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
unsafe { (*self.inner.get()).send(t).map_err(SendError) }
self.inner.send(t).map_err(SendError)
}
/// Attempts to send a value on this channel without blocking.
@ -724,14 +718,14 @@ impl<T> SyncSender<T> {
/// receiver has received the data or not if this function is successful.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
unsafe { (*self.inner.get()).try_send(t) }
self.inner.try_send(t)
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
unsafe { (*self.inner.get()).clone_chan(); }
self.inner.clone_chan();
SyncSender::new(self.inner.clone())
}
}
@ -739,7 +733,7 @@ impl<T> Clone for SyncSender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for SyncSender<T> {
fn drop(&mut self) {
unsafe { (*self.inner.get()).drop_chan(); }
self.inner.drop_chan();
}
}
@ -772,7 +766,7 @@ impl<T> Receiver<T> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(oneshot::Empty) => return Err(TryRecvError::Empty),
Err(oneshot::Disconnected) => {
@ -782,7 +776,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(stream::Empty) => return Err(TryRecvError::Empty),
Err(stream::Disconnected) => {
@ -792,7 +786,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Shared(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(shared::Empty) => return Err(TryRecvError::Empty),
Err(shared::Disconnected) => {
@ -801,7 +795,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Sync(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(sync::Empty) => return Err(TryRecvError::Empty),
Err(sync::Disconnected) => {
@ -875,7 +869,7 @@ impl<T> Receiver<T> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).recv(None) } {
match p.recv(None) {
Ok(t) => return Ok(t),
Err(oneshot::Disconnected) => return Err(RecvError),
Err(oneshot::Upgraded(rx)) => rx,
@ -883,7 +877,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).recv(None) } {
match p.recv(None) {
Ok(t) => return Ok(t),
Err(stream::Disconnected) => return Err(RecvError),
Err(stream::Upgraded(rx)) => rx,
@ -891,15 +885,13 @@ impl<T> Receiver<T> {
}
}
Flavor::Shared(ref p) => {
match unsafe { (*p.get()).recv(None) } {
match p.recv(None) {
Ok(t) => return Ok(t),
Err(shared::Disconnected) => return Err(RecvError),
Err(shared::Empty) => unreachable!(),
}
}
Flavor::Sync(ref p) => return unsafe {
(*p.get()).recv(None).map_err(|_| RecvError)
}
Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
};
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
@ -952,7 +944,7 @@ impl<T> Receiver<T> {
loop {
let port_or_empty = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(oneshot::Disconnected) => return Err(Disconnected),
Err(oneshot::Upgraded(rx)) => Some(rx),
@ -960,7 +952,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(stream::Disconnected) => return Err(Disconnected),
Err(stream::Upgraded(rx)) => Some(rx),
@ -968,14 +960,14 @@ impl<T> Receiver<T> {
}
}
Flavor::Shared(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(shared::Disconnected) => return Err(Disconnected),
Err(shared::Empty) => None,
}
}
Flavor::Sync(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(sync::Disconnected) => return Err(Disconnected),
Err(sync::Empty) => None,
@ -1020,23 +1012,19 @@ impl<T> select::Packet for Receiver<T> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).can_recv() } {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).can_recv() } {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Shared(ref p) => {
return unsafe { (*p.get()).can_recv() };
}
Flavor::Sync(ref p) => {
return unsafe { (*p.get()).can_recv() };
}
Flavor::Shared(ref p) => return p.can_recv(),
Flavor::Sync(ref p) => return p.can_recv(),
};
unsafe {
mem::swap(self.inner_mut(),
@ -1049,25 +1037,21 @@ impl<T> select::Packet for Receiver<T> {
loop {
let (t, new_port) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).start_selection(token) } {
match p.start_selection(token) {
oneshot::SelSuccess => return Installed,
oneshot::SelCanceled => return Abort,
oneshot::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).start_selection(token) } {
match p.start_selection(token) {
stream::SelSuccess => return Installed,
stream::SelCanceled => return Abort,
stream::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Shared(ref p) => {
return unsafe { (*p.get()).start_selection(token) };
}
Flavor::Sync(ref p) => {
return unsafe { (*p.get()).start_selection(token) };
}
Flavor::Shared(ref p) => return p.start_selection(token),
Flavor::Sync(ref p) => return p.start_selection(token),
};
token = t;
unsafe {
@ -1080,16 +1064,10 @@ impl<T> select::Packet for Receiver<T> {
let mut was_upgrade = false;
loop {
let result = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
Flavor::Stream(ref p) => unsafe {
(*p.get()).abort_selection(was_upgrade)
},
Flavor::Shared(ref p) => return unsafe {
(*p.get()).abort_selection(was_upgrade)
},
Flavor::Sync(ref p) => return unsafe {
(*p.get()).abort_selection()
},
Flavor::Oneshot(ref p) => p.abort_selection(),
Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
Flavor::Sync(ref p) => return p.abort_selection(),
};
let new_port = match result { Ok(b) => return b, Err(p) => p };
was_upgrade = true;
@ -1142,11 +1120,11 @@ impl <T> IntoIterator for Receiver<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
match *unsafe { self.inner_mut() } {
Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.drop_port(),
Flavor::Stream(ref p) => p.drop_port(),
Flavor::Shared(ref p) => p.drop_port(),
Flavor::Sync(ref p) => p.drop_port(),
}
}
}

View File

@ -39,7 +39,8 @@ use self::MyUpgrade::*;
use sync::mpsc::Receiver;
use sync::mpsc::blocking::{self, SignalToken};
use core::mem;
use cell::UnsafeCell;
use ptr;
use sync::atomic::{AtomicUsize, Ordering};
use time::Instant;
@ -57,10 +58,10 @@ pub struct Packet<T> {
// Internal state of the chan/port pair (stores the blocked thread as well)
state: AtomicUsize,
// One-shot data slot location
data: Option<T>,
data: UnsafeCell<Option<T>>,
// when used for the second time, a oneshot channel must be upgraded, and
// this contains the slot for the upgrade
upgrade: MyUpgrade<T>,
upgrade: UnsafeCell<MyUpgrade<T>>,
}
pub enum Failure<T> {
@ -90,42 +91,44 @@ enum MyUpgrade<T> {
impl<T> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
data: None,
upgrade: NothingSent,
data: UnsafeCell::new(None),
upgrade: UnsafeCell::new(NothingSent),
state: AtomicUsize::new(EMPTY),
}
}
pub fn send(&mut self, t: T) -> Result<(), T> {
// Sanity check
match self.upgrade {
NothingSent => {}
_ => panic!("sending on a oneshot that's already sent on "),
}
assert!(self.data.is_none());
self.data = Some(t);
self.upgrade = SendUsed;
match self.state.swap(DATA, Ordering::SeqCst) {
// Sent the data, no one was waiting
EMPTY => Ok(()),
// Couldn't send the data, the port hung up first. Return the data
// back up the stack.
DISCONNECTED => {
self.state.swap(DISCONNECTED, Ordering::SeqCst);
self.upgrade = NothingSent;
Err(self.data.take().unwrap())
pub fn send(&self, t: T) -> Result<(), T> {
unsafe {
// Sanity check
match *self.upgrade.get() {
NothingSent => {}
_ => panic!("sending on a oneshot that's already sent on "),
}
assert!((*self.data.get()).is_none());
ptr::write(self.data.get(), Some(t));
ptr::write(self.upgrade.get(), SendUsed);
// Not possible, these are one-use channels
DATA => unreachable!(),
match self.state.swap(DATA, Ordering::SeqCst) {
// Sent the data, no one was waiting
EMPTY => Ok(()),
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => unsafe {
SignalToken::cast_from_usize(ptr).signal();
Ok(())
// Couldn't send the data, the port hung up first. Return the data
// back up the stack.
DISCONNECTED => {
self.state.swap(DISCONNECTED, Ordering::SeqCst);
ptr::write(self.upgrade.get(), NothingSent);
Err((&mut *self.data.get()).take().unwrap())
}
// Not possible, these are one-use channels
DATA => unreachable!(),
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => {
SignalToken::cast_from_usize(ptr).signal();
Ok(())
}
}
}
}
@ -133,13 +136,15 @@ impl<T> Packet<T> {
// Just tests whether this channel has been sent on or not, this is only
// safe to use from the sender.
pub fn sent(&self) -> bool {
match self.upgrade {
NothingSent => false,
_ => true,
unsafe {
match *self.upgrade.get() {
NothingSent => false,
_ => true,
}
}
}
pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// Attempt to not block the thread (it's a little expensive). If it looks
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(Ordering::SeqCst) == EMPTY {
@ -167,73 +172,77 @@ impl<T> Packet<T> {
self.try_recv()
}
pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
match self.state.load(Ordering::SeqCst) {
EMPTY => Err(Empty),
pub fn try_recv(&self) -> Result<T, Failure<T>> {
unsafe {
match self.state.load(Ordering::SeqCst) {
EMPTY => Err(Empty),
// We saw some data on the channel, but the channel can be used
// again to send us an upgrade. As a result, we need to re-insert
// into the channel that there's no data available (otherwise we'll
// just see DATA next time). This is done as a cmpxchg because if
// the state changes under our feet we'd rather just see that state
// change.
DATA => {
self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
match self.data.take() {
Some(data) => Ok(data),
None => unreachable!(),
// We saw some data on the channel, but the channel can be used
// again to send us an upgrade. As a result, we need to re-insert
// into the channel that there's no data available (otherwise we'll
// just see DATA next time). This is done as a cmpxchg because if
// the state changes under our feet we'd rather just see that state
// change.
DATA => {
self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
match (&mut *self.data.get()).take() {
Some(data) => Ok(data),
None => unreachable!(),
}
}
}
// There's no guarantee that we receive before an upgrade happens,
// and an upgrade flags the channel as disconnected, so when we see
// this we first need to check if there's data available and *then*
// we go through and process the upgrade.
DISCONNECTED => {
match self.data.take() {
Some(data) => Ok(data),
None => {
match mem::replace(&mut self.upgrade, SendUsed) {
SendUsed | NothingSent => Err(Disconnected),
GoUp(upgrade) => Err(Upgraded(upgrade))
// There's no guarantee that we receive before an upgrade happens,
// and an upgrade flags the channel as disconnected, so when we see
// this we first need to check if there's data available and *then*
// we go through and process the upgrade.
DISCONNECTED => {
match (&mut *self.data.get()).take() {
Some(data) => Ok(data),
None => {
match ptr::replace(self.upgrade.get(), SendUsed) {
SendUsed | NothingSent => Err(Disconnected),
GoUp(upgrade) => Err(Upgraded(upgrade))
}
}
}
}
}
// We are the sole receiver; there cannot be a blocking
// receiver already.
_ => unreachable!()
// We are the sole receiver; there cannot be a blocking
// receiver already.
_ => unreachable!()
}
}
}
// Returns whether the upgrade was completed. If the upgrade wasn't
// completed, then the port couldn't get sent to the other half (it will
// never receive it).
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
let prev = match self.upgrade {
NothingSent => NothingSent,
SendUsed => SendUsed,
_ => panic!("upgrading again"),
};
self.upgrade = GoUp(up);
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
unsafe {
let prev = match *self.upgrade.get() {
NothingSent => NothingSent,
SendUsed => SendUsed,
_ => panic!("upgrading again"),
};
ptr::write(self.upgrade.get(), GoUp(up));
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// If the channel is empty or has data on it, then we're good to go.
// Senders will check the data before the upgrade (in case we
// plastered over the DATA state).
DATA | EMPTY => UpSuccess,
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// If the channel is empty or has data on it, then we're good to go.
// Senders will check the data before the upgrade (in case we
// plastered over the DATA state).
DATA | EMPTY => UpSuccess,
// If the other end is already disconnected, then we failed the
// upgrade. Be sure to trash the port we were given.
DISCONNECTED => { self.upgrade = prev; UpDisconnected }
// If the other end is already disconnected, then we failed the
// upgrade. Be sure to trash the port we were given.
DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
// If someone's waiting, we gotta wake them up
ptr => UpWoke(unsafe { SignalToken::cast_from_usize(ptr) })
// If someone's waiting, we gotta wake them up
ptr => UpWoke(SignalToken::cast_from_usize(ptr))
}
}
}
pub fn drop_chan(&mut self) {
pub fn drop_chan(&self) {
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
DATA | DISCONNECTED | EMPTY => {}
@ -244,7 +253,7 @@ impl<T> Packet<T> {
}
}
pub fn drop_port(&mut self) {
pub fn drop_port(&self) {
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// An empty channel has nothing to do, and a remotely disconnected
// channel also has nothing to do b/c we're about to run the drop
@ -254,7 +263,7 @@ impl<T> Packet<T> {
// There's data on the channel, so make sure we destroy it promptly.
// This is why not using an arc is a little difficult (need the box
// to stay valid while we take the data).
DATA => { self.data.take().unwrap(); }
DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
// We're the only ones that can block on this port
_ => unreachable!()
@ -267,62 +276,66 @@ impl<T> Packet<T> {
// If Ok, the value is whether this port has data, if Err, then the upgraded
// port needs to be checked instead of this one.
pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
match self.state.load(Ordering::SeqCst) {
EMPTY => Ok(false), // Welp, we tried
DATA => Ok(true), // we have some un-acquired data
DISCONNECTED if self.data.is_some() => Ok(true), // we have data
DISCONNECTED => {
match mem::replace(&mut self.upgrade, SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => Err(upgrade),
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
unsafe {
match self.state.load(Ordering::SeqCst) {
EMPTY => Ok(false), // Welp, we tried
DATA => Ok(true), // we have some un-acquired data
DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => Err(upgrade),
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => { self.upgrade = up; Ok(true) }
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => { ptr::write(self.upgrade.get(), up); Ok(true) }
}
}
_ => unreachable!(), // we're the "one blocker"
}
_ => unreachable!(), // we're the "one blocker"
}
}
// Attempts to start selection on this port. This can either succeed, fail
// because there is data, or fail because there is an upgrade pending.
pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
let ptr = unsafe { token.cast_to_usize() };
match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
EMPTY => SelSuccess,
DATA => {
drop(unsafe { SignalToken::cast_from_usize(ptr) });
SelCanceled
}
DISCONNECTED if self.data.is_some() => {
drop(unsafe { SignalToken::cast_from_usize(ptr) });
SelCanceled
}
DISCONNECTED => {
match mem::replace(&mut self.upgrade, SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }, upgrade)
}
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
unsafe {
let ptr = token.cast_to_usize();
match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
EMPTY => SelSuccess,
DATA => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED if (*self.data.get()).is_some() => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
}
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => {
self.upgrade = up;
drop(unsafe { SignalToken::cast_from_usize(ptr) });
SelCanceled
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => {
ptr::write(self.upgrade.get(), up);
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
}
}
_ => unreachable!(), // we're the "one blocker"
}
_ => unreachable!(), // we're the "one blocker"
}
}
@ -330,7 +343,7 @@ impl<T> Packet<T> {
// blocked thread will no longer be visible to any other threads.
//
// The return value indicates whether there's data on this port.
pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
let state = match self.state.load(Ordering::SeqCst) {
// Each of these states means that no further activity will happen
// with regard to abortion selection
@ -356,16 +369,16 @@ impl<T> Packet<T> {
//
// We then need to check to see if there was an upgrade requested,
// and if so, the upgraded port needs to have its selection aborted.
DISCONNECTED => {
if self.data.is_some() {
DISCONNECTED => unsafe {
if (*self.data.get()).is_some() {
Ok(true)
} else {
match mem::replace(&mut self.upgrade, SendUsed) {
match ptr::replace(self.upgrade.get(), SendUsed) {
GoUp(port) => Err(port),
_ => Ok(true),
}
}
}
},
// We woke ourselves up from select.
ptr => unsafe {

View File

@ -24,6 +24,8 @@ use core::cmp;
use core::intrinsics::abort;
use core::isize;
use cell::UnsafeCell;
use ptr;
use sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering};
use sync::mpsc::blocking::{self, SignalToken};
use sync::mpsc::mpsc_queue as mpsc;
@ -44,7 +46,7 @@ const MAX_STEALS: isize = 1 << 20;
pub struct Packet<T> {
queue: mpsc::Queue<T>,
cnt: AtomicIsize, // How many items are on this channel
steals: isize, // How many times has a port received without blocking?
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicUsize, // SignalToken for wake up
// The number of channels which are currently using this packet.
@ -72,7 +74,7 @@ impl<T> Packet<T> {
Packet {
queue: mpsc::Queue::new(),
cnt: AtomicIsize::new(0),
steals: 0,
steals: UnsafeCell::new(0),
to_wake: AtomicUsize::new(0),
channels: AtomicUsize::new(2),
port_dropped: AtomicBool::new(false),
@ -95,7 +97,7 @@ impl<T> Packet<T> {
// threads in select().
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self,
pub fn inherit_blocker(&self,
token: Option<SignalToken>,
guard: MutexGuard<()>) {
token.map(|token| {
@ -122,7 +124,7 @@ impl<T> Packet<T> {
// To offset this bad increment, we initially set the steal count to
// -1. You'll find some special code in abort_selection() as well to
// ensure that this -1 steal count doesn't escape too far.
self.steals = -1;
unsafe { *self.steals.get() = -1; }
});
// When the shared packet is constructed, we grabbed this lock. The
@ -133,7 +135,7 @@ impl<T> Packet<T> {
drop(guard);
}
pub fn send(&mut self, t: T) -> Result<(), T> {
pub fn send(&self, t: T) -> Result<(), T> {
// See Port::drop for what's going on
if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
@ -218,7 +220,7 @@ impl<T> Packet<T> {
Ok(())
}
pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure> {
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
// This code is essentially the exact same as that found in the stream
// case (see stream.rs)
match self.try_recv() {
@ -239,37 +241,38 @@ impl<T> Packet<T> {
}
match self.try_recv() {
data @ Ok(..) => { self.steals -= 1; data }
data @ Ok(..) => unsafe { *self.steals.get() -= 1; data },
data => data,
}
}
// Essentially the exact same thing as the stream decrement function.
// Returns true if blocking should proceed.
fn decrement(&mut self, token: SignalToken) -> StartResult {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
self.to_wake.store(ptr, Ordering::SeqCst);
fn decrement(&self, token: SignalToken) -> StartResult {
unsafe {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = token.cast_to_usize();
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = self.steals;
self.steals = 0;
let steals = ptr::replace(self.steals.get(), 0);
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 { return Installed }
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 { return Installed }
}
}
}
self.to_wake.store(0, Ordering::SeqCst);
drop(unsafe { SignalToken::cast_from_usize(ptr) });
Abort
self.to_wake.store(0, Ordering::SeqCst);
drop(SignalToken::cast_from_usize(ptr));
Abort
}
}
pub fn try_recv(&mut self) -> Result<T, Failure> {
pub fn try_recv(&self) -> Result<T, Failure> {
let ret = match self.queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
@ -303,23 +306,23 @@ impl<T> Packet<T> {
match ret {
// See the discussion in the stream implementation for why we
// might decrement steals.
Some(data) => {
if self.steals > MAX_STEALS {
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, self.steals);
self.steals -= m;
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(self.steals >= 0);
assert!(*self.steals.get() >= 0);
}
self.steals += 1;
*self.steals.get() += 1;
Ok(data)
}
},
// See the discussion in the stream implementation for why we try
// again.
@ -341,7 +344,7 @@ impl<T> Packet<T> {
// Prepares this shared packet for a channel clone, essentially just bumping
// a refcount.
pub fn clone_chan(&mut self) {
pub fn clone_chan(&self) {
let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
// See comments on Arc::clone() on why we do this (for `mem::forget`).
@ -355,7 +358,7 @@ impl<T> Packet<T> {
// Decrement the reference count on a channel. This is called whenever a
// Chan is dropped and may end up waking up a receiver. It's the receiver's
// responsibility on the other end to figure out that we've disconnected.
pub fn drop_chan(&mut self) {
pub fn drop_chan(&self) {
match self.channels.fetch_sub(1, Ordering::SeqCst) {
1 => {}
n if n > 1 => return,
@ -371,9 +374,9 @@ impl<T> Packet<T> {
// See the long discussion inside of stream.rs for why the queue is drained,
// and why it is done in this fashion.
pub fn drop_port(&mut self) {
pub fn drop_port(&self) {
self.port_dropped.store(true, Ordering::SeqCst);
let mut steals = self.steals;
let mut steals = unsafe { *self.steals.get() };
while {
let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst);
cnt != DISCONNECTED && cnt != steals
@ -390,7 +393,7 @@ impl<T> Packet<T> {
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> SignalToken {
fn take_to_wake(&self) -> SignalToken {
let ptr = self.to_wake.load(Ordering::SeqCst);
self.to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
@ -406,13 +409,13 @@ impl<T> Packet<T> {
//
// This is different than the stream version because there's no need to peek
// at the queue, we can just look at the local count.
pub fn can_recv(&mut self) -> bool {
pub fn can_recv(&self) -> bool {
let cnt = self.cnt.load(Ordering::SeqCst);
cnt == DISCONNECTED || cnt - self.steals > 0
cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0
}
// increment the count on the channel (used for selection)
fn bump(&mut self, amt: isize) -> isize {
fn bump(&self, amt: isize) -> isize {
match self.cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
@ -427,7 +430,7 @@ impl<T> Packet<T> {
//
// The code here is the same as in stream.rs, except that it doesn't need to
// peek at the channel to see if an upgrade is pending.
pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
pub fn start_selection(&self, token: SignalToken) -> StartResult {
match self.decrement(token) {
Installed => Installed,
Abort => {
@ -443,7 +446,7 @@ impl<T> Packet<T> {
//
// This is similar to the stream implementation (hence fewer comments), but
// uses a different value for the "steals" variable.
pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
// Before we do anything else, we bounce on this lock. The reason for
// doing this is to ensure that any upgrade-in-progress is gone and
// done with. Without this bounce, we can race with inherit_blocker
@ -477,12 +480,15 @@ impl<T> Packet<T> {
thread::yield_now();
}
}
// if the number of steals is -1, it was the pre-emptive -1 steal
// count from when we inherited a blocker. This is fine because
// we're just going to overwrite it with a real value.
assert!(self.steals == 0 || self.steals == -1);
self.steals = steals;
prev >= 0
unsafe {
// if the number of steals is -1, it was the pre-emptive -1 steal
// count from when we inherited a blocker. This is fine because
// we're just going to overwrite it with a real value.
let old = self.steals.get();
assert!(*old == 0 || *old == -1);
*old = steals;
prev >= 0
}
}
}
}

View File

@ -22,8 +22,10 @@ pub use self::UpgradeResult::*;
pub use self::SelectionResult::*;
use self::Message::*;
use cell::UnsafeCell;
use core::cmp;
use core::isize;
use ptr;
use thread;
use time::Instant;
@ -42,7 +44,7 @@ pub struct Packet<T> {
queue: spsc::Queue<Message<T>>, // internal queue for all message
cnt: AtomicIsize, // How many items are on this channel
steals: isize, // How many times has a port received without blocking?
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
port_dropped: AtomicBool, // flag if the channel has been destroyed.
@ -79,14 +81,14 @@ impl<T> Packet<T> {
queue: unsafe { spsc::Queue::new(128) },
cnt: AtomicIsize::new(0),
steals: 0,
steals: UnsafeCell::new(0),
to_wake: AtomicUsize::new(0),
port_dropped: AtomicBool::new(false),
}
}
pub fn send(&mut self, t: T) -> Result<(), T> {
pub fn send(&self, t: T) -> Result<(), T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
// considered as being sent.
@ -99,7 +101,7 @@ impl<T> Packet<T> {
Ok(())
}
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
@ -107,7 +109,7 @@ impl<T> Packet<T> {
self.do_send(GoUp(up))
}
fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
fn do_send(&self, t: Message<T>) -> UpgradeResult {
self.queue.push(t);
match self.cnt.fetch_add(1, Ordering::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup
@ -141,7 +143,7 @@ impl<T> Packet<T> {
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> SignalToken {
fn take_to_wake(&self) -> SignalToken {
let ptr = self.to_wake.load(Ordering::SeqCst);
self.to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
@ -151,13 +153,12 @@ impl<T> Packet<T> {
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = self.steals;
self.steals = 0;
let steals = unsafe { ptr::replace(self.steals.get(), 0) };
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
@ -173,7 +174,7 @@ impl<T> Packet<T> {
Err(unsafe { SignalToken::cast_from_usize(ptr) })
}
pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// Optimistic preflight check (scheduling is expensive).
match self.try_recv() {
Err(Empty) => {}
@ -199,16 +200,16 @@ impl<T> Packet<T> {
// a steal, so offset the decrement here (we already have our
// "steal" factored into the channel count above).
data @ Ok(..) |
data @ Err(Upgraded(..)) => {
self.steals -= 1;
data @ Err(Upgraded(..)) => unsafe {
*self.steals.get() -= 1;
data
}
},
data => data,
}
}
pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
pub fn try_recv(&self) -> Result<T, Failure<T>> {
match self.queue.pop() {
// If we stole some data, record to that effect (this will be
// factored into cnt later on).
@ -221,26 +222,26 @@ impl<T> Packet<T> {
// a pretty slow operation, of swapping 0 into cnt, taking steals
// down as much as possible (without going negative), and then
// adding back in whatever we couldn't factor into steals.
Some(data) => {
if self.steals > MAX_STEALS {
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, self.steals);
self.steals -= m;
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(self.steals >= 0);
assert!(*self.steals.get() >= 0);
}
self.steals += 1;
*self.steals.get() += 1;
match data {
Data(t) => Ok(t),
GoUp(up) => Err(Upgraded(up)),
}
}
},
None => {
match self.cnt.load(Ordering::SeqCst) {
@ -269,7 +270,7 @@ impl<T> Packet<T> {
}
}
pub fn drop_chan(&mut self) {
pub fn drop_chan(&self) {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
@ -279,7 +280,7 @@ impl<T> Packet<T> {
}
}
pub fn drop_port(&mut self) {
pub fn drop_port(&self) {
// Dropping a port seems like a fairly trivial thing. In theory all we
// need to do is flag that we're disconnected and then everything else
// can take over (we don't have anyone to wake up).
@ -309,7 +310,7 @@ impl<T> Packet<T> {
// continue to fail while active senders send data while we're dropping
// data, but eventually we're guaranteed to break out of this loop
// (because there is a bounded number of senders).
let mut steals = self.steals;
let mut steals = unsafe { *self.steals.get() };
while {
let cnt = self.cnt.compare_and_swap(
steals, DISCONNECTED, Ordering::SeqCst);
@ -332,7 +333,7 @@ impl<T> Packet<T> {
// Tests to see whether this port can receive without blocking. If Ok is
// returned, then that's the answer. If Err is returned, then the returned
// port needs to be queried instead (an upgrade happened)
pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
// We peek at the queue to see if there's anything on it, and we use
// this return value to determine if we should pop from the queue and
// upgrade this channel immediately. If it looks like we've got an
@ -351,7 +352,7 @@ impl<T> Packet<T> {
}
// increment the count on the channel (used for selection)
fn bump(&mut self, amt: isize) -> isize {
fn bump(&self, amt: isize) -> isize {
match self.cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
@ -363,7 +364,7 @@ impl<T> Packet<T> {
// Attempts to start selecting on this port. Like a oneshot, this can fail
// immediately because of an upgrade.
pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
match self.decrement(token) {
Ok(()) => SelSuccess,
Err(token) => {
@ -387,7 +388,7 @@ impl<T> Packet<T> {
}
// Removes a previous thread from being blocked in this port
pub fn abort_selection(&mut self,
pub fn abort_selection(&self,
was_upgrade: bool) -> Result<bool, Receiver<T>> {
// If we're aborting selection after upgrading from a oneshot, then
// we're guarantee that no one is waiting. The only way that we could
@ -403,7 +404,7 @@ impl<T> Packet<T> {
// this end. This is fine because we know it's a small bounded windows
// of time until the data is actually sent.
if was_upgrade {
assert_eq!(self.steals, 0);
assert_eq!(unsafe { *self.steals.get() }, 0);
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
return Ok(true)
}
@ -444,8 +445,10 @@ impl<T> Packet<T> {
thread::yield_now();
}
}
assert_eq!(self.steals, 0);
self.steals = steals;
unsafe {
assert_eq!(*self.steals.get(), 0);
*self.steals.get() = steals;
}
// if we were previously positive, then there's surely data to
// receive