76e5ed655c
All of the current std::sync primitives have poisoning enabled, which means that when a task fails while holding a write-access lock, all future attempts to acquire the lock will fail. This strategy ensures that stale data whose invariants are possibly not upheld is never viewed by other tasks, which helps propagate unexpected panics (bugs in a program) among tasks. Currently there is no way to test whether a mutex or rwlock is poisoned. One method would be to duplicate all the methods with a sister foo_catch function, for example. This pattern is, however, against our [error guidelines][errors]. As a result, this commit exposes the fact that a task has failed internally through the return value of a `Result`. [errors]: https://github.com/rust-lang/rfcs/blob/master/text/0236-error-conventions.md#do-not-provide-both-result-and-fail-variants All methods now return a `LockResult<T>` or a `TryLockResult<T>` which communicates whether the lock was poisoned or not. In a `LockResult`, both the `Ok` and `Err` variants contain the `MutexGuard<T>` that is being returned in order to allow access to the data if poisoning is not desired. This also means that the lock is *always* held upon returning from `.lock()`. A new type, `PoisonError`, was added with one method `into_guard` which can consume the assertion that a lock is poisoned to gain access to the underlying data. This is a breaking change because the signatures of these methods have changed, often in incompatible ways. One major difference is that the `wait` methods on a condition variable now consume the guard and return it as a `LockResult` to indicate whether the lock was poisoned while waiting. Most code can be updated by calling `.unwrap()` on the return value of `.lock()`. [breaking-change]
125 lines
3.5 KiB
Rust
125 lines
3.5 KiB
Rust
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
|
|
// file at the top-level directory of this distribution and at
|
|
// http://rust-lang.org/COPYRIGHT.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
// option. This file may not be copied, modified, or distributed
|
|
// except according to those terms.
|
|
|
|
use kinds::{Send, Sync};
|
|
use sync::{Mutex, Condvar};
|
|
|
|
/// A barrier enables multiple tasks to synchronize the beginning
|
|
/// of some computation.
|
|
///
|
|
/// ```rust
|
|
/// use std::sync::{Arc, Barrier};
|
|
/// use std::thread::Thread;
|
|
///
|
|
/// let barrier = Arc::new(Barrier::new(10));
|
|
/// for _ in range(0u, 10) {
|
|
/// let c = barrier.clone();
|
|
/// // The same messages will be printed together.
|
|
/// // You will NOT see any interleaving.
|
|
/// Thread::spawn(move|| {
|
|
/// println!("before wait");
|
|
/// c.wait();
|
|
/// println!("after wait");
|
|
/// }).detach();
|
|
/// }
|
|
/// ```
|
|
pub struct Barrier {
|
|
lock: Mutex<BarrierState>,
|
|
cvar: Condvar,
|
|
num_threads: uint,
|
|
}
|
|
|
|
// Marker impl: declares that a `Barrier` may be handed over to another thread.
// NOTE(review): presumably sound because the mutable state sits behind `lock`
// and `num_threads` is only read after construction -- confirm.
unsafe impl Send for Barrier {}
|
|
// Marker impl: declares that a `&Barrier` may be used from several threads at
// once, which is what lets `wait(&self)` be called through a shared handle.
// NOTE(review): presumably sound because every access to the shared counters
// goes through `lock` -- confirm.
unsafe impl Sync for Barrier {}
|
|
|
|
// The inner state of a double barrier
|
|
struct BarrierState {
|
|
count: uint,
|
|
generation_id: uint,
|
|
}
|
|
|
|
// Marker impl: declares that a `BarrierState` may be moved between threads.
// The struct holds only two `uint` counters.
unsafe impl Send for BarrierState {}
|
|
// Marker impl: declares that a `&BarrierState` may be shared between threads.
// The struct holds only two `uint` counters.
unsafe impl Sync for BarrierState {}
|
|
|
|
impl Barrier {
|
|
/// Create a new barrier that can block a given number of threads.
|
|
///
|
|
/// A barrier will block `n`-1 threads which call `wait` and then wake up
|
|
/// all threads at once when the `n`th thread calls `wait`.
|
|
pub fn new(n: uint) -> Barrier {
|
|
Barrier {
|
|
lock: Mutex::new(BarrierState {
|
|
count: 0,
|
|
generation_id: 0,
|
|
}),
|
|
cvar: Condvar::new(),
|
|
num_threads: n,
|
|
}
|
|
}
|
|
|
|
/// Block the current thread until all threads has rendezvoused here.
|
|
///
|
|
/// Barriers are re-usable after all threads have rendezvoused once, and can
|
|
/// be used continuously.
|
|
pub fn wait(&self) {
|
|
let mut lock = self.lock.lock().unwrap();
|
|
let local_gen = lock.generation_id;
|
|
lock.count += 1;
|
|
if lock.count < self.num_threads {
|
|
// We need a while loop to guard against spurious wakeups.
|
|
// http://en.wikipedia.org/wiki/Spurious_wakeup
|
|
while local_gen == lock.generation_id &&
|
|
lock.count < self.num_threads {
|
|
lock = self.cvar.wait(lock).unwrap();
|
|
}
|
|
} else {
|
|
lock.count = 0;
|
|
lock.generation_id += 1;
|
|
self.cvar.notify_all();
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use prelude::*;

    use sync::{Arc, Barrier};
    use comm::Empty;

    // Nine spawned tasks wait on a barrier of size ten; none of them may
    // report back until this task makes the tenth `wait` call.
    #[test]
    fn test_barrier() {
        let (tx, rx) = channel();
        let barrier = Arc::new(Barrier::new(10));

        for _ in range(0u, 9) {
            let done = tx.clone();
            let shared = barrier.clone();
            spawn(move|| {
                shared.wait();
                done.send(true);
            });
        }

        // Only nine of the ten required arrivals have happened, so every
        // spawned task is still blocked and the channel must be empty.
        assert!(match rx.try_recv() {
            Err(Empty) => true,
            _ => false,
        });

        // The tenth arrival releases the barrier ...
        barrier.wait();

        // ... after which each of the nine tasks sends exactly one message.
        let mut outstanding = 9u;
        while outstanding > 0 {
            rx.recv();
            outstanding -= 1;
        }
    }
}
|