rust/src/libstd/comm/stream.rs

482 lines
19 KiB
Rust
Raw Normal View History

// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/// Stream channels
///
/// This is the flavor of channels which are optimized for one sender and one
/// receiver. The sender will be upgraded to a shared channel if the channel is
/// cloned.
///
/// High level implementation details can be found in the comment of the parent
/// module.
use cmp;
use comm::Receiver;
use int;
use iter::Iterator;
use kinds::Send;
use ops::Drop;
use option::{Some, None};
use result::{Ok, Err, Result};
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use rt::thread::Thread;
use spsc = sync::spsc_queue;
use sync::atomics;
use vec::OwnedVector;
static DISCONNECTED: int = int::MIN;
#[cfg(test)]
static MAX_STEALS: int = 5;
#[cfg(not(test))]
static MAX_STEALS: int = 1 << 20;
pub struct Packet<T> {
queue: spsc::Queue<Message<T>>, // internal queue for all message
cnt: atomics::AtomicInt, // How many items are on this channel
steals: int, // How many times has a port received without blocking?
to_wake: atomics::AtomicUint, // Task to wake up
port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed.
}
pub enum Failure<T> {
Empty,
Disconnected,
Upgraded(Receiver<T>),
}
pub enum UpgradeResult {
UpSuccess,
UpDisconnected,
UpWoke(BlockedTask),
}
pub enum SelectionResult<T> {
SelSuccess,
SelCanceled(BlockedTask),
SelUpgraded(BlockedTask, Receiver<T>),
}
// Any message could contain an "upgrade request" to a new shared port, so the
// internal queue it's a queue of T, but rather Message<T>
enum Message<T> {
Data(T),
GoUp(Receiver<T>),
}
impl<T: Send> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
queue: spsc::Queue::new(128),
cnt: atomics::AtomicInt::new(0),
steals: 0,
to_wake: atomics::AtomicUint::new(0),
port_dropped: atomics::AtomicBool::new(false),
}
}
pub fn send(&mut self, t: T) -> bool {
match self.do_send(Data(t)) {
UpSuccess => true,
UpDisconnected => false,
UpWoke(task) => {
task.wake().map(|t| t.reawaken());
true
}
}
}
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
self.do_send(GoUp(up))
}
fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
// Use an acquire/release ordering to maintain the same position with
// respect to the atomic loads below
if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
self.queue.push(t);
match self.cnt.fetch_add(1, atomics::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup
-1 => UpWoke(self.take_to_wake()),
// As as described before, SPSC queues must be >= -2
-2 => UpSuccess,
// Be sure to preserve the disconnected state, and the return value
// in this case is going to be whether our data was received or not.
// This manifests itself on whether we have an empty queue or not.
//
// Primarily, are required to drain the queue here because the port
// will never remove this data. We can only have at most one item to
// drain (the port drains the rest).
DISCONNECTED => {
self.cnt.store(DISCONNECTED, atomics::SeqCst);
let first = self.queue.pop();
let second = self.queue.pop();
assert!(second.is_none());
match first {
Some(..) => UpSuccess, // we failed to send the data
None => UpDisconnected, // we successfully sent data
}
}
// Otherwise we just sent some data on a non-waiting queue, so just
// make sure the world is sane and carry on!
n => { assert!(n >= 0); UpSuccess }
}
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> BlockedTask {
let task = self.to_wake.load(atomics::SeqCst);
self.to_wake.store(0, atomics::SeqCst);
assert!(task != 0);
unsafe { BlockedTask::cast_from_uint(task) }
}
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
let n = unsafe { task.cast_to_uint() };
self.to_wake.store(n, atomics::SeqCst);
let steals = self.steals;
self.steals = 0;
match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 { return Ok(()) }
}
}
self.to_wake.store(0, atomics::SeqCst);
Err(unsafe { BlockedTask::cast_from_uint(n) })
}
pub fn recv(&mut self) -> Result<T, Failure<T>> {
// Optimistic preflight check (scheduling is expensive).
match self.try_recv() {
Err(Empty) => {}
data => return data,
}
// Welp, our channel has no data. Deschedule the current task and
// initiate the blocking protocol.
let task: ~Task = Local::take();
task.deschedule(1, |task| {
self.decrement(task)
});
match self.try_recv() {
// Messages which actually popped from the queue shouldn't count as
// a steal, so offset the decrement here (we already have our
// "steal" factored into the channel count above).
data @ Ok(..) |
data @ Err(Upgraded(..)) => {
self.steals -= 1;
data
}
data => data,
}
}
pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
match self.queue.pop() {
// If we stole some data, record to that effect (this will be
// factored into cnt later on).
//
// Note that we don't allow steals to grow without bound in order to
// prevent eventual overflow of either steals or cnt as an overflow
// would have catastrophic results. Sometimes, steals > cnt, but
// other times cnt > steals, so we don't know the relation between
// steals and cnt. This code path is executed only rarely, so we do
// a pretty slow operation, of swapping 0 into cnt, taking steals
// down as much as possible (without going negative), and then
// adding back in whatever we couldn't factor into steals.
Some(data) => {
if self.steals > MAX_STEALS {
match self.cnt.swap(0, atomics::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, atomics::SeqCst);
}
n => {
let m = cmp::min(n, self.steals);
self.steals -= m;
self.bump(n - m);
}
}
assert!(self.steals >= 0);
}
self.steals += 1;
match data {
Data(t) => Ok(t),
GoUp(up) => Err(Upgraded(up)),
}
}
None => {
match self.cnt.load(atomics::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
// This is a little bit of a tricky case. We failed to pop
// data above, and then we have viewed that the channel is
// disconnected. In this window more data could have been
// sent on the channel. It doesn't really make sense to
// return that the channel is disconnected when there's
// actually data on it, so be extra sure there's no data by
// popping one more time.
//
// We can ignore steals because the other end is
// disconnected and we'll never need to really factor in our
// steals again.
_ => {
match self.queue.pop() {
Some(Data(t)) => Ok(t),
Some(GoUp(up)) => Err(Upgraded(up)),
None => Err(Disconnected),
}
}
}
}
}
}
pub fn drop_chan(&mut self) {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
-1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
}
}
pub fn drop_port(&mut self) {
// Dropping a port seems like a fairly trivial thing. In theory all we
// need to do is flag that we're disconnected and then everything else
// can take over (we don't have anyone to wake up).
//
// The catch for Ports is that we want to drop the entire contents of
// the queue. There are multiple reasons for having this property, the
// largest of which is that if another chan is waiting in this channel
// (but not received yet), then waiting on that port will cause a
// deadlock.
//
// So if we accept that we must now destroy the entire contents of the
// queue, this code may make a bit more sense. The tricky part is that
// we can't let any in-flight sends go un-dropped, we have to make sure
// *everything* is dropped and nothing new will come onto the channel.
// The first thing we do is set a flag saying that we're done for. All
// sends are gated on this flag, so we're immediately guaranteed that
// there are a bounded number of active sends that we'll have to deal
// with.
self.port_dropped.store(true, atomics::SeqCst);
// Now that we're guaranteed to deal with a bounded number of senders,
// we need to drain the queue. This draining process happens atomically
// with respect to the "count" of the channel. If the count is nonzero
// (with steals taken into account), then there must be data on the
// channel. In this case we drain everything and then try again. We will
// continue to fail while active senders send data while we're dropping
// data, but eventually we're guaranteed to break out of this loop
// (because there is a bounded number of senders).
let mut steals = self.steals;
while {
let cnt = self.cnt.compare_and_swap(
steals, DISCONNECTED, atomics::SeqCst);
cnt != DISCONNECTED && cnt != steals
} {
loop {
match self.queue.pop() {
Some(..) => { steals += 1; }
None => break
}
}
}
// At this point in time, we have gated all future senders from sending,
// and we have flagged the channel as being disconnected. The senders
// still have some responsibility, however, because some sends may not
// complete until after we flag the disconnection. There are more
// details in the sending methods that see DISCONNECTED
}
////////////////////////////////////////////////////////////////////////////
// select implementation
////////////////////////////////////////////////////////////////////////////
// Tests to see whether this port can receive without blocking. If Ok is
// returned, then that's the answer. If Err is returned, then the returned
// port needs to be queried instead (an upgrade happened)
pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
// We peek at the queue to see if there's anything on it, and we use
// this return value to determine if we should pop from the queue and
// upgrade this channel immediately. If it looks like we've got an
// upgrade pending, then go through the whole recv rigamarole to update
// the internal state.
match self.queue.peek() {
Some(&GoUp(..)) => {
match self.recv() {
Err(Upgraded(port)) => Err(port),
_ => unreachable!(),
}
}
Some(..) => Ok(true),
None => Ok(false)
}
}
// increment the count on the channel (used for selection)
fn bump(&mut self, amt: int) -> int {
match self.cnt.fetch_add(amt, atomics::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, atomics::SeqCst);
DISCONNECTED
}
n => n
}
}
// Attempts to start selecting on this port. Like a oneshot, this can fail
// immediately because of an upgrade.
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
match self.decrement(task) {
Ok(()) => SelSuccess,
Err(task) => {
let ret = match self.queue.peek() {
Some(&GoUp(..)) => {
match self.queue.pop() {
Some(GoUp(port)) => SelUpgraded(task, port),
_ => unreachable!(),
}
}
Some(..) => SelCanceled(task),
None => SelCanceled(task),
};
// Undo our decrement above, and we should be guaranteed that the
// previous value is positive because we're not going to sleep
let prev = self.bump(1);
assert!(prev == DISCONNECTED || prev >= 0);
return ret;
}
}
}
// Removes a previous task from being blocked in this port
pub fn abort_selection(&mut self,
was_upgrade: bool) -> Result<bool, Receiver<T>> {
// If we're aborting selection after upgrading from a oneshot, then
// we're guarantee that no one is waiting. The only way that we could
// have seen the upgrade is if data was actually sent on the channel
// half again. For us, this means that there is guaranteed to be data on
// this channel. Furthermore, we're guaranteed that there was no
// start_selection previously, so there's no need to modify `self.cnt`
// at all.
//
// Hence, because of these invariants, we immediately return `Ok(true)`.
// Note that the data may not actually be sent on the channel just yet.
// The other end could have flagged the upgrade but not sent data to
// this end. This is fine because we know it's a small bounded windows
// of time until the data is actually sent.
if was_upgrade {
assert_eq!(self.steals, 0);
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
return Ok(true)
}
// We want to make sure that the count on the channel goes non-negative,
// and in the stream case we can have at most one steal, so just assume
// that we had one steal.
let steals = 1;
let prev = self.bump(steals + 1);
// If we were previously disconnected, then we know for sure that there
// is no task in to_wake, so just keep going
let has_data = if prev == DISCONNECTED {
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
true // there is data, that data is that we're disconnected
} else {
let cur = prev + steals + 1;
assert!(cur >= 0);
// If the previous count was negative, then we just made things go
// positive, hence we passed the -1 boundary and we're responsible
// for removing the to_wake() field and trashing it.
//
// If the previous count was positive then we're in a tougher
// situation. A possible race is that a sender just incremented
// through -1 (meaning it's going to try to wake a task up), but it
// hasn't yet read the to_wake. In order to prevent a future recv()
// from waking up too early (this sender picking up the plastered
// over to_wake), we spin loop here waiting for to_wake to be 0.
// Note that this entire select() implementation needs an overhaul,
// and this is *not* the worst part of it, so this is not done as a
// final solution but rather out of necessity for now to get
// something working.
if prev < 0 {
self.take_to_wake().trash();
} else {
while self.to_wake.load(atomics::SeqCst) != 0 {
Thread::yield_now();
}
}
assert_eq!(self.steals, 0);
self.steals = steals;
// if we were previously positive, then there's surely data to
// receive
prev >= 0
};
// Now that we've determined that this queue "has data", we peek at the
// queue to see if the data is an upgrade or not. If it's an upgrade,
// then we need to destroy this port and abort selection on the
// upgraded port.
if has_data {
match self.queue.peek() {
Some(&GoUp(..)) => {
match self.queue.pop() {
Some(GoUp(port)) => Err(port),
_ => unreachable!(),
}
}
_ => Ok(true),
}
} else {
Ok(false)
}
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Packet<T> {
fn drop(&mut self) {
// Note that this load is not only an assert for correctness about
// disconnection, but also a proper fence before the read of
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
}
}