rust/src/librustuv/queue.rs
Alex Crichton f25c81a51a rustuv: Fix a use-after-free on destruction
The uv loop was being destroyed before the async handle was being destroyed, so
closing the async handle was causing a use-after-free in the uv loop. This was
fixed by moving destruction of the queue's async handle to an earlier location
and then actually freeing it once the loop has been dropped.
2013-12-24 19:59:54 -08:00

193 lines
6.3 KiB
Rust

// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A concurrent queue used to signal remote event loops
//!
//! This queue implementation is used to send tasks among event loops. This is
//! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
//! handles (to wake up a remote event loop).
//!
//! The uv_async_t is stored next to the event loop, so in order to not keep the
//! event loop alive we use uv_ref and uv_unref in order to control when the
//! async handle is active or not.
#[allow(dead_code)];
use std::cast;
use std::libc::{c_void, c_int};
use std::rt::task::BlockedTask;
use std::unstable::sync::LittleLock;
use mpsc = std::sync::mpsc_queue;
use async::AsyncWatcher;
use super::{Loop, UvHandle};
use uvll;
enum Message {
Task(BlockedTask),
Increment,
Decrement,
}
struct State {
handle: *uvll::uv_async_t,
lock: LittleLock, // see comments in async_cb for why this is needed
}
/// This structure is intended to be stored next to the event loop, and it is
/// used to create new `Queue` structures.
pub struct QueuePool {
priv producer: mpsc::Producer<Message, State>,
priv consumer: mpsc::Consumer<Message, State>,
priv refcnt: uint,
}
/// This type is used to send messages back to the original event loop.
pub struct Queue {
priv queue: mpsc::Producer<Message, State>,
}
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
assert_eq!(status, 0);
let state: &mut QueuePool = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
let packet = unsafe { state.consumer.packet() };
// Remember that there is no guarantee about how many times an async
// callback is called with relation to the number of sends, so process the
// entire queue in a loop.
loop {
match state.consumer.pop() {
mpsc::Data(Task(task)) => {
task.wake().map(|t| t.reawaken(true));
}
mpsc::Data(Increment) => unsafe {
if state.refcnt == 0 {
uvll::uv_ref((*packet).handle);
}
state.refcnt += 1;
},
mpsc::Data(Decrement) => unsafe {
state.refcnt -= 1;
if state.refcnt == 0 {
uvll::uv_unref((*packet).handle);
}
},
mpsc::Empty | mpsc::Inconsistent => break
};
}
// If the refcount is now zero after processing the queue, then there is no
// longer a reference on the async handle and it is possible that this event
// loop can exit. What we're not guaranteed, however, is that a producer in
// the middle of dropping itself is yet done with the handle. It could be
// possible that we saw their Decrement message but they have yet to signal
// on the async handle. If we were to return immediately, the entire uv loop
// could be destroyed meaning the call to uv_async_send would abort()
//
// In order to fix this, an OS mutex is used to wait for the other end to
// finish before we continue. The drop block on a handle will acquire a
// mutex and then drop it after both the push and send have been completed.
// If we acquire the mutex here, then we are guaranteed that there are no
// longer any senders which are holding on to their handles, so we can
// safely allow the event loop to exit.
if state.refcnt == 0 {
unsafe {
let _l = (*packet).lock.lock();
}
}
}
impl QueuePool {
pub fn new(loop_: &mut Loop) -> ~QueuePool {
let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
let (c, p) = mpsc::queue(State {
handle: handle,
lock: LittleLock::new(),
});
let q = ~QueuePool {
producer: p,
consumer: c,
refcnt: 0,
};
unsafe {
assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
uvll::uv_unref(handle);
let data: *c_void = *cast::transmute::<&~QueuePool, &*c_void>(&q);
uvll::set_data_for_uv_handle(handle, data);
}
return q;
}
pub fn queue(&mut self) -> Queue {
unsafe {
if self.refcnt == 0 {
uvll::uv_ref((*self.producer.packet()).handle);
}
self.refcnt += 1;
}
Queue { queue: self.producer.clone() }
}
pub fn handle(&self) -> *uvll::uv_async_t {
unsafe { (*self.producer.packet()).handle }
}
}
impl Queue {
pub fn push(&mut self, task: BlockedTask) {
self.queue.push(Task(task));
unsafe {
uvll::uv_async_send((*self.queue.packet()).handle);
}
}
}
impl Clone for Queue {
fn clone(&self) -> Queue {
// Push a request to increment on the queue, but there's no need to
// signal the event loop to process it at this time. We're guaranteed
// that the count is at least one (because we have a queue right here),
// and if the queue is dropped later on it'll see the increment for the
// decrement anyway.
unsafe {
cast::transmute_mut(self).queue.push(Increment);
}
Queue { queue: self.queue.clone() }
}
}
impl Drop for Queue {
fn drop(&mut self) {
// See the comments in the async_cb function for why there is a lock
// that is acquired only on a drop.
unsafe {
let state = self.queue.packet();
let _l = (*state).lock.lock();
self.queue.push(Decrement);
uvll::uv_async_send((*state).handle);
}
}
}
impl Drop for State {
fn drop(&mut self) {
unsafe {
uvll::uv_close(self.handle, cast::transmute(0));
// Note that this does *not* free the handle, that is the
// responsibility of the caller because the uv loop must be closed
// before we deallocate this uv handle.
}
}
}