Implement rwlock.downgrade and tests

This commit is contained in:
Ben Blum 2012-08-13 15:22:32 -04:00
parent 6c4843d9da
commit 7cf21e52eb

View File

@ -142,9 +142,9 @@ struct sem_and_signal_release {
}
/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
struct condvar { priv sem: &sem<waitqueue>; }
struct condvar { priv sem: &sem<waitqueue>; drop { } }
impl condvar {
impl &condvar {
/// Atomically drop the associated lock, and block until a signal is sent.
fn wait() {
// Create waiter nobe.
@ -212,8 +212,9 @@ impl condvar {
}
impl &sem<waitqueue> {
fn access_cond<U>(blk: fn(condvar) -> U) -> U {
do self.access { blk(condvar { sem: self }) }
// The only other place that condvars get built is rwlock_write_mode.
fn access_cond<U>(blk: fn(c: &condvar) -> U) -> U {
do self.access { blk(&condvar { sem: self }) }
}
}
@ -272,7 +273,7 @@ impl &mutex {
fn lock<U>(blk: fn() -> U) -> U { (&self.sem).access(blk) }
/// Run a function with ownership of the mutex and a handle to a condvar.
fn lock_cond<U>(blk: fn(condvar) -> U) -> U {
fn lock_cond<U>(blk: fn(c: &condvar) -> U) -> U {
(&self.sem).access_cond(blk)
}
}
@ -321,12 +322,18 @@ impl &rwlock {
do (&self.order_lock).access {
let mut first_reader = false;
do self.state.with |state| {
state.read_mode = true;
first_reader = (state.read_count == 0);
state.read_count += 1;
}
if first_reader {
(&self.access_lock).acquire();
do self.state.with |state| {
// Must happen *after* getting access_lock. If
// this is set while readers are waiting, but
// while a writer holds the lock, the writer will
// be confused if they downgrade-then-unlock.
state.read_mode = true;
}
}
}
release = some(rwlock_release_read(self));
@ -357,7 +364,7 @@ impl &rwlock {
* the waiting task is signalled. (Note: a writer that waited and then
* was signalled might reacquire the lock before other waiting writers.)
*/
fn write_cond<U>(blk: fn(condvar) -> U) -> U {
fn write_cond<U>(blk: fn(c: &condvar) -> U) -> U {
// NB: You might think I should thread the order_lock into the cond
// wait call, so that it gets waited on before access_lock gets
// reacquired upon being woken up. However, (a) this would be not
@ -374,7 +381,62 @@ impl &rwlock {
}
}
// to-do implement downgrade
/**
* As write(), but with the ability to atomically 'downgrade' the lock;
* i.e., to become a reader without letting other writers get the lock in
* the meantime (such as unlocking and then re-locking as a reader would
* do). The block takes a "write mode token" argument, which can be
* transformed into a "read mode token" by calling downgrade(). Example:
*
* do lock.write_downgrade |write_mode| {
* do (&write_mode).write_cond |condvar| {
* ... exclusive access ...
* }
* let read_mode = lock.downgrade(write_mode);
* do (&read_mode).read {
* ... shared access ...
* }
* }
*/
fn write_downgrade<U>(blk: fn(+rwlock_write_mode) -> U) -> U {
// Implementation slightly different from the slicker 'write's above.
// The exit path is conditional on whether the caller downgrades.
let mut _release = none;
unsafe {
do task::unkillable {
(&self.order_lock).acquire();
(&self.access_lock).acquire();
(&self.order_lock).release();
}
_release = some(rwlock_release_downgrade(self));
}
blk(rwlock_write_mode { lock: self })
}
fn downgrade(+token: rwlock_write_mode) -> rwlock_read_mode {
if !ptr::ref_eq(self, token.lock) {
fail ~"Can't downgrade() with a different rwlock's write_mode!";
}
unsafe {
do task::unkillable {
let mut first_reader = false;
do self.state.with |state| {
assert !state.read_mode;
state.read_mode = true;
first_reader = (state.read_count == 0);
state.read_count += 1;
}
if !first_reader {
// Guaranteed not to let another writer in, because
// another reader was holding the order_lock. Hence they
// must be the one to get the access_lock (because all
// access_locks are acquired with order_lock held).
(&self.access_lock).release();
}
}
}
rwlock_read_mode { lock: token.lock }
}
}
// FIXME(#3136) should go inside of read()
@ -386,8 +448,12 @@ struct rwlock_release_read {
let mut last_reader = false;
do self.lock.state.with |state| {
assert state.read_mode;
assert state.read_count > 0;
state.read_count -= 1;
last_reader = (state.read_count == 0);
if state.read_count == 0 {
last_reader = true;
state.read_mode = false;
}
}
if last_reader {
(&self.lock.access_lock).release();
@ -396,6 +462,56 @@ struct rwlock_release_read {
}
}
// FIXME(#3136) should go inside of downgrade()
struct rwlock_release_downgrade {
lock: &rwlock;
new(lock: &rwlock) { self.lock = lock; }
drop unsafe {
do task::unkillable {
let mut writer_or_last_reader = false;
do self.lock.state.with |state| {
if state.read_mode {
assert state.read_count > 0;
state.read_count -= 1;
if state.read_count == 0 {
// Case 1: Writer downgraded & was the last reader
writer_or_last_reader = true;
state.read_mode = false;
} else {
// Case 2: Writer downgraded & was not the last reader
}
} else {
// Case 3: Writer did not downgrade
writer_or_last_reader = true;
}
}
if writer_or_last_reader {
(&self.lock.access_lock).release();
}
}
}
}
/// The "write permission" token used for rwlock.write_downgrade().
// FIXME(#3145): make lock priv somehow
struct rwlock_write_mode { lock: &rwlock; drop { } }
/// The "read permission" token used for rwlock.write_downgrade().
struct rwlock_read_mode { priv lock: &rwlock; drop { } }
// FIXME(#3145) XXX Region invariance forbids "mode.write(blk)"
impl rwlock_write_mode {
/// Access the pre-downgrade rwlock in write mode.
fn write<U>(blk: fn() -> U) -> U { blk() }
/// Access the pre-downgrade rwlock in write mode with a condvar.
fn write_cond<U>(blk: fn(c: &condvar) -> U) -> U {
blk(&condvar { sem: &self.lock.access_lock })
}
}
impl rwlock_read_mode {
/// Access the post-downgrade rwlock in read mode.
fn read<U>(blk: fn() -> U) -> U { blk() }
}
/****************************************************************************
* Tests
****************************************************************************/
@ -510,9 +626,11 @@ mod tests {
let sharedstate = ~0;
let ptr = ptr::addr_of(*sharedstate);
do task::spawn {
let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) };
let sharedstate: &mut int =
unsafe { unsafe::reinterpret_cast(ptr) };
access_shared(sharedstate, m2, 10);
c.send(());
}
access_shared(sharedstate, m, 10);
let _ = p.recv();
@ -645,21 +763,27 @@ mod tests {
// child task must have finished by the time try returns
do m.lock_cond |cond| {
let _woken = cond.signal();
// FIXME(#3145) - The semantics of pipes are not quite what I want
// here - the pipe doesn't get 'terminated' if the child was
// punted awake during failure.
// assert !woken;
// FIXME(#3145) this doesn't work
//assert !woken;
}
}
/************************************************************************
* Reader/writer lock tests
************************************************************************/
#[cfg(test)]
fn lock_rwlock_in_mode(x: &rwlock, reader: bool, blk: fn()) {
if reader { x.read(blk); } else { x.write(blk); }
enum rwlock_mode { read, write, downgrade, downgrade_read }
#[cfg(test)]
fn lock_rwlock_in_mode(x: &rwlock, mode: rwlock_mode, blk: fn()) {
match mode {
read => x.read(blk),
write => x.write(blk),
downgrade => do x.write_downgrade |mode| { mode.write(blk); },
downgrade_read =>
do x.write_downgrade |mode| { x.downgrade(mode).read(blk); },
}
}
#[cfg(test)]
fn test_rwlock_exclusion(reader1: bool, reader2: bool) {
fn test_rwlock_exclusion(mode1: rwlock_mode, mode2: rwlock_mode) {
// Test mutual exclusion between readers and writers. Just like the
// mutex mutual exclusion test, a ways above.
let (c,p) = pipes::stream();
@ -668,19 +792,20 @@ mod tests {
let sharedstate = ~0;
let ptr = ptr::addr_of(*sharedstate);
do task::spawn {
let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) };
access_shared(sharedstate, x2, reader1, 10);
let sharedstate: &mut int =
unsafe { unsafe::reinterpret_cast(ptr) };
access_shared(sharedstate, x2, mode1, 10);
c.send(());
}
access_shared(sharedstate, x, reader2, 10);
access_shared(sharedstate, x, mode2, 10);
let _ = p.recv();
assert *sharedstate == 20;
fn access_shared(sharedstate: &mut int, x: &rwlock, reader: bool,
fn access_shared(sharedstate: &mut int, x: &rwlock, mode: rwlock_mode,
n: uint) {
for n.times {
do lock_rwlock_in_mode(x, reader) {
do lock_rwlock_in_mode(x, mode) {
let oldval = *sharedstate;
task::yield();
*sharedstate = oldval + 1;
@ -690,32 +815,59 @@ mod tests {
}
#[test]
fn test_rwlock_readers_wont_modify_the_data() {
test_rwlock_exclusion(true, false);
test_rwlock_exclusion(false, true);
test_rwlock_exclusion(read, write);
test_rwlock_exclusion(write, read);
test_rwlock_exclusion(read, downgrade);
test_rwlock_exclusion(downgrade, read);
}
#[test]
fn test_rwlock_writers_and_writers() {
test_rwlock_exclusion(false, false);
test_rwlock_exclusion(write, write);
test_rwlock_exclusion(write, downgrade);
test_rwlock_exclusion(downgrade, write);
test_rwlock_exclusion(downgrade, downgrade);
}
#[test]
fn test_rwlock_readers_and_readers() {
#[cfg(test)]
fn test_rwlock_handshake(mode1: rwlock_mode, mode2: rwlock_mode,
make_mode2_go_first: bool) {
// Much like sem_multi_resource.
let x = ~rwlock();
let x2 = ~x.clone();
let (c1,p1) = pipes::stream();
let (c2,p2) = pipes::stream();
do task::spawn {
do x2.read {
if !make_mode2_go_first {
let _ = p2.recv(); // parent sends to us once it locks, or ...
}
do lock_rwlock_in_mode(x2, mode2) {
if make_mode2_go_first {
c1.send(()); // ... we send to it once we lock
}
let _ = p2.recv();
c1.send(());
}
}
do x.read {
if make_mode2_go_first {
let _ = p1.recv(); // child sends to us once it locks, or ...
}
do lock_rwlock_in_mode(x, mode1) {
if !make_mode2_go_first {
c2.send(()); // ... we send to it once we lock
}
c2.send(());
let _ = p1.recv();
}
}
#[test]
fn test_rwlock_readers_and_readers() {
test_rwlock_handshake(read, read, false);
// The downgrader needs to get in before the reader gets in, otherwise
// they cannot end up reading at the same time.
test_rwlock_handshake(downgrade_read, read, false);
test_rwlock_handshake(read, downgrade_read, true);
// Two downgrade_reads can never both end up reading at the same time.
}
#[test]
fn test_rwlock_cond_wait() {
// As test_mutex_cond_wait above.
let x = ~rwlock();
@ -751,26 +903,40 @@ mod tests {
do x.read { } // Just for good measure
}
#[cfg(test)] #[ignore(cfg(windows))]
fn rwlock_kill_helper(reader1: bool, reader2: bool) {
fn rwlock_kill_helper(mode1: rwlock_mode, mode2: rwlock_mode) {
// Mutex must get automatically unlocked if failed/killed within.
let x = ~rwlock();
let x2 = ~x.clone();
let result: result::result<(),()> = do task::try {
do lock_rwlock_in_mode(x2, reader1) {
do lock_rwlock_in_mode(x2, mode1) {
fail;
}
};
assert result.is_err();
// child task must have finished by the time try returns
do lock_rwlock_in_mode(x, reader2) { }
do lock_rwlock_in_mode(x, mode2) { }
}
#[test] #[ignore(cfg(windows))]
fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(true, false); }
fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(read, write); }
#[test] #[ignore(cfg(windows))]
fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(false,true ); }
fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(write,read ); }
#[test] #[ignore(cfg(windows))]
fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(true, true ); }
fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(read, read ); }
#[test] #[ignore(cfg(windows))]
fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(false,false); }
fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(write,write); }
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_rwlock_downgrade_cant_swap() {
// Tests that you can't downgrade with a different rwlock's token.
let x = ~rwlock();
let y = ~rwlock();
do x.write_downgrade |xwrite| {
let mut xopt = some(xwrite);
do y.write_downgrade |_ywrite| {
do y.downgrade(option::swap_unwrap(&mut xopt)).read {
error!("oops, y.downgrade(x) should have failed!");
}
}
}
}
}