// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. /*! * Concurrency-enabled mechanisms for sharing mutable and/or immutable state * between tasks. * * # Example * * In this example, a large vector of floats is shared between several tasks. * With simple pipes, without Arc, a copy would have to be made for each task. * * ~~~ {.rust} * extern mod std; * use extra::arc; * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random()); * let shared_numbers=arc::Arc::new(numbers); * * for 10.times { * let (port, chan) = stream(); * chan.send(shared_numbers.clone()); * * do spawn { * let shared_numbers=port.recv(); * let local_numbers=shared_numbers.get(); * * // Work with the local numbers * } * } * ~~~ */ #[allow(missing_doc)]; use sync; use sync::{Mutex, RWLock}; use std::cast; use std::unstable::sync::UnsafeAtomicRcBox; use std::task; use std::borrow; /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling. pub struct Condvar<'self> { priv is_mutex: bool, priv failed: &'self mut bool, priv cond: &'self sync::Condvar<'self> } impl<'self> Condvar<'self> { /// Atomically exit the associated Arc and block until a signal is sent. #[inline] pub fn wait(&self) { self.wait_on(0) } /** * Atomically exit the associated Arc and block on a specified condvar * until a signal is sent on that same condvar (as sync::cond.wait_on). * * wait() is equivalent to wait_on(0). */ #[inline] pub fn wait_on(&self, condvar_id: uint) { assert!(!*self.failed); self.cond.wait_on(condvar_id); // This is why we need to wrap sync::condvar. check_poison(self.is_mutex, *self.failed); } /// Wake up a blocked task. Returns false if there was no blocked task. #[inline] pub fn signal(&self) -> bool { self.signal_on(0) } /** * Wake up a blocked task on a specified condvar (as * sync::cond.signal_on). Returns false if there was no blocked task. */ #[inline] pub fn signal_on(&self, condvar_id: uint) -> bool { assert!(!*self.failed); self.cond.signal_on(condvar_id) } /// Wake up all blocked tasks. Returns the number of tasks woken. #[inline] pub fn broadcast(&self) -> uint { self.broadcast_on(0) } /** * Wake up all blocked tasks on a specified condvar (as * sync::cond.broadcast_on). Returns the number of tasks woken. */ #[inline] pub fn broadcast_on(&self, condvar_id: uint) -> uint { assert!(!*self.failed); self.cond.broadcast_on(condvar_id) } } /**************************************************************************** * Immutable Arc ****************************************************************************/ /// An atomically reference counted wrapper for shared immutable state. pub struct Arc { priv x: UnsafeAtomicRcBox } /** * Access the underlying data in an atomically reference counted * wrapper. */ impl Arc { /// Create an atomically reference counted wrapper. pub fn new(data: T) -> Arc { Arc { x: UnsafeAtomicRcBox::new(data) } } pub fn get<'a>(&'a self) -> &'a T { unsafe { &*self.x.get_immut() } } /** * Retrieve the data back out of the Arc. This function blocks until the * reference given to it is the last existing one, and then unwrap the data * instead of destroying it. * * If multiple tasks call unwrap, all but the first will fail. Do not call * unwrap from a task that holds another reference to the same Arc; it is * guaranteed to deadlock. */ pub fn unwrap(self) -> T { let Arc { x: x } = self; x.unwrap() } } /** * Duplicate an atomically reference counted wrapper. * * The resulting two `arc` objects will point to the same underlying data * object. However, one of the `arc` objects can be sent to another task, * allowing them to share the underlying data. */ impl Clone for Arc { fn clone(&self) -> Arc { Arc { x: self.x.clone() } } } /**************************************************************************** * Mutex protected Arc (unsafe) ****************************************************************************/ #[doc(hidden)] struct MutexArcInner { priv lock: Mutex, priv failed: bool, priv data: T } /// An Arc with mutable data protected by a blocking mutex. struct MutexArc { priv x: UnsafeAtomicRcBox> } impl Clone for MutexArc { /// Duplicate a mutex-protected Arc, as arc::clone. fn clone(&self) -> MutexArc { // NB: Cloning the underlying mutex is not necessary. Its reference // count would be exactly the same as the shared state's. MutexArc { x: self.x.clone() } } } impl MutexArc { /// Create a mutex-protected Arc with the supplied data. pub fn new(user_data: T) -> MutexArc { MutexArc::new_with_condvars(user_data, 1) } /** * Create a mutex-protected Arc with the supplied data and a specified number * of condvars (as sync::Mutex::new_with_condvars). */ pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc { let data = MutexArcInner { lock: Mutex::new_with_condvars(num_condvars), failed: false, data: user_data }; MutexArc { x: UnsafeAtomicRcBox::new(data) } } /** * Access the underlying mutable data with mutual exclusion from other * tasks. The argument closure will be run with the mutex locked; all * other tasks wishing to access the data will block until the closure * finishes running. * * The reason this function is 'unsafe' is because it is possible to * construct a circular reference among multiple Arcs by mutating the * underlying data. This creates potential for deadlock, but worse, this * will guarantee a memory leak of all involved Arcs. Using mutex Arcs * inside of other Arcs is safe in absence of circular references. * * If you wish to nest mutex_arcs, one strategy for ensuring safety at * runtime is to add a "nesting level counter" inside the stored data, and * when traversing the arcs, assert that they monotonically decrease. * * # Failure * * Failing while inside the Arc will unlock the Arc while unwinding, so * that other tasks won't block forever. It will also poison the Arc: * any tasks that subsequently try to access it (including those already * blocked on the mutex) will also fail immediately. */ #[inline] pub unsafe fn access(&self, blk: &fn(x: &mut T) -> U) -> U { let state = self.x.get(); // Borrowck would complain about this if the function were // not already unsafe. See borrow_rwlock, far below. do (&(*state).lock).lock { check_poison(true, (*state).failed); let _z = PoisonOnFail(&mut (*state).failed); blk(&mut (*state).data) } } /// As access(), but with a condvar, as sync::mutex.lock_cond(). #[inline] pub unsafe fn access_cond<'x, 'c, U>(&self, blk: &fn(x: &'x mut T, c: &'c Condvar) -> U) -> U { let state = self.x.get(); do (&(*state).lock).lock_cond |cond| { check_poison(true, (*state).failed); let _z = PoisonOnFail(&mut (*state).failed); blk(&mut (*state).data, &Condvar {is_mutex: true, failed: &mut (*state).failed, cond: cond }) } } /** * Retrieves the data, blocking until all other references are dropped, * exactly as arc::unwrap. * * Will additionally fail if another task has failed while accessing the arc. */ pub fn unwrap(self) -> T { let MutexArc { x: x } = self; let inner = x.unwrap(); let MutexArcInner { failed: failed, data: data, _ } = inner; if failed { fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!"); } data } } // Common code for {mutex.access,rwlock.write}{,_cond}. #[inline] #[doc(hidden)] fn check_poison(is_mutex: bool, failed: bool) { if failed { if is_mutex { fail!("Poisoned MutexArc - another task failed inside!"); } else { fail!("Poisoned rw_arc - another task failed inside!"); } } } #[doc(hidden)] struct PoisonOnFail { failed: *mut bool, } impl Drop for PoisonOnFail { fn drop(&self) { unsafe { /* assert!(!*self.failed); -- might be false in case of cond.wait() */ if task::failing() { *self.failed = true; } } } } fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail { PoisonOnFail { failed: failed } } /**************************************************************************** * R/W lock protected Arc ****************************************************************************/ #[doc(hidden)] struct RWArcInner { priv lock: RWLock, priv failed: bool, priv data: T } /** * A dual-mode Arc protected by a reader-writer lock. The data can be accessed * mutably or immutably, and immutably-accessing tasks may run concurrently. * * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested. */ #[no_freeze] struct RWArc { priv x: UnsafeAtomicRcBox>, } impl RWArc { /// Duplicate a rwlock-protected Arc, as arc::clone. pub fn clone(&self) -> RWArc { RWArc { x: self.x.clone(), } } } impl RWArc { /// Create a reader/writer Arc with the supplied data. pub fn new(user_data: T) -> RWArc { RWArc::new_with_condvars(user_data, 1) } /** * Create a reader/writer Arc with the supplied data and a specified number * of condvars (as sync::RWLock::new_with_condvars). */ pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc { let data = RWArcInner { lock: RWLock::new_with_condvars(num_condvars), failed: false, data: user_data }; RWArc { x: UnsafeAtomicRcBox::new(data), } } /** * Access the underlying data mutably. Locks the rwlock in write mode; * other readers and writers will block. * * # Failure * * Failing while inside the Arc will unlock the Arc while unwinding, so * that other tasks won't block forever. As MutexArc.access, it will also * poison the Arc, so subsequent readers and writers will both also fail. */ #[inline] pub fn write(&self, blk: &fn(x: &mut T) -> U) -> U { unsafe { let state = self.x.get(); do (*borrow_rwlock(state)).write { check_poison(false, (*state).failed); let _z = PoisonOnFail(&mut (*state).failed); blk(&mut (*state).data) } } } /// As write(), but with a condvar, as sync::rwlock.write_cond(). #[inline] pub fn write_cond<'x, 'c, U>(&self, blk: &fn(x: &'x mut T, c: &'c Condvar) -> U) -> U { unsafe { let state = self.x.get(); do (*borrow_rwlock(state)).write_cond |cond| { check_poison(false, (*state).failed); let _z = PoisonOnFail(&mut (*state).failed); blk(&mut (*state).data, &Condvar {is_mutex: false, failed: &mut (*state).failed, cond: cond}) } } } /** * Access the underlying data immutably. May run concurrently with other * reading tasks. * * # Failure * * Failing will unlock the Arc while unwinding. However, unlike all other * access modes, this will not poison the Arc. */ pub fn read(&self, blk: &fn(x: &T) -> U) -> U { unsafe { let state = self.x.get(); do (*state).lock.read { check_poison(false, (*state).failed); blk(&(*state).data) } } } /** * As write(), but with the ability to atomically 'downgrade' the lock. * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used * to obtain the &mut T, and can be transformed into a RWReadMode token by * calling downgrade(), after which a &T can be obtained instead. * * # Example * * ~~~ {.rust} * do arc.write_downgrade |mut write_token| { * do write_token.write_cond |state, condvar| { * ... exclusive access with mutable state ... * } * let read_token = arc.downgrade(write_token); * do read_token.read |state| { * ... shared access with immutable state ... * } * } * ~~~ */ pub fn write_downgrade(&self, blk: &fn(v: RWWriteMode) -> U) -> U { unsafe { let state = self.x.get(); do (*borrow_rwlock(state)).write_downgrade |write_mode| { check_poison(false, (*state).failed); blk(RWWriteMode { data: &mut (*state).data, token: write_mode, poison: PoisonOnFail(&mut (*state).failed) }) } } } /// To be called inside of the write_downgrade block. pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>) -> RWReadMode<'a, T> { unsafe { // The rwlock should assert that the token belongs to us for us. let state = self.x.get(); let RWWriteMode { data: data, token: t, poison: _poison } = token; // Let readers in let new_token = (*state).lock.downgrade(t); // Whatever region the input reference had, it will be safe to use // the same region for the output reference. (The only 'unsafe' part // of this cast is removing the mutability.) let new_data = cast::transmute_immut(data); // Downgrade ensured the token belonged to us. Just a sanity check. assert!(borrow::ref_eq(&(*state).data, new_data)); // Produce new token RWReadMode { data: new_data, token: new_token, } } } /** * Retrieves the data, blocking until all other references are dropped, * exactly as arc::unwrap. * * Will additionally fail if another task has failed while accessing the arc * in write mode. */ pub fn unwrap(self) -> T { let RWArc { x: x, _ } = self; let inner = x.unwrap(); let RWArcInner { failed: failed, data: data, _ } = inner; if failed { fail!(~"Can't unwrap poisoned RWArc - another task failed inside!") } data } } // Borrowck rightly complains about immutably aliasing the rwlock in order to // lock it. This wraps the unsafety, with the justification that the 'lock' // field is never overwritten; only 'failed' and 'data'. #[doc(hidden)] fn borrow_rwlock(state: *mut RWArcInner) -> *RWLock { unsafe { cast::transmute(&(*state).lock) } } /// The "write permission" token used for RWArc.write_downgrade(). pub struct RWWriteMode<'self, T> { data: &'self mut T, token: sync::RWLockWriteMode<'self>, poison: PoisonOnFail, } /// The "read permission" token used for RWArc.write_downgrade(). pub struct RWReadMode<'self, T> { data: &'self T, token: sync::RWLockReadMode<'self>, } impl<'self, T:Freeze + Send> RWWriteMode<'self, T> { /// Access the pre-downgrade RWArc in write mode. pub fn write(&mut self, blk: &fn(x: &mut T) -> U) -> U { match *self { RWWriteMode { data: &ref mut data, token: ref token, poison: _ } => { do token.write { blk(data) } } } } /// Access the pre-downgrade RWArc in write mode with a condvar. pub fn write_cond<'x, 'c, U>(&mut self, blk: &fn(x: &'x mut T, c: &'c Condvar) -> U) -> U { match *self { RWWriteMode { data: &ref mut data, token: ref token, poison: ref poison } => { do token.write_cond |cond| { unsafe { let cvar = Condvar { is_mutex: false, failed: &mut *poison.failed, cond: cond }; blk(data, &cvar) } } } } } } impl<'self, T:Freeze + Send> RWReadMode<'self, T> { /// Access the post-downgrade rwlock in read mode. pub fn read(&self, blk: &fn(x: &T) -> U) -> U { match *self { RWReadMode { data: data, token: ref token } => { do token.read { blk(data) } } } } } /**************************************************************************** * Tests ****************************************************************************/ #[cfg(test)] mod tests { use arc::*; use std::cell::Cell; use std::comm; use std::task; use std::uint; #[test] fn manually_share_arc() { let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let arc_v = Arc::new(v); let (p, c) = comm::stream(); do task::spawn() || { let p = comm::PortSet::new(); c.send(p.chan()); let arc_v : Arc<~[int]> = p.recv(); let v = (*arc_v.get()).clone(); assert_eq!(v[3], 4); }; let c = p.recv(); c.send(arc_v.clone()); assert_eq!(arc_v.get()[2], 3); assert_eq!(arc_v.get()[4], 5); info!(arc_v); } #[test] fn test_mutex_arc_condvar() { unsafe { let arc = ~MutexArc::new(false); let arc2 = ~arc.clone(); let (p,c) = comm::oneshot(); let (c,p) = (Cell::new(c), Cell::new(p)); do task::spawn || { // wait until parent gets in comm::recv_one(p.take()); do arc2.access_cond |state, cond| { *state = true; cond.signal(); } } do arc.access_cond |state, cond| { comm::send_one(c.take(), ()); assert!(!*state); while !*state { cond.wait(); } } } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_arc_condvar_poison() { unsafe { let arc = ~MutexArc::new(1); let arc2 = ~arc.clone(); let (p, c) = comm::stream(); do task::spawn_unlinked || { let _ = p.recv(); do arc2.access_cond |one, cond| { cond.signal(); // Parent should fail when it wakes up. assert_eq!(*one, 0); } } do arc.access_cond |one, cond| { c.send(()); while *one == 1 { cond.wait(); } } } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_mutex_arc_poison() { unsafe { let arc = ~MutexArc::new(1); let arc2 = ~arc.clone(); do task::try || { do arc2.access |one| { assert_eq!(*one, 2); } }; do arc.access |one| { assert_eq!(*one, 1); } } } #[test] #[should_fail] #[ignore(cfg(windows))] pub fn test_mutex_arc_unwrap_poison() { let arc = MutexArc::new(1); let arc2 = ~(&arc).clone(); let (p, c) = comm::stream(); do task::spawn { unsafe { do arc2.access |one| { c.send(()); assert!(*one == 2); } } } let _ = p.recv(); let one = arc.unwrap(); assert!(one == 1); } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_rw_arc_poison_wr() { let arc = ~RWArc::new(1); let arc2 = (*arc).clone(); do task::try || { do arc2.write |one| { assert_eq!(*one, 2); } }; do arc.read |one| { assert_eq!(*one, 1); } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_rw_arc_poison_ww() { let arc = ~RWArc::new(1); let arc2 = (*arc).clone(); do task::try || { do arc2.write |one| { assert_eq!(*one, 2); } }; do arc.write |one| { assert_eq!(*one, 1); } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_rw_arc_poison_dw() { let arc = ~RWArc::new(1); let arc2 = (*arc).clone(); do task::try || { do arc2.write_downgrade |mut write_mode| { do write_mode.write |one| { assert_eq!(*one, 2); } } }; do arc.write |one| { assert_eq!(*one, 1); } } #[test] #[ignore(cfg(windows))] fn test_rw_arc_no_poison_rr() { let arc = ~RWArc::new(1); let arc2 = (*arc).clone(); do task::try || { do arc2.read |one| { assert_eq!(*one, 2); } }; do arc.read |one| { assert_eq!(*one, 1); } } #[test] #[ignore(cfg(windows))] fn test_rw_arc_no_poison_rw() { let arc = ~RWArc::new(1); let arc2 = (*arc).clone(); do task::try || { do arc2.read |one| { assert_eq!(*one, 2); } }; do arc.write |one| { assert_eq!(*one, 1); } } #[test] #[ignore(cfg(windows))] fn test_rw_arc_no_poison_dr() { let arc = ~RWArc::new(1); let arc2 = (*arc).clone(); do task::try || { do arc2.write_downgrade |write_mode| { let read_mode = arc2.downgrade(write_mode); do (&read_mode).read |one| { assert_eq!(*one, 2); } } }; do arc.write |one| { assert_eq!(*one, 1); } } #[test] fn test_rw_arc() { let arc = ~RWArc::new(0); let arc2 = (*arc).clone(); let (p,c) = comm::stream(); do task::spawn || { do arc2.write |num| { for 10.times { let tmp = *num; *num = -1; task::yield(); *num = tmp + 1; } c.send(()); } } // Readers try to catch the writer in the act let mut children = ~[]; for 5.times { let arc3 = (*arc).clone(); let mut builder = task::task(); builder.future_result(|r| children.push(r)); do builder.spawn { do arc3.read |num| { assert!(*num >= 0); } } } // Wait for children to pass their asserts for children.iter().advance |r| { r.recv(); } // Wait for writer to finish p.recv(); do arc.read |num| { assert_eq!(*num, 10); } } #[test] fn test_rw_downgrade() { // (1) A downgrader gets in write mode and does cond.wait. // (2) A writer gets in write mode, sets state to 42, and does signal. // (3) Downgrader wakes, sets state to 31337. // (4) tells writer and all other readers to contend as it downgrades. // (5) Writer attempts to set state back to 42, while downgraded task // and all reader tasks assert that it's 31337. let arc = ~RWArc::new(0); // Reader tasks let mut reader_convos = ~[]; for 10.times { let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream()); reader_convos.push((rc1, rp2)); let arcn = (*arc).clone(); do task::spawn || { rp1.recv(); // wait for downgrader to give go-ahead do arcn.read |state| { assert_eq!(*state, 31337); rc2.send(()); } } } // Writer task let arc2 = (*arc).clone(); let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream()); do task::spawn || { wp1.recv(); do arc2.write_cond |state, cond| { assert_eq!(*state, 0); *state = 42; cond.signal(); } wp1.recv(); do arc2.write |state| { // This shouldn't happen until after the downgrade read // section, and all other readers, finish. assert_eq!(*state, 31337); *state = 42; } wc2.send(()); } // Downgrader (us) do arc.write_downgrade |mut write_mode| { do write_mode.write_cond |state, cond| { wc1.send(()); // send to another writer who will wake us up while *state == 0 { cond.wait(); } assert_eq!(*state, 42); *state = 31337; // FIXME: #7372: hits type inference bug with iterators // send to other readers for uint::range(0, reader_convos.len()) |i| { match reader_convos[i] { (ref rc, _) => rc.send(()), } } } let read_mode = arc.downgrade(write_mode); do (&read_mode).read |state| { // FIXME: #7372: hits type inference bug with iterators // complete handshake with other readers for uint::range(0, reader_convos.len()) |i| { match reader_convos[i] { (_, ref rp) => rp.recv(), } } wc1.send(()); // tell writer to try again assert_eq!(*state, 31337); } } wp2.recv(); // complete handshake with writer } #[cfg(test)] fn test_rw_write_cond_downgrade_read_race_helper() { // Tests that when a downgrader hands off the "reader cloud" lock // because of a contending reader, a writer can't race to get it // instead, which would result in readers_and_writers. This tests // the sync module rather than this one, but it's here because an // rwarc gives us extra shared state to help check for the race. // If you want to see this test fail, go to sync.rs and replace the // line in RWLock::write_cond() that looks like: // "blk(&Condvar { order: opt_lock, ..*cond })" // with just "blk(cond)". let x = ~RWArc::new(true); let (wp, wc) = comm::stream(); // writer task let xw = (*x).clone(); do task::spawn { do xw.write_cond |state, c| { wc.send(()); // tell downgrader it's ok to go c.wait(); // The core of the test is here: the condvar reacquire path // must involve order_lock, so that it cannot race with a reader // trying to receive the "reader cloud lock hand-off". *state = false; } } wp.recv(); // wait for writer to get in do x.write_downgrade |mut write_mode| { do write_mode.write_cond |state, c| { assert!(*state); // make writer contend in the cond-reacquire path c.signal(); } // make a reader task to trigger the "reader cloud lock" handoff let xr = (*x).clone(); let (rp, rc) = comm::stream(); do task::spawn { rc.send(()); do xr.read |_state| { } } rp.recv(); // wait for reader task to exist let read_mode = x.downgrade(write_mode); do read_mode.read |state| { // if writer mistakenly got in, make sure it mutates state // before we assert on it for 5.times { task::yield(); } // make sure writer didn't get in. assert!(*state); } } } #[test] fn test_rw_write_cond_downgrade_read_race() { // Ideally the above test case would have yield statements in it that // helped to expose the race nearly 100% of the time... but adding // yields in the intuitively-right locations made it even less likely, // and I wasn't sure why :( . This is a mediocre "next best" option. for 8.times { test_rw_write_cond_downgrade_read_race_helper() } } }