Correctly reset steals when hitting MAX_STEALS
The previous code erroneously assumed that 'steals > cnt' was always true, but that was a false assumption. The code was altered to decrement steals to a minimum of 0 instead of taking all of cnt into account. I didn't include the exact test from #12295 because it could run for quite awhile, and instead set the threshold for MAX_STEALS to much lower during testing. I found that this triggered the old bug quite frequently when running without this fix. Closes #12295
This commit is contained in:
parent
836ffb5288
commit
bea7862d94
@ -18,6 +18,7 @@
|
||||
/// module. You'll also note that the implementation of the shared and stream
|
||||
/// channels are quite similar, and this is no coincidence!
|
||||
|
||||
use cmp;
|
||||
use int;
|
||||
use iter::Iterator;
|
||||
use kinds::Send;
|
||||
@ -35,6 +36,9 @@ use mpsc = sync::mpsc_queue;
|
||||
|
||||
static DISCONNECTED: int = int::MIN;
|
||||
static FUDGE: int = 1024;
|
||||
#[cfg(test)]
|
||||
static MAX_STEALS: int = 5;
|
||||
#[cfg(not(test))]
|
||||
static MAX_STEALS: int = 1 << 20;
|
||||
|
||||
pub struct Packet<T> {
|
||||
@ -307,7 +311,11 @@ impl<T: Send> Packet<T> {
|
||||
DISCONNECTED => {
|
||||
self.cnt.store(DISCONNECTED, atomics::SeqCst);
|
||||
}
|
||||
n => { self.steals -= n; }
|
||||
n => {
|
||||
let m = cmp::min(n, self.steals);
|
||||
self.steals -= m;
|
||||
self.cnt.fetch_add(n - m, atomics::SeqCst);
|
||||
}
|
||||
}
|
||||
assert!(self.steals >= 0);
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
/// High level implementation details can be found in the comment of the parent
|
||||
/// module.
|
||||
|
||||
use cmp;
|
||||
use comm::Port;
|
||||
use int;
|
||||
use iter::Iterator;
|
||||
@ -32,6 +33,9 @@ use sync::atomics;
|
||||
use vec::OwnedVector;
|
||||
|
||||
static DISCONNECTED: int = int::MIN;
|
||||
#[cfg(test)]
|
||||
static MAX_STEALS: int = 5;
|
||||
#[cfg(not(test))]
|
||||
static MAX_STEALS: int = 1 << 20;
|
||||
|
||||
pub struct Packet<T> {
|
||||
@ -198,11 +202,16 @@ impl<T: Send> Packet<T> {
|
||||
pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
|
||||
match self.queue.pop() {
|
||||
// If we stole some data, record to that effect (this will be
|
||||
// factored into cnt later on). Note that we don't allow steals to
|
||||
// grow without bound in order to prevent eventual overflow of
|
||||
// either steals or cnt as an overflow would have catastrophic
|
||||
// results. Also note that we don't unconditionally set steals to 0
|
||||
// because it can be true that steals > cnt.
|
||||
// factored into cnt later on).
|
||||
//
|
||||
// Note that we don't allow steals to grow without bound in order to
|
||||
// prevent eventual overflow of either steals or cnt as an overflow
|
||||
// would have catastrophic results. Sometimes, steals > cnt, but
|
||||
// other times cnt > steals, so we don't know the relation between
|
||||
// steals and cnt. This code path is executed only rarely, so we do
|
||||
// a pretty slow operation, of swapping 0 into cnt, taking steals
|
||||
// down as much as possible (without going negative), and then
|
||||
// adding back in whatever we couldn't factor into steals.
|
||||
Some(data) => {
|
||||
self.steals += 1;
|
||||
if self.steals > MAX_STEALS {
|
||||
@ -210,7 +219,11 @@ impl<T: Send> Packet<T> {
|
||||
DISCONNECTED => {
|
||||
self.cnt.store(DISCONNECTED, atomics::SeqCst);
|
||||
}
|
||||
n => { self.steals -= n; }
|
||||
n => {
|
||||
let m = cmp::min(n, self.steals);
|
||||
self.steals -= m;
|
||||
self.cnt.fetch_add(n - m, atomics::SeqCst);
|
||||
}
|
||||
}
|
||||
assert!(self.steals >= 0);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user