Replace rust_atomic_increment/decrement and rust_compare_and_swap_ptr with intrinsics.

This commit is contained in:
Luqman Aden 2012-10-21 22:24:56 -04:00
parent e1db959ec2
commit 48582b360c
3 changed files with 146 additions and 25 deletions

View File

@ -14,14 +14,6 @@ extern mod rustrt {
fn rust_task_weaken(ch: rust_port_id);
fn rust_task_unweaken(ch: rust_port_id);
#[rust_stack]
fn rust_atomic_increment(p: &mut libc::intptr_t)
-> libc::intptr_t;
#[rust_stack]
fn rust_atomic_decrement(p: &mut libc::intptr_t)
-> libc::intptr_t;
#[rust_stack]
fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
oldval: libc::uintptr_t,
@ -33,6 +25,15 @@ extern mod rustrt {
fn rust_unlock_little_lock(lock: rust_little_lock);
}
#[abi = "rust-intrinsic"]
extern mod rusti {
#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
fn atomic_xadd(dst: &mut int, src: int) -> int;
fn atomic_xsub(dst: &mut int, src: int) -> int;
}
#[allow(non_camel_case_types)] // runtime type
type rust_port_id = uint;
@ -43,6 +44,7 @@ type GlobalPtr = *libc::uintptr_t;
* or, if no channel exists creates and installs a new channel and sets up a
* new task to receive from it.
*/
#[cfg(stage0)]
pub unsafe fn chan_from_global_ptr<T: Send>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
@ -103,6 +105,68 @@ pub unsafe fn chan_from_global_ptr<T: Send>(
}
}
#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
pub unsafe fn chan_from_global_ptr<T: Send>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
f: fn~(comm::Port<T>)
) -> comm::Chan<T> {
enum Msg {
Proceed,
Abort
}
log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
let is_probably_zero = *global == 0u;
log(debug,~"after is_prob_zero check");
if is_probably_zero {
log(debug,~"is probably zero...");
// There's no global channel. We must make it
let (setup_po, setup_ch) = do task_fn().spawn_conversation
|move f, setup_po, setup_ch| {
let po = comm::Port::<T>();
let ch = comm::Chan(&po);
comm::send(setup_ch, ch);
// Wait to hear if we are the official instance of
// this global task
match comm::recv::<Msg>(setup_po) {
Proceed => f(move po),
Abort => ()
}
};
log(debug,~"before setup recv..");
// This is the proposed global channel
let ch = comm::recv(setup_po);
// 0 is our sentinal value. It is not a valid channel
assert *ch != 0;
// Install the channel
log(debug,~"BEFORE COMPARE AND SWAP");
rusti::atomic_cxchg(
cast::reinterpret_cast(&global),
0, cast::reinterpret_cast(&ch));
let swapped = *global != 0;
log(debug,fmt!("AFTER .. swapped? %?", swapped));
if swapped {
// Success!
comm::send(setup_ch, Proceed);
ch
} else {
// Somebody else got in before we did
comm::send(setup_ch, Abort);
cast::reinterpret_cast(&*global)
}
} else {
log(debug, ~"global != 0");
cast::reinterpret_cast(&*global)
}
}
#[test]
pub fn test_from_global_chan1() {
@ -305,7 +369,7 @@ struct ArcDestruct<T> {
}
do task::unkillable {
let data: ~ArcData<T> = cast::reinterpret_cast(&self.data);
let new_count = rustrt::rust_atomic_decrement(&mut data.count);
let new_count = rusti::atomic_xsub(&mut data.count, 1) - 1;
assert new_count >= 0;
if new_count == 0 {
// Were we really last, or should we hand off to an unwrapper?
@ -341,6 +405,7 @@ fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
}
}
#[cfg(stage0)]
pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
-> T {
struct DeathThroes<T> {
@ -373,8 +438,76 @@ pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
// Got in. Step 0: Tell destructor not to run. We are now it.
rc.data = ptr::null();
// Step 1 - drop our own reference.
let new_count = rustrt::rust_atomic_decrement(&mut ptr.count);
// assert new_count >= 0;
let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
//assert new_count >= 0;
if new_count == 0 {
// We were the last owner. Can unwrap immediately.
// Also we have to free the server endpoints.
let _server: UnwrapProto = cast::transmute(move serverp);
option::swap_unwrap(&mut ptr.data)
// drop glue takes over.
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let end_result =
DeathThroes { ptr: Some(move ptr),
response: Some(move c2) };
let mut p1 = Some(move p1); // argh
do task::rekillable {
pipes::recv_one(option::swap_unwrap(&mut p1));
}
// Got here. Back in the 'unkillable' without getting killed.
// Recover ownership of ptr, then take the data out.
let ptr = option::swap_unwrap(&mut end_result.ptr);
option::swap_unwrap(&mut ptr.data)
// drop glue takes over.
}
} else {
// Somebody else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(move ptr);
// Also we have to free the (rejected) server endpoints.
let _server: UnwrapProto = cast::transmute(move serverp);
fail ~"Another task is already unwrapping this ARC!";
}
}
}
#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
-> T {
struct DeathThroes<T> {
mut ptr: Option<~ArcData<T>>,
mut response: Option<pipes::ChanOne<bool>>,
drop unsafe {
let response = option::swap_unwrap(&mut self.response);
// In case we get killed early, we need to tell the person who
// tried to wake us whether they should hand-off the data to us.
if task::failing() {
pipes::send_one(move response, false);
// Either this swap_unwrap or the one below (at "Got here")
// ought to run.
cast::forget(option::swap_unwrap(&mut self.ptr));
} else {
assert self.ptr.is_none();
pipes::send_one(move response, true);
}
}
}
do task::unkillable {
let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
let (c1,p1) = pipes::oneshot(); // ()
let (c2,p2) = pipes::oneshot(); // bool
let server: UnwrapProto = ~mut Some((move c1,move p2));
let serverp: libc::uintptr_t = cast::transmute(move server);
// Try to put our server end in the unwrapper slot.
rusti::atomic_cxchg(cast::reinterpret_cast(&ptr.unwrapper),
0, serverp as int);
if ptr.unwrapper != 0 {
// Got in. Step 0: Tell destructor not to run. We are now it.
rc.data = ptr::null();
// Step 1 - drop our own reference.
let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
//assert new_count >= 0;
if new_count == 0 {
// We were the last owner. Can unwrap immediately.
// Also we have to free the server endpoints.
@ -452,7 +585,7 @@ pub unsafe fn clone_shared_mutable_state<T: Send>(rc: &SharedMutableState<T>)
-> SharedMutableState<T> {
unsafe {
let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
let new_count = rusti::atomic_xadd(&mut ptr.count, 1) + 1;
assert new_count >= 2;
cast::forget(move ptr);
}

View File

@ -830,16 +830,6 @@ rust_compare_and_swap_ptr(intptr_t *address,
return sync::compare_and_swap(address, oldval, newval);
}
extern "C" CDECL intptr_t
rust_atomic_increment(intptr_t *address) {
return sync::increment(address);
}
extern "C" CDECL intptr_t
rust_atomic_decrement(intptr_t *address) {
return sync::decrement(address);
}
extern "C" CDECL void
rust_task_weaken(rust_port_id chan) {
rust_task *task = rust_get_current_task();

View File

@ -178,8 +178,6 @@ rust_dbg_do_nothing
rust_dbg_breakpoint
rust_osmain_sched_id
rust_compare_and_swap_ptr
rust_atomic_increment
rust_atomic_decrement
rust_global_env_chan_ptr
rust_port_take
rust_port_drop
@ -207,4 +205,4 @@ rust_gc_metadata
rust_uv_ip4_port
rust_uv_ip6_port
rust_uv_tcp_getpeername
rust_uv_tcp_getpeername6
rust_uv_tcp_getpeername6