Touch up semaphores; add another test

This commit is contained in:
Ben Blum 2012-08-07 18:42:33 -04:00
parent a3f9e18b7a
commit 64ba7a31cb

View File

@ -17,19 +17,23 @@ type signal_end = pipes::chan<()>;
type waitqueue = { head: pipes::port<signal_end>,
tail: pipes::chan<signal_end> };
fn new_waiter() -> (signal_end, wait_end) { pipes::stream() }
fn waitqueue() -> waitqueue {
let (tail, head) = pipes::stream();
{ head: head, tail: tail }
}
/// A counting semaphore.
enum semaphore = exclusive<{
/// A counting, blocking, bounded-waiting semaphore.
enum semaphore = exclusive<semaphore_inner>;
type semaphore_inner = {
mut count: int,
waiters: waitqueue,
}>;
//blocked: waitqueue,
};
/// Create a new semaphore with the specified count.
fn new_semaphore(count: int) -> semaphore {
let (tail, head) = pipes::stream();
semaphore(exclusive({ mut count: count,
waiters: { head: head, tail: tail } }))
waiters: waitqueue(), /* blocked: waitqueue() */ }))
}
impl semaphore for &semaphore {
@ -42,20 +46,21 @@ impl semaphore for &semaphore {
* Acquires a resource represented by the semaphore. Blocks if necessary
* until resource(s) become available.
*/
fn wait() {
fn acquire() {
let mut waiter_nobe = none;
unsafe {
do (**self).with |state| {
state.count -= 1;
if state.count < 0 {
let (signal_end,wait_end) = new_waiter();
let (signal_end,wait_end) = pipes::stream();
waiter_nobe = some(wait_end);
// Enqueue ourself.
state.waiters.tail.send(signal_end);
}
}
}
for 1000.times { task::yield(); }
// Uncomment if you wish to test for sem races. Not valgrind-friendly.
/* for 1000.times { task::yield(); } */
// Need to wait outside the exclusive.
if waiter_nobe.is_some() {
let _ = option::unwrap(waiter_nobe).recv();
@ -66,7 +71,7 @@ impl semaphore for &semaphore {
* Release a held resource represented by the semaphore. Wakes a blocked
* contending task, if any exist.
*/
fn signal() {
fn release() {
unsafe {
do (**self).with |state| {
state.count += 1;
@ -85,7 +90,7 @@ impl semaphore for &semaphore {
/// Runs a function with ownership of one of the semaphore's resources.
fn access<U>(blk: fn() -> U) -> U {
self.wait();
self.acquire();
let _x = sem_release(self);
blk()
}
@ -95,7 +100,7 @@ impl semaphore for &semaphore {
struct sem_release {
sem: &semaphore;
new(sem: &semaphore) { self.sem = sem; }
drop { self.sem.signal(); }
drop { self.sem.release(); }
}
#[cfg(test)]
@ -120,11 +125,11 @@ mod tests {
let s = ~new_semaphore(0);
let s2 = ~s.clone();
do task::spawn {
s2.wait();
s2.acquire();
c.send(());
}
for 10.times { task::yield(); }
s.signal();
s.release();
let _ = p.recv();
/* Parent waits and child signals */
@ -133,14 +138,16 @@ mod tests {
let s2 = ~s.clone();
do task::spawn {
for 10.times { task::yield(); }
s2.signal();
s2.release();
let _ = p.recv();
}
s.wait();
s.acquire();
c.send(());
}
#[test]
fn test_sem_mutual_exclusion() {
// Unsafely achieve shared state, and do the textbook
// "load tmp <- ptr; inc tmp; store ptr <- tmp" dance.
let (c,p) = pipes::stream();
let s = ~new_semaphore(1);
let s2 = ~s.clone();
@ -167,7 +174,28 @@ mod tests {
}
}
#[test]
fn test_sem_multi_resource() {
// Parent and child both get in the critical section at the same
// time, and shake hands.
let s = ~new_semaphore(2);
let s2 = ~s.clone();
let (c1,p1) = pipes::stream();
let (c2,p2) = pipes::stream();
do task::spawn {
do s2.access {
let _ = p2.recv();
c1.send(());
}
}
do s.access {
c2.send(());
let _ = p1.recv();
}
}
#[test]
fn test_sem_runtime_friendly_blocking() {
// Force the runtime to schedule two threads on the same sched_loop.
// When one blocks, it should schedule the other one.
do task::spawn_sched(task::manual_threads(1)) {
let s = ~new_semaphore(1);
let s2 = ~s.clone();