Auto merge of #38466 - sanxiyn:rollup, r=sanxiyn

Rollup of 9 pull requests

- Successful merges: #38334, #38397, #38413, #38421, #38422, #38433, #38438, #38445, #38459
- Failed merges:
This commit is contained in:
bors 2016-12-19 12:17:24 +00:00
commit 10271ea24f
13 changed files with 441 additions and 356 deletions

View File

@ -30,7 +30,7 @@ extern crate bootstrap;
use std::env;
use std::ffi::OsString;
use std::path::PathBuf;
use std::process::Command;
use std::process::{Command, ExitStatus};
fn main() {
let args = env::args_os().skip(1).collect::<Vec<_>>();
@ -180,8 +180,19 @@ fn main() {
}
// Actually run the compiler!
std::process::exit(match cmd.status() {
Ok(s) => s.code().unwrap_or(1),
std::process::exit(match exec_cmd(&mut cmd) {
Ok(s) => s.code().unwrap_or(0xfe),
Err(e) => panic!("\n\nfailed to run {:?}: {}\n\n", cmd, e),
})
}
#[cfg(unix)]
fn exec_cmd(cmd: &mut Command) -> ::std::io::Result<ExitStatus> {
use std::os::unix::process::CommandExt;
Err(cmd.exec())
}
#[cfg(not(unix))]
fn exec_cmd(cmd: &mut Command) -> ::std::io::Result<ExitStatus> {
cmd.status()
}

View File

@ -255,10 +255,44 @@ pub trait BuildHasher {
fn build_hasher(&self) -> Self::Hasher;
}
/// A structure which implements `BuildHasher` for all `Hasher` types which also
/// implement `Default`.
/// The `BuildHasherDefault` structure is used in scenarios where one has a
/// type that implements [`Hasher`] and [`Default`], but needs that type to
/// implement [`BuildHasher`].
///
/// This struct is 0-sized and does not need construction.
/// This structure is zero-sized and does not need construction.
///
/// # Examples
///
/// Using `BuildHasherDefault` to specify a custom [`BuildHasher`] for
/// [`HashMap`]:
///
/// ```
/// use std::collections::HashMap;
/// use std::hash::{BuildHasherDefault, Hasher};
///
/// #[derive(Default)]
/// struct MyHasher;
///
/// impl Hasher for MyHasher {
/// fn write(&mut self, bytes: &[u8]) {
/// // Your hashing algorithm goes here!
/// unimplemented!()
/// }
///
/// fn finish(&self) -> u64 {
/// // Your hashing algorithm goes here!
/// unimplemented!()
/// }
/// }
///
/// type MyBuildHasher = BuildHasherDefault<MyHasher>;
///
/// let hash_map = HashMap::<u32, u32, MyBuildHasher>::default();
/// ```
///
/// [`BuildHasher`]: trait.BuildHasher.html
/// [`Default`]: ../default/trait.Default.html
/// [`Hasher`]: trait.Hasher.html
#[stable(since = "1.7.0", feature = "build_hasher")]
pub struct BuildHasherDefault<H>(marker::PhantomData<H>);

View File

@ -10,6 +10,9 @@
use target::{Target, TargetOptions, TargetResult};
// See https://developer.android.com/ndk/guides/abis.html#arm64-v8a
// for target ABI requirements.
pub fn target() -> TargetResult {
let mut base = super::android_base::opts();
base.max_atomic_width = Some(128);

View File

@ -1,4 +1,4 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// Copyright 2016 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
@ -10,9 +10,12 @@
use target::{Target, TargetOptions, TargetResult};
// See https://developer.android.com/ndk/guides/abis.html#v7a
// for target ABI requirements.
pub fn target() -> TargetResult {
let mut base = super::android_base::opts();
base.features = "+v7,+thumb2,+vfp3,+d16".to_string();
base.features = "+v7,+thumb2,+vfp3,+d16,-neon".to_string();
base.max_atomic_width = Some(64);
Ok(Target {

View File

@ -10,6 +10,9 @@
use target::{Target, TargetResult};
// See https://developer.android.com/ndk/guides/abis.html#x86
// for target ABI requirements.
pub fn target() -> TargetResult {
let mut base = super::android_base::opts();

View File

@ -828,6 +828,12 @@ impl Child {
/// this function at a known point where there are no more destructors left
/// to run.
///
/// ## Platform-specific behavior
///
/// **Unix**: On Unix-like platforms, it is unlikely that all 32 bits of `exit`
/// will be visible to a parent process inspecting the exit code. On most
/// Unix-like platforms, only the eight least-significant bits are considered.
///
/// # Examples
///
/// ```
@ -835,6 +841,17 @@ impl Child {
///
/// process::exit(0);
/// ```
///
/// Due to [platform-specific behavior], the exit code for this example will be
/// `0` on Linux, but `256` on Windows:
///
/// ```no_run
/// use std::process;
///
/// process::exit(0x0f00);
/// ```
///
/// [platform-specific behavior]: #platform-specific-behavior
#[stable(feature = "rust1", since = "1.0.0")]
pub fn exit(code: i32) -> ! {
::sys_common::cleanup();

View File

@ -348,7 +348,7 @@ impl<T> !Sync for Sender<T> { }
/// owned by one thread, but it can be cloned to send to other threads.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
inner: Arc<UnsafeCell<sync::Packet<T>>>,
inner: Arc<sync::Packet<T>>,
}
#[stable(feature = "rust1", since = "1.0.0")]
@ -426,10 +426,10 @@ pub enum TrySendError<T> {
}
enum Flavor<T> {
Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
Stream(Arc<UnsafeCell<stream::Packet<T>>>),
Shared(Arc<UnsafeCell<shared::Packet<T>>>),
Sync(Arc<UnsafeCell<sync::Packet<T>>>),
Oneshot(Arc<oneshot::Packet<T>>),
Stream(Arc<stream::Packet<T>>),
Shared(Arc<shared::Packet<T>>),
Sync(Arc<sync::Packet<T>>),
}
#[doc(hidden)]
@ -487,7 +487,7 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
let a = Arc::new(oneshot::Packet::new());
(Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
}
@ -532,7 +532,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
let a = Arc::new(sync::Packet::new(bound));
(SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
}
@ -578,38 +578,30 @@ impl<T> Sender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
let (new_inner, ret) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
unsafe {
let p = p.get();
if !(*p).sent() {
return (*p).send(t).map_err(SendError);
} else {
let a =
Arc::new(UnsafeCell::new(stream::Packet::new()));
let rx = Receiver::new(Flavor::Stream(a.clone()));
match (*p).upgrade(rx) {
oneshot::UpSuccess => {
let ret = (*a.get()).send(t);
(a, ret)
}
oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(token) => {
// This send cannot panic because the thread is
// asleep (we're looking at it), so the receiver
// can't go away.
(*a.get()).send(t).ok().unwrap();
token.signal();
(a, Ok(()))
}
if !p.sent() {
return p.send(t).map_err(SendError);
} else {
let a = Arc::new(stream::Packet::new());
let rx = Receiver::new(Flavor::Stream(a.clone()));
match p.upgrade(rx) {
oneshot::UpSuccess => {
let ret = a.send(t);
(a, ret)
}
oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(token) => {
// This send cannot panic because the thread is
// asleep (we're looking at it), so the receiver
// can't go away.
a.send(t).ok().unwrap();
token.signal();
(a, Ok(()))
}
}
}
}
Flavor::Stream(ref p) => return unsafe {
(*p.get()).send(t).map_err(SendError)
},
Flavor::Shared(ref p) => return unsafe {
(*p.get()).send(t).map_err(SendError)
},
Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
Flavor::Sync(..) => unreachable!(),
};
@ -624,41 +616,43 @@ impl<T> Sender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper, guard) = match *unsafe { self.inner() } {
let packet = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
let guard = (*a.get()).postinit_lock();
let a = Arc::new(shared::Packet::new());
{
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
match (*p.get()).upgrade(rx) {
let sleeper = match p.upgrade(rx) {
oneshot::UpSuccess |
oneshot::UpDisconnected => (a, None, guard),
oneshot::UpWoke(task) => (a, Some(task), guard)
}
oneshot::UpDisconnected => None,
oneshot::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
}
a
}
Flavor::Stream(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
let guard = (*a.get()).postinit_lock();
let a = Arc::new(shared::Packet::new());
{
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
match (*p.get()).upgrade(rx) {
let sleeper = match p.upgrade(rx) {
stream::UpSuccess |
stream::UpDisconnected => (a, None, guard),
stream::UpWoke(task) => (a, Some(task), guard),
}
stream::UpDisconnected => None,
stream::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
}
a
}
Flavor::Shared(ref p) => {
unsafe { (*p.get()).clone_chan(); }
p.clone_chan();
return Sender::new(Flavor::Shared(p.clone()));
}
Flavor::Sync(..) => unreachable!(),
};
unsafe {
(*packet.get()).inherit_blocker(sleeper, guard);
let tmp = Sender::new(Flavor::Shared(packet.clone()));
mem::swap(self.inner_mut(), tmp.inner_mut());
}
@ -669,10 +663,10 @@ impl<T> Clone for Sender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
match *unsafe { self.inner_mut() } {
Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.drop_chan(),
Flavor::Stream(ref p) => p.drop_chan(),
Flavor::Shared(ref p) => p.drop_chan(),
Flavor::Sync(..) => unreachable!(),
}
}
@ -690,7 +684,7 @@ impl<T> fmt::Debug for Sender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T> SyncSender<T> {
fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
SyncSender { inner: inner }
}
@ -710,7 +704,7 @@ impl<T> SyncSender<T> {
/// information.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
unsafe { (*self.inner.get()).send(t).map_err(SendError) }
self.inner.send(t).map_err(SendError)
}
/// Attempts to send a value on this channel without blocking.
@ -724,14 +718,14 @@ impl<T> SyncSender<T> {
/// receiver has received the data or not if this function is successful.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
unsafe { (*self.inner.get()).try_send(t) }
self.inner.try_send(t)
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
unsafe { (*self.inner.get()).clone_chan(); }
self.inner.clone_chan();
SyncSender::new(self.inner.clone())
}
}
@ -739,7 +733,7 @@ impl<T> Clone for SyncSender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for SyncSender<T> {
fn drop(&mut self) {
unsafe { (*self.inner.get()).drop_chan(); }
self.inner.drop_chan();
}
}
@ -772,7 +766,7 @@ impl<T> Receiver<T> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(oneshot::Empty) => return Err(TryRecvError::Empty),
Err(oneshot::Disconnected) => {
@ -782,7 +776,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(stream::Empty) => return Err(TryRecvError::Empty),
Err(stream::Disconnected) => {
@ -792,7 +786,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Shared(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(shared::Empty) => return Err(TryRecvError::Empty),
Err(shared::Disconnected) => {
@ -801,7 +795,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Sync(ref p) => {
match unsafe { (*p.get()).try_recv() } {
match p.try_recv() {
Ok(t) => return Ok(t),
Err(sync::Empty) => return Err(TryRecvError::Empty),
Err(sync::Disconnected) => {
@ -875,7 +869,7 @@ impl<T> Receiver<T> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).recv(None) } {
match p.recv(None) {
Ok(t) => return Ok(t),
Err(oneshot::Disconnected) => return Err(RecvError),
Err(oneshot::Upgraded(rx)) => rx,
@ -883,7 +877,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).recv(None) } {
match p.recv(None) {
Ok(t) => return Ok(t),
Err(stream::Disconnected) => return Err(RecvError),
Err(stream::Upgraded(rx)) => rx,
@ -891,15 +885,13 @@ impl<T> Receiver<T> {
}
}
Flavor::Shared(ref p) => {
match unsafe { (*p.get()).recv(None) } {
match p.recv(None) {
Ok(t) => return Ok(t),
Err(shared::Disconnected) => return Err(RecvError),
Err(shared::Empty) => unreachable!(),
}
}
Flavor::Sync(ref p) => return unsafe {
(*p.get()).recv(None).map_err(|_| RecvError)
}
Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
};
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
@ -952,7 +944,7 @@ impl<T> Receiver<T> {
loop {
let port_or_empty = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(oneshot::Disconnected) => return Err(Disconnected),
Err(oneshot::Upgraded(rx)) => Some(rx),
@ -960,7 +952,7 @@ impl<T> Receiver<T> {
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(stream::Disconnected) => return Err(Disconnected),
Err(stream::Upgraded(rx)) => Some(rx),
@ -968,14 +960,14 @@ impl<T> Receiver<T> {
}
}
Flavor::Shared(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(shared::Disconnected) => return Err(Disconnected),
Err(shared::Empty) => None,
}
}
Flavor::Sync(ref p) => {
match unsafe { (*p.get()).recv(Some(deadline)) } {
match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(sync::Disconnected) => return Err(Disconnected),
Err(sync::Empty) => None,
@ -1020,23 +1012,19 @@ impl<T> select::Packet for Receiver<T> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).can_recv() } {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).can_recv() } {
match p.can_recv() {
Ok(ret) => return ret,
Err(upgrade) => upgrade,
}
}
Flavor::Shared(ref p) => {
return unsafe { (*p.get()).can_recv() };
}
Flavor::Sync(ref p) => {
return unsafe { (*p.get()).can_recv() };
}
Flavor::Shared(ref p) => return p.can_recv(),
Flavor::Sync(ref p) => return p.can_recv(),
};
unsafe {
mem::swap(self.inner_mut(),
@ -1049,25 +1037,21 @@ impl<T> select::Packet for Receiver<T> {
loop {
let (t, new_port) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
match unsafe { (*p.get()).start_selection(token) } {
match p.start_selection(token) {
oneshot::SelSuccess => return Installed,
oneshot::SelCanceled => return Abort,
oneshot::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Stream(ref p) => {
match unsafe { (*p.get()).start_selection(token) } {
match p.start_selection(token) {
stream::SelSuccess => return Installed,
stream::SelCanceled => return Abort,
stream::SelUpgraded(t, rx) => (t, rx),
}
}
Flavor::Shared(ref p) => {
return unsafe { (*p.get()).start_selection(token) };
}
Flavor::Sync(ref p) => {
return unsafe { (*p.get()).start_selection(token) };
}
Flavor::Shared(ref p) => return p.start_selection(token),
Flavor::Sync(ref p) => return p.start_selection(token),
};
token = t;
unsafe {
@ -1080,16 +1064,10 @@ impl<T> select::Packet for Receiver<T> {
let mut was_upgrade = false;
loop {
let result = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
Flavor::Stream(ref p) => unsafe {
(*p.get()).abort_selection(was_upgrade)
},
Flavor::Shared(ref p) => return unsafe {
(*p.get()).abort_selection(was_upgrade)
},
Flavor::Sync(ref p) => return unsafe {
(*p.get()).abort_selection()
},
Flavor::Oneshot(ref p) => p.abort_selection(),
Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
Flavor::Sync(ref p) => return p.abort_selection(),
};
let new_port = match result { Ok(b) => return b, Err(p) => p };
was_upgrade = true;
@ -1142,11 +1120,11 @@ impl <T> IntoIterator for Receiver<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
match *unsafe { self.inner_mut() } {
Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.drop_port(),
Flavor::Stream(ref p) => p.drop_port(),
Flavor::Shared(ref p) => p.drop_port(),
Flavor::Sync(ref p) => p.drop_port(),
}
}
}

View File

@ -39,7 +39,8 @@ use self::MyUpgrade::*;
use sync::mpsc::Receiver;
use sync::mpsc::blocking::{self, SignalToken};
use core::mem;
use cell::UnsafeCell;
use ptr;
use sync::atomic::{AtomicUsize, Ordering};
use time::Instant;
@ -57,10 +58,10 @@ pub struct Packet<T> {
// Internal state of the chan/port pair (stores the blocked thread as well)
state: AtomicUsize,
// One-shot data slot location
data: Option<T>,
data: UnsafeCell<Option<T>>,
// when used for the second time, a oneshot channel must be upgraded, and
// this contains the slot for the upgrade
upgrade: MyUpgrade<T>,
upgrade: UnsafeCell<MyUpgrade<T>>,
}
pub enum Failure<T> {
@ -90,42 +91,44 @@ enum MyUpgrade<T> {
impl<T> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
data: None,
upgrade: NothingSent,
data: UnsafeCell::new(None),
upgrade: UnsafeCell::new(NothingSent),
state: AtomicUsize::new(EMPTY),
}
}
pub fn send(&mut self, t: T) -> Result<(), T> {
// Sanity check
match self.upgrade {
NothingSent => {}
_ => panic!("sending on a oneshot that's already sent on "),
}
assert!(self.data.is_none());
self.data = Some(t);
self.upgrade = SendUsed;
match self.state.swap(DATA, Ordering::SeqCst) {
// Sent the data, no one was waiting
EMPTY => Ok(()),
// Couldn't send the data, the port hung up first. Return the data
// back up the stack.
DISCONNECTED => {
self.state.swap(DISCONNECTED, Ordering::SeqCst);
self.upgrade = NothingSent;
Err(self.data.take().unwrap())
pub fn send(&self, t: T) -> Result<(), T> {
unsafe {
// Sanity check
match *self.upgrade.get() {
NothingSent => {}
_ => panic!("sending on a oneshot that's already sent on "),
}
assert!((*self.data.get()).is_none());
ptr::write(self.data.get(), Some(t));
ptr::write(self.upgrade.get(), SendUsed);
// Not possible, these are one-use channels
DATA => unreachable!(),
match self.state.swap(DATA, Ordering::SeqCst) {
// Sent the data, no one was waiting
EMPTY => Ok(()),
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => unsafe {
SignalToken::cast_from_usize(ptr).signal();
Ok(())
// Couldn't send the data, the port hung up first. Return the data
// back up the stack.
DISCONNECTED => {
self.state.swap(DISCONNECTED, Ordering::SeqCst);
ptr::write(self.upgrade.get(), NothingSent);
Err((&mut *self.data.get()).take().unwrap())
}
// Not possible, these are one-use channels
DATA => unreachable!(),
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => {
SignalToken::cast_from_usize(ptr).signal();
Ok(())
}
}
}
}
@ -133,13 +136,15 @@ impl<T> Packet<T> {
// Just tests whether this channel has been sent on or not, this is only
// safe to use from the sender.
pub fn sent(&self) -> bool {
match self.upgrade {
NothingSent => false,
_ => true,
unsafe {
match *self.upgrade.get() {
NothingSent => false,
_ => true,
}
}
}
pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// Attempt to not block the thread (it's a little expensive). If it looks
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(Ordering::SeqCst) == EMPTY {
@ -167,73 +172,77 @@ impl<T> Packet<T> {
self.try_recv()
}
pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
match self.state.load(Ordering::SeqCst) {
EMPTY => Err(Empty),
pub fn try_recv(&self) -> Result<T, Failure<T>> {
unsafe {
match self.state.load(Ordering::SeqCst) {
EMPTY => Err(Empty),
// We saw some data on the channel, but the channel can be used
// again to send us an upgrade. As a result, we need to re-insert
// into the channel that there's no data available (otherwise we'll
// just see DATA next time). This is done as a cmpxchg because if
// the state changes under our feet we'd rather just see that state
// change.
DATA => {
self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
match self.data.take() {
Some(data) => Ok(data),
None => unreachable!(),
// We saw some data on the channel, but the channel can be used
// again to send us an upgrade. As a result, we need to re-insert
// into the channel that there's no data available (otherwise we'll
// just see DATA next time). This is done as a cmpxchg because if
// the state changes under our feet we'd rather just see that state
// change.
DATA => {
self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
match (&mut *self.data.get()).take() {
Some(data) => Ok(data),
None => unreachable!(),
}
}
}
// There's no guarantee that we receive before an upgrade happens,
// and an upgrade flags the channel as disconnected, so when we see
// this we first need to check if there's data available and *then*
// we go through and process the upgrade.
DISCONNECTED => {
match self.data.take() {
Some(data) => Ok(data),
None => {
match mem::replace(&mut self.upgrade, SendUsed) {
SendUsed | NothingSent => Err(Disconnected),
GoUp(upgrade) => Err(Upgraded(upgrade))
// There's no guarantee that we receive before an upgrade happens,
// and an upgrade flags the channel as disconnected, so when we see
// this we first need to check if there's data available and *then*
// we go through and process the upgrade.
DISCONNECTED => {
match (&mut *self.data.get()).take() {
Some(data) => Ok(data),
None => {
match ptr::replace(self.upgrade.get(), SendUsed) {
SendUsed | NothingSent => Err(Disconnected),
GoUp(upgrade) => Err(Upgraded(upgrade))
}
}
}
}
}
// We are the sole receiver; there cannot be a blocking
// receiver already.
_ => unreachable!()
// We are the sole receiver; there cannot be a blocking
// receiver already.
_ => unreachable!()
}
}
}
// Returns whether the upgrade was completed. If the upgrade wasn't
// completed, then the port couldn't get sent to the other half (it will
// never receive it).
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
let prev = match self.upgrade {
NothingSent => NothingSent,
SendUsed => SendUsed,
_ => panic!("upgrading again"),
};
self.upgrade = GoUp(up);
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
unsafe {
let prev = match *self.upgrade.get() {
NothingSent => NothingSent,
SendUsed => SendUsed,
_ => panic!("upgrading again"),
};
ptr::write(self.upgrade.get(), GoUp(up));
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// If the channel is empty or has data on it, then we're good to go.
// Senders will check the data before the upgrade (in case we
// plastered over the DATA state).
DATA | EMPTY => UpSuccess,
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// If the channel is empty or has data on it, then we're good to go.
// Senders will check the data before the upgrade (in case we
// plastered over the DATA state).
DATA | EMPTY => UpSuccess,
// If the other end is already disconnected, then we failed the
// upgrade. Be sure to trash the port we were given.
DISCONNECTED => { self.upgrade = prev; UpDisconnected }
// If the other end is already disconnected, then we failed the
// upgrade. Be sure to trash the port we were given.
DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
// If someone's waiting, we gotta wake them up
ptr => UpWoke(unsafe { SignalToken::cast_from_usize(ptr) })
// If someone's waiting, we gotta wake them up
ptr => UpWoke(SignalToken::cast_from_usize(ptr))
}
}
}
pub fn drop_chan(&mut self) {
pub fn drop_chan(&self) {
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
DATA | DISCONNECTED | EMPTY => {}
@ -244,7 +253,7 @@ impl<T> Packet<T> {
}
}
pub fn drop_port(&mut self) {
pub fn drop_port(&self) {
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// An empty channel has nothing to do, and a remotely disconnected
// channel also has nothing to do b/c we're about to run the drop
@ -254,7 +263,7 @@ impl<T> Packet<T> {
// There's data on the channel, so make sure we destroy it promptly.
// This is why not using an arc is a little difficult (need the box
// to stay valid while we take the data).
DATA => { self.data.take().unwrap(); }
DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
// We're the only ones that can block on this port
_ => unreachable!()
@ -267,62 +276,66 @@ impl<T> Packet<T> {
// If Ok, the value is whether this port has data, if Err, then the upgraded
// port needs to be checked instead of this one.
pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
match self.state.load(Ordering::SeqCst) {
EMPTY => Ok(false), // Welp, we tried
DATA => Ok(true), // we have some un-acquired data
DISCONNECTED if self.data.is_some() => Ok(true), // we have data
DISCONNECTED => {
match mem::replace(&mut self.upgrade, SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => Err(upgrade),
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
unsafe {
match self.state.load(Ordering::SeqCst) {
EMPTY => Ok(false), // Welp, we tried
DATA => Ok(true), // we have some un-acquired data
DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => Err(upgrade),
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => { self.upgrade = up; Ok(true) }
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => { ptr::write(self.upgrade.get(), up); Ok(true) }
}
}
_ => unreachable!(), // we're the "one blocker"
}
_ => unreachable!(), // we're the "one blocker"
}
}
// Attempts to start selection on this port. This can either succeed, fail
// because there is data, or fail because there is an upgrade pending.
pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
let ptr = unsafe { token.cast_to_usize() };
match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
EMPTY => SelSuccess,
DATA => {
drop(unsafe { SignalToken::cast_from_usize(ptr) });
SelCanceled
}
DISCONNECTED if self.data.is_some() => {
drop(unsafe { SignalToken::cast_from_usize(ptr) });
SelCanceled
}
DISCONNECTED => {
match mem::replace(&mut self.upgrade, SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }, upgrade)
}
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
unsafe {
let ptr = token.cast_to_usize();
match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
EMPTY => SelSuccess,
DATA => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED if (*self.data.get()).is_some() => {
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
DISCONNECTED => {
match ptr::replace(self.upgrade.get(), SendUsed) {
// The other end sent us an upgrade, so we need to
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
}
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => {
self.upgrade = up;
drop(unsafe { SignalToken::cast_from_usize(ptr) });
SelCanceled
// If the other end disconnected without sending an
// upgrade, then we have data to receive (the channel is
// disconnected).
up => {
ptr::write(self.upgrade.get(), up);
drop(SignalToken::cast_from_usize(ptr));
SelCanceled
}
}
}
_ => unreachable!(), // we're the "one blocker"
}
_ => unreachable!(), // we're the "one blocker"
}
}
@ -330,7 +343,7 @@ impl<T> Packet<T> {
// blocked thread will no longer be visible to any other threads.
//
// The return value indicates whether there's data on this port.
pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
let state = match self.state.load(Ordering::SeqCst) {
// Each of these states means that no further activity will happen
// with regard to abortion selection
@ -356,16 +369,16 @@ impl<T> Packet<T> {
//
// We then need to check to see if there was an upgrade requested,
// and if so, the upgraded port needs to have its selection aborted.
DISCONNECTED => {
if self.data.is_some() {
DISCONNECTED => unsafe {
if (*self.data.get()).is_some() {
Ok(true)
} else {
match mem::replace(&mut self.upgrade, SendUsed) {
match ptr::replace(self.upgrade.get(), SendUsed) {
GoUp(port) => Err(port),
_ => Ok(true),
}
}
}
},
// We woke ourselves up from select.
ptr => unsafe {

View File

@ -24,6 +24,8 @@ use core::cmp;
use core::intrinsics::abort;
use core::isize;
use cell::UnsafeCell;
use ptr;
use sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering};
use sync::mpsc::blocking::{self, SignalToken};
use sync::mpsc::mpsc_queue as mpsc;
@ -44,7 +46,7 @@ const MAX_STEALS: isize = 1 << 20;
pub struct Packet<T> {
queue: mpsc::Queue<T>,
cnt: AtomicIsize, // How many items are on this channel
steals: isize, // How many times has a port received without blocking?
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicUsize, // SignalToken for wake up
// The number of channels which are currently using this packet.
@ -72,7 +74,7 @@ impl<T> Packet<T> {
Packet {
queue: mpsc::Queue::new(),
cnt: AtomicIsize::new(0),
steals: 0,
steals: UnsafeCell::new(0),
to_wake: AtomicUsize::new(0),
channels: AtomicUsize::new(2),
port_dropped: AtomicBool::new(false),
@ -95,7 +97,7 @@ impl<T> Packet<T> {
// threads in select().
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self,
pub fn inherit_blocker(&self,
token: Option<SignalToken>,
guard: MutexGuard<()>) {
token.map(|token| {
@ -122,7 +124,7 @@ impl<T> Packet<T> {
// To offset this bad increment, we initially set the steal count to
// -1. You'll find some special code in abort_selection() as well to
// ensure that this -1 steal count doesn't escape too far.
self.steals = -1;
unsafe { *self.steals.get() = -1; }
});
// When the shared packet is constructed, we grabbed this lock. The
@ -133,7 +135,7 @@ impl<T> Packet<T> {
drop(guard);
}
pub fn send(&mut self, t: T) -> Result<(), T> {
pub fn send(&self, t: T) -> Result<(), T> {
// See Port::drop for what's going on
if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
@ -218,7 +220,7 @@ impl<T> Packet<T> {
Ok(())
}
pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure> {
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
// This code is essentially the exact same as that found in the stream
// case (see stream.rs)
match self.try_recv() {
@ -239,37 +241,38 @@ impl<T> Packet<T> {
}
match self.try_recv() {
data @ Ok(..) => { self.steals -= 1; data }
data @ Ok(..) => unsafe { *self.steals.get() -= 1; data },
data => data,
}
}
// Essentially the exact same thing as the stream decrement function.
// Returns true if blocking should proceed.
fn decrement(&mut self, token: SignalToken) -> StartResult {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
self.to_wake.store(ptr, Ordering::SeqCst);
fn decrement(&self, token: SignalToken) -> StartResult {
unsafe {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = token.cast_to_usize();
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = self.steals;
self.steals = 0;
let steals = ptr::replace(self.steals.get(), 0);
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 { return Installed }
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 { return Installed }
}
}
}
self.to_wake.store(0, Ordering::SeqCst);
drop(unsafe { SignalToken::cast_from_usize(ptr) });
Abort
self.to_wake.store(0, Ordering::SeqCst);
drop(SignalToken::cast_from_usize(ptr));
Abort
}
}
pub fn try_recv(&mut self) -> Result<T, Failure> {
pub fn try_recv(&self) -> Result<T, Failure> {
let ret = match self.queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
@ -303,23 +306,23 @@ impl<T> Packet<T> {
match ret {
// See the discussion in the stream implementation for why we
// might decrement steals.
Some(data) => {
if self.steals > MAX_STEALS {
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, self.steals);
self.steals -= m;
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(self.steals >= 0);
assert!(*self.steals.get() >= 0);
}
self.steals += 1;
*self.steals.get() += 1;
Ok(data)
}
},
// See the discussion in the stream implementation for why we try
// again.
@ -341,7 +344,7 @@ impl<T> Packet<T> {
// Prepares this shared packet for a channel clone, essentially just bumping
// a refcount.
pub fn clone_chan(&mut self) {
pub fn clone_chan(&self) {
let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
// See comments on Arc::clone() on why we do this (for `mem::forget`).
@ -355,7 +358,7 @@ impl<T> Packet<T> {
// Decrement the reference count on a channel. This is called whenever a
// Chan is dropped and may end up waking up a receiver. It's the receiver's
// responsibility on the other end to figure out that we've disconnected.
pub fn drop_chan(&mut self) {
pub fn drop_chan(&self) {
match self.channels.fetch_sub(1, Ordering::SeqCst) {
1 => {}
n if n > 1 => return,
@ -371,9 +374,9 @@ impl<T> Packet<T> {
// See the long discussion inside of stream.rs for why the queue is drained,
// and why it is done in this fashion.
pub fn drop_port(&mut self) {
pub fn drop_port(&self) {
self.port_dropped.store(true, Ordering::SeqCst);
let mut steals = self.steals;
let mut steals = unsafe { *self.steals.get() };
while {
let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst);
cnt != DISCONNECTED && cnt != steals
@ -390,7 +393,7 @@ impl<T> Packet<T> {
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> SignalToken {
fn take_to_wake(&self) -> SignalToken {
let ptr = self.to_wake.load(Ordering::SeqCst);
self.to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
@ -406,13 +409,13 @@ impl<T> Packet<T> {
//
// This is different than the stream version because there's no need to peek
// at the queue, we can just look at the local count.
pub fn can_recv(&mut self) -> bool {
pub fn can_recv(&self) -> bool {
let cnt = self.cnt.load(Ordering::SeqCst);
cnt == DISCONNECTED || cnt - self.steals > 0
cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0
}
// increment the count on the channel (used for selection)
fn bump(&mut self, amt: isize) -> isize {
fn bump(&self, amt: isize) -> isize {
match self.cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
@ -427,7 +430,7 @@ impl<T> Packet<T> {
//
// The code here is the same as in stream.rs, except that it doesn't need to
// peek at the channel to see if an upgrade is pending.
pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
pub fn start_selection(&self, token: SignalToken) -> StartResult {
match self.decrement(token) {
Installed => Installed,
Abort => {
@ -443,7 +446,7 @@ impl<T> Packet<T> {
//
// This is similar to the stream implementation (hence fewer comments), but
// uses a different value for the "steals" variable.
pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
// Before we do anything else, we bounce on this lock. The reason for
// doing this is to ensure that any upgrade-in-progress is gone and
// done with. Without this bounce, we can race with inherit_blocker
@ -477,12 +480,15 @@ impl<T> Packet<T> {
thread::yield_now();
}
}
// if the number of steals is -1, it was the pre-emptive -1 steal
// count from when we inherited a blocker. This is fine because
// we're just going to overwrite it with a real value.
assert!(self.steals == 0 || self.steals == -1);
self.steals = steals;
prev >= 0
unsafe {
// if the number of steals is -1, it was the pre-emptive -1 steal
// count from when we inherited a blocker. This is fine because
// we're just going to overwrite it with a real value.
let old = self.steals.get();
assert!(*old == 0 || *old == -1);
*old = steals;
prev >= 0
}
}
}
}

View File

@ -22,8 +22,10 @@ pub use self::UpgradeResult::*;
pub use self::SelectionResult::*;
use self::Message::*;
use cell::UnsafeCell;
use core::cmp;
use core::isize;
use ptr;
use thread;
use time::Instant;
@ -42,7 +44,7 @@ pub struct Packet<T> {
queue: spsc::Queue<Message<T>>, // internal queue for all message
cnt: AtomicIsize, // How many items are on this channel
steals: isize, // How many times has a port received without blocking?
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
port_dropped: AtomicBool, // flag if the channel has been destroyed.
@ -79,14 +81,14 @@ impl<T> Packet<T> {
queue: unsafe { spsc::Queue::new(128) },
cnt: AtomicIsize::new(0),
steals: 0,
steals: UnsafeCell::new(0),
to_wake: AtomicUsize::new(0),
port_dropped: AtomicBool::new(false),
}
}
pub fn send(&mut self, t: T) -> Result<(), T> {
pub fn send(&self, t: T) -> Result<(), T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
// considered as being sent.
@ -99,7 +101,7 @@ impl<T> Packet<T> {
Ok(())
}
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
@ -107,7 +109,7 @@ impl<T> Packet<T> {
self.do_send(GoUp(up))
}
fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
fn do_send(&self, t: Message<T>) -> UpgradeResult {
self.queue.push(t);
match self.cnt.fetch_add(1, Ordering::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup
@ -141,7 +143,7 @@ impl<T> Packet<T> {
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> SignalToken {
fn take_to_wake(&self) -> SignalToken {
let ptr = self.to_wake.load(Ordering::SeqCst);
self.to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
@ -151,13 +153,12 @@ impl<T> Packet<T> {
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = self.steals;
self.steals = 0;
let steals = unsafe { ptr::replace(self.steals.get(), 0) };
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
@ -173,7 +174,7 @@ impl<T> Packet<T> {
Err(unsafe { SignalToken::cast_from_usize(ptr) })
}
pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// Optimistic preflight check (scheduling is expensive).
match self.try_recv() {
Err(Empty) => {}
@ -199,16 +200,16 @@ impl<T> Packet<T> {
// a steal, so offset the decrement here (we already have our
// "steal" factored into the channel count above).
data @ Ok(..) |
data @ Err(Upgraded(..)) => {
self.steals -= 1;
data @ Err(Upgraded(..)) => unsafe {
*self.steals.get() -= 1;
data
}
},
data => data,
}
}
pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
pub fn try_recv(&self) -> Result<T, Failure<T>> {
match self.queue.pop() {
// If we stole some data, record to that effect (this will be
// factored into cnt later on).
@ -221,26 +222,26 @@ impl<T> Packet<T> {
// a pretty slow operation, of swapping 0 into cnt, taking steals
// down as much as possible (without going negative), and then
// adding back in whatever we couldn't factor into steals.
Some(data) => {
if self.steals > MAX_STEALS {
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, self.steals);
self.steals -= m;
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(self.steals >= 0);
assert!(*self.steals.get() >= 0);
}
self.steals += 1;
*self.steals.get() += 1;
match data {
Data(t) => Ok(t),
GoUp(up) => Err(Upgraded(up)),
}
}
},
None => {
match self.cnt.load(Ordering::SeqCst) {
@ -269,7 +270,7 @@ impl<T> Packet<T> {
}
}
pub fn drop_chan(&mut self) {
pub fn drop_chan(&self) {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
@ -279,7 +280,7 @@ impl<T> Packet<T> {
}
}
pub fn drop_port(&mut self) {
pub fn drop_port(&self) {
// Dropping a port seems like a fairly trivial thing. In theory all we
// need to do is flag that we're disconnected and then everything else
// can take over (we don't have anyone to wake up).
@ -309,7 +310,7 @@ impl<T> Packet<T> {
// continue to fail while active senders send data while we're dropping
// data, but eventually we're guaranteed to break out of this loop
// (because there is a bounded number of senders).
let mut steals = self.steals;
let mut steals = unsafe { *self.steals.get() };
while {
let cnt = self.cnt.compare_and_swap(
steals, DISCONNECTED, Ordering::SeqCst);
@ -332,7 +333,7 @@ impl<T> Packet<T> {
// Tests to see whether this port can receive without blocking. If Ok is
// returned, then that's the answer. If Err is returned, then the returned
// port needs to be queried instead (an upgrade happened)
pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
// We peek at the queue to see if there's anything on it, and we use
// this return value to determine if we should pop from the queue and
// upgrade this channel immediately. If it looks like we've got an
@ -351,7 +352,7 @@ impl<T> Packet<T> {
}
// increment the count on the channel (used for selection)
fn bump(&mut self, amt: isize) -> isize {
fn bump(&self, amt: isize) -> isize {
match self.cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
@ -363,7 +364,7 @@ impl<T> Packet<T> {
// Attempts to start selecting on this port. Like a oneshot, this can fail
// immediately because of an upgrade.
pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
match self.decrement(token) {
Ok(()) => SelSuccess,
Err(token) => {
@ -387,7 +388,7 @@ impl<T> Packet<T> {
}
// Removes a previous thread from being blocked in this port
pub fn abort_selection(&mut self,
pub fn abort_selection(&self,
was_upgrade: bool) -> Result<bool, Receiver<T>> {
// If we're aborting selection after upgrading from a oneshot, then
// we're guarantee that no one is waiting. The only way that we could
@ -403,7 +404,7 @@ impl<T> Packet<T> {
// this end. This is fine because we know it's a small bounded windows
// of time until the data is actually sent.
if was_upgrade {
assert_eq!(self.steals, 0);
assert_eq!(unsafe { *self.steals.get() }, 0);
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
return Ok(true)
}
@ -444,8 +445,10 @@ impl<T> Packet<T> {
thread::yield_now();
}
}
assert_eq!(self.steals, 0);
self.steals = steals;
unsafe {
assert_eq!(*self.steals.get(), 0);
*self.steals.get() = steals;
}
// if we were previously positive, then there's surely data to
// receive

View File

@ -17,13 +17,11 @@
//! provide some built-in support for low-level synchronization.
//!
//! Communication between threads can be done through
//! [channels](../../std/sync/mpsc/index.html), Rust's message-passing
//! types, along with [other forms of thread
//! [channels], Rust's message-passing types, along with [other forms of thread
//! synchronization](../../std/sync/index.html) and shared-memory data
//! structures. In particular, types that are guaranteed to be
//! threadsafe are easily shared between threads using the
//! atomically-reference-counted container,
//! [`Arc`](../../std/sync/struct.Arc.html).
//! atomically-reference-counted container, [`Arc`].
//!
//! Fatal logic errors in Rust cause *thread panic*, during which
//! a thread will unwind the stack, running destructors and freeing
@ -40,7 +38,7 @@
//!
//! ## Spawning a thread
//!
//! A new thread can be spawned using the `thread::spawn` function:
//! A new thread can be spawned using the [`thread::spawn`][`spawn`] function:
//!
//! ```rust
//! use std::thread;
@ -55,7 +53,7 @@
//! it), unless this parent is the main thread.
//!
//! The parent thread can also wait on the completion of the child
//! thread; a call to `spawn` produces a `JoinHandle`, which provides
//! thread; a call to [`spawn`] produces a [`JoinHandle`], which provides
//! a `join` method for waiting:
//!
//! ```rust
@ -68,13 +66,13 @@
//! let res = child.join();
//! ```
//!
//! The `join` method returns a `Result` containing `Ok` of the final
//! value produced by the child thread, or `Err` of the value given to
//! a call to `panic!` if the child panicked.
//! The [`join`] method returns a [`Result`] containing [`Ok`] of the final
//! value produced by the child thread, or [`Err`] of the value given to
//! a call to [`panic!`] if the child panicked.
//!
//! ## Configuring threads
//!
//! A new thread can be configured before it is spawned via the `Builder` type,
//! A new thread can be configured before it is spawned via the [`Builder`] type,
//! which currently allows you to set the name and stack size for the child thread:
//!
//! ```rust
@ -88,43 +86,43 @@
//!
//! ## The `Thread` type
//!
//! Threads are represented via the `Thread` type, which you can get in one of
//! Threads are represented via the [`Thread`] type, which you can get in one of
//! two ways:
//!
//! * By spawning a new thread, e.g. using the `thread::spawn` function, and
//! calling `thread()` on the `JoinHandle`.
//! * By requesting the current thread, using the `thread::current` function.
//! * By spawning a new thread, e.g. using the [`thread::spawn`][`spawn`]
//! function, and calling [`thread()`] on the [`JoinHandle`].
//! * By requesting the current thread, using the [`thread::current()`] function.
//!
//! The `thread::current()` function is available even for threads not spawned
//! The [`thread::current()`] function is available even for threads not spawned
//! by the APIs of this module.
//!
//! ## Blocking support: park and unpark
//!
//! Every thread is equipped with some basic low-level blocking support, via the
//! `thread::park()` function and `thread::Thread::unpark()` method. `park()`
//! blocks the current thread, which can then be resumed from another thread by
//! calling the `unpark()` method on the blocked thread's handle.
//! [`thread::park()`][`park()`] function and [`thread::Thread::unpark()`][`unpark()`]
//! method. [`park()`] blocks the current thread, which can then be resumed from
//! another thread by calling the [`unpark()`] method on the blocked thread's handle.
//!
//! Conceptually, each `Thread` handle has an associated token, which is
//! Conceptually, each [`Thread`] handle has an associated token, which is
//! initially not present:
//!
//! * The `thread::park()` function blocks the current thread unless or until
//! * The [`thread::park()`][`park()`] function blocks the current thread unless or until
//! the token is available for its thread handle, at which point it atomically
//! consumes the token. It may also return *spuriously*, without consuming the
//! token. `thread::park_timeout()` does the same, but allows specifying a
//! token. [`thread::park_timeout()`] does the same, but allows specifying a
//! maximum time to block the thread for.
//!
//! * The `unpark()` method on a `Thread` atomically makes the token available
//! * The [`unpark()`] method on a [`Thread`] atomically makes the token available
//! if it wasn't already.
//!
//! In other words, each `Thread` acts a bit like a semaphore with initial count
//! In other words, each [`Thread`] acts a bit like a semaphore with initial count
//! 0, except that the semaphore is *saturating* (the count cannot go above 1),
//! and can return spuriously.
//!
//! The API is typically used by acquiring a handle to the current thread,
//! placing that handle in a shared data structure so that other threads can
//! find it, and then `park`ing. When some desired condition is met, another
//! thread calls `unpark` on the handle.
//! thread calls [`unpark()`] on the handle.
//!
//! The motivation for this design is twofold:
//!
@ -149,6 +147,22 @@
//! will want to make use of some form of **interior mutability** through the
//! [`Cell`] or [`RefCell`] types.
//!
//! [channels]: ../../std/sync/mpsc/index.html
//! [`Arc`]: ../../std/sync/struct.Arc.html
//! [`spawn`]: ../../std/thread/fn.spawn.html
//! [`JoinHandle`]: ../../std/thread/struct.JoinHandle.html
//! [`thread()`]: ../../std/thread/struct.JoinHandle.html#method.thread
//! [`join`]: ../../std/thread/struct.JoinHandle.html#method.join
//! [`Result`]: ../../std/result/enum.Result.html
//! [`Ok`]: ../../std/result/enum.Result.html#variant.Ok
//! [`Err`]: ../../std/result/enum.Result.html#variant.Err
//! [`panic!`]: ../../std/macro.panic.html
//! [`Builder`]: ../../std/thread/struct.Builder.html
//! [`thread::current()`]: ../../std/thread/fn.spawn.html
//! [`Thread`]: ../../std/thread/struct.Thread.html
//! [`park()`]: ../../std/thread/fn.park.html
//! [`unpark()`]: ../../std/thread/struct.Thread.html#method.unpark
//! [`thread::park_timeout()`]: ../../std/thread/fn.park_timeout.html
//! [`Cell`]: ../cell/struct.Cell.html
//! [`RefCell`]: ../cell/struct.RefCell.html
//! [`thread_local!`]: ../macro.thread_local.html

View File

@ -2,7 +2,7 @@
# compiler itself. For the rustbuild build system, this also describes the
# relevant Cargo revision that we're using.
#
# Currently Rust always bootstrap from the previous stable release, and in our
# Currently Rust always bootstraps from the previous stable release, and in our
# train model this means that the master branch bootstraps from beta, beta
# bootstraps from current stable, and stable bootstraps from the previous stable
# release.
@ -12,5 +12,5 @@
# tarball for a stable release you'll likely see `1.x.0-$date` where `1.x.0` was
# released on `$date`
rustc: beta-2016-11-16
rustc: beta-2016-12-16
cargo: nightly-2016-11-16

View File

@ -67,7 +67,7 @@ pub fn check(path: &Path, bad: &mut bool) {
}
contents.truncate(0);
t!(t!(File::open(file)).read_to_string(&mut contents));
t!(t!(File::open(&file), &file).read_to_string(&mut contents));
for (i, line) in contents.lines().enumerate() {
let mut err = |msg: &str| {