Split libcore/arc.rs: arc -> std::arc; exclusive -> unsafe::exclusive

Ben Blum 2012-08-10 18:20:03 -04:00
parent 4808d59909
commit 42825fbea6
12 changed files with 319 additions and 272 deletions


@@ -1,219 +0,0 @@
/**
* An atomically reference counted wrapper that can be used to
* share immutable data between tasks.
*/
export arc, get, clone;
export exclusive, methods;
#[abi = "cdecl"]
extern mod rustrt {
#[rust_stack]
fn rust_atomic_increment(p: &mut libc::intptr_t)
-> libc::intptr_t;
#[rust_stack]
fn rust_atomic_decrement(p: &mut libc::intptr_t)
-> libc::intptr_t;
}
type arc_data<T> = {
mut count: libc::intptr_t,
data: T
};
class arc_destruct<T> {
let data: *libc::c_void;
new(data: *libc::c_void) { self.data = data; }
drop unsafe {
let data: ~arc_data<T> = unsafe::reinterpret_cast(self.data);
let new_count = rustrt::rust_atomic_decrement(&mut data.count);
assert new_count >= 0;
if new_count == 0 {
// drop glue takes over.
} else {
unsafe::forget(data);
}
}
}
type arc<T: const send> = arc_destruct<T>;
/// Create an atomically reference counted wrapper.
fn arc<T: const send>(-data: T) -> arc<T> {
let data = ~{mut count: 1, data: data};
unsafe {
let ptr = unsafe::transmute(data);
arc_destruct(ptr)
}
}
/**
* Access the underlying data in an atomically reference counted
* wrapper.
*/
fn get<T: const send>(rc: &arc<T>) -> &T {
unsafe {
let ptr: ~arc_data<T> = unsafe::reinterpret_cast((*rc).data);
// Cast us back into the correct region
let r = unsafe::reinterpret_cast(&ptr.data);
unsafe::forget(ptr);
return r;
}
}
/**
* Duplicate an atomically reference counted wrapper.
*
* The resulting two `arc` objects will point to the same underlying data
* object. However, one of the `arc` objects can be sent to another task,
* allowing them to share the underlying data.
*/
fn clone<T: const send>(rc: &arc<T>) -> arc<T> {
unsafe {
let ptr: ~arc_data<T> = unsafe::reinterpret_cast((*rc).data);
let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
assert new_count >= 2;
unsafe::forget(ptr);
}
arc_destruct((*rc).data)
}
// An arc over mutable data that is protected by a lock.
type ex_data<T: send> =
{lock: sys::little_lock, mut failed: bool, mut data: T};
type exclusive<T: send> = arc_destruct<ex_data<T>>;
fn exclusive<T: send>(-data: T) -> exclusive<T> {
let data = ~{mut count: 1, data: {lock: sys::little_lock(), failed: false,
data: data}};
unsafe {
let ptr = unsafe::reinterpret_cast(data);
unsafe::forget(data);
arc_destruct(ptr)
}
}
impl<T: send> exclusive<T> {
/// Duplicate an exclusive ARC. See arc::clone.
fn clone() -> exclusive<T> {
unsafe {
// this makes me nervous...
let ptr: ~arc_data<ex_data<T>> =
unsafe::reinterpret_cast(self.data);
let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
assert new_count > 1;
unsafe::forget(ptr);
}
arc_destruct(self.data)
}
/**
* Access the underlying mutable data with mutual exclusion from other
* tasks. The argument closure will be run with the mutex locked; all
* other tasks wishing to access the data will block until the closure
* finishes running.
*
* Currently, scheduling operations (i.e., yielding, receiving on a pipe,
* accessing the provided condition variable) are prohibited while inside
* the exclusive. Supporting that is a work in progress.
*
* This function is 'unsafe' because it is possible to construct a
* circular reference among multiple ARCs by mutating the underlying
* data. This creates the potential for deadlock and, worse, guarantees
* a memory leak of all involved ARCs. Using exclusive ARCs inside of
* other ARCs is safe in the absence of circular references.
*/
unsafe fn with<U>(f: fn(x: &mut T) -> U) -> U {
let ptr: ~arc_data<ex_data<T>> =
unsafe::reinterpret_cast(self.data);
assert ptr.count > 0;
let ptr2: &arc_data<ex_data<T>> = unsafe::reinterpret_cast(&*ptr);
unsafe::forget(ptr);
let rec: &ex_data<T> = &(*ptr2).data;
do rec.lock.lock {
if rec.failed {
fail ~"Poisoned arc::exclusive - another task failed inside!";
}
rec.failed = true;
let result = f(&mut rec.data);
rec.failed = false;
result
}
}
}
#[cfg(test)]
mod tests {
import comm::*;
#[test]
fn manually_share_arc() {
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let arc_v = arc::arc(v);
let p = port();
let c = chan(p);
do task::spawn() {
let p = port();
c.send(chan(p));
let arc_v = p.recv();
let v = *arc::get::<~[int]>(&arc_v);
assert v[3] == 4;
};
let c = p.recv();
c.send(arc::clone(&arc_v));
assert (*arc::get(&arc_v))[2] == 3;
log(info, arc_v);
}
#[test]
fn exclusive_arc() {
let mut futures = ~[];
let num_tasks = 10u;
let count = 10u;
let total = exclusive(~mut 0u);
for uint::range(0u, num_tasks) |_i| {
let total = total.clone();
futures += ~[future::spawn(|| {
for uint::range(0u, count) |_i| {
do total.with |count| {
**count += 1u;
}
}
})];
};
for futures.each |f| { f.get() }
do total.with |total| {
assert **total == num_tasks * count
};
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn exclusive_poison() {
// Tests that if one task fails inside of an exclusive, subsequent
// accesses will also fail.
let x = arc::exclusive(1);
let x2 = x.clone();
do task::try {
do x2.with |one| {
assert *one == 2;
}
};
do x.with |one| {
assert *one == 1;
}
}
}
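
The discipline arc_destruct encodes (count starts at 1, clone atomically increments, drop atomically decrements and frees at zero) is exactly the invariant this split preserves. As a hedged aside, a minimal sketch of the same discipline in modern Rust, using the stable atomics that eventually replaced rust_atomic_increment/decrement (anachronistic, and not part of this commit):

use std::sync::atomic::{AtomicIsize, Ordering};

struct ArcData<T> {
    count: AtomicIsize,
    data: T,
}

struct Arc<T> {
    ptr: *mut ArcData<T>,
}

fn arc<T>(data: T) -> Arc<T> {
    // Count starts at 1, as in `~{mut count: 1, data: data}` above.
    let boxed = Box::new(ArcData { count: AtomicIsize::new(1), data });
    Arc { ptr: Box::into_raw(boxed) }
}

impl<T> Clone for Arc<T> {
    fn clone(&self) -> Arc<T> {
        let old = unsafe { (*self.ptr).count.fetch_add(1, Ordering::SeqCst) };
        assert!(old >= 1); // mirrors `assert new_count >= 2` above
        Arc { ptr: self.ptr }
    }
}

impl<T> Drop for Arc<T> {
    fn drop(&mut self) {
        let old = unsafe { (*self.ptr).count.fetch_sub(1, Ordering::SeqCst) };
        assert!(old >= 1);
        if old == 1 {
            // Last reference: "drop glue takes over".
            unsafe { drop(Box::from_raw(self.ptr)) };
        }
    }
}

fn main() {
    let a = arc(String::from("shared"));
    let b = a.clone();
    drop(a); // count goes 2 -> 1; the data survives
    unsafe { assert_eq!((*b.ptr).data, "shared") };
}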


@@ -39,7 +39,7 @@ export float, f32, f64;
export box, char, str, ptr, vec, at_vec, bool;
export either, option, result, iter;
export libc, os, io, run, rand, sys, unsafe, logging;
export arc, comm, task, future, pipes, sync;
export comm, task, future, pipes;
export extfmt;
// The test harness links against core, so don't include runtime in tests.
// FIXME (#2861): Uncomment this after snapshot gets updated.
@@ -199,12 +199,10 @@ mod dlist_iter {
mod send_map;
// Concurrency
mod arc;
mod comm;
mod task;
mod future;
mod pipes;
mod sync;
// Runtime and language-primitive support


@@ -1064,7 +1064,7 @@ impl<T: send> port<T>: selectable {
}
/// A channel that can be shared between many senders.
type shared_chan<T: send> = arc::exclusive<chan<T>>;
type shared_chan<T: send> = unsafe::exclusive<chan<T>>;
impl<T: send> shared_chan<T>: channel<T> {
fn send(+x: T) {
@@ -1088,7 +1088,7 @@ impl<T: send> shared_chan<T>: channel<T> {
/// Converts a `chan` into a `shared_chan`.
fn shared_chan<T:send>(+c: chan<T>) -> shared_chan<T> {
arc::exclusive(c)
unsafe::exclusive(c)
}
/// Receive a message from one of two endpoints.
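
A shared_chan wraps the underlying chan in an exclusive so that many tasks can safely send on one endpoint. For comparison, a minimal sketch of the same many-senders pattern in modern Rust via std::sync::mpsc (an anachronism, not this commit's API):

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();
    let mut handles = vec![];
    for i in 0..4 {
        let tx = tx.clone(); // every sender task gets its own handle
        handles.push(thread::spawn(move || tx.send(i).unwrap()));
    }
    drop(tx); // drop the original so rx.iter() terminates
    for h in handles { h.join().unwrap(); }
    let sum: i32 = rx.iter().sum();
    assert_eq!(sum, 6); // 0 + 1 + 2 + 3
}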


@@ -7,27 +7,17 @@ export min_align_of;
export pref_align_of;
export refcount;
export log_str;
export little_lock, methods;
export shape_eq, shape_lt, shape_le;
import task::atomically;
enum type_desc = {
size: uint,
align: uint
// Remaining fields not listed
};
type rust_little_lock = *libc::c_void;
#[abi = "cdecl"]
extern mod rustrt {
pure fn shape_log_str(t: *sys::type_desc, data: *()) -> ~str;
fn rust_create_little_lock() -> rust_little_lock;
fn rust_destroy_little_lock(lock: rust_little_lock);
fn rust_lock_little_lock(lock: rust_little_lock);
fn rust_unlock_little_lock(lock: rust_little_lock);
}
#[abi = "rust-intrinsic"]
@@ -98,30 +88,6 @@ pure fn log_str<T>(t: T) -> ~str {
}
}
class little_lock {
let l: rust_little_lock;
new() {
self.l = rustrt::rust_create_little_lock();
}
drop { rustrt::rust_destroy_little_lock(self.l); }
}
impl little_lock {
unsafe fn lock<T>(f: fn() -> T) -> T {
class unlock {
let l: rust_little_lock;
new(l: rust_little_lock) { self.l = l; }
drop { rustrt::rust_unlock_little_lock(self.l); }
}
do atomically {
rustrt::rust_lock_little_lock(self.l);
let _r = unlock(self.l);
f()
}
}
}
#[cfg(test)]
mod tests {
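
The little_lock code moves verbatim into unsafe.rs below. Its nested unlock class is a drop guard: the lock is released when the guard leaves scope, even if f() fails. A sketch of that RAII shape in modern Rust, with RawLock as a hypothetical stand-in for the rust_little_lock runtime calls:

struct RawLock; // hypothetical stand-in for rust_little_lock

impl RawLock {
    unsafe fn raw_lock(&self) { /* rustrt::rust_lock_little_lock */ }
    unsafe fn raw_unlock(&self) { /* rustrt::rust_unlock_little_lock */ }
}

struct Unlock<'a> {
    l: &'a RawLock,
}

impl<'a> Drop for Unlock<'a> {
    fn drop(&mut self) {
        // Runs when the guard goes out of scope, even on panic.
        unsafe { self.l.raw_unlock() }
    }
}

fn lock<T>(l: &RawLock, f: impl FnOnce() -> T) -> T {
    unsafe { l.raw_lock() };
    let _r = Unlock { l }; // the `unlock` class above, as a drop guard
    f()
}

fn main() {
    let l = RawLock;
    assert_eq!(lock(&l, || 42), 42); // the critical section returns its result
}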


@@ -625,7 +625,7 @@ unsafe fn atomically<U>(f: fn() -> U) -> U {
* Several data structures are involved in task management to allow properly
* propagating failure across linked/supervised tasks.
*
* (1) The "taskgroup_arc" is an arc::exclusive which contains a hashset of
* (1) The "taskgroup_arc" is an unsafe::exclusive which contains a hashset of
* all tasks that are part of the group. Some tasks are 'members', which
* means if they fail, they will kill everybody else in the taskgroup.
* Other tasks are 'descendants', which means they will not kill tasks
@@ -639,7 +639,7 @@ unsafe fn atomically<U>(f: fn() -> U) -> U {
* whether it's part of the 'main'/'root' taskgroup, and an optionally
* configured notification port. These are stored in TLS.
*
* (3) The "ancestor_list" is a cons-style list of arc::exclusives which
* (3) The "ancestor_list" is a cons-style list of unsafe::exclusives which
* tracks 'generations' of taskgroups -- a group's ancestors are groups
* which (directly or transitively) spawn_supervised-ed them. Each task
* is recorded in the 'descendants' of each of its ancestor groups.
@@ -725,7 +725,7 @@ type taskgroup_data = {
// tasks in this group.
mut descendants: taskset,
};
type taskgroup_arc = arc::exclusive<option<taskgroup_data>>;
type taskgroup_arc = unsafe::exclusive<option<taskgroup_data>>;
type taskgroup_inner = &mut option<taskgroup_data>;
@@ -754,7 +754,7 @@ type ancestor_node = {
// Recursive rest of the list.
mut ancestors: ancestor_list,
};
enum ancestor_list = option<arc::exclusive<ancestor_node>>;
enum ancestor_list = option<unsafe::exclusive<ancestor_node>>;
// Accessors for taskgroup arcs and ancestor arcs that wrap the unsafety.
#[inline(always)]
@@ -762,7 +762,7 @@ fn access_group<U>(x: taskgroup_arc, blk: fn(taskgroup_inner) -> U) -> U {
unsafe { x.with(blk) }
}
#[inline(always)]
fn access_ancestors<U>(x: arc::exclusive<ancestor_node>,
fn access_ancestors<U>(x: unsafe::exclusive<ancestor_node>,
blk: fn(x: &mut ancestor_node) -> U) -> U {
unsafe { x.with(blk) }
}
@@ -1035,8 +1035,8 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
let mut members = new_taskset();
taskset_insert(&mut members, spawner);
let tasks =
arc::exclusive(some({ mut members: members,
mut descendants: new_taskset() }));
unsafe::exclusive(some({ mut members: members,
mut descendants: new_taskset() }));
// Main task/group has no ancestors, no notifier, etc.
let group =
@tcb(spawner, tasks, ancestor_list(none), true, none);
@@ -1057,8 +1057,8 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
(g, a, spawner_group.is_main)
} else {
// Child is in a separate group from spawner.
let g = arc::exclusive(some({ mut members: new_taskset(),
mut descendants: new_taskset() }));
let g = unsafe::exclusive(some({ mut members: new_taskset(),
mut descendants: new_taskset() }));
let a = if supervised {
// Child's ancestors start with the spawner.
let old_ancestors = share_ancestors(&mut spawner_group.ancestors);
@@ -1072,7 +1072,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
};
assert new_generation < uint::max_value;
// Build a new node in the ancestor list.
ancestor_list(some(arc::exclusive(
ancestor_list(some(unsafe::exclusive(
{ generation: new_generation,
mut parent_group: some(spawner_group.tasks.clone()),
mut ancestors: old_ancestors })))
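
The ancestor_list above is a cons list whose links are shared, lock-protected nodes, so every descendant task observes the same mutable group state. A sketch of that shape in modern Rust (Arc<Mutex<...>> standing in for unsafe::exclusive; all names here are placeholders):

use std::sync::{Arc, Mutex};

struct AncestorNode {
    generation: usize,
    ancestors: AncestorList,
}

struct AncestorList(Option<Arc<Mutex<AncestorNode>>>);

// Sharing clones the refcounted handle, not the node itself.
fn share(list: &AncestorList) -> AncestorList {
    AncestorList(list.0.as_ref().map(Arc::clone))
}

fn main() {
    let root = AncestorList(Some(Arc::new(Mutex::new(AncestorNode {
        generation: 0,
        ancestors: AncestorList(None),
    }))));
    let shared = share(&root);
    shared.0.as_ref().unwrap().lock().unwrap().generation += 1;
    // Both handles point at the same node.
    assert_eq!(root.0.as_ref().unwrap().lock().unwrap().generation, 1);
}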


@@ -2,6 +2,12 @@
export reinterpret_cast, forget, bump_box_refcount, transmute;
export shared_mutable_state, clone_shared_mutable_state;
export get_shared_mutable_state, get_shared_immutable_state;
export exclusive;
import task::atomically;
#[abi = "rust-intrinsic"]
extern mod rusti {
fn forget<T>(-x: T);
@@ -47,6 +53,167 @@ unsafe fn transmute<L, G>(-thing: L) -> G {
return newthing;
}
/****************************************************************************
* Shared state & exclusive ARC
****************************************************************************/
type arc_data<T> = {
mut count: libc::intptr_t,
data: T
};
class arc_destruct<T> {
let data: *libc::c_void;
new(data: *libc::c_void) { self.data = data; }
drop unsafe {
let data: ~arc_data<T> = unsafe::reinterpret_cast(self.data);
let new_count = rustrt::rust_atomic_decrement(&mut data.count);
assert new_count >= 0;
if new_count == 0 {
// drop glue takes over.
} else {
unsafe::forget(data);
}
}
}
/**
* COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
*
* Data races between tasks can result in crashes and, with sufficient
* cleverness, arbitrary type coercion.
*/
type shared_mutable_state<T: send> = arc_destruct<T>;
unsafe fn shared_mutable_state<T: send>(+data: T) -> shared_mutable_state<T> {
let data = ~{mut count: 1, data: data};
unsafe {
let ptr = unsafe::transmute(data);
arc_destruct(ptr)
}
}
unsafe fn get_shared_mutable_state<T: send>(rc: &shared_mutable_state<T>)
-> &mut T {
unsafe {
let ptr: ~arc_data<T> = unsafe::reinterpret_cast((*rc).data);
assert ptr.count > 0;
// Cast us back into the correct region
let r = unsafe::reinterpret_cast(&ptr.data);
unsafe::forget(ptr);
return r;
}
}
unsafe fn get_shared_immutable_state<T: send>(rc: &shared_mutable_state<T>)
-> &T {
unsafe {
let ptr: ~arc_data<T> = unsafe::reinterpret_cast((*rc).data);
assert ptr.count > 0;
// Cast us back into the correct region
let r = unsafe::reinterpret_cast(&ptr.data);
unsafe::forget(ptr);
return r;
}
}
unsafe fn clone_shared_mutable_state<T: send>(rc: &shared_mutable_state<T>)
-> shared_mutable_state<T> {
unsafe {
let ptr: ~arc_data<T> = unsafe::reinterpret_cast((*rc).data);
let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
assert new_count >= 2;
unsafe::forget(ptr);
}
arc_destruct((*rc).data)
}
/****************************************************************************/
type rust_little_lock = *libc::c_void;
#[abi = "cdecl"]
extern mod rustrt {
#[rust_stack]
fn rust_atomic_increment(p: &mut libc::intptr_t)
-> libc::intptr_t;
#[rust_stack]
fn rust_atomic_decrement(p: &mut libc::intptr_t)
-> libc::intptr_t;
fn rust_create_little_lock() -> rust_little_lock;
fn rust_destroy_little_lock(lock: rust_little_lock);
fn rust_lock_little_lock(lock: rust_little_lock);
fn rust_unlock_little_lock(lock: rust_little_lock);
}
class little_lock {
let l: rust_little_lock;
new() {
self.l = rustrt::rust_create_little_lock();
}
drop { rustrt::rust_destroy_little_lock(self.l); }
}
impl little_lock {
unsafe fn lock<T>(f: fn() -> T) -> T {
class unlock {
let l: rust_little_lock;
new(l: rust_little_lock) { self.l = l; }
drop { rustrt::rust_unlock_little_lock(self.l); }
}
do atomically {
rustrt::rust_lock_little_lock(self.l);
let _r = unlock(self.l);
f()
}
}
}
struct ex_data<T: send> { lock: little_lock; mut failed: bool; mut data: T; }
/**
* An arc over mutable data that is protected by a lock. For library use only.
*/
struct exclusive<T: send> { x: shared_mutable_state<ex_data<T>>; }
fn exclusive<T: send>(+user_data: T) -> exclusive<T> {
let data = ex_data {
lock: little_lock(), mut failed: false, mut data: user_data
};
exclusive { x: unsafe { shared_mutable_state(data) } }
}
impl<T: send> exclusive<T> {
// Duplicate an exclusive ARC, as std::arc::clone.
fn clone() -> exclusive<T> {
exclusive { x: unsafe { clone_shared_mutable_state(&self.x) } }
}
// Exactly like std::arc::mutex_arc.access(), but with the little_lock
// instead of a proper mutex. Same reason for being unsafe.
//
// Currently, scheduling operations (i.e., yielding, receiving on a pipe,
// accessing the provided condition variable) are prohibited while inside
// the exclusive. Supporting that is a work in progress.
unsafe fn with<U>(f: fn(x: &mut T) -> U) -> U {
let rec = unsafe { get_shared_mutable_state(&self.x) };
do rec.lock.lock {
if rec.failed {
fail ~"Poisoned exclusive - another task failed inside!";
}
rec.failed = true;
let result = f(&mut rec.data);
rec.failed = false;
result
}
}
}
/****************************************************************************
* Tests
****************************************************************************/
#[cfg(test)]
mod tests {
@@ -86,4 +253,47 @@ mod tests {
assert transmute(~"L") == ~[76u8, 0u8];
}
}
#[test]
fn exclusive_arc() {
let mut futures = ~[];
let num_tasks = 10u;
let count = 10u;
let total = exclusive(~mut 0u);
for uint::range(0u, num_tasks) |_i| {
let total = total.clone();
futures += ~[future::spawn(|| {
for uint::range(0u, count) |_i| {
do total.with |count| {
**count += 1u;
}
}
})];
};
for futures.each |f| { f.get() }
do total.with |total| {
assert **total == num_tasks * count
};
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn exclusive_poison() {
// Tests that if one task fails inside of an exclusive, subsequent
// accesses will also fail.
let x = exclusive(1);
let x2 = x.clone();
do task::try {
do x2.with |one| {
assert *one == 2;
}
};
do x.with |one| {
assert *one == 1;
}
}
}
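
The failed flag protocol in with() (checked on entry, set before running f, cleared after) is an early form of lock poisoning. A minimal sketch of the same observable behavior with modern std::sync::Mutex, mirroring the exclusive_poison test above (anachronistic, not this commit's API):

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let x = Arc::new(Mutex::new(1));
    let x2 = Arc::clone(&x);
    let _ = thread::spawn(move || {
        let one = x2.lock().unwrap();
        assert_eq!(*one, 2); // fails while the lock is held...
    })
    .join();
    assert!(x.lock().is_err()); // ...so later accesses see the poison
}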

src/libstd/arc.rs (new file, 83 lines)

@@ -0,0 +1,83 @@
/**
* Concurrency-enabled mechanisms for sharing mutable and/or immutable state
* between tasks.
*/
import unsafe::{shared_mutable_state, clone_shared_mutable_state,
get_shared_mutable_state, get_shared_immutable_state};
export arc, clone, get;
/****************************************************************************
* Immutable ARC
****************************************************************************/
/// An atomically reference counted wrapper for shared immutable state.
struct arc<T: const send> { x: shared_mutable_state<T>; }
/// Create an atomically reference counted wrapper.
fn arc<T: const send>(+data: T) -> arc<T> {
arc { x: unsafe { shared_mutable_state(data) } }
}
/**
* Access the underlying data in an atomically reference counted
* wrapper.
*/
fn get<T: const send>(rc: &arc<T>) -> &T {
unsafe { get_shared_immutable_state(&rc.x) }
}
/**
* Duplicate an atomically reference counted wrapper.
*
* The resulting two `arc` objects will point to the same underlying data
* object. However, one of the `arc` objects can be sent to another task,
* allowing them to share the underlying data.
*/
fn clone<T: const send>(rc: &arc<T>) -> arc<T> {
arc { x: unsafe { clone_shared_mutable_state(&rc.x) } }
}
/****************************************************************************
* Mutex protected ARC (unsafe)
****************************************************************************/
/****************************************************************************
* R/W lock protected ARC
****************************************************************************/
/****************************************************************************
* Tests
****************************************************************************/
#[cfg(test)]
mod tests {
import comm::*;
#[test]
fn manually_share_arc() {
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let arc_v = arc::arc(v);
let p = port();
let c = chan(p);
do task::spawn() {
let p = port();
c.send(chan(p));
let arc_v = p.recv();
let v = *arc::get::<~[int]>(&arc_v);
assert v[3] == 4;
};
let c = p.recv();
c.send(arc::clone(&arc_v));
assert (*arc::get(&arc_v))[2] == 3;
log(info, arc_v);
}
}
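
Note that manually_share_arc threads the clone through a port/chan handshake because a spawned closure could not capture the noncopyable arc at the time (hence the test's name). As a hedged aside, the modern equivalent (std::sync::Arc, not this commit's API) moves the clone in directly:

use std::sync::Arc;
use std::thread;

fn main() {
    let v = Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let v2 = Arc::clone(&v); // bump the count, share the same buffer
    let child = thread::spawn(move || assert_eq!(v2[3], 4));
    assert_eq!(v[2], 3); // the parent keeps reading concurrently
    child.join().unwrap();
}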


@@ -45,6 +45,10 @@ mod uv_global_loop;
mod c_vec;
mod timer;
// Concurrency
mod sync;
mod arc;
// Collections


@@ -8,7 +8,7 @@
export condvar, semaphore, mutex, rwlock;
// FIXME (#3119) This shouldn't be a thing exported from core.
import arc::exclusive;
import unsafe::exclusive;
/****************************************************************************
* Internals
@@ -292,14 +292,14 @@ struct rwlock_inner {
struct rwlock {
/* priv */ order_lock: semaphore;
/* priv */ access_lock: sem<waitqueue>;
/* priv */ state: arc::exclusive<rwlock_inner>;
/* priv */ state: exclusive<rwlock_inner>;
}
/// Create a new rwlock.
fn rwlock() -> rwlock {
rwlock { order_lock: semaphore(1), access_lock: new_sem_and_signal(1),
state: arc::exclusive(rwlock_inner { read_mode: false,
read_count: 0 }) }
state: exclusive(rwlock_inner { read_mode: false,
read_count: 0 }) }
}
impl &rwlock {
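
The rwlock here is assembled from the two semaphores shown plus an exclusive<rwlock_inner> tracking read_mode and read_count. A short usage sketch of the same reader/writer contract in modern Rust (std::sync::RwLock, an anachronism relative to this code):

use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(0u32);
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap(); // many concurrent readers are fine
        assert_eq!(*r1 + *r2, 0);
    }
    *lock.write().unwrap() += 1; // exactly one writer, no readers
    assert_eq!(*lock.read().unwrap(), 1);
}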


@@ -1,5 +1,6 @@
// error-pattern: copying a noncopyable value
use std;
import comm::*;
fn main() {


@@ -1,3 +1,4 @@
use std;
import comm::*;
fn main() {


@@ -1,5 +1,8 @@
// error-pattern:explicit failure
use std;
import std::arc;
enum e<T: const send> { e(arc::arc<T>) }
fn foo() -> e<int> {fail;}
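
These test updates just add `use std`/`import std::arc` now that arc lives in libstd. The first of them pins down that an arc handle must be cloned explicitly, never implicitly copied ("copying a noncopyable value"). A sketch of the same guarantee in modern terms (assumption: the error now surfaces from the move checker rather than from a noncopyable-value check):

use std::sync::Arc;

fn main() {
    let a = Arc::new(1);
    let b = a; // moves the handle; `a` is gone
    // let c = a; // error[E0382]: use of moved value: `a`
    let c = Arc::clone(&b); // the explicit way to duplicate a handle
    assert_eq!(*b + *c, 2);
}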