// Copyright 2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. //! A concurrent queue used to signal remote event loops //! //! This queue implementation is used to send tasks among event loops. This is //! backed by a multi-producer/single-consumer queue from libstd and uv_async_t //! handles (to wake up a remote event loop). //! //! The uv_async_t is stored next to the event loop, so in order to not keep the //! event loop alive we use uv_ref and uv_unref in order to control when the //! async handle is active or not. #[allow(dead_code)]; use std::cast; use std::libc::{c_void, c_int}; use std::rt::task::BlockedTask; use std::unstable::sync::LittleLock; use mpsc = std::sync::mpsc_queue; use async::AsyncWatcher; use super::{Loop, UvHandle}; use uvll; enum Message { Task(BlockedTask), Increment, Decrement, } struct State { handle: *uvll::uv_async_t, lock: LittleLock, // see comments in async_cb for why this is needed } /// This structure is intended to be stored next to the event loop, and it is /// used to create new `Queue` structures. pub struct QueuePool { priv producer: mpsc::Producer, priv consumer: mpsc::Consumer, priv refcnt: uint, } /// This type is used to send messages back to the original event loop. pub struct Queue { priv queue: mpsc::Producer, } extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { assert_eq!(status, 0); let state: &mut QueuePool = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; let packet = unsafe { state.consumer.packet() }; // Remember that there is no guarantee about how many times an async // callback is called with relation to the number of sends, so process the // entire queue in a loop. loop { match state.consumer.pop() { mpsc::Data(Task(task)) => { let _ = task.wake().map(|t| t.reawaken(true)); } mpsc::Data(Increment) => unsafe { if state.refcnt == 0 { uvll::uv_ref((*packet).handle); } state.refcnt += 1; }, mpsc::Data(Decrement) => unsafe { state.refcnt -= 1; if state.refcnt == 0 { uvll::uv_unref((*packet).handle); } }, mpsc::Empty | mpsc::Inconsistent => break }; } // If the refcount is now zero after processing the queue, then there is no // longer a reference on the async handle and it is possible that this event // loop can exit. What we're not guaranteed, however, is that a producer in // the middle of dropping itself is yet done with the handle. It could be // possible that we saw their Decrement message but they have yet to signal // on the async handle. If we were to return immediately, the entire uv loop // could be destroyed meaning the call to uv_async_send would abort() // // In order to fix this, an OS mutex is used to wait for the other end to // finish before we continue. The drop block on a handle will acquire a // mutex and then drop it after both the push and send have been completed. // If we acquire the mutex here, then we are guaranteed that there are no // longer any senders which are holding on to their handles, so we can // safely allow the event loop to exit. if state.refcnt == 0 { unsafe { let _l = (*packet).lock.lock(); } } } impl QueuePool { pub fn new(loop_: &mut Loop) -> ~QueuePool { let handle = UvHandle::alloc(None::, uvll::UV_ASYNC); let (c, p) = mpsc::queue(State { handle: handle, lock: LittleLock::new(), }); let q = ~QueuePool { producer: p, consumer: c, refcnt: 0, }; unsafe { assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0); uvll::uv_unref(handle); let data: *c_void = *cast::transmute::<&~QueuePool, &*c_void>(&q); uvll::set_data_for_uv_handle(handle, data); } return q; } pub fn queue(&mut self) -> Queue { unsafe { if self.refcnt == 0 { uvll::uv_ref((*self.producer.packet()).handle); } self.refcnt += 1; } Queue { queue: self.producer.clone() } } pub fn handle(&self) -> *uvll::uv_async_t { unsafe { (*self.producer.packet()).handle } } } impl Queue { pub fn push(&mut self, task: BlockedTask) { self.queue.push(Task(task)); unsafe { uvll::uv_async_send((*self.queue.packet()).handle); } } } impl Clone for Queue { fn clone(&self) -> Queue { // Push a request to increment on the queue, but there's no need to // signal the event loop to process it at this time. We're guaranteed // that the count is at least one (because we have a queue right here), // and if the queue is dropped later on it'll see the increment for the // decrement anyway. unsafe { cast::transmute_mut(self).queue.push(Increment); } Queue { queue: self.queue.clone() } } } impl Drop for Queue { fn drop(&mut self) { // See the comments in the async_cb function for why there is a lock // that is acquired only on a drop. unsafe { let state = self.queue.packet(); let _l = (*state).lock.lock(); self.queue.push(Decrement); uvll::uv_async_send((*state).handle); } } } impl Drop for State { fn drop(&mut self) { unsafe { uvll::uv_close(self.handle, cast::transmute(0)); // Note that this does *not* free the handle, that is the // responsibility of the caller because the uv loop must be closed // before we deallocate this uv handle. } } }