/** * Concurrency-enabled mechanisms for sharing mutable and/or immutable state * between tasks. */ import unsafe::{SharedMutableState, shared_mutable_state, clone_shared_mutable_state, get_shared_mutable_state, get_shared_immutable_state}; import sync; import sync::{mutex, rwlock}; export arc, clone, get; export condvar, mutex_arc, rw_arc; /// As sync::condvar, a mechanism for unlock-and-descheduling and signalling. struct condvar { is_mutex: bool; failed: &mut bool; cond: &sync::condvar; } impl &condvar { /// Atomically exit the associated ARC and block until a signal is sent. fn wait() { assert !*self.failed; self.cond.wait(); // This is why we need to wrap sync::condvar. check_poison(self.is_mutex, *self.failed); } /// Wake up a blocked task. Returns false if there was no blocked task. fn signal() -> bool { assert !*self.failed; self.cond.signal() } /// Wake up all blocked tasks. Returns the number of tasks woken. fn broadcast() -> uint { assert !*self.failed; self.cond.broadcast() } } /**************************************************************************** * Immutable ARC ****************************************************************************/ /// An atomically reference counted wrapper for shared immutable state. struct arc { x: SharedMutableState; } /// Create an atomically reference counted wrapper. fn arc(+data: T) -> arc { arc { x: unsafe { shared_mutable_state(data) } } } /** * Access the underlying data in an atomically reference counted * wrapper. */ fn get(rc: &arc) -> &T { unsafe { get_shared_immutable_state(&rc.x) } } /** * Duplicate an atomically reference counted wrapper. * * The resulting two `arc` objects will point to the same underlying data * object. However, one of the `arc` objects can be sent to another task, * allowing them to share the underlying data. */ fn clone(rc: &arc) -> arc { arc { x: unsafe { clone_shared_mutable_state(&rc.x) } } } /**************************************************************************** * Mutex protected ARC (unsafe) ****************************************************************************/ struct mutex_arc_inner { lock: mutex; failed: bool; data: T; } /// An ARC with mutable data protected by a blocking mutex. struct mutex_arc { x: SharedMutableState>; } /// Create a mutex-protected ARC with the supplied data. fn mutex_arc(+user_data: T) -> mutex_arc { let data = mutex_arc_inner { lock: mutex(), failed: false, data: user_data }; mutex_arc { x: unsafe { shared_mutable_state(data) } } } impl &mutex_arc { /// Duplicate a mutex-protected ARC, as arc::clone. fn clone() -> mutex_arc { // NB: Cloning the underlying mutex is not necessary. Its reference // count would be exactly the same as the shared state's. mutex_arc { x: unsafe { clone_shared_mutable_state(&self.x) } } } /** * Access the underlying mutable data with mutual exclusion from other * tasks. The argument closure will be run with the mutex locked; all * other tasks wishing to access the data will block until the closure * finishes running. * * The reason this function is 'unsafe' is because it is possible to * construct a circular reference among multiple ARCs by mutating the * underlying data. This creates potential for deadlock, but worse, this * will guarantee a memory leak of all involved ARCs. Using mutex ARCs * inside of other ARCs is safe in absence of circular references. * * If you wish to nest mutex_arcs, one strategy for ensuring safety at * runtime is to add a "nesting level counter" inside the stored data, and * when traversing the arcs, assert that they monotonically decrease. * * # Failure * * Failing while inside the ARC will unlock the ARC while unwinding, so * that other tasks won't block forever. It will also poison the ARC: * any tasks that subsequently try to access it (including those already * blocked on the mutex) will also fail immediately. */ #[inline(always)] unsafe fn access(blk: fn(x: &mut T) -> U) -> U { let state = unsafe { get_shared_mutable_state(&self.x) }; // Borrowck would complain about this if the function were not already // unsafe. See borrow_rwlock, far below. do (&state.lock).lock { check_poison(true, state.failed); let _z = poison_on_fail(&mut state.failed); blk(&mut state.data) } } /// As access(), but with a condvar, as sync::mutex.lock_cond(). #[inline(always)] unsafe fn access_cond(blk: fn(x: &x/mut T, c: &c/condvar) -> U) -> U { let state = unsafe { get_shared_mutable_state(&self.x) }; do (&state.lock).lock_cond |cond| { check_poison(true, state.failed); let _z = poison_on_fail(&mut state.failed); /* blk(&mut state.data, &condvar { is_mutex: true, failed: &mut state.failed, cond: cond }) */ // XXX: Working around two seeming region bugs here let fref = unsafe { unsafe::reinterpret_cast(&mut state.failed) }; let cvar = condvar { is_mutex: true, failed: fref, cond: cond }; blk(&mut state.data, unsafe { unsafe::reinterpret_cast(&cvar) } ) } } } // Common code for {mutex.access,rwlock.write}{,_cond}. #[inline(always)] fn check_poison(is_mutex: bool, failed: bool) { if failed { if is_mutex { fail ~"Poisoned mutex_arc - another task failed inside!"; } else { fail ~"Poisoned rw_arc - another task failed inside!"; } } } struct poison_on_fail { failed: &mut bool; new(failed: &mut bool) { self.failed = failed; } drop { /* assert !*self.failed; -- might be false in case of cond.wait() */ if task::failing() { *self.failed = true; } } } /**************************************************************************** * R/W lock protected ARC ****************************************************************************/ struct rw_arc_inner { lock: rwlock; failed: bool; data: T; } /** * A dual-mode ARC protected by a reader-writer lock. The data can be accessed * mutably or immutably, and immutably-accessing tasks may run concurrently. * * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested. */ struct rw_arc { x: SharedMutableState>; mut cant_nest: (); } /// Create a reader/writer ARC with the supplied data. fn rw_arc(+user_data: T) -> rw_arc { let data = rw_arc_inner { lock: rwlock(), failed: false, data: user_data }; rw_arc { x: unsafe { shared_mutable_state(data) }, cant_nest: () } } impl &rw_arc { /// Duplicate a rwlock-protected ARC, as arc::clone. fn clone() -> rw_arc { rw_arc { x: unsafe { clone_shared_mutable_state(&self.x) }, cant_nest: () } } /** * Access the underlying data mutably. Locks the rwlock in write mode; * other readers and writers will block. * * # Failure * * Failing while inside the ARC will unlock the ARC while unwinding, so * that other tasks won't block forever. As mutex_arc.access, it will also * poison the ARC, so subsequent readers and writers will both also fail. */ #[inline(always)] fn write(blk: fn(x: &mut T) -> U) -> U { let state = unsafe { get_shared_mutable_state(&self.x) }; do borrow_rwlock(state).write { check_poison(false, state.failed); let _z = poison_on_fail(&mut state.failed); blk(&mut state.data) } } /// As write(), but with a condvar, as sync::rwlock.write_cond(). #[inline(always)] fn write_cond(blk: fn(x: &x/mut T, c: &c/condvar) -> U) -> U { let state = unsafe { get_shared_mutable_state(&self.x) }; do borrow_rwlock(state).write_cond |cond| { check_poison(false, state.failed); let _z = poison_on_fail(&mut state.failed); /* blk(&mut state.data, &condvar { is_mutex: false, failed: &mut state.failed, cond: cond }) */ // XXX: Working around two seeming region bugs here let fref = unsafe { unsafe::reinterpret_cast(&mut state.failed) }; let cvar = condvar { is_mutex: false, failed: fref, cond: cond }; blk(&mut state.data, unsafe { unsafe::reinterpret_cast(&cvar) } ) } } /** * Access the underlying data immutably. May run concurrently with other * reading tasks. * * # Failure * * Failing will unlock the ARC while unwinding. However, unlike all other * access modes, this will not poison the ARC. */ fn read(blk: fn(x: &T) -> U) -> U { let state = unsafe { get_shared_immutable_state(&self.x) }; do (&state.lock).read { check_poison(false, state.failed); blk(&state.data) } } } // Borrowck rightly complains about immutably aliasing the rwlock in order to // lock it. This wraps the unsafety, with the justification that the 'lock' // field is never overwritten; only 'failed' and 'data'. fn borrow_rwlock(state: &mut rw_arc_inner) -> &rwlock { unsafe { unsafe::reinterpret_cast(&state.lock) } } /**************************************************************************** * Tests ****************************************************************************/ #[cfg(test)] mod tests { import comm::*; #[test] fn manually_share_arc() { let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let arc_v = arc::arc(v); let p = port(); let c = chan(p); do task::spawn() { let p = port(); c.send(chan(p)); let arc_v = p.recv(); let v = *arc::get::<~[int]>(&arc_v); assert v[3] == 4; }; let c = p.recv(); c.send(arc::clone(&arc_v)); assert (*arc::get(&arc_v))[2] == 3; log(info, arc_v); } #[test] fn test_mutex_arc_condvar() { let arc = ~mutex_arc(false); let arc2 = ~arc.clone(); let (c,p) = pipes::oneshot(); let (c,p) = (~mut some(c), ~mut some(p)); do task::spawn { // wait until parent gets in pipes::recv_one(option::swap_unwrap(p)); do arc2.access_cond |state, cond| { *state = true; cond.signal(); } } do arc.access_cond |state, cond| { pipes::send_one(option::swap_unwrap(c), ()); assert !*state; while !*state { cond.wait(); } } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_arc_condvar_poison() { let arc = ~mutex_arc(1); let arc2 = ~arc.clone(); let (c,p) = pipes::stream(); do task::spawn_unlinked { let _ = p.recv(); do arc2.access_cond |one, cond| { cond.signal(); assert *one == 0; // Parent should fail when it wakes up. } } do arc.access_cond |one, cond| { c.send(()); while *one == 1 { cond.wait(); } } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_mutex_arc_poison() { let arc = ~mutex_arc(1); let arc2 = ~arc.clone(); do task::try { do arc2.access |one| { assert *one == 2; } }; do arc.access |one| { assert *one == 1; } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_rw_arc_poison_wr() { let arc = ~rw_arc(1); let arc2 = ~arc.clone(); do task::try { do arc2.write |one| { assert *one == 2; } }; do arc.read |one| { assert *one == 1; } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_rw_arc_poison_ww() { let arc = ~rw_arc(1); let arc2 = ~arc.clone(); do task::try { do arc2.write |one| { assert *one == 2; } }; do arc.write |one| { assert *one == 1; } } #[test] #[ignore(cfg(windows))] fn test_rw_arc_no_poison_rr() { let arc = ~rw_arc(1); let arc2 = ~arc.clone(); do task::try { do arc2.read |one| { assert *one == 2; } }; do arc.read |one| { assert *one == 1; } } #[test] #[ignore(cfg(windows))] fn test_rw_arc_no_poison_rw() { let arc = ~rw_arc(1); let arc2 = ~arc.clone(); do task::try { do arc2.read |one| { assert *one == 2; } }; do arc.write |one| { assert *one == 1; } } #[test] fn test_rw_arc() { let arc = ~rw_arc(0); let arc2 = ~arc.clone(); let (c,p) = pipes::stream(); do task::spawn { do arc2.write |num| { for 10.times { let tmp = *num; *num = -1; task::yield(); *num = tmp + 1; } c.send(()); } } // Readers try to catch the writer in the act let mut children = ~[]; for 5.times { let arc3 = ~arc.clone(); do task::task().future_result(|+r| vec::push(children, r)).spawn { do arc3.read |num| { assert *num >= 0; } } } // Wait for children to pass their asserts for vec::each(children) |r| { future::get(r); } // Wait for writer to finish p.recv(); do arc.read |num| { assert *num == 10; } } }