/* Multi-producer/single-consumer queue * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * The views and conclusions contained in the software and documentation are * those of the authors and should not be interpreted as representing official * policies, either expressed or implied, of Dmitry Vyukov. */ //! A mostly lock-free multi-producer, single consumer queue. use unstable::sync::UnsafeArc; use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; use ptr::{mut_null, to_mut_unsafe_ptr}; use cast; use option::*; use clone::Clone; use default::Default; use kinds::Send; use fmt; struct Node { next: AtomicPtr>, value: Option, } impl Node { fn new(value: T) -> Node { Node{next: AtomicPtr::new(mut_null()), value: Some(value)} } } impl Default for Node { fn default() -> Node { Node{next: AtomicPtr::new(mut_null()), value: None} } } struct State { pad0: [u8, ..64], head: AtomicPtr>, pad1: [u8, ..64], stub: Node, pad2: [u8, ..64], tail: *mut Node, pad3: [u8, ..64], } struct Queue { priv state: UnsafeArc>, } impl Clone for Queue { fn clone(&self) -> Queue { Queue { state: self.state.clone() } } } impl fmt::Default for Queue { fn fmt(value: &Queue, f: &mut fmt::Formatter) { write!(f.buf, "Queue({})", value.state.get()); } } impl Queue { pub fn new() -> Queue { let mut q = Queue{state: UnsafeArc::new(State { pad0: [0, ..64], head: AtomicPtr::new(mut_null()), pad1: [0, ..64], stub: Default::default(), pad2: [0, ..64], tail: mut_null(), pad3: [0, ..64], })}; let stub = q.get_stub_unsafe(); q.get_head().store(stub, Relaxed); q.set_tail(stub); q } pub fn push(&mut self, value: T) { unsafe { let node = cast::transmute(~Node::new(value)); self.push_node(node); } } fn push_node(&mut self, node: *mut Node) { unsafe { (*node).next.store(mut_null(), Release); let prev = self.get_head().swap(node, Relaxed); (*prev).next.store(node, Release); } } fn get_stub_unsafe(&mut self) -> *mut Node { unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) } } fn get_head(&mut self) -> &mut AtomicPtr> { unsafe { &mut (*self.state.get()).head } } fn get_tail(&mut self) -> *mut Node { unsafe { (*self.state.get()).tail } } fn set_tail(&mut self, tail: *mut Node) { unsafe { (*self.state.get()).tail = tail } } pub fn casual_pop(&mut self) -> Option { self.pop() } pub fn pop(&mut self) -> Option { unsafe { let mut tail = self.get_tail(); let mut next = (*tail).next.load(Acquire); let stub = self.get_stub_unsafe(); if tail == stub { if mut_null() == next { return None } self.set_tail(next); tail = next; next = (*next).next.load(Acquire); } if next != mut_null() { let tail: ~Node = cast::transmute(tail); self.set_tail(next); return tail.value } let head = self.get_head().load(Relaxed); if tail != head { return None } self.push_node(stub); next = (*tail).next.load(Acquire); if next != mut_null() { let tail: ~Node = cast::transmute(tail); self.set_tail(next); return tail.value } } None } } #[cfg(test)] mod tests { use prelude::*; use option::*; use task; use comm; use super::Queue; #[test] fn test() { let nthreads = 8u; let nmsgs = 1000u; let mut q = Queue::new(); assert_eq!(None, q.pop()); for _ in range(0, nthreads) { let (port, chan) = comm::stream(); chan.send(q.clone()); do task::spawn_sched(task::SingleThreaded) { let mut q = port.recv(); for i in range(0, nmsgs) { q.push(i); } } } let mut i = 0u; loop { match q.pop() { None => {}, Some(_) => { i += 1; if i == nthreads*nmsgs { break } } } } } }