std: Implement yields on receives for channels

This will prevent a deadlock when a task spins in a try_recv when using channel
communication routines is a clear location for a M:N scheduling to happen.
This commit is contained in:
Alex Crichton 2013-12-13 18:27:13 -08:00
parent 14caf00c92
commit 8be66e212b

View File

@ -238,7 +238,6 @@ use rt::local::Local;
use rt::task::{Task, BlockedTask};
use rt::thread::Thread;
use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
use task;
use vec::{ImmutableVector, OwnedVector};
use spsc = sync::spsc_queue;
@ -346,6 +345,7 @@ struct Packet {
selection_id: uint,
select_next: *mut Packet,
select_prev: *mut Packet,
recv_cnt: int,
}
///////////////////////////////////////////////////////////////////////////////
@ -367,6 +367,7 @@ impl Packet {
selection_id: 0,
select_next: 0 as *mut Packet,
select_prev: 0 as *mut Packet,
recv_cnt: 0,
}
}
@ -611,8 +612,9 @@ impl<T: Send> Chan<T> {
// the TLS overhead can be a bit much.
n => {
assert!(n >= 0);
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
task::deschedule();
if n > 0 && n % RESCHED_FREQ == 0 {
let task: ~Task = Local::take();
task.maybe_yield();
}
true
}
@ -704,8 +706,9 @@ impl<T: Send> SharedChan<T> {
DISCONNECTED => {} // oh well, we tried
-1 => { (*packet).wakeup(can_resched); }
n => {
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
task::deschedule();
if n > 0 && n % RESCHED_FREQ == 0 {
let task: ~Task = Local::take();
task.maybe_yield();
}
}
}
@ -773,6 +776,18 @@ impl<T: Send> Port<T> {
// This is a "best effort" situation, so if a queue is inconsistent just
// don't worry about it.
let this = unsafe { cast::transmute_mut(self) };
// See the comment about yielding on sends, but the same applies here.
// If a thread is spinning in try_recv we should try
unsafe {
let packet = this.queue.packet();
(*packet).recv_cnt += 1;
if (*packet).recv_cnt % RESCHED_FREQ == 0 {
let task: ~Task = Local::take();
task.maybe_yield();
}
}
let ret = match this.queue {
SPSC(ref mut queue) => queue.pop(),
MPSC(ref mut queue) => match queue.pop() {