std: Introduce std::sync

For now, this moves the following modules to std::sync

* UnsafeArc (also removed unwrap method)
* mpsc_queue
* spsc_queue
* atomics
* mpmc_bounded_queue
* deque

We may want to remove some of the queues, but for now this moves things out of
std::rt into std::sync.
Alex Crichton 2013-12-12 17:27:37 -08:00
parent dafb310ba1
commit a55c57284d
15 changed files with 288 additions and 578 deletions
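
For downstream code the move is purely a path rename; a representative before/after, taken directly from the import updates in the diffs below (illustrative):

// before this commit
use std::unstable::sync::UnsafeArc;
use std::unstable::atomics;

// after this commit
use std::sync::arc::UnsafeArc;
use std::sync::atomics;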

View File

@ -45,7 +45,7 @@ use sync;
use sync::{Mutex, RWLock};
use std::cast;
use std::unstable::sync::UnsafeArc;
use std::sync::arc::UnsafeArc;
use std::task;
use std::borrow;
@ -127,20 +127,6 @@ impl<T:Freeze+Send> Arc<T> {
pub fn get<'a>(&'a self) -> &'a T {
unsafe { &*self.x.get_immut() }
}
/**
* Retrieve the data back out of the Arc. This function blocks until the
* reference given to it is the last existing one, and then unwrap the data
* instead of destroying it.
*
* If multiple tasks call unwrap, all but the first will fail. Do not call
* unwrap from a task that holds another reference to the same Arc; it is
* guaranteed to deadlock.
*/
pub fn unwrap(self) -> T {
let Arc { x: x } = self;
x.unwrap()
}
}
impl<T:Freeze + Send> Clone for Arc<T> {
@ -247,22 +233,6 @@ impl<T:Send> MutexArc<T> {
cond: cond })
})
}
/**
* Retrieves the data, blocking until all other references are dropped,
* exactly as arc::unwrap.
*
* Will additionally fail if another task has failed while accessing the arc.
*/
pub fn unwrap(self) -> T {
let MutexArc { x: x } = self;
let inner = x.unwrap();
let MutexArcInner { failed: failed, data: data, .. } = inner;
if failed {
fail!("Can't unwrap poisoned MutexArc - another task failed inside!");
}
data
}
}
impl<T:Freeze + Send> MutexArc<T> {
@ -503,23 +473,6 @@ impl<T:Freeze + Send> RWArc<T> {
}
}
}
/**
* Retrieves the data, blocking until all other references are dropped,
* exactly as arc::unwrap.
*
* Will additionally fail if another task has failed while accessing the arc
* in write mode.
*/
pub fn unwrap(self) -> T {
let RWArc { x: x, .. } = self;
let inner = x.unwrap();
let RWArcInner { failed: failed, data: data, .. } = inner;
if failed {
fail!("Can't unwrap poisoned RWArc - another task failed inside!")
}
data
}
}
// Borrowck rightly complains about immutably aliasing the rwlock in order to
@ -689,22 +642,6 @@ mod tests {
})
}
#[test] #[should_fail]
pub fn test_mutex_arc_unwrap_poison() {
let arc = MutexArc::new(1);
let arc2 = ~(&arc).clone();
let (p, c) = Chan::new();
do task::spawn {
arc2.access(|one| {
c.send(());
assert!(*one == 2);
})
}
let _ = p.recv();
let one = arc.unwrap();
assert!(one == 1);
}
#[test]
fn test_unsafe_mutex_arc_nested() {
unsafe {

View File

@ -19,8 +19,9 @@
use std::borrow;
use std::unstable::sync::{Exclusive, UnsafeArc};
use std::unstable::atomics;
use std::unstable::sync::Exclusive;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
use std::unstable::finally::Finally;
use std::util;
use std::util::NonCopyable;

View File

@ -146,7 +146,7 @@ use std::hashmap::HashMap;
use std::hashmap::HashSet;
use std::libc::{c_uint, c_ulonglong, c_longlong};
use std::ptr;
use std::unstable::atomics;
use std::sync::atomics;
use std::vec;
use syntax::codemap::{Span, Pos};
use syntax::{ast, codemap, ast_util, ast_map, opt_vec};

View File

@ -159,6 +159,7 @@ pub mod trie;
pub mod task;
pub mod comm;
pub mod local_data;
pub mod sync;
/* Runtime and platform support */

src/libstd/sync/arc.rs (new file, 153 lines)
View File

@ -0,0 +1,153 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Atomically reference counted data
//!
//! This module contains the implementation of an atomically reference counted
//! pointer for the purpose of sharing data between tasks. This is obviously a
//! very unsafe primitive to use, but it has its use cases when implementing
//! concurrent data structures and similar tasks.
//!
//! Great care must be taken to ensure that data races do not arise through the
//! usage of `UnsafeArc`, and this often requires some form of external
//! synchronization. The only guarantee provided by this type is that the
//! underlying data will remain valid (not freed) so long as the reference
//! count is greater than zero.
use cast;
use clone::Clone;
use kinds::Send;
use ops::Drop;
use ptr::RawPtr;
use sync::atomics::{AtomicUint, SeqCst, Relaxed, Acquire};
use vec;
/// An atomically reference counted pointer.
///
/// Enforces no shared-memory safety.
#[unsafe_no_drop_flag]
pub struct UnsafeArc<T> {
priv data: *mut ArcData<T>,
}
struct ArcData<T> {
count: AtomicUint,
data: T,
}
unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> {
let data = ~ArcData { count: AtomicUint::new(refcount), data: data };
cast::transmute(data)
}
impl<T: Send> UnsafeArc<T> {
/// Creates a new `UnsafeArc` which wraps the given data.
pub fn new(data: T) -> UnsafeArc<T> {
unsafe { UnsafeArc { data: new_inner(data, 1) } }
}
/// As new(), but returns an extra pre-cloned handle.
pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) {
unsafe {
let ptr = new_inner(data, 2);
(UnsafeArc { data: ptr }, UnsafeArc { data: ptr })
}
}
/// As new(), but returns a vector of as many pre-cloned handles as
/// requested.
pub fn newN(data: T, num_handles: uint) -> ~[UnsafeArc<T>] {
unsafe {
if num_handles == 0 {
~[] // need to free data here
} else {
let ptr = new_inner(data, num_handles);
vec::from_fn(num_handles, |_| UnsafeArc { data: ptr })
}
}
}
/// Gets a pointer to the inner shared data. Note that care must be taken to
/// ensure that the outer `UnsafeArc` does not fall out of scope while this
/// pointer is in use; otherwise dereferencing it may be a use-after-free.
#[inline]
pub fn get(&self) -> *mut T {
unsafe {
assert!((*self.data).count.load(Relaxed) > 0);
return &mut (*self.data).data as *mut T;
}
}
/// Gets an immutable pointer to the inner shared data. This has the same
/// caveats as the `get` method.
#[inline]
pub fn get_immut(&self) -> *T {
unsafe {
assert!((*self.data).count.load(Relaxed) > 0);
return &(*self.data).data as *T;
}
}
}
impl<T: Send> Clone for UnsafeArc<T> {
fn clone(&self) -> UnsafeArc<T> {
unsafe {
// This barrier might be unnecessary, but I'm not sure...
let old_count = (*self.data).count.fetch_add(1, Acquire);
assert!(old_count >= 1);
return UnsafeArc { data: self.data };
}
}
}
#[unsafe_destructor]
impl<T> Drop for UnsafeArc<T>{
fn drop(&mut self) {
unsafe {
// Happens when destructing an unwrapper's handle and from
// `#[unsafe_no_drop_flag]`
if self.data.is_null() {
return
}
// Must be acquire+release, not just release, to make sure this
// doesn't get reordered to after the unwrapper pointer load.
let old_count = (*self.data).count.fetch_sub(1, SeqCst);
assert!(old_count >= 1);
if old_count == 1 {
let _: ~ArcData<T> = cast::transmute(self.data);
}
}
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use super::UnsafeArc;
use task;
use mem::size_of;
#[test]
fn test_size() {
assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>());
}
#[test]
fn arclike_newN() {
// Tests that the many-refcounts-at-once constructors don't leak.
let _ = UnsafeArc::new2(~~"hello");
let x = UnsafeArc::newN(~~"hello", 0);
assert_eq!(x.len(), 0)
let x = UnsafeArc::newN(~~"hello", 1);
assert_eq!(x.len(), 1)
let x = UnsafeArc::newN(~~"hello", 10);
assert_eq!(x.len(), 10)
}
}
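
A minimal usage sketch of the relocated UnsafeArc, based only on the constructors and accessors listed above (illustrative; any mutation through get() would need external synchronization):

use std::sync::arc::UnsafeArc;

fn main() {
    // Two pre-cloned handles to the same reference-counted allocation.
    let (a, b) = UnsafeArc::new2(5);
    unsafe {
        // get_immut() returns a raw pointer; the handles must outlive its use.
        assert_eq!(*a.get_immut(), 5);
        assert_eq!(*b.get_immut(), 5);
    }
    // Dropping both handles frees the allocation.
}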

View File

@ -11,13 +11,16 @@
/*!
* Atomic types
*
* Basic atomic types supporting atomic operations. Each method takes an `Ordering` which
* represents the strength of the memory barrier for that operation. These orderings are the same
* as C++11 atomic orderings [http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync]
* Basic atomic types supporting atomic operations. Each method takes an
* `Ordering` which represents the strength of the memory barrier for that
* operation. These orderings are the same as C++11 atomic orderings
* [http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync]
*
* All atomic types are a single word in size.
*/
#[allow(missing_doc)];
use unstable::intrinsics;
use cast;
use option::{Option,Some,None};
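
A small sketch of the relocated atomics API, using only operations that appear elsewhere in this commit (illustrative; assumes the era's `&mut self` atomic method signatures):

use std::sync::atomics::{AtomicUint, SeqCst, Relaxed};

fn main() {
    let mut count = AtomicUint::new(1);
    // fetch_add returns the previous value; SeqCst is the strongest ordering.
    let old = count.fetch_add(1, SeqCst);
    assert_eq!(old, 1);
    assert_eq!(count.load(Relaxed), 2);
}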

View File

@ -50,15 +50,18 @@
use cast;
use clone::Clone;
use iter::range;
use iter::{range, Iterator};
use kinds::Send;
use libc;
use mem;
use ops::Drop;
use option::{Option, Some, None};
use ptr;
use unstable::atomics::{AtomicInt, AtomicPtr, SeqCst};
use unstable::sync::{UnsafeArc, Exclusive};
use ptr::RawPtr;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
use unstable::sync::Exclusive;
use vec::{OwnedVector, ImmutableVector};
// Once the queue is less than 1/K full, then it will be downsized. Note that
// the deque requires that this number be less than 2.
@ -399,8 +402,8 @@ mod tests {
use rt::thread::Thread;
use rand;
use rand::Rng;
use unstable::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
AtomicUint, INIT_ATOMIC_UINT};
use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
AtomicUint, INIT_ATOMIC_UINT};
use vec;
#[test]

src/libstd/sync/mod.rs (new file, 23 lines)
View File

@ -0,0 +1,23 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Useful synchronization primitives
//!
//! This module contains useful safe and unsafe synchronization primitives.
//! Most of the primitives in this module do not provide any sort of locking
//! and/or blocking at all, but rather provide the necessary tools to build
//! other types of concurrent primitives.
pub mod arc;
pub mod atomics;
pub mod deque;
pub mod mpmc_bounded_queue;
pub mod mpsc_queue;
pub mod spsc_queue;

View File

@ -25,15 +25,17 @@
* policies, either expressed or implied, of Dmitry Vyukov.
*/
#[allow(missing_doc, dead_code)];
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
use unstable::sync::UnsafeArc;
use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire};
use option::*;
use vec;
use clone::Clone;
use kinds::Send;
use num::{Exponential,Algebraic,Round};
use option::{Option, Some, None};
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
use vec;
struct Node<T> {
sequence: AtomicUint,

View File

@ -26,6 +26,14 @@
*/
//! A mostly lock-free multi-producer, single consumer queue.
//!
//! This module contains an implementation of a concurrent MPSC queue. This
//! queue can be used to share data between tasks, and is also used as the
//! building block of channels in Rust.
//!
//! Note that the current implementation of this queue has a caveat with the `pop`
//! method; see that method's documentation for details. Due to this
//! caveat, this queue may not be appropriate for all use-cases.
// http://www.1024cores.net/home/lock-free-algorithms
// /queues/non-intrusive-mpsc-node-based-queue
@ -35,9 +43,11 @@ use clone::Clone;
use kinds::Send;
use ops::Drop;
use option::{Option, None, Some};
use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
use unstable::sync::UnsafeArc;
use ptr::RawPtr;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
/// A result of the `pop` function.
pub enum PopResult<T> {
/// Some data has been popped
Data(T),
@ -61,10 +71,14 @@ struct State<T, P> {
packet: P,
}
/// The consumer half of this concurrent queue. This half is used to receive
/// data from the producers.
pub struct Consumer<T, P> {
priv state: UnsafeArc<State<T, P>>,
}
/// The production half of the concurrent queue. This handle may be cloned in
/// order to make handles for new producers.
pub struct Producer<T, P> {
priv state: UnsafeArc<State<T, P>>,
}
@ -75,6 +89,11 @@ impl<T: Send, P: Send> Clone for Producer<T, P> {
}
}
/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet"
/// of information which will be shared by the consumer and the producer, and
/// which can be re-acquired via the `packet` function. This is helpful when
/// extra state is shared between the producer and consumer, but note that no
/// synchronization is performed on this data.
pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) {
unsafe {
let (a, b) = UnsafeArc::new2(State::new(p));
@ -92,7 +111,7 @@ impl<T> Node<T> {
}
impl<T: Send, P: Send> State<T, P> {
pub unsafe fn new(p: P) -> State<T, P> {
unsafe fn new(p: P) -> State<T, P> {
let stub = Node::new(None);
State {
head: AtomicPtr::new(stub),
@ -122,10 +141,6 @@ impl<T: Send, P: Send> State<T, P> {
if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
}
unsafe fn is_empty(&mut self) -> bool {
return (*self.tail).next.load(Acquire).is_null();
}
}
#[unsafe_destructor]
@ -143,27 +158,42 @@ impl<T: Send, P: Send> Drop for State<T, P> {
}
impl<T: Send, P: Send> Producer<T, P> {
/// Pushes a new value onto this queue.
pub fn push(&mut self, value: T) {
unsafe { (*self.state.get()).push(value) }
}
pub fn is_empty(&self) -> bool {
unsafe{ (*self.state.get()).is_empty() }
}
/// Gets an unsafe pointer to the user-defined packet shared by the
/// producers and the consumer. Note that care must be taken to ensure that
/// the lifetime of the queue outlives the usage of the returned pointer.
pub unsafe fn packet(&self) -> *mut P {
&mut (*self.state.get()).packet as *mut P
}
}
impl<T: Send, P: Send> Consumer<T, P> {
/// Pops some data from this queue.
///
/// Note that the current implementation means that this function cannot
/// return `Option<T>`. It is possible for this queue to be in an
/// inconsistent state where many pushes have suceeded and completely
/// finished, but pops cannot return `Some(t)`. This inconsistent state
/// happens when a pusher is pre-empted at an inopportune moment.
///
/// This inconsistent state means that this queue does indeed have data, but
/// it cannot be accessed by the consumer at this time.
pub fn pop(&mut self) -> PopResult<T> {
unsafe { (*self.state.get()).pop() }
}
/// Attempts to pop data from this queue, but doesn't attempt too hard. This
/// will canonicalize inconsistent states to a `None` value.
pub fn casual_pop(&mut self) -> Option<T> {
match self.pop() {
Data(t) => Some(t),
Empty | Inconsistent => None,
}
}
/// Gets an unsafe pointer to the underlying user-defined packet. See
/// `Producer.packet` for more information.
pub unsafe fn packet(&self) -> *mut P {
&mut (*self.state.get()).packet as *mut P
}

View File

@ -26,12 +26,20 @@
*/
// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two tasks. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
use cast;
use kinds::Send;
use ops::Drop;
use option::{Some, None, Option};
use unstable::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
use unstable::sync::UnsafeArc;
use ptr::RawPtr;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
// Node within the linked list queue of messages to send
struct Node<T> {
@ -64,14 +72,34 @@ struct State<T, P> {
packet: P,
}
/// Producer half of this queue. This handle is used to push data to the
/// consumer.
pub struct Producer<T, P> {
priv state: UnsafeArc<State<T, P>>,
}
/// Consumer half of this queue. This handle is used to receive data from the
/// producer.
pub struct Consumer<T, P> {
priv state: UnsafeArc<State<T, P>>,
}
/// Creates a new queue. The returned producer is connected to the returned
/// consumer: all data pushed onto the producer will be received by the consumer.
///
/// # Arguments
///
/// * `bound` - This queue is implemented with a linked list,
/// and this means that a push is always a malloc. In order to
/// amortize this cost, an internal cache of nodes is maintained
/// to prevent a malloc from always being necessary. This bound is
/// the limit on the size of the cache (if desired). If the value
/// is 0, then the cache has no bound. Otherwise, the cache will
/// never grow larger than `bound` (although the queue itself
///             could be much larger).
///
/// * `p` - This is the user-defined packet of data which will also be shared
/// between the producer and consumer.
pub fn queue<T: Send, P: Send>(bound: uint,
p: P) -> (Consumer<T, P>, Producer<T, P>)
{
@ -105,21 +133,31 @@ impl<T: Send> Node<T> {
}
impl<T: Send, P: Send> Producer<T, P> {
/// Pushes data onto the queue
pub fn push(&mut self, t: T) {
unsafe { (*self.state.get()).push(t) }
}
/// Tests whether the queue is empty. Note that a return value of `false` is
/// significant, while almost no meaning can be attached to a return value of
/// `true`.
pub fn is_empty(&self) -> bool {
unsafe { (*self.state.get()).is_empty() }
}
/// Acquires an unsafe pointer to the underlying user-defined packet. Note
/// that care must be taken to ensure that the queue outlives the usage of
/// the packet (because it is an unsafe pointer).
pub unsafe fn packet(&self) -> *mut P {
&mut (*self.state.get()).packet as *mut P
}
}
impl<T: Send, P: Send> Consumer<T, P> {
/// Pops some data from this queue, returning `None` when the queue is
/// empty.
pub fn pop(&mut self) -> Option<T> {
unsafe { (*self.state.get()).pop() }
}
/// Same function as the producer's `packet` method.
pub unsafe fn packet(&self) -> *mut P {
&mut (*self.state.get()).packet as *mut P
}

View File

@ -140,7 +140,6 @@ pub mod dl {
use path;
use ptr;
use str;
use unstable::sync::atomic;
use result::*;
pub unsafe fn open_external(filename: &path::Path) -> *libc::c_void {
@ -158,11 +157,7 @@ pub mod dl {
static mut lock: Mutex = MUTEX_INIT;
unsafe {
// dlerror isn't thread safe, so we need to lock around this entire
// sequence. `atomic` asserts that we don't do anything that
// would cause this task to be descheduled, which could deadlock
// the scheduler if it happens while the lock is held.
// FIXME #9105 use a Rust mutex instead of C++ mutexes.
let _guard = atomic();
// sequence
lock.lock();
let _old_error = dlerror();
@ -208,7 +203,6 @@ pub mod dl {
use libc;
use path;
use ptr;
use unstable::sync::atomic;
use result::*;
pub unsafe fn open_external(filename: &path::Path) -> *libc::c_void {
@ -225,7 +219,6 @@ pub mod dl {
pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, ~str> {
unsafe {
let _guard = atomic();
SetLastError(0);
let result = f();

View File

@ -22,7 +22,6 @@ pub mod simd;
pub mod lang;
pub mod sync;
pub mod mutex;
pub mod atomics;
pub mod raw;
/**

View File

@ -48,7 +48,7 @@
#[allow(non_camel_case_types)];
use libc::c_void;
use unstable::atomics;
use sync::atomics;
pub struct Mutex {
// pointers for the lock/cond handles, atomically updated

View File

@ -8,353 +8,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cast;
use comm::{Chan, Port};
use ptr;
use option::{Option,Some,None};
use task;
use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,Relaxed,SeqCst};
use unstable::mutex::Mutex;
use ops::Drop;
use clone::Clone;
use kinds::Send;
use vec;
/// An atomically reference counted pointer.
///
/// Enforces no shared-memory safety.
//#[unsafe_no_drop_flag] FIXME: #9758
pub struct UnsafeArc<T> {
data: *mut ArcData<T>,
}
pub enum UnsafeArcUnwrap<T> {
UnsafeArcSelf(UnsafeArc<T>),
UnsafeArcT(T)
}
#[cfg(test)]
impl<T> UnsafeArcUnwrap<T> {
fn expect_t(self, msg: &'static str) -> T {
match self {
UnsafeArcSelf(_) => fail!(msg),
UnsafeArcT(t) => t
}
}
fn is_self(&self) -> bool {
match *self {
UnsafeArcSelf(_) => true,
UnsafeArcT(_) => false
}
}
}
struct ArcData<T> {
count: AtomicUint,
// An unwrapper uses this protocol to communicate with the "other" task that
// drops the last refcount on an arc. Unfortunately this can't be a proper
// pipe protocol because the unwrapper has to access both stages at once.
// FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)?
unwrapper: AtomicOption<(Chan<()>, Port<bool>)>,
// FIXME(#3224) should be able to make this non-option to save memory
data: Option<T>,
}
unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> {
let data = ~ArcData { count: AtomicUint::new(refcount),
unwrapper: AtomicOption::empty(),
data: Some(data) };
cast::transmute(data)
}
/// A helper object used by `UnsafeArc::unwrap`.
struct ChannelAndDataGuard<T> {
channel: Option<Chan<bool>>,
data: Option<~ArcData<T>>,
}
#[unsafe_destructor]
impl<T> Drop for ChannelAndDataGuard<T> {
fn drop(&mut self) {
if task::failing() {
// Killed during wait. Because this might happen while
// someone else still holds a reference, we can't free
// the data now; the "other" last refcount will free it.
unsafe {
let channel = self.channel.take_unwrap();
let data = self.data.take_unwrap();
channel.send(false);
cast::forget(data);
}
}
}
}
impl<T> ChannelAndDataGuard<T> {
fn unwrap(mut self) -> (Chan<bool>, ~ArcData<T>) {
(self.channel.take_unwrap(), self.data.take_unwrap())
}
}
impl<T: Send> UnsafeArc<T> {
pub fn new(data: T) -> UnsafeArc<T> {
unsafe { UnsafeArc { data: new_inner(data, 1) } }
}
/// As new(), but returns an extra pre-cloned handle.
pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) {
unsafe {
let ptr = new_inner(data, 2);
(UnsafeArc { data: ptr }, UnsafeArc { data: ptr })
}
}
/// As new(), but returns a vector of as many pre-cloned handles as requested.
pub fn newN(data: T, num_handles: uint) -> ~[UnsafeArc<T>] {
unsafe {
if num_handles == 0 {
~[] // need to free data here
} else {
let ptr = new_inner(data, num_handles);
vec::from_fn(num_handles, |_| UnsafeArc { data: ptr })
}
}
}
/// As newN(), but from an already-existing handle. Uses one xadd.
pub fn cloneN(self, num_handles: uint) -> ~[UnsafeArc<T>] {
if num_handles == 0 {
~[] // The "num_handles - 1" trick (below) fails in the 0 case.
} else {
unsafe {
// Minus one because we are recycling the given handle's refcount.
let old_count = (*self.data).count.fetch_add(num_handles - 1, Acquire);
// let old_count = (*self.data).count.fetch_add(num_handles, Acquire);
assert!(old_count >= 1);
let ptr = self.data;
cast::forget(self); // Don't run the destructor on this handle.
vec::from_fn(num_handles, |_| UnsafeArc { data: ptr })
}
}
}
#[inline]
pub fn get(&self) -> *mut T {
unsafe {
assert!((*self.data).count.load(Relaxed) > 0);
let r: *mut T = (*self.data).data.get_mut_ref();
return r;
}
}
#[inline]
pub fn get_immut(&self) -> *T {
unsafe {
assert!((*self.data).count.load(Relaxed) > 0);
let r: *T = (*self.data).data.get_ref();
return r;
}
}
/// Wait until all other handles are dropped, then retrieve the enclosed
/// data. See extra::arc::Arc for specific semantics documentation.
/// If called when the task is already unkillable, unwrap will unkillably
/// block; otherwise, an unwrapping task can be killed by linked failure.
pub fn unwrap(self) -> T {
unsafe {
let mut this = self;
// The ~ dtor needs to run if this code succeeds.
let mut data: ~ArcData<T> = cast::transmute(this.data);
// Set up the unwrap protocol.
let (p1,c1) = Chan::new(); // ()
let (p2,c2) = Chan::new(); // bool
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.
let old_count = data.count.fetch_sub(1, Release);
assert!(old_count >= 1);
if old_count == 1 {
// We were the last owner. Can unwrap immediately.
// AtomicOption's destructor will free the server endpoint.
// FIXME(#3224): it should be like this
// let ~ArcData { data: user_data, _ } = data;
// user_data
data.data.take_unwrap()
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let c2_and_data = ChannelAndDataGuard {
channel: Some(c2),
data: Some(data),
};
p1.recv();
// Got here. Back in the 'unkillable' without getting killed.
let (c2, data) = c2_and_data.unwrap();
c2.send(true);
// FIXME(#3224): it should be like this
// let ~ArcData { data: user_data, _ } = data;
// user_data
let mut data = data;
data.data.take_unwrap()
}
} else {
// If 'put' returns the server end back to us, we were rejected;
// someone else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(data);
fail!("Another task is already unwrapping this Arc!");
}
}
}
/// As unwrap above, but without blocking. Returns 'UnsafeArcSelf(self)' if this is
/// not the last reference; 'UnsafeArcT(unwrapped_data)' if so.
pub fn try_unwrap(mut self) -> UnsafeArcUnwrap<T> {
unsafe {
// The ~ dtor needs to run if this code succeeds.
let mut data: ~ArcData<T> = cast::transmute(self.data);
// This can of course race with anybody else who has a handle, but in
// such a case, the returned count will always be at least 2. If we
// see 1, no race was possible. All that matters is 1 or not-1.
let count = data.count.load(Acquire);
assert!(count >= 1);
// The more interesting race is one with an unwrapper. They may have
// already dropped their count -- but if so, the unwrapper pointer
// will have been set first, which the barriers ensure we will see.
// (Note: using is_empty(), not take(), to not free the unwrapper.)
if count == 1 && data.unwrapper.is_empty(Acquire) {
// Tell this handle's destructor not to run (we are now it).
self.data = ptr::mut_null();
// FIXME(#3224) as above
UnsafeArcT(data.data.take_unwrap())
} else {
cast::forget(data);
UnsafeArcSelf(self)
}
}
}
}
impl<T: Send> Clone for UnsafeArc<T> {
fn clone(&self) -> UnsafeArc<T> {
unsafe {
// This barrier might be unnecessary, but I'm not sure...
let old_count = (*self.data).count.fetch_add(1, Acquire);
assert!(old_count >= 1);
return UnsafeArc { data: self.data };
}
}
}
#[unsafe_destructor]
impl<T> Drop for UnsafeArc<T>{
fn drop(&mut self) {
unsafe {
// Happens when destructing an unwrapper's handle and from `#[unsafe_no_drop_flag]`
if self.data.is_null() {
return
}
let mut data: ~ArcData<T> = cast::transmute(self.data);
// Must be acquire+release, not just release, to make sure this
// doesn't get reordered to after the unwrapper pointer load.
let old_count = data.count.fetch_sub(1, SeqCst);
assert!(old_count >= 1);
if old_count == 1 {
// Were we really last, or should we hand off to an
// unwrapper? It's safe to not xchg because the unwrapper
// will set the unwrap lock *before* dropping his/her
// reference. In effect, being here means we're the only
// *awake* task with the data.
match data.unwrapper.take(Acquire) {
Some(~(message, response)) => {
// Send 'ready' and wait for a response.
message.send(());
// Unkillable wait. Message guaranteed to come.
if response.recv() {
// Other task got the data.
cast::forget(data);
} else {
// Other task was killed. drop glue takes over.
}
}
None => {
// drop glue takes over.
}
}
} else {
cast::forget(data);
}
}
}
}
/****************************************************************************/
pub struct AtomicGuard {
on: bool,
}
impl Drop for AtomicGuard {
fn drop(&mut self) {
use rt::task::{Task, GreenTask, SchedTask};
use rt::local::Local;
if self.on {
unsafe {
let task_opt: Option<*mut Task> = Local::try_unsafe_borrow();
match task_opt {
Some(t) => {
match (*t).task_type {
GreenTask(_) => (*t).death.allow_deschedule(),
SchedTask => {}
}
}
None => {}
}
}
}
}
}
/**
* Enables a runtime assertion that no operation while the returned guard is
* live uses scheduler operations (deschedule, recv, spawn, etc). This is for
* use with pthread mutexes, which may block the entire scheduler thread,
* rather than just one task, and is hence prone to deadlocks if mixed with
* descheduling.
*
* NOTE: THIS DOES NOT PROVIDE LOCKING, or any sort of critical-section
* synchronization whatsoever. It only makes sense to use for CPU-local issues.
*/
// FIXME(#8140) should not be pub
pub unsafe fn atomic() -> AtomicGuard {
use rt::task::{Task, GreenTask, SchedTask};
use rt::local::Local;
let task_opt: Option<*mut Task> = Local::try_unsafe_borrow();
match task_opt {
Some(t) => {
match (*t).task_type {
GreenTask(_) => {
(*t).death.inhibit_deschedule();
return AtomicGuard {
on: true,
};
}
SchedTask => {}
}
}
None => {}
}
AtomicGuard {
on: false,
}
}
use ops::Drop;
use option::{Option,Some,None};
use sync::arc::UnsafeArc;
use unstable::mutex::Mutex;
pub struct LittleLock {
priv l: Mutex,
@ -496,14 +155,6 @@ impl<T:Send> Exclusive<T> {
l.wait();
}
}
pub fn unwrap(self) -> T {
let Exclusive { x: x } = self;
// Someday we might need to unkillably unwrap an Exclusive, but not today.
let inner = x.unwrap();
let ExData { data: user_data, .. } = inner; // will destroy the LittleLock
user_data
}
}
#[cfg(test)]
@ -514,20 +165,6 @@ mod tests {
use task;
use mem::size_of;
//#[unsafe_no_drop_flag] FIXME: #9758
#[ignore]
#[test]
fn test_size() {
assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>());
}
#[test]
fn test_atomic() {
// NB. The whole runtime will abort on an 'atomic-sleep' violation,
// so we can't really test for the converse behaviour.
unsafe { let _ = atomic(); } // oughtn't fail
}
#[test]
fn exclusive_new_arc() {
unsafe {
@ -570,114 +207,4 @@ mod tests {
x.with(|one| assert_eq!(*one, 1));
}
}
#[test]
fn arclike_newN() {
// Tests that the many-refcounts-at-once constructors don't leak.
let _ = UnsafeArc::new2(~~"hello");
let x = UnsafeArc::newN(~~"hello", 0);
assert_eq!(x.len(), 0)
let x = UnsafeArc::newN(~~"hello", 1);
assert_eq!(x.len(), 1)
let x = UnsafeArc::newN(~~"hello", 10);
assert_eq!(x.len(), 10)
}
#[test]
fn arclike_cloneN() {
// Tests that the many-refcounts-at-once special-clone doesn't leak.
let x = UnsafeArc::new(~~"hello");
let x = x.cloneN(0);
assert_eq!(x.len(), 0);
let x = UnsafeArc::new(~~"hello");
let x = x.cloneN(1);
assert_eq!(x.len(), 1);
let x = UnsafeArc::new(~~"hello");
let x = x.cloneN(10);
assert_eq!(x.len(), 10);
}
#[test]
fn arclike_unwrap_basic() {
let x = UnsafeArc::new(~~"hello");
assert!(x.unwrap() == ~~"hello");
}
#[test]
fn arclike_try_unwrap() {
let x = UnsafeArc::new(~~"hello");
assert!(x.try_unwrap().expect_t("try_unwrap failed") == ~~"hello");
}
#[test]
fn arclike_try_unwrap_fail() {
let x = UnsafeArc::new(~~"hello");
let x2 = x.clone();
let left_x = x.try_unwrap();
assert!(left_x.is_self());
drop(left_x);
assert!(x2.try_unwrap().expect_t("try_unwrap none") == ~~"hello");
}
#[test]
fn arclike_try_unwrap_unwrap_race() {
// When an unwrap and a try_unwrap race, the unwrapper should always win.
let x = UnsafeArc::new(~~"hello");
let x2 = x.clone();
let (p,c) = Chan::new();
do task::spawn {
c.send(());
assert!(x2.unwrap() == ~~"hello");
c.send(());
}
p.recv();
task::deschedule(); // Try to make the unwrapper get blocked first.
let left_x = x.try_unwrap();
assert!(left_x.is_self());
drop(left_x);
p.recv();
}
#[test]
fn exclusive_new_unwrap_basic() {
// Unlike the above, also tests no double-freeing of the LittleLock.
let x = Exclusive::new(~~"hello");
assert!(x.unwrap() == ~~"hello");
}
#[test]
fn exclusive_new_unwrap_contended() {
let x = Exclusive::new(~~"hello");
let x2 = x.clone();
do task::spawn {
unsafe { x2.with(|_hello| ()); }
task::deschedule();
}
assert!(x.unwrap() == ~~"hello");
// Now try the same thing, but with the child task blocking.
let x = Exclusive::new(~~"hello");
let x2 = x.clone();
let mut builder = task::task();
let res = builder.future_result();
do builder.spawn {
assert!(x2.unwrap() == ~~"hello");
}
// Have to get rid of our reference before blocking.
drop(x);
res.recv();
}
#[test] #[should_fail]
fn exclusive_new_unwrap_conflict() {
let x = Exclusive::new(~~"hello");
let x2 = x.clone();
let mut builder = task::task();
let res = builder.future_result();
do builder.spawn {
assert!(x2.unwrap() == ~~"hello");
}
assert!(x.unwrap() == ~~"hello");
assert!(res.recv().is_ok());
}
}
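
For reference, Exclusive (minus its unwrap method) stays behind in std::unstable::sync; a minimal sketch based on the surviving tests above (illustrative):

use std::unstable::sync::Exclusive;

fn main() {
    let x = Exclusive::new(1);
    unsafe {
        // `with` gives the closure exclusive access to the wrapped data.
        x.with(|val| assert!(*val == 1));
    }
}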