diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 947fade096b..2425c909bf3 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -148,7 +148,7 @@ mod test { } #[test] - fn smoke_test() { + fn smoke_test_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); @@ -168,7 +168,27 @@ mod test { } #[test] - fn read_eof() { + fn smoke_test_ip6() { + do run_in_newsched_task { + let addr = next_test_ip6(); + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + let mut stream = listener.accept(); + let mut buf = [0]; + stream.read(buf); + assert!(buf[0] == 99); + } + + do spawntask_immediately { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + } + } + } + + #[test] + fn read_eof_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); @@ -188,7 +208,27 @@ mod test { } #[test] - fn read_eof_twice() { + fn read_eof_ip6() { + do run_in_newsched_task { + let addr = next_test_ip6(); + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + let mut stream = listener.accept(); + let mut buf = [0]; + let nread = stream.read(buf); + assert!(nread.is_none()); + } + + do spawntask_immediately { + let _stream = TcpStream::connect(addr); + // Close + } + } + } + + #[test] + fn read_eof_twice_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); @@ -210,7 +250,29 @@ mod test { } #[test] - fn write_close() { + fn read_eof_twice_ip6() { + do run_in_newsched_task { + let addr = next_test_ip6(); + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + let mut stream = listener.accept(); + let mut buf = [0]; + let nread = stream.read(buf); + assert!(nread.is_none()); + let nread = stream.read(buf); + assert!(nread.is_none()); + } + + do spawntask_immediately { + let _stream = TcpStream::connect(addr); + // Close + } + } + } + + #[test] + fn write_close_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); @@ -239,7 +301,36 @@ mod test { } #[test] - fn multiple_connect_serial() { + fn write_close_ip6() { + do run_in_newsched_task { + let addr = next_test_ip6(); + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + let mut stream = listener.accept(); + let buf = [0]; + loop { + let mut stop = false; + do io_error::cond.trap(|e| { + // NB: ECONNRESET on linux, EPIPE on mac + assert!(e.kind == ConnectionReset || e.kind == BrokenPipe); + stop = true; + }).in { + stream.write(buf); + } + if stop { break } + } + } + + do spawntask_immediately { + let _stream = TcpStream::connect(addr); + // Close + } + } + } + + #[test] + fn multiple_connect_serial_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); let max = 10; @@ -264,7 +355,32 @@ mod test { } #[test] - fn multiple_connect_interleaved_greedy_schedule() { + fn multiple_connect_serial_ip6() { + do run_in_newsched_task { + let addr = next_test_ip6(); + let max = 10; + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + for max.times { + let mut stream = listener.accept(); + let mut buf = [0]; + stream.read(buf); + assert_eq!(buf[0], 99); + } + } + + do spawntask_immediately { + for max.times { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + } + } + } + } + + #[test] + fn multiple_connect_interleaved_greedy_schedule_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; @@ -303,7 +419,46 @@ mod test { } #[test] - fn multiple_connect_interleaved_lazy_schedule() { + fn multiple_connect_interleaved_greedy_schedule_ip6() { + do run_in_newsched_task { + let addr = next_test_ip6(); + static MAX: int = 10; + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + for int::range(0, MAX) |i| { + let stream = Cell::new(listener.accept()); + rtdebug!("accepted"); + // Start another task to handle the connection + do spawntask_immediately { + let mut stream = stream.take(); + let mut buf = [0]; + stream.read(buf); + assert!(buf[0] == i as u8); + rtdebug!("read"); + } + } + } + + connect(0, addr); + + fn connect(i: int, addr: IpAddr) { + if i == MAX { return } + + do spawntask_immediately { + rtdebug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + rtdebug!("writing"); + stream.write([i as u8]); + } + } + } + } + + #[test] + fn multiple_connect_interleaved_lazy_schedule_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; @@ -340,5 +495,43 @@ mod test { } } } + #[test] + fn multiple_connect_interleaved_lazy_schedule_ip6() { + do run_in_newsched_task { + let addr = next_test_ip6(); + static MAX: int = 10; + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + for int::range(0, MAX) |_| { + let stream = Cell::new(listener.accept()); + rtdebug!("accepted"); + // Start another task to handle the connection + do spawntask_later { + let mut stream = stream.take(); + let mut buf = [0]; + stream.read(buf); + assert!(buf[0] == 99); + rtdebug!("read"); + } + } + } + + connect(0, addr); + + fn connect(i: int, addr: IpAddr) { + if i == MAX { return } + + do spawntask_later { + rtdebug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + rtdebug!("writing"); + stream.write([99]); + } + } + } + } } diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index c66f7d8ce06..f3b52783573 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -115,7 +115,7 @@ mod test { } #[test] - fn socket_smoke_test() { + fn socket_smoke_test_ip4() { do run_in_newsched_task { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); @@ -147,7 +147,39 @@ mod test { } #[test] - fn stream_smoke_test() { + fn socket_smoke_test_ip6() { + do run_in_newsched_task { + let server_ip = next_test_ip6(); + let client_ip = next_test_ip6(); + + do spawntask_immediately { + match UdpSocket::bind(server_ip) { + Some(server) => { + let mut buf = [0]; + match server.recvfrom(buf) { + Some((nread, src)) => { + assert_eq!(nread, 1); + assert_eq!(buf[0], 99); + assert_eq!(src, client_ip); + } + None => fail!() + } + } + None => fail!() + } + } + + do spawntask_immediately { + match UdpSocket::bind(client_ip) { + Some(client) => client.sendto([99], server_ip), + None => fail!() + } + } + } + } + + #[test] + fn stream_smoke_test_ip4() { do run_in_newsched_task { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); @@ -182,4 +214,41 @@ mod test { } } } + + #[test] + fn stream_smoke_test_ip6() { + do run_in_newsched_task { + let server_ip = next_test_ip6(); + let client_ip = next_test_ip6(); + + do spawntask_immediately { + match UdpSocket::bind(server_ip) { + Some(server) => { + let server = ~server; + let mut stream = server.connect(client_ip); + let mut buf = [0]; + match stream.read(buf) { + Some(nread) => { + assert_eq!(nread, 1); + assert_eq!(buf[0], 99); + } + None => fail!() + } + } + None => fail!() + } + } + + do spawntask_immediately { + match UdpSocket::bind(client_ip) { + Some(client) => { + let client = ~client; + let mut stream = client.connect(server_ip); + stream.write([99]); + } + None => fail!() + } + } + } + } } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index e38c952f744..bcbdea03234 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -23,6 +23,7 @@ pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; +pub type RtioTcpSocketObject = (); // TODO pub trait EventLoop { fn run(&mut self); @@ -48,16 +49,39 @@ pub trait IoFactory { fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError>; } -pub trait RtioTcpListener { +pub trait RtioTcpListener : RtioSocket { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; + fn accept_simultaneously(&self); + fn dont_accept_simultaneously(&self); } -pub trait RtioTcpStream { +pub trait RtioTcpStream : RtioSocket { fn read(&self, buf: &mut [u8]) -> Result; fn write(&self, buf: &[u8]) -> Result<(), IoError>; + fn peer_name(&self) -> IpAddr; + fn control_congestion(&self); + fn nodelay(&self); + fn keepalive(&self, delay_in_seconds: uint); + fn letdie(&self); } -pub trait RtioUdpSocket { +pub trait RtioSocket { + fn socket_name(&self) -> IpAddr; +} + +pub trait RtioUdpSocket : RtioSocket { fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError>; fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError>; + + fn join_multicast(&self, multi: IpAddr); + fn leave_multicast(&self, multi: IpAddr); + + fn loop_multicast_locally(&self); + fn dont_loop_multicast_locally(&self); + + fn multicast_time_to_live(&self, ttl: int); + fn time_to_live(&self, ttl: int); + + fn hear_broadcasts(&self); + fn ignore_broadcasts(&self); } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index b0e49684014..e1b338e2cad 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -17,7 +17,7 @@ use iterator::IteratorUtil; use vec::{OwnedVector, MutableVector}; use result::{Result, Ok, Err}; use unstable::run_in_bare_thread; -use super::io::net::ip::{IpAddr, Ipv4}; +use super::io::net::ip::{IpAddr, Ipv4, Ipv6}; use rt::comm::oneshot; use rt::task::Task; use rt::thread::Thread; @@ -405,11 +405,16 @@ pub fn next_test_port() -> u16 { } } -/// Get a unique localhost:port pair starting at 9600 +/// Get a unique IPv4 localhost:port pair starting at 9600 pub fn next_test_ip4() -> IpAddr { Ipv4(127, 0, 0, 1, next_test_port()) } +/// Get a unique IPv6 localhost:port pair starting at 9600 +pub fn next_test_ip6() -> IpAddr { + Ipv6(0, 0, 0, 0, 0, 0, 0, 1, next_test_port()) +} + /// Get a constant that represents the number of times to repeat stress tests. Default 1. pub fn stress_factor() -> uint { use os::getenv; diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index dc766b2d7f8..4c3cde7d6df 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -15,48 +15,144 @@ use rt::uv::uvll::*; use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback}; use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, status_to_maybe_uv_error}; -use rt::io::net::ip::{IpAddr, Ipv4}; +use rt::io::net::ip::{IpAddr, Ipv4, Ipv6}; use rt::uv::last_uv_error; use vec; use str; use from_str::{FromStr}; +use num; -pub fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T { - match addr { - Ipv4(a, b, c, d, p) => { - unsafe { - let addr = malloc_ip4_addr(fmt!("%u.%u.%u.%u", - a as uint, - b as uint, - c as uint, - d as uint), p as int); - do (|| { - f(addr) - }).finally { - free_ip4_addr(addr); - } - } +enum UvIpAddr { + UvIpv4(*sockaddr_in), + UvIpv6(*sockaddr_in6), +} + +fn sockaddr_to_UvIpAddr(addr: *uvll::sockaddr) -> UvIpAddr { + unsafe { + assert!((is_ip4_addr(addr) || is_ip6_addr(addr))); + assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr))); + match addr { + _ if is_ip4_addr(addr) => UvIpv4(as_sockaddr_in(addr)), + _ if is_ip6_addr(addr) => UvIpv6(as_sockaddr_in6(addr)), + _ => fail!(), } - _ => fail!() // TODO ipv6 } } -pub fn uv_ip4_to_ip4(addr: *sockaddr_in) -> IpAddr { - let ip4_size = 16; - let buf = vec::from_elem(ip4_size + 1 /*null terminated*/, 0u8); - unsafe { uvll::ip4_name(addr, vec::raw::to_ptr(buf), ip4_size as u64) }; - let port = unsafe { uvll::ip4_port(addr) }; - let ip_str = str::from_bytes_slice(buf).trim_right_chars(&'\x00'); - let ip: ~[u8] = ip_str.split_iter('.') - .transform(|s: &str| -> u8 { - let x = FromStr::from_str(s); - assert!(x.is_some()); - x.unwrap() }) - .collect(); - assert!(ip.len() >= 4); - Ipv4(ip[0], ip[1], ip[2], ip[3], port as u16) +fn ip_as_uv_ip(addr: IpAddr, f: &fn(UvIpAddr) -> T) -> T { + let malloc = match addr { + Ipv4(*) => malloc_ip4_addr, + Ipv6(*) => malloc_ip6_addr, + }; + let wrap = match addr { + Ipv4(*) => UvIpv4, + Ipv6(*) => UvIpv6, + }; + let ip_str = match addr { + Ipv4(x1, x2, x3, x4, _) => + fmt!("%u.%u.%u.%u", x1 as uint, x2 as uint, x3 as uint, x4 as uint), + Ipv6(x1, x2, x3, x4, x5, x6, x7, x8, _) => + fmt!("%x:%x:%x:%x:%x:%x:%x:%x", + x1 as uint, x2 as uint, x3 as uint, x4 as uint, + x5 as uint, x6 as uint, x7 as uint, x8 as uint), + }; + let port = match addr { + Ipv4(_, _, _, _, p) | Ipv6(_, _, _, _, _, _, _, _, p) => p as int + }; + let free = match addr { + Ipv4(*) => free_ip4_addr, + Ipv6(*) => free_ip6_addr, + }; + + let addr = unsafe { malloc(ip_str, port) }; + do (|| { + f(wrap(addr)) + }).finally { + unsafe { free(addr) }; + } } +fn uv_ip_as_ip(addr: UvIpAddr, f: &fn(IpAddr) -> T) -> T { + let ip_size = match addr { + UvIpv4(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/, + UvIpv6(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/, + }; + let ip_name = { + let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8); + unsafe { + match addr { + UvIpv4(addr) => uvll::ip4_name(addr, vec::raw::to_ptr(buf), ip_size as u64), + UvIpv6(addr) => uvll::ip6_name(addr, vec::raw::to_ptr(buf), ip_size as u64), + } + }; + buf + }; + let ip_port = unsafe { + let port = match addr { + UvIpv4(addr) => uvll::ip4_port(addr), + UvIpv6(addr) => uvll::ip6_port(addr), + }; + port as u16 + }; + let ip_str = str::from_bytes_slice(ip_name).trim_right_chars(&'\x00'); + let ip = match addr { + UvIpv4(*) => { + let ip: ~[u8] = + ip_str.split_iter('.') + .transform(|s: &str| -> u8 { FromStr::from_str(s).unwrap() }) + .collect(); + assert_eq!(ip.len(), 4); + Ipv4(ip[0], ip[1], ip[2], ip[3], ip_port) + }, + UvIpv6(*) => { + let ip: ~[u16] = { + let read_hex_segment = |s: &str| -> u16 { + num::FromStrRadix::from_str_radix(s, 16u).unwrap() + }; + let convert_each_segment = |s: &str| -> ~[u16] { + match s { + "" => ~[], + s => s.split_iter(':').transform(read_hex_segment).collect(), + } + }; + let expand_shorthand_and_convert = |s: &str| -> ~[~[u16]] { + s.split_str_iter("::").transform(convert_each_segment).collect() + }; + match expand_shorthand_and_convert(ip_str) { + [x] => x, // no shorthand found + [l, r] => l + vec::from_elem(8 - l.len() - r.len(), 0u16) + r, // fill the gap + _ => fail!(), // impossible. only one shorthand allowed. + } + }; + assert_eq!(ip.len(), 8); + Ipv6(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7], ip_port) + }, + }; + + // finally run the closure + f(ip) +} + +fn uv_ip_to_ip(addr: UvIpAddr) -> IpAddr { + use util; + uv_ip_as_ip(addr, util::id) +} + +#[cfg(test)] +#[test] +fn test_ip4_conversion() { + use rt; + let ip4 = rt::test::next_test_ip4(); + assert_eq!(ip4, ip_as_uv_ip(ip4, uv_ip_to_ip)); +} + +#[cfg(test)] +#[test] +fn test_ip6_conversion() { + use rt; + let ip6 = rt::test::next_test_ip6(); + assert_eq!(ip6, ip_as_uv_ip(ip6, uv_ip_to_ip)); +} // uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t // and uv_file_t @@ -169,18 +265,17 @@ impl TcpWatcher { } pub fn bind(&mut self, address: IpAddr) -> Result<(), UvError> { - match address { - Ipv4(*) => { - do ip4_as_uv_ip4(address) |addr| { - let result = unsafe { uvll::tcp_bind(self.native_handle(), addr) }; - if result == 0 { - Ok(()) - } else { - Err(last_uv_error(self)) - } + do ip_as_uv_ip(address) |addr| { + let result = unsafe { + match addr { + UvIpv4(addr) => uvll::tcp_bind(self.native_handle(), addr), + UvIpv6(addr) => uvll::tcp_bind6(self.native_handle(), addr), } + }; + match result { + 0 => Ok(()), + _ => Err(last_uv_error(self)), } - _ => fail!() } } @@ -190,16 +285,13 @@ impl TcpWatcher { self.get_watcher_data().connect_cb = Some(cb); let connect_handle = ConnectRequest::new().native_handle(); - match address { - Ipv4(*) => { - do ip4_as_uv_ip4(address) |addr| { - rtdebug!("connect_t: %x", connect_handle as uint); - assert_eq!(0, - uvll::tcp_connect(connect_handle, self.native_handle(), - addr, connect_cb)); - } - } - _ => fail!() + rtdebug!("connect_t: %x", connect_handle as uint); + do ip_as_uv_ip(address) |addr| { + let result = match addr { + UvIpv4(addr) => uvll::tcp_connect(connect_handle, self.native_handle(), addr, connect_cb), + UvIpv6(addr) => uvll::tcp_connect6(connect_handle, self.native_handle(), addr, connect_cb), + }; + assert_eq!(0, result); } extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { @@ -266,20 +358,17 @@ impl UdpWatcher { } pub fn bind(&self, address: IpAddr) -> Result<(), UvError> { - match address { - Ipv4(*) => { - do ip4_as_uv_ip4(address) |addr| { - let result = unsafe { - uvll::udp_bind(self.native_handle(), addr, 0u32) - }; - if result == 0 { - Ok(()) - } else { - Err(last_uv_error(self)) - } + do ip_as_uv_ip(address) |addr| { + let result = unsafe { + match addr { + UvIpv4(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32), + UvIpv6(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32), } + }; + match result { + 0 => Ok(()), + _ => Err(last_uv_error(self)), } - _ => fail!() // TODO ipv6 } } @@ -299,17 +388,15 @@ impl UdpWatcher { return (*alloc_cb)(suggested_size as uint); } - /* TODO the socket address should actually be a pointer to - either a sockaddr_in or sockaddr_in6. - In libuv, the udp_recv callback takes a struct *sockaddr */ extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf, - addr: *uvll::sockaddr_in, flags: c_uint) { + addr: *uvll::sockaddr, flags: c_uint) { rtdebug!("buf addr: %x", buf.base as uint); rtdebug!("buf len: %d", buf.len as int); let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref(); let status = status_to_maybe_uv_error(handle, nread as c_int); - (*cb)(udp_watcher, nread as int, buf, uv_ip4_to_ip4(addr), flags as uint, status); + let addr = uv_ip_to_ip(sockaddr_to_UvIpAddr(addr)); + (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status); } } @@ -326,17 +413,14 @@ impl UdpWatcher { } let req = UdpSendRequest::new(); - match address { - Ipv4(*) => { - do ip4_as_uv_ip4(address) |addr| { - unsafe { - assert_eq!(0, uvll::udp_send(req.native_handle(), - self.native_handle(), - [buf], addr, send_cb)); - } + do ip_as_uv_ip(address) |addr| { + let result = unsafe { + match addr { + UvIpv4(addr) => uvll::udp_send(req.native_handle(), self.native_handle(), [buf], addr, send_cb), + UvIpv6(addr) => uvll::udp_send6(req.native_handle(), self.native_handle(), [buf], addr, send_cb), } - } - _ => fail!() // TODO ipv6 + }; + assert_eq!(0, result); } extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { @@ -486,13 +570,7 @@ mod test { use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf}; #[test] - fn test_ip4_conversion() { - let ip4 = next_test_ip4(); - assert_eq!(ip4, ip4_as_uv_ip4(ip4, uv_ip4_to_ip4)); - } - - #[test] - fn connect_close() { + fn connect_close_ip4() { do run_in_bare_thread() { let mut loop_ = Loop::new(); let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; @@ -510,7 +588,25 @@ mod test { } #[test] - fn udp_bind_close() { + fn connect_close_ip6() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + // Connect to a port where nobody is listening + let addr = next_test_ip6(); + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("tcp_watcher.connect!"); + assert!(status.is_some()); + assert_eq!(status.get().name(), ~"ECONNREFUSED"); + stream_watcher.close(||()); + } + loop_.run(); + loop_.close(); + } + } + + #[test] + fn udp_bind_close_ip4() { do run_in_bare_thread() { let mut loop_ = Loop::new(); let udp_watcher = { UdpWatcher::new(&mut loop_) }; @@ -523,7 +619,20 @@ mod test { } #[test] - fn listen() { + fn udp_bind_close_ip6() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let udp_watcher = { UdpWatcher::new(&mut loop_) }; + let addr = next_test_ip6(); + udp_watcher.bind(addr); + udp_watcher.close(||()); + loop_.run(); + loop_.close(); + } + } + + #[test] + fn listen_ip4() { do run_in_bare_thread() { static MAX: int = 10; let mut loop_ = Loop::new(); @@ -532,10 +641,82 @@ mod test { server_tcp_watcher.bind(addr); let loop_ = loop_; rtdebug!("listening"); - do server_tcp_watcher.listen |server_stream_watcher, status| { + do server_tcp_watcher.listen |mut server_stream_watcher, status| { + rtdebug!("listened!"); + assert!(status.is_none()); + let mut loop_ = loop_; + let client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + server_stream_watcher.accept(client_tcp_watcher); + let count_cell = Cell::new(0); + let server_stream_watcher = server_stream_watcher; + rtdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0)) + }; + do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| { + + rtdebug!("i'm reading!"); + let buf = vec_from_uv_buf(buf); + let mut count = count_cell.take(); + if status.is_none() { + rtdebug!("got %d bytes", nread); + let buf = buf.unwrap(); + for buf.slice(0, nread as uint).each |byte| { + assert!(*byte == count as u8); + rtdebug!("%u", *byte as uint); + count += 1; + } + } else { + assert_eq!(count, MAX); + do stream_watcher.close { + server_stream_watcher.close(||()); + } + } + count_cell.put_back(count); + } + } + + let _client_thread = do Thread::start { + rtdebug!("starting client thread"); + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do tcp_watcher.connect(addr) |mut stream_watcher, status| { + rtdebug!("connecting"); + assert!(status.is_none()); + let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; + let buf = slice_to_uv_buf(msg); + let msg_cell = Cell::new(msg); + do stream_watcher.write(buf) |stream_watcher, status| { + rtdebug!("writing"); + assert!(status.is_none()); + let msg_cell = Cell::new(msg_cell.take()); + stream_watcher.close(||ignore(msg_cell.take())); + } + } + loop_.run(); + loop_.close(); + }; + + let mut loop_ = loop_; + loop_.run(); + loop_.close(); + } + } + + #[test] + fn listen_ip6() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = next_test_ip6(); + server_tcp_watcher.bind(addr); + let loop_ = loop_; + rtdebug!("listening"); + do server_tcp_watcher.listen |mut server_stream_watcher, status| { rtdebug!("listened!"); assert!(status.is_none()); - let mut server_stream_watcher = server_stream_watcher; let mut loop_ = loop_; let client_tcp_watcher = TcpWatcher::new(&mut loop_); let mut client_tcp_watcher = client_tcp_watcher.as_stream(); @@ -574,10 +755,9 @@ mod test { rtdebug!("starting client thread"); let mut loop_ = Loop::new(); let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - do tcp_watcher.connect(addr) |stream_watcher, status| { + do tcp_watcher.connect(addr) |mut stream_watcher, status| { rtdebug!("connecting"); assert!(status.is_none()); - let mut stream_watcher = stream_watcher; let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; let buf = slice_to_uv_buf(msg); let msg_cell = Cell::new(msg); @@ -599,7 +779,7 @@ mod test { } #[test] - fn udp_recv() { + fn udp_recv_ip4() { do run_in_bare_thread() { static MAX: int = 10; let mut loop_ = Loop::new(); @@ -656,4 +836,63 @@ mod test { loop_.close(); } } + + #[test] + fn udp_recv_ip6() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let server_addr = next_test_ip6(); + let client_addr = next_test_ip6(); + + let server = UdpWatcher::new(&loop_); + assert!(server.bind(server_addr).is_ok()); + + rtdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0)) + }; + + do server.recv_start(alloc) |server, nread, buf, src, flags, status| { + server.recv_stop(); + rtdebug!("i'm reading!"); + assert!(status.is_none()); + assert_eq!(flags, 0); + assert_eq!(src, client_addr); + + let buf = vec_from_uv_buf(buf); + let mut count = 0; + rtdebug!("got %d bytes", nread); + + let buf = buf.unwrap(); + for buf.slice(0, nread as uint).iter().advance() |&byte| { + assert!(byte == count as u8); + rtdebug!("%u", byte as uint); + count += 1; + } + assert_eq!(count, MAX); + + server.close(||{}); + } + + do Thread::start { + let mut loop_ = Loop::new(); + let client = UdpWatcher::new(&loop_); + assert!(client.bind(client_addr).is_ok()); + let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let buf = slice_to_uv_buf(msg); + do client.send(buf, server_addr) |client, status| { + rtdebug!("writing"); + assert!(status.is_none()); + client.close(||{}); + } + + loop_.run(); + loop_.close(); + }; + + loop_.run(); + loop_.close(); + } + } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 1ae6cd8b17b..e1ff8ba1e22 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -60,9 +60,8 @@ impl EventLoop for UvEventLoop { fn callback(&mut self, f: ~fn()) { let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop()); - do idle_watcher.start |idle_watcher, status| { + do idle_watcher.start |mut idle_watcher, status| { assert!(status.is_none()); - let mut idle_watcher = idle_watcher; idle_watcher.stop(); idle_watcher.close(||()); f(); @@ -218,7 +217,7 @@ impl IoFactory for UvIoFactory { rtdebug!("connect: in connect callback"); if status.is_none() { rtdebug!("status is none"); - let res = Ok(~UvTcpStream { watcher: stream_watcher }); + let res = Ok(~UvTcpStream(stream_watcher)); // Store the stream in the task's stack unsafe { (*result_cell_ptr).put_back(res); } @@ -313,6 +312,11 @@ impl Drop for UvTcpListener { } } +impl RtioSocket for UvTcpListener { + // TODO + fn socket_name(&self) -> IpAddr { fail!(); } +} + impl RtioTcpListener for UvTcpListener { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { @@ -329,15 +333,14 @@ impl RtioTcpListener for UvTcpListener { let incoming_streams_cell = Cell::new(incoming_streams_cell.take()); let mut server_tcp_watcher = server_tcp_watcher; - do server_tcp_watcher.listen |server_stream_watcher, status| { + do server_tcp_watcher.listen |mut server_stream_watcher, status| { let maybe_stream = if status.is_none() { - let mut server_stream_watcher = server_stream_watcher; let mut loop_ = server_stream_watcher.event_loop(); let client_tcp_watcher = TcpWatcher::new(&mut loop_); let client_tcp_watcher = client_tcp_watcher.as_stream(); // XXX: Need's to be surfaced in interface server_stream_watcher.accept(client_tcp_watcher); - Ok(~UvTcpStream { watcher: client_tcp_watcher }) + Ok(~UvTcpStream(client_tcp_watcher)) } else { Err(standard_error(OtherIoError)) }; @@ -349,25 +352,22 @@ impl RtioTcpListener for UvTcpListener { return self.incoming_streams.recv(); } + + // TODO + fn accept_simultaneously(&self) { fail!(); } + fn dont_accept_simultaneously(&self) { fail!(); } } // FIXME #6090: Prefer newtype structs but Drop doesn't work -pub struct UvTcpStream { - watcher: StreamWatcher -} - -impl UvTcpStream { - fn watcher(&self) -> StreamWatcher { self.watcher } -} +pub struct UvTcpStream(StreamWatcher); impl Drop for UvTcpStream { fn finalize(&self) { rtdebug!("closing tcp stream"); - let watcher = self.watcher(); let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); - do watcher.close { + do self.close { let scheduler = Local::take::(); scheduler.resume_task_immediately(task_cell.take()); } @@ -375,6 +375,11 @@ impl Drop for UvTcpStream { } } +impl RtioSocket for UvTcpStream { + // TODO + fn socket_name(&self) -> IpAddr { fail!(); } +} + impl RtioTcpStream for UvTcpStream { fn read(&self, buf: &mut [u8]) -> Result { let result_cell = Cell::new_empty(); @@ -382,25 +387,23 @@ impl RtioTcpStream for UvTcpStream { let scheduler = Local::take::(); assert!(scheduler.in_task_context()); - let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("read: entered scheduler context"); assert!(!sched.in_task_context()); - let mut watcher = watcher; let task_cell = Cell::new(task); // XXX: We shouldn't reallocate these callbacks every // call to read let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - do watcher.read_start(alloc) |watcher, nread, _buf, status| { + let mut watcher = **self; + do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { // Stop reading so that no read callbacks are // triggered before the user calls `read` again. // XXX: Is there a performance impact to calling // stop here? - let mut watcher = watcher; watcher.read_stop(); let result = if status.is_none() { @@ -426,12 +429,11 @@ impl RtioTcpStream for UvTcpStream { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); assert!(scheduler.in_task_context()); - let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { - let mut watcher = watcher; let task_cell = Cell::new(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + let mut watcher = **self; do watcher.write(buf) |_watcher, status| { let result = if status.is_none() { Ok(()) @@ -449,6 +451,13 @@ impl RtioTcpStream for UvTcpStream { assert!(!result_cell.is_empty()); return result_cell.take(); } + + // TODO + fn peer_name(&self) -> IpAddr { fail!(); } + fn control_congestion(&self) { fail!(); } + fn nodelay(&self) { fail!(); } + fn keepalive(&self, _delay_in_seconds: uint) { fail!(); } + fn letdie(&self) { fail!(); } } pub struct UvUdpSocket(UdpWatcher); @@ -467,6 +476,11 @@ impl Drop for UvUdpSocket { } } +impl RtioSocket for UvUdpSocket { + // TODO + fn socket_name(&self) -> IpAddr { fail!(); } +} + impl RtioUdpSocket for UvUdpSocket { fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> { let result_cell = Cell::new_empty(); @@ -530,6 +544,19 @@ impl RtioUdpSocket for UvUdpSocket { assert!(!result_cell.is_empty()); return result_cell.take(); } + + // TODO + fn join_multicast(&self, _multi: IpAddr) { fail!(); } + fn leave_multicast(&self, _multi: IpAddr) { fail!(); } + + fn loop_multicast_locally(&self) { fail!(); } + fn dont_loop_multicast_locally(&self) { fail!(); } + + fn multicast_time_to_live(&self, _ttl: int) { fail!(); } + fn time_to_live(&self, _ttl: int) { fail!(); } + + fn hear_broadcasts(&self) { fail!(); } + fn ignore_broadcasts(&self) { fail!(); } } #[test] diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 7035cb6a5e8..62bf8f27af9 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -74,8 +74,10 @@ pub type uv_alloc_cb = *u8; pub type uv_udp_send_cb = *u8; pub type uv_udp_recv_cb = *u8; +pub type sockaddr = c_void; pub type sockaddr_in = c_void; pub type sockaddr_in6 = c_void; +pub type uv_membership = c_void; #[deriving(Eq)] pub enum uv_handle_type { @@ -231,6 +233,31 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_ return rust_uv_get_udp_handle_from_send_req(send_req); } +pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_in) -> c_int { + return rust_uv_udp_getsockname(handle, name); +} + +pub unsafe fn udp_get_sockname6(handle: *uv_udp_t, name: *sockaddr_in6) -> c_int { + return rust_uv_udp_getsockname6(handle, name); +} + +pub unsafe fn udp_set_membership(handle: *uv_udp_t, multicast_addr: *c_char, + interface_addr: *c_char, membership: uv_membership) -> c_int { + return rust_uv_udp_set_membership(handle, multicast_addr, interface_addr, membership); +} + +pub unsafe fn udp_set_multicast_loop(handle: *uv_udp_t, on: c_int) -> c_int { + return rust_uv_udp_set_multicast_loop(handle, on); +} + +pub unsafe fn udp_set_multicast_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int { + return rust_uv_udp_set_multicast_ttl(handle, ttl); +} + +pub unsafe fn udp_set_broadcast(handle: *uv_udp_t, on: c_int) -> c_int { + return rust_uv_udp_set_broadcast(handle, on); +} + pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int { return rust_uv_tcp_init(loop_handle, handle); } @@ -261,6 +288,26 @@ pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) - return rust_uv_tcp_getpeername6(tcp_handle_ptr, name); } +pub unsafe fn tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_in) -> c_int { + return rust_uv_tcp_getsockname(handle, name); +} + +pub unsafe fn tcp_getsockname6(handle: *uv_tcp_t, name: *sockaddr_in6) -> c_int { + return rust_uv_tcp_getsockname6(handle, name); +} + +pub unsafe fn tcp_nodelay(handle: *uv_tcp_t, enable: c_int) -> c_int { + return rust_uv_tcp_nodelay(handle, enable); +} + +pub unsafe fn tcp_keepalive(handle: *uv_tcp_t, enable: c_int, delay: c_uint) -> c_int { + return rust_uv_tcp_keepalive(handle, enable, delay); +} + +pub unsafe fn tcp_simultaneous_accepts(handle: *uv_tcp_t, enable: c_int) -> c_int { + return rust_uv_tcp_simultaneous_accepts(handle, enable); +} + pub unsafe fn listen(stream: *T, backlog: c_int, cb: *u8) -> c_int { return rust_uv_listen(stream as *c_void, backlog, cb); } @@ -318,6 +365,22 @@ pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int { return rust_uv_timer_stop(timer_ptr); } +pub unsafe fn is_ip4_addr(addr: *sockaddr) -> bool { + match rust_uv_is_ipv4_sockaddr(addr) { 0 => false, _ => true } +} + +pub unsafe fn is_ip6_addr(addr: *sockaddr) -> bool { + match rust_uv_is_ipv6_sockaddr(addr) { 0 => false, _ => true } +} + +pub unsafe fn as_sockaddr_in(addr: *sockaddr) -> *sockaddr_in { + return rust_uv_sockaddr_as_sockaddr_in(addr); +} + +pub unsafe fn as_sockaddr_in6(addr: *sockaddr) -> *sockaddr_in6 { + return rust_uv_sockaddr_as_sockaddr_in6(addr); +} + pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in { do str::as_c_str(ip) |ip_buf| { rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int) @@ -451,25 +514,42 @@ extern { fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int; fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint; fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint; - fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t, tcp_handle_ptr: *uv_tcp_t, after_cb: *u8, + fn rust_uv_tcp_connect(req: *uv_connect_t, handle: *uv_tcp_t, cb: *u8, addr: *sockaddr_in) -> c_int; fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, addr: *sockaddr_in) -> c_int; - fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t, tcp_handle_ptr: *uv_tcp_t, after_cb: *u8, + fn rust_uv_tcp_connect6(req: *uv_connect_t, handle: *uv_tcp_t, cb: *u8, addr: *sockaddr_in6) -> c_int; fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, addr: *sockaddr_in6) -> c_int; fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int; fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int; + fn rust_uv_tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_in) -> c_int; + fn rust_uv_tcp_getsockname6(handle: *uv_tcp_t, name: *sockaddr_in6) -> c_int; + fn rust_uv_tcp_nodelay(handle: *uv_tcp_t, enable: c_int) -> c_int; + fn rust_uv_tcp_keepalive(handle: *uv_tcp_t, enable: c_int, delay: c_uint) -> c_int; + fn rust_uv_tcp_simultaneous_accepts(handle: *uv_tcp_t, enable: c_int) -> c_int; fn rust_uv_udp_init(loop_handle: *uv_loop_t, handle_ptr: *uv_udp_t) -> c_int; fn rust_uv_udp_bind(server: *uv_udp_t, addr: *sockaddr_in, flags: c_uint) -> c_int; fn rust_uv_udp_bind6(server: *uv_udp_t, addr: *sockaddr_in6, flags: c_uint) -> c_int; - fn rust_uv_udp_send(req: *uv_udp_send_t, handle: *uv_udp_t, buf_in: *uv_buf_t, buf_cnt: c_int, - addr: *sockaddr_in, cb: *u8) -> c_int; - fn rust_uv_udp_send6(req: *uv_udp_send_t, handle: *uv_udp_t, buf_in: *uv_buf_t, buf_cnt: c_int, - addr: *sockaddr_in6, cb: *u8) -> c_int; + fn rust_uv_udp_send(req: *uv_udp_send_t, handle: *uv_udp_t, buf_in: *uv_buf_t, + buf_cnt: c_int, addr: *sockaddr_in, cb: *u8) -> c_int; + fn rust_uv_udp_send6(req: *uv_udp_send_t, handle: *uv_udp_t, buf_in: *uv_buf_t, + buf_cnt: c_int, addr: *sockaddr_in6, cb: *u8) -> c_int; fn rust_uv_udp_recv_start(server: *uv_udp_t, on_alloc: *u8, on_recv: *u8) -> c_int; fn rust_uv_udp_recv_stop(server: *uv_udp_t) -> c_int; fn rust_uv_get_udp_handle_from_send_req(req: *uv_udp_send_t) -> *uv_udp_t; + fn rust_uv_udp_getsockname(handle: *uv_udp_t, name: *sockaddr_in) -> c_int; + fn rust_uv_udp_getsockname6(handle: *uv_udp_t, name: *sockaddr_in6) -> c_int; + fn rust_uv_udp_set_membership(handle: *uv_udp_t, multicast_addr: *c_char, + interface_addr: *c_char, membership: uv_membership) -> c_int; + fn rust_uv_udp_set_multicast_loop(handle: *uv_udp_t, on: c_int) -> c_int; + fn rust_uv_udp_set_multicast_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int; + fn rust_uv_udp_set_broadcast(handle: *uv_udp_t, on: c_int) -> c_int; + + fn rust_uv_is_ipv4_sockaddr(addr: *sockaddr) -> c_int; + fn rust_uv_is_ipv6_sockaddr(addr: *sockaddr) -> c_int; + fn rust_uv_sockaddr_as_sockaddr_in(addr: *sockaddr) -> *sockaddr_in; + fn rust_uv_sockaddr_as_sockaddr_in6(addr: *sockaddr) -> *sockaddr_in6; fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int; fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int; diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 6032ed1a6bd..32ccc9ba4a8 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -293,6 +293,38 @@ rust_uv_tcp_getpeername6 return uv_tcp_getpeername(handle, (sockaddr*)name, &namelen); } +extern "C" int +rust_uv_tcp_getsockname +(uv_tcp_t* handle, sockaddr_in* name) { + int namelen = sizeof(sockaddr_in); + return uv_tcp_getsockname(handle, (sockaddr*)name, &namelen); +} + +extern "C" int +rust_uv_tcp_getsockname6 +(uv_tcp_t* handle, sockaddr_in6* name) { + int namelen = sizeof(sockaddr_in6); + return uv_tcp_getsockname(handle, (sockaddr*)name, &namelen); +} + +extern "C" int +rust_uv_tcp_nodelay +(uv_tcp_t* handle, int enable) { + return uv_tcp_nodelay(handle, enable); +} + +extern "C" int +rust_uv_tcp_keepalive +(uv_tcp_t* handle, int enable, unsigned int delay) { + return uv_tcp_keepalive(handle, enable, delay); +} + +extern "C" int +rust_uv_tcp_simultaneous_accepts +(uv_tcp_t* handle, int enable) { + return uv_tcp_simultaneous_accepts(handle, enable); +} + extern "C" int rust_uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) { return uv_udp_init(loop, handle); @@ -335,6 +367,44 @@ rust_uv_get_udp_handle_from_send_req(uv_udp_send_t* send_req) { return send_req->handle; } +extern "C" int +rust_uv_udp_getsockname +(uv_udp_t* handle, sockaddr_in* name) { + int namelen = sizeof(sockaddr_in); + return uv_udp_getsockname(handle, (sockaddr*)name, &namelen); +} + +extern "C" int +rust_uv_udp_getsockname6 +(uv_udp_t* handle, sockaddr_in6* name) { + int namelen = sizeof(sockaddr_in6); + return uv_udp_getsockname(handle, (sockaddr*)name, &namelen); +} + +extern "C" int +rust_uv_udp_set_membership +(uv_udp_t* handle, const char* m_addr, const char* i_addr, uv_membership membership) { + return uv_udp_set_membership(handle, m_addr, i_addr, membership); +} + +extern "C" int +rust_uv_udp_set_multicast_loop +(uv_udp_t* handle, int on) { + return uv_udp_set_multicast_loop(handle, on); +} + +extern "C" int +rust_uv_udp_set_multicast_ttl +(uv_udp_t* handle, int ttl) { + return uv_udp_set_multicast_ttl(handle, ttl); +} + +extern "C" int +rust_uv_udp_set_broadcast +(uv_udp_t* handle, int on) { + return uv_udp_set_broadcast(handle, on); +} + extern "C" int rust_uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { @@ -587,10 +657,34 @@ extern "C" void rust_uv_freeaddrinfo(addrinfo* res) { uv_freeaddrinfo(res); } + +extern "C" int +rust_uv_is_ipv4_sockaddr(sockaddr* addr) { + return addr->sa_family == AF_INET; +} + +extern "C" int +rust_uv_is_ipv6_sockaddr(sockaddr* addr) { + return addr->sa_family == AF_INET6; +} + +extern "C" sockaddr_in* +rust_uv_sockaddr_as_sockaddr_in(sockaddr* addr) { +// return (sockaddr_in*)addr->sa_data; + return (sockaddr_in*)addr; +} + +extern "C" sockaddr_in6* +rust_uv_sockaddr_as_sockaddr_in6(sockaddr* addr) { + //return (sockaddr_in6*)addr->sa_data; + return (sockaddr_in6*)addr; +} + extern "C" bool rust_uv_is_ipv4_addrinfo(addrinfo* input) { return input->ai_family == AF_INET; } + extern "C" bool rust_uv_is_ipv6_addrinfo(addrinfo* input) { return input->ai_family == AF_INET6; diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index b604f60cba6..e8a46cf5a65 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -105,6 +105,11 @@ rust_uv_tcp_connect rust_uv_tcp_bind rust_uv_tcp_connect6 rust_uv_tcp_bind6 +rust_uv_tcp_getsockname +rust_uv_tcp_getsockname6 +rust_uv_tcp_nodelay +rust_uv_tcp_keepalive +rust_uv_tcp_simultaneous_accepts rust_uv_udp_init rust_uv_udp_bind rust_uv_udp_bind6 @@ -113,6 +118,16 @@ rust_uv_udp_send6 rust_uv_udp_recv_start rust_uv_udp_recv_stop rust_uv_get_udp_handle_from_send_req +rust_uv_udp_getsockname +rust_uv_udp_getsockname6 +rust_uv_udp_set_membership +rust_uv_udp_set_multicast_loop +rust_uv_udp_set_multicast_ttl +rust_uv_udp_set_broadcast +rust_uv_is_ipv4_sockaddr +rust_uv_is_ipv6_sockaddr +rust_uv_sockaddr_as_sockaddr_in +rust_uv_sockaddr_as_sockaddr_in6 rust_uv_listen rust_uv_accept rust_uv_write