core::rt: Convert some uv functions to extension methods

This commit is contained in:
Brian Anderson 2013-04-27 17:47:33 -07:00
parent cfd183db15
commit 6ab02c03da
3 changed files with 131 additions and 152 deletions

View File

@ -77,9 +77,7 @@ pub trait Request { }
/// handle. Watchers are generally created, then `start`ed, `stop`ed
/// and `close`ed, but due to their complex life cycle may not be
/// entirely memory safe if used in unanticipated patterns.
pub trait Watcher {
fn event_loop(&self) -> Loop;
}
pub trait Watcher { }
pub type NullCallback = ~fn();
impl Callback for NullCallback { }
@ -123,12 +121,7 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop {
}
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
impl Watcher for IdleWatcher { }
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
impl Callback for IdleCallback { }
@ -146,14 +139,14 @@ pub impl IdleWatcher {
fn start(&mut self, cb: IdleCallback) {
set_watcher_callback(self, cb);
self.set_callback(cb);
unsafe {
assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher);
let cb: &IdleCallback = idle_watcher.borrow_callback();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
@ -167,9 +160,11 @@ pub impl IdleWatcher {
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
let mut idle_watcher = NativeHandle::from_native_handle(handle);
drop_watcher_callback::<uvll::uv_idle_t, IdleWatcher, IdleCallback>(&mut idle_watcher);
unsafe { uvll::idle_delete(handle) };
unsafe {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
idle_watcher.drop_callback::<IdleCallback>();
uvll::idle_delete(handle);
}
}
}
}
@ -224,7 +219,7 @@ fn error_smoke_test() {
pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
unsafe {
let loop_ = loop_from_watcher(watcher);
let loop_ = watcher.event_loop();
UvError(uvll::last_error(loop_.native_handle()))
}
}
@ -288,73 +283,6 @@ pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError>
}
}
/// Get the uv event loop from a Watcher
pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
watcher: &W) -> Loop {
let handle = watcher.native_handle();
let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
NativeHandle::from_native_handle(loop_)
}
/// Set the custom data on a handle to a callback Note: This is only
/// suitable for watchers that make just one type of callback. For
/// others use WatcherData
pub fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W, cb: CB) {
drop_watcher_callback::<H, W, CB>(watcher);
// XXX: Boxing the callback so it fits into a
// pointer. Unfortunate extra allocation
let boxed_cb = ~cb;
let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
}
/// Delete a callback from a handle's custom data
pub fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
if handle_data.is_not_null() {
// Take ownership of the callback and drop it
let _cb = transmute::<*c_void, ~CB>(handle_data);
// Make sure the pointer is zeroed
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
}
/// Take a pointer to the callback installed as custom data
pub fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
CB: Callback>(watcher: &W) -> &CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
assert!(handle_data.is_not_null());
let cb = transmute::<&*c_void, &~CB>(&handle_data);
return &**cb;
}
}
/// Take ownership of the callback installed as custom data
pub fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) -> CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
assert!(handle_data.is_not_null());
uvll::set_data_for_uv_handle(handle, null::<()>());
let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
let cb = match cb { ~cb => cb };
return cb;
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
@ -364,35 +292,94 @@ struct WatcherData {
alloc_cb: Option<AllocCallback>,
}
pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None,
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), data);
}
pub trait WatcherInterop {
fn event_loop(&self) -> Loop;
fn set_callback<CB: Callback>(&mut self, cb: CB);
fn drop_callback<CB: Callback>(&mut self);
fn borrow_callback<CB: Callback>(&self) -> &CB;
fn install_watcher_data(&mut self);
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
fn drop_watcher_data(&mut self);
}
pub fn get_watcher_data<'r, H, W: Watcher + NativeHandle<*H>>(
watcher: &'r mut W) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
/// Get the uv event loop from a Watcher
pub fn event_loop(&self) -> Loop {
unsafe {
let handle = self.native_handle();
let loop_ = uvll::get_loop_for_uv_handle(handle);
NativeHandle::from_native_handle(loop_)
}
}
}
pub fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
/// Set the custom data on a handle to a callback Note: This is only
/// suitable for watchers that make just one type of callback. For
/// others use WatcherData
pub fn set_callback<CB: Callback>(&mut self, cb: CB) {
unsafe {
self.drop_callback::<CB>();
// XXX: Boxing the callback so it fits into a
// pointer. Unfortunate extra allocation
let boxed_cb = ~cb;
let data = transmute::<~CB, *c_void>(boxed_cb);
uvll::set_data_for_uv_handle(self.native_handle(), data);
}
}
/// Delete a callback from a handle's custom data
pub fn drop_callback<CB: Callback>(&mut self) {
unsafe {
let handle = self.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
if handle_data.is_not_null() {
// Take ownership of the callback and drop it
let _cb = transmute::<*c_void, ~CB>(handle_data);
// Make sure the pointer is zeroed
uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
}
}
}
/// Take a pointer to the callback installed as custom data
pub fn borrow_callback<CB: Callback>(&self) -> &CB {
unsafe {
let handle = self.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
assert!(handle_data.is_not_null());
let cb = transmute::<&*c_void, &~CB>(&handle_data);
return &**cb;
}
}
pub fn install_watcher_data(&mut self) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None,
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);
}
}
pub fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
pub fn drop_watcher_data(&mut self) {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
}
}
}

