2013-12-05 19:56:17 -06:00
|
|
|
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
|
2013-10-03 21:23:47 -05:00
|
|
|
* Redistribution and use in source and binary forms, with or without
|
|
|
|
* modification, are permitted provided that the following conditions are met:
|
2013-10-25 20:33:05 -05:00
|
|
|
*
|
2013-10-03 21:23:47 -05:00
|
|
|
* 1. Redistributions of source code must retain the above copyright notice,
|
|
|
|
* this list of conditions and the following disclaimer.
|
2013-10-25 20:33:05 -05:00
|
|
|
*
|
2013-10-03 21:23:47 -05:00
|
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
|
|
* documentation and/or other materials provided with the distribution.
|
2013-10-25 20:33:05 -05:00
|
|
|
*
|
2013-10-03 21:23:47 -05:00
|
|
|
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
|
|
|
|
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
|
|
|
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
|
|
|
|
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
|
|
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
|
|
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
|
|
|
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
|
|
|
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
|
|
|
|
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
|
|
|
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
*
|
|
|
|
* The views and conclusions contained in the software and documentation are
|
|
|
|
* those of the authors and should not be interpreted as representing official
|
|
|
|
* policies, either expressed or implied, of Dmitry Vyukov.
|
|
|
|
*/
|
|
|
|
|
|
|
|
//! A mostly lock-free multi-producer, single consumer queue.
|
2013-12-12 19:27:37 -06:00
|
|
|
//!
|
|
|
|
//! This module contains an implementation of a concurrent MPSC queue. This
|
|
|
|
//! queue can be used to share data between tasks, and is also used as the
|
|
|
|
//! building block of channels in rust.
|
|
|
|
//!
|
|
|
|
//! Note that the current implementation of this queue has a caveat of the `pop`
|
|
|
|
//! method, and see the method for more information about it. Due to this
|
|
|
|
//! caveat, this queue may not be appropriate for all use-cases.
|
2013-10-03 21:23:47 -05:00
|
|
|
|
2013-12-05 19:56:17 -06:00
|
|
|
// http://www.1024cores.net/home/lock-free-algorithms
|
|
|
|
// /queues/non-intrusive-mpsc-node-based-queue
|
|
|
|
|
2013-10-03 21:23:47 -05:00
|
|
|
use cast;
|
|
|
|
use kinds::Send;
|
2013-12-05 19:56:17 -06:00
|
|
|
use ops::Drop;
|
|
|
|
use option::{Option, None, Some};
|
2013-12-12 19:27:37 -06:00
|
|
|
use ptr::RawPtr;
|
|
|
|
use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
|
2013-12-05 19:56:17 -06:00
|
|
|
|
2013-12-12 19:27:37 -06:00
|
|
|
/// A result of the `pop` function.
|
2013-12-05 19:56:17 -06:00
|
|
|
pub enum PopResult<T> {
|
|
|
|
/// Some data has been popped
|
|
|
|
Data(T),
|
|
|
|
/// The queue is empty
|
|
|
|
Empty,
|
|
|
|
/// The queue is in an inconsistent state. Popping data should succeed, but
|
|
|
|
/// some pushers have yet to make enough progress in order allow a pop to
|
|
|
|
/// succeed. It is recommended that a pop() occur "in the near future" in
|
|
|
|
/// order to see if the sender has made progress or not
|
|
|
|
Inconsistent,
|
|
|
|
}
|
2013-10-03 21:23:47 -05:00
|
|
|
|
|
|
|
struct Node<T> {
|
|
|
|
next: AtomicPtr<Node<T>>,
|
|
|
|
value: Option<T>,
|
|
|
|
}
|
|
|
|
|
2014-01-06 17:23:37 -06:00
|
|
|
/// The multi-producer single-consumer structure. This is not cloneable, but it
|
|
|
|
/// may be safely shared so long as it is guaranteed that there is only one
|
|
|
|
/// popper at a time (many pushers are allowed).
|
|
|
|
pub struct Queue<T> {
|
|
|
|
priv head: AtomicPtr<Node<T>>,
|
|
|
|
priv tail: *mut Node<T>,
|
2013-12-05 19:56:17 -06:00
|
|
|
}
|
2013-10-03 21:23:47 -05:00
|
|
|
|
2013-12-05 19:56:17 -06:00
|
|
|
impl<T> Node<T> {
|
|
|
|
unsafe fn new(v: Option<T>) -> *mut Node<T> {
|
|
|
|
cast::transmute(~Node {
|
|
|
|
next: AtomicPtr::new(0 as *mut Node<T>),
|
|
|
|
value: v,
|
|
|
|
})
|
2013-10-07 02:43:51 -05:00
|
|
|
}
|
2013-12-05 19:56:17 -06:00
|
|
|
}
|
2013-10-07 02:43:51 -05:00
|
|
|
|
2014-01-06 17:23:37 -06:00
|
|
|
impl<T: Send> Queue<T> {
|
|
|
|
/// Creates a new queue that is safe to share among multiple producers and
|
|
|
|
/// one consumer.
|
|
|
|
pub fn new() -> Queue<T> {
|
|
|
|
let stub = unsafe { Node::new(None) };
|
|
|
|
Queue {
|
2013-12-05 19:56:17 -06:00
|
|
|
head: AtomicPtr::new(stub),
|
|
|
|
tail: stub,
|
2013-10-03 21:23:47 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-01-06 17:23:37 -06:00
|
|
|
/// Pushes a new value onto this queue.
|
|
|
|
pub fn push(&mut self, t: T) {
|
2013-10-07 03:17:09 -05:00
|
|
|
unsafe {
|
2014-01-06 17:23:37 -06:00
|
|
|
let n = Node::new(Some(t));
|
|
|
|
let prev = self.head.swap(n, AcqRel);
|
|
|
|
(*prev).next.store(n, Release);
|
2013-10-07 03:17:09 -05:00
|
|
|
}
|
2013-10-07 02:43:51 -05:00
|
|
|
}
|
|
|
|
|
2013-12-12 19:27:37 -06:00
|
|
|
/// Pops some data from this queue.
|
|
|
|
///
|
|
|
|
/// Note that the current implementation means that this function cannot
|
|
|
|
/// return `Option<T>`. It is possible for this queue to be in an
|
2014-02-17 05:53:45 -06:00
|
|
|
/// inconsistent state where many pushes have succeeded and completely
|
2013-12-12 19:27:37 -06:00
|
|
|
/// finished, but pops cannot return `Some(t)`. This inconsistent state
|
|
|
|
/// happens when a pusher is pre-empted at an inopportune moment.
|
|
|
|
///
|
|
|
|
/// This inconsistent state means that this queue does indeed have data, but
|
|
|
|
/// it does not currently have access to it at this time.
|
2013-12-05 19:56:17 -06:00
|
|
|
pub fn pop(&mut self) -> PopResult<T> {
|
2014-01-06 17:23:37 -06:00
|
|
|
unsafe {
|
|
|
|
let tail = self.tail;
|
|
|
|
let next = (*tail).next.load(Acquire);
|
|
|
|
|
|
|
|
if !next.is_null() {
|
|
|
|
self.tail = next;
|
|
|
|
assert!((*tail).value.is_none());
|
|
|
|
assert!((*next).value.is_some());
|
|
|
|
let ret = (*next).value.take_unwrap();
|
|
|
|
let _: ~Node<T> = cast::transmute(tail);
|
|
|
|
return Data(ret);
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
|
|
|
|
}
|
2013-12-05 19:56:17 -06:00
|
|
|
}
|
2014-01-06 17:23:37 -06:00
|
|
|
|
2013-12-12 19:27:37 -06:00
|
|
|
/// Attempts to pop data from this queue, but doesn't attempt too hard. This
|
|
|
|
/// will canonicalize inconsistent states to a `None` value.
|
2013-12-05 19:56:17 -06:00
|
|
|
pub fn casual_pop(&mut self) -> Option<T> {
|
|
|
|
match self.pop() {
|
|
|
|
Data(t) => Some(t),
|
|
|
|
Empty | Inconsistent => None,
|
|
|
|
}
|
|
|
|
}
|
2014-01-06 17:23:37 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
#[unsafe_destructor]
|
|
|
|
impl<T: Send> Drop for Queue<T> {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
unsafe {
|
|
|
|
let mut cur = self.tail;
|
|
|
|
while !cur.is_null() {
|
|
|
|
let next = (*cur).next.load(Relaxed);
|
|
|
|
let _: ~Node<T> = cast::transmute(cur);
|
|
|
|
cur = next;
|
|
|
|
}
|
|
|
|
}
|
2013-10-07 02:43:51 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-10-03 21:23:47 -05:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use prelude::*;
|
2013-12-05 19:56:17 -06:00
|
|
|
|
2013-12-12 23:38:57 -06:00
|
|
|
use native;
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 20:31:48 -06:00
|
|
|
use super::{Queue, Data, Empty, Inconsistent};
|
|
|
|
use sync::arc::UnsafeArc;
|
2013-12-05 19:56:17 -06:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_full() {
|
2014-01-06 17:23:37 -06:00
|
|
|
let mut q = Queue::new();
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 20:31:48 -06:00
|
|
|
q.push(~1);
|
|
|
|
q.push(~2);
|
2013-12-05 19:56:17 -06:00
|
|
|
}
|
2013-10-03 21:23:47 -05:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test() {
|
|
|
|
let nthreads = 8u;
|
|
|
|
let nmsgs = 1000u;
|
2014-01-06 17:23:37 -06:00
|
|
|
let mut q = Queue::new();
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 20:31:48 -06:00
|
|
|
match q.pop() {
|
2013-12-05 19:56:17 -06:00
|
|
|
Empty => {}
|
|
|
|
Inconsistent | Data(..) => fail!()
|
|
|
|
}
|
2014-03-09 16:58:32 -05:00
|
|
|
let (tx, rx) = channel();
|
2014-01-06 17:23:37 -06:00
|
|
|
let q = UnsafeArc::new(q);
|
2013-10-03 21:23:47 -05:00
|
|
|
|
|
|
|
for _ in range(0, nthreads) {
|
2014-03-09 16:58:32 -05:00
|
|
|
let tx = tx.clone();
|
2014-01-06 17:23:37 -06:00
|
|
|
let q = q.clone();
|
2014-01-26 21:42:26 -06:00
|
|
|
native::task::spawn(proc() {
|
2013-10-03 21:23:47 -05:00
|
|
|
for i in range(0, nmsgs) {
|
2014-01-06 17:23:37 -06:00
|
|
|
unsafe { (*q.get()).push(i); }
|
2013-10-03 21:23:47 -05:00
|
|
|
}
|
2014-03-09 16:58:32 -05:00
|
|
|
tx.send(());
|
2014-01-26 21:42:26 -06:00
|
|
|
});
|
2013-10-03 21:23:47 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
let mut i = 0u;
|
2013-12-05 19:56:17 -06:00
|
|
|
while i < nthreads * nmsgs {
|
2014-01-06 17:23:37 -06:00
|
|
|
match unsafe { (*q.get()).pop() } {
|
2013-12-05 19:56:17 -06:00
|
|
|
Empty | Inconsistent => {},
|
|
|
|
Data(_) => { i += 1 }
|
2013-10-03 21:23:47 -05:00
|
|
|
}
|
|
|
|
}
|
2014-03-09 16:58:32 -05:00
|
|
|
drop(tx);
|
2013-12-12 23:38:57 -06:00
|
|
|
for _ in range(0, nthreads) {
|
2014-03-09 16:58:32 -05:00
|
|
|
rx.recv();
|
2013-12-12 23:38:57 -06:00
|
|
|
}
|
2013-10-03 21:23:47 -05:00
|
|
|
}
|
|
|
|
}
|