Fix compare_and_swap to not break tests.

This commit is contained in:
Luqman Aden 2012-10-22 21:47:14 -04:00
parent 48582b360c
commit ed447a7447

View File

@ -5,7 +5,6 @@
#[doc(hidden)];
use compare_and_swap = rustrt::rust_compare_and_swap_ptr;
use task::TaskBuilder;
use task::atomically;
@ -39,12 +38,27 @@ type rust_port_id = uint;
type GlobalPtr = *libc::uintptr_t;
// TODO: Remove once snapshots have atomic_cxchg
#[cfg(stage0)]
fn compare_and_swap(address: &mut libc::uintptr_t,
oldval: libc::uintptr_t,
newval: libc::uintptr_t) -> bool {
rustrt::rust_compare_and_swap_ptr(address, oldval, newval)
}
#[cfg(stage1)]
#[cfg(stage2)]
#[cfg(stage3)]
fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
let old = rusti::atomic_cxchg(address, oldval, newval);
old == oldval
}
/**
* Atomically gets a channel from a pointer to a pointer-sized memory location
* or, if no channel exists creates and installs a new channel and sets up a
* new task to receive from it.
*/
#[cfg(stage0)]
pub unsafe fn chan_from_global_ptr<T: Send>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
@ -86,70 +100,8 @@ pub unsafe fn chan_from_global_ptr<T: Send>(
// Install the channel
log(debug,~"BEFORE COMPARE AND SWAP");
let swapped = compare_and_swap(
cast::reinterpret_cast(&global),
0u, cast::reinterpret_cast(&ch));
log(debug,fmt!("AFTER .. swapped? %?", swapped));
if swapped {
// Success!
comm::send(setup_ch, Proceed);
ch
} else {
// Somebody else got in before we did
comm::send(setup_ch, Abort);
cast::reinterpret_cast(&*global)
}
} else {
log(debug, ~"global != 0");
cast::reinterpret_cast(&*global)
}
}
#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
pub unsafe fn chan_from_global_ptr<T: Send>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
f: fn~(comm::Port<T>)
) -> comm::Chan<T> {
enum Msg {
Proceed,
Abort
}
log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
let is_probably_zero = *global == 0u;
log(debug,~"after is_prob_zero check");
if is_probably_zero {
log(debug,~"is probably zero...");
// There's no global channel. We must make it
let (setup_po, setup_ch) = do task_fn().spawn_conversation
|move f, setup_po, setup_ch| {
let po = comm::Port::<T>();
let ch = comm::Chan(&po);
comm::send(setup_ch, ch);
// Wait to hear if we are the official instance of
// this global task
match comm::recv::<Msg>(setup_po) {
Proceed => f(move po),
Abort => ()
}
};
log(debug,~"before setup recv..");
// This is the proposed global channel
let ch = comm::recv(setup_po);
// 0 is our sentinal value. It is not a valid channel
assert *ch != 0;
// Install the channel
log(debug,~"BEFORE COMPARE AND SWAP");
rusti::atomic_cxchg(
cast::reinterpret_cast(&global),
0, cast::reinterpret_cast(&ch));
let swapped = *global != 0;
log(debug,fmt!("AFTER .. swapped? %?", swapped));
if swapped {
@ -405,7 +357,6 @@ fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
}
}
#[cfg(stage0)]
pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
-> T {
struct DeathThroes<T> {
@ -471,74 +422,6 @@ pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
}
}
#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
-> T {
struct DeathThroes<T> {
mut ptr: Option<~ArcData<T>>,
mut response: Option<pipes::ChanOne<bool>>,
drop unsafe {
let response = option::swap_unwrap(&mut self.response);
// In case we get killed early, we need to tell the person who
// tried to wake us whether they should hand-off the data to us.
if task::failing() {
pipes::send_one(move response, false);
// Either this swap_unwrap or the one below (at "Got here")
// ought to run.
cast::forget(option::swap_unwrap(&mut self.ptr));
} else {
assert self.ptr.is_none();
pipes::send_one(move response, true);
}
}
}
do task::unkillable {
let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
let (c1,p1) = pipes::oneshot(); // ()
let (c2,p2) = pipes::oneshot(); // bool
let server: UnwrapProto = ~mut Some((move c1,move p2));
let serverp: libc::uintptr_t = cast::transmute(move server);
// Try to put our server end in the unwrapper slot.
rusti::atomic_cxchg(cast::reinterpret_cast(&ptr.unwrapper),
0, serverp as int);
if ptr.unwrapper != 0 {
// Got in. Step 0: Tell destructor not to run. We are now it.
rc.data = ptr::null();
// Step 1 - drop our own reference.
let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
//assert new_count >= 0;
if new_count == 0 {
// We were the last owner. Can unwrap immediately.
// Also we have to free the server endpoints.
let _server: UnwrapProto = cast::transmute(move serverp);
option::swap_unwrap(&mut ptr.data)
// drop glue takes over.
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let end_result =
DeathThroes { ptr: Some(move ptr),
response: Some(move c2) };
let mut p1 = Some(move p1); // argh
do task::rekillable {
pipes::recv_one(option::swap_unwrap(&mut p1));
}
// Got here. Back in the 'unkillable' without getting killed.
// Recover ownership of ptr, then take the data out.
let ptr = option::swap_unwrap(&mut end_result.ptr);
option::swap_unwrap(&mut ptr.data)
// drop glue takes over.
}
} else {
// Somebody else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(move ptr);
// Also we have to free the (rejected) server endpoints.
let _server: UnwrapProto = cast::transmute(move serverp);
fail ~"Another task is already unwrapping this ARC!";
}
}
}
/**
* COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
*