cleaned up uv/net

This commit is contained in:
Eric Reed 2013-06-26 10:17:10 -07:00
parent 87ecfb7435
commit ce97bd4c8b

View File

@ -44,8 +44,8 @@ pub fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
pub fn uv_ip4_to_ip4(addr: *sockaddr_in) -> IpAddr {
let ip4_size = 16;
let buf = vec::from_elem(ip4_size + 1 /*null terminated*/, 0u8);
unsafe { ip4_name(addr, vec::raw::to_ptr(buf), ip4_size as u64) };
let port = unsafe { ip4_port(addr) };
unsafe { uvll::ip4_name(addr, vec::raw::to_ptr(buf), ip4_size as u64) };
let port = unsafe { uvll::ip4_port(addr) };
let ip_str = str::from_bytes_slice(buf).trim_right_chars(&'\x00');
let ip: ~[u8] = ip_str.split_iter('.')
.transform(|s: &str| -> u8 {
@ -71,13 +71,11 @@ impl StreamWatcher {
data.read_cb = Some(cb);
}
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = stream_watcher.get_watcher_data();
let alloc_cb = data.alloc_cb.get_ref();
let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
@ -85,8 +83,7 @@ impl StreamWatcher {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = stream_watcher.get_watcher_data();
let cb = data.read_cb.get_ref();
let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
@ -108,22 +105,15 @@ impl StreamWatcher {
}
let req = WriteRequest::new();
let bufs = [buf];
unsafe {
assert!(0 == uvll::write(req.native_handle(),
self.native_handle(),
bufs, write_cb));
assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
}
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = {
let data = stream_watcher.get_watcher_data();
let cb = data.write_cb.swap_unwrap();
cb
};
let cb = stream_watcher.get_watcher_data().write_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
@ -132,9 +122,7 @@ impl StreamWatcher {
pub fn accept(&mut self, stream: StreamWatcher) {
let self_handle = self.native_handle() as *c_void;
let stream_handle = stream.native_handle() as *c_void;
unsafe {
assert_eq!(0, uvll::accept(self_handle, stream_handle));
}
assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
}
pub fn close(self, cb: NullCallback) {
@ -149,10 +137,7 @@ impl StreamWatcher {
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
{
let data = stream_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
stream_watcher.get_watcher_data().close_cb.swap_unwrap()();
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
@ -160,8 +145,7 @@ impl StreamWatcher {
}
impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
fn from_native_handle(
handle: *uvll::uv_stream_t) -> StreamWatcher {
fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
StreamWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_stream_t {
@ -188,9 +172,7 @@ impl TcpWatcher {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::tcp_bind(self.native_handle(), addr)
};
let result = unsafe { uvll::tcp_bind(self.native_handle(), addr) };
if result == 0 {
Ok(())
} else {
@ -212,9 +194,9 @@ impl TcpWatcher {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
rtdebug!("connect_t: %x", connect_handle as uint);
assert!(0 == uvll::tcp_connect(connect_handle,
self.native_handle(),
addr, connect_cb));
assert_eq!(0,
uvll::tcp_connect(connect_handle, self.native_handle(),
addr, connect_cb));
}
}
_ => fail!()
@ -225,10 +207,7 @@ impl TcpWatcher {
let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = stream_watcher.get_watcher_data();
data.connect_cb.swap_unwrap()
};
let cb = stream_watcher.get_watcher_data().connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
@ -245,15 +224,13 @@ impl TcpWatcher {
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
// XXX: This can probably fail
assert!(0 == uvll::listen(self.native_handle(),
BACKLOG, connection_cb));
assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
}
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let data = stream_watcher.get_watcher_data();
let cb = data.connect_cb.get_ref();
let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(stream_watcher, status);
}
@ -314,8 +291,7 @@ impl UdpWatcher {
data.udp_recv_cb = Some(cb);
}
let handle = self.native_handle();
unsafe { uvll::udp_recv_start(handle, alloc_cb, recv_cb); }
unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
@ -331,17 +307,14 @@ impl UdpWatcher {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let data = udp_watcher.get_watcher_data();
let cb = data.udp_recv_cb.get_ref();
let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
let status = status_to_maybe_uv_error(handle, nread as c_int);
let address = uv_ip4_to_ip4(addr);
(*cb)(udp_watcher, nread as int, buf, address, flags as uint, status);
(*cb)(udp_watcher, nread as int, buf, uv_ip4_to_ip4(addr), flags as uint, status);
}
}
pub fn recv_stop(&self) {
let handle = self.native_handle();
unsafe { uvll::udp_recv_stop(handle); }
unsafe { uvll::udp_recv_stop(self.native_handle()); }
}
pub fn send(&self, buf: Buf, address: IpAddr, cb: UdpSendCallback) {
@ -357,7 +330,7 @@ impl UdpWatcher {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
unsafe {
assert!(0 == uvll::udp_send(req.native_handle(),
assert_eq!(0, uvll::udp_send(req.native_handle(),
self.native_handle(),
[buf], addr, send_cb));
}
@ -411,12 +384,9 @@ impl Request for ConnectRequest { }
impl ConnectRequest {
fn new() -> ConnectRequest {
let connect_handle = unsafe {
malloc_req(UV_CONNECT)
};
let connect_handle = unsafe { malloc_req(UV_CONNECT) };
assert!(connect_handle.is_not_null());
let connect_handle = connect_handle as *uvll::uv_connect_t;
ConnectRequest(connect_handle)
ConnectRequest(connect_handle as *uvll::uv_connect_t)
}
fn stream(&self) -> StreamWatcher {
@ -432,8 +402,7 @@ impl ConnectRequest {
}
impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
fn from_native_handle(
handle: *uvll:: uv_connect_t) -> ConnectRequest {
fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
ConnectRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_connect_t {
@ -447,12 +416,9 @@ impl Request for WriteRequest { }
impl WriteRequest {
pub fn new() -> WriteRequest {
let write_handle = unsafe {
malloc_req(UV_WRITE)
};
let write_handle = unsafe { malloc_req(UV_WRITE) };
assert!(write_handle.is_not_null());
let write_handle = write_handle as *uvll::uv_write_t;
WriteRequest(write_handle)
WriteRequest(write_handle as *uvll::uv_write_t)
}
pub fn stream(&self) -> StreamWatcher {
@ -483,16 +449,14 @@ impl UdpSendRequest {
pub fn new() -> UdpSendRequest {
let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
assert!(send_handle.is_not_null());
let send_handle = send_handle as *uvll::uv_udp_send_t;
UdpSendRequest(send_handle)
UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
}
pub fn handle(&self) -> UdpWatcher {
unsafe {
NativeHandle::from_native_handle(
uvll::get_udp_handle_from_send_req(
self.native_handle()))
}
let send_request_handle = unsafe {
uvll::get_udp_handle_from_send_req(self.native_handle())
};
NativeHandle::from_native_handle(send_request_handle)
}
pub fn delete(self) {