// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. //! Wrappers for safe, shared, mutable memory between tasks //! //! The wrappers in this module build on the primitives from `sync::raw` to //! provide safe interfaces around using the primitive locks. These primitives //! implement a technique called "poisoning" where when a task failed with a //! held lock, all future attempts to use the lock will fail. //! //! For example, if two tasks are contending on a mutex and one of them fails //! after grabbing the lock, the second task will immediately fail because the //! lock is now poisoned. use std::task; use std::ty::Unsafe; use raw; /**************************************************************************** * Poisoning helpers ****************************************************************************/ struct PoisonOnFail<'a> { flag: &'a mut bool, failed: bool, } impl<'a> PoisonOnFail<'a> { fn check(flag: bool, name: &str) { if flag { fail!("Poisoned {} - another task failed inside!", name); } } fn new<'a>(flag: &'a mut bool, name: &str) -> PoisonOnFail<'a> { PoisonOnFail::check(*flag, name); PoisonOnFail { flag: flag, failed: task::failing() } } } #[unsafe_destructor] impl<'a> Drop for PoisonOnFail<'a> { fn drop(&mut self) { if !self.failed && task::failing() { *self.flag = true; } } } /**************************************************************************** * Condvar ****************************************************************************/ enum Inner<'a> { InnerMutex(raw::MutexGuard<'a>), InnerRWLock(raw::RWLockWriteGuard<'a>), } impl<'b> Inner<'b> { fn cond<'a>(&'a self) -> &'a raw::Condvar<'b> { match *self { InnerMutex(ref m) => &m.cond, InnerRWLock(ref m) => &m.cond, } } } /// A condition variable, a mechanism for unlock-and-descheduling and /// signaling, for use with the lock types. pub struct Condvar<'a> { name: &'static str, // n.b. Inner must be after PoisonOnFail because we must set the poison flag // *inside* the mutex, and struct fields are destroyed top-to-bottom // (destroy the lock guard last). poison: PoisonOnFail<'a>, inner: Inner<'a>, } impl<'a> Condvar<'a> { /// Atomically exit the associated lock and block until a signal is sent. /// /// wait() is equivalent to wait_on(0). /// /// # Failure /// /// A task which is killed while waiting on a condition variable will wake /// up, fail, and unlock the associated lock as it unwinds. #[inline] pub fn wait(&self) { self.wait_on(0) } /// Atomically exit the associated lock and block on a specified condvar /// until a signal is sent on that same condvar. /// /// The associated lock must have been initialised with an appropriate /// number of condvars. The condvar_id must be between 0 and num_condvars-1 /// or else this call will fail. #[inline] pub fn wait_on(&self, condvar_id: uint) { assert!(!*self.poison.flag); self.inner.cond().wait_on(condvar_id); // This is why we need to wrap sync::condvar. PoisonOnFail::check(*self.poison.flag, self.name); } /// Wake up a blocked task. Returns false if there was no blocked task. #[inline] pub fn signal(&self) -> bool { self.signal_on(0) } /// Wake up a blocked task on a specified condvar (as /// sync::cond.signal_on). Returns false if there was no blocked task. #[inline] pub fn signal_on(&self, condvar_id: uint) -> bool { assert!(!*self.poison.flag); self.inner.cond().signal_on(condvar_id) } /// Wake up all blocked tasks. Returns the number of tasks woken. #[inline] pub fn broadcast(&self) -> uint { self.broadcast_on(0) } /// Wake up all blocked tasks on a specified condvar (as /// sync::cond.broadcast_on). Returns the number of tasks woken. #[inline] pub fn broadcast_on(&self, condvar_id: uint) -> uint { assert!(!*self.poison.flag); self.inner.cond().broadcast_on(condvar_id) } } /**************************************************************************** * Mutex ****************************************************************************/ /// A wrapper type which provides synchronized access to the underlying data, of /// type `T`. A mutex always provides exclusive access, and concurrent requests /// will block while the mutex is already locked. /// /// # Example /// /// ``` /// use sync::{Mutex, Arc}; /// /// let mutex = Arc::new(Mutex::new(1)); /// let mutex2 = mutex.clone(); /// /// spawn(proc() { /// let mut val = mutex2.lock(); /// *val += 1; /// val.cond.signal(); /// }); /// /// let mut value = mutex.lock(); /// while *value != 2 { /// value.cond.wait(); /// } /// ``` pub struct Mutex { lock: raw::Mutex, failed: Unsafe, data: Unsafe, } /// An guard which is created by locking a mutex. Through this guard the /// underlying data can be accessed. pub struct MutexGuard<'a, T> { data: &'a mut T, /// Inner condition variable connected to the locked mutex that this guard /// was created from. This can be used for atomic-unlock-and-deschedule. pub cond: Condvar<'a>, } impl Mutex { /// Creates a new mutex to protect the user-supplied data. pub fn new(user_data: T) -> Mutex { Mutex::new_with_condvars(user_data, 1) } /// Create a new mutex, with a specified number of associated condvars. /// /// This will allow calling wait_on/signal_on/broadcast_on with condvar IDs /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be /// allowed but any operations on the condvar will fail.) pub fn new_with_condvars(user_data: T, num_condvars: uint) -> Mutex { Mutex { lock: raw::Mutex::new_with_condvars(num_condvars), failed: Unsafe::new(false), data: Unsafe::new(user_data), } } /// Access the underlying mutable data with mutual exclusion from other /// tasks. The returned value is an RAII guard which will unlock the mutex /// when dropped. All concurrent tasks attempting to lock the mutex will /// block while the returned value is still alive. /// /// # Failure /// /// Failing while inside the Mutex will unlock the Mutex while unwinding, so /// that other tasks won't block forever. It will also poison the Mutex: /// any tasks that subsequently try to access it (including those already /// blocked on the mutex) will also fail immediately. #[inline] pub fn lock<'a>(&'a self) -> MutexGuard<'a, T> { let guard = self.lock.lock(); // These two accesses are safe because we're guranteed at this point // that we have exclusive access to this mutex. We are indeed able to // promote ourselves from &Mutex to `&mut T` let poison = unsafe { &mut *self.failed.get() }; let data = unsafe { &mut *self.data.get() }; MutexGuard { data: data, cond: Condvar { name: "Mutex", poison: PoisonOnFail::new(poison, "Mutex"), inner: InnerMutex(guard), }, } } } // FIXME(#13042): these should both have T: Send impl<'a, T> Deref for MutexGuard<'a, T> { fn deref<'a>(&'a self) -> &'a T { &*self.data } } impl<'a, T> DerefMut for MutexGuard<'a, T> { fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self.data } } /**************************************************************************** * R/W lock protected lock ****************************************************************************/ /// A dual-mode reader-writer lock. The data can be accessed mutably or /// immutably, and immutably-accessing tasks may run concurrently. /// /// # Example /// /// ``` /// use sync::{RWLock, Arc}; /// /// let lock1 = Arc::new(RWLock::new(1)); /// let lock2 = lock1.clone(); /// /// spawn(proc() { /// let mut val = lock2.write(); /// *val = 3; /// let val = val.downgrade(); /// println!("{}", *val); /// }); /// /// let val = lock1.read(); /// println!("{}", *val); /// ``` pub struct RWLock { lock: raw::RWLock, failed: Unsafe, data: Unsafe, } /// A guard which is created by locking an rwlock in write mode. Through this /// guard the underlying data can be accessed. pub struct RWLockWriteGuard<'a, T> { data: &'a mut T, /// Inner condition variable that can be used to sleep on the write mode of /// this rwlock. pub cond: Condvar<'a>, } /// A guard which is created by locking an rwlock in read mode. Through this /// guard the underlying data can be accessed. pub struct RWLockReadGuard<'a, T> { data: &'a T, guard: raw::RWLockReadGuard<'a>, } impl RWLock { /// Create a reader/writer lock with the supplied data. pub fn new(user_data: T) -> RWLock { RWLock::new_with_condvars(user_data, 1) } /// Create a reader/writer lock with the supplied data and a specified number /// of condvars (as sync::RWLock::new_with_condvars). pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWLock { RWLock { lock: raw::RWLock::new_with_condvars(num_condvars), failed: Unsafe::new(false), data: Unsafe::new(user_data), } } /// Access the underlying data mutably. Locks the rwlock in write mode; /// other readers and writers will block. /// /// # Failure /// /// Failing while inside the lock will unlock the lock while unwinding, so /// that other tasks won't block forever. As Mutex.lock, it will also poison /// the lock, so subsequent readers and writers will both also fail. #[inline] pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a, T> { let guard = self.lock.write(); // These two accesses are safe because we're guranteed at this point // that we have exclusive access to this rwlock. We are indeed able to // promote ourselves from &RWLock to `&mut T` let poison = unsafe { &mut *self.failed.get() }; let data = unsafe { &mut *self.data.get() }; RWLockWriteGuard { data: data, cond: Condvar { name: "RWLock", poison: PoisonOnFail::new(poison, "RWLock"), inner: InnerRWLock(guard), }, } } /// Access the underlying data immutably. May run concurrently with other /// reading tasks. /// /// # Failure /// /// Failing will unlock the lock while unwinding. However, unlike all other /// access modes, this will not poison the lock. pub fn read<'a>(&'a self) -> RWLockReadGuard<'a, T> { let guard = self.lock.read(); PoisonOnFail::check(unsafe { *self.failed.get() }, "RWLock"); RWLockReadGuard { guard: guard, data: unsafe { &*self.data.get() }, } } } impl<'a, T: Send + Share> RWLockWriteGuard<'a, T> { /// Consumes this write lock token, returning a new read lock token. /// /// This will allow pending readers to come into the lock. pub fn downgrade(self) -> RWLockReadGuard<'a, T> { let RWLockWriteGuard { data, cond } = self; // convert the data to read-only explicitly let data = &*data; let guard = match cond.inner { InnerMutex(..) => unreachable!(), InnerRWLock(guard) => guard.downgrade() }; RWLockReadGuard { guard: guard, data: data } } } // FIXME(#13042): these should all have T: Send + Share impl<'a, T> Deref for RWLockReadGuard<'a, T> { fn deref<'a>(&'a self) -> &'a T { self.data } } impl<'a, T> Deref for RWLockWriteGuard<'a, T> { fn deref<'a>(&'a self) -> &'a T { &*self.data } } impl<'a, T> DerefMut for RWLockWriteGuard<'a, T> { fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self.data } } /**************************************************************************** * Barrier ****************************************************************************/ /// A barrier enables multiple tasks to synchronize the beginning /// of some computation. /// /// ```rust /// use sync::{Arc, Barrier}; /// /// let barrier = Arc::new(Barrier::new(10)); /// for _ in range(0, 10) { /// let c = barrier.clone(); /// // The same messages will be printed together. /// // You will NOT see any interleaving. /// spawn(proc() { /// println!("before wait"); /// c.wait(); /// println!("after wait"); /// }); /// } /// ``` pub struct Barrier { lock: Mutex, num_tasks: uint, } // The inner state of a double barrier struct BarrierState { count: uint, generation_id: uint, } impl Barrier { /// Create a new barrier that can block a given number of tasks. pub fn new(num_tasks: uint) -> Barrier { Barrier { lock: Mutex::new(BarrierState { count: 0, generation_id: 0, }), num_tasks: num_tasks, } } /// Block the current task until a certain number of tasks is waiting. pub fn wait(&self) { let mut lock = self.lock.lock(); let local_gen = lock.generation_id; lock.count += 1; if lock.count < self.num_tasks { // We need a while loop to guard against spurious wakeups. // http://en.wikipedia.org/wiki/Spurious_wakeup while local_gen == lock.generation_id && lock.count < self.num_tasks { lock.cond.wait(); } } else { lock.count = 0; lock.generation_id += 1; lock.cond.broadcast(); } } } /**************************************************************************** * Tests ****************************************************************************/ #[cfg(test)] mod tests { use std::comm::Empty; use std::task; use arc::Arc; use super::{Mutex, Barrier, RWLock}; #[test] fn test_mutex_arc_condvar() { let arc = Arc::new(Mutex::new(false)); let arc2 = arc.clone(); let (tx, rx) = channel(); task::spawn(proc() { // wait until parent gets in rx.recv(); let mut lock = arc2.lock(); *lock = true; lock.cond.signal(); }); let lock = arc.lock(); tx.send(()); assert!(!*lock); while !*lock { lock.cond.wait(); } } #[test] #[should_fail] fn test_arc_condvar_poison() { let arc = Arc::new(Mutex::new(1)); let arc2 = arc.clone(); let (tx, rx) = channel(); spawn(proc() { rx.recv(); let lock = arc2.lock(); lock.cond.signal(); // Parent should fail when it wakes up. fail!(); }); let lock = arc.lock(); tx.send(()); while *lock == 1 { lock.cond.wait(); } } #[test] #[should_fail] fn test_mutex_arc_poison() { let arc = Arc::new(Mutex::new(1)); let arc2 = arc.clone(); let _ = task::try(proc() { let lock = arc2.lock(); assert_eq!(*lock, 2); }); let lock = arc.lock(); assert_eq!(*lock, 1); } #[test] fn test_mutex_arc_nested() { // Tests nested mutexes and access // to underlaying data. let arc = Arc::new(Mutex::new(1)); let arc2 = Arc::new(Mutex::new(arc)); task::spawn(proc() { let lock = arc2.lock(); let lock2 = lock.deref().lock(); assert_eq!(*lock2, 1); }); } #[test] fn test_mutex_arc_access_in_unwind() { let arc = Arc::new(Mutex::new(1i)); let arc2 = arc.clone(); let _ = task::try::<()>(proc() { struct Unwinder { i: Arc>, } impl Drop for Unwinder { fn drop(&mut self) { let mut lock = self.i.lock(); *lock += 1; } } let _u = Unwinder { i: arc2 }; fail!(); }); let lock = arc.lock(); assert_eq!(*lock, 2); } #[test] #[should_fail] fn test_rw_arc_poison_wr() { let arc = Arc::new(RWLock::new(1)); let arc2 = arc.clone(); let _ = task::try(proc() { let lock = arc2.write(); assert_eq!(*lock, 2); }); let lock = arc.read(); assert_eq!(*lock, 1); } #[test] #[should_fail] fn test_rw_arc_poison_ww() { let arc = Arc::new(RWLock::new(1)); let arc2 = arc.clone(); let _ = task::try(proc() { let lock = arc2.write(); assert_eq!(*lock, 2); }); let lock = arc.write(); assert_eq!(*lock, 1); } #[test] fn test_rw_arc_no_poison_rr() { let arc = Arc::new(RWLock::new(1)); let arc2 = arc.clone(); let _ = task::try(proc() { let lock = arc2.read(); assert_eq!(*lock, 2); }); let lock = arc.read(); assert_eq!(*lock, 1); } #[test] fn test_rw_arc_no_poison_rw() { let arc = Arc::new(RWLock::new(1)); let arc2 = arc.clone(); let _ = task::try(proc() { let lock = arc2.read(); assert_eq!(*lock, 2); }); let lock = arc.write(); assert_eq!(*lock, 1); } #[test] fn test_rw_arc_no_poison_dr() { let arc = Arc::new(RWLock::new(1)); let arc2 = arc.clone(); let _ = task::try(proc() { let lock = arc2.write().downgrade(); assert_eq!(*lock, 2); }); let lock = arc.write(); assert_eq!(*lock, 1); } #[test] fn test_rw_arc() { let arc = Arc::new(RWLock::new(0)); let arc2 = arc.clone(); let (tx, rx) = channel(); task::spawn(proc() { let mut lock = arc2.write(); for _ in range(0, 10) { let tmp = *lock; *lock = -1; task::deschedule(); *lock = tmp + 1; } tx.send(()); }); // Readers try to catch the writer in the act let mut children = Vec::new(); for _ in range(0, 5) { let arc3 = arc.clone(); let mut builder = task::task(); children.push(builder.future_result()); builder.spawn(proc() { let lock = arc3.read(); assert!(*lock >= 0); }); } // Wait for children to pass their asserts for r in children.mut_iter() { assert!(r.recv().is_ok()); } // Wait for writer to finish rx.recv(); let lock = arc.read(); assert_eq!(*lock, 10); } #[test] fn test_rw_arc_access_in_unwind() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); let _ = task::try::<()>(proc() { struct Unwinder { i: Arc>, } impl Drop for Unwinder { fn drop(&mut self) { let mut lock = self.i.write(); *lock += 1; } } let _u = Unwinder { i: arc2 }; fail!(); }); let lock = arc.read(); assert_eq!(*lock, 2); } #[test] fn test_rw_downgrade() { // (1) A downgrader gets in write mode and does cond.wait. // (2) A writer gets in write mode, sets state to 42, and does signal. // (3) Downgrader wakes, sets state to 31337. // (4) tells writer and all other readers to contend as it downgrades. // (5) Writer attempts to set state back to 42, while downgraded task // and all reader tasks assert that it's 31337. let arc = Arc::new(RWLock::new(0)); // Reader tasks let mut reader_convos = Vec::new(); for _ in range(0, 10) { let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); reader_convos.push((tx1, rx2)); let arcn = arc.clone(); task::spawn(proc() { rx1.recv(); // wait for downgrader to give go-ahead let lock = arcn.read(); assert_eq!(*lock, 31337); tx2.send(()); }); } // Writer task let arc2 = arc.clone(); let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); task::spawn(proc() { rx1.recv(); { let mut lock = arc2.write(); assert_eq!(*lock, 0); *lock = 42; lock.cond.signal(); } rx1.recv(); { let mut lock = arc2.write(); // This shouldn't happen until after the downgrade read // section, and all other readers, finish. assert_eq!(*lock, 31337); *lock = 42; } tx2.send(()); }); // Downgrader (us) let mut lock = arc.write(); tx1.send(()); // send to another writer who will wake us up while *lock == 0 { lock.cond.wait(); } assert_eq!(*lock, 42); *lock = 31337; // send to other readers for &(ref mut rc, _) in reader_convos.mut_iter() { rc.send(()) } let lock = lock.downgrade(); // complete handshake with other readers for &(_, ref mut rp) in reader_convos.mut_iter() { rp.recv() } tx1.send(()); // tell writer to try again assert_eq!(*lock, 31337); drop(lock); rx2.recv(); // complete handshake with writer } #[cfg(test)] fn test_rw_write_cond_downgrade_read_race_helper() { // Tests that when a downgrader hands off the "reader cloud" lock // because of a contending reader, a writer can't race to get it // instead, which would result in readers_and_writers. This tests // the raw module rather than this one, but it's here because an // rwarc gives us extra shared state to help check for the race. let x = Arc::new(RWLock::new(true)); let (tx, rx) = channel(); // writer task let xw = x.clone(); task::spawn(proc() { let mut lock = xw.write(); tx.send(()); // tell downgrader it's ok to go lock.cond.wait(); // The core of the test is here: the condvar reacquire path // must involve order_lock, so that it cannot race with a reader // trying to receive the "reader cloud lock hand-off". *lock = false; }); rx.recv(); // wait for writer to get in let lock = x.write(); assert!(*lock); // make writer contend in the cond-reacquire path lock.cond.signal(); // make a reader task to trigger the "reader cloud lock" handoff let xr = x.clone(); let (tx, rx) = channel(); task::spawn(proc() { tx.send(()); drop(xr.read()); }); rx.recv(); // wait for reader task to exist let lock = lock.downgrade(); // if writer mistakenly got in, make sure it mutates state // before we assert on it for _ in range(0, 5) { task::deschedule(); } // make sure writer didn't get in. assert!(*lock); } #[test] fn test_rw_write_cond_downgrade_read_race() { // Ideally the above test case would have deschedule statements in it // that helped to expose the race nearly 100% of the time... but adding // deschedules in the intuitively-right locations made it even less // likely, and I wasn't sure why :( . This is a mediocre "next best" // option. for _ in range(0, 8) { test_rw_write_cond_downgrade_read_race_helper(); } } /************************************************************************ * Barrier tests ************************************************************************/ #[test] fn test_barrier() { let barrier = Arc::new(Barrier::new(10)); let (tx, rx) = channel(); for _ in range(0, 9) { let c = barrier.clone(); let tx = tx.clone(); spawn(proc() { c.wait(); tx.send(true); }); } // At this point, all spawned tasks should be blocked, // so we shouldn't get anything from the port assert!(match rx.try_recv() { Empty => true, _ => false, }); barrier.wait(); // Now, the barrier is cleared and we should get data. for _ in range(0, 9) { rx.recv(); } } }