View File

@ -10,14 +10,11 @@
use prelude::*;
use libc::{size_t, ssize_t, c_int, c_void};
use cast::transmute_mut_region;
use util::ignore;
use rt::uv::uvll;
use rt::uv::uvll::*;
use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback,
loop_from_watcher, status_to_maybe_uv_error,
install_watcher_data, get_watcher_data, drop_watcher_data,
vec_to_uv_buf, vec_from_uv_buf, slice_to_uv_buf};
status_to_maybe_uv_error, vec_to_uv_buf, vec_from_uv_buf, slice_to_uv_buf};
use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::last_uv_error;
@ -49,12 +46,7 @@ fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
impl Watcher for StreamWatcher { }
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
impl Callback for ReadCallback { }
@ -66,17 +58,18 @@ impl Callback for AllocCallback { }
pub impl StreamWatcher {
fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
// XXX: Borrowchk problems
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
{
let data = self.get_watcher_data();
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
}
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
let alloc_cb = data.alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
@ -85,7 +78,7 @@ pub impl StreamWatcher {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
let cb = data.read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
@ -101,17 +94,18 @@ pub impl StreamWatcher {
}
fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
assert!(data.write_cb.is_none());
data.write_cb = Some(cb);
{
let data = self.get_watcher_data();
assert!(data.write_cb.is_none());
data.write_cb = Some(cb);
}
let req = WriteRequest::new();
let bufs = [buf];
unsafe {
assert!(0 == uvll::write(req.native_handle(),
self.native_handle(),
bufs, write_cb));
self.native_handle(),
bufs, write_cb));
}
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
@ -119,7 +113,7 @@ pub impl StreamWatcher {
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = {
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
let cb = data.write_cb.swap_unwrap();
cb
};
@ -139,7 +133,7 @@ pub impl StreamWatcher {
fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = get_watcher_data(&mut this);
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
@ -149,9 +143,10 @@ pub impl StreamWatcher {
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
{
get_watcher_data(&mut stream_watcher).close_cb.swap_unwrap()();
let mut data = stream_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
drop_watcher_data(&mut stream_watcher);
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
}
@ -168,12 +163,7 @@ impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
}
pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
impl Watcher for TcpWatcher { }
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
impl Callback for ConnectionCallback { }
@ -184,8 +174,8 @@ pub impl TcpWatcher {
let handle = malloc_handle(UV_TCP);
assert!(handle.is_not_null());
assert!(0 == uvll::tcp_init(loop_.native_handle(), handle));
let mut watcher = NativeHandle::from_native_handle(handle);
install_watcher_data(&mut watcher);
let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
@ -210,8 +200,8 @@ pub impl TcpWatcher {
fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
unsafe {
assert!(get_watcher_data(self).connect_cb.is_none());
get_watcher_data(self).connect_cb = Some(cb);
assert!(self.get_watcher_data().connect_cb.is_none());
self.get_watcher_data().connect_cb = Some(cb);
let connect_handle = ConnectRequest::new().native_handle();
match address {
@ -232,7 +222,7 @@ pub impl TcpWatcher {
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
data.connect_cb.swap_unwrap()
};
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
@ -242,10 +232,11 @@ pub impl TcpWatcher {
}
fn listen(&mut self, cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
assert!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
{
let data = self.get_watcher_data();
assert!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
}
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
@ -257,7 +248,7 @@ pub impl TcpWatcher {
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap();
let cb = stream_watcher.get_watcher_data().connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}

View File

@ -192,9 +192,10 @@ impl RtioTcpListener for UvTcpListener {
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_from_watcher(&server_stream_watcher);
let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream();
// XXX: Needs to be surfaced in interface
let mut loop_ = server_stream_watcher.event_loop();
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Need's to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Ok(~UvTcpStream { watcher: client_tcp_watcher })
} else {