Put the initialisation value into the store buffer

This commit is contained in:
Andy Wang 2022-05-16 23:05:36 +01:00
parent 577054c6de
commit 92145373c3
No known key found for this signature in database
GPG Key ID: 181B49F9F38F3374
4 changed files with 137 additions and 53 deletions

View File

@ -533,7 +533,11 @@ fn write_scalar_atomic(
let this = self.eval_context_mut();
this.allow_data_races_mut(move |this| this.write_scalar(val, &(*dest).into()))?;
this.validate_atomic_store(dest, atomic)?;
this.buffered_atomic_write(val, dest, atomic)
// FIXME: it's not possible to get the value before write_scalar. A read_scalar will cause
// side effects from a read the program did not perform. So we have to initialise
// the store buffer with the value currently being written
// ONCE this is fixed please remove the hack in buffered_atomic_write() in weak_memory.rs
this.buffered_atomic_write(val, dest, atomic, val)
}
/// Perform an atomic operation on a memory location.
@ -556,7 +560,12 @@ fn atomic_op_immediate(
this.validate_atomic_rmw(place, atomic)?;
this.buffered_atomic_rmw(val.to_scalar_or_uninit(), place, atomic)?;
this.buffered_atomic_rmw(
val.to_scalar_or_uninit(),
place,
atomic,
old.to_scalar_or_uninit(),
)?;
Ok(old)
}
@ -575,7 +584,7 @@ fn atomic_exchange_scalar(
this.validate_atomic_rmw(place, atomic)?;
this.buffered_atomic_rmw(new, place, atomic)?;
this.buffered_atomic_rmw(new, place, atomic, old)?;
Ok(old)
}
@ -603,7 +612,12 @@ fn atomic_min_max_scalar(
this.validate_atomic_rmw(place, atomic)?;
this.buffered_atomic_rmw(new_val.to_scalar_or_uninit(), place, atomic)?;
this.buffered_atomic_rmw(
new_val.to_scalar_or_uninit(),
place,
atomic,
old.to_scalar_or_uninit(),
)?;
// Return the old value.
Ok(old)
@ -654,14 +668,14 @@ fn atomic_compare_exchange_scalar(
if cmpxchg_success {
this.allow_data_races_mut(|this| this.write_scalar(new, &(*place).into()))?;
this.validate_atomic_rmw(place, success)?;
this.buffered_atomic_rmw(new, place, success)?;
this.buffered_atomic_rmw(new, place, success, old.to_scalar_or_uninit())?;
} else {
this.validate_atomic_load(place, fail)?;
// A failed compare exchange is equivalent to a load, reading from the latest store
// in the modification order.
// Since `old` is only a value and not the store element, we need to separately
// find it in our store buffer and perform load_impl on it.
this.perform_read_on_buffered_latest(place, fail)?;
this.perform_read_on_buffered_latest(place, fail, old.to_scalar_or_uninit())?;
}
// Return the old value.

View File

@ -81,11 +81,11 @@
pub struct StoreBufferAlloc {
/// Store buffer of each atomic object in this allocation
// Behind a RefCell because we need to allocate/remove on read access
store_buffer: RefCell<AllocationMap<StoreBuffer>>,
store_buffers: RefCell<AllocationMap<StoreBuffer>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct StoreBuffer {
pub(super) struct StoreBuffer {
// Stores to this location in modification order
buffer: VecDeque<StoreElement>,
}
@ -111,21 +111,23 @@ struct StoreElement {
impl StoreBufferAlloc {
pub fn new_allocation() -> Self {
Self { store_buffer: RefCell::new(AllocationMap::new()) }
Self { store_buffers: RefCell::new(AllocationMap::new()) }
}
/// Gets a store buffer associated with an atomic object in this allocation
fn get_store_buffer<'tcx>(
/// Or creates one with the specified initial value
fn get_or_create_store_buffer<'tcx>(
&self,
range: AllocRange,
init: ScalarMaybeUninit<Tag>,
) -> InterpResult<'tcx, Ref<'_, StoreBuffer>> {
let access_type = self.store_buffer.borrow().access_type(range);
let access_type = self.store_buffers.borrow().access_type(range);
let pos = match access_type {
AccessType::PerfectlyOverlapping(pos) => pos,
AccessType::Empty(pos) => {
// First atomic access on this range, allocate a new StoreBuffer
let mut buffer = self.store_buffer.borrow_mut();
buffer.insert_at_pos(pos, range, StoreBuffer::default());
let new_buffer = StoreBuffer::new(init);
let mut buffers = self.store_buffers.borrow_mut();
buffers.insert_at_pos(pos, range, new_buffer);
pos
}
AccessType::ImperfectlyOverlapping(pos_range) => {
@ -140,20 +142,22 @@ fn get_store_buffer<'tcx>(
}
}
};
Ok(Ref::map(self.store_buffer.borrow(), |buffer| &buffer[pos]))
Ok(Ref::map(self.store_buffers.borrow(), |buffer| &buffer[pos]))
}
/// Gets a mutable store buffer associated with an atomic object in this allocation
fn get_store_buffer_mut<'tcx>(
fn get_or_create_store_buffer_mut<'tcx>(
&mut self,
range: AllocRange,
init: ScalarMaybeUninit<Tag>,
) -> InterpResult<'tcx, &mut StoreBuffer> {
let buffer = self.store_buffer.get_mut();
let access_type = buffer.access_type(range);
let buffers = self.store_buffers.get_mut();
let access_type = buffers.access_type(range);
let pos = match access_type {
AccessType::PerfectlyOverlapping(pos) => pos,
AccessType::Empty(pos) => {
buffer.insert_at_pos(pos, range, StoreBuffer::default());
let new_buffer = StoreBuffer::new(init);
buffers.insert_at_pos(pos, range, new_buffer);
pos
}
AccessType::ImperfectlyOverlapping(pos_range) => {
@ -164,19 +168,28 @@ fn get_store_buffer_mut<'tcx>(
}
}
};
Ok(&mut buffer[pos])
}
}
impl Default for StoreBuffer {
fn default() -> Self {
let mut buffer = VecDeque::new();
buffer.reserve(STORE_BUFFER_LIMIT);
Self { buffer }
Ok(&mut buffers[pos])
}
}
impl<'mir, 'tcx: 'mir> StoreBuffer {
fn new(init: ScalarMaybeUninit<Tag>) -> Self {
let mut buffer = VecDeque::new();
buffer.reserve(STORE_BUFFER_LIMIT);
let mut ret = Self { buffer };
let store_elem = StoreElement {
// The thread index and timestamp of the initialisation write
// are never meaningfully used, so it's fine to leave them as 0
store_index: VectorIdx::from(0),
timestamp: 0,
val: init,
is_seqcst: false,
loads: RefCell::new(FxHashMap::default()),
};
ret.buffer.push_back(store_elem);
ret
}
/// Reads from the last store in modification order
fn read_from_last_store(&self, global: &GlobalState) {
let store_elem = self.buffer.back();
@ -192,7 +205,7 @@ fn buffered_read(
is_seqcst: bool,
rng: &mut (impl rand::Rng + ?Sized),
validate: impl FnOnce() -> InterpResult<'tcx>,
) -> InterpResult<'tcx, Option<ScalarMaybeUninit<Tag>>> {
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
// Having a live borrow to store_buffer while calling validate_atomic_load is fine
// because the race detector doesn't touch store_buffer
@ -210,10 +223,8 @@ fn buffered_read(
// requires access to ThreadClockSet.clock, which is updated by the race detector
validate()?;
let loaded = store_elem.map(|store_elem| {
let (index, clocks) = global.current_thread_state();
store_elem.load_impl(index, &clocks)
});
let loaded = store_elem.load_impl(index, &clocks);
Ok(loaded)
}
@ -230,23 +241,18 @@ fn buffered_write(
}
/// Selects a valid store element in the buffer.
/// The buffer does not contain the value used to initialise the atomic object
/// so a fresh atomic object has an empty store buffer and this function
/// will return `None`. In this case, the caller should ensure that the non-buffered
/// value from `MiriEvalContext::read_scalar()` is observed by the program, which is
/// the initial value of the atomic object. `MiriEvalContext::read_scalar()` is always
/// the latest value in modification order so it is always correct to be observed by any thread.
fn fetch_store<R: rand::Rng + ?Sized>(
&self,
is_seqcst: bool,
clocks: &ThreadClockSet,
rng: &mut R,
) -> Option<&StoreElement> {
) -> &StoreElement {
use rand::seq::IteratorRandom;
let mut found_sc = false;
// FIXME: this should be an inclusive take_while (stops after a false predicate, but
// FIXME: we want an inclusive take_while (stops after a false predicate, but
// includes the element that gave the false), but such function doesn't yet
// exist in the standard libary https://github.com/rust-lang/rust/issues/62208
// so we have to hack around it with keep_searching
let mut keep_searching = true;
let candidates = self
.buffer
@ -303,7 +309,9 @@ fn fetch_store<R: rand::Rng + ?Sized>(
}
});
candidates.choose(rng)
candidates
.choose(rng)
.expect("store buffer cannot be empty, an element is populated on construction")
}
/// ATOMIC STORE IMPL in the paper (except we don't need the location's vector clock)
@ -366,6 +374,7 @@ fn buffered_atomic_rmw(
new_val: ScalarMaybeUninit<Tag>,
place: &MPlaceTy<'tcx, Tag>,
atomic: AtomicRwOp,
init: ScalarMaybeUninit<Tag>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
@ -379,7 +388,7 @@ fn buffered_atomic_rmw(
global.sc_write();
}
let range = alloc_range(base_offset, place.layout.size);
let buffer = alloc_buffers.get_store_buffer_mut(range)?;
let buffer = alloc_buffers.get_or_create_store_buffer_mut(range, init)?;
buffer.read_from_last_store(global);
buffer.buffered_write(new_val, global, atomic == AtomicRwOp::SeqCst)?;
}
@ -401,8 +410,10 @@ fn buffered_atomic_read(
global.sc_read();
}
let mut rng = this.machine.rng.borrow_mut();
let buffer =
alloc_buffers.get_store_buffer(alloc_range(base_offset, place.layout.size))?;
let buffer = alloc_buffers.get_or_create_store_buffer(
alloc_range(base_offset, place.layout.size),
latest_in_mo,
)?;
let loaded = buffer.buffered_read(
global,
atomic == AtomicReadOp::SeqCst,
@ -410,7 +421,7 @@ fn buffered_atomic_read(
validate,
)?;
return Ok(loaded.unwrap_or(latest_in_mo));
return Ok(loaded);
}
}
@ -424,6 +435,7 @@ fn buffered_atomic_write(
val: ScalarMaybeUninit<Tag>,
dest: &MPlaceTy<'tcx, Tag>,
atomic: AtomicWriteOp,
init: ScalarMaybeUninit<Tag>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(dest.ptr)?;
@ -435,8 +447,23 @@ fn buffered_atomic_write(
if atomic == AtomicWriteOp::SeqCst {
global.sc_write();
}
let buffer =
alloc_buffers.get_store_buffer_mut(alloc_range(base_offset, dest.layout.size))?;
// UGLY HACK: in write_scalar_atomic() we don't know the value before our write,
// so init == val always. If the buffer is fresh then we would've duplicated an entry,
// so we need to remove it.
let was_empty = matches!(
alloc_buffers
.store_buffers
.borrow()
.access_type(alloc_range(base_offset, dest.layout.size)),
AccessType::Empty(_)
);
let buffer = alloc_buffers
.get_or_create_store_buffer_mut(alloc_range(base_offset, dest.layout.size), init)?;
if was_empty {
buffer.buffer.pop_front();
}
buffer.buffered_write(val, global, atomic == AtomicWriteOp::SeqCst)?;
}
@ -451,6 +478,7 @@ fn perform_read_on_buffered_latest(
&self,
place: &MPlaceTy<'tcx, Tag>,
atomic: AtomicReadOp,
init: ScalarMaybeUninit<Tag>,
) -> InterpResult<'tcx> {
let this = self.eval_context_ref();
@ -461,7 +489,8 @@ fn perform_read_on_buffered_latest(
let size = place.layout.size;
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
if let Some(alloc_buffers) = this.get_alloc_extra(alloc_id)?.weak_memory.as_ref() {
let buffer = alloc_buffers.get_store_buffer(alloc_range(base_offset, size))?;
let buffer = alloc_buffers
.get_or_create_store_buffer(alloc_range(base_offset, size), init)?;
buffer.read_from_last_store(global);
}
}

View File

@ -34,8 +34,6 @@ unsafe impl<T> Sync for EvilSend<T> {}
// multiple times
fn static_atomic(val: usize) -> &'static AtomicUsize {
let ret = Box::leak(Box::new(AtomicUsize::new(val)));
// A workaround to put the initialisation value in the store buffer
ret.store(val, Relaxed);
ret
}
@ -205,8 +203,19 @@ fn test_sc_store_buffering() {
assert_ne!((a, b), (0, 0));
}
fn test_single_thread() {
let x = AtomicUsize::new(42);
assert_eq!(x.load(Relaxed), 42);
x.store(43, Relaxed);
assert_eq!(x.load(Relaxed), 43);
}
pub fn main() {
for _ in 0..100 {
test_single_thread();
test_mixed_access();
test_load_buffering_acq_rel();
test_message_passing();

View File

@ -22,11 +22,17 @@ unsafe impl<T> Sync for EvilSend<T> {}
// multiple times
fn static_atomic(val: usize) -> &'static AtomicUsize {
let ret = Box::leak(Box::new(AtomicUsize::new(val)));
// A workaround to put the initialisation value in the store buffer
ret.store(val, Relaxed);
ret
}
// Spins until it reads the given value
fn reads_value(loc: &AtomicUsize, val: usize) -> usize {
while loc.load(Relaxed) != val {
std::hint::spin_loop();
}
val
}
fn relaxed() -> bool {
let x = static_atomic(0);
let j1 = spawn(move || {
@ -64,6 +70,31 @@ fn seq_cst() -> bool {
r3 == 1
}
fn initialization_write() -> bool {
let x = static_atomic(11);
assert_eq!(x.load(Relaxed), 11);
let wait = static_atomic(0);
let j1 = spawn(move || {
x.store(22, Relaxed);
// Relaxed is intentional. We want to test if the thread 2 reads the initialisation write
// after a relaxed write
wait.store(1, Relaxed);
});
let j2 = spawn(move || {
reads_value(wait, 1);
x.load(Relaxed)
});
j1.join().unwrap();
let r2 = j2.join().unwrap();
r2 == 11
}
// Asserts that the function returns true at least once in 100 runs
macro_rules! assert_once {
($f:ident) => {
@ -74,4 +105,5 @@ macro_rules! assert_once {
pub fn main() {
assert_once!(relaxed);
assert_once!(seq_cst);
assert_once!(initialization_write);
}