Remove rt::{mutex, exclusive}

parent 7fd7ce682d
commit d8e4780b0b
@@ -1627,9 +1627,7 @@ mod tests {
     #[test]
     fn test_swap_remove_noncopyable() {
         // Tests that we don't accidentally run destructors twice.
-        let mut v = vec![rt::exclusive::Exclusive::new(()),
-                         rt::exclusive::Exclusive::new(()),
-                         rt::exclusive::Exclusive::new(())];
+        let mut v = vec![Box::new(()), Box::new(()), Box::new(())];
         let mut _e = v.swap_remove(0);
         assert_eq!(v.len(), 2);
         _e = v.swap_remove(1);
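For readers unfamiliar with the test's intent: swap_remove must move, never duplicate, a non-Copy element, or its destructor would run twice. A rough present-day sketch of the same property, using a hypothetical drop-counting type rather than anything from this commit:

use std::sync::atomic::{AtomicUsize, Ordering};

static DROPS: AtomicUsize = AtomicUsize::new(0);

struct Tracked; // non-Copy type whose destructor we count

impl Drop for Tracked {
    fn drop(&mut self) {
        DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let mut v = vec![Tracked, Tracked, Tracked];
    let e = v.swap_remove(0); // moves the last element into index 0
    assert_eq!(v.len(), 2);
    drop(e);
    drop(v);
    // Each of the three values is dropped exactly once.
    assert_eq!(DROPS.load(Ordering::SeqCst), 3);
}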
@@ -52,9 +52,7 @@ pub struct Packet<T> {
     /// the other shared channel already had the code implemented
     channels: atomic::AtomicUint,

-    /// The state field is protected by this mutex
-    lock: NativeMutex,
-    state: UnsafeCell<State<T>>,
+    lock: Mutex<State<T>>,
 }

 struct State<T> {
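The structural change in this hunk is folding the protected state into the mutex itself, instead of keeping a raw mutex next to an UnsafeCell. A minimal sketch of that shape in today's std, with simplified stand-in fields rather than the real Packet:

use std::sync::{Mutex, MutexGuard};

// Illustrative stand-ins for the channel's real state.
struct State {
    disconnected: bool,
    cap: usize,
}

// The mutex owns the state it protects, so every access goes through a
// guard and unsynchronized access to the fields is impossible.
struct Packet {
    lock: Mutex<State>,
}

impl Packet {
    fn new(cap: usize) -> Packet {
        Packet { lock: Mutex::new(State { disconnected: false, cap }) }
    }

    fn lock(&self) -> MutexGuard<'_, State> {
        self.lock.lock().unwrap()
    }
}

fn main() {
    let p = Packet::new(0);
    let guard = p.lock();
    assert!(!guard.disconnected);
    assert_eq!(guard.cap, 0);
}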
@@ -107,9 +105,25 @@ pub enum Failure {

+/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
+/// in the meantime. This re-locks the mutex upon returning.
+fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
+                   guard: MutexGuard<'b, State<T>>,
+                   f: fn(BlockedTask) -> Blocker)
+                   -> MutexGuard<'a, State<T>>
+{
+    let me: Box<Task> = Local::take();
+    me.deschedule(1, |task| {
+        match mem::replace(&mut guard.blocker, f(task)) {
+            NoneBlocked => {}
+            _ => unreachable!(),
+        }
+        mem::drop(guard);
+        Ok(())
+    });
+    lock.lock()
+}

-/// Wakes up a thread, dropping the lock at the correct time
-fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
+/// Wakes up a task, dropping the lock at the correct time
+fn wakeup<T>(task: BlockedTask, guard: MutexGuard<State<T>>) {
     // We need to be careful to wake up the waiting task *outside* of the mutex
     // in case it incurs a context switch.
     drop(guard);
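The new wait helper gives up the guard while blocked and hands a fresh guard back to the caller, while wakeup drops the guard before signalling. A rough modern analogue of that shape using Mutex plus Condvar — illustrative only, not the runtime primitives this commit actually uses:

use std::sync::{Condvar, Mutex, MutexGuard};

struct State {
    ready: bool, // stand-in for "a blocker was signaled"
}

// Give up the guard while blocked, return a re-acquired guard on wakeup.
// The lock parameter is kept only to mirror the signature in the diff;
// Condvar::wait_while unlocks, blocks, and re-locks for us.
fn wait<'a>(_lock: &'a Mutex<State>,
            cvar: &Condvar,
            guard: MutexGuard<'a, State>) -> MutexGuard<'a, State> {
    cvar.wait_while(guard, |s| !s.ready).unwrap()
}

fn wakeup(cvar: &Condvar, mut guard: MutexGuard<'_, State>) {
    guard.ready = true;
    // Release the lock before signalling so the woken thread does not
    // immediately block on the mutex we would otherwise still hold.
    drop(guard);
    cvar.notify_one();
}

fn main() {
    use std::sync::Arc;
    use std::thread;

    let shared = Arc::new((Mutex::new(State { ready: false }), Condvar::new()));
    let s2 = Arc::clone(&shared);

    let t = thread::spawn(move || {
        let (lock, cvar) = &*s2;
        let guard = lock.lock().unwrap();
        let guard = wait(lock, cvar, guard);
        assert!(guard.ready);
    });

    let (lock, cvar) = &*shared;
    wakeup(cvar, lock.lock().unwrap());
    t.join().unwrap();
}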
@@ -120,8 +134,7 @@ impl<T: Send> Packet<T> {
     pub fn new(cap: uint) -> Packet<T> {
         Packet {
             channels: atomic::AtomicUint::new(1),
-            lock: unsafe { NativeMutex::new() },
-            state: UnsafeCell::new(State {
+            lock: Mutex::new(State {
                 disconnected: false,
                 blocker: NoneBlocked,
                 cap: cap,
@@ -161,17 +174,17 @@ impl<T: Send> Packet<T> {
         if guard.disconnected { return Err(t) }
         guard.buf.enqueue(t);

-        match mem::replace(&mut state.blocker, NoneBlocked) {
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
             // if our capacity is 0, then we need to wait for a receiver to be
             // available to take our data. After waiting, we check again to make
             // sure the port didn't go away in the meantime. If it did, we need
             // to hand back our data.
-            NoneBlocked if state.cap == 0 => {
+            NoneBlocked if guard.cap == 0 => {
                 let mut canceled = false;
-                assert!(state.canceled.is_none());
-                state.canceled = Some(unsafe { mem::transmute(&mut canceled) });
-                wait(&mut state.blocker, BlockedSender, &self.lock);
-                if canceled {Err(state.buf.dequeue())} else {Ok(())}
+                assert!(guard.canceled.is_none());
+                guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
+                let guard = wait(&self.lock, guard, BlockedSender);
+                if canceled {Err(guard.buf.dequeue())} else {Ok(())}
             }

             // success, we buffered some data
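Note the rebinding `let guard = wait(...)`: a helper that must temporarily give up the lock takes the guard by value and returns a re-acquired one, so the caller never touches the state while unlocked. A small sketch of that ownership pattern with a hypothetical helper and simplified state:

use std::sync::{Mutex, MutexGuard};

struct State { pending: usize }

// Takes the guard by value, releases the lock, re-acquires it, and hands
// the fresh guard back to the caller.
fn bump<'a>(lock: &'a Mutex<State>, guard: MutexGuard<'a, State>) -> MutexGuard<'a, State> {
    drop(guard);              // unlock: another thread could run here
    let mut g = lock.lock().unwrap();
    g.pending += 1;
    g                         // hand the re-acquired guard back
}

fn main() {
    let lock = Mutex::new(State { pending: 0 });
    let guard = lock.lock().unwrap();
    let guard = bump(&lock, guard); // rebinding, as in `let guard = wait(...)`
    assert_eq!(guard.pending, 1);
}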
@@ -185,15 +198,15 @@ impl<T: Send> Packet<T> {
     }

     pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
-        let (guard, state) = self.lock();
-        if state.disconnected {
+        let guard = self.lock.lock();
+        if guard.disconnected {
             Err(super::RecvDisconnected(t))
-        } else if state.buf.size() == state.buf.cap() {
+        } else if guard.buf.size() == guard.buf.cap() {
             Err(super::Full(t))
-        } else if state.cap == 0 {
+        } else if guard.cap == 0 {
             // With capacity 0, even though we have buffer space we can't
             // transfer the data unless there's a receiver waiting.
-            match mem::replace(&mut state.blocker, NoneBlocked) {
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
                 NoneBlocked => Err(super::Full(t)),
                 BlockedSender(..) => unreachable!(),
                 BlockedReceiver(token) => {
@@ -227,28 +240,28 @@ impl<T: Send> Packet<T> {
         // Wait for the buffer to have something in it. No need for a while loop
         // because we're the only receiver.
         let mut waited = false;
-        if !state.disconnected && state.buf.size() == 0 {
-            wait(&mut state.blocker, BlockedReceiver, &self.lock);
+        if !guard.disconnected && guard.buf.size() == 0 {
+            wait(&mut guard.blocker, BlockedReceiver, &self.lock);
             waited = true;
         }
-        if state.disconnected && state.buf.size() == 0 { return Err(()) }
+        if guard.disconnected && guard.buf.size() == 0 { return Err(()) }

         // Pick up the data, wake up our neighbors, and carry on
-        assert!(state.buf.size() > 0);
-        let ret = state.buf.dequeue();
+        assert!(guard.buf.size() > 0);
+        let ret = guard.buf.dequeue();
         self.wakeup_senders(waited, guard, state);
         return Ok(ret);
     }

     pub fn try_recv(&self) -> Result<T, Failure> {
-        let (guard, state) = self.lock();
+        let guard = self.lock();

         // Easy cases first
-        if state.disconnected { return Err(Disconnected) }
-        if state.buf.size() == 0 { return Err(Empty) }
+        if guard.disconnected { return Err(Disconnected) }
+        if guard.buf.size() == 0 { return Err(Empty) }

         // Be sure to wake up neighbors
-        let ret = Ok(state.buf.dequeue());
+        let ret = Ok(guard.buf.dequeue());
         self.wakeup_senders(false, guard, state);

         return ret;
@@ -265,8 +278,8 @@ impl<T: Send> Packet<T> {
         // If this is a no-buffer channel (cap == 0), then if we didn't wait we
         // need to ACK the sender. If we waited, then the sender waking us up
         // was already the ACK.
-        let pending_sender2 = if state.cap == 0 && !waited {
-            match mem::replace(&mut state.blocker, NoneBlocked) {
+        let pending_sender2 = if guard.cap == 0 && !waited {
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
                 NoneBlocked => None,
                 BlockedReceiver(..) => unreachable!(),
                 BlockedSender(token) => {
@@ -277,7 +290,7 @@ impl<T: Send> Packet<T> {
         } else {
             None
         };
-        mem::drop((state, guard));
+        mem::drop(guard);

         // only outside of the lock do we wake up the pending tasks
         pending_sender1.map(|t| t.signal());
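The surrounding code keeps the rule of deciding whom to wake while holding the lock, but signalling only after the guard is dropped. A compact sketch of that discipline, using thread park/unpark as a stand-in for the channel's wakeup tokens:

use std::mem;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{self, Thread};

struct State {
    blocked: Vec<Thread>, // threads parked while waiting on this state
}

fn wakeup_all(mut guard: MutexGuard<'_, State>) {
    // Decide whom to wake while still holding the lock...
    let to_wake = mem::take(&mut guard.blocked);
    // ...but signal only after the lock is dropped, so a woken thread
    // does not immediately block on the mutex we are still holding.
    drop(guard);
    for t in to_wake {
        t.unpark();
    }
}

fn main() {
    let state = Arc::new(Mutex::new(State { blocked: Vec::new() }));
    let s2 = Arc::clone(&state);

    let waiter = thread::spawn(move || {
        s2.lock().unwrap().blocked.push(thread::current());
        // A pending unpark is remembered, so parking after the wakeup is fine.
        thread::park();
    });

    // Spin until the waiter has registered itself, then wake everyone.
    loop {
        let guard = state.lock().unwrap();
        if !guard.blocked.is_empty() {
            wakeup_all(guard);
            break;
        }
    }
    waiter.join().unwrap();
}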
@@ -298,10 +311,10 @@ impl<T: Send> Packet<T> {
         }

         // Not much to do other than wake up a receiver if one's there
-        let (guard, state) = self.lock();
-        if state.disconnected { return }
-        state.disconnected = true;
-        match mem::replace(&mut state.blocker, NoneBlocked) {
+        let guard = self.lock();
+        if guard.disconnected { return }
+        guard.disconnected = true;
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
             NoneBlocked => {}
             BlockedSender(..) => unreachable!(),
             BlockedReceiver(token) => wakeup(token, guard),
@@ -309,27 +322,27 @@ impl<T: Send> Packet<T> {
     }

     pub fn drop_port(&self) {
-        let (guard, state) = self.lock();
+        let guard = self.lock();

-        if state.disconnected { return }
-        state.disconnected = true;
+        if guard.disconnected { return }
+        guard.disconnected = true;

         // If the capacity is 0, then the sender may want its data back after
         // we're disconnected. Otherwise it's now our responsibility to destroy
         // the buffered data. As with many other portions of this code, this
         // needs to be careful to destroy the data *outside* of the lock to
         // prevent deadlock.
-        let _data = if state.cap != 0 {
-            mem::replace(&mut state.buf.buf, Vec::new())
+        let _data = if guard.cap != 0 {
+            mem::replace(&mut guard.buf.buf, Vec::new())
         } else {
             Vec::new()
         };
-        let mut queue = mem::replace(&mut state.queue, Queue {
+        let mut queue = mem::replace(&mut guard.queue, Queue {
             head: 0 as *mut Node,
             tail: 0 as *mut Node,
         });

-        let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
+        let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
             NoneBlocked => None,
             BlockedSender(token) => {
                 *guard.canceled.take().unwrap() = true;
@@ -337,7 +350,7 @@ impl<T: Send> Packet<T> {
             }
             BlockedReceiver(..) => unreachable!(),
         };
-        mem::drop((state, guard));
+        mem::drop(guard);

         loop {
             match queue.dequeue() {
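drop_port moves the buffered data out while locked and lets its destructors run only after the guard is gone. A minimal sketch of that "destroy outside the lock" pattern with simplified types:

use std::mem;
use std::sync::Mutex;

struct State {
    buf: Vec<String>, // buffered values whose destructors may do arbitrary work
}

fn drop_port(lock: &Mutex<State>) {
    let mut guard = lock.lock().unwrap();
    // Take the buffered data out while holding the lock...
    let data = mem::replace(&mut guard.buf, Vec::new());
    // ...but run its destructors only after the lock is released, in case
    // a destructor tries to take this (or another) lock and deadlocks.
    drop(guard);
    drop(data);
}

fn main() {
    let lock = Mutex::new(State { buf: vec!["a".to_string(), "b".to_string()] });
    drop_port(&lock);
    assert!(lock.lock().unwrap().buf.is_empty());
}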
@@ -355,8 +368,8 @@ impl<T: Send> Packet<T> {
     // If Ok, the value is whether this port has data, if Err, then the upgraded
     // port needs to be checked instead of this one.
     pub fn can_recv(&self) -> bool {
-        let (_g, state) = self.lock();
-        state.disconnected || state.buf.size() > 0
+        let guard = self.lock();
+        guard.disconnected || guard.buf.size() > 0
     }

     // Attempts to start selection on this port. This can either succeed or fail
@@ -380,8 +393,8 @@ impl<T: Send> Packet<T> {
    //
    // The return value indicates whether there's data on this port.
    pub fn abort_selection(&self) -> bool {
-        let (_g, state) = self.lock();
-        match mem::replace(&mut state.blocker, NoneBlocked) {
+        let guard = self.lock();
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
            NoneBlocked => true,
            BlockedSender(token) => {
                guard.blocker = BlockedSender(token);
@@ -396,9 +409,9 @@ impl<T: Send> Packet<T> {
 impl<T: Send> Drop for Packet<T> {
     fn drop(&mut self) {
         assert_eq!(self.channels.load(atomic::SeqCst), 0);
-        let (_g, state) = self.lock();
-        assert!(state.queue.dequeue().is_none());
-        assert!(state.canceled.is_none());
+        let guard = self.lock();
+        assert!(guard.queue.dequeue().is_none());
+        assert!(guard.canceled.is_none());
     }
 }
@@ -51,10 +51,10 @@ mod imp {
     use string::String;
     use mem;

-    use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+    use sync::mutex::{StaticMutex, MUTEX_INIT};

     static mut GLOBAL_ARGS_PTR: uint = 0;
-    static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
+    static LOCK: StaticMutex = MUTEX_INIT;

     pub unsafe fn init(argc: int, argv: *const *const u8) {
         let args = load_argc_and_argv(argc, argv);
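A rough modern equivalent of the static lock guarding the global args pointer: std's Mutex is const-constructible today, so no separate MUTEX_INIT-style constant (or unsafe global pointer) is needed. Names below are illustrative, not from the commit:

use std::sync::Mutex;

// The lock and the data it protects live in one static.
static GLOBAL_ARGS: Mutex<Option<Vec<String>>> = Mutex::new(None);

fn init(args: Vec<String>) {
    *GLOBAL_ARGS.lock().unwrap() = Some(args);
}

fn clone_args() -> Option<Vec<String>> {
    GLOBAL_ARGS.lock().unwrap().clone()
}

fn main() {
    init(vec!["prog".to_string(), "--flag".to_string()]);
    assert_eq!(clone_args().unwrap().len(), 2);
}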
@@ -17,20 +17,18 @@ use core::prelude::*;
 use libc;
 use boxed::Box;
 use vec::Vec;
-use sync::{atomic, Once, ONCE_INIT};
+use sync::{Mutex, atomic, Once, ONCE_INIT};
 use mem;
 use thunk::Thunk;

-use rt::exclusive::Exclusive;
-
-type Queue = Exclusive<Vec<Thunk>>;
+type Queue = Mutex<Vec<Thunk>>;

 static INIT: Once = ONCE_INIT;
 static QUEUE: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT;
 static RUNNING: atomic::AtomicBool = atomic::INIT_ATOMIC_BOOL;

 fn init() {
-    let state: Box<Queue> = box Exclusive::new(Vec::new());
+    let state: Box<Queue> = box Mutex::new(Vec::new());
     unsafe {
         QUEUE.store(mem::transmute(state), atomic::SeqCst);
         libc::atexit(run);
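The at-exit queue is a lazily created global protected by a Mutex. A sketch of the same idea with today's OnceLock standing in for the hand-rolled AtomicUint-pointer-plus-Once initialization (the function names here are illustrative):

use std::sync::{Mutex, OnceLock};

type Thunk = Box<dyn FnOnce() + Send>;
type Queue = Mutex<Vec<Thunk>>;

// The queue is created on first use.
static QUEUE: OnceLock<Queue> = OnceLock::new();

fn at_exit<F: FnOnce() + Send + 'static>(f: F) {
    QUEUE.get_or_init(|| Mutex::new(Vec::new()))
         .lock()
         .unwrap()
         .push(Box::new(f));
}

fn run() {
    // Drain the queue under the lock, then run the callbacks without holding it.
    let thunks = match QUEUE.get() {
        Some(q) => std::mem::take(&mut *q.lock().unwrap()),
        None => return,
    };
    for thunk in thunks {
        thunk();
    }
}

fn main() {
    at_exit(|| println!("first callback"));
    at_exit(|| println!("second callback"));
    run(); // in the real code this is wired up via libc::atexit
}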
@@ -71,9 +71,6 @@ pub mod backtrace;
 mod macros;

 // These should be refactored/moved/made private over time
-pub mod mutex;
 pub mod thread;
-pub mod exclusive;
 pub mod util;
@@ -58,10 +58,9 @@

 use prelude::*;

-use rt::exclusive::Exclusive;
 use rt;
 use sync::atomic::{mod, AtomicUint};
-use sync::{Once, ONCE_INIT};
+use sync::{Mutex, Once, ONCE_INIT};

 use sys::thread_local as imp;

@@ -143,7 +142,7 @@ pub const INIT_INNER: StaticKeyInner = StaticKeyInner {
 };

 static INIT_KEYS: Once = ONCE_INIT;
-static mut KEYS: *mut Exclusive<Vec<imp::Key>> = 0 as *mut _;
+static mut KEYS: *mut Mutex<Vec<imp::Key>> = 0 as *mut _;

 impl StaticKey {
     /// Gets the value associated with this TLS key
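The TLS key registry becomes a Mutex<Vec<imp::Key>> behind a raw pointer. A simplified sketch of such a registry using OnceLock and a stand-in key type (imp::Key is platform-specific; usize is used here purely for illustration):

use std::sync::{Mutex, OnceLock};

type Key = usize; // stand-in for the platform's imp::Key

// Keys handed out by the platform are recorded here so they can be
// destroyed in bulk later; a plain Mutex<Vec<_>> replaces the old
// *mut Exclusive<Vec<imp::Key>> raw pointer.
static KEYS: OnceLock<Mutex<Vec<Key>>> = OnceLock::new();

fn register_key(key: Key) {
    KEYS.get_or_init(|| Mutex::new(Vec::new()))
        .lock()
        .unwrap()
        .push(key);
}

fn destroy_all() -> usize {
    match KEYS.get() {
        Some(keys) => keys.lock().unwrap().drain(..).count(),
        None => 0,
    }
}

fn main() {
    register_key(1);
    register_key(2);
    assert_eq!(destroy_all(), 2);
}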
@@ -89,7 +89,7 @@ use libc;
 use mem;
 use option::{Some, None, Option};
 use result::{Ok, Err};
-use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+use sync::{StaticMutex, MUTEX_INIT};

 use sys_common::backtrace::*;

@@ -150,7 +150,7 @@ pub fn write(w: &mut Writer) -> IoResult<()> {
     // is semi-reasonable in terms of printing anyway, and we know that all
     // I/O done here is blocking I/O, not green I/O, so we don't have to
     // worry about this being a native vs green mutex.
-    static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
+    static LOCK: StaticMutex = MUTEX_INIT;
     let _g = unsafe { LOCK.lock() };

     try!(writeln!(w, "stack backtrace:"));
@@ -30,7 +30,7 @@ use ops::Drop;
 use option::{Some, None};
 use path::Path;
 use result::{Ok, Err};
-use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+use sync::{StaticMutex, MUTEX_INIT};
 use slice::SliceExt;
 use str::StrPrelude;
 use dynamic_lib::DynamicLibrary;

@@ -293,7 +293,7 @@ impl Drop for Cleanup {
 pub fn write(w: &mut Writer) -> IoResult<()> {
     // According to windows documentation, all dbghelp functions are
     // single-threaded.
-    static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
+    static LOCK: StaticMutex = MUTEX_INIT;
     let _g = unsafe { LOCK.lock() };

     // Open up dbghelp.dll, we don't link to it explicitly because it can't
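Both backtrace writers serialize access to an effectively single-threaded facility (dbghelp on Windows, the shared printer state on Unix) with one process-wide static lock. A small sketch of that pattern with a const-initialized std Mutex; the helper name is made up for illustration:

use std::sync::{Mutex, MutexGuard};

// Every caller takes this process-wide lock before touching the
// non-thread-safe facility. A const-initialized std Mutex replaces the
// StaticNativeMutex / NATIVE_MUTEX_INIT pair.
static LOCK: Mutex<()> = Mutex::new(());

fn with_backtrace_lock<R>(f: impl FnOnce() -> R) -> R {
    let _g: MutexGuard<()> = LOCK.lock().unwrap();
    f() // the non-thread-safe work happens while _g is alive
}

fn main() {
    let answer = with_backtrace_lock(|| {
        // stand-in for the dbghelp / backtrace calls
        2 + 2
    });
    assert_eq!(answer, 4);
}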