From a68ec98166bf638c6cbf4036f51036012695718d Mon Sep 17 00:00:00 2001
From: Aaron Turon
Date: Fri, 14 Nov 2014 14:33:51 -0800
Subject: [PATCH] Rewrite sync::mutex as thin layer over native mutexes

Previously, sync::mutex had to split between green and native runtime
systems and thus could not simply use the native mutex facility. This
commit rewrites sync::mutex to link directly to native mutexes; in the
future, the two will probably be coalesced into a single module (once
librustrt is pulled into libstd wholesale).
---
 src/libsync/lib.rs            |   1 -
 src/libsync/mpsc_intrusive.rs | 144 -------------
 src/libsync/mutex.rs          | 390 ++--------------------------------
 3 files changed, 12 insertions(+), 523 deletions(-)
 delete mode 100644 src/libsync/mpsc_intrusive.rs

diff --git a/src/libsync/lib.rs b/src/libsync/lib.rs
index ffff32f04c4..ec5b08fa754 100644
--- a/src/libsync/lib.rs
+++ b/src/libsync/lib.rs
@@ -54,7 +54,6 @@ pub mod atomic;
 
 // Concurrent data structures
 
-mod mpsc_intrusive;
 pub mod spsc_queue;
 pub mod mpsc_queue;
 pub mod mpmc_bounded_queue;
diff --git a/src/libsync/mpsc_intrusive.rs b/src/libsync/mpsc_intrusive.rs
deleted file mode 100644
index 1f7841de7c1..00000000000
--- a/src/libsync/mpsc_intrusive.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
- * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
- * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
- * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-//! A mostly lock-free multi-producer, single consumer queue.
-//!
-//! This module implements an intrusive MPSC queue. This queue is incredibly
-//! unsafe (due to use of unsafe pointers for nodes), and hence is not public.
-
-#![experimental]
-
-// http://www.1024cores.net/home/lock-free-algorithms
-//                         /queues/intrusive-mpsc-node-based-queue
-
-use core::prelude::*;
-
-use core::atomic;
-use core::mem;
-use core::cell::UnsafeCell;
-
-// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static
-// initialization.
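A side note on the `AtomicUint` links described in the comment above: a value built only out of integers could be placed in a `static` in 2014 Rust, which `AtomicPtr` did not allow at the time (the old `MUTEX_INIT` initializer deleted later in this patch relies on exactly this via `INIT_ATOMIC_UINT`). Below is a minimal, runnable sketch of the same pointer-as-integer trick in today's Rust; the `Node` and `STUB` names are only loosely modeled on the deleted queue's `Node`/`DummyNode` and are illustrative, not part of the patch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct Node {
    next: AtomicUsize, // link kept as an integer so a node can live in a `static`
    data: u32,
}

// A statically initialized stub node: possible because every field is a plain
// integer (in 2014 this took INIT_ATOMIC_UINT; today `AtomicUsize::new` is const).
static STUB: Node = Node { next: AtomicUsize::new(0), data: 0 };

fn main() {
    // A link is written by casting a node pointer to an integer...
    let boxed = Box::into_raw(Box::new(Node { next: AtomicUsize::new(0), data: 7 }));
    STUB.next.store(boxed as usize, Ordering::Release);

    // ...and read back by casting the integer to a pointer.
    let next = STUB.next.load(Ordering::Acquire) as *mut Node;
    assert_eq!(unsafe { (*next).data }, 7);
    unsafe { drop(Box::from_raw(next)) }; // free the heap-allocated node
}
```

In current Rust `AtomicPtr::new` is itself `const`, so the integer cast is no longer needed for static initialization; it is shown only to mirror the deleted code.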
-
-pub struct Node<T> {
-    pub next: atomic::AtomicUint,
-    pub data: T,
-}
-
-pub struct DummyNode {
-    pub next: atomic::AtomicUint,
-}
-
-pub struct Queue<T> {
-    pub head: atomic::AtomicUint,
-    pub tail: UnsafeCell<*mut Node<T>>,
-    pub stub: DummyNode,
-}
-
-impl<T: Send> Queue<T> {
-    pub fn new() -> Queue<T> {
-        Queue {
-            head: atomic::AtomicUint::new(0),
-            tail: UnsafeCell::new(0 as *mut Node<T>),
-            stub: DummyNode {
-                next: atomic::AtomicUint::new(0),
-            },
-        }
-    }
-
-    pub unsafe fn push(&self, node: *mut Node<T>) {
-        (*node).next.store(0, atomic::Release);
-        let prev = self.head.swap(node as uint, atomic::AcqRel);
-
-        // Note that this code is slightly modified to allow static
-        // initialization of these queues with rust's flavor of static
-        // initialization.
-        if prev == 0 {
-            self.stub.next.store(node as uint, atomic::Release);
-        } else {
-            let prev = prev as *mut Node<T>;
-            (*prev).next.store(node as uint, atomic::Release);
-        }
-    }
-
-    /// You'll note that the other MPSC queue in std::sync is non-intrusive and
-    /// returns a `PopResult` here to indicate when the queue is inconsistent.
-    /// An "inconsistent state" in the other queue means that a pusher has
-    /// pushed, but it hasn't finished linking the rest of the chain.
-    ///
-    /// This queue also suffers from this problem, but I currently haven't been
-    /// able to detangle when this actually happens. This code is translated
-    /// verbatim from the website above, and is more complicated than the
-    /// non-intrusive version.
-    ///
-    /// Right now consumers of this queue must be ready for this fact. Just
-    /// because `pop` returns `None` does not mean that there is not data
-    /// on the queue.
-    pub unsafe fn pop(&self) -> Option<*mut Node<T>> {
-        let tail = *self.tail.get();
-        let mut tail = if !tail.is_null() {tail} else {
-            mem::transmute(&self.stub)
-        };
-        let mut next = (*tail).next(atomic::Relaxed);
-        if tail as uint == &self.stub as *const DummyNode as uint {
-            if next.is_null() {
-                return None;
-            }
-            *self.tail.get() = next;
-            tail = next;
-            next = (*next).next(atomic::Relaxed);
-        }
-        if !next.is_null() {
-            *self.tail.get() = next;
-            return Some(tail);
-        }
-        let head = self.head.load(atomic::Acquire) as *mut Node<T>;
-        if tail != head {
-            return None;
-        }
-        let stub = mem::transmute(&self.stub);
-        self.push(stub);
-        next = (*tail).next(atomic::Relaxed);
-        if !next.is_null() {
-            *self.tail.get() = next;
-            return Some(tail);
-        }
-        return None
-    }
-}
-
-impl<T> Node<T> {
-    pub fn new(t: T) -> Node<T> {
-        Node {
-            data: t,
-            next: atomic::AtomicUint::new(0),
-        }
-    }
-    pub unsafe fn next(&self, ord: atomic::Ordering) -> *mut Node<T> {
-        mem::transmute::<uint, *mut Node<T>>(self.next.load(ord))
-    }
-}
diff --git a/src/libsync/mutex.rs b/src/libsync/mutex.rs
index e05f3e1910b..6672126f55c 100644
--- a/src/libsync/mutex.rs
+++ b/src/libsync/mutex.rs
@@ -8,80 +8,22 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-//! A proper mutex implementation regardless of the "flavor of task" which is
-//! acquiring the lock.
+//! A simple native mutex implementation. Warning: this API is likely
+//! to change soon.
 
-// # Implementation of Rust mutexes
-//
-// Most answers to the question of "how do I use a mutex" are "use pthreads",
-// but for Rust this isn't quite sufficient. Green threads cannot acquire an OS
-// mutex because they can context switch among many OS threads, leading to
-// deadlocks with other green threads.
-//
-// Another problem for green threads grabbing an OS mutex is that POSIX dictates
-// that unlocking a mutex on a different thread from where it was locked is
-// undefined behavior. Remember that green threads can migrate among OS threads,
-// so this would mean that we would have to pin green threads to OS threads,
-// which is less than ideal.
-//
-// ## Using deschedule/reawaken
-//
-// We already have primitives for descheduling/reawakening tasks, so they're the
-// first obvious choice when implementing a mutex. The idea would be to have a
-// concurrent queue that everyone is pushed on to, and then the owner of the
-// mutex is the one popping from the queue.
-//
-// Unfortunately, this is not very performant for native tasks. The suspected
-// reason for this is that each native thread is suspended on its own condition
-// variable, unique from all the other threads. In this situation, the kernel
-// has no idea what the scheduling semantics are of the user program, so all of
-// the threads are distributed among all cores on the system. This ends up
-// having very expensive wakeups of remote cores high up in the profile when
-// handing off the mutex among native tasks. On the other hand, when using an OS
-// mutex, the kernel knows that all native threads are contended on the same
-// mutex, so they're in theory all migrated to a single core (fast context
-// switching).
-//
-// ## Mixing implementations
-//
-// From that above information, we have two constraints. The first is that
-// green threads can't touch os mutexes, and the second is that native tasks
-// pretty much *must* touch an os mutex.
-//
-// As a compromise, the queueing implementation is used for green threads and
-// the os mutex is used for native threads (why not have both?). This ends up
-// leading to fairly decent performance for both native threads and green
-// threads on various workloads (uncontended and contended).
-//
-// The crux of this implementation is an atomic word which is CAS'd on many
-// times in order to manage a few flags about who's blocking where and whether
-// it's locked or not.
+#![allow(dead_code)]
 
 use core::prelude::*;
 
 use self::Flavor::*;
 use alloc::boxed::Box;
-use core::atomic;
-use core::mem;
-use core::cell::UnsafeCell;
-use rustrt::local::Local;
 use rustrt::mutex;
-use rustrt::task::{BlockedTask, Task};
-use rustrt::thread::Thread;
-
-use mpsc_intrusive as q;
 
 pub const LOCKED: uint = 1 << 0;
-pub const GREEN_BLOCKED: uint = 1 << 1;
-pub const NATIVE_BLOCKED: uint = 1 << 2;
+pub const BLOCKED: uint = 1 << 1;
 
 /// A mutual exclusion primitive useful for protecting shared data
 ///
-/// This mutex is an implementation of a lock for all flavors of tasks which may
-/// be grabbing. A common problem with green threads is that they cannot grab
-/// locks (if they reschedule during the lock a contender could deadlock the
-/// system), but this mutex does *not* suffer this problem.
-///
 /// This mutex will properly block tasks waiting for the lock to become
 /// available. The mutex can also be statically initialized or created via a
 /// `new` constructor.
@@ -107,14 +49,6 @@ pub struct Mutex {
     lock: Box<StaticMutex>,
 }
 
-#[deriving(PartialEq, Show)]
-enum Flavor {
-    Unlocked,
-    TryLockAcquisition,
-    GreenAcquisition,
-    NativeAcquisition,
-}
-
 /// The static mutex type is provided to allow for static allocation of mutexes.
 ///
 /// Note that this is a separate type because using a Mutex correctly means that
@@ -137,310 +71,35 @@ enum Flavor {
 /// // lock is unlocked here.
 /// ```
 pub struct StaticMutex {
-    /// Current set of flags on this mutex
-    state: atomic::AtomicUint,
-    /// an OS mutex used by native threads
     lock: mutex::StaticNativeMutex,
-
-    /// Type of locking operation currently on this mutex
-    flavor: UnsafeCell<Flavor>,
-    /// uint-cast of the green thread waiting for this mutex
-    green_blocker: UnsafeCell<uint>,
-    /// uint-cast of the native thread waiting for this mutex
-    native_blocker: UnsafeCell<uint>,
-
-    /// A concurrent mpsc queue used by green threads, along with a count used
-    /// to figure out when to dequeue and enqueue.
-    q: q::Queue<uint>,
-    green_cnt: atomic::AtomicUint,
 }
 
 /// An RAII implementation of a "scoped lock" of a mutex. When this structure is
 /// dropped (falls out of scope), the lock will be unlocked.
 #[must_use]
 pub struct Guard<'a> {
-    lock: &'a StaticMutex,
+    guard: mutex::LockGuard<'a>,
+}
+
+fn lift_guard(guard: mutex::LockGuard) -> Guard {
+    Guard { guard: guard }
 }
 
 /// Static initialization of a mutex. This constant can be used to initialize
 /// other mutex constants.
 pub const MUTEX_INIT: StaticMutex = StaticMutex {
-    lock: mutex::NATIVE_MUTEX_INIT,
-    state: atomic::INIT_ATOMIC_UINT,
-    flavor: UnsafeCell { value: Unlocked },
-    green_blocker: UnsafeCell { value: 0 },
-    native_blocker: UnsafeCell { value: 0 },
-    green_cnt: atomic::INIT_ATOMIC_UINT,
-    q: q::Queue {
-        head: atomic::INIT_ATOMIC_UINT,
-        tail: UnsafeCell { value: 0 as *mut q::Node<uint> },
-        stub: q::DummyNode {
-            next: atomic::INIT_ATOMIC_UINT,
-        }
-    }
+    lock: mutex::NATIVE_MUTEX_INIT
 };
 
 impl StaticMutex {
     /// Attempts to grab this lock, see `Mutex::try_lock`
     pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
-        // Attempt to steal the mutex from an unlocked state.
-        //
-        // FIXME: this can mess up the fairness of the mutex, seems bad
-        match self.state.compare_and_swap(0, LOCKED, atomic::SeqCst) {
-            0 => {
-                // After acquiring the mutex, we can safely access the inner
-                // fields.
-                let prev = unsafe {
-                    mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
-                };
-                assert_eq!(prev, Unlocked);
-                Some(Guard::new(self))
-            }
-            _ => None
-        }
+        unsafe { self.lock.trylock().map(lift_guard) }
     }
 
     /// Acquires this lock, see `Mutex::lock`
     pub fn lock<'a>(&'a self) -> Guard<'a> {
-        // First, attempt to steal the mutex from an unlocked state. The "fast
-        // path" needs to have as few atomic instructions as possible, and this
-        // one cmpxchg is already pretty expensive.
-        //
-        // FIXME: this can mess up the fairness of the mutex, seems bad
-        match self.try_lock() {
-            Some(guard) => return guard,
-            None => {}
-        }
-
-        // After we've failed the fast path, then we delegate to the different
-        // locking protocols for green/native tasks. This will select two tasks
-        // to continue further (one native, one green).
-        let t: Box<Task> = Local::take();
-        let can_block = t.can_block();
-        let native_bit;
-        if can_block {
-            self.native_lock(t);
-            native_bit = NATIVE_BLOCKED;
-        } else {
-            self.green_lock(t);
-            native_bit = GREEN_BLOCKED;
-        }
-
-        // After we've arbitrated among task types, attempt to re-acquire the
-        // lock (avoids a deschedule). This is very important to do in order to
-        // allow threads coming out of the native_lock function to try their
-        // best to not hit a cvar in deschedule.
-        let mut old = match self.state.compare_and_swap(0, LOCKED,
-                                                        atomic::SeqCst) {
-            0 => {
-                let flavor = if can_block {
-                    NativeAcquisition
-                } else {
-                    GreenAcquisition
-                };
-                // We've acquired the lock, so this unsafe access to flavor is
-                // allowed.
-                unsafe { *self.flavor.get() = flavor; }
-                return Guard::new(self)
-            }
-            old => old,
-        };
-
-        // Alright, everything else failed. We need to deschedule ourselves and
-        // flag ourselves as waiting. Note that this case should only happen
-        // regularly in native/green contention. Due to try_lock and the header
-        // of lock stealing the lock, it's also possible for native/native
-        // contention to hit this location, but as less common.
-        let t: Box<Task> = Local::take();
-        t.deschedule(1, |task| {
-            let task = unsafe { task.cast_to_uint() };
-
-            // These accesses are protected by the respective native/green
-            // mutexes which were acquired above.
-            let prev = if can_block {
-                unsafe { mem::replace(&mut *self.native_blocker.get(), task) }
-            } else {
-                unsafe { mem::replace(&mut *self.green_blocker.get(), task) }
-            };
-            assert_eq!(prev, 0);
-
-            loop {
-                assert_eq!(old & native_bit, 0);
-                // If the old state was locked, then we need to flag ourselves
-                // as blocking in the state. If the old state was unlocked, then
-                // we attempt to acquire the mutex. Everything here is a CAS
-                // loop that'll eventually make progress.
-                if old & LOCKED != 0 {
-                    old = match self.state.compare_and_swap(old,
-                                                            old | native_bit,
-                                                            atomic::SeqCst) {
-                        n if n == old => return Ok(()),
-                        n => n
-                    };
-                } else {
-                    assert_eq!(old, 0);
-                    old = match self.state.compare_and_swap(old,
-                                                            old | LOCKED,
-                                                            atomic::SeqCst) {
-                        n if n == old => {
-                            // After acquiring the lock, we have access to the
-                            // flavor field, and we've regained access to our
-                            // respective native/green blocker field.
-                            let prev = if can_block {
-                                unsafe {
-                                    *self.native_blocker.get() = 0;
-                                    mem::replace(&mut *self.flavor.get(),
-                                                 NativeAcquisition)
-                                }
-                            } else {
-                                unsafe {
-                                    *self.green_blocker.get() = 0;
-                                    mem::replace(&mut *self.flavor.get(),
-                                                 GreenAcquisition)
-                                }
-                            };
-                            assert_eq!(prev, Unlocked);
-                            return Err(unsafe {
-                                BlockedTask::cast_from_uint(task)
-                            })
-                        }
-                        n => n,
-                    };
-                }
-            }
-        });
-
-        Guard::new(self)
-    }
-
-    // Tasks which can block are super easy. These tasks just call the blocking
-    // `lock()` function on an OS mutex
-    fn native_lock(&self, t: Box<Task>) {
-        Local::put(t);
-        unsafe { self.lock.lock_noguard(); }
-    }
-
-    fn native_unlock(&self) {
-        unsafe { self.lock.unlock_noguard(); }
-    }
-
-    fn green_lock(&self, t: Box<Task>) {
-        // Green threads flag their presence with an atomic counter, and if they
-        // fail to be the first to the mutex, they enqueue themselves on a
-        // concurrent internal queue with a stack-allocated node.
-        //
-        // FIXME: There isn't a cancellation currently of an enqueue, forcing
-        // the unlocker to spin for a bit.
-        if self.green_cnt.fetch_add(1, atomic::SeqCst) == 0 {
-            Local::put(t);
-            return
-        }
-
-        let mut node = q::Node::new(0);
-        t.deschedule(1, |task| {
-            unsafe {
-                node.data = task.cast_to_uint();
-                self.q.push(&mut node);
-            }
-            Ok(())
-        });
-    }
-
-    fn green_unlock(&self) {
-        // If we're the only green thread, then no need to check the queue,
-        // otherwise the fixme above forces us to spin for a bit.
-        if self.green_cnt.fetch_sub(1, atomic::SeqCst) == 1 { return }
-        let node;
-        loop {
-            match unsafe { self.q.pop() } {
-                Some(t) => { node = t; break; }
-                None => Thread::yield_now(),
-            }
-        }
-        let task = unsafe { BlockedTask::cast_from_uint((*node).data) };
-        task.wake().map(|t| t.reawaken());
-    }
-
-    fn unlock(&self) {
-        // Unlocking this mutex is a little tricky. We favor any task that is
-        // manually blocked (not in each of the separate locks) in order to help
-        // provide a little fairness (green threads will wake up the pending
-        // native thread and native threads will wake up the pending green
-        // thread).
-        //
-        // There's also the question of when we unlock the actual green/native
-        // locking halves as well. If we're waking up someone, then we can wait
-        // to unlock until we've acquired the task to wake up (we're guaranteed
-        // the mutex memory is still valid when there's contenders), but as soon
-        // as we don't find any contenders we must unlock the mutex, and *then*
-        // flag the mutex as unlocked.
-        //
-        // This flagging can fail, leading to another round of figuring out if a
-        // task needs to be woken, and in this case it's ok that the "mutex
-        // halves" are unlocked, we're just mainly dealing with the atomic state
-        // of the outer mutex.
-        let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) };
-
-        let mut state = self.state.load(atomic::SeqCst);
-        let mut unlocked = false;
-        let task;
-        loop {
-            assert!(state & LOCKED != 0);
-            if state & GREEN_BLOCKED != 0 {
-                self.unset(state, GREEN_BLOCKED);
-                task = unsafe {
-                    *self.flavor.get() = GreenAcquisition;
-                    let task = mem::replace(&mut *self.green_blocker.get(), 0);
-                    BlockedTask::cast_from_uint(task)
-                };
-                break;
-            } else if state & NATIVE_BLOCKED != 0 {
-                self.unset(state, NATIVE_BLOCKED);
-                task = unsafe {
-                    *self.flavor.get() = NativeAcquisition;
-                    let task = mem::replace(&mut *self.native_blocker.get(), 0);
-                    BlockedTask::cast_from_uint(task)
-                };
-                break;
-            } else {
-                assert_eq!(state, LOCKED);
-                if !unlocked {
-                    match flavor {
-                        GreenAcquisition => { self.green_unlock(); }
-                        NativeAcquisition => { self.native_unlock(); }
-                        TryLockAcquisition => {}
-                        Unlocked => unreachable!(),
-                    }
-                    unlocked = true;
-                }
-                match self.state.compare_and_swap(LOCKED, 0, atomic::SeqCst) {
-                    LOCKED => return,
-                    n => { state = n; }
-                }
-            }
-        }
-        if !unlocked {
-            match flavor {
-                GreenAcquisition => { self.green_unlock(); }
-                NativeAcquisition => { self.native_unlock(); }
-                TryLockAcquisition => {}
-                Unlocked => unreachable!(),
-            }
-        }
-
-        task.wake().map(|t| t.reawaken());
-    }
-
-    /// Loops around a CAS to unset the `bit` in `state`
-    fn unset(&self, mut state: uint, bit: uint) {
-        loop {
-            assert!(state & bit != 0);
-            let new = state ^ bit;
-            match self.state.compare_and_swap(state, new, atomic::SeqCst) {
-                n if n == state => break,
-                n => { state = n; }
-            }
-        }
+        lift_guard(unsafe { self.lock.lock() })
     }
 
     /// Deallocates resources associated with this static mutex.
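The two one-line method bodies above (`trylock().map(lift_guard)` and `lift_guard(unsafe { self.lock.lock() })`) are the whole point of the rewrite: the public `Guard` is now just a newtype over the native lock's own RAII guard. The 2014 `libsync` API in this diff no longer compiles, but the same guard-based pattern survives in today's `std::sync::Mutex`; below is a small runnable sketch using only current `std` (const `Mutex::new` needs Rust 1.63 or later; nothing in it is taken from the patch).

```rust
use std::sync::Mutex;

// The modern descendant of this design: lock() hands back an RAII guard and
// the unlock happens when that guard is dropped.
static COUNTER: Mutex<u32> = Mutex::new(0);

fn main() {
    {
        // Blocks until the underlying native lock is acquired.
        let mut guard = COUNTER.lock().unwrap();
        *guard += 1;
        // A second acquisition attempt fails cleanly instead of deadlocking.
        assert!(COUNTER.try_lock().is_err());
    } // guard dropped here: the mutex is released, like the patch's Guard.

    assert_eq!(*COUNTER.lock().unwrap(), 1);
}
```

As in the patch, dropping the guard is what releases the lock; there is no explicit `unlock` in the public API.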
@@ -463,12 +122,6 @@ impl Mutex {
     pub fn new() -> Mutex {
         Mutex {
             lock: box StaticMutex {
-                state: atomic::AtomicUint::new(0),
-                flavor: UnsafeCell::new(Unlocked),
-                green_blocker: UnsafeCell::new(0),
-                native_blocker: UnsafeCell::new(0),
-                green_cnt: atomic::AtomicUint::new(0),
-                q: q::Queue::new(),
                 lock: unsafe { mutex::StaticNativeMutex::new() },
             }
         }
@@ -494,25 +147,6 @@ impl Mutex {
     pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
 }
 
-impl<'a> Guard<'a> {
-    fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> {
-        if cfg!(debug) {
-            // once we've acquired a lock, it's ok to access the flavor
-            assert!(unsafe { *lock.flavor.get() != Unlocked });
-            assert!(lock.state.load(atomic::SeqCst) & LOCKED != 0);
-        }
-        Guard { lock: lock }
-    }
-}
-
-#[unsafe_destructor]
-impl<'a> Drop for Guard<'a> {
-    #[inline]
-    fn drop(&mut self) {
-        self.lock.unlock();
-    }
-}
-
 impl Drop for Mutex {
     fn drop(&mut self) {
         // This is actually safe b/c we know that there is no further usage of