diff --git a/src/libsync/sync/mod.rs b/src/libsync/raw.rs similarity index 50% rename from src/libsync/sync/mod.rs rename to src/libsync/raw.rs index 2217706d4f0..36f0748fe71 100644 --- a/src/libsync/sync/mod.rs +++ b/src/libsync/raw.rs @@ -8,42 +8,34 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#[allow(missing_doc)]; - -/** - * The concurrency primitives you know and love. - * - * Maybe once we have a "core exports x only to std" mechanism, these can be - * in std. - */ +//! Raw concurrency primitives you know and love. +//! +//! These primitives are not recommended for general use, but are provided for +//! flavorful use-cases. It is recommended to use the types at the top of the +//! `sync` crate which wrap values directly and provide safer abstractions for +//! containing data. use std::cast; use std::comm; use std::kinds::marker; use std::mem::replace; -use std::sync::arc::UnsafeArc; use std::sync::atomics; use std::unstable::finally::Finally; -use arc::MutexArc; +use mutex; /**************************************************************************** * Internals ****************************************************************************/ -pub mod mutex; -pub mod one; -mod mpsc_intrusive; - // Each waiting task receives on one of these. -#[doc(hidden)] type WaitEnd = Receiver<()>; -#[doc(hidden)] type SignalEnd = Sender<()>; // A doubly-ended queue of waiting tasks. -#[doc(hidden)] -struct WaitQueue { head: Receiver<SignalEnd>, - tail: Sender<SignalEnd> } +struct WaitQueue { + head: Receiver<SignalEnd>, + tail: Sender<SignalEnd>, +} impl WaitQueue { fn new() -> WaitQueue { @@ -90,33 +82,49 @@ impl WaitQueue { } // The building-block used to make semaphores, mutexes, and rwlocks. -struct SemInner<Q> { +struct Sem<Q> { lock: mutex::Mutex, - count: int, - waiters: WaitQueue, - // Can be either unit or another waitqueue. Some sems shouldn't come with - // a condition variable attached, others should. - blocked: Q + // n.b, we need Sem to be `Share`, but the WaitQueue type is not send/share + // (for good reason). We have an internal invariant on this semaphore, + // however, that the queue is never accessed outside of a locked + // context. For this reason, we shove these behind a pointer which will + // be inferred to be `Share`. + // + // FIXME: this requires an extra allocation, which is bad. + inner: *() } -struct Sem<Q>(UnsafeArc<SemInner<Q>>); +struct SemInner<Q> { + count: int, + waiters: WaitQueue, + // Can be either unit or another waitqueue. Some sems shouldn't come with + // a condition variable attached, others should. + blocked: Q, +} -#[doc(hidden)] -impl<Q:Send> Sem<Q> { +#[must_use] +struct SemGuard<'a, Q> { + sem: &'a Sem<Q>, +} + +impl<Q: Send> Sem<Q> { fn new(count: int, q: Q) -> Sem<Q> { - Sem(UnsafeArc::new(SemInner { - count: count, - waiters: WaitQueue::new(), - blocked: q, + let inner = unsafe { + cast::transmute(~SemInner { + waiters: WaitQueue::new(), + count: count, + blocked: q, + }) + }; + Sem { lock: mutex::Mutex::new(), - })) + inner: inner, + } } unsafe fn with(&self, f: |&mut SemInner<Q>|) { - let Sem(ref arc) = *self; - let state = arc.get(); - let _g = (*state).lock.lock(); - f(cast::transmute(state)); + let _g = self.lock.lock(); + f(&mut *(self.inner as *mut SemInner<Q>)) } pub fn acquire(&self) { @@ -130,7 +138,8 @@ impl<Q:Send> Sem<Q> { waiter_nobe = Some(state.waiters.wait_end()); } }); - // Uncomment if you wish to test for sem races. Not valgrind-friendly. + // Uncomment if you wish to test for sem races. Not + // valgrind-friendly. /* for _ in range(0, 1000) { task::deschedule(); } */ // Need to wait outside the exclusive. if waiter_nobe.is_some() { @@ -150,24 +159,42 @@ impl<Q:Send> Sem<Q> { } } - pub fn access<U>(&self, blk: || -> U) -> U { - (|| { - self.acquire(); - blk() - }).finally(|| { - self.release(); - }) + pub fn access<'a>(&'a self) -> SemGuard<'a, Q> { + self.acquire(); + SemGuard { sem: self } } } -#[doc(hidden)] -impl Sem<Vec<WaitQueue> > { - fn new_and_signal(count: int, num_condvars: uint) - -> Sem<Vec<WaitQueue> > { +#[unsafe_destructor] +impl<Q: Send> Drop for Sem<Q> { + fn drop(&mut self) { + let _waiters: ~SemInner<Q> = unsafe { cast::transmute(self.inner) }; + self.inner = 0 as *(); + } +} + +#[unsafe_destructor] +impl<'a, Q: Send> Drop for SemGuard<'a, Q> { + fn drop(&mut self) { + self.sem.release(); + } +} + +impl Sem<Vec<WaitQueue>> { + fn new_and_signal(count: int, num_condvars: uint) -> Sem<Vec<WaitQueue>> { let mut queues = Vec::new(); for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); } Sem::new(count, queues) } + + // The only other places that condvars get built are rwlock.write_cond() + // and rwlock_write_mode. + pub fn access_cond<'a>(&'a self) -> SemCondGuard<'a> { + SemCondGuard { + guard: self.access(), + cvar: Condvar { sem: self, order: Nothing, nopod: marker::NoPod }, + } + } } // FIXME(#3598): Want to use an Option down below, but we need a custom enum @@ -195,27 +222,23 @@ pub struct Condvar<'a> { } impl<'a> Condvar<'a> { - /** - * Atomically drop the associated lock, and block until a signal is sent. - * - * # Failure - * A task which is killed (i.e., by linked failure with another task) - * while waiting on a condition variable will wake up, fail, and unlock - * the associated lock as it unwinds. - */ + /// Atomically drop the associated lock, and block until a signal is sent. + /// + /// # Failure + /// + /// A task which is killed while waiting on a condition variable will wake + /// up, fail, and unlock the associated lock as it unwinds. pub fn wait(&self) { self.wait_on(0) } - /** - * As wait(), but can specify which of multiple condition variables to - * wait on. Only a signal_on() or broadcast_on() with the same condvar_id - * will wake this thread. - * - * The associated lock must have been initialised with an appropriate - * number of condvars. The condvar_id must be between 0 and num_condvars-1 - * or else this call will fail. - * - * wait() is equivalent to wait_on(0). - */ + /// As wait(), but can specify which of multiple condition variables to + /// wait on. Only a signal_on() or broadcast_on() with the same condvar_id + /// will wake this thread. + /// + /// The associated lock must have been initialised with an appropriate + /// number of condvars. The condvar_id must be between 0 and num_condvars-1 + /// or else this call will fail. + /// + /// wait() is equivalent to wait_on(0). pub fn wait_on(&self, condvar_id: uint) { let mut wait_end = None; let mut out_of_bounds = None; @@ -248,7 +271,10 @@ impl<'a> Condvar<'a> { }).finally(|| { // Reacquire the condvar. match self.order { - Just(lock) => lock.access(|| self.sem.acquire()), + Just(lock) => { + let _g = lock.access(); + self.sem.acquire(); + } Nothing => self.sem.acquire(), } }) @@ -309,7 +335,6 @@ impl<'a> Condvar<'a> { // Checks whether a condvar ID was out of bounds, and fails if so, or does // something else next on success. #[inline] -#[doc(hidden)] fn check_cvar_bounds<U>( out_of_bounds: Option<uint>, id: uint, @@ -325,19 +350,10 @@ fn check_cvar_bounds<U>( } } -#[doc(hidden)] -impl Sem<Vec<WaitQueue> > { - // The only other places that condvars get built are rwlock.write_cond() - // and rwlock_write_mode. - pub fn access_cond<U>(&self, blk: |c: &Condvar| -> U) -> U { - self.access(|| { - blk(&Condvar { - sem: self, - order: Nothing, - nopod: marker::NoPod - }) - }) - } +#[must_use] +struct SemCondGuard<'a> { + guard: SemGuard<'a, Vec<WaitQueue>>, + cvar: Condvar<'a>, } /**************************************************************************** @@ -345,15 +361,15 @@ impl Sem<Vec<WaitQueue> > { ****************************************************************************/ /// A counting, blocking, bounded-waiting semaphore. -pub struct Semaphore { priv sem: Sem<()> } +pub struct Semaphore { + priv sem: Sem<()>, +} - -impl Clone for Semaphore { - /// Create a new handle to the semaphore. - fn clone(&self) -> Semaphore { - let Sem(ref lock) = self.sem; - Semaphore { sem: Sem(lock.clone()) } - } +/// An RAII guard used to represent an acquired resource to a semaphore. When +/// dropped, this value will release the resource back to the semaphore. +#[must_use] +pub struct SemaphoreGuard<'a> { + priv guard: SemGuard<'a, ()>, } impl Semaphore { @@ -362,66 +378,64 @@ impl Semaphore { Semaphore { sem: Sem::new(count, ()) } } - /** - * Acquire a resource represented by the semaphore. Blocks if necessary - * until resource(s) become available. - */ - pub fn acquire(&self) { (&self.sem).acquire() } + /// Acquire a resource represented by the semaphore. Blocks if necessary + /// until resource(s) become available. + pub fn acquire(&self) { self.sem.acquire() } - /** - * Release a held resource represented by the semaphore. Wakes a blocked - * contending task, if any exist. Won't block the caller. - */ - pub fn release(&self) { (&self.sem).release() } + /// Release a held resource represented by the semaphore. Wakes a blocked + /// contending task, if any exist. Won't block the caller. + pub fn release(&self) { self.sem.release() } - /// Run a function with ownership of one of the semaphore's resources. - pub fn access<U>(&self, blk: || -> U) -> U { (&self.sem).access(blk) } + /// Acquire a resource of this semaphore, returning an RAII guard which will + /// release the resource when dropped. + pub fn access<'a>(&'a self) -> SemaphoreGuard<'a> { + SemaphoreGuard { guard: self.sem.access() } + } } /**************************************************************************** * Mutexes ****************************************************************************/ -/** - * A blocking, bounded-waiting, mutual exclusion lock with an associated - * FIFO condition variable. - * - * # Failure - * A task which fails while holding a mutex will unlock the mutex as it - * unwinds. - */ +/// A blocking, bounded-waiting, mutual exclusion lock with an associated +/// FIFO condition variable. +/// +/// # Failure +/// A task which fails while holding a mutex will unlock the mutex as it +/// unwinds. +pub struct Mutex { + priv sem: Sem<Vec<WaitQueue>>, +} -pub struct Mutex { priv sem: Sem<Vec<WaitQueue> > } -impl Clone for Mutex { - /// Create a new handle to the mutex. - fn clone(&self) -> Mutex { - let Sem(ref queue) = self.sem; - Mutex { sem: Sem(queue.clone()) } } +/// An RAII structure which is used to gain access to a mutex's condition +/// variable. Additionally, when a value of this type is dropped, the +/// corresponding mutex is also unlocked. +#[must_use] +pub struct MutexGuard<'a> { + priv guard: SemGuard<'a, Vec<WaitQueue>>, + /// Inner condition variable which is connected to the outer mutex, and can + /// be used for atomic-unlock-and-deschedule. + cond: Condvar<'a>, } impl Mutex { /// Create a new mutex, with one associated condvar. pub fn new() -> Mutex { Mutex::new_with_condvars(1) } - /** - * Create a new mutex, with a specified number of associated condvars. This - * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between - * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but - * any operations on the condvar will fail.) - */ + /// Create a new mutex, with a specified number of associated condvars. This + /// will allow calling wait_on/signal_on/broadcast_on with condvar IDs + /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be + /// allowed but any operations on the condvar will fail.) pub fn new_with_condvars(num_condvars: uint) -> Mutex { Mutex { sem: Sem::new_and_signal(1, num_condvars) } } - - /// Run a function with ownership of the mutex. - pub fn lock<U>(&self, blk: || -> U) -> U { - (&self.sem).access(blk) - } - - /// Run a function with ownership of the mutex and a handle to a condvar. - pub fn lock_cond<U>(&self, blk: |c: &Condvar| -> U) -> U { - (&self.sem).access_cond(blk) + /// Acquires ownership of this mutex, returning an RAII guard which will + /// unlock the mutex when dropped. The associated condition variable can + /// also be accessed through the returned guard. + pub fn lock<'a>(&'a self) -> MutexGuard<'a> { + let SemCondGuard { guard, cvar } = self.sem.access_cond(); + MutexGuard { guard: guard, cond: cvar } } } @@ -431,118 +445,95 @@ impl Mutex { // NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem -#[doc(hidden)] -struct RWLockInner { - // You might ask, "Why don't you need to use an atomic for the mode flag?" - // This flag affects the behaviour of readers (for plain readers, they - // assert on it; for downgraders, they use it to decide which mode to - // unlock for). Consider that the flag is only unset when the very last - // reader exits; therefore, it can never be unset during a reader/reader - // (or reader/downgrader) race. - // By the way, if we didn't care about the assert in the read unlock path, - // we could instead store the mode flag in write_downgrade's stack frame, - // and have the downgrade tokens store a reference to it. - read_mode: bool, +/// A blocking, no-starvation, reader-writer lock with an associated condvar. +/// +/// # Failure +/// +/// A task which fails while holding an rwlock will unlock the rwlock as it +/// unwinds. +pub struct RWLock { + priv order_lock: Semaphore, + priv access_lock: Sem<Vec<WaitQueue>>, + // The only way the count flag is ever accessed is with xadd. Since it is // a read-modify-write operation, multiple xadds on different cores will // always be consistent with respect to each other, so a monotonic/relaxed // consistency ordering suffices (i.e., no extra barriers are needed). + // // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use // acquire/release orderings superfluously. Change these someday. - read_count: atomics::AtomicUint, + priv read_count: atomics::AtomicUint, } -/** - * A blocking, no-starvation, reader-writer lock with an associated condvar. - * - * # Failure - * A task which fails while holding an rwlock will unlock the rwlock as it - * unwinds. - */ -pub struct RWLock { - priv order_lock: Semaphore, - priv access_lock: Sem<Vec<WaitQueue> >, - priv state: UnsafeArc<RWLockInner>, +/// An RAII helper which is created by acquiring a read lock on an RWLock. When +/// dropped, this will unlock the RWLock. +#[must_use] +pub struct RWLockReadGuard<'a> { + priv lock: &'a RWLock, +} + +/// An RAII helper which is created by acquiring a write lock on an RWLock. When +/// dropped, this will unlock the RWLock. +/// +/// A value of this type can also be consumed to downgrade to a read-only lock. +#[must_use] +pub struct RWLockWriteGuard<'a> { + priv lock: &'a RWLock, + /// Inner condition variable that is connected to the write-mode of the + /// outer rwlock. + cond: Condvar<'a>, } impl RWLock { /// Create a new rwlock, with one associated condvar. pub fn new() -> RWLock { RWLock::new_with_condvars(1) } - /** - * Create a new rwlock, with a specified number of associated condvars. - * Similar to mutex_with_condvars. - */ + /// Create a new rwlock, with a specified number of associated condvars. + /// Similar to mutex_with_condvars. pub fn new_with_condvars(num_condvars: uint) -> RWLock { - let state = UnsafeArc::new(RWLockInner { - read_mode: false, + RWLock { + order_lock: Semaphore::new(1), + access_lock: Sem::new_and_signal(1, num_condvars), read_count: atomics::AtomicUint::new(0), - }); - RWLock { order_lock: Semaphore::new(1), - access_lock: Sem::new_and_signal(1, num_condvars), - state: state, } - } - - /// Create a new handle to the rwlock. - pub fn clone(&self) -> RWLock { - let Sem(ref access_lock_queue) = self.access_lock; - RWLock { order_lock: (&(self.order_lock)).clone(), - access_lock: Sem(access_lock_queue.clone()), - state: self.state.clone() } - } - - /** - * Run a function with the rwlock in read mode. Calls to 'read' from other - * tasks may run concurrently with this one. - */ - pub fn read<U>(&self, blk: || -> U) -> U { - unsafe { - (&self.order_lock).access(|| { - let state = &mut *self.state.get(); - let old_count = state.read_count.fetch_add(1, atomics::Acquire); - if old_count == 0 { - (&self.access_lock).acquire(); - state.read_mode = true; - } - }); - (|| { - blk() - }).finally(|| { - let state = &mut *self.state.get(); - assert!(state.read_mode); - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - if old_count == 1 { - state.read_mode = false; - // Note: this release used to be outside of a locked access - // to exclusive-protected state. If this code is ever - // converted back to such (instead of using atomic ops), - // this access MUST NOT go inside the exclusive access. - (&self.access_lock).release(); - } - }) } } - /** - * Run a function with the rwlock in write mode. No calls to 'read' or - * 'write' from other tasks will run concurrently with this one. - */ - pub fn write<U>(&self, blk: || -> U) -> U { - (&self.order_lock).acquire(); - (&self.access_lock).access(|| { - (&self.order_lock).release(); - blk() - }) + /// Acquires a read-lock, returning an RAII guard that will unlock the lock + /// when dropped. Calls to 'read' from other tasks may run concurrently with + /// this one. + pub fn read<'a>(&'a self) -> RWLockReadGuard<'a> { + let _guard = self.order_lock.access(); + let old_count = self.read_count.fetch_add(1, atomics::Acquire); + if old_count == 0 { + self.access_lock.acquire(); + } + RWLockReadGuard { lock: self } } - /** - * As write(), but also with a handle to a condvar. Waiting on this - * condvar will allow readers and writers alike to take the rwlock before - * the waiting task is signalled. (Note: a writer that waited and then - * was signalled might reacquire the lock before other waiting writers.) - */ - pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U { + /// Acquire a write-lock, returning an RAII guard that will unlock the lock + /// when dropped. No calls to 'read' or 'write' from other tasks will run + /// concurrently with this one. + /// + /// You can also downgrade a write to a read by calling the `downgrade` + /// method on the returned guard. Additionally, the guard will contain a + /// `Condvar` attached to this lock. + /// + /// # Example + /// + /// ```rust + /// use sync::raw::RWLock; + /// + /// let lock = RWLock::new(); + /// let write = lock.write(); + /// // ... exclusive access ... + /// let read = write.downgrade(); + /// // ... shared access ... + /// drop(read); + /// ``` + pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a> { + let _g = self.order_lock.access(); + self.access_lock.acquire(); + // It's important to thread our order lock into the condvar, so that // when a cond.wait() wakes up, it uses it while reacquiring the // access lock. If we permitted a waking-up writer to "cut in line", @@ -569,187 +560,59 @@ impl RWLock { // which can't happen until T2 finishes the downgrade-read entirely. // The astute reader will also note that making waking writers use the // order_lock is better for not starving readers. - (&self.order_lock).acquire(); - (&self.access_lock).access_cond(|cond| { - (&self.order_lock).release(); - let opt_lock = Just(&self.order_lock); - blk(&Condvar { sem: cond.sem, order: opt_lock, - nopod: marker::NoPod }) - }) - } - - /** - * As write(), but with the ability to atomically 'downgrade' the lock; - * i.e., to become a reader without letting other writers get the lock in - * the meantime (such as unlocking and then re-locking as a reader would - * do). The block takes a "write mode token" argument, which can be - * transformed into a "read mode token" by calling downgrade(). Example: - * - * # Example - * - * ```rust - * use sync::RWLock; - * - * let lock = RWLock::new(); - * lock.write_downgrade(|mut write_token| { - * write_token.write_cond(|condvar| { - * // ... exclusive access ... - * }); - * let read_token = lock.downgrade(write_token); - * read_token.read(|| { - * // ... shared access ... - * }) - * }) - * ``` - */ - pub fn write_downgrade<U>(&self, blk: |v: RWLockWriteMode| -> U) -> U { - // Implementation slightly different from the slicker 'write's above. - // The exit path is conditional on whether the caller downgrades. - (&self.order_lock).acquire(); - (&self.access_lock).acquire(); - (&self.order_lock).release(); - (|| { - blk(RWLockWriteMode { lock: self, nopod: marker::NoPod }) - }).finally(|| { - let writer_or_last_reader; - // Check if we're releasing from read mode or from write mode. - let state = unsafe { &mut *self.state.get() }; - if state.read_mode { - // Releasing from read mode. - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - // Check if other readers remain. - if old_count == 1 { - // Case 1: Writer downgraded & was the last reader - writer_or_last_reader = true; - state.read_mode = false; - } else { - // Case 2: Writer downgraded & was not the last reader - writer_or_last_reader = false; - } - } else { - // Case 3: Writer did not downgrade - writer_or_last_reader = true; - } - if writer_or_last_reader { - // Nobody left inside; release the "reader cloud" lock. - (&self.access_lock).release(); - } - }) - } - - /// To be called inside of the write_downgrade block. - pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>) - -> RWLockReadMode<'a> { - if !((self as *RWLock) == (token.lock as *RWLock)) { - fail!("Can't downgrade() with a different rwlock's write_mode!"); - } - unsafe { - let state = &mut *self.state.get(); - assert!(!state.read_mode); - state.read_mode = true; - // If a reader attempts to enter at this point, both the - // downgrader and reader will set the mode flag. This is fine. - let old_count = state.read_count.fetch_add(1, atomics::Release); - // If another reader was already blocking, we need to hand-off - // the "reader cloud" access lock to them. - if old_count != 0 { - // Guaranteed not to let another writer in, because - // another reader was holding the order_lock. Hence they - // must be the one to get the access_lock (because all - // access_locks are acquired with order_lock held). See - // the comment in write_cond for more justification. - (&self.access_lock).release(); + RWLockWriteGuard { + lock: self, + cond: Condvar { + sem: &self.access_lock, + order: Just(&self.order_lock), + nopod: marker::NoPod, } } - RWLockReadMode { lock: token.lock, nopod: marker::NoPod } } } -/// The "write permission" token used for rwlock.write_downgrade(). +impl<'a> RWLockWriteGuard<'a> { + /// Consumes this write lock and converts it into a read lock. + pub fn downgrade(self) -> RWLockReadGuard<'a> { + let lock = self.lock; + // Don't run the destructor of the write guard, we're in charge of + // things from now on + unsafe { cast::forget(self) } -pub struct RWLockWriteMode<'a> { priv lock: &'a RWLock, priv nopod: marker::NoPod } -/// The "read permission" token used for rwlock.write_downgrade(). -pub struct RWLockReadMode<'a> { priv lock: &'a RWLock, - priv nopod: marker::NoPod } - -impl<'a> RWLockWriteMode<'a> { - /// Access the pre-downgrade rwlock in write mode. - pub fn write<U>(&self, blk: || -> U) -> U { blk() } - /// Access the pre-downgrade rwlock in write mode with a condvar. - pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U { - // Need to make the condvar use the order lock when reacquiring the - // access lock. See comment in RWLock::write_cond for why. - blk(&Condvar { sem: &self.lock.access_lock, - order: Just(&self.lock.order_lock), - nopod: marker::NoPod }) - } -} - -impl<'a> RWLockReadMode<'a> { - /// Access the post-downgrade rwlock in read mode. - pub fn read<U>(&self, blk: || -> U) -> U { blk() } -} - -/// A barrier enables multiple tasks to synchronize the beginning -/// of some computation. -/// -/// ```rust -/// use sync::Barrier; -/// -/// let barrier = Barrier::new(10); -/// for _ in range(0, 10) { -/// let c = barrier.clone(); -/// // The same messages will be printed together. -/// // You will NOT see any interleaving. -/// spawn(proc() { -/// println!("before wait"); -/// c.wait(); -/// println!("after wait"); -/// }); -/// } -/// ``` -#[deriving(Clone)] -pub struct Barrier { - priv arc: MutexArc<BarrierState>, - priv num_tasks: uint, -} - -// The inner state of a double barrier -struct BarrierState { - count: uint, - generation_id: uint, -} - -impl Barrier { - /// Create a new barrier that can block a given number of tasks. - pub fn new(num_tasks: uint) -> Barrier { - Barrier { - arc: MutexArc::new(BarrierState { - count: 0, - generation_id: 0, - }), - num_tasks: num_tasks, + let old_count = lock.read_count.fetch_add(1, atomics::Release); + // If another reader was already blocking, we need to hand-off + // the "reader cloud" access lock to them. + if old_count != 0 { + // Guaranteed not to let another writer in, because + // another reader was holding the order_lock. Hence they + // must be the one to get the access_lock (because all + // access_locks are acquired with order_lock held). See + // the comment in write_cond for more justification. + lock.access_lock.release(); } + RWLockReadGuard { lock: lock } } +} - /// Block the current task until a certain number of tasks is waiting. - pub fn wait(&self) { - self.arc.access_cond(|state, cond| { - let local_gen = state.generation_id; - state.count += 1; - if state.count < self.num_tasks { - // We need a while loop to guard against spurious wakeups. - // http://en.wikipedia.org/wiki/Spurious_wakeup - while local_gen == state.generation_id && state.count < self.num_tasks { - cond.wait(); - } - } else { - state.count = 0; - state.generation_id += 1; - cond.broadcast(); - } - }); +#[unsafe_destructor] +impl<'a> Drop for RWLockWriteGuard<'a> { + fn drop(&mut self) { + self.lock.access_lock.release(); + } +} + +#[unsafe_destructor] +impl<'a> Drop for RWLockReadGuard<'a> { + fn drop(&mut self) { + let old_count = self.lock.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + if old_count == 1 { + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. + self.lock.access_lock.release(); + } } } @@ -759,12 +622,12 @@ impl Barrier { #[cfg(test)] mod tests { - use sync::{Semaphore, Mutex, RWLock, Barrier, Condvar}; + use arc::Arc; + use super::{Semaphore, Mutex, RWLock, Condvar}; use std::cast; use std::result; use std::task; - use std::comm::Empty; /************************************************************************ * Semaphore tests @@ -779,26 +642,24 @@ mod tests { #[test] fn test_sem_basic() { let s = Semaphore::new(1); - s.access(|| { }) + let _g = s.access(); } #[test] fn test_sem_as_mutex() { - let s = Semaphore::new(1); + let s = Arc::new(Semaphore::new(1)); let s2 = s.clone(); task::spawn(proc() { - s2.access(|| { - for _ in range(0, 5) { task::deschedule(); } - }) - }); - s.access(|| { + let _g = s2.access(); for _ in range(0, 5) { task::deschedule(); } - }) + }); + let _g = s.access(); + for _ in range(0, 5) { task::deschedule(); } } #[test] fn test_sem_as_cvar() { /* Child waits and parent signals */ let (tx, rx) = channel(); - let s = Semaphore::new(0); + let s = Arc::new(Semaphore::new(0)); let s2 = s.clone(); task::spawn(proc() { s2.acquire(); @@ -810,7 +671,7 @@ mod tests { /* Parent waits and child signals */ let (tx, rx) = channel(); - let s = Semaphore::new(0); + let s = Arc::new(Semaphore::new(0)); let s2 = s.clone(); task::spawn(proc() { for _ in range(0, 5) { task::deschedule(); } @@ -824,40 +685,37 @@ mod tests { fn test_sem_multi_resource() { // Parent and child both get in the critical section at the same // time, and shake hands. - let s = Semaphore::new(2); + let s = Arc::new(Semaphore::new(2)); let s2 = s.clone(); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); task::spawn(proc() { - s2.access(|| { - let _ = rx2.recv(); - tx1.send(()); - }) + let _g = s2.access(); + let _ = rx2.recv(); + tx1.send(()); }); - s.access(|| { - tx2.send(()); - let _ = rx1.recv(); - }) + let _g = s.access(); + tx2.send(()); + let _ = rx1.recv(); } #[test] fn test_sem_runtime_friendly_blocking() { // Force the runtime to schedule two threads on the same sched_loop. // When one blocks, it should schedule the other one. - let s = Semaphore::new(1); + let s = Arc::new(Semaphore::new(1)); let s2 = s.clone(); let (tx, rx) = channel(); - let mut child_data = Some((s2, tx)); - s.access(|| { - let (s2, tx) = child_data.take_unwrap(); + { + let _g = s.access(); task::spawn(proc() { tx.send(()); - s2.access(|| { }); + drop(s2.access()); tx.send(()); }); - let _ = rx.recv(); // wait for child to come alive + rx.recv(); // wait for child to come alive for _ in range(0, 5) { task::deschedule(); } // let the child contend - }); - let _ = rx.recv(); // wait for child to be done + } + rx.recv(); // wait for child to be done } /************************************************************************ * Mutex tests @@ -867,93 +725,90 @@ mod tests { // Unsafely achieve shared state, and do the textbook // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance. let (tx, rx) = channel(); - let m = Mutex::new(); + let m = Arc::new(Mutex::new()); let m2 = m.clone(); let mut sharedstate = ~0; { - let ptr: *int = &*sharedstate; + let ptr: *mut int = &mut *sharedstate; task::spawn(proc() { - let sharedstate: &mut int = - unsafe { cast::transmute(ptr) }; - access_shared(sharedstate, &m2, 10); + access_shared(ptr, &m2, 10); tx.send(()); }); } { - access_shared(sharedstate, &m, 10); + access_shared(&mut *sharedstate, &m, 10); let _ = rx.recv(); assert_eq!(*sharedstate, 20); } - fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) { + fn access_shared(sharedstate: *mut int, m: &Arc<Mutex>, n: uint) { for _ in range(0, n) { - m.lock(|| { - let oldval = *sharedstate; - task::deschedule(); - *sharedstate = oldval + 1; - }) + let _g = m.lock(); + let oldval = unsafe { *sharedstate }; + task::deschedule(); + unsafe { *sharedstate = oldval + 1; } } } } #[test] fn test_mutex_cond_wait() { - let m = Mutex::new(); + let m = Arc::new(Mutex::new()); // Child wakes up parent - m.lock_cond(|cond| { + { + let lock = m.lock(); let m2 = m.clone(); task::spawn(proc() { - m2.lock_cond(|cond| { - let woken = cond.signal(); - assert!(woken); - }) + let lock = m2.lock(); + let woken = lock.cond.signal(); + assert!(woken); }); - cond.wait(); - }); + lock.cond.wait(); + } // Parent wakes up child let (tx, rx) = channel(); let m3 = m.clone(); task::spawn(proc() { - m3.lock_cond(|cond| { - tx.send(()); - cond.wait(); - tx.send(()); - }) + let lock = m3.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); }); - let _ = rx.recv(); // Wait until child gets in the mutex - m.lock_cond(|cond| { - let woken = cond.signal(); + rx.recv(); // Wait until child gets in the mutex + { + let lock = m.lock(); + let woken = lock.cond.signal(); assert!(woken); - }); - let _ = rx.recv(); // Wait until child wakes up + } + rx.recv(); // Wait until child wakes up } - #[cfg(test)] + fn test_mutex_cond_broadcast_helper(num_waiters: uint) { - let m = Mutex::new(); - let mut rxs = vec!(); + let m = Arc::new(Mutex::new()); + let mut rxs = Vec::new(); for _ in range(0, num_waiters) { let mi = m.clone(); let (tx, rx) = channel(); rxs.push(rx); task::spawn(proc() { - mi.lock_cond(|cond| { - tx.send(()); - cond.wait(); - tx.send(()); - }) + let lock = mi.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); }); } // wait until all children get in the mutex - for rx in rxs.mut_iter() { let _ = rx.recv(); } - m.lock_cond(|cond| { - let num_woken = cond.broadcast(); + for rx in rxs.mut_iter() { rx.recv(); } + { + let lock = m.lock(); + let num_woken = lock.cond.broadcast(); assert_eq!(num_woken, num_waiters); - }); + } // wait until all children wake up - for rx in rxs.mut_iter() { let _ = rx.recv(); } + for rx in rxs.mut_iter() { rx.recv(); } } #[test] fn test_mutex_cond_broadcast() { @@ -965,61 +820,57 @@ mod tests { } #[test] fn test_mutex_cond_no_waiter() { - let m = Mutex::new(); + let m = Arc::new(Mutex::new()); let m2 = m.clone(); let _ = task::try(proc() { - m.lock_cond(|_x| { }) + drop(m.lock()); }); - m2.lock_cond(|cond| { - assert!(!cond.signal()); - }) + let lock = m2.lock(); + assert!(!lock.cond.signal()); } #[test] fn test_mutex_killed_simple() { use std::any::Any; // Mutex must get automatically unlocked if failed/killed within. - let m = Mutex::new(); + let m = Arc::new(Mutex::new()); let m2 = m.clone(); let result: result::Result<(), ~Any> = task::try(proc() { - m2.lock(|| { - fail!(); - }) + let _lock = m2.lock(); + fail!(); }); assert!(result.is_err()); // child task must have finished by the time try returns - m.lock(|| { }) + drop(m.lock()); } #[test] fn test_mutex_cond_signal_on_0() { // Tests that signal_on(0) is equivalent to signal(). - let m = Mutex::new(); - m.lock_cond(|cond| { - let m2 = m.clone(); - task::spawn(proc() { - m2.lock_cond(|cond| { - cond.signal_on(0); - }) - }); - cond.wait(); - }) + let m = Arc::new(Mutex::new()); + let lock = m.lock(); + let m2 = m.clone(); + task::spawn(proc() { + let lock = m2.lock(); + lock.cond.signal_on(0); + }); + lock.cond.wait(); } #[test] fn test_mutex_no_condvars() { let result = task::try(proc() { let m = Mutex::new_with_condvars(0); - m.lock_cond(|cond| { cond.wait(); }) + m.lock().cond.wait(); }); assert!(result.is_err()); let result = task::try(proc() { let m = Mutex::new_with_condvars(0); - m.lock_cond(|cond| { cond.signal(); }) + m.lock().cond.signal(); }); assert!(result.is_err()); let result = task::try(proc() { let m = Mutex::new_with_condvars(0); - m.lock_cond(|cond| { cond.broadcast(); }) + m.lock().cond.broadcast(); }); assert!(result.is_err()); } @@ -1029,23 +880,16 @@ mod tests { #[cfg(test)] pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead } #[cfg(test)] - fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: ||) { + fn lock_rwlock_in_mode(x: &Arc<RWLock>, mode: RWLockMode, blk: ||) { match mode { - Read => x.read(blk), - Write => x.write(blk), - Downgrade => - x.write_downgrade(|mode| { - mode.write(|| { blk() }); - }), - DowngradeRead => - x.write_downgrade(|mode| { - let mode = x.downgrade(mode); - mode.read(|| { blk() }); - }), + Read => { let _g = x.read(); blk() } + Write => { let _g = x.write(); blk() } + Downgrade => { let _g = x.write(); blk() } + DowngradeRead => { let _g = x.write().downgrade(); blk() } } } #[cfg(test)] - fn test_rwlock_exclusion(x: &RWLock, + fn test_rwlock_exclusion(x: Arc<RWLock>, mode1: RWLockMode, mode2: RWLockMode) { // Test mutual exclusion between readers and writers. Just like the @@ -1063,14 +907,14 @@ mod tests { }); } { - access_shared(sharedstate, x, mode2, 10); + access_shared(sharedstate, &x, mode2, 10); let _ = rx.recv(); assert_eq!(*sharedstate, 20); } - fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode, - n: uint) { + fn access_shared(sharedstate: &mut int, x: &Arc<RWLock>, + mode: RWLockMode, n: uint) { for _ in range(0, n) { lock_rwlock_in_mode(x, mode, || { let oldval = *sharedstate; @@ -1082,132 +926,127 @@ mod tests { } #[test] fn test_rwlock_readers_wont_modify_the_data() { - test_rwlock_exclusion(&RWLock::new(), Read, Write); - test_rwlock_exclusion(&RWLock::new(), Write, Read); - test_rwlock_exclusion(&RWLock::new(), Read, Downgrade); - test_rwlock_exclusion(&RWLock::new(), Downgrade, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, DowngradeRead); + test_rwlock_exclusion(Arc::new(RWLock::new()), DowngradeRead, Write); } #[test] fn test_rwlock_writers_and_writers() { - test_rwlock_exclusion(&RWLock::new(), Write, Write); - test_rwlock_exclusion(&RWLock::new(), Write, Downgrade); - test_rwlock_exclusion(&RWLock::new(), Downgrade, Write); - test_rwlock_exclusion(&RWLock::new(), Downgrade, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Downgrade); } #[cfg(test)] - fn test_rwlock_handshake(x: &RWLock, - mode1: RWLockMode, - mode2: RWLockMode, - make_mode2_go_first: bool) { + fn test_rwlock_handshake(x: Arc<RWLock>, + mode1: RWLockMode, + mode2: RWLockMode, + make_mode2_go_first: bool) { // Much like sem_multi_resource. let x2 = x.clone(); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); task::spawn(proc() { if !make_mode2_go_first { - let _ = rx2.recv(); // parent sends to us once it locks, or ... + rx2.recv(); // parent sends to us once it locks, or ... } lock_rwlock_in_mode(&x2, mode2, || { if make_mode2_go_first { tx1.send(()); // ... we send to it once we lock } - let _ = rx2.recv(); + rx2.recv(); tx1.send(()); }) }); if make_mode2_go_first { - let _ = rx1.recv(); // child sends to us once it locks, or ... + rx1.recv(); // child sends to us once it locks, or ... } - lock_rwlock_in_mode(x, mode1, || { + lock_rwlock_in_mode(&x, mode1, || { if !make_mode2_go_first { tx2.send(()); // ... we send to it once we lock } tx2.send(()); - let _ = rx1.recv(); + rx1.recv(); }) } #[test] fn test_rwlock_readers_and_readers() { - test_rwlock_handshake(&RWLock::new(), Read, Read, false); + test_rwlock_handshake(Arc::new(RWLock::new()), Read, Read, false); // The downgrader needs to get in before the reader gets in, otherwise // they cannot end up reading at the same time. - test_rwlock_handshake(&RWLock::new(), DowngradeRead, Read, false); - test_rwlock_handshake(&RWLock::new(), Read, DowngradeRead, true); + test_rwlock_handshake(Arc::new(RWLock::new()), DowngradeRead, Read, false); + test_rwlock_handshake(Arc::new(RWLock::new()), Read, DowngradeRead, true); // Two downgrade_reads can never both end up reading at the same time. } #[test] fn test_rwlock_downgrade_unlock() { // Tests that downgrade can unlock the lock in both modes - let x = RWLock::new(); + let x = Arc::new(RWLock::new()); lock_rwlock_in_mode(&x, Downgrade, || { }); - test_rwlock_handshake(&x, Read, Read, false); - let y = RWLock::new(); + test_rwlock_handshake(x, Read, Read, false); + let y = Arc::new(RWLock::new()); lock_rwlock_in_mode(&y, DowngradeRead, || { }); - test_rwlock_exclusion(&y, Write, Write); + test_rwlock_exclusion(y, Write, Write); } #[test] fn test_rwlock_read_recursive() { let x = RWLock::new(); - x.read(|| { x.read(|| { }) }) + let _g1 = x.read(); + let _g2 = x.read(); } #[test] fn test_rwlock_cond_wait() { // As test_mutex_cond_wait above. - let x = RWLock::new(); + let x = Arc::new(RWLock::new()); // Child wakes up parent - x.write_cond(|cond| { + { + let lock = x.write(); let x2 = x.clone(); task::spawn(proc() { - x2.write_cond(|cond| { - let woken = cond.signal(); - assert!(woken); - }) + let lock = x2.write(); + assert!(lock.cond.signal()); }); - cond.wait(); - }); + lock.cond.wait(); + } // Parent wakes up child let (tx, rx) = channel(); let x3 = x.clone(); task::spawn(proc() { - x3.write_cond(|cond| { - tx.send(()); - cond.wait(); - tx.send(()); - }) + let lock = x3.write(); + tx.send(()); + lock.cond.wait(); + tx.send(()); }); - let _ = rx.recv(); // Wait until child gets in the rwlock - x.read(|| { }); // Must be able to get in as a reader in the meantime - x.write_cond(|cond| { // Or as another writer - let woken = cond.signal(); - assert!(woken); - }); - let _ = rx.recv(); // Wait until child wakes up - x.read(|| { }); // Just for good measure + rx.recv(); // Wait until child gets in the rwlock + drop(x.read()); // Must be able to get in as a reader + { + let x = x.write(); + assert!(x.cond.signal()); + } + rx.recv(); // Wait until child wakes up + drop(x.read()); // Just for good measure } #[cfg(test)] - fn test_rwlock_cond_broadcast_helper(num_waiters: uint, - dg1: bool, - dg2: bool) { + fn test_rwlock_cond_broadcast_helper(num_waiters: uint) { // Much like the mutex broadcast test. Downgrade-enabled. - fn lock_cond(x: &RWLock, downgrade: bool, blk: |c: &Condvar|) { - if downgrade { - x.write_downgrade(|mode| { - mode.write_cond(|c| { blk(c) }); - }); - } else { - x.write_cond(|c| { blk(c) }); - } + fn lock_cond(x: &Arc<RWLock>, blk: |c: &Condvar|) { + let lock = x.write(); + blk(&lock.cond); } - let x = RWLock::new(); - let mut rxs = vec!(); + + let x = Arc::new(RWLock::new()); + let mut rxs = Vec::new(); for _ in range(0, num_waiters) { let xi = x.clone(); let (tx, rx) = channel(); rxs.push(rx); task::spawn(proc() { - lock_cond(&xi, dg1, |cond| { + lock_cond(&xi, |cond| { tx.send(()); cond.wait(); tx.send(()); @@ -1217,7 +1056,7 @@ mod tests { // wait until all children get in the mutex for rx in rxs.mut_iter() { let _ = rx.recv(); } - lock_cond(&x, dg2, |cond| { + lock_cond(&x, |cond| { let num_woken = cond.broadcast(); assert_eq!(num_woken, num_waiters); }); @@ -1226,21 +1065,15 @@ mod tests { } #[test] fn test_rwlock_cond_broadcast() { - test_rwlock_cond_broadcast_helper(0, true, true); - test_rwlock_cond_broadcast_helper(0, true, false); - test_rwlock_cond_broadcast_helper(0, false, true); - test_rwlock_cond_broadcast_helper(0, false, false); - test_rwlock_cond_broadcast_helper(12, true, true); - test_rwlock_cond_broadcast_helper(12, true, false); - test_rwlock_cond_broadcast_helper(12, false, true); - test_rwlock_cond_broadcast_helper(12, false, false); + test_rwlock_cond_broadcast_helper(0); + test_rwlock_cond_broadcast_helper(12); } #[cfg(test)] fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) { use std::any::Any; // Mutex must get automatically unlocked if failed/killed within. - let x = RWLock::new(); + let x = Arc::new(RWLock::new()); let x2 = x.clone(); let result: result::Result<(), ~Any> = task::try(proc() { @@ -1283,48 +1116,4 @@ mod tests { rwlock_kill_helper(Downgrade, DowngradeRead); rwlock_kill_helper(Downgrade, DowngradeRead); } - #[test] #[should_fail] - fn test_rwlock_downgrade_cant_swap() { - // Tests that you can't downgrade with a different rwlock's token. - let x = RWLock::new(); - let y = RWLock::new(); - x.write_downgrade(|xwrite| { - let mut xopt = Some(xwrite); - y.write_downgrade(|_ywrite| { - y.downgrade(xopt.take_unwrap()); - error!("oops, y.downgrade(x) should have failed!"); - }) - }) - } - - /************************************************************************ - * Barrier tests - ************************************************************************/ - #[test] - fn test_barrier() { - let barrier = Barrier::new(10); - let (tx, rx) = channel(); - - for _ in range(0, 9) { - let c = barrier.clone(); - let tx = tx.clone(); - spawn(proc() { - c.wait(); - tx.send(true); - }); - } - - // At this point, all spawned tasks should be blocked, - // so we shouldn't get anything from the port - assert!(match rx.try_recv() { - Empty => true, - _ => false, - }); - - barrier.wait(); - // Now, the barrier is cleared and we should get data. - for _ in range(0, 9) { - rx.recv(); - } - } }